Seastar
High performance C++ framework for concurrent servers
expiring_fifo.hh
1 /*
2  * This file is open source software, licensed to you under the terms
3  * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4  * distributed with this work for additional information regarding copyright
5  * ownership. You may not use this file except in compliance with the License.
6  *
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an
13  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14  * KIND, either express or implied. See the License for the
15  * specific language governing permissions and limitations
16  * under the License.
17  */
18 /*
19  * Copyright (C) 2016 ScyllaDB
20  */
21 
22 #pragma once
23 
24 #ifndef SEASTAR_MODULE
25 #include <seastar/core/future.hh>
26 #include <seastar/core/chunked_fifo.hh>
27 #include <stdexcept>
28 #include <exception>
29 #include <memory>
30 #include <seastar/core/timer.hh>
31 #include <seastar/core/lowres_clock.hh>
32 #include <seastar/core/timed_out_error.hh>
33 #include <seastar/util/modules.hh>
34 #endif
35 
36 namespace seastar {
37 
/// Expiry handler that does nothing.
///
/// Serves as the default \c OnExpiry policy: the expired element is simply
/// dropped without any notification.
template<typename T>
struct dummy_expiry {
    /// Invoked with the element that is about to expire; intentionally a no-op.
    void operator()(T&) noexcept {}
};
42 
43 template<typename... T>
45  void operator()(promise<T...>& pr) noexcept {
46  pr.set_exception(std::make_exception_ptr(timed_out_error()));
47  };
48 };
49 
58 SEASTAR_MODULE_EXPORT
59 template <typename T, typename OnExpiry = dummy_expiry<T>, typename Clock = lowres_clock>
61 public:
62  using clock = Clock;
63  using time_point = typename Clock::time_point;
64 private:
65  struct entry {
66  std::optional<T> payload; // disengaged means that it's expired
67  timer<Clock> tr;
68  entry(T&& payload_) : payload(std::move(payload_)) {}
69  entry(const T& payload_) : payload(payload_) {}
70  entry(T payload_, expiring_fifo& ef, time_point timeout)
71  : payload(std::move(payload_))
72  , tr([this, &ef] {
73  ef._on_expiry(*payload);
74  payload = std::nullopt;
75  --ef._size;
76  ef.drop_expired_front();
77  })
78  {
79  tr.arm(timeout);
80  }
81  entry(entry&& x) = delete;
82  entry(const entry& x) = delete;
83  };
84 
85  // If engaged, represents the first element.
86  // This is to avoid large allocations done by chunked_fifo for single-element cases.
87  // expiring_fifo is used to implement wait lists in synchronization primitives
88  // and in some uses it's common to have at most one waiter.
89  std::unique_ptr<entry> _front;
90 
91  // There is an invariant that the front element is never expired.
92  chunked_fifo<entry> _list;
93  OnExpiry _on_expiry;
94  size_t _size = 0;
95 
96  // Ensures that front() is not expired by dropping expired elements from the front.
97  void drop_expired_front() noexcept {
98  while (!_list.empty() && !_list.front().payload) {
99  _list.pop_front();
100  }
101  if (_front && !_front->payload) {
102  _front.reset();
103  }
104  }
105 public:
106  expiring_fifo() noexcept = default;
107  expiring_fifo(OnExpiry on_expiry) noexcept(std::is_nothrow_move_constructible_v<OnExpiry>) : _on_expiry(std::move(on_expiry)) {}
108 
109  expiring_fifo(expiring_fifo&& o) noexcept
110  : expiring_fifo(std::move(o._on_expiry)) {
111  // entry objects hold a reference to this so non-empty containers cannot be moved.
112  assert(o._size == 0);
113  }
114 
115  expiring_fifo& operator=(expiring_fifo&& o) noexcept {
116  if (this != &o) {
117  this->~expiring_fifo();
118  new (this) expiring_fifo(std::move(o));
119  }
120  return *this;
121  }
122 
128  bool empty() const noexcept {
129  return _size == 0;
130  }
131 
133  explicit operator bool() const noexcept {
134  return !empty();
135  }
136 
139  T& front() noexcept {
140  if (_front) {
141  return *_front->payload;
142  }
143  return *_list.front().payload;
144  }
145 
148  const T& front() const noexcept {
149  if (_front) {
150  return *_front->payload;
151  }
152  return *_list.front().payload;
153  }
154 
158  size_t size() const noexcept {
159  return _size;
160  }
161 
166  void reserve(size_t size) {
167  return _list.reserve(size);
168  }
169 
172  void push_back(const T& payload) {
173  if (_size == 0) {
174  _front = std::make_unique<entry>(payload);
175  } else {
176  _list.emplace_back(payload);
177  }
178  ++_size;
179  }
180 
183  void push_back(T&& payload) {
184  if (_size == 0) {
185  _front = std::make_unique<entry>(std::move(payload));
186  } else {
187  _list.emplace_back(std::move(payload));
188  }
189  ++_size;
190  }
191 
195  void push_back(T&& payload, time_point timeout) {
196  if (timeout == time_point::max()) {
197  push_back(std::move(payload));
198  return;
199  }
200  if (_size == 0) {
201  _front = std::make_unique<entry>(std::move(payload), *this, timeout);
202  } else {
203  _list.emplace_back(std::move(payload), *this, timeout);
204  }
205  ++_size;
206  }
207 
210  void pop_front() noexcept {
211  if (_front) {
212  _front.reset();
213  } else {
214  _list.pop_front();
215  }
216  --_size;
217  drop_expired_front();
218  }
219 };
220 
221 }
Definition: expiring_fifo.hh:60
bool empty() const noexcept
Definition: expiring_fifo.hh:128
size_t size() const noexcept
Definition: expiring_fifo.hh:158
void pop_front() noexcept
Definition: expiring_fifo.hh:210
void push_back(const T &payload)
Definition: expiring_fifo.hh:172
T & front() noexcept
Definition: expiring_fifo.hh:139
const T & front() const noexcept
Definition: expiring_fifo.hh:148
void reserve(size_t size)
Definition: expiring_fifo.hh:166
void push_back(T &&payload)
Definition: expiring_fifo.hh:183
void push_back(T &&payload, time_point timeout)
Definition: expiring_fifo.hh:195
void set_exception(std::exception_ptr &&ex) noexcept
Marks the promise as failed.
Definition: future.hh:990
Definition: timed_out_error.hh:33
void arm(time_point until, std::optional< duration > period={}) noexcept
Seastar API namespace.
Definition: abort_on_ebadf.hh:26
Definition: expiring_fifo.hh:39
Definition: expiring_fifo.hh:44