//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/corosio/io/io_object.hpp>
#include <boost/corosio/endpoint.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <coroutine>
#include <boost/capy/error.hpp>
#include <system_error>

#include <boost/corosio/detail/make_err.hpp>
#include <boost/corosio/detail/dispatch_coro.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>
#include <boost/corosio/detail/endpoint_convert.hpp>

#include <unistd.h>
#include <errno.h>

#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <optional>
#include <stop_token>

#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/uio.h>

/*
    epoll Operation State
    =====================

    Each async I/O operation has a corresponding epoll_op-derived struct that
    holds the operation's state while it's in flight. The socket impl owns
    fixed slots for each operation type (conn_, rd_, wr_), so only one
    operation of each type can be pending per socket at a time.

    Persistent Registration
    -----------------------
    File descriptors are registered with epoll once (via descriptor_state) and
    stay registered until closed. The descriptor_state tracks which operations
    are pending (read_op, write_op, connect_op). When an event arrives, the
    reactor dispatches to the appropriate pending operation.

    Impl Lifetime Management
    ------------------------
    When cancel() posts an op to the scheduler's ready queue, the socket impl
    might be destroyed before the scheduler processes the op. The `impl_ptr`
    member holds a shared_ptr to the impl, keeping it alive until the op
    completes. This is set by cancel() and cleared in operator() after the
    coroutine is resumed.

    EOF Detection
    -------------
    For reads, 0 bytes with no error means EOF. But an empty user buffer also
    returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.

    SIGPIPE Prevention
    ------------------
    Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
    SIGPIPE when the peer has closed.
*/

