include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

77.5% Lines (265/342) 87.0% List of functions (40/46)
reactor_scheduler.hpp
f(x) Functions (46)
Function Calls Lines Blocks
boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler const*) :78 1371465x 100.0% 86.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&) :90 0 0.0% 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :107 1x 50.0% 64.0% boost::corosio::detail::reactor_scheduler::inline_budget_initial() const :252 493x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::is_single_threaded() const :258 64x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_single_threaded(bool) :269 1x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::reactor_scheduler() :280 619x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::task_op::operator()() :325 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::task_op::destroy() :326 0 0.0% 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler const*) :372 493x 100.0% 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :380 493x 66.7% 80.0% boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler const*, boost::corosio::detail::reactor_scheduler_context*) :392 493x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::configure_reactor(unsigned int, unsigned int, unsigned int, unsigned int) :406 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::reset_inline_budget() const :433 169736x 50.0% 43.0% boost::corosio::detail::reactor_scheduler::try_consume_inline_budget() const :464 762193x 87.5% 88.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const :480 2227x 100.0% 84.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :486 2227x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :487 4454x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :489 2218x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :498 9x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::post(boost::corosio::detail::scheduler_op*) const :523 171018x 100.0% 87.0% boost::corosio::detail::reactor_scheduler::running_in_this_thread() const :540 1422x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::stop() :546 466x 100.0% 82.0% boost::corosio::detail::reactor_scheduler::stopped() const :558 34x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::restart() :564 119x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::run() :570 454x 100.0% 76.0% boost::corosio::detail::reactor_scheduler::run_one() :595 2x 75.0% 64.0% boost::corosio::detail::reactor_scheduler::wait_one(long) :609 38x 100.0% 70.0% boost::corosio::detail::reactor_scheduler::poll() :623 22x 100.0% 76.0% boost::corosio::detail::reactor_scheduler::poll_one() :648 4x 100.0% 70.0% boost::corosio::detail::reactor_scheduler::work_started() :662 27805x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::work_finished() :668 39052x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::compensating_work_started() const :675 264869x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long) const :683 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :696 16968x 30.0% 35.0% boost::corosio::detail::reactor_scheduler::shutdown_drain() :713 619x 100.0% 88.0% boost::corosio::detail::reactor_scheduler::signal_all(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :730 1061x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::maybe_unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :737 2401x 57.1% 50.0% boost::corosio::detail::reactor_scheduler::unlock_and_signal_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :751 538328x 85.7% 80.0% boost::corosio::detail::reactor_scheduler::clear_signal() const :763 1x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::wait_for_signal(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :769 1x 100.0% 100.0% boost::corosio::detail::reactor_scheduler::wait_for_signal_for(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long) const :781 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler::wake_one_thread_and_unlock(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&) const :793 2401x 87.5% 92.0% boost::corosio::detail::reactor_scheduler::work_cleanup::~work_cleanup() :811 455065x 92.3% 92.0% boost::corosio::detail::reactor_scheduler::task_cleanup::~task_cleanup() :835 302784x 83.3% 86.0% boost::corosio::detail::reactor_scheduler::do_one(boost::corosio::detail::conditionally_enabled_mutex::scoped_lock&, long, boost::corosio::detail::reactor_scheduler_context*) :856 455539x 80.0% 75.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/detail/scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <coroutine>
23 #include <cstddef>
24 #include <cstdint>
25 #include <limits>
26 #include <memory>
27 #include <stdexcept>
28
29 #include <boost/corosio/detail/conditionally_enabled_mutex.hpp>
30 #include <boost/corosio/detail/conditionally_enabled_event.hpp>
31
32 namespace boost::corosio::detail {
33
34 // Forward declarations
35 class reactor_scheduler;
36 class timer_service;
37
38 /** Per-thread state for a reactor scheduler.
39
40 Each thread running a scheduler's event loop has one of these
41 on a thread-local stack. It holds a private work queue and
42 inline completion budget for speculative I/O fast paths.
43 */
44 struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
45 {
46 /// Scheduler this context belongs to.
47 reactor_scheduler const* key;
48
49 /// Next context frame on this thread's stack.
50 reactor_scheduler_context* next;
51
52 /// Private work queue for reduced contention.
53 op_queue private_queue;
54
55 /// Unflushed work count for the private queue.
56 std::int64_t private_outstanding_work;
57
58 /// Remaining inline completions allowed this cycle.
59 int inline_budget;
60
61 /// Maximum inline budget (adaptive, 2-16).
62 int inline_budget_max;
63
64 /// True if no other thread absorbed queued work last cycle.
65 bool unassisted;
66
67 /// Construct a context frame linked to @a n.
68 reactor_scheduler_context(
69 reactor_scheduler const* k,
70 reactor_scheduler_context* n);
71 };
72
73 /// Thread-local context stack for reactor schedulers.
74 inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
75
76 /// Find the context frame for a scheduler on this thread.
77 inline reactor_scheduler_context*
78 1371465x reactor_find_context(reactor_scheduler const* self) noexcept
79 {
80 1371465x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
81 {
82 1368265x if (c->key == self)
83 1368265x return c;
84 }
85 3200x return nullptr;
86 }
87
88 /// Flush private work count to global counter.
89 inline void
90 reactor_flush_private_work(
91 reactor_scheduler_context* ctx,
92 std::atomic<std::int64_t>& outstanding_work) noexcept
93 {
94 if (ctx && ctx->private_outstanding_work > 0)
95 {
96 outstanding_work.fetch_add(
97 ctx->private_outstanding_work, std::memory_order_relaxed);
98 ctx->private_outstanding_work = 0;
99 }
100 }
101
102 /** Drain private queue to global queue, flushing work count first.
103
104 @return True if any ops were drained.
105 */
106 inline bool
107 1x reactor_drain_private_queue(
108 reactor_scheduler_context* ctx,
109 std::atomic<std::int64_t>& outstanding_work,
110 op_queue& completed_ops) noexcept
111 {
112 1x if (!ctx || ctx->private_queue.empty())
113 1x return false;
114
115 reactor_flush_private_work(ctx, outstanding_work);
116 completed_ops.splice(ctx->private_queue);
117 return true;
118 }
119
120 /** Non-template base for reactor-backed scheduler implementations.
121
122 Provides the complete threading model shared by epoll, kqueue,
123 and select schedulers: signal state machine, inline completion
124 budget, work counting, run/poll methods, and the do_one event
125 loop.
126
127 Derived classes provide platform-specific hooks by overriding:
128 - `run_task(lock, ctx)` to run the reactor poll
129 - `interrupt_reactor()` to wake a blocked reactor
130
131 De-templated from the original CRTP design to eliminate
132 duplicate instantiations when multiple backends are compiled
133 into the same binary. Virtual dispatch for run_task (called
134 once per reactor cycle, before a blocking syscall) has
135 negligible overhead.
136
137 @par Thread Safety
138 All public member functions are thread-safe.
139 */
140 class reactor_scheduler
141 : public scheduler
142 , public capy::execution_context::service
143 {
144 public:
145 using key_type = scheduler;
146 using context_type = reactor_scheduler_context;
147 using mutex_type = conditionally_enabled_mutex;
148 using lock_type = mutex_type::scoped_lock;
149 using event_type = conditionally_enabled_event;
150
151 /// Post a coroutine for deferred execution.
152 void post(std::coroutine_handle<> h) const override;
153
154 /// Post a scheduler operation for deferred execution.
155 void post(scheduler_op* h) const override;
156
157 /// Return true if called from a thread running this scheduler.
158 bool running_in_this_thread() const noexcept override;
159
160 /// Request the scheduler to stop dispatching handlers.
161 void stop() override;
162
163 /// Return true if the scheduler has been stopped.
164 bool stopped() const noexcept override;
165
166 /// Reset the stopped state so `run()` can resume.
167 void restart() override;
168
169 /// Run the event loop until no work remains.
170 std::size_t run() override;
171
172 /// Run until one handler completes or no work remains.
173 std::size_t run_one() override;
174
175 /// Run until one handler completes or @a usec elapses.
176 std::size_t wait_one(long usec) override;
177
178 /// Run ready handlers without blocking.
179 std::size_t poll() override;
180
181 /// Run at most one ready handler without blocking.
182 std::size_t poll_one() override;
183
184 /// Increment the outstanding work count.
185 void work_started() noexcept override;
186
187 /// Decrement the outstanding work count, stopping on zero.
188 void work_finished() noexcept override;
189
190 /** Reset the thread's inline completion budget.
191
192 Called at the start of each posted completion handler to
193 grant a fresh budget for speculative inline completions.
194 */
195 void reset_inline_budget() const noexcept;
196
197 /** Consume one unit of inline budget if available.
198
199 @return True if budget was available and consumed.
200 */
201 bool try_consume_inline_budget() const noexcept;
202
203 /** Offset a forthcoming work_finished from work_cleanup.
204
205 Called by descriptor_state when all I/O returned EAGAIN and
206 no handler will be executed. Must be called from a scheduler
207 thread.
208 */
209 void compensating_work_started() const noexcept;
210
211 /** Drain work from thread context's private queue to global queue.
212
213 Flushes private work count to the global counter, then
214 transfers the queue under mutex protection.
215
216 @param queue The private queue to drain.
217 @param count Private work count to flush before draining.
218 */
219 void drain_thread_queue(op_queue& queue, std::int64_t count) const;
220
221 /** Post completed operations for deferred invocation.
222
223 If called from a thread running this scheduler, operations
224 go to the thread's private queue (fast path). Otherwise,
225 operations are added to the global queue under mutex and a
226 waiter is signaled.
227
228 @par Preconditions
229 work_started() must have been called for each operation.
230
231 @param ops Queue of operations to post.
232 */
233 void post_deferred_completions(op_queue& ops) const;
234
235 /** Apply runtime configuration to the scheduler.
236
237 Called by `io_context` after construction. Values that do
238 not apply to this backend are silently ignored.
239
240 @param max_events Event buffer size for epoll/kqueue.
241 @param budget_init Starting inline completion budget.
242 @param budget_max Hard ceiling on adaptive budget ramp-up.
243 @param unassisted Budget when single-threaded.
244 */
245 virtual void configure_reactor(
246 unsigned max_events,
247 unsigned budget_init,
248 unsigned budget_max,
249 unsigned unassisted);
250
251 /// Return the configured initial inline budget.
252 493x unsigned inline_budget_initial() const noexcept
253 {
254 493x return inline_budget_initial_;
255 }
256
257 /// Return true if single-threaded (lockless) mode is active.
258 64x bool is_single_threaded() const noexcept
259 {
260 64x return single_threaded_;
261 }
262
263 /** Enable or disable single-threaded (lockless) mode.
264
265 When enabled, all scheduler mutex and condition variable
266 operations become no-ops. Cross-thread post() is
267 undefined behavior.
268 */
269 1x void configure_single_threaded(bool v) noexcept
270 {
271 1x single_threaded_ = v;
272 1x mutex_.set_enabled(!v);
273 1x cond_.set_enabled(!v);
274 1x }
275
276 protected:
277 timer_service* timer_svc_ = nullptr;
278 bool single_threaded_ = false;
279
280 619x reactor_scheduler() = default;
281
282 /** Drain completed_ops during shutdown.
283
284 Pops all operations from the global queue and destroys them,
285 skipping the task sentinel. Signals all waiting threads.
286 Derived classes call this from their shutdown() override
287 before performing platform-specific cleanup.
288 */
289 void shutdown_drain();
290
291 /// RAII guard that re-inserts the task sentinel after `run_task`.
292 struct task_cleanup
293 {
294 reactor_scheduler const* sched;
295 lock_type* lock;
296 context_type* ctx;
297 ~task_cleanup();
298 };
299
300 mutable mutex_type mutex_{true};
301 mutable event_type cond_{true};
302 mutable op_queue completed_ops_;
303 mutable std::atomic<std::int64_t> outstanding_work_{0};
304 std::atomic<bool> stopped_{false};
305 mutable std::atomic<bool> task_running_{false};
306 mutable bool task_interrupted_ = false;
307
308 // Runtime-configurable reactor tuning parameters.
309 // Defaults match the library's built-in values.
310 unsigned max_events_per_poll_ = 128;
311 unsigned inline_budget_initial_ = 2;
312 unsigned inline_budget_max_ = 16;
313 unsigned unassisted_budget_ = 4;
314
315 /// Bit 0 of `state_`: set when the condvar should be signaled.
316 static constexpr std::size_t signaled_bit = 1;
317
318 /// Increment per waiting thread in `state_`.
319 static constexpr std::size_t waiter_increment = 2;
320 mutable std::size_t state_ = 0;
321
322 /// Sentinel op that triggers a reactor poll when dequeued.
323 struct task_op final : scheduler_op
324 {
325 void operator()() override {}
326 void destroy() override {}
327 };
328 task_op task_op_;
329
330 /// Run the platform-specific reactor poll.
331 virtual void
332 run_task(lock_type& lock, context_type* ctx,
333 long timeout_us) = 0;
334
335 /// Wake a blocked reactor (e.g. write to eventfd or pipe).
336 virtual void interrupt_reactor() const = 0;
337
338 private:
339 struct work_cleanup
340 {
341 reactor_scheduler* sched;
342 lock_type* lock;
343 context_type* ctx;
344 ~work_cleanup();
345 };
346
347 std::size_t do_one(
348 lock_type& lock, long timeout_us, context_type* ctx);
349
350 void signal_all(lock_type& lock) const;
351 bool maybe_unlock_and_signal_one(lock_type& lock) const;
352 bool unlock_and_signal_one(lock_type& lock) const;
353 void clear_signal() const;
354 void wait_for_signal(lock_type& lock) const;
355 void wait_for_signal_for(
356 lock_type& lock, long timeout_us) const;
357 void wake_one_thread_and_unlock(lock_type& lock) const;
358 };
359
360 /** RAII guard that pushes/pops a scheduler context frame.
361
362 On construction, pushes a new context frame onto the
363 thread-local stack. On destruction, drains any remaining
364 private queue items to the global queue and pops the frame.
365 */
366 struct reactor_thread_context_guard
367 {
368 /// The context frame managed by this guard.
369 reactor_scheduler_context frame_;
370
371 /// Construct the guard, pushing a frame for @a sched.
372 493x explicit reactor_thread_context_guard(
373 reactor_scheduler const* sched) noexcept
374 493x : frame_(sched, reactor_context_stack.get())
375 {
376 493x reactor_context_stack.set(&frame_);
377 493x }
378
379 /// Destroy the guard, draining private work and popping the frame.
380 493x ~reactor_thread_context_guard() noexcept
381 {
382 493x if (!frame_.private_queue.empty())
383 frame_.key->drain_thread_queue(
384 frame_.private_queue, frame_.private_outstanding_work);
385 493x reactor_context_stack.set(frame_.next);
386 493x }
387 };
388
389 // ---- Inline implementations ------------------------------------------------
390
391 inline
392 493x reactor_scheduler_context::reactor_scheduler_context(
393 reactor_scheduler const* k,
394 493x reactor_scheduler_context* n)
395 493x : key(k)
396 493x , next(n)
397 493x , private_outstanding_work(0)
398 493x , inline_budget(0)
399 493x , inline_budget_max(
400 493x static_cast<int>(k->inline_budget_initial()))
401 493x , unassisted(false)
402 {
403 493x }
404
405 inline void
406 reactor_scheduler::configure_reactor(
407 unsigned max_events,
408 unsigned budget_init,
409 unsigned budget_max,
410 unsigned unassisted)
411 {
412 if (max_events < 1 ||
413 max_events > static_cast<unsigned>(std::numeric_limits<int>::max()))
414 throw std::out_of_range(
415 "max_events_per_poll must be in [1, INT_MAX]");
416 if (budget_max > static_cast<unsigned>(std::numeric_limits<int>::max()))
417 throw std::out_of_range(
418 "inline_budget_max must be in [0, INT_MAX]");
419
420 // Clamp initial and unassisted to budget_max.
421 if (budget_init > budget_max)
422 budget_init = budget_max;
423 if (unassisted > budget_max)
424 unassisted = budget_max;
425
426 max_events_per_poll_ = max_events;
427 inline_budget_initial_ = budget_init;
428 inline_budget_max_ = budget_max;
429 unassisted_budget_ = unassisted;
430 }
431
432 inline void
433 169736x reactor_scheduler::reset_inline_budget() const noexcept
434 {
435 // When budget is disabled (max==0), all paths below would no-op
436 // (inline_budget stays 0). Skip the TLS lookup entirely.
437 169736x if (inline_budget_max_ == 0)
438 return;
439 169736x if (auto* ctx = reactor_find_context(this))
440 {
441 // Cap when no other thread absorbed queued work
442 169736x if (ctx->unassisted)
443 {
444 169736x ctx->inline_budget_max =
445 169736x static_cast<int>(unassisted_budget_);
446 169736x ctx->inline_budget =
447 169736x static_cast<int>(unassisted_budget_);
448 169736x return;
449 }
450 // Ramp up when previous cycle fully consumed budget.
451 // max(1, ...) ensures the doubling escapes zero.
452 if (ctx->inline_budget == 0)
453 ctx->inline_budget_max = (std::min)(
454 (std::max)(1, ctx->inline_budget_max) * 2,
455 static_cast<int>(inline_budget_max_));
456 else if (ctx->inline_budget < ctx->inline_budget_max)
457 ctx->inline_budget_max =
458 static_cast<int>(inline_budget_initial_);
459 ctx->inline_budget = ctx->inline_budget_max;
460 }
461 }
462
463 inline bool
464 762193x reactor_scheduler::try_consume_inline_budget() const noexcept
465 {
466 762193x if (inline_budget_max_ == 0)
467 return false;
468 762193x if (auto* ctx = reactor_find_context(this))
469 {
470 762193x if (ctx->inline_budget > 0)
471 {
472 609759x --ctx->inline_budget;
473 609759x return true;
474 }
475 }
476 152434x return false;
477 }
478
479 inline void
480 2227x reactor_scheduler::post(std::coroutine_handle<> h) const
481 {
482 struct post_handler final : scheduler_op
483 {
484 std::coroutine_handle<> h_;
485
486 2227x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
487 4454x ~post_handler() override = default;
488
489 2218x void operator()() override
490 {
491 2218x auto saved = h_;
492 2218x delete this;
493 // Ensure stores from the posting thread are visible
494 std::atomic_thread_fence(std::memory_order_acquire);
495 2218x saved.resume();
496 2218x }
497
498 9x void destroy() override
499 {
500 9x auto saved = h_;
501 9x delete this;
502 9x saved.destroy();
503 9x }
504 };
505
506 2227x auto ph = std::make_unique<post_handler>(h);
507
508 2227x if (auto* ctx = reactor_find_context(this))
509 {
510 6x ++ctx->private_outstanding_work;
511 6x ctx->private_queue.push(ph.release());
512 6x return;
513 }
514
515 2221x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
516
517 2221x lock_type lock(mutex_);
518 2221x completed_ops_.push(ph.release());
519 2221x wake_one_thread_and_unlock(lock);
520 2227x }
521
522 inline void
523 171018x reactor_scheduler::post(scheduler_op* h) const
524 {
525 171018x if (auto* ctx = reactor_find_context(this))
526 {
527 170838x ++ctx->private_outstanding_work;
528 170838x ctx->private_queue.push(h);
529 170838x return;
530 }
531
532 180x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
533
534 180x lock_type lock(mutex_);
535 180x completed_ops_.push(h);
536 180x wake_one_thread_and_unlock(lock);
537 180x }
538
539 inline bool
540 1422x reactor_scheduler::running_in_this_thread() const noexcept
541 {
542 1422x return reactor_find_context(this) != nullptr;
543 }
544
545 inline void
546 466x reactor_scheduler::stop()
547 {
548 466x lock_type lock(mutex_);
549 466x if (!stopped_.load(std::memory_order_acquire))
550 {
551 442x stopped_.store(true, std::memory_order_release);
552 442x signal_all(lock);
553 442x interrupt_reactor();
554 }
555 466x }
556
557 inline bool
558 34x reactor_scheduler::stopped() const noexcept
559 {
560 34x return stopped_.load(std::memory_order_acquire);
561 }
562
563 inline void
564 119x reactor_scheduler::restart()
565 {
566 119x stopped_.store(false, std::memory_order_release);
567 119x }
568
569 inline std::size_t
570 454x reactor_scheduler::run()
571 {
572 908x if (outstanding_work_.load(std::memory_order_acquire) == 0)
573 {
574 14x stop();
575 14x return 0;
576 }
577
578 440x reactor_thread_context_guard ctx(this);
579 440x lock_type lock(mutex_);
580
581 440x std::size_t n = 0;
582 for (;;)
583 {
584 455456x if (!do_one(lock, -1, &ctx.frame_))
585 440x break;
586 455016x if (n != (std::numeric_limits<std::size_t>::max)())
587 455016x ++n;
588 455016x if (!lock.owns_lock())
589 293049x lock.lock();
590 }
591 440x return n;
592 440x }
593
594 inline std::size_t
595 2x reactor_scheduler::run_one()
596 {
597 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
598 {
599 stop();
600 return 0;
601 }
602
603 2x reactor_thread_context_guard ctx(this);
604 2x lock_type lock(mutex_);
605 2x return do_one(lock, -1, &ctx.frame_);
606 2x }
607
608 inline std::size_t
609 38x reactor_scheduler::wait_one(long usec)
610 {
611 76x if (outstanding_work_.load(std::memory_order_acquire) == 0)
612 {
613 6x stop();
614 6x return 0;
615 }
616
617 32x reactor_thread_context_guard ctx(this);
618 32x lock_type lock(mutex_);
619 32x return do_one(lock, usec, &ctx.frame_);
620 32x }
621
622 inline std::size_t
623 22x reactor_scheduler::poll()
624 {
625 44x if (outstanding_work_.load(std::memory_order_acquire) == 0)
626 {
627 5x stop();
628 5x return 0;
629 }
630
631 17x reactor_thread_context_guard ctx(this);
632 17x lock_type lock(mutex_);
633
634 17x std::size_t n = 0;
635 for (;;)
636 {
637 47x if (!do_one(lock, 0, &ctx.frame_))
638 17x break;
639 30x if (n != (std::numeric_limits<std::size_t>::max)())
640 30x ++n;
641 30x if (!lock.owns_lock())
642 22x lock.lock();
643 }
644 17x return n;
645 17x }
646
647 inline std::size_t
648 4x reactor_scheduler::poll_one()
649 {
650 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
651 {
652 2x stop();
653 2x return 0;
654 }
655
656 2x reactor_thread_context_guard ctx(this);
657 2x lock_type lock(mutex_);
658 2x return do_one(lock, 0, &ctx.frame_);
659 2x }
660
661 inline void
662 27805x reactor_scheduler::work_started() noexcept
663 {
664 27805x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
665 27805x }
666
667 inline void
668 39052x reactor_scheduler::work_finished() noexcept
669 {
670 78104x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
671 434x stop();
672 39052x }
673
674 inline void
675 264869x reactor_scheduler::compensating_work_started() const noexcept
676 {
677 264869x auto* ctx = reactor_find_context(this);
678 264869x if (ctx)
679 264869x ++ctx->private_outstanding_work;
680 264869x }
681
682 inline void
683 reactor_scheduler::drain_thread_queue(
684 op_queue& queue, std::int64_t count) const
685 {
686 if (count > 0)
687 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
688
689 lock_type lock(mutex_);
690 completed_ops_.splice(queue);
691 if (count > 0)
692 maybe_unlock_and_signal_one(lock);
693 }
694
695 inline void
696 16968x reactor_scheduler::post_deferred_completions(op_queue& ops) const
697 {
698 16968x if (ops.empty())
699 16968x return;
700
701 if (auto* ctx = reactor_find_context(this))
702 {
703 ctx->private_queue.splice(ops);
704 return;
705 }
706
707 lock_type lock(mutex_);
708 completed_ops_.splice(ops);
709 wake_one_thread_and_unlock(lock);
710 }
711
712 inline void
713 619x reactor_scheduler::shutdown_drain()
714 {
715 619x lock_type lock(mutex_);
716
717 1342x while (auto* h = completed_ops_.pop())
718 {
719 723x if (h == &task_op_)
720 619x continue;
721 104x lock.unlock();
722 104x h->destroy();
723 104x lock.lock();
724 723x }
725
726 619x signal_all(lock);
727 619x }
728
729 inline void
730 1061x reactor_scheduler::signal_all(lock_type&) const
731 {
732 1061x state_ |= signaled_bit;
733 1061x cond_.notify_all();
734 1061x }
735
736 inline bool
737 2401x reactor_scheduler::maybe_unlock_and_signal_one(
738 lock_type& lock) const
739 {
740 2401x state_ |= signaled_bit;
741 2401x if (state_ > signaled_bit)
742 {
743 lock.unlock();
744 cond_.notify_one();
745 return true;
746 }
747 2401x return false;
748 }
749
750 inline bool
751 538328x reactor_scheduler::unlock_and_signal_one(
752 lock_type& lock) const
753 {
754 538328x state_ |= signaled_bit;
755 538328x bool have_waiters = state_ > signaled_bit;
756 538328x lock.unlock();
757 538328x if (have_waiters)
758 cond_.notify_one();
759 538328x return have_waiters;
760 }
761
762 inline void
763 1x reactor_scheduler::clear_signal() const
764 {
765 1x state_ &= ~signaled_bit;
766 1x }
767
768 inline void
769 1x reactor_scheduler::wait_for_signal(
770 lock_type& lock) const
771 {
772 2x while ((state_ & signaled_bit) == 0)
773 {
774 1x state_ += waiter_increment;
775 1x cond_.wait(lock);
776 1x state_ -= waiter_increment;
777 }
778 1x }
779
780 inline void
781 reactor_scheduler::wait_for_signal_for(
782 lock_type& lock, long timeout_us) const
783 {
784 if ((state_ & signaled_bit) == 0)
785 {
786 state_ += waiter_increment;
787 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
788 state_ -= waiter_increment;
789 }
790 }
791
792 inline void
793 2401x reactor_scheduler::wake_one_thread_and_unlock(
794 lock_type& lock) const
795 {
796 2401x if (maybe_unlock_and_signal_one(lock))
797 return;
798
799 2401x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
800 {
801 57x task_interrupted_ = true;
802 57x lock.unlock();
803 57x interrupt_reactor();
804 }
805 else
806 {
807 2344x lock.unlock();
808 }
809 }
810
811 455065x inline reactor_scheduler::work_cleanup::~work_cleanup()
812 {
813 455065x if (ctx)
814 {
815 455065x std::int64_t produced = ctx->private_outstanding_work;
816 455065x if (produced > 1)
817 11x sched->outstanding_work_.fetch_add(
818 produced - 1, std::memory_order_relaxed);
819 455054x else if (produced < 1)
820 28215x sched->work_finished();
821 455065x ctx->private_outstanding_work = 0;
822
823 455065x if (!ctx->private_queue.empty())
824 {
825 161981x lock->lock();
826 161981x sched->completed_ops_.splice(ctx->private_queue);
827 }
828 }
829 else
830 {
831 sched->work_finished();
832 }
833 455065x }
834
835 605568x inline reactor_scheduler::task_cleanup::~task_cleanup()
836 {
837 302784x if (!ctx)
838 return;
839
840 302784x if (ctx->private_outstanding_work > 0)
841 {
842 8842x sched->outstanding_work_.fetch_add(
843 8842x ctx->private_outstanding_work, std::memory_order_relaxed);
844 8842x ctx->private_outstanding_work = 0;
845 }
846
847 302784x if (!ctx->private_queue.empty())
848 {
849 8842x if (!lock->owns_lock())
850 lock->lock();
851 8842x sched->completed_ops_.splice(ctx->private_queue);
852 }
853 302784x }
854
855 inline std::size_t
856 455539x reactor_scheduler::do_one(
857 lock_type& lock, long timeout_us, context_type* ctx)
858 {
859 for (;;)
860 {
861 758307x if (stopped_.load(std::memory_order_acquire))
862 444x return 0;
863
864 757863x scheduler_op* op = completed_ops_.pop();
865
866 // Handle reactor sentinel — time to poll for I/O
867 757863x if (op == &task_op_)
868 {
869 bool more_handlers =
870 302797x !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
871
872 522331x if (!more_handlers &&
873 439068x (outstanding_work_.load(std::memory_order_acquire) == 0 ||
874 timeout_us == 0))
875 {
876 13x completed_ops_.push(&task_op_);
877 13x return 0;
878 }
879
880 302784x long task_timeout_us = more_handlers ? 0 : timeout_us;
881 302784x task_interrupted_ = task_timeout_us == 0;
882 302784x task_running_.store(true, std::memory_order_release);
883
884 302784x if (more_handlers)
885 83263x unlock_and_signal_one(lock);
886
887 try
888 {
889 302784x run_task(lock, ctx, task_timeout_us);
890 }
891 catch (...)
892 {
893 task_running_.store(false, std::memory_order_relaxed);
894 throw;
895 }
896
897 302784x task_running_.store(false, std::memory_order_relaxed);
898 302784x completed_ops_.push(&task_op_);
899 302784x if (timeout_us > 0)
900 17x return 0;
901 302767x continue;
902 302767x }
903
904 // Handle operation
905 455066x if (op != nullptr)
906 {
907 455065x bool more = !completed_ops_.empty();
908
909 455065x if (more)
910 455065x ctx->unassisted = !unlock_and_signal_one(lock);
911 else
912 {
913 ctx->unassisted = false;
914 lock.unlock();
915 }
916
917 455065x work_cleanup on_exit{this, &lock, ctx};
918 (void)on_exit;
919
920 455065x (*op)();
921 455065x return 1;
922 455065x }
923
924 // Try private queue before blocking
925 1x if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
926 continue;
927
928 2x if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
929 timeout_us == 0)
930 return 0;
931
932 1x clear_signal();
933 1x if (timeout_us < 0)
934 1x wait_for_signal(lock);
935 else
936 wait_for_signal_for(lock, timeout_us);
937 302768x }
938 }
939
940 } // namespace boost::corosio::detail
941
942 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
943