Seastar
High performance C++ framework for concurrent servers
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Modules Pages
stream.hh
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/future.hh>
25#include <seastar/util/modules.hh>
26#ifndef SEASTAR_MODULE
27#include <exception>
28#include <cassert>
29#endif
30
31namespace seastar {
32
33SEASTAR_MODULE_EXPORT_BEGIN
34
35// A stream/subscription pair is similar to a promise/future pair,
36// but apply to a sequence of values instead of a single value.
37//
38// A stream<> is the producer side. It may call produce() as long
39// as the future<> returned from the previous invocation is ready.
40// To signify no more data is available, call close().
41//
42// A subscription<> is the consumer side. It is created by a call
43// to stream::listen(). Calling subscription::start(),
44// which registers the data processing callback, starts processing
45// events. It may register for end-of-stream notifications by
46// chaining the when_done() future, which also delivers error
47// events (as exceptions).
48//
49// The consumer can pause generation of new data by returning
50// a non-ready future; when the future becomes ready, the producer
51// will resume processing.
52
53template <typename... T>
54class stream;
55
56template <typename... T>
57class subscription;
58
59template <typename... T>
60class stream {
61public:
63
64private:
65 promise<> _done;
66 promise<> _ready;
67 next_fn _next;
68
72 void start(next_fn next) {
73 _next = std::move(next);
74 _ready.set_value();
75 }
76
77public:
78 stream() = default;
79 stream(const stream&) = delete;
80 stream(stream&&) = delete;
81 void operator=(const stream&) = delete;
82 void operator=(stream&&) = delete;
83
84 // Returns a subscription that reads value from this
85 // stream.
86 subscription<T...> listen() {
87 return subscription<T...>(this);
88 }
89
90 // Returns a subscription that reads value from this
91 // stream, and also sets up the listen function.
92 subscription<T...> listen(next_fn next) {
93 start(std::move(next));
94 return subscription<T...>(this);
95 }
96
97 // Becomes ready when the listener is ready to accept
98 // values. Call only once, when beginning to produce
99 // values.
100 future<> started() {
101 return _ready.get_future();
102 }
103
104 // Produce a value. Call only after started(), and after
105 // a previous produce() is ready.
106 future<> produce(T... data);
107
108 // End the stream. Call only after started(), and after
109 // a previous produce() is ready. No functions may be called
110 // after this.
111 void close() {
112 _done.set_value();
113 }
114
115 // Signal an error. Call only after started(), and after
116 // a previous produce() is ready. No functions may be called
117 // after this.
118 template <typename E>
119 void set_exception(E ex) {
120 _done.set_exception(ex);
121 }
122
123 friend class subscription<T...>;
124};
125
126template <typename... T>
128 stream<T...>* _stream;
129 future<> _done;
130 explicit subscription(stream<T...>* s) : _stream(s), _done(s->_done.get_future()) {
131 }
132
133public:
134 using next_fn = typename stream<T...>::next_fn;
135 subscription(subscription&& x) : _stream(x._stream), _done(std::move(x._done)) {
136 x._stream = nullptr;
137 }
138
142 void start(next_fn next) {
143 return _stream->start(std::move(next));
144 }
145
146 // Becomes ready when the stream is empty, or when an error
147 // happens (in that case, an exception is held).
148 future<> done() {
149 return std::move(_done);
150 }
151
152 friend class stream<T...>;
153};
154SEASTAR_MODULE_EXPORT_END
155
156template <typename... T>
157inline
158future<>
159stream<T...>::produce(T... data) {
160 auto ret = futurize_invoke(_next, std::move(data)...);
161 if (ret.available() && !ret.failed()) {
162 // Native network stack depends on stream::produce() returning
163 // a ready future to push packets along without dropping. As
164 // a temporary workaround, special case a ready, unfailed future
165 // and return it immediately, so that then_wrapped(), below,
166 // doesn't convert a ready future to an unready one.
167 return ret;
168 }
169 return ret.then_wrapped([this] (auto&& f) {
170 try {
171 f.get();
172 } catch (...) {
173 _done.set_exception(std::current_exception());
174 // FIXME: tell the producer to stop producing
175 throw;
176 }
177 });
178}
179}
A representation of a possibly not-yet-computed value.
Definition: future.hh:1240
void set_value(A &&... a) noexcept
Sets the promises value.
Definition: future.hh:990
void set_exception(std::exception_ptr &&ex) noexcept
Marks the promise as failed.
Definition: future.hh:998
Definition: stream.hh:60
Definition: stream.hh:127
void start(next_fn next)
Start receiving events from the stream.
Definition: stream.hh:142
future< T > get_future() noexcept
Gets the promise's associated future.
Definition: future.hh:1926
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: noncopyable_function.hh:37