namespace boost::corosio::detail {

// Forward declarations
class epoll_socket;
class epoll_acceptor;
struct epoll_op;
class epoll_scheduler;

/** Per-descriptor state for persistent epoll registration.
 
90 +

 
91 +
    Tracks pending operations for a file descriptor. The fd is registered
 
92 +
    once with epoll and stays registered until closed.
 
93 +

 
94 +
    This struct extends scheduler_op to support deferred I/O processing.
 
95 +
    When epoll events arrive, the reactor sets ready_events and queues
 
96 +
    this descriptor for processing. When popped from the scheduler queue,
 
97 +
    operator() performs the actual I/O and queues completion handlers.
 
98 +

 
99 +
    @par Deferred I/O Model
 
100 +
    The reactor no longer performs I/O directly. Instead:
 
101 +
    1. Reactor sets ready_events and queues descriptor_state
 
102 +
    2. Scheduler pops descriptor_state and calls operator()
 
103 +
    3. operator() performs I/O under mutex and queues completions
 
104 +

 
105 +
    This eliminates per-descriptor mutex locking from the reactor hot path.
 
106 +

 
107 +
    @par Thread Safety
 
108 +
    The mutex protects operation pointers and ready flags during I/O.
 
109 +
    ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
 
110 +
*/
 
111 +
struct descriptor_state final : scheduler_op
 
112 +
{
 
113 +
    std::mutex mutex;
 
114 +

 
115 +
    // Protected by mutex
 
116 +
    epoll_op* read_op    = nullptr;
 
117 +
    epoll_op* write_op   = nullptr;
 
118 +
    epoll_op* connect_op = nullptr;
 
119 +

 
120 +
    // Caches edge events that arrived before an op was registered
 
121 +
    bool read_ready  = false;
 
122 +
    bool write_ready = false;
 
123 +

 
124 +
    // Deferred cancellation: set by cancel() when the target op is not
 
125 +
    // parked (e.g. completing inline via speculative I/O). Checked when
 
126 +
    // the next op parks; if set, the op is immediately self-cancelled.
 
127 +
    // This matches IOCP semantics where CancelIoEx always succeeds.
 
128 +
    bool read_cancel_pending    = false;
 
129 +
    bool write_cancel_pending   = false;
 
130 +
    bool connect_cancel_pending = false;
 
131 +

 
132 +
    // Set during registration only (no mutex needed)
 
133 +
    std::uint32_t registered_events = 0;
 
134 +
    int fd                          = -1;
 
135 +

 
136 +
    // For deferred I/O - set by reactor, read by scheduler
 
137 +
    std::atomic<std::uint32_t> ready_events_{0};
 
138 +
    std::atomic<bool> is_enqueued_{false};
 
139 +
    epoll_scheduler const* scheduler_ = nullptr;
 
140 +

 
141 +
    // Prevents impl destruction while this descriptor_state is queued.
 
142 +
    // Set by close_socket() when is_enqueued_ is true, cleared by operator().
 
143 +
    std::shared_ptr<void> impl_ref_;
 
144 +

 
145 +
    /// Add ready events atomically.
 
146 +
    void add_ready_events(std::uint32_t ev) noexcept
 
147 +
    {
 
148 +
        ready_events_.fetch_or(ev, std::memory_order_relaxed);
 
149 +
    }
 
150 +

 
151 +
    /// Perform deferred I/O and queue completions.
 
152 +
    void operator()() override;
 
153 +

 
154 +
    /// Destroy without invoking.
 
155 +
    /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
 
156 +
    /// the self-referential cycle set by close_socket().
 
157 +
    void destroy() override
 
158 +
    {
 
159 +
        impl_ref_.reset();
 
160 +
    }
 
161 +
};
 
162 +

 
163 +
struct epoll_op : scheduler_op
 
164 +
{
 
165 +
    struct canceller
 
166 +
    {
 
167 +
        epoll_op* op;
 
168 +
        void operator()() const noexcept;
 
169 +
    };
 
170 +

 
171 +
    std::coroutine_handle<> h;
 
172 +
    capy::executor_ref ex;
 
173 +
    std::error_code* ec_out = nullptr;
 
174 +
    std::size_t* bytes_out  = nullptr;
 
175 +

 
176 +
    int fd                        = -1;
 
177 +
    int errn                      = 0;
 
178 +
    std::size_t bytes_transferred = 0;
 
179 +

 
180 +
    std::atomic<bool> cancelled{false};
 
181 +
    std::optional<std::stop_callback<canceller>> stop_cb;
 
182 +

 
183 +
    // Prevents use-after-free when socket is closed with pending ops.
 
184 +
    // See "Impl Lifetime Management" in file header.
 
185 +
    std::shared_ptr<void> impl_ptr;
 
186 +

 
187 +
    // For stop_token cancellation - pointer to owning socket/acceptor impl.
 
188 +
    // When stop is requested, we call back to the impl to perform actual I/O cancellation.
 
189 +
    epoll_socket* socket_impl_     = nullptr;
 
190 +
    epoll_acceptor* acceptor_impl_ = nullptr;
 
191 +

 
192 +
    epoll_op() = default;
 
193 +

 
194 +
    void reset() noexcept
 
195 +
    {
 
196 +
        fd                = -1;
 
197 +
        errn              = 0;
 
198 +
        bytes_transferred = 0;
 
199 +
        cancelled.store(false, std::memory_order_relaxed);
 
200 +
        impl_ptr.reset();
 
201 +
        socket_impl_   = nullptr;
 
202 +
        acceptor_impl_ = nullptr;
 
203 +
    }
 
204 +

 
205 +
    // Defined in sockets.cpp where epoll_socket is complete
 
206 +
    void operator()() override;
 
207 +

 
208 +
    virtual bool is_read_operation() const noexcept
 
209 +
    {
 
210 +
        return false;
 
211 +
    }
 
212 +
    virtual void cancel() noexcept = 0;
 
213 +

 
214 +
    void destroy() override
 
215 +
    {
 
216 +
        stop_cb.reset();
 
217 +
        impl_ptr.reset();
 
218 +
    }
 
219 +

 
220 +
    void request_cancel() noexcept
 
221 +
    {
 
222 +
        cancelled.store(true, std::memory_order_release);
 
223 +
    }
 
224 +

 
225 +
    // NOLINTNEXTLINE(performance-unnecessary-value-param)
 
226 +
    void start(std::stop_token token, epoll_socket* impl)
 
227 +
    {
 
228 +
        cancelled.store(false, std::memory_order_release);
 
229 +
        stop_cb.reset();
 
230 +
        socket_impl_   = impl;
 
231 +
        acceptor_impl_ = nullptr;
 
232 +

 
233 +
        if (token.stop_possible())
 
234 +
            stop_cb.emplace(token, canceller{this});
 
235 +
    }
 
236 +

 
237 +
    // NOLINTNEXTLINE(performance-unnecessary-value-param)
 
238 +
    void start(std::stop_token token, epoll_acceptor* impl)
 
239 +
    {
 
240 +
        cancelled.store(false, std::memory_order_release);
 
241 +
        stop_cb.reset();
 
242 +
        socket_impl_   = nullptr;
 
243 +
        acceptor_impl_ = impl;
 
244 +

 
245 +
        if (token.stop_possible())
 
246 +
            stop_cb.emplace(token, canceller{this});
 
247 +
    }
 
248 +

 
249 +
    void complete(int err, std::size_t bytes) noexcept
 
250 +
    {
 
251 +
        errn              = err;
 
252 +
        bytes_transferred = bytes;
 
253 +
    }
 
254 +

 
255 +
    virtual void perform_io() noexcept {}
 
256 +
};
 
257 +

 
258 +
struct epoll_connect_op final : epoll_op
 
259 +
{
 
260 +
    endpoint target_endpoint;
 
261 +

 
262 +
    void reset() noexcept
 
263 +
    {
 
264 +
        epoll_op::reset();
 
265 +
        target_endpoint = endpoint{};
 
266 +
    }
 
267 +

 
268 +
    void perform_io() noexcept override
 
269 +
    {
 
270 +
        // connect() completion status is retrieved via SO_ERROR, not return value
 
271 +
        int err       = 0;
 
272 +
        socklen_t len = sizeof(err);
 
273 +
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
 
274 +
            err = errno;
 
275 +
        complete(err, 0);
 
276 +
    }
 
277 +

 
278 +
    // Defined in sockets.cpp where epoll_socket is complete
 
279 +
    void operator()() override;
 
280 +
    void cancel() noexcept override;
 
281 +
};
 
282 +

 
283 +
struct epoll_read_op final : epoll_op
 
284 +
{
 
285 +
    static constexpr std::size_t max_buffers = 16;
 
286 +
    iovec iovecs[max_buffers];
 
287 +
    int iovec_count        = 0;
 
288 +
    bool empty_buffer_read = false;
 
289 +

 
290 +
    bool is_read_operation() const noexcept override
 
291 +
    {
 
292 +
        return !empty_buffer_read;
 
293 +
    }
 
294 +

 
295 +
    void reset() noexcept
 
296 +
    {
 
297 +
        epoll_op::reset();
 
298 +
        iovec_count       = 0;
 
299 +
        empty_buffer_read = false;
 
300 +
    }
 
301 +

 
302 +
    void perform_io() noexcept override
 
303 +
    {
 
304 +
        ssize_t n;
 
305 +
        do
 
306 +
        {
 
307 +
            n = ::readv(fd, iovecs, iovec_count);
 
308 +
        }
 
309 +
        while (n < 0 && errno == EINTR);
 
310 +

 
311 +
        if (n >= 0)
 
312 +
            complete(0, static_cast<std::size_t>(n));
 
313 +
        else
 
314 +
            complete(errno, 0);
 
315 +
    }
 
316 +

 
317 +
    void cancel() noexcept override;
 
318 +
};
 
319 +

 
320 +
struct epoll_write_op final : epoll_op
 
321 +
{
 
322 +
    static constexpr std::size_t max_buffers = 16;
 
323 +
    iovec iovecs[max_buffers];
 
324 +
    int iovec_count = 0;
 
325 +

 
326 +
    void reset() noexcept
 
327 +
    {
 
328 +
        epoll_op::reset();
 
329 +
        iovec_count = 0;
 
330 +
    }
 
331 +

 
332 +
    void perform_io() noexcept override
 
333 +
    {
 
334 +
        msghdr msg{};
 
335 +
        msg.msg_iov    = iovecs;
 
336 +
        msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
 
337 +

 
338 +
        ssize_t n;
 
339 +
        do
 
340 +
        {
 
341 +
            n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
 
342 +
        }
 
343 +
        while (n < 0 && errno == EINTR);
 
344 +

 
345 +
        if (n >= 0)
 
346 +
            complete(0, static_cast<std::size_t>(n));
 
347 +
        else
 
348 +
            complete(errno, 0);
 
349 +
    }
 
350 +

 
351 +
    void cancel() noexcept override;
 
352 +
};
 
353 +

 
354 +
struct epoll_accept_op final : epoll_op
 
355 +
{
 
356 +
    int accepted_fd                      = -1;
 
357 +
    io_object::implementation** impl_out = nullptr;
 
358 +
    sockaddr_in peer_addr{};
 
359 +

 
360 +
    void reset() noexcept
 
361 +
    {
 
362 +
        epoll_op::reset();
 
363 +
        accepted_fd = -1;
 
364 +
        impl_out    = nullptr;
 
365 +
        peer_addr   = {};
 
366 +
    }
 
367 +

 
368 +
    void perform_io() noexcept override
 
369 +
    {
 
370 +
        socklen_t addrlen = sizeof(peer_addr);
 
371 +
        int new_fd;
 
372 +
        do
 
373 +
        {
 
374 +
            new_fd = ::accept4(
 
375 +
                fd, reinterpret_cast<sockaddr*>(&peer_addr), &addrlen,
 
376 +
                SOCK_NONBLOCK | SOCK_CLOEXEC);
 
377 +
        }
 
378 +
        while (new_fd < 0 && errno == EINTR);
 
379 +

 
380 +
        if (new_fd >= 0)
 
381 +
        {
 
382 +
            accepted_fd = new_fd;
 
383 +
            complete(0, 0);
 
384 +
        }
 
385 +
        else
 
386 +
        {
 
387 +
            complete(errno, 0);
 
388 +
        }
 
389 +
    }
 
390 +

 
391 +
    // Defined in acceptors.cpp where epoll_acceptor is complete
 
392 +
    void operator()() override;
 
393 +
    void cancel() noexcept override;
 
394 +
};
 
395 +

 
396 +
} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_EPOLL

#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP