1 +
//
 
2 +
// Copyright (c) 2026 Steve Gerbino
 
3 +
//
 
4 +
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
5 +
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
6 +
//
 
7 +
// Official repository: https://github.com/cppalliance/corosio
 
8 +
//
 
9 +

 
10 +
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
 
11 +
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
 
12 +

 
13 +
#include <boost/corosio/detail/platform.hpp>
 
14 +

 
15 +
#if BOOST_COROSIO_HAS_SELECT
 
16 +

 
17 +
#include <boost/corosio/detail/config.hpp>
 
18 +
#include <boost/corosio/io/io_object.hpp>
 
19 +
#include <boost/corosio/endpoint.hpp>
 
20 +
#include <boost/capy/ex/executor_ref.hpp>
 
21 +
#include <coroutine>
 
22 +
#include <boost/capy/error.hpp>
 
23 +
#include <system_error>
 
24 +

 
25 +
#include <boost/corosio/detail/make_err.hpp>
 
26 +
#include <boost/corosio/detail/dispatch_coro.hpp>
 
27 +
#include <boost/corosio/detail/scheduler_op.hpp>
 
28 +
#include <boost/corosio/detail/endpoint_convert.hpp>
 
29 +

 
30 +
#include <unistd.h>
 
31 +
#include <errno.h>
 
32 +
#include <fcntl.h>
 
33 +

 
34 +
#include <atomic>
 
35 +
#include <cstddef>
 
36 +
#include <memory>
 
37 +
#include <optional>
 
38 +
#include <stop_token>
 
39 +

 
40 +
#include <netinet/in.h>
 
41 +
#include <sys/select.h>
 
42 +
#include <sys/socket.h>
 
43 +
#include <sys/uio.h>
 
44 +

 
45 +
/*
 
46 +
    select Operation State
 
47 +
    ======================
 
48 +

 
49 +
    Each async I/O operation has a corresponding select_op-derived struct that
 
50 +
    holds the operation's state while it's in flight. The socket impl owns
 
51 +
    fixed slots for each operation type (conn_, rd_, wr_), so only one
 
52 +
    operation of each type can be pending per socket at a time.
 
53 +

 
54 +
    This mirrors the epoll_op design for consistency across backends.
 
55 +

 
56 +
    Completion vs Cancellation Race
 
57 +
    -------------------------------
 
58 +
    The `registered` atomic uses a tri-state (unregistered, registering,
 
59 +
    registered) to handle two races: (1) between register_fd() and the
 
60 +
    reactor seeing an event, and (2) between reactor completion and cancel().
 
61 +

 
62 +
    The registering state closes the window where an event could arrive
 
63 +
    after register_fd() but before the boolean was set. The reactor and
 
64 +
    cancel() both treat registering the same as registered when claiming.
 
65 +

 
66 +
    Whoever atomically exchanges to unregistered "claims" the operation
 
67 +
    and is responsible for completing it. The loser sees unregistered and
 
68 +
    does nothing. The initiating thread uses compare_exchange to transition
 
69 +
    from registering to registered; if this fails, the reactor or cancel
 
70 +
    already claimed the op.
 
71 +

 
72 +
    Impl Lifetime Management
 
73 +
    ------------------------
 
74 +
    When cancel() posts an op to the scheduler's ready queue, the socket impl
 
75 +
    might be destroyed before the scheduler processes the op. The `impl_ptr`
 
76 +
    member holds a shared_ptr to the impl, keeping it alive until the op
 
77 +
    completes.
 
78 +

 
79 +
    EOF Detection
 
80 +
    -------------
 
81 +
    For reads, 0 bytes with no error means EOF. But an empty user buffer also
 
82 +
    returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
 
83 +

 
84 +
    SIGPIPE Prevention
 
85 +
    ------------------
 
86 +
    Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
 
87 +
    SIGPIPE when the peer has closed.
 
88 +
*/
 
