TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/io/io_object.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/detail/continuation_op.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 :
19 : #include <atomic>
20 : #include <coroutine>
21 : #include <cstddef>
22 : #include <memory>
23 : #include <optional>
24 : #include <stop_token>
25 : #include <system_error>
26 :
27 : #include <errno.h>
28 :
29 : #include <netinet/in.h>
30 : #include <sys/socket.h>
31 : #include <sys/uio.h>
32 :
33 : namespace boost::corosio::detail {
34 :
35 : /** Base operation for reactor-based backends.
36 :
37 : Adds the per-operation state that depends on the concrete
38 : backend socket/acceptor types: the target file descriptor
39 : and the owning socket or acceptor impl pointer used to
40 : route stop_token cancellation.
41 :
42 : Fields shared across all backends (errn, bytes_transferred,
43 : cancelled, impl_ptr, perform_io, complete) are inherited via
44 : reactor_op_base so the scheduler and descriptor_state can
45 : access them without template instantiation.
46 :
47 : @tparam Socket The backend socket impl type (forward-declared).
48 : @tparam Acceptor The backend acceptor impl type (forward-declared).
49 : */
50 : template<class Socket, class Acceptor>
51 : struct reactor_op : reactor_op_base
52 : {
53 : // The op envelope — coroutine handle h, cont_op, executor ex, ec_out,
54 : // bytes_out, cancelled, stop_cb (+ its canceller), impl_ptr — lives in
55 : // coro_op (via reactor_op_base) and is shared with io_uring/IOCP.
56 : // reactor_op adds only the reactor-specific routing state below.
57 :
58 : /// File descriptor this operation targets (-1 when unset).
59 : int fd = -1;
60 :
61 : /// Owning socket impl (for stop_token cancellation routing); null for acceptor ops.
62 : Socket* socket_impl_ = nullptr;
63 :
64 : /// Owning acceptor impl (for stop_token cancellation routing); null for socket ops.
65 : Acceptor* acceptor_impl_ = nullptr;
66 :
67 HIT 133100 : reactor_op() = default;
68 :
69 : /// Reset operation state for reuse: clears fd, errn, bytes_transferred, cancelled, impl_ptr and both impl pointers.
70 451618 : void reset() noexcept
71 : {
72 451618 : fd = -1;
73 451618 : errn = 0;
74 451618 : bytes_transferred = 0;
75 451618 : cancelled.store(false, std::memory_order_relaxed);
76 451618 : impl_ptr.reset();
77 451618 : socket_impl_ = nullptr;
78 451618 : acceptor_impl_ = nullptr;
79 451618 : }
80 :
81 : /// Return true if this is a read-direction operation (default: false; derived read ops override).
82 43724 : virtual bool is_read_operation() const noexcept
83 : {
84 43724 : return false;
85 : }
86 :
87 : /// Cancel this operation via the owning impl (pure virtual; supplied by a derived op).
88 : virtual void cancel() noexcept = 0;
89 :
90 : /// coro_op cancellation hook (fired by the shared canceller when the
91 : /// stop_token requests cancellation): route to the impl-specific cancel().
92 214 : void on_cancel() noexcept override
93 : {
94 214 : cancel();
95 214 : }
96 :
97 : /// Destroy without invoking: drop the stop callback first, then delegate to reactor_op_base::destroy().
98 MIS 0 : void destroy() override
99 : {
100 0 : stop_cb.reset();
101 0 : reactor_op_base::destroy();
102 0 : }
103 :
104 : /// Start a socket operation: record the owning socket impl, clear the acceptor pointer, and pass the token to coro_op::start().
105 HIT 95160 : void start(std::stop_token const& token, Socket* impl)
106 : {
107 95160 : socket_impl_ = impl;
108 95160 : acceptor_impl_ = nullptr;
109 95160 : coro_op::start(token);
110 95160 : }
111 :
112 : /// Start an acceptor operation: record the owning acceptor impl, clear the socket pointer, and pass the token to coro_op::start().
113 7145 : void start(std::stop_token const& token, Acceptor* impl)
114 : {
115 7145 : socket_impl_ = nullptr;
116 7145 : acceptor_impl_ = impl;
117 7145 : coro_op::start(token);
118 7145 : }
119 : };
120 :
121 : /** Shared connect operation.
122 :
123 : Checks SO_ERROR for connect completion status. The operator()()
124 : and cancel() are provided by the concrete backend type.
125 :
126 : @tparam Base The backend's base op type.
127 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
128 : */
129 : template<class Base, class Endpoint = endpoint>
130 : struct reactor_connect_op : Base
131 : {
132 : /// Endpoint to connect to.
133 : Endpoint target_endpoint;
134 :
135 : /// Reset operation state for reuse: resets the base state and value-initializes target_endpoint.
136 7166 : void reset() noexcept
137 : {
138 7166 : Base::reset();
139 7166 : target_endpoint = Endpoint{};
140 7166 : }
141 :
142 7112 : void perform_io() noexcept override // Query SO_ERROR on fd and complete with that error code.
143 : {
144 7112 : int err = 0;
145 7112 : socklen_t len = sizeof(err);
146 7112 : if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
147 MIS 0 : err = errno; // getsockopt itself failed: report its errno instead
148 HIT 7112 : this->complete(err, 0); // a connect never reports transferred bytes
149 7112 : }
150 : };
151 :
152 : /** Readiness-only wait operation.
153 :
154 : Does not perform any I/O syscall. Completion is signalled by
155 : the reactor delivering the requested edge event; reactor_descriptor_state
156 : calls complete() directly and never invokes perform_io().
157 :
158 : @tparam Base The backend's base op type.
159 : */
160 : template<class Base>
161 : struct reactor_wait_op : Base
162 : {
163 : /* Mirror of reactor_event_read from reactor_descriptor_state.hpp.
164 : Including that header from here would create an include cycle
165 : (descriptor_state -> reactor_op_base; reactor_op -> reactor_op_base),
166 : so we carry the value locally. Both must stay in sync. */
167 : static constexpr std::uint32_t read_event = 0x001; // NOTE(review): <cstdint> is not among this header's direct includes; presumably transitive — confirm
168 :
169 : /// Which event bit this wait targets (reactor_event_read/write/error); 0 after reset().
170 : std::uint32_t wait_event = 0;
171 :
172 56 : void reset() noexcept // Reset the base state and clear the targeted event bit.
173 : {
174 56 : Base::reset();
175 56 : wait_event = 0;
176 56 : }
177 :
178 MIS 0 : bool is_read_operation() const noexcept override // True only when waiting on the read event bit.
179 : {
180 0 : return wait_event == read_event;
181 : }
182 :
183 : /* perform_io() should never be called for a wait op — readiness
184 : IS the completion. Overridden here to satisfy the virtual and
185 : produce a safe result if called defensively. */
186 0 : void perform_io() noexcept override
187 : {
188 0 : this->complete(0, 0); // no error, no bytes
189 0 : }
190 : };
191 :
192 : /** Shared scatter-read operation.
193 :
194 : Uses readv() with an EINTR retry loop; completes with (0, n) on success or (errno, 0) on failure.
195 :
196 : @tparam Base The backend's base op type.
197 : */
198 : template<class Base>
199 : struct reactor_read_op : Base
200 : {
201 : /// Maximum scatter-gather buffer count (capacity of iovecs).
202 : static constexpr std::size_t max_buffers = 16;
203 :
204 : /// Scatter-gather I/O vectors.
205 : iovec iovecs[max_buffers];
206 :
207 : /// Number of active I/O vectors.
208 : int iovec_count = 0;
209 :
210 : /// True for zero-length reads (completed immediately).
211 : bool empty_buffer_read = false;
212 :
213 : /// Return true unless this is a zero-length (empty-buffer) read.
214 HIT 44182 : bool is_read_operation() const noexcept override
215 : {
216 44182 : return !empty_buffer_read;
217 : }
218 :
219 218792 : void reset() noexcept // Reset the base state, the vector count and the empty-read flag (iovecs contents are left as-is).
220 : {
221 218792 : Base::reset();
222 218792 : iovec_count = 0;
223 218792 : empty_buffer_read = false;
224 218792 : }
225 :
226 509 : void perform_io() noexcept override // One readv() attempt on fd, repeated only while interrupted.
227 : {
228 : ssize_t n;
229 : do
230 : {
231 509 : n = ::readv(this->fd, iovecs, iovec_count);
232 : }
233 509 : while (n < 0 && errno == EINTR); // retry when the call was interrupted
234 :
235 509 : if (n >= 0)
236 278 : this->complete(0, static_cast<std::size_t>(n)); // success: n bytes read (n may be 0)
237 : else
238 231 : this->complete(errno, 0); // failure: pass errno through, no bytes
239 509 : }
240 : };
241 :
242 : /** Shared gather-write operation.
243 :
244 : Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
245 : which returns ssize_t (bytes written or -1 with errno set). The policy is called once; no EINTR retry is done at this level.
246 :
247 : @tparam Base The backend's base op type.
248 : @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
249 : */
250 : template<class Base, class WritePolicy>
251 : struct reactor_write_op : Base
252 : {
253 : /// The write syscall policy type.
254 : using write_policy = WritePolicy;
255 :
256 : /// Maximum scatter-gather buffer count (capacity of iovecs).
257 : static constexpr std::size_t max_buffers = 16;
258 :
259 : /// Scatter-gather I/O vectors.
260 : iovec iovecs[max_buffers];
261 :
262 : /// Number of active I/O vectors.
263 : int iovec_count = 0;
264 :
265 218313 : void reset() noexcept // Reset the base state and the vector count (iovecs contents are left as-is).
266 : {
267 218313 : Base::reset();
268 218313 : iovec_count = 0;
269 218313 : }
270 :
271 15 : void perform_io() noexcept override // Single policy write; result mapped to complete().
272 : {
273 15 : ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
274 15 : if (n >= 0)
275 13 : this->complete(0, static_cast<std::size_t>(n)); // success: n bytes written
276 : else
277 2 : this->complete(errno, 0); // failure: errno as left by the policy
278 15 : }
279 : };
280 :
281 : /** Shared accept operation.
282 :
283 : Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage, peer_addrlen),
284 : which returns the accepted fd or -1 with errno set.
285 :
286 : @tparam Base The backend's base op type.
287 : @tparam AcceptPolicy Provides a static `do_accept` callable as `do_accept(int, sockaddr_storage&, socklen_t&)` (third parameter type presumed from the call site — confirm).
288 : */
289 : template<class Base, class AcceptPolicy>
290 : struct reactor_accept_op : Base
291 : {
292 : /// File descriptor of the accepted connection (-1 until an accept succeeds).
293 : int accepted_fd = -1;
294 :
295 : /// Pointer to the peer socket implementation.
296 : io_object::implementation* peer_impl = nullptr;
297 :
298 : /// Output pointer for the accepted implementation.
299 : io_object::implementation** impl_out = nullptr;
300 :
301 : /// Peer address storage filled by accept.
302 : sockaddr_storage peer_storage{};
303 :
304 : /// Peer address length returned by accept.
305 : socklen_t peer_addrlen = 0;
306 :
307 7133 : void reset() noexcept // Reset the base state and every accept-specific field to its initial value.
308 : {
309 7133 : Base::reset();
310 7133 : accepted_fd = -1;
311 7133 : peer_impl = nullptr;
312 7133 : impl_out = nullptr;
313 7133 : peer_storage = {};
314 7133 : peer_addrlen = 0;
315 7133 : }
316 :
317 7103 : void perform_io() noexcept override // Single policy accept; stores the new fd on success.
318 : {
319 7103 : int new_fd = AcceptPolicy::do_accept(
320 7103 : this->fd, peer_storage, peer_addrlen);
321 7103 : if (new_fd >= 0)
322 : {
323 7103 : accepted_fd = new_fd; // peer_impl / impl_out are not touched here
324 7103 : this->complete(0, 0);
325 : }
326 : else
327 : {
328 MIS 0 : this->complete(errno, 0); // failure: errno as left by the policy
329 : }
330 HIT 7103 : }
331 : };
332 :
333 : /** Shared connected send operation for datagram sockets.
334 :
335 : Uses sendmsg() with msg_name=nullptr (connected mode), retrying while interrupted (EINTR).
336 :
337 : @tparam Base The backend's base op type.
338 : */
339 : template<class Base>
340 : struct reactor_send_op : Base
341 : {
342 : /// Maximum scatter-gather buffer count (capacity of iovecs).
343 : static constexpr std::size_t max_buffers = 16;
344 :
345 : /// Scatter-gather I/O vectors.
346 : iovec iovecs[max_buffers];
347 :
348 : /// Number of active I/O vectors.
349 : int iovec_count = 0;
350 :
351 : /// User-supplied message flags.
352 : int msg_flags = 0;
353 :
354 28 : void reset() noexcept // Reset the base state, the vector count and the flags.
355 : {
356 28 : Base::reset();
357 28 : iovec_count = 0;
358 28 : msg_flags = 0;
359 28 : }
360 :
361 MIS 0 : void perform_io() noexcept override // Build a msghdr over iovecs and send it on fd.
362 : {
363 0 : msghdr msg{}; // value-initialized, so msg_name stays null (connected mode)
364 0 : msg.msg_iov = iovecs;
365 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
366 :
367 : #ifdef MSG_NOSIGNAL
368 0 : int send_flags = msg_flags | MSG_NOSIGNAL; // always add MSG_NOSIGNAL where the platform defines it
369 : #else
370 : int send_flags = msg_flags;
371 : #endif
372 :
373 : ssize_t n;
374 : do
375 : {
376 0 : n = ::sendmsg(this->fd, &msg, send_flags);
377 : }
378 0 : while (n < 0 && errno == EINTR); // retry when the call was interrupted
379 :
380 0 : if (n >= 0)
381 0 : this->complete(0, static_cast<std::size_t>(n)); // success: n bytes sent
382 : else
383 0 : this->complete(errno, 0); // failure: pass errno through
384 0 : }
385 : };
386 :
387 : /** Shared connected recv operation for datagram sockets.
388 :
389 : Uses recvmsg() with msg_name=nullptr (connected mode).
390 : Completes with (0, n) even when n == 0: zero-length datagrams are valid, so no EOF mapping is done here.
391 : NOTE(review): the reactor_read_op EOF mapping this contrasts with is not visible in this header — confirm where it lives.
392 :
393 : @tparam Base The backend's base op type.
394 : */
395 : template<class Base>
396 : struct reactor_recv_op : Base
397 : {
398 : /// Maximum scatter-gather buffer count (capacity of iovecs).
399 : static constexpr std::size_t max_buffers = 16;
400 :
401 : /// Scatter-gather I/O vectors.
402 : iovec iovecs[max_buffers];
403 :
404 : /// Number of active I/O vectors.
405 : int iovec_count = 0;
406 :
407 : /// User-supplied message flags (passed to recvmsg unchanged).
408 : int msg_flags = 0;
409 :
410 : /// Return true (this is a read-direction operation).
411 0 : bool is_read_operation() const noexcept override
412 : {
413 0 : return true;
414 : }
415 :
416 HIT 28 : void reset() noexcept // Reset the base state, the vector count and the flags.
417 : {
418 28 : Base::reset();
419 28 : iovec_count = 0;
420 28 : msg_flags = 0;
421 28 : }
422 :
423 MIS 0 : void perform_io() noexcept override // Build a msghdr over iovecs and receive into it from fd.
424 : {
425 0 : msghdr msg{}; // value-initialized, so msg_name stays null (connected mode)
426 0 : msg.msg_iov = iovecs;
427 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
428 :
429 : ssize_t n;
430 : do
431 : {
432 0 : n = ::recvmsg(this->fd, &msg, msg_flags);
433 : }
434 0 : while (n < 0 && errno == EINTR); // retry when the call was interrupted
435 :
436 0 : if (n >= 0)
437 0 : this->complete(0, static_cast<std::size_t>(n)); // success, including n == 0
438 : else
439 0 : this->complete(errno, 0); // failure: pass errno through
440 0 : }
441 : };
442 :
443 : /** Shared send_to operation for datagram sockets.
444 :
445 : Uses sendmsg() with the destination endpoint in msg_name, retrying while interrupted (EINTR).
446 :
447 : @tparam Base The backend's base op type.
448 : */
449 : template<class Base>
450 : struct reactor_send_to_op : Base
451 : {
452 : /// Maximum scatter-gather buffer count (capacity of iovecs).
453 : static constexpr std::size_t max_buffers = 16;
454 :
455 : /// Scatter-gather I/O vectors.
456 : iovec iovecs[max_buffers];
457 :
458 : /// Number of active I/O vectors.
459 : int iovec_count = 0;
460 :
461 : /// Destination address storage (passed to sendmsg as msg_name).
462 : sockaddr_storage dest_storage{};
463 :
464 : /// Destination address length (passed to sendmsg as msg_namelen).
465 : socklen_t dest_len = 0;
466 :
467 : /// User-supplied message flags.
468 : int msg_flags = 0;
469 :
470 HIT 44 : void reset() noexcept // Reset the base state and every send_to-specific field to its initial value.
471 : {
472 44 : Base::reset();
473 44 : iovec_count = 0;
474 44 : dest_storage = {};
475 44 : dest_len = 0;
476 44 : msg_flags = 0;
477 44 : }
478 :
479 MIS 0 : void perform_io() noexcept override // Build a msghdr addressed to dest_storage and send it on fd.
480 : {
481 0 : msghdr msg{};
482 0 : msg.msg_name = &dest_storage;
483 0 : msg.msg_namelen = dest_len;
484 0 : msg.msg_iov = iovecs;
485 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
486 :
487 : #ifdef MSG_NOSIGNAL
488 0 : int send_flags = msg_flags | MSG_NOSIGNAL; // always add MSG_NOSIGNAL where the platform defines it
489 : #else
490 : int send_flags = msg_flags;
491 : #endif
492 :
493 : ssize_t n;
494 : do
495 : {
496 0 : n = ::sendmsg(this->fd, &msg, send_flags);
497 : }
498 0 : while (n < 0 && errno == EINTR); // retry when the call was interrupted
499 :
500 0 : if (n >= 0)
501 0 : this->complete(0, static_cast<std::size_t>(n)); // success: n bytes sent
502 : else
503 0 : this->complete(errno, 0); // failure: pass errno through
504 0 : }
505 : };
506 :
507 : /** Shared recv_from operation for datagram sockets.
508 :
509 : Uses recvmsg() with msg_name to capture the source endpoint, retrying while interrupted (EINTR).
510 :
511 : @tparam Base The backend's base op type.
512 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
513 : */
514 : template<class Base, class Endpoint = endpoint>
515 : struct reactor_recv_from_op : Base
516 : {
517 : /// Maximum scatter-gather buffer count (capacity of iovecs).
518 : static constexpr std::size_t max_buffers = 16;
519 :
520 : /// Scatter-gather I/O vectors.
521 : iovec iovecs[max_buffers];
522 :
523 : /// Number of active I/O vectors.
524 : int iovec_count = 0;
525 :
526 : /// Source address storage filled by recvmsg.
527 : sockaddr_storage source_storage{};
528 :
529 : /// Actual source address length returned by recvmsg (only updated on success).
530 : socklen_t source_addrlen = 0;
531 :
532 : /// Output pointer for the source endpoint (set by do_recv_from); perform_io() does not write through it.
533 : Endpoint* source_out = nullptr;
534 :
535 : /// User-supplied message flags (passed to recvmsg unchanged).
536 : int msg_flags = 0;
537 :
538 : /// Return true (this is a read-direction operation).
539 0 : bool is_read_operation() const noexcept override
540 : {
541 0 : return true;
542 : }
543 :
544 HIT 58 : void reset() noexcept // Reset the base state and every recv_from-specific field to its initial value.
545 : {
546 58 : Base::reset();
547 58 : iovec_count = 0;
548 58 : source_storage = {};
549 58 : source_addrlen = 0;
550 58 : source_out = nullptr;
551 58 : msg_flags = 0;
552 58 : }
553 :
554 2 : void perform_io() noexcept override // Receive one message from fd, capturing the sender address.
555 : {
556 2 : msghdr msg{};
557 2 : msg.msg_name = &source_storage;
558 2 : msg.msg_namelen = sizeof(source_storage); // offer the full storage as address capacity
559 2 : msg.msg_iov = iovecs;
560 2 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
561 :
562 : ssize_t n;
563 : do
564 : {
565 2 : n = ::recvmsg(this->fd, &msg, msg_flags);
566 : }
567 2 : while (n < 0 && errno == EINTR); // retry when the call was interrupted
568 :
569 2 : if (n >= 0)
570 : {
571 2 : source_addrlen = msg.msg_namelen; // length recvmsg wrote back for the source address
572 2 : this->complete(0, static_cast<std::size_t>(n));
573 : }
574 : else
575 MIS 0 : this->complete(errno, 0); // failure: pass errno through
576 HIT 2 : }
577 : };
578 :
579 : } // namespace boost::corosio::detail
580 :
581 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
|