include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

81.8% Lines (404/494) 89.6% Functions (43/48)
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/native_scheduler.hpp>
21 #include <boost/corosio/detail/scheduler_op.hpp>
22
23 #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
24 #include <boost/corosio/detail/timer_service.hpp>
25 #include <boost/corosio/detail/make_err.hpp>
26 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28
29 #include <boost/corosio/detail/except.hpp>
30 #include <boost/corosio/detail/thread_local_ptr.hpp>
31
32 #include <atomic>
33 #include <chrono>
34 #include <condition_variable>
35 #include <cstddef>
36 #include <cstdint>
37 #include <limits>
38 #include <mutex>
39 #include <utility>
40
41 #include <errno.h>
42 #include <fcntl.h>
43 #include <sys/epoll.h>
44 #include <sys/eventfd.h>
45 #include <sys/socket.h>
46 #include <sys/timerfd.h>
47 #include <unistd.h>
48
49 namespace boost::corosio::detail {
50
51 struct epoll_op;
52 struct descriptor_state;
53 namespace epoll {
54 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
55 } // namespace epoll
56
57 /** Linux scheduler using epoll for I/O multiplexing.
58
59 This scheduler implements the scheduler interface using Linux epoll
60 for efficient I/O event notification. It uses a single reactor model
61 where one thread runs epoll_wait while other threads
62 wait on a condition variable for handler work. This design provides:
63
64 - Handler parallelism: N posted handlers can execute on N threads
65 - No thundering herd: condition_variable wakes exactly one thread
66 - IOCP parity: Behavior matches Windows I/O completion port semantics
67
68 When threads call run(), they first try to execute queued handlers.
69 If the queue is empty and no reactor is running, one thread becomes
70 the reactor and runs epoll_wait. Other threads wait on a condition
71 variable until handlers are available.
72
73 @par Thread Safety
74 All public member functions are thread-safe.
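
    @par Example
    A minimal multi-threaded usage sketch (illustrative only; `ctx` is an
    existing capy::execution_context and the posted handle is hypothetical):
    @code
    epoll_scheduler sched(ctx);
    sched.post(some_coroutine_handle);                // queue work before running
    std::vector<std::thread> pool;
    for (int i = 0; i < 4; ++i)
        pool.emplace_back([&sched] { sched.run(); }); // one thread becomes the reactor
    for (auto& t : pool)
        t.join();                                     // run() returns once outstanding work reaches zero
    @endcode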
75 */
76 class BOOST_COROSIO_DECL epoll_scheduler final
77 : public native_scheduler
78 , public capy::execution_context::service
79 {
80 public:
81 using key_type = scheduler;
82
83 /** Construct the scheduler.
84
85 Creates an epoll instance, eventfd for reactor interruption,
86 and timerfd for kernel-managed timer expiry.
87
88 @param ctx Reference to the owning execution_context.
89 @param concurrency_hint Hint for expected thread count (unused).
90 */
91 epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92
93 /// Destroy the scheduler.
94 ~epoll_scheduler() override;
95
96 epoll_scheduler(epoll_scheduler const&) = delete;
97 epoll_scheduler& operator=(epoll_scheduler const&) = delete;
98
99 void shutdown() override;
100 void post(std::coroutine_handle<> h) const override;
101 void post(scheduler_op* h) const override;
102 bool running_in_this_thread() const noexcept override;
103 void stop() override;
104 bool stopped() const noexcept override;
105 void restart() override;
106 std::size_t run() override;
107 std::size_t run_one() override;
108 std::size_t wait_one(long usec) override;
109 std::size_t poll() override;
110 std::size_t poll_one() override;
111
112 /** Return the epoll file descriptor.
113
114 Used by socket services to register file descriptors
115 for I/O event notification.
116
117 @return The epoll file descriptor.
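
    @par Example
    Sketch of a service using the returned fd directly (illustrative; most
    code should go through register_descriptor() instead, and `state` and
    `fd` are assumed to exist):
    @code
    epoll_event ev{};
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET;
    ev.data.ptr = state;
    ::epoll_ctl(sched.epoll_fd(), EPOLL_CTL_MOD, fd, &ev);
    @endcode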
118 */
119 int epoll_fd() const noexcept
120 {
121 return epoll_fd_;
122 }
123
124 /** Reset the thread's inline completion budget.
125
126 Called at the start of each posted completion handler to
127 grant a fresh budget for speculative inline completions.
128 */
129 void reset_inline_budget() const noexcept;
130
131 /** Consume one unit of inline budget if available.
132
133 @return True if budget was available and consumed.
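
    @par Example
    Intended call pattern (illustrative sketch; the helper names are
    hypothetical):
    @code
    sched.reset_inline_budget();                     // start of a posted completion handler
    while (have_more_io() && sched.try_consume_inline_budget())
        attempt_one_inline_completion();             // speculative; may stop early on EAGAIN
    @endcode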
134 */
135 bool try_consume_inline_budget() const noexcept;
136
137 /** Register a descriptor for persistent monitoring.
138
139 The fd is registered once and stays registered until explicitly
140 deregistered. Events are dispatched via descriptor_state which
141 tracks pending read/write/connect operations.
142
143 @param fd The file descriptor to register.
144 @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
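
    @par Example
    Illustrative lifetime from a socket service (`state` is a
    descriptor_state owned by the socket implementation):
    @code
    sched.register_descriptor(fd, &state);   // registered once, edge-triggered
    // ... I/O events are dispatched through descriptor_state ...
    sched.deregister_descriptor(fd);         // on close/teardown
    @endcode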
145 */
146 void register_descriptor(int fd, descriptor_state* desc) const;
147
148 /** Deregister a persistently registered descriptor.
149
150 @param fd The file descriptor to deregister.
151 */
152 void deregister_descriptor(int fd) const;
153
154 void work_started() noexcept override;
155 void work_finished() noexcept override;
156
157 /** Offset the work_finished() call that work_cleanup will issue.
158
159 Called by descriptor_state when all I/O returned EAGAIN and no
160 handler will be executed. Must be called from a scheduler thread.
161 */
162 void compensating_work_started() const noexcept;
163
164 /** Drain work from a thread context's private queue to the global queue.
165
166 Called by thread_context_guard destructor when a thread exits run().
167 Transfers pending work to the global queue under mutex protection.
168
169 @param queue The private queue to drain.
170 @param count Item count for wakeup decisions (a waiting thread is woken if positive).
171 */
172 void drain_thread_queue(op_queue& queue, long count) const;
173
174 /** Post completed operations for deferred invocation.
175
176 If called from a thread running this scheduler, operations go to
177 the thread's private queue (fast path). Otherwise, operations are
178 added to the global queue under mutex and a waiter is signaled.
179
180 @par Preconditions
181 work_started() must have been called for each operation.
182
183 @param ops Queue of operations to post.
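
    @par Example
    Illustrative use after completing several operations (`op1` and `op2`
    are hypothetical and were each counted via work_started()):
    @code
    op_queue ops;
    ops.push(op1);
    ops.push(op2);
    sched.post_deferred_completions(ops);    // private queue, or global queue + wake
    @endcode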
184 */
185 void post_deferred_completions(op_queue& ops) const;
186
187 private:
188 struct work_cleanup
189 {
190 epoll_scheduler* scheduler;
191 std::unique_lock<std::mutex>* lock;
192 epoll::scheduler_context* ctx;
193 ~work_cleanup();
194 };
195
196 struct task_cleanup
197 {
198 epoll_scheduler const* scheduler;
199 std::unique_lock<std::mutex>* lock;
200 epoll::scheduler_context* ctx;
201 ~task_cleanup();
202 };
203
204 std::size_t do_one(
205 std::unique_lock<std::mutex>& lock,
206 long timeout_us,
207 epoll::scheduler_context* ctx);
208 void
209 run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
210 void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
211 void interrupt_reactor() const;
212 void update_timerfd() const;
213
214 /** Set the signaled state and wake all waiting threads.
215
216 @par Preconditions
217 Mutex must be held.
218
219 @param lock The held mutex lock.
220 */
221 void signal_all(std::unique_lock<std::mutex>& lock) const;
222
223 /** Set the signaled state and wake one waiter if any exist.
224
225 Only unlocks and signals if at least one thread is waiting.
226 Use this when the caller needs to perform a fallback action
227 (such as interrupting the reactor) when no waiters exist.
228
229 @par Preconditions
230 Mutex must be held.
231
232 @param lock The held mutex lock.
233
234 @return `true` if unlocked and signaled, `false` if lock still held.
235 */
236 bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
237
238 /** Set the signaled state, unlock, and wake one waiter if any exist.
239
240 Always unlocks the mutex. Use this when the caller will release
241 the lock regardless of whether a waiter exists.
242
243 @par Preconditions
244 Mutex must be held.
245
246 @param lock The held mutex lock.
247
248 @return `true` if a waiter was signaled, `false` otherwise.
249 */
250 bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
251
252 /** Clear the signaled state before waiting.
253
254 @par Preconditions
255 Mutex must be held.
256 */
257 void clear_signal() const;
258
259 /** Block until the signaled state is set.
260
261 Returns immediately if already signaled (fast-path). Otherwise
262 increments the waiter count, waits on the condition variable,
263 and decrements the waiter count upon waking.
264
265 @par Preconditions
266 Mutex must be held.
267
268 @param lock The held mutex lock.
269 */
270 void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
271
272 /** Block until signaled or timeout expires.
273
274 @par Preconditions
275 Mutex must be held.
276
277 @param lock The held mutex lock.
278 @param timeout_us Maximum time to wait in microseconds.
279 */
280 void wait_for_signal_for(
281 std::unique_lock<std::mutex>& lock, long timeout_us) const;
282
283 int epoll_fd_;
284 int event_fd_; // for interrupting reactor
285 int timer_fd_; // timerfd for kernel-managed timer expiry
286 mutable std::mutex mutex_;
287 mutable std::condition_variable cond_;
288 mutable op_queue completed_ops_;
289 mutable std::atomic<long> outstanding_work_;
290 bool stopped_;
291 bool shutdown_;
292
293 // True while a thread owns the reactor and may be blocked in epoll_wait.
294 // Used by wake_one_thread_and_unlock and the timer-changed callback to
295 // know when an eventfd interrupt is needed instead of a condvar signal.
296 mutable std::atomic<bool> task_running_{false};
297
298 // True when the reactor has been told to do a non-blocking poll
299 // (more handlers queued or poll mode). Prevents redundant eventfd
300 // writes and controls the epoll_wait timeout.
301 mutable bool task_interrupted_ = false;
302
303 // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
304 mutable std::size_t state_ = 0;
305
306 // Edge-triggered eventfd state
307 mutable std::atomic<bool> eventfd_armed_{false};
308
309 // Set when the earliest timer changes; flushed before epoll_wait
310 // blocks. Avoids timerfd_settime syscalls for timers that are
311 // scheduled then cancelled without being waited on.
312 mutable std::atomic<bool> timerfd_stale_{false};
313
314 // Sentinel operation for interleaving reactor runs with handler execution.
315 // Ensures the reactor runs periodically even when handlers are continuously
316 // posted, preventing starvation of I/O events, timers, and signals.
317 struct task_op final : scheduler_op
318 {
319 void operator()() override {}
320 void destroy() override {}
321 };
322 task_op task_op_;
323 };
324
325 //--------------------------------------------------------------------------
326 //
327 // Implementation
328 //
329 //--------------------------------------------------------------------------
330
331 /*
332 epoll Scheduler - Single Reactor Model
333 ======================================
334
335 This scheduler uses a thread coordination strategy to provide handler
336 parallelism and avoid the thundering herd problem.
337 Instead of all threads blocking on epoll_wait(), one thread becomes the
338 "reactor" while others wait on a condition variable for handler work.
339
340 Thread Model
341 ------------
342 - ONE thread runs epoll_wait() at a time (the reactor thread)
343 - OTHER threads wait on cond_ (condition variable) for handlers
344 - When work is posted, exactly one waiting thread wakes via notify_one()
345 - This matches Windows IOCP semantics where N posted items wake N threads
346
347 Event Loop Structure (do_one)
348 -----------------------------
349 1. Lock mutex, try to pop handler from queue
350 2. If got handler: execute it (unlocked), return
351 3. If queue empty and no reactor running: become reactor
352 - Run epoll_wait (unlocked), queue I/O completions, loop back
353 4. If queue empty and reactor running: wait on condvar for work
354
355 The task_running_ flag ensures only one thread owns epoll_wait().
356 After the reactor queues I/O completions, it loops back to try getting
357 a handler, giving priority to handler execution over more I/O polling.
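
A condensed sketch of the loop (illustrative; see do_one() below for the
real version with stop handling, peer wakeups, and work accounting):

    for (;;)
    {
        op = completed_ops_.pop();
        if (op == &task_op_) { run_task(lock, ctx); continue; }    // become the reactor
        if (op)              { lock.unlock(); (*op)(); return 1; } // run one handler
        wait_for_signal(lock);                                     // idle: wait for work
    }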
358
359 Signaling State (state_)
360 ------------------------
361 The state_ variable encodes two pieces of information:
362 - Bit 0: signaled flag (1 = signaled, persists until cleared)
363 - Upper bits: waiter count (each waiter adds 2 before blocking)
364
365 This allows efficient coordination:
366 - Signalers only call notify when waiters exist (state_ > 1)
367 - Waiters check if already signaled before blocking (fast-path)
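
For example (illustrative values): state_ == 0 is idle; state_ == 2 is one
waiter, not signaled; state_ == 5 is two waiters (2 * 2) plus the signaled
bit. Equivalently: signaled = state_ & 1, waiters = state_ >> 1.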
368
369 Wake Coordination (wake_one_thread_and_unlock)
370 ----------------------------------------------
371 When posting work:
372 - If waiters exist (state_ > 1): signal and notify_one()
373 - Else if reactor running: interrupt via eventfd write
374 - Else: no-op (thread will find work when it checks queue)
375
376 This avoids waking threads unnecessarily. With cascading wakes,
377 each handler execution wakes at most one additional thread if
378 more work exists in the queue.
379
380 Work Counting
381 -------------
382 outstanding_work_ tracks pending operations. When it hits zero, run()
383 returns. Each operation increments on start, decrements on completion.
384
385 Timer Integration
386 -----------------
387 Timers are handled by timer_service through a kernel timerfd that is
388 registered in the epoll set. When the earliest expiry changes, the
389 timer_service callback marks the timerfd stale and interrupts the reactor,
390 which reprograms the timerfd (update_timerfd) before blocking again.
391 */
392
393 namespace epoll {
394
395 struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
396 {
397 epoll_scheduler const* key;
398 scheduler_context* next;
399 op_queue private_queue;
400 long private_outstanding_work;
401 int inline_budget;
402 int inline_budget_max;
403 bool unassisted;
404
405 189 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
406 189 : key(k)
407 189 , next(n)
408 189 , private_outstanding_work(0)
409 189 , inline_budget(0)
410 189 , inline_budget_max(2)
411 189 , unassisted(false)
412 {
413 189 }
414 };
415
416 inline thread_local_ptr<scheduler_context> context_stack;
417
418 struct thread_context_guard
419 {
420 scheduler_context frame_;
421
422 189 explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
423 189 : frame_(ctx, context_stack.get())
424 {
425 189 context_stack.set(&frame_);
426 189 }
427
428 189 ~thread_context_guard() noexcept
429 {
430 189 if (!frame_.private_queue.empty())
431 frame_.key->drain_thread_queue(
432 frame_.private_queue, frame_.private_outstanding_work);
433 189 context_stack.set(frame_.next);
434 189 }
435 };
436
437 inline scheduler_context*
438 384278 find_context(epoll_scheduler const* self) noexcept
439 {
440 384278 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
441 382592 if (c->key == self)
442 382592 return c;
443 1686 return nullptr;
444 }
445
446 } // namespace epoll
447
448 inline void
449 52846 epoll_scheduler::reset_inline_budget() const noexcept
450 {
451 52846 if (auto* ctx = epoll::find_context(this))
452 {
453 // Cap when no other thread absorbed queued work. A moderate
454 // cap (4) amortizes scheduling for small buffers while avoiding
455 // bursty I/O that fills socket buffers and stalls large transfers.
456 52846 if (ctx->unassisted)
457 {
458 52846 ctx->inline_budget_max = 4;
459 52846 ctx->inline_budget = 4;
460 52846 return;
461 }
462 // Ramp up when previous cycle fully consumed budget.
463 // Reset on partial consumption (EAGAIN hit or peer got scheduled).
464 if (ctx->inline_budget == 0)
465 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
466 else if (ctx->inline_budget < ctx->inline_budget_max)
467 ctx->inline_budget_max = 2;
468 ctx->inline_budget = ctx->inline_budget_max;
469 }
470 }
471
472 inline bool
473 241847 epoll_scheduler::try_consume_inline_budget() const noexcept
474 {
475 241847 if (auto* ctx = epoll::find_context(this))
476 {
477 241847 if (ctx->inline_budget > 0)
478 {
479 193546 --ctx->inline_budget;
480 193546 return true;
481 }
482 }
483 48301 return false;
484 }
485
486 inline void
487 38796 descriptor_state::operator()()
488 {
489 38796 is_enqueued_.store(false, std::memory_order_relaxed);
490
491 // Take ownership of impl ref set by close_socket() to prevent
492 // the owning impl from being freed while we're executing
493 38796 auto prevent_impl_destruction = std::move(impl_ref_);
494
495 38796 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
496 38796 if (ev == 0)
497 {
498 scheduler_->compensating_work_started();
499 return;
500 }
501
502 38796 op_queue local_ops;
503
504 38796 int err = 0;
505 38796 if (ev & EPOLLERR)
506 {
507 1 socklen_t len = sizeof(err);
508 1 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
509 err = errno;
510 1 if (err == 0)
511 err = EIO;
512 }
513
514 {
515 38796 std::lock_guard lock(mutex);
516 38796 if (ev & EPOLLIN)
517 {
518 12337 if (read_op)
519 {
520 2172 auto* rd = read_op;
521 2172 if (err)
522 rd->complete(err, 0);
523 else
524 2172 rd->perform_io();
525
526 2172 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
527 {
528 rd->errn = 0;
529 }
530 else
531 {
532 2172 read_op = nullptr;
533 2172 local_ops.push(rd);
534 }
535 }
536 else
537 {
538 10165 read_ready = true;
539 }
540 }
541 38796 if (ev & EPOLLOUT)
542 {
543 36628 bool had_write_op = (connect_op || write_op);
544 36628 if (connect_op)
545 {
546 2172 auto* cn = connect_op;
547 2172 if (err)
548 1 cn->complete(err, 0);
549 else
550 2171 cn->perform_io();
551 2172 connect_op = nullptr;
552 2172 local_ops.push(cn);
553 }
554 36628 if (write_op)
555 {
556 auto* wr = write_op;
557 if (err)
558 wr->complete(err, 0);
559 else
560 wr->perform_io();
561
562 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
563 {
564 wr->errn = 0;
565 }
566 else
567 {
568 write_op = nullptr;
569 local_ops.push(wr);
570 }
571 }
572 36628 if (!had_write_op)
573 34456 write_ready = true;
574 }
575 38796 if (err)
576 {
577 1 if (read_op)
578 {
579 read_op->complete(err, 0);
580 local_ops.push(std::exchange(read_op, nullptr));
581 }
582 1 if (write_op)
583 {
584 write_op->complete(err, 0);
585 local_ops.push(std::exchange(write_op, nullptr));
586 }
587 1 if (connect_op)
588 {
589 connect_op->complete(err, 0);
590 local_ops.push(std::exchange(connect_op, nullptr));
591 }
592 }
593 38796 }
594
595 // Execute first handler inline — the scheduler's work_cleanup
596 // accounts for this as the "consumed" work item
597 38796 scheduler_op* first = local_ops.pop();
598 38796 if (first)
599 {
600 4344 scheduler_->post_deferred_completions(local_ops);
601 4344 (*first)();
602 }
603 else
604 {
605 34452 scheduler_->compensating_work_started();
606 }
607 38796 }
608
609 205 inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
610 205 : epoll_fd_(-1)
611 205 , event_fd_(-1)
612 205 , timer_fd_(-1)
613 205 , outstanding_work_(0)
614 205 , stopped_(false)
615 205 , shutdown_(false)
616 205 , task_running_{false}
617 205 , task_interrupted_(false)
618 410 , state_(0)
619 {
620 205 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
621 205 if (epoll_fd_ < 0)
622 detail::throw_system_error(make_err(errno), "epoll_create1");
623
624 205 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
625 205 if (event_fd_ < 0)
626 {
627 int errn = errno;
628 ::close(epoll_fd_);
629 detail::throw_system_error(make_err(errn), "eventfd");
630 }
631
632 205 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
633 205 if (timer_fd_ < 0)
634 {
635 int errn = errno;
636 ::close(event_fd_);
637 ::close(epoll_fd_);
638 detail::throw_system_error(make_err(errn), "timerfd_create");
639 }
640
641 205 epoll_event ev{};
642 205 ev.events = EPOLLIN | EPOLLET;
643 205 ev.data.ptr = nullptr;
644 205 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
645 {
646 int errn = errno;
647 ::close(timer_fd_);
648 ::close(event_fd_);
649 ::close(epoll_fd_);
650 detail::throw_system_error(make_err(errn), "epoll_ctl");
651 }
652
653 205 epoll_event timer_ev{};
654 205 timer_ev.events = EPOLLIN | EPOLLERR;
655 205 timer_ev.data.ptr = &timer_fd_;
656 205 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
657 {
658 int errn = errno;
659 ::close(timer_fd_);
660 ::close(event_fd_);
661 ::close(epoll_fd_);
662 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
663 }
664
665 205 timer_svc_ = &get_timer_service(ctx, *this);
666 205 timer_svc_->set_on_earliest_changed(
667 2576 timer_service::callback(this, [](void* p) {
668 2371 auto* self = static_cast<epoll_scheduler*>(p);
669 2371 self->timerfd_stale_.store(true, std::memory_order_release);
670 2371 if (self->task_running_.load(std::memory_order_acquire))
671 self->interrupt_reactor();
672 2371 }));
673
674 // Initialize resolver service
675 205 get_resolver_service(ctx, *this);
676
677 // Initialize signal service
678 205 get_signal_service(ctx, *this);
679
680 // Push task sentinel to interleave reactor runs with handler execution
681 205 completed_ops_.push(&task_op_);
682 205 }
683
684 410 inline epoll_scheduler::~epoll_scheduler()
685 {
686 205 if (timer_fd_ >= 0)
687 205 ::close(timer_fd_);
688 205 if (event_fd_ >= 0)
689 205 ::close(event_fd_);
690 205 if (epoll_fd_ >= 0)
691 205 ::close(epoll_fd_);
692 410 }
693
694 inline void
695 205 epoll_scheduler::shutdown()
696 {
697 {
698 205 std::unique_lock lock(mutex_);
699 205 shutdown_ = true;
700
701 439 while (auto* h = completed_ops_.pop())
702 {
703 234 if (h == &task_op_)
704 205 continue;
705 29 lock.unlock();
706 29 h->destroy();
707 29 lock.lock();
708 234 }
709
710 205 signal_all(lock);
711 205 }
712
713 205 outstanding_work_.store(0, std::memory_order_release);
714
715 205 if (event_fd_ >= 0)
716 205 interrupt_reactor();
717 205 }
718
719 inline void
720 4209 epoll_scheduler::post(std::coroutine_handle<> h) const
721 {
722 struct post_handler final : scheduler_op
723 {
724 std::coroutine_handle<> h_;
725
726 4209 explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
727
728 8418 ~post_handler() override = default;
729
730 4209 void operator()() override
731 {
732 4209 auto h = h_;
733 4209 delete this;
734 4209 h.resume();
735 4209 }
736
737 void destroy() override
738 {
739 delete this;
740 }
741 };
742
743 4209 auto ph = std::make_unique<post_handler>(h);
744
745 // Fast path: same thread posts to private queue
746 // Only count locally; work_cleanup batches to global counter
747 4209 if (auto* ctx = epoll::find_context(this))
748 {
749 2549 ++ctx->private_outstanding_work;
750 2549 ctx->private_queue.push(ph.release());
751 2549 return;
752 }
753
754 // Slow path: cross-thread post requires mutex
755 1660 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
756
757 1660 std::unique_lock lock(mutex_);
758 1660 completed_ops_.push(ph.release());
759 1660 wake_one_thread_and_unlock(lock);
760 4209 }
761
762 inline void
763 50924 epoll_scheduler::post(scheduler_op* h) const
764 {
765 // Fast path: same thread posts to private queue
766 // Only count locally; work_cleanup batches to global counter
767 50924 if (auto* ctx = epoll::find_context(this))
768 {
769 50898 ++ctx->private_outstanding_work;
770 50898 ctx->private_queue.push(h);
771 50898 return;
772 }
773
774 // Slow path: cross-thread post requires mutex
775 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
776
777 26 std::unique_lock lock(mutex_);
778 26 completed_ops_.push(h);
779 26 wake_one_thread_and_unlock(lock);
780 26 }
781
782 inline bool
783 682 epoll_scheduler::running_in_this_thread() const noexcept
784 {
785 682 for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
786 435 if (c->key == this)
787 435 return true;
788 247 return false;
789 }
790
791 inline void
792 203 epoll_scheduler::stop()
793 {
794 203 std::unique_lock lock(mutex_);
795 203 if (!stopped_)
796 {
797 168 stopped_ = true;
798 168 signal_all(lock);
799 168 interrupt_reactor();
800 }
801 203 }
802
803 inline bool
804 18 epoll_scheduler::stopped() const noexcept
805 {
806 18 std::unique_lock lock(mutex_);
807 36 return stopped_;
808 18 }
809
810 inline void
811 52 epoll_scheduler::restart()
812 {
813 52 std::unique_lock lock(mutex_);
814 52 stopped_ = false;
815 52 }
816
817 inline std::size_t
818 187 epoll_scheduler::run()
819 {
820 374 if (outstanding_work_.load(std::memory_order_acquire) == 0)
821 {
822 30 stop();
823 30 return 0;
824 }
825
826 157 epoll::thread_context_guard ctx(this);
827 157 std::unique_lock lock(mutex_);
828
829 157 std::size_t n = 0;
830 for (;;)
831 {
832 94053 if (!do_one(lock, -1, &ctx.frame_))
833 157 break;
834 93896 if (n != (std::numeric_limits<std::size_t>::max)())
835 93896 ++n;
836 93896 if (!lock.owns_lock())
837 42836 lock.lock();
838 }
839 157 return n;
840 157 }
841
842 inline std::size_t
843 2 epoll_scheduler::run_one()
844 {
845 4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
846 {
847 stop();
848 return 0;
849 }
850
851 2 epoll::thread_context_guard ctx(this);
852 2 std::unique_lock lock(mutex_);
853 2 return do_one(lock, -1, &ctx.frame_);
854 2 }
855
856 inline std::size_t
857 34 epoll_scheduler::wait_one(long usec)
858 {
859 68 if (outstanding_work_.load(std::memory_order_acquire) == 0)
860 {
861 7 stop();
862 7 return 0;
863 }
864
865 27 epoll::thread_context_guard ctx(this);
866 27 std::unique_lock lock(mutex_);
867 27 return do_one(lock, usec, &ctx.frame_);
868 27 }
869
870 inline std::size_t
871 2 epoll_scheduler::poll()
872 {
873 4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
874 {
875 1 stop();
876 1 return 0;
877 }
878
879 1 epoll::thread_context_guard ctx(this);
880 1 std::unique_lock lock(mutex_);
881
882 1 std::size_t n = 0;
883 for (;;)
884 {
885 3 if (!do_one(lock, 0, &ctx.frame_))
886 1 break;
887 2 if (n != (std::numeric_limits<std::size_t>::max)())
888 2 ++n;
889 2 if (!lock.owns_lock())
890 2 lock.lock();
891 }
892 1 return n;
893 1 }
894
895 inline std::size_t
896 4 epoll_scheduler::poll_one()
897 {
898 8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
899 {
900 2 stop();
901 2 return 0;
902 }
903
904 2 epoll::thread_context_guard ctx(this);
905 2 std::unique_lock lock(mutex_);
906 2 return do_one(lock, 0, &ctx.frame_);
907 2 }
908
909 inline void
910 4417 epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
911 {
912 4417 epoll_event ev{};
913 4417 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
914 4417 ev.data.ptr = desc;
915
916 4417 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
917 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
918
919 4417 desc->registered_events = ev.events;
920 4417 desc->fd = fd;
921 4417 desc->scheduler_ = this;
922
923 4417 std::lock_guard lock(desc->mutex);
924 4417 desc->read_ready = false;
925 4417 desc->write_ready = false;
926 4417 }
927
928 inline void
929 4417 epoll_scheduler::deregister_descriptor(int fd) const
930 {
931 4417 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
932 4417 }
933
934 inline void
935 7623 epoll_scheduler::work_started() noexcept
936 {
937 7623 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
938 7623 }
939
940 inline void
941 11685 epoll_scheduler::work_finished() noexcept
942 {
943 23370 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
944 162 stop();
945 11685 }
946
947 inline void
948 34452 epoll_scheduler::compensating_work_started() const noexcept
949 {
950 34452 auto* ctx = epoll::find_context(this);
951 34452 if (ctx)
952 34452 ++ctx->private_outstanding_work;
953 34452 }
954
955 inline void
956 epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
957 {
958 // Note: outstanding_work_ was already incremented when posting
959 std::unique_lock lock(mutex_);
960 completed_ops_.splice(queue);
961 if (count > 0)
962 maybe_unlock_and_signal_one(lock);
963 }
964
965 inline void
966 4344 epoll_scheduler::post_deferred_completions(op_queue& ops) const
967 {
968 4344 if (ops.empty())
969 4344 return;
970
971 // Fast path: if on scheduler thread, use private queue
972 if (auto* ctx = epoll::find_context(this))
973 {
974 ctx->private_queue.splice(ops);
975 return;
976 }
977
978 // Slow path: add to global queue and wake a thread
979 std::unique_lock lock(mutex_);
980 completed_ops_.splice(ops);
981 wake_one_thread_and_unlock(lock);
982 }
983
984 inline void
985 399 epoll_scheduler::interrupt_reactor() const
986 {
987 // Only write if not already armed to avoid redundant writes
988 399 bool expected = false;
989 399 if (eventfd_armed_.compare_exchange_strong(
990 expected, true, std::memory_order_release,
991 std::memory_order_relaxed))
992 {
993 274 std::uint64_t val = 1;
994 274 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
995 }
996 399 }
997
998 inline void
999 373 epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
1000 {
1001 373 state_ |= 1;
1002 373 cond_.notify_all();
1003 373 }
1004
1005 inline bool
1006 1686 epoll_scheduler::maybe_unlock_and_signal_one(
1007 std::unique_lock<std::mutex>& lock) const
1008 {
1009 1686 state_ |= 1;
1010 1686 if (state_ > 1)
1011 {
1012 lock.unlock();
1013 cond_.notify_one();
1014 return true;
1015 }
1016 1686 return false;
1017 }
1018
1019 inline bool
1020 120443 epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
1021 {
1022 120443 state_ |= 1;
1023 120443 bool have_waiters = state_ > 1;
1024 120443 lock.unlock();
1025 120443 if (have_waiters)
1026 cond_.notify_one();
1027 120443 return have_waiters;
1028 }
1029
1030 inline void
1031 2 epoll_scheduler::clear_signal() const
1032 {
1033 2 state_ &= ~std::size_t(1);
1034 2 }
1035
1036 inline void
1037 2 epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
1038 {
1039 4 while ((state_ & 1) == 0)
1040 {
1041 2 state_ += 2;
1042 2 cond_.wait(lock);
1043 2 state_ -= 2;
1044 }
1045 2 }
1046
1047 inline void
1048 epoll_scheduler::wait_for_signal_for(
1049 std::unique_lock<std::mutex>& lock, long timeout_us) const
1050 {
1051 if ((state_ & 1) == 0)
1052 {
1053 state_ += 2;
1054 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
1055 state_ -= 2;
1056 }
1057 }
1058
1059 inline void
1060 1686 epoll_scheduler::wake_one_thread_and_unlock(
1061 std::unique_lock<std::mutex>& lock) const
1062 {
1063 1686 if (maybe_unlock_and_signal_one(lock))
1064 return;
1065
1066 1686 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
1067 {
1068 26 task_interrupted_ = true;
1069 26 lock.unlock();
1070 26 interrupt_reactor();
1071 }
1072 else
1073 {
1074 1660 lock.unlock();
1075 }
1076 }
1077
1078 93929 inline epoll_scheduler::work_cleanup::~work_cleanup()
1079 {
1080 93929 if (ctx)
1081 {
1082 93929 long produced = ctx->private_outstanding_work;
1083 93929 if (produced > 1)
1084 7 scheduler->outstanding_work_.fetch_add(
1085 produced - 1, std::memory_order_relaxed);
1086 93922 else if (produced < 1)
1087 8406 scheduler->work_finished();
1088 93929 ctx->private_outstanding_work = 0;
1089
1090 93929 if (!ctx->private_queue.empty())
1091 {
1092 51071 lock->lock();
1093 51071 scheduler->completed_ops_.splice(ctx->private_queue);
1094 }
1095 }
1096 else
1097 {
1098 scheduler->work_finished();
1099 }
1100 93929 }
1101
1102 62150 inline epoll_scheduler::task_cleanup::~task_cleanup()
1103 {
1104 31075 if (!ctx)
1105 return;
1106
1107 31075 if (ctx->private_outstanding_work > 0)
1108 {
1109 2363 scheduler->outstanding_work_.fetch_add(
1110 2363 ctx->private_outstanding_work, std::memory_order_relaxed);
1111 2363 ctx->private_outstanding_work = 0;
1112 }
1113
1114 31075 if (!ctx->private_queue.empty())
1115 {
1116 2363 if (!lock->owns_lock())
1117 lock->lock();
1118 2363 scheduler->completed_ops_.splice(ctx->private_queue);
1119 }
1120 31075 }
1121
1122 inline void
1123 4722 epoll_scheduler::update_timerfd() const
1124 {
1125 4722 auto nearest = timer_svc_->nearest_expiry();
1126
1127 4722 itimerspec ts{};
1128 4722 int flags = 0;
1129
1130 4722 if (nearest == timer_service::time_point::max())
1131 {
1132 // No timers - disarm by setting to 0 (relative)
1133 }
1134 else
1135 {
1136 4677 auto now = std::chrono::steady_clock::now();
1137 4677 if (nearest <= now)
1138 {
1139 // Use 1ns instead of 0 - zero disarms the timerfd
1140 148 ts.it_value.tv_nsec = 1;
1141 }
1142 else
1143 {
1144 4529 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1145 4529 nearest - now)
1146 4529 .count();
1147 4529 ts.it_value.tv_sec = nsec / 1000000000;
1148 4529 ts.it_value.tv_nsec = nsec % 1000000000;
1149 // Ensure non-zero to avoid disarming if duration rounds to 0
1150 4529 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1151 ts.it_value.tv_nsec = 1;
1152 }
1153 }
1154
1155 4722 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1156 detail::throw_system_error(make_err(errno), "timerfd_settime");
1157 4722 }
1158
1159 inline void
1160 31075 epoll_scheduler::run_task(
1161 std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
1162 {
1163 31075 int timeout_ms = task_interrupted_ ? 0 : -1;
1164
1165 31075 if (lock.owns_lock())
1166 4561 lock.unlock();
1167
1168 31075 task_cleanup on_exit{this, &lock, ctx};
1169
1170 // Flush deferred timerfd programming before blocking
1171 31075 if (timerfd_stale_.exchange(false, std::memory_order_acquire))
1172 2359 update_timerfd();
1173
1174 // Event loop runs without mutex held
1175 epoll_event events[128];
1176 31075 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
1177
1178 31075 if (nfds < 0 && errno != EINTR)
1179 detail::throw_system_error(make_err(errno), "epoll_wait");
1180
1181 31075 bool check_timers = false;
1182 31075 op_queue local_ops;
1183
1184 // Process events without holding the mutex
1185 72332 for (int i = 0; i < nfds; ++i)
1186 {
1187 41257 if (events[i].data.ptr == nullptr)
1188 {
1189 std::uint64_t val;
1190 // Mutex released above; analyzer can't track unlock via ref
1191 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
1192 69 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
1193 69 eventfd_armed_.store(false, std::memory_order_relaxed);
1194 69 continue;
1195 69 }
1196
1197 41188 if (events[i].data.ptr == &timer_fd_)
1198 {
1199 std::uint64_t expirations;
1200 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
1201 [[maybe_unused]] auto r =
1202 2363 ::read(timer_fd_, &expirations, sizeof(expirations));
1203 2363 check_timers = true;
1204 2363 continue;
1205 2363 }
1206
1207 // Deferred I/O: just set ready events and enqueue descriptor
1208 // No per-descriptor mutex locking in reactor hot path!
1209 38825 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
1210 38825 desc->add_ready_events(events[i].events);
1211
1212 // Only enqueue if not already enqueued
1213 38825 bool expected = false;
1214 38825 if (desc->is_enqueued_.compare_exchange_strong(
1215 expected, true, std::memory_order_release,
1216 std::memory_order_relaxed))
1217 {
1218 38825 local_ops.push(desc);
1219 }
1220 }
1221
1222 // Process timers only when timerfd fires
1223 31075 if (check_timers)
1224 {
1225 2363 timer_svc_->process_expired();
1226 2363 update_timerfd();
1227 }
1228
1229 31075 lock.lock();
1230
1231 31075 if (!local_ops.empty())
1232 26068 completed_ops_.splice(local_ops);
1233 31075 }
1234
1235 inline std::size_t
1236 94087 epoll_scheduler::do_one(
1237 std::unique_lock<std::mutex>& lock,
1238 long timeout_us,
1239 epoll::scheduler_context* ctx)
1240 {
1241 for (;;)
1242 {
1243 125164 if (stopped_)
1244 158 return 0;
1245
1246 125006 scheduler_op* op = completed_ops_.pop();
1247
1248 // Handle reactor sentinel - time to poll for I/O
1249 125006 if (op == &task_op_)
1250 {
1251 31075 bool more_handlers = !completed_ops_.empty();
1252
1253 // Nothing to run the reactor for: no pending work to wait on,
1254 // or caller requested a non-blocking poll
1255 35636 if (!more_handlers &&
1256 9122 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1257 timeout_us == 0))
1258 {
1259 completed_ops_.push(&task_op_);
1260 return 0;
1261 }
1262
1263 31075 task_interrupted_ = more_handlers || timeout_us == 0;
1264 31075 task_running_.store(true, std::memory_order_release);
1265
1266 31075 if (more_handlers)
1267 26514 unlock_and_signal_one(lock);
1268
1269 31075 run_task(lock, ctx);
1270
1271 31075 task_running_.store(false, std::memory_order_relaxed);
1272 31075 completed_ops_.push(&task_op_);
1273 31075 continue;
1274 31075 }
1275
1276 // Handle operation
1277 93931 if (op != nullptr)
1278 {
1279 93929 bool more = !completed_ops_.empty();
1280
1281 93929 if (more)
1282 93929 ctx->unassisted = !unlock_and_signal_one(lock);
1283 else
1284 {
1285 ctx->unassisted = false;
1286 lock.unlock();
1287 }
1288
1289 93929 work_cleanup on_exit{this, &lock, ctx};
1290
1291 93929 (*op)();
1292 93929 return 1;
1293 93929 }
1294
1295 // No pending work to wait on, or caller requested non-blocking poll
1296 4 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1297 timeout_us == 0)
1298 return 0;
1299
1300 2 clear_signal();
1301 2 if (timeout_us < 0)
1302 2 wait_for_signal(lock);
1303 else
1304 wait_for_signal_for(lock, timeout_us);
1305 31077 }
1306 }
1307
1308 } // namespace boost::corosio::detail
1309
1310 #endif // BOOST_COROSIO_HAS_EPOLL
1311
1312 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
1313