89 +

 
90 +
namespace boost::corosio::detail {
 
91 +

 
92 +
// Forward declarations for cancellation support
 
93 +
class select_socket;
 
94 +
class select_acceptor;
 
95 +

 
96 +
/** Registration state for async operations.
 
97 +

 
98 +
    Tri-state enum to handle the race between register_fd() and
 
99 +
    run_reactor() seeing an event. Setting REGISTERING before
 
100 +
    calling register_fd() ensures events delivered during the
 
101 +
    registration window are not dropped.
 
102 +
*/
 
103 +
enum class select_registration_state : std::uint8_t
 
104 +
{
 
105 +
    unregistered, ///< Not registered with reactor
 
106 +
    registering,  ///< register_fd() called, not yet confirmed
 
107 +
    registered    ///< Fully registered, ready for events
 
108 +
};
 
109 +

 
110 +
struct select_op : scheduler_op
 
111 +
{
 
112 +
    struct canceller
 
113 +
    {
 
114 +
        select_op* op;
 
115 +
        void operator()() const noexcept;
 
116 +
    };
 
117 +

 
118 +
    std::coroutine_handle<> h;
 
119 +
    capy::executor_ref ex;
 
120 +
    std::error_code* ec_out = nullptr;
 
121 +
    std::size_t* bytes_out  = nullptr;
 
122 +

 
123 +
    int fd                        = -1;
 
124 +
    int errn                      = 0;
 
125 +
    std::size_t bytes_transferred = 0;
 
126 +

 
127 +
    std::atomic<bool> cancelled{false};
 
128 +
    std::atomic<select_registration_state> registered{
 
129 +
        select_registration_state::unregistered};
 
130 +
    std::optional<std::stop_callback<canceller>> stop_cb;
 
131 +

 
132 +
    // Prevents use-after-free when socket is closed with pending ops.
 
133 +
    std::shared_ptr<void> impl_ptr;
 
134 +

 
135 +
    // For stop_token cancellation - pointer to owning socket/acceptor impl.
 
136 +
    select_socket* socket_impl_     = nullptr;
 
137 +
    select_acceptor* acceptor_impl_ = nullptr;
 
138 +

 
139 +
    select_op() = default;
 
140 +

 
141 +
    void reset() noexcept
 
142 +
    {
 
143 +
        fd                = -1;
 
144 +
        errn              = 0;
 
145 +
        bytes_transferred = 0;
 
146 +
        cancelled.store(false, std::memory_order_relaxed);
 
147 +
        registered.store(
 
148 +
            select_registration_state::unregistered, std::memory_order_relaxed);
 
149 +
        impl_ptr.reset();
 
150 +
        socket_impl_   = nullptr;
 
151 +
        acceptor_impl_ = nullptr;
 
152 +
    }
 
153 +

 
154 +
    void operator()() override
 
155 +
    {
 
156 +
        stop_cb.reset();
 
157 +

 
158 +
        if (ec_out)
 
159 +
        {
 
160 +
            if (cancelled.load(std::memory_order_acquire))
 
161 +
                *ec_out = capy::error::canceled;
 
162 +
            else if (errn != 0)
 
163 +
                *ec_out = make_err(errn);
 
164 +
            else if (is_read_operation() && bytes_transferred == 0)
 
165 +
                *ec_out = capy::error::eof;
 
166 +
            else
 
167 +
                *ec_out = {};
 
168 +
        }
 
169 +

 
170 +
        if (bytes_out)
 
171 +
            *bytes_out = bytes_transferred;
 
172 +

 
173 +
        // Move to stack before destroying the frame
 
174 +
        capy::executor_ref saved_ex(ex);
 
175 +
        std::coroutine_handle<> saved_h(h);
 
176 +
        impl_ptr.reset();
 
177 +
        dispatch_coro(saved_ex, saved_h).resume();
 
178 +
    }
 
179 +

 
180 +
    virtual bool is_read_operation() const noexcept
 
181 +
    {
 
182 +
        return false;
 
183 +
    }
 
184 +
    virtual void cancel() noexcept = 0;
 
185 +

 
186 +
    void destroy() override
 
187 +
    {
 
188 +
        stop_cb.reset();
 
189 +
        impl_ptr.reset();
 
190 +
    }
 
191 +

 
192 +
    void request_cancel() noexcept
 
193 +
    {
 
194 +
        cancelled.store(true, std::memory_order_release);
 
195 +
    }
 
196 +

 
197 +
    // NOLINTNEXTLINE(performance-unnecessary-value-param)
 
198 +
    void start(std::stop_token token)
 
199 +
    {
 
200 +
        cancelled.store(false, std::memory_order_release);
 
201 +
        stop_cb.reset();
 
202 +
        socket_impl_   = nullptr;
 
203 +
        acceptor_impl_ = nullptr;
 
204 +

 
205 +
        if (token.stop_possible())
 
206 +
            stop_cb.emplace(token, canceller{this});
 
207 +
    }
 
208 +

 
209 +
    // NOLINTNEXTLINE(performance-unnecessary-value-param)
 
210 +
    void start(std::stop_token token, select_socket* impl)
 
211 +
    {
 
212 +
        cancelled.store(false, std::memory_order_release);
 
213 +
        stop_cb.reset();
 
214 +
        socket_impl_   = impl;
 
215 +
        acceptor_impl_ = nullptr;
 
216 +

 
217 +
        if (token.stop_possible())
 
218 +
            stop_cb.emplace(token, canceller{this});
 
219 +
    }
 
220 +

 
221 +
    // NOLINTNEXTLINE(performance-unnecessary-value-param)
 
222 +
    void start(std::stop_token token, select_acceptor* impl)
 
223 +
    {
 
224 +
        cancelled.store(false, std::memory_order_release);
 
225 +
        stop_cb.reset();
 
226 +
        socket_impl_   = nullptr;
 
227 +
        acceptor_impl_ = impl;
 
228 +

 
229 +
        if (token.stop_possible())
 
230 +
            stop_cb.emplace(token, canceller{this});
 
231 +
    }
 
232 +

 
233 +
    void complete(int err, std::size_t bytes) noexcept
 
234 +
    {
 
235 +
        errn              = err;
 
236 +
        bytes_transferred = bytes;
 
237 +
    }
 
238 +

 
239 +
    virtual void perform_io() noexcept {}
 
240 +
};
 
241 +

 
242 +
struct select_connect_op final : select_op
 
243 +
{
 
244 +
    endpoint target_endpoint;
 
245 +

 
246 +
    void reset() noexcept
 
247 +
    {
 
248 +
        select_op::reset();
 
249 +
        target_endpoint = endpoint{};
 
250 +
    }
 
251 +

 
252 +
    void perform_io() noexcept override
 
253 +
    {
 
254 +
        // connect() completion status is retrieved via SO_ERROR, not return value
 
255 +
        int err       = 0;
 
256 +
        socklen_t len = sizeof(err);
 
257 +
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
 
258 +
            err = errno;
 
259 +
        complete(err, 0);
 
260 +
    }
 
261 +

 
262 +
    // Defined in sockets.cpp where select_socket is complete
 
263 +
    void operator()() override;
 
264 +
    void cancel() noexcept override;
 
265 +
};
 
266 +

 
267 +
struct select_read_op final : select_op
 
268 +
{
 
269 +
    static constexpr std::size_t max_buffers = 16;
 
270 +
    iovec iovecs[max_buffers];
 
271 +
    int iovec_count        = 0;
 
272 +
    bool empty_buffer_read = false;
 
273 +

 
274 +
    bool is_read_operation() const noexcept override
 
275 +
    {
 
276 +
        return !empty_buffer_read;
 
277 +
    }
 
278 +

 
279 +
    void reset() noexcept
 
280 +
    {
 
281 +
        select_op::reset();
 
282 +
        iovec_count       = 0;
 
283 +
        empty_buffer_read = false;
 
284 +
    }
 
285 +

 
286 +
    void perform_io() noexcept override
 
287 +
    {
 
288 +
        ssize_t n = ::readv(fd, iovecs, iovec_count);
 
289 +
        if (n >= 0)
 
290 +
            complete(0, static_cast<std::size_t>(n));
 
291 +
        else
 
292 +
            complete(errno, 0);
 
293 +
    }
 
294 +

 
295 +
    void cancel() noexcept override;
 
296 +
};
 
297 +

 
298 +
struct select_write_op final : select_op
 
299 +
{
 
300 +
    static constexpr std::size_t max_buffers = 16;
 
301 +
    iovec iovecs[max_buffers];
 
302 +
    int iovec_count = 0;
 
303 +

 
304 +
    void reset() noexcept
 
305 +
    {
 
306 +
        select_op::reset();
 
307 +
        iovec_count = 0;
 
308 +
    }
 
309 +

 
310 +
    void perform_io() noexcept override
 
311 +
    {
 
312 +
        msghdr msg{};
 
313 +
        msg.msg_iov    = iovecs;
 
314 +
        msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
 
315 +

 
316 +
        ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
 
317 +
        if (n >= 0)
 
318 +
            complete(0, static_cast<std::size_t>(n));
 
319 +
        else
 
320 +
            complete(errno, 0);
 
321 +
    }
 
322 +

 
323 +
    void cancel() noexcept override;
 
324 +
};
 
325 +

 
326 +
struct select_accept_op final : select_op
 
327 +
{
 
328 +
    int accepted_fd                      = -1;
 
329 +
    io_object::implementation* peer_impl = nullptr;
 
330 +
    io_object::implementation** impl_out = nullptr;
 
331 +

 
332 +
    void reset() noexcept
 
333 +
    {
 
334 +
        select_op::reset();
 
335 +
        accepted_fd = -1;
 
336 +
        peer_impl   = nullptr;
 
337 +
        impl_out    = nullptr;
 
338 +
    }
 
339 +

 
340 +
    void perform_io() noexcept override
 
341 +
    {
 
342 +
        sockaddr_in addr{};
 
343 +
        socklen_t addrlen = sizeof(addr);
 
344 +

 
345 +
        // Note: select backend uses accept() + fcntl instead of accept4()
 
346 +
        // for broader POSIX compatibility
 
347 +
        int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
 
348 +

 
349 +
        if (new_fd >= 0)
 
350 +
        {
 
351 +
            // Reject fds that exceed select()'s FD_SETSIZE limit.
 
352 +
            // Better to fail now than during later async operations.
 
353 +
            if (new_fd >= FD_SETSIZE)
 
354 +
            {
 
355 +
                ::close(new_fd);
 
356 +
                complete(EINVAL, 0);
 
357 +
                return;
 
358 +
            }
 
359 +

 
360 +
            // Set non-blocking and close-on-exec flags.
 
361 +
            // A non-blocking socket is essential for the async reactor;
 
362 +
            // if we can't configure it, fail rather than risk blocking.
 
363 +
            int flags = ::fcntl(new_fd, F_GETFL, 0);
 
364 +
            if (flags == -1)
 
365 +
            {
 
366 +
                int err = errno;
 
367 +
                ::close(new_fd);
 
368 +
                complete(err, 0);
 
369 +
                return;
 
370 +
            }
 
371 +

 
372 +
            if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
 
373 +
            {
 
374 +
                int err = errno;
 
375 +
                ::close(new_fd);
 
376 +
                complete(err, 0);
 
377 +
                return;
 
378 +
            }
 
379 +

 
380 +
            if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
 
381 +
            {
 
382 +
                int err = errno;
 
383 +
                ::close(new_fd);
 
384 +
                complete(err, 0);
 
385 +
                return;
 
386 +
            }
 
387 +

 
388 +
            accepted_fd = new_fd;
 
389 +
            complete(0, 0);
 
390 +
        }
 
391 +
        else
 
392 +
        {
 
393 +
            complete(errno, 0);
 
394 +
        }
 
395 +
    }
 
396 +

 
397 +
    // Defined in acceptors.cpp where select_acceptor is complete
 
398 +
    void operator()() override;
 
399 +
    void cancel() noexcept override;
 
400 +
};
 
401 +

 
402 +
} // namespace boost::corosio::detail
 
403 +

 
404 +
#endif // BOOST_COROSIO_HAS_SELECT
 
405 +

 
406 +
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP