//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_ASYNC_EVENT_HPP
10  
#ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11  
#define BOOST_CAPY_ASYNC_EVENT_HPP
11  
#define BOOST_CAPY_ASYNC_EVENT_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/intrusive.hpp>
14  
#include <boost/capy/detail/intrusive.hpp>
15  
#include <boost/capy/concept/executor.hpp>
15  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/error.hpp>
16  
#include <boost/capy/error.hpp>
17  
#include <boost/capy/ex/io_env.hpp>
17  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/io_result.hpp>
18  
#include <boost/capy/io_result.hpp>
19  

19  

20  
#include <stop_token>
20  
#include <stop_token>
21  

21  

22  
#include <atomic>
22  
#include <atomic>
23  
#include <coroutine>
23  
#include <coroutine>
24  
#include <new>
24  
#include <new>
25  
#include <utility>
25  
#include <utility>
26  

26  

/*  async_event implementation notes
    =================================

    Same cancellation pattern as async_mutex (see that file for the
    full discussion on claimed_, stop_cb lifetime, member ordering,
    and threading assumptions).

    Key difference: set() wakes ALL waiters (broadcast), not one.
    It pops every waiter from the list and posts the ones it
    claims. Waiters already claimed by a stop callback are skipped.

    Because set() pops all waiters, a canceled waiter may have been
    removed from the list by set() before its await_resume runs.
    This requires a separate in_list_ flag (unlike async_mutex where
    active_ served double duty). await_resume only calls remove()
    when in_list_ is true.
*/
44  

44  

45  
namespace boost {
45  
namespace boost {
46  
namespace capy {
46  
namespace capy {
47  

47  

48  
/** An asynchronous event for coroutines.
48  
/** An asynchronous event for coroutines.
49  

49  

50  
    This event provides a way to notify multiple coroutines that some
50  
    This event provides a way to notify multiple coroutines that some
51  
    condition has occurred. When a coroutine awaits an unset event, it
51  
    condition has occurred. When a coroutine awaits an unset event, it
52  
    suspends and is added to a wait queue. When the event is set, all
52  
    suspends and is added to a wait queue. When the event is set, all
53  
    waiting coroutines are resumed.
53  
    waiting coroutines are resumed.
54  

54  

55  
    @par Cancellation
55  
    @par Cancellation
56  

56  

57  
    When a coroutine is suspended waiting for the event and its stop
57  
    When a coroutine is suspended waiting for the event and its stop
58  
    token is triggered, the waiter completes with `error::canceled`
58  
    token is triggered, the waiter completes with `error::canceled`
59  
    instead of waiting for `set()`.
59  
    instead of waiting for `set()`.
60  

60  

61  
    Cancellation only applies while the coroutine is suspended in the
61  
    Cancellation only applies while the coroutine is suspended in the
62  
    wait queue. If the event is already set when `wait()` is called,
62  
    wait queue. If the event is already set when `wait()` is called,
63  
    the wait completes immediately even if the stop token is already
63  
    the wait completes immediately even if the stop token is already
64  
    signaled.
64  
    signaled.
65  

65  

66  
    @par Zero Allocation
66  
    @par Zero Allocation
67  

67  

68  
    No heap allocation occurs for wait operations.
68  
    No heap allocation occurs for wait operations.
69  

69  

70  
    @par Thread Safety
70  
    @par Thread Safety
71 -
    Distinct objects: Safe.@n
 
72 -
    Shared objects: Unsafe.
 
73 -

 
74  

71  

75  
    The event operations are designed for single-threaded use on one
72  
    The event operations are designed for single-threaded use on one
76  
    executor. The stop callback may fire from any thread.
73  
    executor. The stop callback may fire from any thread.
77 -
    This type is non-copyable and non-movable because suspended
 
78 -
    waiters hold intrusive pointers into the event's internal list.
 
79 -

 
80  

74  

81  
    @par Example
75  
    @par Example
82  
    @code
76  
    @code
83  
    async_event event;
77  
    async_event event;
84  

78  

85  
    task<> waiter() {
79  
    task<> waiter() {
86  
        auto [ec] = co_await event.wait();
80  
        auto [ec] = co_await event.wait();
87  
        if(ec)
81  
        if(ec)
88  
            co_return;
82  
            co_return;
89  
        // ... event was set ...
83  
        // ... event was set ...
90  
    }
84  
    }
91  

85  

92  
    task<> notifier() {
86  
    task<> notifier() {
93  
        // ... do some work ...
87  
        // ... do some work ...
94  
        event.set();  // Wake all waiters
88  
        event.set();  // Wake all waiters
95  
    }
89  
    }
96  
    @endcode
90  
    @endcode
97  
*/
91  
*/
98  
class async_event
92  
class async_event
99  
{
93  
{
100  
public:
94  
public:
101  
    class wait_awaiter;
95  
    class wait_awaiter;
102  

96  

103  
private:
97  
private:
104  
    bool set_ = false;
98  
    bool set_ = false;
105  
    detail::intrusive_list<wait_awaiter> waiters_;
99  
    detail::intrusive_list<wait_awaiter> waiters_;
106  

100  

107  
public:
101  
public:
108  
    /** Awaiter returned by wait().
102  
    /** Awaiter returned by wait().
109  
    */
103  
    */
110  
    class wait_awaiter
104  
    class wait_awaiter
111  
        : public detail::intrusive_list<wait_awaiter>::node
105  
        : public detail::intrusive_list<wait_awaiter>::node
112  
    {
106  
    {
113  
        friend class async_event;
107  
        friend class async_event;
114  

108  

115  
        async_event* e_;
109  
        async_event* e_;
116  
        std::coroutine_handle<> h_;
110  
        std::coroutine_handle<> h_;
117  
        executor_ref ex_;
111  
        executor_ref ex_;
118  

112  

119  
        // Declared before stop_cb_buf_: the callback
113  
        // Declared before stop_cb_buf_: the callback
120  
        // accesses these members, so they must still be
114  
        // accesses these members, so they must still be
121  
        // alive if the stop_cb_ destructor blocks.
115  
        // alive if the stop_cb_ destructor blocks.
122  
        std::atomic<bool> claimed_{false};
116  
        std::atomic<bool> claimed_{false};
123  
        bool canceled_ = false;
117  
        bool canceled_ = false;
124  
        bool active_ = false;
118  
        bool active_ = false;
125  
        bool in_list_ = false;
119  
        bool in_list_ = false;
126  

120  

127  
        struct cancel_fn
121  
        struct cancel_fn
128  
        {
122  
        {
129  
            wait_awaiter* self_;
123  
            wait_awaiter* self_;
130  

124  

131  
            void operator()() const noexcept
125  
            void operator()() const noexcept
132  
            {
126  
            {
133  
                if(!self_->claimed_.exchange(
127  
                if(!self_->claimed_.exchange(
134  
                    true, std::memory_order_acq_rel))
128  
                    true, std::memory_order_acq_rel))
135  
                {
129  
                {
136  
                    self_->canceled_ = true;
130  
                    self_->canceled_ = true;
137  
                    self_->ex_.post(self_->h_);
131  
                    self_->ex_.post(self_->h_);
138  
                }
132  
                }
139  
            }
133  
            }
140  
        };
134  
        };
141  

135  

142  
        using stop_cb_t =
136  
        using stop_cb_t =
143  
            std::stop_callback<cancel_fn>;
137  
            std::stop_callback<cancel_fn>;
144  

138  

145  
        // Aligned storage for stop_cb_t. Declared last:
139  
        // Aligned storage for stop_cb_t. Declared last:
146  
        // its destructor may block while the callback
140  
        // its destructor may block while the callback
147  
        // accesses the members above.
141  
        // accesses the members above.
148  
#ifdef _MSC_VER
142  
#ifdef _MSC_VER
149  
# pragma warning(push)
143  
# pragma warning(push)
150  
# pragma warning(disable: 4324) // padded due to alignas
144  
# pragma warning(disable: 4324) // padded due to alignas
151  
#endif
145  
#endif
152  
        alignas(stop_cb_t)
146  
        alignas(stop_cb_t)
153  
            unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
147  
            unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
154  
#ifdef _MSC_VER
148  
#ifdef _MSC_VER
155  
# pragma warning(pop)
149  
# pragma warning(pop)
156  
#endif
150  
#endif
157  

151  

158  
        stop_cb_t& stop_cb_() noexcept
152  
        stop_cb_t& stop_cb_() noexcept
159  
        {
153  
        {
160  
            return *reinterpret_cast<stop_cb_t*>(
154  
            return *reinterpret_cast<stop_cb_t*>(
161  
                stop_cb_buf_);
155  
                stop_cb_buf_);
162  
        }
156  
        }
163  

157  

164  
    public:
158  
    public:
165  
        ~wait_awaiter()
159  
        ~wait_awaiter()
166  
        {
160  
        {
167  
            if(active_)
161  
            if(active_)
168  
                stop_cb_().~stop_cb_t();
162  
                stop_cb_().~stop_cb_t();
169  
            if(in_list_)
163  
            if(in_list_)
170  
                e_->waiters_.remove(this);
164  
                e_->waiters_.remove(this);
171  
        }
165  
        }
172  

166  

173  
        explicit wait_awaiter(async_event* e) noexcept
167  
        explicit wait_awaiter(async_event* e) noexcept
174  
            : e_(e)
168  
            : e_(e)
175  
        {
169  
        {
176  
        }
170  
        }
177  

171  

178  
        wait_awaiter(wait_awaiter&& o) noexcept
172  
        wait_awaiter(wait_awaiter&& o) noexcept
179  
            : e_(o.e_)
173  
            : e_(o.e_)
180  
            , h_(o.h_)
174  
            , h_(o.h_)
181  
            , ex_(o.ex_)
175  
            , ex_(o.ex_)
182  
            , claimed_(o.claimed_.load(
176  
            , claimed_(o.claimed_.load(
183  
                std::memory_order_relaxed))
177  
                std::memory_order_relaxed))
184  
            , canceled_(o.canceled_)
178  
            , canceled_(o.canceled_)
185  
            , active_(std::exchange(o.active_, false))
179  
            , active_(std::exchange(o.active_, false))
186  
            , in_list_(std::exchange(o.in_list_, false))
180  
            , in_list_(std::exchange(o.in_list_, false))
187  
        {
181  
        {
188  
        }
182  
        }
189  

183  

190  
        wait_awaiter(wait_awaiter const&) = delete;
184  
        wait_awaiter(wait_awaiter const&) = delete;
191  
        wait_awaiter& operator=(wait_awaiter const&) = delete;
185  
        wait_awaiter& operator=(wait_awaiter const&) = delete;
192  
        wait_awaiter& operator=(wait_awaiter&&) = delete;
186  
        wait_awaiter& operator=(wait_awaiter&&) = delete;
193  

187  

194  
        bool await_ready() const noexcept
188  
        bool await_ready() const noexcept
195  
        {
189  
        {
196  
            return e_->set_;
190  
            return e_->set_;
197  
        }
191  
        }
198  

192  

199  
        /** IoAwaitable protocol overload. */
193  
        /** IoAwaitable protocol overload. */
200  
        std::coroutine_handle<>
194  
        std::coroutine_handle<>
201  
        await_suspend(
195  
        await_suspend(
202  
            std::coroutine_handle<> h,
196  
            std::coroutine_handle<> h,
203  
            io_env const* env) noexcept
197  
            io_env const* env) noexcept
204  
        {
198  
        {
205  
            if(env->stop_token.stop_requested())
199  
            if(env->stop_token.stop_requested())
206  
            {
200  
            {
207  
                canceled_ = true;
201  
                canceled_ = true;
208  
                return h;
202  
                return h;
209  
            }
203  
            }
210  
            h_ = h;
204  
            h_ = h;
211  
            ex_ = env->executor;
205  
            ex_ = env->executor;
212  
            e_->waiters_.push_back(this);
206  
            e_->waiters_.push_back(this);
213  
            in_list_ = true;
207  
            in_list_ = true;
214  
            ::new(stop_cb_buf_) stop_cb_t(
208  
            ::new(stop_cb_buf_) stop_cb_t(
215  
                env->stop_token, cancel_fn{this});
209  
                env->stop_token, cancel_fn{this});
216  
            active_ = true;
210  
            active_ = true;
217  
            return std::noop_coroutine();
211  
            return std::noop_coroutine();
218  
        }
212  
        }
219  

213  

220  
        io_result<> await_resume() noexcept
214  
        io_result<> await_resume() noexcept
221  
        {
215  
        {
222  
            if(active_)
216  
            if(active_)
223  
            {
217  
            {
224  
                stop_cb_().~stop_cb_t();
218  
                stop_cb_().~stop_cb_t();
225  
                active_ = false;
219  
                active_ = false;
226  
            }
220  
            }
227  
            if(canceled_)
221  
            if(canceled_)
228  
            {
222  
            {
229  
                if(in_list_)
223  
                if(in_list_)
230  
                {
224  
                {
231  
                    e_->waiters_.remove(this);
225  
                    e_->waiters_.remove(this);
232  
                    in_list_ = false;
226  
                    in_list_ = false;
233  
                }
227  
                }
234  
                return {make_error_code(
228  
                return {make_error_code(
235  
                    error::canceled)};
229  
                    error::canceled)};
236  
            }
230  
            }
237  
            return {{}};
231  
            return {{}};
238  
        }
232  
        }
239  
    };
233  
    };
240 -
    /// Construct an unset event.
 
241  

234  

242  
    async_event() = default;
235  
    async_event() = default;
243  

236  

244 -
    /// Copy constructor (deleted).
237 +
    // Non-copyable, non-movable
245 -

 
246 -
    /// Copy assignment (deleted).
 
247  
    async_event(async_event const&) = delete;
238  
    async_event(async_event const&) = delete;
248 -

 
249 -
    /// Move constructor (deleted).
 
250 -
    async_event(async_event&&) = delete;
 
251 -

 
252 -
    /// Move assignment (deleted).
 
253 -
    async_event& operator=(async_event&&) = delete;
 
254  
    async_event& operator=(async_event const&) = delete;
239  
    async_event& operator=(async_event const&) = delete;
255  

240  

256  
    /** Returns an awaiter that waits until the event is set.
241  
    /** Returns an awaiter that waits until the event is set.
257  

242  

258  
        If the event is already set, completes immediately.
243  
        If the event is already set, completes immediately.
259  

244  

260  
        @return An awaitable yielding `(error_code)`.
245  
        @return An awaitable yielding `(error_code)`.
261  
    */
246  
    */
262  
    wait_awaiter wait() noexcept
247  
    wait_awaiter wait() noexcept
263  
    {
248  
    {
264  
        return wait_awaiter{this};
249  
        return wait_awaiter{this};
265  
    }
250  
    }
266  

251  

267  
    /** Sets the event.
252  
    /** Sets the event.
268  

253  

269  
        All waiting coroutines are resumed. Canceled waiters
254  
        All waiting coroutines are resumed. Canceled waiters
270  
        are skipped. Subsequent calls to wait() complete
255  
        are skipped. Subsequent calls to wait() complete
271  
        immediately until clear() is called.
256  
        immediately until clear() is called.
272  
    */
257  
    */
273  
    void set()
258  
    void set()
274  
    {
259  
    {
275  
        set_ = true;
260  
        set_ = true;
276  
        for(;;)
261  
        for(;;)
277  
        {
262  
        {
278  
            auto* w = waiters_.pop_front();
263  
            auto* w = waiters_.pop_front();
279  
            if(!w)
264  
            if(!w)
280  
                break;
265  
                break;
281  
            w->in_list_ = false;
266  
            w->in_list_ = false;
282  
            if(!w->claimed_.exchange(
267  
            if(!w->claimed_.exchange(
283  
                true, std::memory_order_acq_rel))
268  
                true, std::memory_order_acq_rel))
284  
            {
269  
            {
285  
                w->ex_.post(w->h_);
270  
                w->ex_.post(w->h_);
286  
            }
271  
            }
287  
        }
272  
        }
288  
    }
273  
    }
289  

274  

290  
    /** Clears the event.
275  
    /** Clears the event.
291  

276  

292  
        Subsequent calls to wait() will suspend until
277  
        Subsequent calls to wait() will suspend until
293  
        set() is called again.
278  
        set() is called again.
294  
    */
279  
    */
295  
    void clear() noexcept
280  
    void clear() noexcept
296  
    {
281  
    {
297  
        set_ = false;
282  
        set_ = false;
298  
    }
283  
    }
299  

284  

300  
    /** Returns true if the event is currently set.
285  
    /** Returns true if the event is currently set.
301  
    */
286  
    */
302  
    bool is_set() const noexcept
287  
    bool is_set() const noexcept
303  
    {
288  
    {
304  
        return set_;
289  
        return set_;
305  
    }
290  
    }
306  
};
291  
};
307  

292  

308  
} // namespace capy
293  
} // namespace capy
309  
} // namespace boost
294  
} // namespace boost
310  

295  

311  
#endif
296  
#endif