1 +
//
 
2 +
// Copyright (c) 2026 Steve Gerbino
 
3 +
//
 
4 +
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
5 +
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
6 +
//
 
7 +
// Official repository: https://github.com/cppalliance/corosio
 
8 +
//
 
9 +

 
10 +
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
 
11 +
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
 
12 +

 
13 +
#include <boost/corosio/detail/platform.hpp>
 
14 +

 
15 +
#if BOOST_COROSIO_HAS_EPOLL
 
16 +

 
17 +
#include <boost/corosio/detail/config.hpp>
 
18 +
#include <boost/capy/ex/execution_context.hpp>
 
19 +
#include <boost/corosio/detail/socket_service.hpp>
 
20 +

 
21 +
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
 
22 +
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
 
23 +

 
24 +
#include <boost/corosio/detail/endpoint_convert.hpp>
 
25 +
#include <boost/corosio/detail/make_err.hpp>
 
26 +
#include <boost/corosio/detail/dispatch_coro.hpp>
 
27 +
#include <boost/corosio/detail/except.hpp>
 
28 +
#include <boost/capy/buffers.hpp>
 
29 +

 
30 +
#include <coroutine>
 
31 +
#include <mutex>
 
32 +
#include <unordered_map>
 
33 +
#include <utility>
 
34 +

 
35 +
#include <errno.h>
 
36 +
#include <netinet/in.h>
 
37 +
#include <netinet/tcp.h>
 
38 +
#include <sys/epoll.h>
 
39 +
#include <sys/socket.h>
 
40 +
#include <unistd.h>
 
41 +

 
42 +
/*
 
43 +
    epoll Socket Implementation
 
44 +
    ===========================
 
45 +

 
46 +
    Each I/O operation follows the same pattern:
 
47 +
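      1. Try the syscall immediately (non-blocking socket)
      2. If it succeeds or fails with a real error, resume the caller
         inline when the scheduler's inline budget allows; otherwise post
         the op to the completion queue
      3. If EAGAIN/EWOULDBLOCK (EINPROGRESS for connect), register with
         the reactor and wait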
 
50 +

 
51 +
    This "try first" approach avoids unnecessary epoll round-trips for
 
52 +
    operations that can complete immediately (common for small reads/writes
 
53 +
    on fast local connections).
 
54 +

 
55 +
    Reactor Registration
    --------------------
    Each descriptor is registered with the scheduler once, in open_socket()
    (edge-triggered), and stays registered until close_socket(). An op that
    has to wait is parked in its descriptor_state slot (read_op, write_op
    or connect_op). Because edges are not repeated, readiness observed while
    no op is parked is cached in read_ready/write_ready; register_op()
    consumes that flag and retries the I/O before parking, so an edge that
    arrived during the speculative attempt is not lost.
 
62 +

 
63 +
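    Cancellation
    ------------
    cancel() and close_socket() must complete pending operations (post
    them with the cancelled flag set) so coroutines waiting on them can
    resume. Under desc_state_.mutex they claim any op parked in a
    descriptor slot, then post it after releasing the lock. For an op that
    is not parked yet, cancel() instead sets the matching *_cancel_pending
    flag, which register_op() consumes when the op tries to park.
    close_socket() performs its own claim rather than calling cancel().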
 
69 +

 
70 +
    Impl Lifetime with shared_ptr
 
71 +
    -----------------------------
 
72 +
    Socket impls use enable_shared_from_this. The service owns impls via
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
    removal. When a user calls close(), close_socket() claims any parked
    ops and posts them to the scheduler as cancelled.

    CRITICAL: Posted ops must keep the impl alive until they complete;
    otherwise the scheduler would process a freed op (use-after-free).
    Every path that posts or parks an op first stores shared_from_this()
    (or, in cancel()/close_socket(), the locked weak_from_this()) in
    op.impl_ptr. On completion, operator() moves impl_ptr into a local
    before resuming the coroutine, so the impl is released only when the
    op has finished touching its own members, and can then be destroyed
    if no other references exist.
 
82 +

 
83 +
    Service Ownership
 
84 +
    -----------------
 
85 +
    epoll_socket_service owns all socket impls. destroy() removes the
    shared_ptr from socket_ptrs_, but the impl may survive if ops still
    hold impl_ptr refs. shutdown() closes all sockets but deliberately
    leaves socket_ptrs_ intact: the owning references are released when the
    service's state is destroyed, after the scheduler has drained its
    queued ops, so in-flight ops complete before their impls can be freed.
 
89 +
*/
 
90 +

 
91 +
namespace boost::corosio::detail {
 
92 +

 
93 +
/** State for epoll socket service. */
 
94 +
class epoll_socket_state
 
95 +
{
 
96 +
public:
 
97 +
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
 
98 +
    {
 
99 +
    }
 
100 +

 
101 +
    epoll_scheduler& sched_;
 
102 +
    std::mutex mutex_;
 
103 +
    intrusive_list<epoll_socket> socket_list_;
 
104 +
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
 
105 +
        socket_ptrs_;
 
106 +
};
 
107 +

 
108 +
/** epoll socket service implementation.
 
109 +

 
110 +
    Inherits from socket_service to enable runtime polymorphism.
 
111 +
    Uses key_type = socket_service for service lookup.
 
112 +
*/
 
113 +
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
 
114 +
{
 
115 +
public:
 
116 +
    explicit epoll_socket_service(capy::execution_context& ctx);
 
117 +
    ~epoll_socket_service() override;
 
118 +

 
119 +
    epoll_socket_service(epoll_socket_service const&)            = delete;
 
120 +
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
 
121 +

 
122 +
    void shutdown() override;
 
123 +

 
124 +
    io_object::implementation* construct() override;
 
125 +
    void destroy(io_object::implementation*) override;
 
126 +
    void close(io_object::handle&) override;
 
127 +
    std::error_code open_socket(tcp_socket::implementation& impl) override;
 
128 +

 
129 +
    epoll_scheduler& scheduler() const noexcept
 
130 +
    {
 
131 +
        return state_->sched_;
 
132 +
    }
 
133 +
    void post(epoll_op* op);
 
134 +
    void work_started() noexcept;
 
135 +
    void work_finished() noexcept;
 
136 +

 
137 +
private:
 
138 +
    std::unique_ptr<epoll_socket_state> state_;
 
139 +
};
 
140 +

 
141 +
//--------------------------------------------------------------------------
 
142 +
//
 
143 +
// Implementation
 
144 +
//
 
145 +
//--------------------------------------------------------------------------
 
146 +

 
147 +
// Register an op with the reactor, handling cached edge events.
 
148 +
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
 
149 +
inline void
 
150 +
epoll_socket::register_op(
 
151 +
    epoll_op& op,
 
152 +
    epoll_op*& desc_slot,
 
153 +
    bool& ready_flag,
 
154 +
    bool& cancel_flag) noexcept
 
155 +
{
 
156 +
    svc_.work_started();
 
157 +

 
158 +
    std::lock_guard lock(desc_state_.mutex);
 
159 +
    bool io_done = false;
 
160 +
    if (ready_flag)
 
161 +
    {
 
162 +
        ready_flag = false;
 
163 +
        op.perform_io();
 
164 +
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
 
165 +
        if (!io_done)
 
166 +
            op.errn = 0;
 
167 +
    }
 
168 +

 
169 +
    if (cancel_flag)
 
170 +
    {
 
171 +
        cancel_flag = false;
 
172 +
        op.cancelled.store(true, std::memory_order_relaxed);
 
173 +
    }
 
174 +

 
175 +
    if (io_done || op.cancelled.load(std::memory_order_acquire))
 
176 +
    {
 
177 +
        svc_.post(&op);
 
178 +
        svc_.work_finished();
 
179 +
    }
 
180 +
    else
 
181 +
    {
 
182 +
        desc_slot = &op;
 
183 +
    }
 
184 +
}
 
185 +

 
186 +
inline void
 
187 +
epoll_op::canceller::operator()() const noexcept
 
188 +
{
 
189 +
    op->cancel();
 
190 +
}
 
191 +

 
192 +
inline void
 
193 +
epoll_connect_op::cancel() noexcept
 
194 +
{
 
195 +
    if (socket_impl_)
 
196 +
        socket_impl_->cancel_single_op(*this);
 
197 +
    else
 
198 +
        request_cancel();
 
199 +
}
 
200 +

 
201 +
inline void
 
202 +
epoll_read_op::cancel() noexcept
 
203 +
{
 
204 +
    if (socket_impl_)
 
205 +
        socket_impl_->cancel_single_op(*this);
 
206 +
    else
 
207 +
        request_cancel();
 
208 +
}
 
209 +

 
210 +
inline void
 
211 +
epoll_write_op::cancel() noexcept
 
212 +
{
 
213 +
    if (socket_impl_)
 
214 +
        socket_impl_->cancel_single_op(*this);
 
215 +
    else
 
216 +
        request_cancel();
 
217 +
}
 
218 +

 
219 +
inline void
 
220 +
epoll_op::operator()()
 
221 +
{
 
222 +
    stop_cb.reset();
 
223 +

 
224 +
    socket_impl_->svc_.scheduler().reset_inline_budget();
 
225 +

 
226 +
    if (cancelled.load(std::memory_order_acquire))
 
227 +
        *ec_out = capy::error::canceled;
 
228 +
    else if (errn != 0)
 
229 +
        *ec_out = make_err(errn);
 
230 +
    else if (is_read_operation() && bytes_transferred == 0)
 
231 +
        *ec_out = capy::error::eof;
 
232 +
    else
 
233 +
        *ec_out = {};
 
234 +

 
235 +
    *bytes_out = bytes_transferred;
 
236 +

 
237 +
    // Move to stack before resuming coroutine. The coroutine might close
 
238 +
    // the socket, releasing the last wrapper ref. If impl_ptr were the
 
239 +
    // last ref and we destroyed it while still in operator(), we'd have
 
240 +
    // use-after-free. Moving to local ensures destruction happens at
 
241 +
    // function exit, after all member accesses are complete.
 
242 +
    capy::executor_ref saved_ex(ex);
 
243 +
    std::coroutine_handle<> saved_h(h);
 
244 +
    auto prevent_premature_destruction = std::move(impl_ptr);
 
245 +
    dispatch_coro(saved_ex, saved_h).resume();
 
246 +
}
 
247 +

 
248 +
inline void
 
249 +
epoll_connect_op::operator()()
 
250 +
{
 
251 +
    stop_cb.reset();
 
252 +

 
253 +
    socket_impl_->svc_.scheduler().reset_inline_budget();
 
254 +

 
255 +
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
 
256 +

 
257 +
    // Cache endpoints on successful connect
 
258 +
    if (success && socket_impl_)
 
259 +
    {
 
260 +
        // Query local endpoint via getsockname (may fail, but remote is always known)
 
261 +
        endpoint local_ep;
 
262 +
        sockaddr_in local_addr{};
 
263 +
        socklen_t local_len = sizeof(local_addr);
 
264 +
        if (::getsockname(
 
265 +
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
 
266 +
            local_ep = from_sockaddr_in(local_addr);
 
267 +
        // Always cache remote endpoint; local may be default if getsockname failed
 
268 +
        static_cast<epoll_socket*>(socket_impl_)
 
269 +
            ->set_endpoints(local_ep, target_endpoint);
 
270 +
    }
 
271 +

 
272 +
    if (cancelled.load(std::memory_order_acquire))
 
273 +
        *ec_out = capy::error::canceled;
 
274 +
    else if (errn != 0)
 
275 +
        *ec_out = make_err(errn);
 
276 +
    else
 
277 +
        *ec_out = {};
 
278 +

 
279 +
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
 
280 +
    capy::executor_ref saved_ex(ex);
 
281 +
    std::coroutine_handle<> saved_h(h);
 
282 +
    auto prevent_premature_destruction = std::move(impl_ptr);
 
283 +
    dispatch_coro(saved_ex, saved_h).resume();
 
284 +
}
 
285 +

 
286 +
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
 
287 +
    : svc_(svc)
 
288 +
{
 
289 +
}
 
290 +

 
291 +
// Nothing to release explicitly; members clean up after themselves.
inline epoll_socket::~epoll_socket() {}
 
292 +

 
293 +
/** Start an asynchronous connect to ep.

    The connect is attempted immediately on the non-blocking socket. If
    it finishes at once (success or a hard error), the result is delivered
    inline when the scheduler's inline budget allows, or otherwise by
    posting the op. If the connect is still in progress, the op is parked
    with the reactor until the socket becomes writable.

    @param h      Coroutine to resume on completion.
    @param ex     Executor the coroutine is resumed through.
    @param ep     Remote endpoint (IPv4).
    @param token  Stop token that cancels the pending connect.
    @param ec     Receives the result.
    @return The coroutine to resume next, or std::noop_coroutine().
*/
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int const result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    // Capture errno immediately so later calls cannot clobber it.
    int const err = (result < 0) ? errno : 0;

    // EINPROGRESS is the normal non-blocking outcome. POSIX also specifies
    // that an interrupted connect() (EINTR) is not aborted but completes
    // asynchronously, so it is waited on the same way instead of being
    // reported as a failure.
    bool const pending = (err == EINPROGRESS || err == EINTR);

    if (!pending)
    {
        if (err == 0)
        {
            // Connected immediately: cache both endpoints now.
            sockaddr_in local_addr{};
            socklen_t local_len = sizeof(local_addr);
            if (::getsockname(
                    fd_, reinterpret_cast<sockaddr*>(&local_addr),
                    &local_len) == 0)
                local_endpoint_ = detail::from_sockaddr_in(local_addr);
            remote_endpoint_ = ep;
        }

        if (svc_.scheduler().try_consume_inline_budget())
        {
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
    }

    // Both remaining paths (post the result, or wait on the reactor) need a
    // fully initialized op that keeps this impl alive.
    op.reset();
    op.h               = h;
    op.ex              = ex;
    op.ec_out          = ec;
    op.fd              = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    if (!pending)
    {
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
 
353 +

 
354 +
inline std::coroutine_handle<>
 
355 +
epoll_socket::read_some(
 
356 +
    std::coroutine_handle<> h,
 
357 +
    capy::executor_ref ex,
 
358 +
    io_buffer_param param,
 
359 +
    std::stop_token token,
 
360 +
    std::error_code* ec,
 
361 +
    std::size_t* bytes_out)
 
362 +
{
 
363 +
    auto& op = rd_;
 
364 +
    op.reset();
 
365 +

 
366 +
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
 
367 +
    op.iovec_count =
 
368 +
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
 
369 +

 
370 +
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
 
371 +
    {
 
372 +
        op.empty_buffer_read = true;
 
373 +
        op.h                 = h;
 
374 +
        op.ex                = ex;
 
375 +
        op.ec_out            = ec;
 
376 +
        op.bytes_out         = bytes_out;
 
377 +
        op.start(token, this);
 
378 +
        op.impl_ptr = shared_from_this();
 
379 +
        op.complete(0, 0);
 
380 +
        svc_.post(&op);
 
381 +
        return std::noop_coroutine();
 
382 +
    }
 
383 +

 
384 +
    for (int i = 0; i < op.iovec_count; ++i)
 
385 +
    {
 
386 +
        op.iovecs[i].iov_base = bufs[i].data();
 
387 +
        op.iovecs[i].iov_len  = bufs[i].size();
 
388 +
    }
 
389 +

 
390 +
    // Speculative read
 
391 +
    ssize_t n;
 
392 +
    do
 
393 +
    {
 
394 +
        n = ::readv(fd_, op.iovecs, op.iovec_count);
 
395 +
    }
 
396 +
    while (n < 0 && errno == EINTR);
 
397 +

 
398 +
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
 
399 +
    {
 
400 +
        int err    = (n < 0) ? errno : 0;
 
401 +
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
 
402 +

 
403 +
        if (svc_.scheduler().try_consume_inline_budget())
 
404 +
        {
 
405 +
            if (err)
 
406 +
                *ec = make_err(err);
 
407 +
            else if (n == 0)
 
408 +
                *ec = capy::error::eof;
 
409 +
            else
 
410 +
                *ec = {};
 
411 +
            *bytes_out = bytes;
 
412 +
            return dispatch_coro(ex, h);
 
413 +
        }
 
414 +
        op.h         = h;
 
415 +
        op.ex        = ex;
 
416 +
        op.ec_out    = ec;
 
417 +
        op.bytes_out = bytes_out;
 
418 +
        op.start(token, this);
 
419 +
        op.impl_ptr = shared_from_this();
 
420 +
        op.complete(err, bytes);
 
421 +
        svc_.post(&op);
 
422 +
        return std::noop_coroutine();
 
423 +
    }
 
424 +

 
425 +
    // EAGAIN — register with reactor
 
426 +
    op.h         = h;
 
427 +
    op.ex        = ex;
 
428 +
    op.ec_out    = ec;
 
429 +
    op.bytes_out = bytes_out;
 
430 +
    op.fd        = fd_;
 
431 +
    op.start(token, this);
 
432 +
    op.impl_ptr = shared_from_this();
 
433 +

 
434 +
    register_op(
 
435 +
        op, desc_state_.read_op, desc_state_.read_ready,
 
436 +
        desc_state_.read_cancel_pending);
 
437 +
    return std::noop_coroutine();
 
438 +
}
 
439 +

 
440 +
inline std::coroutine_handle<>
 
441 +
epoll_socket::write_some(
 
442 +
    std::coroutine_handle<> h,
 
443 +
    capy::executor_ref ex,
 
444 +
    io_buffer_param param,
 
445 +
    std::stop_token token,
 
446 +
    std::error_code* ec,
 
447 +
    std::size_t* bytes_out)
 
448 +
{
 
449 +
    auto& op = wr_;
 
450 +
    op.reset();
 
451 +

 
452 +
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
 
453 +
    op.iovec_count =
 
454 +
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
 
455 +

 
456 +
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
 
457 +
    {
 
458 +
        op.h         = h;
 
459 +
        op.ex        = ex;
 
460 +
        op.ec_out    = ec;
 
461 +
        op.bytes_out = bytes_out;
 
462 +
        op.start(token, this);
 
463 +
        op.impl_ptr = shared_from_this();
 
464 +
        op.complete(0, 0);
 
465 +
        svc_.post(&op);
 
466 +
        return std::noop_coroutine();
 
467 +
    }
 
468 +

 
469 +
    for (int i = 0; i < op.iovec_count; ++i)
 
470 +
    {
 
471 +
        op.iovecs[i].iov_base = bufs[i].data();
 
472 +
        op.iovecs[i].iov_len  = bufs[i].size();
 
473 +
    }
 
474 +

 
475 +
    // Speculative write
 
476 +
    msghdr msg{};
 
477 +
    msg.msg_iov    = op.iovecs;
 
478 +
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
 
479 +

 
480 +
    ssize_t n;
 
481 +
    do
 
482 +
    {
 
483 +
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
 
484 +
    }
 
485 +
    while (n < 0 && errno == EINTR);
 
486 +

 
487 +
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
 
488 +
    {
 
489 +
        int err    = (n < 0) ? errno : 0;
 
490 +
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
 
491 +

 
492 +
        if (svc_.scheduler().try_consume_inline_budget())
 
493 +
        {
 
494 +
            *ec        = err ? make_err(err) : std::error_code{};
 
495 +
            *bytes_out = bytes;
 
496 +
            return dispatch_coro(ex, h);
 
497 +
        }
 
498 +
        op.h         = h;
 
499 +
        op.ex        = ex;
 
500 +
        op.ec_out    = ec;
 
501 +
        op.bytes_out = bytes_out;
 
502 +
        op.start(token, this);
 
503 +
        op.impl_ptr = shared_from_this();
 
504 +
        op.complete(err, bytes);
 
505 +
        svc_.post(&op);
 
506 +
        return std::noop_coroutine();
 
507 +
    }
 
508 +

 
509 +
    // EAGAIN — register with reactor
 
510 +
    op.h         = h;
 
511 +
    op.ex        = ex;
 
512 +
    op.ec_out    = ec;
 
513 +
    op.bytes_out = bytes_out;
 
514 +
    op.fd        = fd_;
 
515 +
    op.start(token, this);
 
516 +
    op.impl_ptr = shared_from_this();
 
517 +

 
518 +
    register_op(
 
519 +
        op, desc_state_.write_op, desc_state_.write_ready,
 
520 +
        desc_state_.write_cancel_pending);
 
521 +
    return std::noop_coroutine();
 
522 +
}
 
523 +

 
524 +
inline std::error_code
 
525 +
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
 
526 +
{
 
527 +
    int how;
 
528 +
    switch (what)
 
529 +
    {
 
530 +
    case tcp_socket::shutdown_receive:
 
531 +
        how = SHUT_RD;
 
532 +
        break;
 
533 +
    case tcp_socket::shutdown_send:
 
534 +
        how = SHUT_WR;
 
535 +
        break;
 
536 +
    case tcp_socket::shutdown_both:
 
537 +
        how = SHUT_RDWR;
 
538 +
        break;
 
539 +
    default:
 
540 +
        return make_err(EINVAL);
 
541 +
    }
 
542 +
    if (::shutdown(fd_, how) != 0)
 
543 +
        return make_err(errno);
 
544 +
    return {};
 
545 +
}
 
546 +

 
547 +
inline std::error_code
 
548 +
epoll_socket::set_no_delay(bool value) noexcept
 
549 +
{
 
550 +
    int flag = value ? 1 : 0;
 
551 +
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
 
552 +
        return make_err(errno);
 
553 +
    return {};
 
554 +
}
 
555 +

 
556 +
inline bool
 
557 +
epoll_socket::no_delay(std::error_code& ec) const noexcept
 
558 +
{
 
559 +
    int flag      = 0;
 
560 +
    socklen_t len = sizeof(flag);
 
561 +
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
 
562 +
    {
 
563 +
        ec = make_err(errno);
 
564 +
        return false;
 
565 +
    }
 
566 +
    ec = {};
 
567 +
    return flag != 0;
 
568 +
}
 
569 +

 
570 +
inline std::error_code
 
571 +
epoll_socket::set_keep_alive(bool value) noexcept
 
572 +
{
 
573 +
    int flag = value ? 1 : 0;
 
574 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
 
575 +
        return make_err(errno);
 
576 +
    return {};
 
577 +
}
 
578 +

 
579 +
inline bool
 
580 +
epoll_socket::keep_alive(std::error_code& ec) const noexcept
 
581 +
{
 
582 +
    int flag      = 0;
 
583 +
    socklen_t len = sizeof(flag);
 
584 +
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
 
585 +
    {
 
586 +
        ec = make_err(errno);
 
587 +
        return false;
 
588 +
    }
 
589 +
    ec = {};
 
590 +
    return flag != 0;
 
591 +
}
 
592 +

 
593 +
inline std::error_code
 
594 +
epoll_socket::set_receive_buffer_size(int size) noexcept
 
595 +
{
 
596 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
 
597 +
        return make_err(errno);
 
598 +
    return {};
 
599 +
}
 
600 +

 
601 +
inline int
 
602 +
epoll_socket::receive_buffer_size(std::error_code& ec) const noexcept
 
603 +
{
 
604 +
    int size      = 0;
 
605 +
    socklen_t len = sizeof(size);
 
606 +
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
 
607 +
    {
 
608 +
        ec = make_err(errno);
 
609 +
        return 0;
 
610 +
    }
 
611 +
    ec = {};
 
612 +
    return size;
 
613 +
}
 
614 +

 
615 +
inline std::error_code
 
616 +
epoll_socket::set_send_buffer_size(int size) noexcept
 
617 +
{
 
618 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
 
619 +
        return make_err(errno);
 
620 +
    return {};
 
621 +
}
 
622 +

 
623 +
inline int
 
624 +
epoll_socket::send_buffer_size(std::error_code& ec) const noexcept
 
625 +
{
 
626 +
    int size      = 0;
 
627 +
    socklen_t len = sizeof(size);
 
628 +
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
 
629 +
    {
 
630 +
        ec = make_err(errno);
 
631 +
        return 0;
 
632 +
    }
 
633 +
    ec = {};
 
634 +
    return size;
 
635 +
}
 
636 +

 
637 +
inline std::error_code
 
638 +
epoll_socket::set_linger(bool enabled, int timeout) noexcept
 
639 +
{
 
640 +
    if (timeout < 0)
 
641 +
        return make_err(EINVAL);
 
642 +
    struct ::linger lg;
 
643 +
    lg.l_onoff  = enabled ? 1 : 0;
 
644 +
    lg.l_linger = timeout;
 
645 +
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
 
646 +
        return make_err(errno);
 
647 +
    return {};
 
648 +
}
 
649 +

 
650 +
// Query SO_LINGER. On failure, ec is set and default linger_options are
// returned.
inline tcp_socket::linger_options
epoll_socket::linger(std::error_code& ec) const noexcept
{
    struct ::linger opt{};
    socklen_t opt_len = sizeof(opt);
    if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &opt, &opt_len) == 0)
    {
        ec = {};
        return {.enabled = opt.l_onoff != 0, .timeout = opt.l_linger};
    }
    ec = make_err(errno);
    return {};
}
 
663 +

 
664 +
/** Cancel every operation pending on this socket.

    Each op's cancel flag is raised. Under desc_state_.mutex, any op that
    is parked in its descriptor slot is claimed; otherwise the matching
    *_cancel_pending flag is set, which register_op() consumes when an op
    next tries to park in that slot. Claimed ops are posted after the lock
    is released, so they complete as cancelled and their coroutines resume.

    Does nothing if the impl is no longer owned by a shared_ptr.

    NOTE(review): the *_cancel_pending flags are set even when no operation
    is outstanding for that slot. Apart from close_socket(), nothing visible
    in this file clears them, so a cancel() issued while the socket is idle
    appears to cancel the next operation that reaches register_op().
    Confirm this is intended.
*/
inline void
epoll_socket::cancel() noexcept
{
    // Pin the impl for the duration of the call; bail out if it is already
    // being destroyed.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    // Claim parked ops (or defer the cancel) under the descriptor lock;
    // posting happens only after the lock is released.
    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed   = nullptr;
    epoll_op* wr_claimed   = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // A claimed op was counted by work_started() in register_op() when it
    // was parked; work_finished() balances that after the post. impl_ptr
    // keeps the impl alive until the posted op has run.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
 
713 +

 
714 +
inline void
 
715 +
epoll_socket::cancel_single_op(epoll_op& op) noexcept
 
716 +
{
 
717 +
    auto self = weak_from_this().lock();
 
718 +
    if (!self)
 
719 +
        return;
 
720 +

 
721 +
    op.request_cancel();
 
722 +

 
723 +
    epoll_op** desc_op_ptr = nullptr;
 
724 +
    if (&op == &conn_)
 
725 +
        desc_op_ptr = &desc_state_.connect_op;
 
726 +
    else if (&op == &rd_)
 
727 +
        desc_op_ptr = &desc_state_.read_op;
 
728 +
    else if (&op == &wr_)
 
729 +
        desc_op_ptr = &desc_state_.write_op;
 
730 +

 
731 +
    if (desc_op_ptr)
 
732 +
    {
 
733 +
        epoll_op* claimed = nullptr;
 
734 +
        {
 
735 +
            std::lock_guard lock(desc_state_.mutex);
 
736 +
            if (*desc_op_ptr == &op)
 
737 +
                claimed = std::exchange(*desc_op_ptr, nullptr);
 
738 +
            else if (&op == &conn_)
 
739 +
                desc_state_.connect_cancel_pending = true;
 
740 +
            else if (&op == &rd_)
 
741 +
                desc_state_.read_cancel_pending = true;
 
742 +
            else if (&op == &wr_)
 
743 +
                desc_state_.write_cancel_pending = true;
 
744 +
        }
 
745 +
        if (claimed)
 
746 +
        {
 
747 +
            op.impl_ptr = self;
 
748 +
            svc_.post(&op);
 
749 +
            svc_.work_finished();
 
750 +
        }
 
751 +
    }
 
752 +
}
 
753 +

 
754 +
/** Close the descriptor and complete any pending operations.

    If the impl is still owned by a shared_ptr: every op's cancel flag is
    raised; under desc_state_.mutex all parked ops are claimed and the
    cached ready and cancel-pending flags are cleared; the claimed ops are
    then posted (after the lock is released) so they complete as
    cancelled. If the descriptor state is currently enqueued with the
    scheduler (is_enqueued_), a reference to the impl is stored in
    impl_ref_, presumably so the impl outlives the queued descriptor state.

    Afterwards the fd is deregistered (if it had registered events) and
    closed, and the descriptor fields and cached endpoints are reset.
    Calling this on an already-closed socket does no further I/O work.
*/
inline void
epoll_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        // Unconditionally take whatever is parked and reset all cached
        // per-descriptor flags, so a later open_socket() starts clean.
        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed   = nullptr;
        epoll_op* wr_claimed   = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready             = false;
            desc_state_.write_ready            = false;
            desc_state_.read_cancel_pending    = false;
            desc_state_.write_cancel_pending   = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post claimed ops outside the lock; work_finished() balances the
        // work_started() from register_op().
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd                = -1;
    desc_state_.registered_events = 0;

    local_endpoint_  = endpoint{};
    remote_endpoint_ = endpoint{};
}
 
816 +

 
817 +
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
 
818 +
    : state_(
 
819 +
          std::make_unique<epoll_socket_state>(
 
820 +
              ctx.use_service<epoll_scheduler>()))
 
821 +
{
 
822 +
}
 
823 +

 
824 +
// Destroying state_ releases the remaining owning socket references.
inline epoll_socket_service::~epoll_socket_service() = default;
 
825 +

 
826 +
inline void
 
827 +
epoll_socket_service::shutdown()
 
828 +
{
 
829 +
    std::lock_guard lock(state_->mutex_);
 
830 +

 
831 +
    while (auto* impl = state_->socket_list_.pop_front())
 
832 +
        impl->close_socket();
 
833 +

 
834 +
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
 
835 +
    // drains completed_ops_, calling destroy() on each queued op. If we
 
836 +
    // released our shared_ptrs now, an epoll_op::destroy() could free the
 
837 +
    // last ref to an impl whose embedded descriptor_state is still linked
 
838 +
    // in the queue — use-after-free on the next pop(). Letting ~state_
 
839 +
    // release the ptrs (during service destruction, after scheduler
 
840 +
    // shutdown) keeps every impl alive until all ops have been drained.
 
841 +
}
 
842 +

 
843 +
/** Allocate a new socket impl and start tracking it.

    The impl is owned by socket_ptrs_. Its raw pointer is also linked into
    socket_list_ so shutdown() can close it.

    @return Raw pointer to the new impl; release it with destroy().
*/
inline io_object::implementation*
epoll_socket_service::construct()
{
    auto impl = std::make_shared<epoll_socket>(*this);
    auto* raw = impl.get();

    {
        std::lock_guard lock(state_->mutex_);
        // Insert into the owning map first. emplace() allocates and may
        // throw. If the impl were already linked into socket_list_ at that
        // point, the list would keep a pointer to an impl that is freed
        // during unwinding, and shutdown() would later dereference it.
        // Linking into the intrusive list is done last (presumably it does
        // not allocate).
        state_->socket_ptrs_.emplace(raw, std::move(impl));
        state_->socket_list_.push_back(raw);
    }

    return raw;
}
 
857 +

 
858 +
inline void
 
859 +
epoll_socket_service::destroy(io_object::implementation* impl)
 
860 +
{
 
861 +
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
 
862 +
    epoll_impl->close_socket();
 
863 +
    std::lock_guard lock(state_->mutex_);
 
864 +
    state_->socket_list_.remove(epoll_impl);
 
865 +
    state_->socket_ptrs_.erase(epoll_impl);
 
866 +
}
 
867 +

 
868 +
inline std::error_code
 
869 +
epoll_socket_service::open_socket(tcp_socket::implementation& impl)
 
870 +
{
 
871 +
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
 
872 +
    epoll_impl->close_socket();
 
873 +

 
874 +
    int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
 
875 +
    if (fd < 0)
 
876 +
        return make_err(errno);
 
877 +

 
878 +
    epoll_impl->fd_ = fd;
 
879 +

 
880 +
    // Register fd with epoll (edge-triggered mode)
 
881 +
    epoll_impl->desc_state_.fd = fd;
 
882 +
    {
 
883 +
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
 
884 +
        epoll_impl->desc_state_.read_op    = nullptr;
 
885 +
        epoll_impl->desc_state_.write_op   = nullptr;
 
886 +
        epoll_impl->desc_state_.connect_op = nullptr;
 
887 +
    }
 
888 +
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
 
889 +

 
890 +
    return {};
 
891 +
}
 
892 +

 
893 +
inline void
 
894 +
epoll_socket_service::close(io_object::handle& h)
 
895 +
{
 
896 +
    static_cast<epoll_socket*>(h.get())->close_socket();
 
897 +
}
 
898 +

 
899 +
inline void
 
900 +
epoll_socket_service::post(epoll_op* op)
 
901 +
{
 
902 +
    state_->sched_.post(op);
 
903 +
}
 
904 +

 
905 +
inline void
 
906 +
epoll_socket_service::work_started() noexcept
 
907 +
{
 
908 +
    state_->sched_.work_started();
 
909 +
}
 
910 +

 
911 +
inline void
 
912 +
epoll_socket_service::work_finished() noexcept
 
913 +
{
 
914 +
    state_->sched_.work_finished();
 
915 +
}
 
916 +

 
917 +
} // namespace boost::corosio::detail
 
918 +

 
919 +
#endif // BOOST_COROSIO_HAS_EPOLL
 
920 +

 
921 +
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP