Seastar
High performance C++ framework for concurrent servers
stream.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2014 Cloudius Systems, Ltd.
20  */
21 
22 #pragma once
23 
24 #include <seastar/core/future.hh>
25 #include <exception>
26 #include <functional>
27 #include <cassert>
28 
29 namespace seastar {
30 
31 // A stream/subscription pair is similar to a promise/future pair,
32 // but applies to a sequence of values instead of a single value.
33 //
34 // A stream<> is the producer side. It may call produce() as long
35 // as the future<> returned from the previous invocation is ready.
36 // To signify no more data is available, call close().
37 //
38 // A subscription<> is the consumer side. It is created by a call
39 // to stream::listen(). Calling subscription::start(),
40 // which registers the data processing callback, starts processing
41 // events. It may register for end-of-stream notifications by
42 // chaining the done() future, which also delivers error
43 // events (as exceptions).
44 //
45 // The consumer can pause generation of new data by returning
46 // a non-ready future; when the future becomes ready, the producer
47 // will resume processing.
48 
49 template <typename... T>
50 class stream;
51 
52 template <typename... T>
53 class subscription;
54 
55 template <typename... T>
56 class stream {
57 public:
58  using next_fn = noncopyable_function<future<> (T...)>;
59 
60 private:
61  promise<> _done;
62  promise<> _ready;
63  next_fn _next;
64 
68  void start(next_fn next) {
69  _next = std::move(next);
70  _ready.set_value();
71  }
72 
73 public:
74  stream() = default;
75  stream(const stream&) = delete;
76  stream(stream&&) = delete;
77  void operator=(const stream&) = delete;
78  void operator=(stream&&) = delete;
79 
80  // Returns a subscription that reads value from this
81  // stream.
82  subscription<T...> listen() {
83  return subscription<T...>(this);
84  }
85 
86  // Returns a subscription that reads value from this
87  // stream, and also sets up the listen function.
88  subscription<T...> listen(next_fn next) {
89  start(std::move(next));
90  return subscription<T...>(this);
91  }
92 
93  // Becomes ready when the listener is ready to accept
94  // values. Call only once, when beginning to produce
95  // values.
96  future<> started() {
97  return _ready.get_future();
98  }
99 
100  // Produce a value. Call only after started(), and after
101  // a previous produce() is ready.
102  future<> produce(T... data);
103 
104  // End the stream. Call only after started(), and after
105  // a previous produce() is ready. No functions may be called
106  // after this.
107  void close() {
108  _done.set_value();
109  }
110 
111  // Signal an error. Call only after started(), and after
112  // a previous produce() is ready. No functions may be called
113  // after this.
114  template <typename E>
115  void set_exception(E ex) {
116  _done.set_exception(ex);
117  }
118 
119  friend class subscription<T...>;
120 };
121 
122 template <typename... T>
124  stream<T...>* _stream;
125  future<> _done;
126  explicit subscription(stream<T...>* s) : _stream(s), _done(s->_done.get_future()) {
127  }
128 
129 public:
130  using next_fn = typename stream<T...>::next_fn;
131  subscription(subscription&& x) : _stream(x._stream), _done(std::move(x._done)) {
132  x._stream = nullptr;
133  }
134 
138  void start(next_fn next) {
139  return _stream->start(std::move(next));
140  }
141 
142  // Becomes ready when the stream is empty, or when an error
143  // happens (in that case, an exception is held).
144  future<> done() {
145  return std::move(_done);
146  }
147 
148  friend class stream<T...>;
149 };
150 
151 template <typename... T>
152 inline
153 future<>
154 stream<T...>::produce(T... data) {
155  auto ret = futurize_invoke(_next, std::move(data)...);
156  if (ret.available() && !ret.failed()) {
157  // Native network stack depends on stream::produce() returning
158  // a ready future to push packets along without dropping. As
159  // a temporary workaround, special case a ready, unfailed future
160  // and return it immediately, so that then_wrapped(), below,
161  // doesn't convert a ready future to an unready one.
162  return ret;
163  }
164  return ret.then_wrapped([this] (auto&& f) {
165  try {
166  f.get();
167  } catch (...) {
168  _done.set_exception(std::current_exception());
169  // FIXME: tell the producer to stop producing
170  throw;
171  }
172  });
173 }
174 }
seastar::promise::get_future
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:2032
seastar::noncopyable_function< future<>(T...)>
seastar
Seastar API namespace.
Definition: abort_on_ebadf.hh:24
seastar::promise
promise - allows a future value to be made available at a later time.
Definition: future.hh:957
seastar::subscription::start
void start(next_fn next)
Start receiving events from the stream.
Definition: stream.hh:138
seastar::subscription
Definition: stream.hh:123
seastar::promise::set_value
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:1013
seastar::future
A representation of a possibly not-yet-computed value.
Definition: future.hh:1337
seastar::stream
Definition: stream.hh:56
seastar::promise::set_exception
void set_exception(std::exception_ptr &&ex) noexcept
Marks the promise as failed.
Definition: future.hh:1021
seastar::noncopyable_function
Definition: noncopyable_function.hh:33