Seastar
High performance C++ framework for concurrent servers
stream.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/future.hh>
25 #include <seastar/util/modules.hh>
26 #ifndef SEASTAR_MODULE
27 #include <exception>
28 #include <functional>
29 #include <cassert>
30 #endif
31 
32 namespace seastar {
33 
34 SEASTAR_MODULE_EXPORT_BEGIN
35 
36 // A stream/subscription pair is similar to a promise/future pair,
37 // but applies to a sequence of values instead of a single value.
38 //
39 // A stream<> is the producer side. It may call produce() as long
40 // as the future<> returned from the previous invocation is ready.
41 // To signify no more data is available, call close().
42 //
43 // A subscription<> is the consumer side. It is created by a call
44 // to stream::listen(). Calling subscription::start(),
45 // which registers the data processing callback, starts processing
46 // events. It may register for end-of-stream notifications by
47 // chaining the done() future, which also delivers error
48 // events (as exceptions).
49 //
50 // The consumer can pause generation of new data by returning
51 // a non-ready future; when the future becomes ready, the producer
52 // will resume processing.
53 
// Forward declarations. stream<> hands out subscription<> objects and
// subscription<> keeps a pointer back to its stream<>, so both names
// must be known before either class is defined.
template <typename... T> class stream;
template <typename... T> class subscription;
59 
60 template <typename... T>
61 class stream {
62 public:
63  using next_fn = noncopyable_function<future<> (T...)>;
64 
65 private:
66  promise<> _done;
67  promise<> _ready;
68  next_fn _next;
69 
73  void start(next_fn next) {
74  _next = std::move(next);
75  _ready.set_value();
76  }
77 
78 public:
79  stream() = default;
80  stream(const stream&) = delete;
81  stream(stream&&) = delete;
82  void operator=(const stream&) = delete;
83  void operator=(stream&&) = delete;
84 
85  // Returns a subscription that reads value from this
86  // stream.
87  subscription<T...> listen() {
88  return subscription<T...>(this);
89  }
90 
91  // Returns a subscription that reads value from this
92  // stream, and also sets up the listen function.
93  subscription<T...> listen(next_fn next) {
94  start(std::move(next));
95  return subscription<T...>(this);
96  }
97 
98  // Becomes ready when the listener is ready to accept
99  // values. Call only once, when beginning to produce
100  // values.
101  future<> started() {
102  return _ready.get_future();
103  }
104 
105  // Produce a value. Call only after started(), and after
106  // a previous produce() is ready.
107  future<> produce(T... data);
108 
109  // End the stream. Call only after started(), and after
110  // a previous produce() is ready. No functions may be called
111  // after this.
112  void close() {
113  _done.set_value();
114  }
115 
116  // Signal an error. Call only after started(), and after
117  // a previous produce() is ready. No functions may be called
118  // after this.
119  template <typename E>
120  void set_exception(E ex) {
121  _done.set_exception(ex);
122  }
123 
124  friend class subscription<T...>;
125 };
126 
127 template <typename... T>
129  stream<T...>* _stream;
130  future<> _done;
131  explicit subscription(stream<T...>* s) : _stream(s), _done(s->_done.get_future()) {
132  }
133 
134 public:
135  using next_fn = typename stream<T...>::next_fn;
136  subscription(subscription&& x) : _stream(x._stream), _done(std::move(x._done)) {
137  x._stream = nullptr;
138  }
139 
143  void start(next_fn next) {
144  return _stream->start(std::move(next));
145  }
146 
147  // Becomes ready when the stream is empty, or when an error
148  // happens (in that case, an exception is held).
149  future<> done() {
150  return std::move(_done);
151  }
152 
153  friend class stream<T...>;
154 };
155 SEASTAR_MODULE_EXPORT_END
156 
157 template <typename... T>
158 inline
159 future<>
160 stream<T...>::produce(T... data) {
161  auto ret = futurize_invoke(_next, std::move(data)...);
162  if (ret.available() && !ret.failed()) {
163  // Native network stack depends on stream::produce() returning
164  // a ready future to push packets along without dropping. As
165  // a temporary workaround, special case a ready, unfailed future
166  // and return it immediately, so that then_wrapped(), below,
167  // doesn't convert a ready future to an unready one.
168  return ret;
169  }
170  return ret.then_wrapped([this] (auto&& f) {
171  try {
172  f.get();
173  } catch (...) {
174  _done.set_exception(std::current_exception());
175  // FIXME: tell the producer to stop producing
176  throw;
177  }
178  });
179 }
180 }
void set_value(A &&... a) noexcept
Sets the promise's value.
Definition: future.hh:982
void set_exception(std::exception_ptr &&ex) noexcept
Marks the promise as failed.
Definition: future.hh:990
Definition: stream.hh:61
Definition: stream.hh:128
void start(next_fn next)
Start receiving events from the stream.
Definition: stream.hh:143
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1917
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: noncopyable_function.hh:37