//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_SELECT

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>
#include <boost/corosio/detail/socket_service.hpp>

#include <boost/corosio/native/detail/select/select_socket.hpp>
#include <boost/corosio/native/detail/select/select_scheduler.hpp>

#include <boost/corosio/detail/endpoint_convert.hpp>
#include <boost/corosio/detail/dispatch_coro.hpp>
#include <boost/corosio/detail/make_err.hpp>

#include <boost/corosio/detail/except.hpp>

#include <boost/capy/buffers.hpp>

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/uio.h> // ::readv
#include <unistd.h>

#include <memory>
#include <mutex>
#include <unordered_map>
 
/*
    select Socket Implementation
    ============================

    This mirrors the epoll_sockets design for behavioral consistency.
    Each I/O operation follows the same pattern (sketched just below):
      1. Try the syscall immediately (non-blocking socket)
      2. If it succeeds or fails with a real error, post to completion queue
      3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait

    Cancellation
    ------------
    See op.hpp for the completion/cancellation race handling via the
    `registered` atomic. cancel() must complete pending operations (post
    them with the cancelled flag set) so coroutines waiting on them can
    resume. close_socket() calls cancel() first to ensure this.

    Impl Lifetime with shared_ptr
    -----------------------------
    Socket impls use enable_shared_from_this. The service owns impls via
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
    removal. When a user calls close(), we call cancel() which posts pending
    ops to the scheduler.

    CRITICAL: The posted ops must keep the impl alive until they complete.
    Otherwise the scheduler would process a freed op (use-after-free). The
    cancel() method captures shared_from_this() into op.impl_ptr before
    posting. When the op completes, impl_ptr is cleared, allowing the impl
    to be destroyed if no other references exist.

    Service Ownership
    -----------------
    select_socket_service owns all socket impls. destroy() removes the
    shared_ptr from the map, but the impl may survive if ops still hold
    impl_ptr refs. shutdown() closes all sockets but deliberately leaves the
    map intact; the ptrs are released when the service state is destroyed,
    after the scheduler has drained any in-flight ops (see shutdown() below).
*/
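
// The pattern above, as a minimal sketch (read path shown; registration
// and cancellation race handling elided, see read_some() below for the
// full version):
//
//     ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count); // 1. try now
//     if (n >= 0)
//     {
//         op.complete(0, static_cast<std::size_t>(n));
//         svc_.post(&op);                    // 2. success or EOF: post
//     }
//     else if (errno == EAGAIN || errno == EWOULDBLOCK)
//     {
//         svc_.scheduler().register_fd(      // 3. wait for readiness
//             fd_, &op, select_scheduler::event_read);
//     }
//     else
//     {
//         op.complete(errno, 0);
//         svc_.post(&op);                    // 2. real error: post
//     }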
 
namespace boost::corosio::detail {

/** State for select socket service. */
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    select_scheduler& sched_;
    std::mutex mutex_;
    intrusive_list<select_socket> socket_list_;
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;
};
 
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    select_socket_service(select_socket_service const&)            = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    void shutdown() override;

    io_object::implementation* construct() override;
    void destroy(io_object::implementation*) override;
    void close(io_object::handle&) override;
    std::error_code open_socket(tcp_socket::implementation& impl) override;

    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    void post(select_op* op);
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
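
// Reaching this service at runtime, as a sketch. Assumption flagged: the
// exact lookup call lives in the io_object/tcp_socket layer, not in this
// header; lookup is keyed by socket_service (key_type), so the context
// resolves the same key to whichever backend it registered.
//
//     auto& svc = ctx.use_service<select_socket_service>();
//     io_object::implementation* impl = svc.construct();
//     // ... later:
//     svc.destroy(impl);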
 
// Backward-compatibility alias.
using select_sockets = select_socket_service;

inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
 
inline void
select_connect_op::cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}

inline void
select_read_op::cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}

inline void
select_write_op::cancel() noexcept
{
    if (socket_impl_)
        socket_impl_->cancel_single_op(*this);
    else
        request_cancel();
}
 
inline void
select_connect_op::operator()()
{
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect.
    if (success && socket_impl_)
    {
        // Query the local endpoint via getsockname (may fail, but the
        // remote endpoint is always known).
        endpoint local_ep;
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_ep = from_sockaddr_in(local_addr);
        // Always cache the remote endpoint; local may be default-constructed
        // if getsockname failed.
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to the stack before destroying the frame.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}
 
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}

inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h               = h;
    op.ex              = ex;
    op.ec_out          = ec;
    op.fd              = fd_;
    op.target_endpoint = ep; // Store the target for endpoint caching.
    op.start(token, this);

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    if (result == 0)
    {
        // Synchronous success: cache endpoints immediately.
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // Completion is always posted to the scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // the reactor sees an event before we set registered. The reactor
        // treats registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, the reactor or cancel
        // already claimed the op (state is now unregistered), so we're done.
        // However, we must still deregister the fd because cancel's
        // deregister_fd may have run before our register_fd, leaving the fd
        // orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // Completion is always posted to the scheduler queue, never
            // inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // Completion is always posted to the scheduler queue, never inline.
        return std::noop_coroutine();
    }

    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // Completion is always posted to the scheduler queue, never inline.
    return std::noop_coroutine();
}
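
// The registration states used above (and again in read_some/write_some);
// the authoritative race handling lives in op.hpp. A sketch of the
// transitions:
//
//     unregistered --(initiator stores)--> registering
//     registering  --(initiator CAS)-----> registered
//     registering or registered --(reactor/cancel exchange)--> unregistered
//
// Whichever side moves the state to unregistered claims the op and is
// responsible for posting its completion.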
 
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (n == 0)
    {
        // Orderly shutdown by the peer (EOF).
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // the reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, the reactor or cancel
        // already claimed the op (state is now unregistered), so we're done.
        // However, we must still deregister the fd because cancel's
        // deregister_fd may have run before our register_fd, leaving the fd
        // orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
 
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov    = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // the reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, the reactor or cancel
        // already claimed the op (state is now unregistered), so we're done.
        // However, we must still deregister the fd because cancel's
        // deregister_fd may have run before our register_fd, leaving the fd
        // orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // n == 0 with a non-empty buffer is unexpected; fall back to EIO if
    // errno was not set.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
 
inline std::error_code
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
{
    int how;
    switch (what)
    {
    case tcp_socket::shutdown_receive:
        how = SHUT_RD;
        break;
    case tcp_socket::shutdown_send:
        how = SHUT_WR;
        break;
    case tcp_socket::shutdown_both:
        how = SHUT_RDWR;
        break;
    default:
        return make_err(EINVAL);
    }
    if (::shutdown(fd_, how) != 0)
        return make_err(errno);
    return {};
}
 
inline std::error_code
select_socket::set_no_delay(bool value) noexcept
{
    int flag = value ? 1 : 0;
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
        return make_err(errno);
    return {};
}

inline bool
select_socket::no_delay(std::error_code& ec) const noexcept
{
    int flag      = 0;
    socklen_t len = sizeof(flag);
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
    {
        ec = make_err(errno);
        return false;
    }
    ec = {};
    return flag != 0;
}

inline std::error_code
select_socket::set_keep_alive(bool value) noexcept
{
    int flag = value ? 1 : 0;
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
        return make_err(errno);
    return {};
}

inline bool
select_socket::keep_alive(std::error_code& ec) const noexcept
{
    int flag      = 0;
    socklen_t len = sizeof(flag);
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
    {
        ec = make_err(errno);
        return false;
    }
    ec = {};
    return flag != 0;
}

inline std::error_code
select_socket::set_receive_buffer_size(int size) noexcept
{
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
        return make_err(errno);
    return {};
}

inline int
select_socket::receive_buffer_size(std::error_code& ec) const noexcept
{
    int size      = 0;
    socklen_t len = sizeof(size);
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
    {
        ec = make_err(errno);
        return 0;
    }
    ec = {};
    return size;
}

inline std::error_code
select_socket::set_send_buffer_size(int size) noexcept
{
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
        return make_err(errno);
    return {};
}

inline int
select_socket::send_buffer_size(std::error_code& ec) const noexcept
{
    int size      = 0;
    socklen_t len = sizeof(size);
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
    {
        ec = make_err(errno);
        return 0;
    }
    ec = {};
    return size;
}

inline std::error_code
select_socket::set_linger(bool enabled, int timeout) noexcept
{
    if (timeout < 0)
        return make_err(EINVAL);
    struct ::linger lg;
    lg.l_onoff  = enabled ? 1 : 0;
    lg.l_linger = timeout;
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
        return make_err(errno);
    return {};
}

inline tcp_socket::linger_options
select_socket::linger(std::error_code& ec) const noexcept
{
    struct ::linger lg{};
    socklen_t len = sizeof(lg);
    if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
    {
        ec = make_err(errno);
        return {};
    }
    ec = {};
    return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
}
 
inline void
select_socket::cancel() noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
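
// Lifetime of a cancelled op, as a sketch of the sequence above:
//
//     op.impl_ptr = self;   // keep the impl alive while the op is queued
//     svc_.post(&op);       // the scheduler invokes operator() later
//     // ... in operator(): impl_ptr.reset() runs just before the
//     // coroutine resumes, releasing the reference.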
 
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from the stop_token callback to cancel a specific pending
    // operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister.
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
 
inline void
select_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        auto cancel_op = [this, &self](select_op& op, int events) {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_  = endpoint{};
    remote_endpoint_ = endpoint{};
}
 
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
          std::make_unique<select_socket_state>(
              ctx.use_service<select_scheduler>()))
{
}

inline select_socket_service::~select_socket_service() {}

inline void
select_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
 
inline io_object::implementation*
select_socket_service::construct()
{
    auto impl = std::make_shared<select_socket>(*this);
    auto* raw = impl.get();

    {
        std::lock_guard lock(state_->mutex_);
        state_->socket_list_.push_back(raw);
        state_->socket_ptrs_.emplace(raw, std::move(impl));
    }

    return raw;
}

inline void
select_socket_service::destroy(io_object::implementation* impl)
{
    auto* select_impl = static_cast<select_socket*>(impl);
    select_impl->close_socket();
    std::lock_guard lock(state_->mutex_);
    state_->socket_list_.remove(select_impl);
    state_->socket_ptrs_.erase(select_impl);
}
 
inline std::error_code
select_socket_service::open_socket(tcp_socket::implementation& impl)
{
    auto* select_impl = static_cast<select_socket*>(&impl);
    select_impl->close_socket();

    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return make_err(errno);

    // Set non-blocking and close-on-exec.
    int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
    {
        int errn = errno;
        ::close(fd);
        return make_err(errn);
    }

    // select() cannot wait on descriptors at or above FD_SETSIZE.
    if (fd >= FD_SETSIZE)
    {
        ::close(fd);
        return make_err(EMFILE); // Report as "too many open files".
    }

    select_impl->fd_ = fd;
    return {};
}
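
// Why the FD_SETSIZE check above matters: select() cannot wait on
// descriptors numbered FD_SETSIZE or higher (commonly 1024), so a socket
// that ::socket() happily returned may still be unusable with this
// backend. The failure mode being avoided, as a sketch:
//
//     fd_set readfds;
//     FD_ZERO(&readfds);
//     FD_SET(fd, &readfds); // undefined behavior when fd >= FD_SETSIZE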
 
inline void
select_socket_service::close(io_object::handle& h)
{
    static_cast<select_socket*>(h.get())->close_socket();
}

inline void
select_socket_service::post(select_op* op)
{
    state_->sched_.post(op);
}

inline void
select_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}

inline void
select_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_SELECT

#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP