1 -
//
 
2 -
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
 
3 -
//
 
4 -
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
5 -
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
6 -
//
 
7 -
// Official repository: https://github.com/cppalliance/corosio
 
8 -
//
 
9 -

 
10 -
#ifndef BOOST_COROSIO_IO_STREAM_HPP
 
11 -
#define BOOST_COROSIO_IO_STREAM_HPP
 
12 -

 
13 -
#include <boost/corosio/detail/config.hpp>
 
14 -
#include <boost/corosio/io_object.hpp>
 
15 -
#include <boost/capy/io_result.hpp>
 
16 -
#include <boost/corosio/io_buffer_param.hpp>
 
17 -
#include <boost/capy/ex/executor_ref.hpp>
 
18 -
#include <boost/capy/ex/io_env.hpp>
 
19 -
#include <system_error>
 
20 -

 
21 -
#include <coroutine>
 
22 -
#include <cstddef>
 
23 -
#include <stop_token>
 
24 -

 
25 -
namespace boost::corosio {
 
26 -

 
27 -
/** Platform stream with read/write operations.
 
28 -

 
29 -
    This base class provides the fundamental async read and write
 
30 -
    operations for kernel-level stream I/O. Derived classes wrap
 
31 -
    OS-specific stream implementations (sockets, pipes, etc.) and
 
32 -
    satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
 
33 -

 
34 -
    @par Semantics
 
35 -
    Concrete classes wrap direct platform I/O completed by the kernel.
 
36 -
    Functions taking `io_stream&` signal "platform implementation
 
37 -
    required" - use this when you need actual kernel I/O rather than
 
38 -
    a mock or test double.
 
39 -

 
40 -
    For generic stream algorithms that work with test mocks,
 
41 -
    use `template<capy::Stream S>` instead of `io_stream&`.
 
42 -

 
43 -
    @par Thread Safety
 
44 -
    Distinct objects: Safe.
 
45 -
    Shared objects: Unsafe. All calls to a single stream must be made
 
46 -
    from the same implicit or explicit serialization context.
 
47 -

 
48 -
    @par Example
 
49 -
    @code
 
50 -
    // Read until buffer full or EOF
 
51 -
    capy::task<> read_all( io_stream& stream, std::span<char> buf )
 
52 -
    {
 
53 -
        std::size_t total = 0;
 
54 -
        while( total < buf.size() )
 
55 -
        {
 
56 -
            auto [ec, n] = co_await stream.read_some(
 
57 -
                capy::buffer( buf.data() + total, buf.size() - total ) );
 
58 -
            if( ec == capy::cond::eof )
 
59 -
                break;
 
60 -
            if( ec.failed() )
 
61 -
                capy::detail::throw_system_error( ec );
 
62 -
            total += n;
 
63 -
        }
 
64 -
    }
 
65 -
    @endcode
 
66 -

 
67 -
    @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
 
68 -
*/
 
69 -
class BOOST_COROSIO_DECL io_stream : public io_object
 
70 -
{
 
71 -
public:
 
72 -
    /** Asynchronously read data from the stream.
 
73 -

 
74 -
        This operation suspends the calling coroutine and initiates a
 
75 -
        kernel-level read. The coroutine resumes when the operation
 
76 -
        completes.
 
77 -

 
78 -
        @li The operation completes when:
 
79 -
        @li At least one byte has been read into the buffer sequence
 
80 -
        @li The peer closes the connection (EOF)
 
81 -
        @li An error occurs
 
82 -
        @li The operation is cancelled via stop token or `cancel()`
 
83 -

 
84 -
        @par Concurrency
 
85 -
        At most one write operation may be in flight concurrently with
 
86 -
        this read. No other read operations may be in flight until this
 
87 -
        operation completes. Note that concurrent in-flight operations
 
88 -
        does not imply the initiating calls may be made concurrently;
 
89 -
        all calls must be serialized.
 
90 -

 
91 -
        @par Cancellation
 
92 -
        Supports cancellation via `std::stop_token` propagated through
 
93 -
        the IoAwaitable protocol, or via the I/O object's `cancel()`
 
94 -
        member. When cancelled, the operation completes with an error
 
95 -
        that compares equal to `capy::cond::canceled`.
 
96 -

 
97 -
        @par Preconditions
 
98 -
        The stream must be open and connected.
 
99 -

 
100 -
        @param buffers The buffer sequence to read data into. The caller
 
101 -
            retains ownership and must ensure validity until the
 
102 -
            operation completes.
 
103 -

 
104 -
        @return An awaitable yielding `(error_code, std::size_t)`.
 
105 -
            On success, `bytes_transferred` contains the number of bytes
 
106 -
            read. Compare error codes to conditions, not specific values:
 
107 -
            @li `capy::cond::eof` - Peer closed connection (TCP FIN)
 
108 -
            @li `capy::cond::canceled` - Operation was cancelled
 
109 -

 
110 -
        @par Example
 
111 -
        @code
 
112 -
        // Simple read with error handling
 
113 -
        auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
 
114 -
        if( ec == capy::cond::eof )
 
115 -
            co_return;  // Connection closed gracefully
 
116 -
        if( ec.failed() )
 
117 -
            capy::detail::throw_system_error( ec );
 
118 -
        process( buf, n );
 
119 -
        @endcode
 
120 -

 
121 -
        @note This operation may read fewer bytes than the buffer
 
122 -
            capacity. Use a loop or `capy::async_read` to read an
 
123 -
            exact amount.
 
124 -

 
125 -
        @see write_some, capy::async_read
 
126 -
    */
 
127 -
    template<capy::MutableBufferSequence MB>
 
128 -
    auto read_some(MB const& buffers)
 
129 -
    {
 
130 -
        return read_some_awaitable<MB>(*this, buffers);
 
131 -
    }
 
132 -

 
133 -
    /** Asynchronously write data to the stream.
 
134 -

 
135 -
        This operation suspends the calling coroutine and initiates a
 
136 -
        kernel-level write. The coroutine resumes when the operation
 
137 -
        completes.
 
138 -

 
139 -
        @li The operation completes when:
 
140 -
        @li At least one byte has been written from the buffer sequence
 
141 -
        @li An error occurs (including connection reset by peer)
 
142 -
        @li The operation is cancelled via stop token or `cancel()`
 
143 -

 
144 -
        @par Concurrency
 
145 -
        At most one read operation may be in flight concurrently with
 
146 -
        this write. No other write operations may be in flight until
 
147 -
        this operation completes. Note that concurrent in-flight
 
148 -
        operations does not imply the initiating calls may be made
 
149 -
        concurrently; all calls must be serialized.
 
150 -

 
151 -
        @par Cancellation
 
152 -
        Supports cancellation via `std::stop_token` propagated through
 
153 -
        the IoAwaitable protocol, or via the I/O object's `cancel()`
 
154 -
        member. When cancelled, the operation completes with an error
 
155 -
        that compares equal to `capy::cond::canceled`.
 
156 -

 
157 -
        @par Preconditions
 
158 -
        The stream must be open and connected.
 
159 -

 
160 -
        @param buffers The buffer sequence containing data to write.
 
161 -
            The caller retains ownership and must ensure validity
 
162 -
            until the operation completes.
 
163 -

 
164 -
        @return An awaitable yielding `(error_code, std::size_t)`.
 
165 -
            On success, `bytes_transferred` contains the number of bytes
 
166 -
            written. Compare error codes to conditions, not specific
 
167 -
            values:
 
168 -
            @li `capy::cond::canceled` - Operation was cancelled
 
169 -
            @li `std::errc::broken_pipe` - Peer closed connection
 
170 -

 
171 -
        @par Example
 
172 -
        @code
 
173 -
        // Write all data
 
174 -
        std::string_view data = "Hello, World!";
 
175 -
        std::size_t written = 0;
 
176 -
        while( written < data.size() )
 
177 -
        {
 
178 -
            auto [ec, n] = co_await stream.write_some(
 
179 -
                capy::buffer( data.data() + written,
 
180 -
                              data.size() - written ) );
 
181 -
            if( ec.failed() )
 
182 -
                capy::detail::throw_system_error( ec );
 
183 -
            written += n;
 
184 -
        }
 
185 -
        @endcode
 
186 -

 
187 -
        @note This operation may write fewer bytes than the buffer
 
188 -
            contains. Use a loop or `capy::async_write` to write
 
189 -
            all data.
 
190 -

 
191 -
        @see read_some, capy::async_write
 
192 -
    */
 
193 -
    template<capy::ConstBufferSequence CB>
 
194 -
    auto write_some(CB const& buffers)
 
195 -
    {
 
196 -
        return write_some_awaitable<CB>(*this, buffers);
 
197 -
    }
 
198 -

 
199 -
protected:
 
200 -
    /// Awaitable for async read operations.
 
201 -
    template<class MutableBufferSequence>
 
202 -
    struct read_some_awaitable
 
203 -
    {
 
204 -
        io_stream& ios_;
 
205 -
        MutableBufferSequence buffers_;
 
206 -
        std::stop_token token_;
 
207 -
        mutable std::error_code ec_;
 
208 -
        mutable std::size_t bytes_transferred_ = 0;
 
209 -

 
210 -
        read_some_awaitable(
 
211 -
            io_stream& ios, MutableBufferSequence buffers) noexcept
 
212 -
            : ios_(ios)
 
213 -
            , buffers_(std::move(buffers))
 
214 -
        {
 
215 -
        }
 
216 -

 
217 -
        bool await_ready() const noexcept
 
218 -
        {
 
219 -
            return token_.stop_requested();
 
220 -
        }
 
221 -

 
222 -
        capy::io_result<std::size_t> await_resume() const noexcept
 
223 -
        {
 
224 -
            if (token_.stop_requested())
 
225 -
                return {make_error_code(std::errc::operation_canceled), 0};
 
226 -
            return {ec_, bytes_transferred_};
 
227 -
        }
 
228 -

 
229 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
 
230 -
            -> std::coroutine_handle<>
 
231 -
        {
 
232 -
            token_ = env->stop_token;
 
233 -
            return ios_.get().read_some(
 
234 -
                h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
 
235 -
        }
 
236 -
    };
 
237 -

 
238 -
    /// Awaitable for async write operations.
 
239 -
    template<class ConstBufferSequence>
 
240 -
    struct write_some_awaitable
 
241 -
    {
 
242 -
        io_stream& ios_;
 
243 -
        ConstBufferSequence buffers_;
 
244 -
        std::stop_token token_;
 
245 -
        mutable std::error_code ec_;
 
246 -
        mutable std::size_t bytes_transferred_ = 0;
 
247 -

 
248 -
        write_some_awaitable(
 
249 -
            io_stream& ios, ConstBufferSequence buffers) noexcept
 
250 -
            : ios_(ios)
 
251 -
            , buffers_(std::move(buffers))
 
252 -
        {
 
253 -
        }
 
254 -

 
255 -
        bool await_ready() const noexcept
 
256 -
        {
 
257 -
            return token_.stop_requested();
 
258 -
        }
 
259 -

 
260 -
        capy::io_result<std::size_t> await_resume() const noexcept
 
261 -
        {
 
262 -
            if (token_.stop_requested())
 
263 -
                return {make_error_code(std::errc::operation_canceled), 0};
 
264 -
            return {ec_, bytes_transferred_};
 
265 -
        }
 
266 -

 
267 -
        auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
 
268 -
            -> std::coroutine_handle<>
 
269 -
        {
 
270 -
            token_ = env->stop_token;
 
271 -
            return ios_.get().write_some(
 
272 -
                h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
 
273 -
        }
 
274 -
    };
 
275 -

 
276 -
public:
 
277 -
    /** Platform-specific stream implementation interface.
 
278 -

 
279 -
        Derived classes implement this interface to provide kernel-level
 
280 -
        read and write operations for each supported platform (IOCP,
 
281 -
        epoll, kqueue, io_uring).
 
282 -
    */
 
283 -
    struct implementation : io_object::implementation
 
284 -
    {
 
285 -
        /// Initiate platform read operation.
 
286 -
        virtual std::coroutine_handle<> read_some(
 
287 -
            std::coroutine_handle<>,
 
288 -
            capy::executor_ref,
 
289 -
            io_buffer_param,
 
290 -
            std::stop_token,
 
291 -
            std::error_code*,
 
292 -
            std::size_t*) = 0;
 
293 -

 
294 -
        /// Initiate platform write operation.
 
295 -
        virtual std::coroutine_handle<> write_some(
 
296 -
            std::coroutine_handle<>,
 
297 -
            capy::executor_ref,
 
298 -
            io_buffer_param,
 
299 -
            std::stop_token,
 
300 -
            std::error_code*,
 
301 -
            std::size_t*) = 0;
 
302 -
    };
 
303 -

 
304 -
protected:
 
305 -
    /// Construct stream from a handle.
 
306 -
    explicit io_stream(handle h) noexcept : io_object(std::move(h)) {}
 
307 -

 
308 -
private:
 
309 -
    /// Return implementation downcasted to stream interface.
 
310 -
    implementation& get() const noexcept
 
311 -
    {
 
312 -
        return *static_cast<implementation*>(h_.get());
 
313 -
    }
 
314 -
};
 
315 -

 
316 -
} // namespace boost::corosio
 
317 -

 
318 -
#endif