94.44% Lines (68/72) 100.00% Functions (23/23)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Steve Gerbino 2   // Copyright (c) 2026 Steve Gerbino
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/corosio 7   // Official repository: https://github.com/cppalliance/corosio
8   // 8   //
9   9  
10   #ifndef BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP 10   #ifndef BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP
11   #define BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP 11   #define BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP
12   12  
13   #include <boost/corosio/local_stream_socket.hpp> 13   #include <boost/corosio/local_stream_socket.hpp>
14   #include <boost/corosio/backend.hpp> 14   #include <boost/corosio/backend.hpp>
15   15  
16   #ifndef BOOST_COROSIO_MRDOCS 16   #ifndef BOOST_COROSIO_MRDOCS
17   #if BOOST_COROSIO_HAS_EPOLL 17   #if BOOST_COROSIO_HAS_EPOLL
18   #include <boost/corosio/native/detail/epoll/epoll_types.hpp> 18   #include <boost/corosio/native/detail/epoll/epoll_types.hpp>
19   #endif 19   #endif
20   20  
21   #if BOOST_COROSIO_HAS_SELECT 21   #if BOOST_COROSIO_HAS_SELECT
22   #include <boost/corosio/native/detail/select/select_types.hpp> 22   #include <boost/corosio/native/detail/select/select_types.hpp>
23   #endif 23   #endif
24   24  
25   #if BOOST_COROSIO_HAS_KQUEUE 25   #if BOOST_COROSIO_HAS_KQUEUE
26   #include <boost/corosio/native/detail/kqueue/kqueue_types.hpp> 26   #include <boost/corosio/native/detail/kqueue/kqueue_types.hpp>
27   #endif 27   #endif
28   28  
29   #if BOOST_COROSIO_HAS_IO_URING 29   #if BOOST_COROSIO_HAS_IO_URING
30   #include <boost/corosio/native/detail/io_uring/io_uring_types.hpp> 30   #include <boost/corosio/native/detail/io_uring/io_uring_types.hpp>
31   #endif 31   #endif
32   32  
33   #if BOOST_COROSIO_HAS_IOCP 33   #if BOOST_COROSIO_HAS_IOCP
34   #include <boost/corosio/native/detail/iocp/win_local_stream_service.hpp> 34   #include <boost/corosio/native/detail/iocp/win_local_stream_service.hpp>
35   #endif 35   #endif
36   #endif // !BOOST_COROSIO_MRDOCS 36   #endif // !BOOST_COROSIO_MRDOCS
37   37  
38   namespace boost::corosio { 38   namespace boost::corosio {
39   39  
40   /** An asynchronous Unix stream socket with devirtualized I/O operations. 40   /** An asynchronous Unix stream socket with devirtualized I/O operations.
41   41  
42   This class template inherits from @ref local_stream_socket and 42   This class template inherits from @ref local_stream_socket and
43   shadows the async operations (`read_some`, `write_some`, 43   shadows the async operations (`read_some`, `write_some`,
44   `connect`) with versions that call the backend implementation 44   `connect`) with versions that call the backend implementation
45   directly, allowing the compiler to inline through the entire 45   directly, allowing the compiler to inline through the entire
46   call chain. 46   call chain.
47   47  
48   Non-async operations (`open`, `close`, `cancel`, socket options) 48   Non-async operations (`open`, `close`, `cancel`, socket options)
49   remain unchanged and dispatch through the compiled library. 49   remain unchanged and dispatch through the compiled library.
50   50  
51   A `native_local_stream_socket` IS-A `local_stream_socket` and 51   A `native_local_stream_socket` IS-A `local_stream_socket` and
52   can be passed to any function expecting `local_stream_socket&` 52   can be passed to any function expecting `local_stream_socket&`
53   or `io_stream&`, in which case virtual dispatch is used 53   or `io_stream&`, in which case virtual dispatch is used
54   transparently. 54   transparently.
55   55  
56   @tparam Backend A backend tag value (e.g., `epoll`) whose type 56   @tparam Backend A backend tag value (e.g., `epoll`) whose type
57   provides the concrete implementation types. 57   provides the concrete implementation types.
58   58  
59   @par Thread Safety 59   @par Thread Safety
60   Same as @ref local_stream_socket. 60   Same as @ref local_stream_socket.
61   61  
62   @par Example 62   @par Example
63   @code 63   @code
64   #include <boost/corosio/native/native_local_stream_socket.hpp> 64   #include <boost/corosio/native/native_local_stream_socket.hpp>
65   65  
66   native_io_context<epoll> ctx; 66   native_io_context<epoll> ctx;
67   native_local_stream_socket<epoll> s(ctx); 67   native_local_stream_socket<epoll> s(ctx);
68   s.open(); 68   s.open();
69   auto [ec] = co_await s.connect(local_endpoint("/tmp/my.sock")); 69   auto [ec] = co_await s.connect(local_endpoint("/tmp/my.sock"));
70   @endcode 70   @endcode
71   71  
72   @see local_stream_socket, epoll_t, iocp_t 72   @see local_stream_socket, epoll_t, iocp_t
73   */ 73   */
74   template<auto Backend> 74   template<auto Backend>
75   class native_local_stream_socket : public local_stream_socket 75   class native_local_stream_socket : public local_stream_socket
76   { 76   {
77   using backend_type = decltype(Backend); 77   using backend_type = decltype(Backend);
78   using impl_type = typename backend_type::local_stream_socket_type; 78   using impl_type = typename backend_type::local_stream_socket_type;
79   using service_type = typename backend_type::local_stream_service_type; 79   using service_type = typename backend_type::local_stream_service_type;
80   80  
HITCBC 81   20 impl_type& get_impl() noexcept 81   20 impl_type& get_impl() noexcept
82   { 82   {
HITCBC 83   20 return *static_cast<impl_type*>(h_.get()); 83   20 return *static_cast<impl_type*>(h_.get());
84   } 84   }
85   85  
86   template<class MutableBufferSequence> 86   template<class MutableBufferSequence>
87   struct native_read_awaitable 87   struct native_read_awaitable
88   { 88   {
89   native_local_stream_socket& self_; 89   native_local_stream_socket& self_;
90   MutableBufferSequence buffers_; 90   MutableBufferSequence buffers_;
91   std::stop_token token_; 91   std::stop_token token_;
92   mutable std::error_code ec_; 92   mutable std::error_code ec_;
93   mutable std::size_t bytes_transferred_ = 0; 93   mutable std::size_t bytes_transferred_ = 0;
94   94  
HITCBC 95   4 native_read_awaitable( 95   4 native_read_awaitable(
96   native_local_stream_socket& self, 96   native_local_stream_socket& self,
97   MutableBufferSequence buffers) noexcept 97   MutableBufferSequence buffers) noexcept
HITCBC 98   4 : self_(self) 98   4 : self_(self)
HITCBC 99   4 , buffers_(std::move(buffers)) 99   4 , buffers_(std::move(buffers))
100   { 100   {
HITCBC 101   4 } 101   4 }
102   102  
HITCBC 103   4 bool await_ready() const noexcept 103   4 bool await_ready() const noexcept
104   { 104   {
HITCBC 105   4 return token_.stop_requested(); 105   4 return token_.stop_requested();
106   } 106   }
107   107  
HITCBC 108   4 capy::io_result<std::size_t> await_resume() const noexcept 108   4 capy::io_result<std::size_t> await_resume() const noexcept
109   { 109   {
HITCBC 110   4 if (token_.stop_requested()) 110   4 if (token_.stop_requested())
MISUBC 111   return {make_error_code(std::errc::operation_canceled), 0}; 111   return {make_error_code(std::errc::operation_canceled), 0};
HITCBC 112   4 return {ec_, bytes_transferred_}; 112   4 return {ec_, bytes_transferred_};
113   } 113   }
114   114  
HITCBC 115   4 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) 115   4 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
116   -> std::coroutine_handle<> 116   -> std::coroutine_handle<>
117   { 117   {
HITCBC 118   4 token_ = env->stop_token; 118   4 token_ = env->stop_token;
HITCBC 119   12 return self_.get_impl().read_some( 119   12 return self_.get_impl().read_some(
HITCBC 120   12 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); 120   12 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
121   } 121   }
122   }; 122   };
123   123  
124   template<class ConstBufferSequence> 124   template<class ConstBufferSequence>
125   struct native_write_awaitable 125   struct native_write_awaitable
126   { 126   {
127   native_local_stream_socket& self_; 127   native_local_stream_socket& self_;
128   ConstBufferSequence buffers_; 128   ConstBufferSequence buffers_;
129   std::stop_token token_; 129   std::stop_token token_;
130   mutable std::error_code ec_; 130   mutable std::error_code ec_;
131   mutable std::size_t bytes_transferred_ = 0; 131   mutable std::size_t bytes_transferred_ = 0;
132   132  
HITCBC 133   4 native_write_awaitable( 133   4 native_write_awaitable(
134   native_local_stream_socket& self, 134   native_local_stream_socket& self,
135   ConstBufferSequence buffers) noexcept 135   ConstBufferSequence buffers) noexcept
HITCBC 136   4 : self_(self) 136   4 : self_(self)
HITCBC 137   4 , buffers_(std::move(buffers)) 137   4 , buffers_(std::move(buffers))
138   { 138   {
HITCBC 139   4 } 139   4 }
140   140  
HITCBC 141   4 bool await_ready() const noexcept 141   4 bool await_ready() const noexcept
142   { 142   {
HITCBC 143   4 return token_.stop_requested(); 143   4 return token_.stop_requested();
144   } 144   }
145   145  
HITCBC 146   4 capy::io_result<std::size_t> await_resume() const noexcept 146   4 capy::io_result<std::size_t> await_resume() const noexcept
147   { 147   {
HITCBC 148   4 if (token_.stop_requested()) 148   4 if (token_.stop_requested())
MISUBC 149   return {make_error_code(std::errc::operation_canceled), 0}; 149   return {make_error_code(std::errc::operation_canceled), 0};
HITCBC 150   4 return {ec_, bytes_transferred_}; 150   4 return {ec_, bytes_transferred_};
151   } 151   }
152   152  
HITCBC 153   4 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) 153   4 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
154   -> std::coroutine_handle<> 154   -> std::coroutine_handle<>
155   { 155   {
HITCBC 156   4 token_ = env->stop_token; 156   4 token_ = env->stop_token;
HITCBC 157   12 return self_.get_impl().write_some( 157   12 return self_.get_impl().write_some(
HITCBC 158   12 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); 158   12 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
159   } 159   }
160   }; 160   };
161   161  
162   struct native_wait_awaitable 162   struct native_wait_awaitable
163   { 163   {
164   native_local_stream_socket& self_; 164   native_local_stream_socket& self_;
165   wait_type w_; 165   wait_type w_;
166   std::stop_token token_; 166   std::stop_token token_;
167   mutable std::error_code ec_; 167   mutable std::error_code ec_;
168   168  
HITCBC 169   2 native_wait_awaitable( 169   2 native_wait_awaitable(
170   native_local_stream_socket& self, wait_type w) noexcept 170   native_local_stream_socket& self, wait_type w) noexcept
HITCBC 171   2 : self_(self) 171   2 : self_(self)
HITCBC 172   2 , w_(w) 172   2 , w_(w)
173   { 173   {
HITCBC 174   2 } 174   2 }
175   175  
HITCBC 176   2 bool await_ready() const noexcept 176   2 bool await_ready() const noexcept
177   { 177   {
HITCBC 178   2 return token_.stop_requested(); 178   2 return token_.stop_requested();
179   } 179   }
180   180  
HITCBC 181   2 capy::io_result<> await_resume() const noexcept 181   2 capy::io_result<> await_resume() const noexcept
182   { 182   {
HITCBC 183   2 if (token_.stop_requested()) 183   2 if (token_.stop_requested())
MISUBC 184   return {make_error_code(std::errc::operation_canceled)}; 184   return {make_error_code(std::errc::operation_canceled)};
HITCBC 185   2 return {ec_}; 185   2 return {ec_};
186   } 186   }
187   187  
HITCBC 188   2 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) 188   2 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
189   -> std::coroutine_handle<> 189   -> std::coroutine_handle<>
190   { 190   {
HITCBC 191   2 token_ = env->stop_token; 191   2 token_ = env->stop_token;
HITCBC 192   6 return self_.get_impl().wait( 192   6 return self_.get_impl().wait(
HITCBC 193   6 h, env->executor, w_, token_, &ec_); 193   6 h, env->executor, w_, token_, &ec_);
194   } 194   }
195   }; 195   };
196   196  
197   struct native_connect_awaitable 197   struct native_connect_awaitable
198   { 198   {
199   native_local_stream_socket& self_; 199   native_local_stream_socket& self_;
200   corosio::local_endpoint endpoint_; 200   corosio::local_endpoint endpoint_;
201   std::stop_token token_; 201   std::stop_token token_;
202   mutable std::error_code ec_; 202   mutable std::error_code ec_;
203   203  
HITCBC 204   10 native_connect_awaitable( 204   10 native_connect_awaitable(
205   native_local_stream_socket& self, 205   native_local_stream_socket& self,
206   corosio::local_endpoint ep) noexcept 206   corosio::local_endpoint ep) noexcept
HITCBC 207   10 : self_(self) 207   10 : self_(self)
HITCBC 208   10 , endpoint_(ep) 208   10 , endpoint_(ep)
209   { 209   {
HITCBC 210   10 } 210   10 }
211   211  
HITCBC 212   10 bool await_ready() const noexcept 212   10 bool await_ready() const noexcept
213   { 213   {
HITCBC 214   10 return token_.stop_requested(); 214   10 return token_.stop_requested();
215   } 215   }
216   216  
HITCBC 217   10 capy::io_result<> await_resume() const noexcept 217   10 capy::io_result<> await_resume() const noexcept
218   { 218   {
HITCBC 219   10 if (token_.stop_requested()) 219   10 if (token_.stop_requested())
MISUBC 220   return {make_error_code(std::errc::operation_canceled)}; 220   return {make_error_code(std::errc::operation_canceled)};
HITCBC 221   10 return {ec_}; 221   10 return {ec_};
222   } 222   }
223   223  
HITCBC 224   10 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env) 224   10 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
225   -> std::coroutine_handle<> 225   -> std::coroutine_handle<>
226   { 226   {
HITCBC 227   10 token_ = env->stop_token; 227   10 token_ = env->stop_token;
HITCBC 228   30 return self_.get_impl().connect( 228   30 return self_.get_impl().connect(
HITCBC 229   30 h, env->executor, endpoint_, token_, &ec_); 229   30 h, env->executor, endpoint_, token_, &ec_);
230   } 230   }
231   }; 231   };
232   232  
233   public: 233   public:
234   /** Construct a native socket from an execution context. 234   /** Construct a native socket from an execution context.
235   235  
236   @param ctx The execution context that will own this socket. 236   @param ctx The execution context that will own this socket.
237   */ 237   */
HITCBC 238   28 explicit native_local_stream_socket(capy::execution_context& ctx) 238   28 explicit native_local_stream_socket(capy::execution_context& ctx)
HITCBC 239   28 : io_object(create_handle<service_type>(ctx)) 239   28 : io_object(create_handle<service_type>(ctx))
240   { 240   {
HITCBC 241   28 } 241   28 }
242   242  
243   /** Construct a native socket from an executor. 243   /** Construct a native socket from an executor.
244   244  
245   @param ex The executor whose context will own the socket. 245   @param ex The executor whose context will own the socket.
246   */ 246   */
247   template<class Ex> 247   template<class Ex>
248   requires(!std::same_as< 248   requires(!std::same_as<
249   std::remove_cvref_t<Ex>, 249   std::remove_cvref_t<Ex>,
250   native_local_stream_socket>) && 250   native_local_stream_socket>) &&
251   capy::Executor<Ex> 251   capy::Executor<Ex>
252   explicit native_local_stream_socket(Ex const& ex) 252   explicit native_local_stream_socket(Ex const& ex)
253   : native_local_stream_socket(ex.context()) 253   : native_local_stream_socket(ex.context())
254   { 254   {
255   } 255   }
256   256  
257   /// Move construct. 257   /// Move construct.
HITCBC 258   4 native_local_stream_socket(native_local_stream_socket&&) noexcept = default; 258   4 native_local_stream_socket(native_local_stream_socket&&) noexcept = default;
259   259  
260   /// Move assign. 260   /// Move assign.
261   native_local_stream_socket& 261   native_local_stream_socket&
262   operator=(native_local_stream_socket&&) noexcept = default; 262   operator=(native_local_stream_socket&&) noexcept = default;
263   263  
264   native_local_stream_socket(native_local_stream_socket const&) = delete; 264   native_local_stream_socket(native_local_stream_socket const&) = delete;
265   native_local_stream_socket& 265   native_local_stream_socket&
266   operator=(native_local_stream_socket const&) = delete; 266   operator=(native_local_stream_socket const&) = delete;
267   267  
268   /** Asynchronously read data from the socket. 268   /** Asynchronously read data from the socket.
269   269  
270   Calls the backend implementation directly, bypassing virtual 270   Calls the backend implementation directly, bypassing virtual
271   dispatch. Otherwise identical to @ref io_stream::read_some. 271   dispatch. Otherwise identical to @ref io_stream::read_some.
272   272  
273   @param buffers The buffer sequence to read into. 273   @param buffers The buffer sequence to read into.
274   274  
275   @return An awaitable yielding `(error_code, std::size_t)`. 275   @return An awaitable yielding `(error_code, std::size_t)`.
276   */ 276   */
277   template<capy::MutableBufferSequence MB> 277   template<capy::MutableBufferSequence MB>
HITCBC 278   4 auto read_some(MB const& buffers) 278   4 auto read_some(MB const& buffers)
279   { 279   {
HITCBC 280   4 return native_read_awaitable<MB>(*this, buffers); 280   4 return native_read_awaitable<MB>(*this, buffers);
281   } 281   }
282   282  
283   /** Asynchronously write data to the socket. 283   /** Asynchronously write data to the socket.
284   284  
285   Calls the backend implementation directly, bypassing virtual 285   Calls the backend implementation directly, bypassing virtual
286   dispatch. Otherwise identical to @ref io_stream::write_some. 286   dispatch. Otherwise identical to @ref io_stream::write_some.
287   287  
288   @param buffers The buffer sequence to write from. 288   @param buffers The buffer sequence to write from.
289   289  
290   @return An awaitable yielding `(error_code, std::size_t)`. 290   @return An awaitable yielding `(error_code, std::size_t)`.
291   */ 291   */
292   template<capy::ConstBufferSequence CB> 292   template<capy::ConstBufferSequence CB>
HITCBC 293   4 auto write_some(CB const& buffers) 293   4 auto write_some(CB const& buffers)
294   { 294   {
HITCBC 295   4 return native_write_awaitable<CB>(*this, buffers); 295   4 return native_write_awaitable<CB>(*this, buffers);
296   } 296   }
297   297  
298   /** Asynchronously connect to a remote endpoint. 298   /** Asynchronously connect to a remote endpoint.
299   299  
300   Calls the backend implementation directly, bypassing virtual 300   Calls the backend implementation directly, bypassing virtual
301   dispatch. Otherwise identical to @ref local_stream_socket::connect. 301   dispatch. Otherwise identical to @ref local_stream_socket::connect.
302   302  
303   If the socket is not already open, it is opened automatically. 303   If the socket is not already open, it is opened automatically.
304   304  
305   @param ep The local endpoint (path) to connect to. 305   @param ep The local endpoint (path) to connect to.
306   306  
307   @return An awaitable yielding `io_result<>`. 307   @return An awaitable yielding `io_result<>`.
308   308  
309   @throws std::system_error if the socket needs to be opened 309   @throws std::system_error if the socket needs to be opened
310   and the open fails. 310   and the open fails.
311   */ 311   */
HITCBC 312   10 auto connect(corosio::local_endpoint ep) 312   10 auto connect(corosio::local_endpoint ep)
313   { 313   {
HITCBC 314   10 if (!is_open()) 314   10 if (!is_open())
HITCBC 315   10 open(); 315   10 open();
HITCBC 316   10 return native_connect_awaitable(*this, ep); 316   10 return native_connect_awaitable(*this, ep);
317   } 317   }
318   318  
319   /** Asynchronously wait for the socket to be ready. 319   /** Asynchronously wait for the socket to be ready.
320   320  
321   Calls the backend implementation directly, bypassing virtual 321   Calls the backend implementation directly, bypassing virtual
322   dispatch. Otherwise identical to @ref local_stream_socket::wait. 322   dispatch. Otherwise identical to @ref local_stream_socket::wait.
323   323  
324   @param w The wait direction (read, write, or error). 324   @param w The wait direction (read, write, or error).
325   325  
326   @return An awaitable yielding `io_result<>`. 326   @return An awaitable yielding `io_result<>`.
327   */ 327   */
HITCBC 328   2 [[nodiscard]] auto wait(wait_type w) 328   2 [[nodiscard]] auto wait(wait_type w)
329   { 329   {
HITCBC 330   2 return native_wait_awaitable(*this, w); 330   2 return native_wait_awaitable(*this, w);
331   } 331   }
332   }; 332   };
333   333  
334   } // namespace boost::corosio 334   } // namespace boost::corosio
335   335  
336   #endif // BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP 336   #endif // BOOST_COROSIO_NATIVE_NATIVE_LOCAL_STREAM_SOCKET_HPP