LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op_complete.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 95.0 % 101 96 5
Test Date: 2026-06-09 16:09:19 Functions: 100.0 % 40 40

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      14                 : #include <boost/corosio/native/detail/coro_op_complete.hpp>
      15                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      16                 : #include <boost/corosio/native/detail/make_err.hpp>
      17                 : #include <boost/corosio/io/io_object.hpp>
      18                 : 
      19                 : #include <coroutine>
      20                 : #include <mutex>
      21                 : #include <utility>
      22                 : 
      23                 : #include <netinet/in.h>
      24                 : #include <sys/socket.h>
      25                 : #include <unistd.h>
      26                 : 
      27                 : namespace boost::corosio::detail {
      28                 : 
      29                 : /** Complete a base read/write operation.
      30                 : 
      31                 :     Translates the recorded errno and cancellation state into
      32                 :     an error_code, stores the byte count, then resumes the
      33                 :     caller via symmetric transfer.
      34                 : 
      35                 :     @tparam Op The concrete operation type.
      36                 :     @param op The operation to complete.
      37                 : */
      38                 : template<typename Op>
      39                 : void
      40 HIT       87906 : complete_io_op(Op& op)
      41                 : {
      42           87906 :     op.stop_cb.reset();
      43           87906 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
      44                 : 
      45                 :     // is_read_operation() already folds in the empty-buffer case (it
      46                 :     // returns false for a zero-length read), so empty_buffer stays false
      47                 :     // here and the shared EOF test reduces to the reactor's original
      48                 :     // `is_read && bytes == 0`.
      49          175812 :     decode_io_result(
      50                 :         op.ec_out,
      51           87906 :         op.cancelled.load(std::memory_order_acquire),
      52           87906 :         op.errn != 0 ? make_err(op.errn) : std::error_code{},
      53           87906 :         op.is_read_operation(), op.bytes_transferred, /*empty_buffer=*/false);
      54                 : 
      55           87906 :     *op.bytes_out = op.bytes_transferred;
      56                 : 
      57           87906 :     coro_resume(&op);
      58           87906 : }
      59                 : 
      60                 : /** Complete a datagram recv operation (connected mode).
      61                 : 
      62                 :     Like complete_io_op but does not translate zero bytes into
      63                 :     EOF. Zero-length datagrams are valid and should be reported
      64                 :     as success with 0 bytes transferred.
      65                 : 
      66                 :     @param op The operation to complete.
      67                 : */
      68                 : template<typename Op>
      69                 : void
      70                 : complete_dgram_recv_op(Op& op)
      71                 : {
      72                 :     op.stop_cb.reset();
      73                 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
      74                 : 
      75                 :     // No EOF: a zero-length datagram is valid (success with 0 bytes).
      76                 :     decode_io_result(
      77                 :         op.ec_out,
      78                 :         op.cancelled.load(std::memory_order_acquire),
      79                 :         op.errn != 0 ? make_err(op.errn) : std::error_code{},
      80                 :         /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
      81                 : 
      82                 :     *op.bytes_out = op.bytes_transferred;
      83                 : 
      84                 :     coro_resume(&op);
      85                 : }
      86                 : 
      87                 : /** Complete a wait operation.
      88                 : 
      89                 :     Wait operations report only an error_code — no bytes_transferred,
      90                 :     no EOF translation. Used for socket and acceptor wait() awaitables;
      91                 :     picks the impl pointer set by start() to reach the scheduler.
      92                 : 
      93                 :     @tparam Op The concrete wait operation type.
      94                 :     @param op The operation to complete.
      95                 : */
      96                 : template<typename Op>
      97                 : void
      98              56 : complete_wait_op(Op& op)
      99                 : {
     100              56 :     op.stop_cb.reset();
     101              56 :     if (op.socket_impl_)
     102              44 :         op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
     103                 :     else
     104              12 :         op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
     105                 : 
     106                 :     // Wait reports only success/cancel/error — no bytes, no EOF.
     107             112 :     decode_io_result(
     108                 :         op.ec_out,
     109              56 :         op.cancelled.load(std::memory_order_acquire),
     110              56 :         op.errn != 0 ? make_err(op.errn) : std::error_code{},
     111                 :         /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
     112                 : 
     113              56 :     coro_resume(&op);
     114              56 : }
     115                 : 
     116                 : /** Complete a connect operation with endpoint caching.
     117                 : 
     118                 :     On success, queries the local endpoint via getsockname and
     119                 :     caches both endpoints in the socket impl. Then resumes the
     120                 :     caller via symmetric transfer.
     121                 : 
     122                 :     @tparam Op The concrete connect operation type.
     123                 :     @param op The operation to complete.
     124                 : */
     125                 : template<typename Op>
     126                 : void
     127            7166 : complete_connect_op(Op& op)
     128                 : {
     129            7166 :     op.stop_cb.reset();
     130            7166 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
     131                 : 
     132            7166 :     bool success =
     133            7166 :         (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
     134                 : 
     135            7166 :     if (success && op.socket_impl_)
     136                 :     {
     137                 :         using ep_type = decltype(op.target_endpoint);
     138            7139 :         ep_type local_ep;
     139            7139 :         sockaddr_storage local_storage{};
     140            7139 :         socklen_t local_len = sizeof(local_storage);
     141            7139 :         if (::getsockname(
     142                 :                 op.fd, reinterpret_cast<sockaddr*>(&local_storage),
     143            7139 :                 &local_len) == 0)
     144            7119 :             local_ep =
     145            7139 :                 from_sockaddr_as(local_storage, local_len, ep_type{});
     146            7139 :         op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
     147                 :     }
     148                 : 
     149           14307 :     decode_io_result(
     150                 :         op.ec_out,
     151            7166 :         op.cancelled.load(std::memory_order_acquire),
     152            7166 :         op.errn != 0 ? make_err(op.errn) : std::error_code{},
     153                 :         /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
     154                 : 
     155            7166 :     coro_resume(&op);
     156            7166 : }
     157                 : 
     158                 : /** Construct and register a peer socket from an accepted fd.
     159                 : 
     160                 :     Creates a new socket impl via the acceptor's associated
     161                 :     socket service, registers it with the scheduler, and caches
     162                 :     the local and remote endpoints.
     163                 : 
     164                 :     @tparam SocketImpl The concrete socket implementation type.
     165                 :     @tparam AcceptorImpl The concrete acceptor implementation type.
     166                 :     @param acceptor_impl The acceptor that accepted the connection.
     167                 :     @param accepted_fd The accepted file descriptor (set to -1 on success).
     168                 :     @param peer_storage The peer address from accept().
     169                 :     @param impl_out Output pointer for the new socket impl.
     170                 :     @param ec_out Output pointer for any error.
     171                 :     @return True on success, false on failure.
     172                 : */
     173                 : template<typename SocketImpl, typename AcceptorImpl>
     174                 : bool
     175            7109 : setup_accepted_socket(
     176                 :     AcceptorImpl* acceptor_impl,
     177                 :     int& accepted_fd,
     178                 :     sockaddr_storage const& peer_storage,
     179                 :     socklen_t peer_addrlen,
     180                 :     io_object::implementation** impl_out,
     181                 :     std::error_code* ec_out)
     182                 : {
     183            7109 :     auto* socket_svc = acceptor_impl->service().stream_service();
     184            7109 :     if (!socket_svc)
     185                 :     {
     186 MIS           0 :         *ec_out = make_err(ENOENT);
     187               0 :         return false;
     188                 :     }
     189                 : 
     190 HIT        7109 :     auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
     191            7109 :     impl.set_socket(accepted_fd);
     192                 : 
     193            7109 :     impl.desc_state_.fd = accepted_fd;
     194                 :     {
     195            7109 :         std::lock_guard lock(impl.desc_state_.mutex);
     196            7109 :         impl.desc_state_.read_op    = nullptr;
     197            7109 :         impl.desc_state_.write_op   = nullptr;
     198            7109 :         impl.desc_state_.connect_op = nullptr;
     199            7109 :     }
     200            7109 :     socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
     201                 : 
     202                 :     using ep_type = decltype(acceptor_impl->local_endpoint());
     203            7109 :     impl.set_endpoints(
     204                 :         acceptor_impl->local_endpoint(),
     205            7109 :         from_sockaddr_as(
     206                 :             peer_storage,
     207                 :             peer_addrlen,
     208                 :             ep_type{}));
     209                 : 
     210            7109 :     if (impl_out)
     211            7109 :         *impl_out = &impl;
     212            7109 :     accepted_fd = -1;
     213            7109 :     return true;
     214                 : }
     215                 : 
     216                 : /** Complete an accept operation.
     217                 : 
     218                 :     Sets up the peer socket on success, or closes the accepted
     219                 :     fd on failure. Then resumes the caller via symmetric transfer.
     220                 : 
     221                 :     @tparam SocketImpl The concrete socket implementation type.
     222                 :     @tparam Op The concrete accept operation type.
     223                 :     @param op The operation to complete.
     224                 : */
     225                 : template<typename SocketImpl, typename Op>
     226                 : void
     227            7129 : complete_accept_op(Op& op)
     228                 : {
     229            7129 :     op.stop_cb.reset();
     230            7129 :     op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
     231                 : 
     232            7129 :     bool success =
     233            7129 :         (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
     234                 : 
     235           14258 :     decode_io_result(
     236                 :         op.ec_out,
     237            7129 :         op.cancelled.load(std::memory_order_acquire),
     238            7129 :         op.errn != 0 ? make_err(op.errn) : std::error_code{},
     239                 :         /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
     240                 : 
     241            7129 :     if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
     242                 :     {
     243            7109 :         if (!setup_accepted_socket<SocketImpl>(
     244            7109 :                 op.acceptor_impl_, op.accepted_fd, op.peer_storage,
     245                 :                 op.peer_addrlen, op.impl_out, op.ec_out))
     246 MIS           0 :             success = false;
     247                 :     }
     248                 : 
     249 HIT        7129 :     if (!success || !op.acceptor_impl_)
     250                 :     {
     251              20 :         if (op.accepted_fd >= 0)
     252                 :         {
     253 MIS           0 :             ::close(op.accepted_fd);
     254               0 :             op.accepted_fd = -1;
     255                 :         }
     256 HIT          20 :         if (op.impl_out)
     257              20 :             *op.impl_out = nullptr;
     258                 :     }
     259                 : 
     260            7129 :     coro_resume(&op);
     261            7129 : }
     262                 : 
     263                 : /** Complete a datagram operation (send_to or recv_from).
     264                 : 
     265                 :     For recv_from operations, writes the source endpoint from the
     266                 :     recorded sockaddr_storage into the caller's endpoint pointer.
     267                 :     Then resumes the caller via symmetric transfer.
     268                 : 
     269                 :     @tparam Op The concrete datagram operation type.
     270                 :     @param op The operation to complete.
     271                 : */
     272                 : template<typename Op>
     273                 : void
     274              18 : complete_datagram_op(Op& op)
     275                 : {
     276              18 :     op.stop_cb.reset();
     277              18 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
     278                 : 
     279                 :     // No EOF: a zero-length datagram is valid (success with 0 bytes).
     280              36 :     decode_io_result(
     281                 :         op.ec_out,
     282              18 :         op.cancelled.load(std::memory_order_acquire),
     283              18 :         op.errn != 0 ? make_err(op.errn) : std::error_code{},
     284                 :         /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
     285                 : 
     286              18 :     *op.bytes_out = op.bytes_transferred;
     287                 : 
     288              18 :     coro_resume(&op);
     289              18 : }
     290                 : 
     291                 : /** Complete a datagram operation with source endpoint capture.
     292                 : 
     293                 :     For recv_from operations, writes the source endpoint from the
     294                 :     recorded sockaddr_storage into the caller's endpoint pointer.
     295                 :     Then resumes the caller via symmetric transfer.
     296                 : 
     297                 :     @tparam Op The concrete datagram operation type.
     298                 :     @param op The operation to complete.
     299                 :     @param source_out Optional pointer to store source endpoint
     300                 :         (non-null for recv_from, null for send_to).
     301                 : */
     302                 : template<typename Op, typename Endpoint>
     303                 : void
     304              26 : complete_datagram_op(Op& op, Endpoint* source_out)
     305                 : {
     306              26 :     op.stop_cb.reset();
     307              26 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
     308                 : 
     309                 :     // No EOF: a zero-length datagram is valid (success with 0 bytes).
     310              52 :     decode_io_result(
     311                 :         op.ec_out,
     312              26 :         op.cancelled.load(std::memory_order_acquire),
     313              26 :         op.errn != 0 ? make_err(op.errn) : std::error_code{},
     314                 :         /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
     315                 : 
     316              26 :     *op.bytes_out = op.bytes_transferred;
     317                 : 
     318              36 :     if (source_out && !op.cancelled.load(std::memory_order_acquire) &&
     319              10 :         op.errn == 0)
     320              20 :         *source_out = from_sockaddr_as(
     321              10 :             op.source_storage,
     322                 :             op.source_addrlen,
     323                 :             Endpoint{});
     324                 : 
     325              26 :     coro_resume(&op);
     326              26 : }
     327                 : 
     328                 : } // namespace boost::corosio::detail
     329                 : 
     330                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
        

Generated by: LCOV version 2.3