//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_SELECT

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>

#include <boost/corosio/native/detail/select/select_op.hpp>
#include <boost/corosio/detail/timer_service.hpp>
#include <boost/corosio/detail/make_err.hpp>
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <coroutine>
#include <cstddef>
#include <limits>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>

namespace boost::corosio::detail {

struct select_op;

/** POSIX scheduler using select() for I/O multiplexing.

    This scheduler implements the scheduler interface using the POSIX select()
    call for I/O event notification. It uses a single reactor model
    where one thread runs select() while other threads wait on a condition
    variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - Portability: Works on all POSIX systems

    The design mirrors epoll_scheduler for behavioral consistency:
    - Same single-reactor thread coordination model
    - Same work counting semantics
    - Same timer integration pattern

    Known Limitations:
    - FD_SETSIZE (~1024) limits maximum concurrent connections
    - O(n) scanning: rebuilds fd_sets each iteration
    - Level-triggered only (no edge-triggered mode)

    @par Thread Safety
    All public member functions are thread-safe.
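
    @par Example
    A minimal usage sketch; use_service is a hypothetical accessor standing
    in for however the owning execution_context exposes this service, and
    coro_handle is a previously obtained std::coroutine_handle<>:
    @code
    capy::execution_context ctx;
    auto& sched = capy::use_service<select_scheduler>(ctx);
    sched.post(coro_handle);  // queue a coroutine for resumption on a scheduler thread
    sched.run();              // run handlers until no outstanding work remains
    @endcode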
 
*/
class BOOST_COROSIO_DECL select_scheduler final
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates a self-pipe for reactor interruption.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    ~select_scheduler() override;

    select_scheduler(select_scheduler const&)            = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a file descriptor for monitoring.

        @param fd The file descriptor to register.
        @param op The operation associated with this fd.
        @param events Event mask: event_read (1), event_write (2), or
        event_read | event_write (3).
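
        @par Example
        A sketch; sched, sock_fd, and op are assumed to be in scope:
        @code
        sched.register_fd(sock_fd, op,
            select_scheduler::event_read | select_scheduler::event_write);
        @endcode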
 
    */
    void register_fd(int fd, select_op* op, int events) const;

    /** Unregister a file descriptor from monitoring.

        @param fd The file descriptor to unregister.
        @param events Event mask to remove: event_read (1), event_write (2),
        or event_read | event_write (3).
    */
    void deregister_fd(int fd, int events) const;

    void work_started() noexcept override;
    void work_finished() noexcept override;

    // Event flags for register_fd/deregister_fd
    static constexpr int event_read  = 1;
    static constexpr int event_write = 2;

private:
    std::size_t do_one(long timeout_us);
    void run_reactor(std::unique_lock<std::mutex>& lock);
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    void interrupt_reactor() const;
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    mutable std::mutex mutex_;
    mutable std::condition_variable wakeup_event_;
    mutable op_queue completed_ops_;
    mutable std::atomic<long> outstanding_work_;
    std::atomic<bool> stopped_;
    bool shutdown_;

    // Per-fd state for tracking registered operations
    struct fd_state
    {
        select_op* read_op  = nullptr;
        select_op* write_op = nullptr;
    };
    mutable std::unordered_map<int, fd_state> registered_fds_;
    mutable int max_fd_ = -1;

    // Single reactor thread coordination
    mutable bool reactor_running_     = false;
    mutable bool reactor_interrupted_ = false;
    mutable int idle_thread_count_    = 0;

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing timer starvation.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};

/*
    select Scheduler - Single Reactor Model
    =======================================

    This scheduler mirrors the epoll_scheduler design but uses select() instead
    of epoll for I/O multiplexing. The thread coordination strategy is identical:
    one thread becomes the "reactor" while others wait on a condition variable.

    Thread Model
    ------------
    - ONE thread runs select() at a time (the reactor thread)
    - OTHER threads wait on wakeup_event_ (condition variable) for handlers
    - When work is posted, exactly one waiting thread wakes via notify_one()

    Key Differences from epoll
    --------------------------
    - Uses self-pipe instead of eventfd for interruption (more portable)
    - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
    - FD_SETSIZE limit (~1024 fds on most systems)
    - Level-triggered only (no edge-triggered mode)

    Self-Pipe Pattern
    -----------------
    To interrupt a blocking select() call (e.g., when work is posted or a timer
    expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
    always in the read_fds set, so select() returns immediately. We drain the
    pipe to clear the readable state (see the sketch after this comment block).

    fd-to-op Mapping
    ----------------
    We use an unordered_map<int, fd_state> to track which operations are
    registered for each fd. This allows O(1) lookup when select() returns
    ready fds. Each fd can have at most one read op and one write op registered.
*/
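
// The self-pipe pattern above, as a standalone sketch (illustrative only;
// the real wakeup and drain logic lives in interrupt_reactor() and
// run_reactor() below):
//
//     int fds[2];
//     ::pipe(fds);                 // fds[0] = read end, fds[1] = write end
//
//     // reactor thread: always include the read end in the watched set
//     fd_set rd;
//     FD_ZERO(&rd);
//     FD_SET(fds[0], &rd);
//     ::select(fds[0] + 1, &rd, nullptr, nullptr, nullptr);
//
//     // any other thread: wake the reactor by making fds[0] readable
//     char b = 1;
//     ::write(fds[1], &b, 1);
//
//     // reactor, after waking: drain so the pipe stops selecting readable
//     char buf[256];
//     while (::read(fds[0], buf, sizeof(buf)) > 0) {}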
 

namespace select {

struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
{
    select_scheduler const* key;
    scheduler_context* next;
};

inline thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(select_scheduler const* ctx) noexcept
        : frame_{ctx, context_stack.get()}
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        context_stack.set(frame_.next);
    }
};

struct work_guard
{
    select_scheduler* self;
    ~work_guard()
    {
        self->work_finished();
    }
};

} // namespace select

inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
    : pipe_fds_{-1, -1}
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , max_fd_(-1)
    , reactor_running_(false)
    , reactor_interrupted_(false)
    , idle_thread_count_(0)
{
    // Create self-pipe for interrupting select()
    if (::pipe(pipe_fds_) < 0)
        detail::throw_system_error(make_err(errno), "pipe");

    // Set both ends to non-blocking and close-on-exec
    for (int i = 0; i < 2; ++i)
    {
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
        if (flags == -1)
        {
            int errn = errno;
            ::close(pipe_fds_[0]);
            ::close(pipe_fds_[1]);
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
        }
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
        {
            int errn = errno;
            ::close(pipe_fds_[0]);
            ::close(pipe_fds_[1]);
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
        }
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
        {
            int errn = errno;
            ::close(pipe_fds_[0]);
            ::close(pipe_fds_[1]);
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
        }
    }

    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(this, [](void* p) {
            static_cast<select_scheduler*>(p)->interrupt_reactor();
        }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}

inline select_scheduler::~select_scheduler()
{
    if (pipe_fds_[0] >= 0)
        ::close(pipe_fds_[0]);
    if (pipe_fds_[1] >= 0)
        ::close(pipe_fds_[1]);
}

inline void
select_scheduler::shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }
    }

    outstanding_work_.store(0, std::memory_order_release);

    if (pipe_fds_[1] >= 0)
        interrupt_reactor();

    wakeup_event_.notify_all();
}

inline void
select_scheduler::post(std::coroutine_handle<> h) const
{
    struct post_handler final : scheduler_op
    {
        std::coroutine_handle<> h_;

        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}

        ~post_handler() override = default;

        void operator()() override
        {
            auto h = h_;
            delete this;
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}

inline void
select_scheduler::post(scheduler_op* h) const
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(h);
    wake_one_thread_and_unlock(lock);
}
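
// Illustrative sketch of a caller-supplied operation posted through the
// scheduler. scheduler_op's interface is inferred from the overrides used in
// this file; cleanup conventions follow the post_handler above:
//
//     struct my_op final : scheduler_op
//     {
//         void operator()() override
//         {
//             // runs on one of the scheduler's threads
//             delete this;
//         }
//         void destroy() override { delete this; } // called at shutdown
//     };
//
//     sched.post(new my_op); // ownership passes to the scheduler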
 

inline bool
select_scheduler::running_in_this_thread() const noexcept
{
    for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

inline void
select_scheduler::stop()
{
    bool expected = false;
    if (stopped_.compare_exchange_strong(
            expected, true, std::memory_order_release,
            std::memory_order_relaxed))
    {
        // Wake all threads so they notice stopped_ and exit
        {
            std::lock_guard lock(mutex_);
            wakeup_event_.notify_all();
        }
        interrupt_reactor();
    }
}

inline bool
select_scheduler::stopped() const noexcept
{
    return stopped_.load(std::memory_order_acquire);
}

inline void
select_scheduler::restart()
{
    stopped_.store(false, std::memory_order_release);
}

inline std::size_t
select_scheduler::run()
{
    if (stopped_.load(std::memory_order_acquire))
        return 0;

    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    select::thread_context_guard ctx(this);

    std::size_t n = 0;
    while (do_one(-1))
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
    return n;
}

inline std::size_t
select_scheduler::run_one()
{
    if (stopped_.load(std::memory_order_acquire))
        return 0;

    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    select::thread_context_guard ctx(this);
    return do_one(-1);
}

inline std::size_t
select_scheduler::wait_one(long usec)
{
    if (stopped_.load(std::memory_order_acquire))
        return 0;

    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    select::thread_context_guard ctx(this);
    return do_one(usec);
}

inline std::size_t
select_scheduler::poll()
{
    if (stopped_.load(std::memory_order_acquire))
        return 0;

    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    select::thread_context_guard ctx(this);

    std::size_t n = 0;
    while (do_one(0))
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
    return n;
}

inline std::size_t
select_scheduler::poll_one()
{
    if (stopped_.load(std::memory_order_acquire))
        return 0;

    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    select::thread_context_guard ctx(this);
    return do_one(0);
}

inline void
select_scheduler::register_fd(int fd, select_op* op, int events) const
{
    // Validate fd is within select() limits
    if (fd < 0 || fd >= FD_SETSIZE)
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");

    {
        std::lock_guard lock(mutex_);

        auto& state = registered_fds_[fd];
        if (events & event_read)
            state.read_op = op;
        if (events & event_write)
            state.write_op = op;

        if (fd > max_fd_)
            max_fd_ = fd;
    }

    // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
    // with the newly registered fd.
    interrupt_reactor();
}

inline void
select_scheduler::deregister_fd(int fd, int events) const
{
    std::lock_guard lock(mutex_);

    auto it = registered_fds_.find(fd);
    if (it == registered_fds_.end())
        return;

    if (events & event_read)
        it->second.read_op = nullptr;
    if (events & event_write)
        it->second.write_op = nullptr;

    // Remove entry if both are null
    if (!it->second.read_op && !it->second.write_op)
    {
        registered_fds_.erase(it);

        // Recalculate max_fd_ if needed
        if (fd == max_fd_)
        {
            max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
            for (auto& [registered_fd, state] : registered_fds_)
            {
                if (registered_fd > max_fd_)
                    max_fd_ = registered_fd;
            }
        }
    }
}

inline void
select_scheduler::work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

inline void
select_scheduler::work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

inline void
select_scheduler::interrupt_reactor() const
{
    char byte               = 1;
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
}

inline void
select_scheduler::wake_one_thread_and_unlock(
    std::unique_lock<std::mutex>& lock) const
{
    if (idle_thread_count_ > 0)
    {
        // Idle worker exists - wake it via condvar
        wakeup_event_.notify_one();
        lock.unlock();
    }
    else if (reactor_running_ && !reactor_interrupted_)
    {
        // No idle workers but reactor is running - interrupt it
        reactor_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        // No one to wake
        lock.unlock();
    }
}

inline long
select_scheduler::calculate_timeout(long requested_timeout_us) const
{
    if (requested_timeout_us == 0)
        return 0;

    auto nearest = timer_svc_->nearest_expiry();
    if (nearest == timer_service::time_point::max())
        return requested_timeout_us;

    auto now = std::chrono::steady_clock::now();
    if (nearest <= now)
        return 0;

    auto timer_timeout_us =
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
            .count();

    // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
    constexpr auto long_max =
        static_cast<long long>((std::numeric_limits<long>::max)());
    auto capped_timer_us =
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
                              static_cast<long long>(0)),
                   long_max);

    if (requested_timeout_us < 0)
        return static_cast<long>(capped_timer_us);

    // requested_timeout_us is already long, so min() result fits in long
    return static_cast<long>(
        (std::min)(static_cast<long long>(requested_timeout_us),
                   capped_timer_us));
}
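
// Worked example of the clamping above (illustrative): with
// requested_timeout_us == -1 (block indefinitely) and the nearest timer
// expiring 250ms from now, calculate_timeout returns 250000, so select()
// sleeps at most 250ms before timers are processed. With
// requested_timeout_us == 100000 it returns 100000, the smaller of the two.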
 

inline void
select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
{
    // Calculate timeout considering timers, use 0 if interrupted
    long effective_timeout_us =
        reactor_interrupted_ ? 0 : calculate_timeout(-1);

    // Build fd_sets from registered_fds_
    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // Always include the interrupt pipe
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    // Add registered fds
    for (auto& [fd, state] : registered_fds_)
    {
        if (state.read_op)
            FD_SET(fd, &read_fds);
        if (state.write_op)
        {
            FD_SET(fd, &write_fds);
            // Also monitor for errors on connect operations
            FD_SET(fd, &except_fds);
        }
        if (fd > nfds)
            nfds = fd;
    }

    // Convert timeout to timeval
    struct timeval tv;
    struct timeval* tv_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec  = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr     = &tv;
    }

    lock.unlock();

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
    int saved_errno = errno;

    // Process timers outside the lock
    timer_svc_->process_expired();

    if (ready < 0 && saved_errno != EINTR)
        detail::throw_system_error(make_err(saved_errno), "select");

    // Re-acquire lock before modifying completed_ops_
    lock.lock();

    // Drain the interrupt pipe if readable
    if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
    {
        char buf[256];
        while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
        {
        }
    }

    // Process I/O completions
    int completions_queued = 0;
    if (ready > 0)
    {
        // Iterate over registered fds (copy keys to avoid iterator invalidation)
        std::vector<int> fds_to_check;
        fds_to_check.reserve(registered_fds_.size());
        for (auto& [fd, state] : registered_fds_)
            fds_to_check.push_back(fd);

        for (int fd : fds_to_check)
        {
            auto it = registered_fds_.find(fd);
            if (it == registered_fds_.end())
                continue;

            auto& state = it->second;

            // Check for errors (especially for connect operations)
            bool has_error = FD_ISSET(fd, &except_fds);

            // Process read readiness
            if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
            {
                auto* op = state.read_op;
                // Claim the op by exchanging to unregistered. Both registering and
                // registered states mean the op is ours to complete.
                auto prev = op->registered.exchange(
                    select_registration_state::unregistered,
                    std::memory_order_acq_rel);
                if (prev != select_registration_state::unregistered)
                {
                    state.read_op = nullptr;

                    if (has_error)
                    {
                        int errn      = 0;
                        socklen_t len = sizeof(errn);
                        if (::getsockopt(
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
                            errn = errno;
                        if (errn == 0)
                            errn = EIO;
                        op->complete(errn, 0);
                    }
                    else
                    {
                        op->perform_io();
                    }

                    completed_ops_.push(op);
                    ++completions_queued;
                }
            }

            // Process write readiness
            if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
            {
                auto* op = state.write_op;
                // Claim the op by exchanging to unregistered. Both registering and
                // registered states mean the op is ours to complete.
                auto prev = op->registered.exchange(
                    select_registration_state::unregistered,
                    std::memory_order_acq_rel);
                if (prev != select_registration_state::unregistered)
                {
                    state.write_op = nullptr;

                    if (has_error)
                    {
                        int errn      = 0;
                        socklen_t len = sizeof(errn);
                        if (::getsockopt(
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
                            errn = errno;
                        if (errn == 0)
                            errn = EIO;
                        op->complete(errn, 0);
                    }
                    else
                    {
                        op->perform_io();
                    }

                    completed_ops_.push(op);
                    ++completions_queued;
                }
            }

            // Clean up empty entries
            if (!state.read_op && !state.write_op)
                registered_fds_.erase(it);
        }
    }

    if (completions_queued > 0)
    {
        if (completions_queued == 1)
            wakeup_event_.notify_one();
        else
            wakeup_event_.notify_all();
    }
}

inline std::size_t
select_scheduler::do_one(long timeout_us)
{
    std::unique_lock lock(mutex_);

    for (;;)
    {
        if (stopped_.load(std::memory_order_acquire))
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // The task sentinel means it is this thread's turn to run the reactor.
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            if (!more_handlers)
            {
                if (outstanding_work_.load(std::memory_order_acquire) == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
                if (timeout_us == 0)
                {
                    completed_ops_.push(&task_op_);
                    return 0;
                }
            }

            // Run the reactor without blocking if handlers are waiting or the
            // caller asked to poll.
            reactor_interrupted_ = more_handlers || timeout_us == 0;
            reactor_running_     = true;

            if (more_handlers && idle_thread_count_ > 0)
                wakeup_event_.notify_one();

            run_reactor(lock);

            reactor_running_ = false;
            completed_ops_.push(&task_op_);
            continue;
        }

        // A real handler: execute it outside the lock and report one unit of
        // work done.
        if (op != nullptr)
        {
            lock.unlock();
            select::work_guard g{this};
            (*op)();
            return 1;
        }

        // Queue empty: give up if there is no outstanding work or the caller
        // asked to poll; otherwise wait to be woken with new work.
        if (outstanding_work_.load(std::memory_order_acquire) == 0)
            return 0;

        if (timeout_us == 0)
            return 0;

        ++idle_thread_count_;
        if (timeout_us < 0)
            wakeup_event_.wait(lock);
        else
            wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
        --idle_thread_count_;
    }
}

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_SELECT

#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP