//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>

#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
#include <boost/corosio/detail/timer_service.hpp>
#include <boost/corosio/detail/make_err.hpp>
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <mutex>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

namespace boost::corosio::detail {

struct epoll_op;
struct descriptor_state;
namespace epoll {
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
} // namespace epoll

/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single reactor model
    where one thread runs epoll_wait while other threads
    wait on a condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - IOCP parity: Behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. Other threads wait on a condition
    variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
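
    @par Example
    Typical multi-threaded use. This is an illustrative sketch only; how
    the scheduler reference is obtained from the owning execution_context
    is not shown and depends on the surrounding framework.
    @code
    void run_pool(epoll_scheduler& sched)
    {
        std::vector<std::thread> threads;
        for (int i = 0; i < 4; ++i)
            threads.emplace_back([&sched] { sched.run(); });
        for (auto& t : threads)
            t.join();
    }
    @endcode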
*/
class BOOST_COROSIO_DECL epoll_scheduler final
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates an epoll instance, an eventfd for reactor interruption,
        and a timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler() override;

    epoll_scheduler(epoll_scheduler const&)            = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state, which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
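
        @par Example
        Illustrative sketch of how a socket service might use the
        registration API; the real services own the descriptor_state and
        live elsewhere in corosio.
        @code
        // The fd is expected to be non-blocking; the scheduler registers
        // it edge-triggered (EPOLLET).
        ::fcntl(fd, F_SETFL, ::fcntl(fd, F_GETFL) | O_NONBLOCK);
        sched.register_descriptor(fd, &state);
        // ... asynchronous operations on fd ...
        sched.deregister_descriptor(fd);
        @endcode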
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    void work_started() noexcept override;
    void work_finished() noexcept override;

    /** Offset the work_finished() that work_cleanup would otherwise issue.

        Called by descriptor_state when all I/O returned EAGAIN and no
        handler will be executed. Must be called from a scheduler thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Called by thread_context_guard destructor when a thread exits run().
        Transfers pending work to the global queue under mutex protection.

        @param queue The private queue to drain.
        @param count Item count for wakeup decisions (signals one waiting thread if positive).
    */
    void drain_thread_queue(op_queue& queue, long count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations go to
        the thread's private queue (fast path). Otherwise, operations are
        added to the global queue under the mutex and a waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

private:
    struct work_cleanup
    {
        epoll_scheduler* scheduler;
        std::unique_lock<std::mutex>* lock;
        epoll::scheduler_context* ctx;
        ~work_cleanup();
    };

    struct task_cleanup
    {
        epoll_scheduler const* scheduler;
        std::unique_lock<std::mutex>* lock;
        epoll::scheduler_context* ctx;
        ~task_cleanup();
    };

    std::size_t do_one(
        std::unique_lock<std::mutex>& lock,
        long timeout_us,
        epoll::scheduler_context* ctx);
    void
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    void interrupt_reactor() const;
    void update_timerfd() const;

    /** Set the signaled state and wake all waiting threads.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void signal_all(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state and wake one waiter if any exist.

        Only unlocks and signals if at least one thread is waiting.
        Use this when the caller needs to perform a fallback action
        (such as interrupting the reactor) when no waiters exist.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if unlocked and signaled, `false` if the lock is still held.
    */
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state, unlock, and wake one waiter if any exist.

        Always unlocks the mutex. Use this when the caller will release
        the lock regardless of whether a waiter exists.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if a waiter was signaled, `false` otherwise.
    */
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Clear the signaled state before waiting.

        @par Preconditions
        Mutex must be held.
    */
    void clear_signal() const;

    /** Block until the signaled state is set.

        Returns immediately if already signaled (fast-path). Otherwise
        increments the waiter count, waits on the condition variable,
        and decrements the waiter count upon waking.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;

    /** Block until signaled or timeout expires.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
        @param timeout_us Maximum time to wait in microseconds.
    */
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;

    int epoll_fd_;
    int event_fd_; // for interrupting reactor
    int timer_fd_; // timerfd for kernel-managed timer expiry
    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    mutable op_queue completed_ops_;
    mutable std::atomic<long> outstanding_work_;
    bool stopped_;
    bool shutdown_;

    // True while a thread is blocked in epoll_wait. Used by
    // wake_one_thread_and_unlock and work_finished to know when
    // an eventfd interrupt is needed instead of a condvar signal.
    mutable std::atomic<bool> task_running_{false};

    // True when the reactor has been told to do a non-blocking poll
    // (more handlers queued or poll mode). Prevents redundant eventfd
    // writes and controls the epoll_wait timeout.
    mutable bool task_interrupted_ = false;

    // Signaling state: bit 0 = signaled, upper bits = waiter count (each waiter adds 2)
    mutable std::size_t state_ = 0;

    // Edge-triggered eventfd state
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    // blocks. Avoids timerfd_settime syscalls for timers that are
    // scheduled then cancelled without being waited on.
    mutable std::atomic<bool> timerfd_stale_{false};

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing starvation of I/O events, timers, and signals.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};

//--------------------------------------------------------------------------
//
// Implementation
//
//--------------------------------------------------------------------------

/*
    epoll Scheduler - Single Reactor Model
    ======================================

    This scheduler uses a thread coordination strategy to provide handler
    parallelism and avoid the thundering herd problem.
    Instead of all threads blocking on epoll_wait(), one thread becomes the
    "reactor" while others wait on a condition variable for handler work.

    Thread Model
    ------------
    - ONE thread runs epoll_wait() at a time (the reactor thread)
    - OTHER threads wait on cond_ (condition variable) for handlers
    - When work is posted, exactly one waiting thread wakes via notify_one()
    - This matches Windows IOCP semantics where N posted items wake N threads

    Event Loop Structure (do_one)
    -----------------------------
    1. Lock mutex, try to pop handler from queue
    2. If got handler: execute it (unlocked), return
    3. If queue empty and no reactor running: become reactor
       - Run epoll_wait (unlocked), queue I/O completions, loop back
    4. If queue empty and reactor running: wait on condvar for work

    The task_running_ flag ensures only one thread owns epoll_wait().
    After the reactor queues I/O completions, it loops back to try getting
    a handler, giving priority to handler execution over more I/O polling.

    Signaling State (state_)
    ------------------------
    The state_ variable encodes two pieces of information:
    - Bit 0: signaled flag (1 = signaled, persists until cleared)
    - Upper bits: waiter count (each waiter adds 2 before blocking)

    This allows efficient coordination:
    - Signalers only call notify when waiters exist (state_ > 1)
    - Waiters check if already signaled before blocking (fast-path)

    Wake Coordination (wake_one_thread_and_unlock)
    ----------------------------------------------
    When posting work:
    - If waiters exist (state_ > 1): signal and notify_one()
    - Else if reactor running: interrupt via eventfd write
    - Else: no-op (thread will find work when it checks queue)

    This avoids waking threads unnecessarily. With cascading wakes,
    each handler execution wakes at most one additional thread if
    more work exists in the queue.

    Work Counting
    -------------
    outstanding_work_ tracks pending operations. When it hits zero, run()
    returns. Each operation increments on start, decrements on completion.

    Timer Integration
    -----------------
    Timers are handled by timer_service. The reactor adjusts epoll_wait
    timeout to wake for the nearest timer expiry. When a new timer is
    scheduled earlier than current, timer_service calls interrupt_reactor()
    to re-evaluate the timeout.
*/

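/*
    Worked example of the state_ encoding (illustrative):

        state_ == 0    no waiters, not signaled
        state_ == 2    one waiter blocked, not signaled
        state_ == 3    one waiter blocked and the signal set; the waiter
                       wakes, subtracts 2, and leaves state_ == 1
        state_ == 1    signaled with no waiters; the next would-be waiter
                       takes the fast path and does not block until
                       clear_signal() resets bit 0
*/
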
namespace epoll {

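// Per-thread frame describing this thread's participation in a particular
// scheduler's run() call. run() pushes one of these onto a thread-local
// stack (see thread_context_guard below); find_context() walks that stack
// so post() and friends can detect "called from a scheduler thread" and
// use the thread's private queue without taking the scheduler mutex.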
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
{
    epoll_scheduler const* key;
    scheduler_context* next;
    op_queue private_queue;
    long private_outstanding_work;
    int inline_budget;
    int inline_budget_max;
    bool unassisted;

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
        , inline_budget(0)
        , inline_budget_max(2)
        , unassisted(false)
    {
    }
};

inline thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

inline scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

} // namespace epoll

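// Inline completion budget. The call pattern expected from completion
// handlers looks roughly like this (illustrative sketch only; the real
// callers live in the socket/op layer, and try_complete_next_inline is a
// hypothetical helper standing in for their speculative-completion step):
//
//     sched.reset_inline_budget();                // start of handler
//     while (sched.try_consume_inline_budget())
//     {
//         if (!try_complete_next_inline())        // e.g. hit EAGAIN
//             break;
//     }
//
// The per-thread cap adapts between 2 and 16 depending on how completely
// the previous cycle consumed its budget (see below).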
inline void
epoll_scheduler::reset_inline_budget() const noexcept
{
    if (auto* ctx = epoll::find_context(this))
    {
        // Cap when no other thread absorbed queued work. A moderate
        // cap (4) amortizes scheduling for small buffers while avoiding
        // bursty I/O that fills socket buffers and stalls large transfers.
        if (ctx->unassisted)
        {
            ctx->inline_budget_max = 4;
            ctx->inline_budget     = 4;
            return;
        }
        // Ramp up when previous cycle fully consumed budget.
        // Reset on partial consumption (EAGAIN hit or peer got scheduled).
        if (ctx->inline_budget == 0)
            ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
        else if (ctx->inline_budget < ctx->inline_budget_max)
            ctx->inline_budget_max = 2;
        ctx->inline_budget = ctx->inline_budget_max;
    }
}

inline bool
epoll_scheduler::try_consume_inline_budget() const noexcept
{
    if (auto* ctx = epoll::find_context(this))
    {
        if (ctx->inline_budget > 0)
        {
            --ctx->inline_budget;
            return true;
        }
    }
    return false;
}

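// Deferred I/O dispatch: the reactor only records ready events and enqueues
// the descriptor_state as a scheduler_op (see run_task()). This call then
// runs on whichever scheduler thread pops it, performs the actual reads and
// writes, and queues the resulting completion handlers.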
inline void
descriptor_state::operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                read_ready = true;
            }
        }
        if (ev & EPOLLOUT)
        {
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                connect_op = nullptr;
                local_ops.push(cn);
            }
            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }
            if (!had_write_op)
                write_ready = true;
        }
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute the first handler inline; the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}

inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    epoll_event ev{};
    ev.events   = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    epoll_event timer_ev{};
    timer_ev.events   = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(this, [](void* p) {
            auto* self = static_cast<epoll_scheduler*>(p);
            self->timerfd_stale_.store(true, std::memory_order_release);
            if (self->task_running_.load(std::memory_order_acquire))
                self->interrupt_reactor();
        }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}

inline epoll_scheduler::~epoll_scheduler()
{
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}

inline void
epoll_scheduler::shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    outstanding_work_.store(0, std::memory_order_release);

    if (event_fd_ >= 0)
        interrupt_reactor();
}

inline void
epoll_scheduler::post(std::coroutine_handle<> h) const
{
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}

        ~post_handler() override = default;

        void operator()() override
        {
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = epoll::find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}

inline void
epoll_scheduler::post(scheduler_op* h) const
{
    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = epoll::find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(h);
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(h);
    wake_one_thread_and_unlock(lock);
}

inline bool
epoll_scheduler::running_in_this_thread() const noexcept
{
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

inline void
epoll_scheduler::stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}

inline bool
epoll_scheduler::stopped() const noexcept
{
    std::unique_lock lock(mutex_);
    return stopped_;
}

inline void
epoll_scheduler::restart()
{
    std::unique_lock lock(mutex_);
    stopped_ = false;
}

inline std::size_t
epoll_scheduler::run()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

inline std::size_t
epoll_scheduler::run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, -1, &ctx.frame_);
}

inline std::size_t
epoll_scheduler::wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}

inline std::size_t
epoll_scheduler::poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

inline std::size_t
epoll_scheduler::poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    epoll::thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, 0, &ctx.frame_);
}

inline void
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd                = fd;
    desc->scheduler_        = this;

    std::lock_guard lock(desc->mutex);
    desc->read_ready  = false;
    desc->write_ready = false;
}

inline void
epoll_scheduler::deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}

inline void
epoll_scheduler::work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

inline void
epoll_scheduler::work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

inline void
epoll_scheduler::compensating_work_started() const noexcept
{
    auto* ctx = epoll::find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}

inline void
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}

inline void
epoll_scheduler::post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = epoll::find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}

inline void
epoll_scheduler::interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(
            expected, true, std::memory_order_release,
            std::memory_order_relaxed))
    {
        std::uint64_t val       = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}

inline void
epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}

inline bool
epoll_scheduler::maybe_unlock_and_signal_one(
    std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}

inline bool
epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
    return have_waiters;
}

inline void
epoll_scheduler::clear_signal() const
{
    state_ &= ~std::size_t(1);
}

inline void
epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}

inline void
epoll_scheduler::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}

inline void
epoll_scheduler::wake_one_thread_and_unlock(
    std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}

inline epoll_scheduler::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        long produced = ctx->private_outstanding_work;
        if (produced > 1)
            scheduler->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            scheduler->work_finished();
        ctx->private_outstanding_work = 0;

        if (!ctx->private_queue.empty())
        {
            lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        scheduler->work_finished();
    }
}

inline epoll_scheduler::task_cleanup::~task_cleanup()
{
    if (!ctx)
        return;

    if (ctx->private_outstanding_work > 0)
    {
        scheduler->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        if (!lock->owns_lock())
            lock->lock();
        scheduler->completed_ops_.splice(ctx->private_queue);
    }
}

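// Programs the timerfd for the nearest timer expiry. Worked example
// (illustrative): a timer due 2.5 ms from now yields it_value = {0,
// 2'500'000}; one already past due yields {0, 1}, because an all-zero
// itimerspec would disarm the timerfd instead of firing it immediately.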
inline void
epoll_scheduler::update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                            nearest - now)
                            .count();
            ts.it_value.tv_sec  = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}

inline void
epoll_scheduler::run_task(
    std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx)
{
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // Mutex released above; analyzer can't track unlock via ref
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}

inline std::size_t
epoll_scheduler::do_one(
    std::unique_lock<std::mutex>& lock,
    long timeout_us,
    epoll::scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_EPOLL

#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP