LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 69.7 % 165 115 50
Test Date: 2026-06-09 16:09:19 Functions: 63.8 % 160 102 58

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      12                 : 
#include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
#include <boost/corosio/io/io_object.hpp>
#include <boost/corosio/endpoint.hpp>
#include <boost/corosio/detail/continuation_op.hpp>
#include <boost/capy/ex/executor_ref.hpp>

#include <atomic>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <stop_token>
#include <system_error>

#include <errno.h>

#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/uio.h>
      32                 : 
      33                 : namespace boost::corosio::detail {
      34                 : 
      35                 : /** Base operation for reactor-based backends.
      36                 : 
      37                 :     Holds per-operation state that depends on the concrete backend
      38                 :     socket/acceptor types: coroutine handle, executor, output
      39                 :     pointers, file descriptor, stop_callback, and type-specific
      40                 :     impl pointers.
      41                 : 
      42                 :     Fields shared across all backends (errn, bytes_transferred,
      43                 :     cancelled, impl_ptr, perform_io, complete) live in
      44                 :     reactor_op_base so the scheduler and descriptor_state can
      45                 :     access them without template instantiation.
      46                 : 
      47                 :     @tparam Socket The backend socket impl type (forward-declared).
      48                 :     @tparam Acceptor The backend acceptor impl type (forward-declared).
      49                 : */
      50                 : template<class Socket, class Acceptor>
      51                 : struct reactor_op : reactor_op_base
      52                 : {
      53                 :     // The op envelope — coroutine handle h, cont_op, executor ex, ec_out,
      54                 :     // bytes_out, cancelled, stop_cb (+ its canceller), impl_ptr — lives in
      55                 :     // coro_op (via reactor_op_base) and is shared with io_uring/IOCP.
      56                 :     // reactor_op adds only the reactor-specific routing state below.
      57                 : 
      58                 :     /// File descriptor this operation targets.
      59                 :     int fd = -1;
      60                 : 
      61                 :     /// Owning socket impl (for stop_token cancellation routing).
      62                 :     Socket* socket_impl_ = nullptr;
      63                 : 
      64                 :     /// Owning acceptor impl (for stop_token cancellation routing).
      65                 :     Acceptor* acceptor_impl_ = nullptr;
      66                 : 
      67 HIT      133100 :     reactor_op() = default;
      68                 : 
      69                 :     /// Reset operation state for reuse.
      70          451618 :     void reset() noexcept
      71                 :     {
      72          451618 :         fd                = -1;
      73          451618 :         errn              = 0;
      74          451618 :         bytes_transferred = 0;
      75          451618 :         cancelled.store(false, std::memory_order_relaxed);
      76          451618 :         impl_ptr.reset();
      77          451618 :         socket_impl_   = nullptr;
      78          451618 :         acceptor_impl_ = nullptr;
      79          451618 :     }
      80                 : 
      81                 :     /// Return true if this is a read-direction operation.
      82           43724 :     virtual bool is_read_operation() const noexcept
      83                 :     {
      84           43724 :         return false;
      85                 :     }
      86                 : 
      87                 :     /// Cancel this operation via the owning impl.
      88                 :     virtual void cancel() noexcept = 0;
      89                 : 
      90                 :     /// coro_op cancellation hook (fired by the shared canceller when the
      91                 :     /// stop_token requests cancellation): route to the impl-specific cancel().
      92             214 :     void on_cancel() noexcept override
      93                 :     {
      94             214 :         cancel();
      95             214 :     }
      96                 : 
      97                 :     /// Destroy without invoking.
      98 MIS           0 :     void destroy() override
      99                 :     {
     100               0 :         stop_cb.reset();
     101               0 :         reactor_op_base::destroy();
     102               0 :     }
     103                 : 
     104                 :     /// Arm the stop-token callback for a socket operation.
     105 HIT       95160 :     void start(std::stop_token const& token, Socket* impl)
     106                 :     {
     107           95160 :         socket_impl_   = impl;
     108           95160 :         acceptor_impl_ = nullptr;
     109           95160 :         coro_op::start(token);
     110           95160 :     }
     111                 : 
     112                 :     /// Arm the stop-token callback for an acceptor operation.
     113            7145 :     void start(std::stop_token const& token, Acceptor* impl)
     114                 :     {
     115            7145 :         socket_impl_   = nullptr;
     116            7145 :         acceptor_impl_ = impl;
     117            7145 :         coro_op::start(token);
     118            7145 :     }
     119                 : };
     120                 : 
     121                 : /** Shared connect operation.
     122                 : 
     123                 :     Checks SO_ERROR for connect completion status. The operator()()
     124                 :     and cancel() are provided by the concrete backend type.
     125                 : 
     126                 :     @tparam Base The backend's base op type.
     127                 :     @tparam Endpoint The endpoint type (endpoint or local_endpoint).
     128                 : */
     129                 : template<class Base, class Endpoint = endpoint>
     130                 : struct reactor_connect_op : Base
     131                 : {
     132                 :     /// Endpoint to connect to.
     133                 :     Endpoint target_endpoint;
     134                 : 
     135                 :     /// Reset operation state for reuse.
     136            7166 :     void reset() noexcept
     137                 :     {
     138            7166 :         Base::reset();
     139            7166 :         target_endpoint = Endpoint{};
     140            7166 :     }
     141                 : 
     142            7112 :     void perform_io() noexcept override
     143                 :     {
     144            7112 :         int err       = 0;
     145            7112 :         socklen_t len = sizeof(err);
     146            7112 :         if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     147 MIS           0 :             err = errno;
     148 HIT        7112 :         this->complete(err, 0);
     149            7112 :     }
     150                 : };
     151                 : 
     152                 : /** Readiness-only wait operation.
     153                 : 
     154                 :     Does not perform any I/O syscall. Completion is signalled by
     155                 :     the reactor delivering the requested edge event; reactor_descriptor_state
     156                 :     calls complete() directly and never invokes perform_io().
     157                 : 
     158                 :     @tparam Base The backend's base op type.
     159                 : */
     160                 : template<class Base>
     161                 : struct reactor_wait_op : Base
     162                 : {
     163                 :     /* Mirror of reactor_event_read from reactor_descriptor_state.hpp.
     164                 :        Including that header from here would create an include cycle
     165                 :        (descriptor_state -> reactor_op_base; reactor_op -> reactor_op_base),
     166                 :        so we carry the value locally. Both must stay in sync. */
     167                 :     static constexpr std::uint32_t read_event = 0x001;
     168                 : 
     169                 :     /// Which event bit this wait targets (reactor_event_read/write/error).
     170                 :     std::uint32_t wait_event = 0;
     171                 : 
     172              56 :     void reset() noexcept
     173                 :     {
     174              56 :         Base::reset();
     175              56 :         wait_event = 0;
     176              56 :     }
     177                 : 
     178 MIS           0 :     bool is_read_operation() const noexcept override
     179                 :     {
     180               0 :         return wait_event == read_event;
     181                 :     }
     182                 : 
     183                 :     /* perform_io() should never be called for a wait op — readiness
     184                 :        IS the completion. Overridden here to satisfy the virtual and
     185                 :        produce a safe result if called defensively. */
     186               0 :     void perform_io() noexcept override
     187                 :     {
     188               0 :         this->complete(0, 0);
     189               0 :     }
     190                 : };
     191                 : 
     192                 : /** Shared scatter-read operation.
     193                 : 
     194                 :     Uses readv() with an EINTR retry loop.
     195                 : 
     196                 :     @tparam Base The backend's base op type.
     197                 : */
     198                 : template<class Base>
     199                 : struct reactor_read_op : Base
     200                 : {
     201                 :     /// Maximum scatter-gather buffer count.
     202                 :     static constexpr std::size_t max_buffers = 16;
     203                 : 
     204                 :     /// Scatter-gather I/O vectors.
     205                 :     iovec iovecs[max_buffers];
     206                 : 
     207                 :     /// Number of active I/O vectors.
     208                 :     int iovec_count = 0;
     209                 : 
     210                 :     /// True for zero-length reads (completed immediately).
     211                 :     bool empty_buffer_read = false;
     212                 : 
     213                 :     /// Return true (this is a read-direction operation).
     214 HIT       44182 :     bool is_read_operation() const noexcept override
     215                 :     {
     216           44182 :         return !empty_buffer_read;
     217                 :     }
     218                 : 
     219          218792 :     void reset() noexcept
     220                 :     {
     221          218792 :         Base::reset();
     222          218792 :         iovec_count       = 0;
     223          218792 :         empty_buffer_read = false;
     224          218792 :     }
     225                 : 
     226             509 :     void perform_io() noexcept override
     227                 :     {
     228                 :         ssize_t n;
     229                 :         do
     230                 :         {
     231             509 :             n = ::readv(this->fd, iovecs, iovec_count);
     232                 :         }
     233             509 :         while (n < 0 && errno == EINTR);
     234                 : 
     235             509 :         if (n >= 0)
     236             278 :             this->complete(0, static_cast<std::size_t>(n));
     237                 :         else
     238             231 :             this->complete(errno, 0);
     239             509 :     }
     240                 : };
     241                 : 
     242                 : /** Shared gather-write operation.
     243                 : 
     244                 :     Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
     245                 :     which returns ssize_t (bytes written or -1 with errno set).
     246                 : 
     247                 :     @tparam Base The backend's base op type.
     248                 :     @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
     249                 : */
     250                 : template<class Base, class WritePolicy>
     251                 : struct reactor_write_op : Base
     252                 : {
     253                 :     /// The write syscall policy type.
     254                 :     using write_policy = WritePolicy;
     255                 : 
     256                 :     /// Maximum scatter-gather buffer count.
     257                 :     static constexpr std::size_t max_buffers = 16;
     258                 : 
     259                 :     /// Scatter-gather I/O vectors.
     260                 :     iovec iovecs[max_buffers];
     261                 : 
     262                 :     /// Number of active I/O vectors.
     263                 :     int iovec_count = 0;
     264                 : 
     265          218313 :     void reset() noexcept
     266                 :     {
     267          218313 :         Base::reset();
     268          218313 :         iovec_count = 0;
     269          218313 :     }
     270                 : 
     271              15 :     void perform_io() noexcept override
     272                 :     {
     273              15 :         ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
     274              15 :         if (n >= 0)
     275              13 :             this->complete(0, static_cast<std::size_t>(n));
     276                 :         else
     277               2 :             this->complete(errno, 0);
     278              15 :     }
     279                 : };
     280                 : 
     281                 : /** Shared accept operation.
     282                 : 
     283                 :     Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
     284                 :     which returns the accepted fd or -1 with errno set.
     285                 : 
     286                 :     @tparam Base The backend's base op type.
     287                 :     @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
     288                 : */
     289                 : template<class Base, class AcceptPolicy>
     290                 : struct reactor_accept_op : Base
     291                 : {
     292                 :     /// File descriptor of the accepted connection.
     293                 :     int accepted_fd = -1;
     294                 : 
     295                 :     /// Pointer to the peer socket implementation.
     296                 :     io_object::implementation* peer_impl = nullptr;
     297                 : 
     298                 :     /// Output pointer for the accepted implementation.
     299                 :     io_object::implementation** impl_out = nullptr;
     300                 : 
     301                 :     /// Peer address storage filled by accept.
     302                 :     sockaddr_storage peer_storage{};
     303                 : 
     304                 :     /// Peer address length returned by accept.
     305                 :     socklen_t peer_addrlen = 0;
     306                 : 
     307            7133 :     void reset() noexcept
     308                 :     {
     309            7133 :         Base::reset();
     310            7133 :         accepted_fd   = -1;
     311            7133 :         peer_impl     = nullptr;
     312            7133 :         impl_out      = nullptr;
     313            7133 :         peer_storage  = {};
     314            7133 :         peer_addrlen  = 0;
     315            7133 :     }
     316                 : 
     317            7103 :     void perform_io() noexcept override
     318                 :     {
     319            7103 :         int new_fd = AcceptPolicy::do_accept(
     320            7103 :             this->fd, peer_storage, peer_addrlen);
     321            7103 :         if (new_fd >= 0)
     322                 :         {
     323            7103 :             accepted_fd = new_fd;
     324            7103 :             this->complete(0, 0);
     325                 :         }
     326                 :         else
     327                 :         {
     328 MIS           0 :             this->complete(errno, 0);
     329                 :         }
     330 HIT        7103 :     }
     331                 : };
     332                 : 
     333                 : /** Shared connected send operation for datagram sockets.
     334                 : 
     335                 :     Uses sendmsg() with msg_name=nullptr (connected mode).
     336                 : 
     337                 :     @tparam Base The backend's base op type.
     338                 : */
     339                 : template<class Base>
     340                 : struct reactor_send_op : Base
     341                 : {
     342                 :     /// Maximum scatter-gather buffer count.
     343                 :     static constexpr std::size_t max_buffers = 16;
     344                 : 
     345                 :     /// Scatter-gather I/O vectors.
     346                 :     iovec iovecs[max_buffers];
     347                 : 
     348                 :     /// Number of active I/O vectors.
     349                 :     int iovec_count = 0;
     350                 : 
     351                 :     /// User-supplied message flags.
     352                 :     int msg_flags = 0;
     353                 : 
     354              28 :     void reset() noexcept
     355                 :     {
     356              28 :         Base::reset();
     357              28 :         iovec_count = 0;
     358              28 :         msg_flags   = 0;
     359              28 :     }
     360                 : 
     361 MIS           0 :     void perform_io() noexcept override
     362                 :     {
     363               0 :         msghdr msg{};
     364               0 :         msg.msg_iov    = iovecs;
     365               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     366                 : 
     367                 : #ifdef MSG_NOSIGNAL
     368               0 :         int send_flags = msg_flags | MSG_NOSIGNAL;
     369                 : #else
     370                 :         int send_flags = msg_flags;
     371                 : #endif
     372                 : 
     373                 :         ssize_t n;
     374                 :         do
     375                 :         {
     376               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     377                 :         }
     378               0 :         while (n < 0 && errno == EINTR);
     379                 : 
     380               0 :         if (n >= 0)
     381               0 :             this->complete(0, static_cast<std::size_t>(n));
     382                 :         else
     383               0 :             this->complete(errno, 0);
     384               0 :     }
     385                 : };
     386                 : 
     387                 : /** Shared connected recv operation for datagram sockets.
     388                 : 
     389                 :     Uses recvmsg() with msg_name=nullptr (connected mode).
     390                 :     Unlike reactor_read_op, does not map n==0 to EOF
     391                 :     (zero-length datagrams are valid).
     392                 : 
     393                 :     @tparam Base The backend's base op type.
     394                 : */
     395                 : template<class Base>
     396                 : struct reactor_recv_op : Base
     397                 : {
     398                 :     /// Maximum scatter-gather buffer count.
     399                 :     static constexpr std::size_t max_buffers = 16;
     400                 : 
     401                 :     /// Scatter-gather I/O vectors.
     402                 :     iovec iovecs[max_buffers];
     403                 : 
     404                 :     /// Number of active I/O vectors.
     405                 :     int iovec_count = 0;
     406                 : 
     407                 :     /// User-supplied message flags.
     408                 :     int msg_flags = 0;
     409                 : 
     410                 :     /// Return true (this is a read-direction operation).
     411               0 :     bool is_read_operation() const noexcept override
     412                 :     {
     413               0 :         return true;
     414                 :     }
     415                 : 
     416 HIT          28 :     void reset() noexcept
     417                 :     {
     418              28 :         Base::reset();
     419              28 :         iovec_count = 0;
     420              28 :         msg_flags   = 0;
     421              28 :     }
     422                 : 
     423 MIS           0 :     void perform_io() noexcept override
     424                 :     {
     425               0 :         msghdr msg{};
     426               0 :         msg.msg_iov    = iovecs;
     427               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     428                 : 
     429                 :         ssize_t n;
     430                 :         do
     431                 :         {
     432               0 :             n = ::recvmsg(this->fd, &msg, msg_flags);
     433                 :         }
     434               0 :         while (n < 0 && errno == EINTR);
     435                 : 
     436               0 :         if (n >= 0)
     437               0 :             this->complete(0, static_cast<std::size_t>(n));
     438                 :         else
     439               0 :             this->complete(errno, 0);
     440               0 :     }
     441                 : };
     442                 : 
     443                 : /** Shared send_to operation for datagram sockets.
     444                 : 
     445                 :     Uses sendmsg() with the destination endpoint in msg_name.
     446                 : 
     447                 :     @tparam Base The backend's base op type.
     448                 : */
     449                 : template<class Base>
     450                 : struct reactor_send_to_op : Base
     451                 : {
     452                 :     /// Maximum scatter-gather buffer count.
     453                 :     static constexpr std::size_t max_buffers = 16;
     454                 : 
     455                 :     /// Scatter-gather I/O vectors.
     456                 :     iovec iovecs[max_buffers];
     457                 : 
     458                 :     /// Number of active I/O vectors.
     459                 :     int iovec_count = 0;
     460                 : 
     461                 :     /// Destination address storage.
     462                 :     sockaddr_storage dest_storage{};
     463                 : 
     464                 :     /// Destination address length.
     465                 :     socklen_t dest_len = 0;
     466                 : 
     467                 :     /// User-supplied message flags.
     468                 :     int msg_flags = 0;
     469                 : 
     470 HIT          44 :     void reset() noexcept
     471                 :     {
     472              44 :         Base::reset();
     473              44 :         iovec_count  = 0;
     474              44 :         dest_storage = {};
     475              44 :         dest_len     = 0;
     476              44 :         msg_flags    = 0;
     477              44 :     }
     478                 : 
     479 MIS           0 :     void perform_io() noexcept override
     480                 :     {
     481               0 :         msghdr msg{};
     482               0 :         msg.msg_name    = &dest_storage;
     483               0 :         msg.msg_namelen = dest_len;
     484               0 :         msg.msg_iov     = iovecs;
     485               0 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     486                 : 
     487                 : #ifdef MSG_NOSIGNAL
     488               0 :         int send_flags = msg_flags | MSG_NOSIGNAL;
     489                 : #else
     490                 :         int send_flags = msg_flags;
     491                 : #endif
     492                 : 
     493                 :         ssize_t n;
     494                 :         do
     495                 :         {
     496               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     497                 :         }
     498               0 :         while (n < 0 && errno == EINTR);
     499                 : 
     500               0 :         if (n >= 0)
     501               0 :             this->complete(0, static_cast<std::size_t>(n));
     502                 :         else
     503               0 :             this->complete(errno, 0);
     504               0 :     }
     505                 : };
     506                 : 
     507                 : /** Shared recv_from operation for datagram sockets.
     508                 : 
     509                 :     Uses recvmsg() with msg_name to capture the source endpoint.
     510                 : 
     511                 :     @tparam Base The backend's base op type.
     512                 :     @tparam Endpoint The endpoint type (endpoint or local_endpoint).
     513                 : */
     514                 : template<class Base, class Endpoint = endpoint>
     515                 : struct reactor_recv_from_op : Base
     516                 : {
     517                 :     /// Maximum scatter-gather buffer count.
     518                 :     static constexpr std::size_t max_buffers = 16;
     519                 : 
     520                 :     /// Scatter-gather I/O vectors.
     521                 :     iovec iovecs[max_buffers];
     522                 : 
     523                 :     /// Number of active I/O vectors.
     524                 :     int iovec_count = 0;
     525                 : 
     526                 :     /// Source address storage filled by recvmsg.
     527                 :     sockaddr_storage source_storage{};
     528                 : 
     529                 :     /// Actual source address length returned by recvmsg.
     530                 :     socklen_t source_addrlen = 0;
     531                 : 
     532                 :     /// Output pointer for the source endpoint (set by do_recv_from).
     533                 :     Endpoint* source_out = nullptr;
     534                 : 
     535                 :     /// User-supplied message flags.
     536                 :     int msg_flags = 0;
     537                 : 
     538                 :     /// Return true (this is a read-direction operation).
     539               0 :     bool is_read_operation() const noexcept override
     540                 :     {
     541               0 :         return true;
     542                 :     }
     543                 : 
     544 HIT          58 :     void reset() noexcept
     545                 :     {
     546              58 :         Base::reset();
     547              58 :         iovec_count    = 0;
     548              58 :         source_storage = {};
     549              58 :         source_addrlen = 0;
     550              58 :         source_out     = nullptr;
     551              58 :         msg_flags      = 0;
     552              58 :     }
     553                 : 
     554               2 :     void perform_io() noexcept override
     555                 :     {
     556               2 :         msghdr msg{};
     557               2 :         msg.msg_name    = &source_storage;
     558               2 :         msg.msg_namelen = sizeof(source_storage);
     559               2 :         msg.msg_iov     = iovecs;
     560               2 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     561                 : 
     562                 :         ssize_t n;
     563                 :         do
     564                 :         {
     565               2 :             n = ::recvmsg(this->fd, &msg, msg_flags);
     566                 :         }
     567               2 :         while (n < 0 && errno == EINTR);
     568                 : 
     569               2 :         if (n >= 0)
     570                 :         {
     571               2 :             source_addrlen = msg.msg_namelen;
     572               2 :             this->complete(0, static_cast<std::size_t>(n));
     573                 :         }
     574                 :         else
     575 MIS           0 :             this->complete(errno, 0);
     576 HIT           2 :     }
     577                 : };
     578                 : 
     579                 : } // namespace boost::corosio::detail
     580                 : 
     581                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
        

Generated by: LCOV version 2.3