LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_basic_socket.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 90.0 % 150 135 15
Test Date: 2026-06-09 16:09:19 Functions: 90.0 % 300 270 30

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/intrusive.hpp>
      14                 : #include <boost/corosio/detail/native_handle.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/corosio/native/detail/native_socket_base.hpp>
      17                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      18                 : #include <boost/corosio/native/detail/make_err.hpp>
      19                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      20                 : 
      21                 : #include <memory>
      22                 : #include <mutex>
      23                 : #include <utility>
      24                 : 
      25                 : #include <errno.h>
      26                 : #include <netinet/in.h>
      27                 : #include <sys/socket.h>
      28                 : #include <unistd.h>
      29                 : 
      30                 : namespace boost::corosio::detail {
      31                 : 
      32                 : /** CRTP base for reactor-backed socket implementations.
      33                 : 
      34                 :     Extracts the shared data members, virtual overrides, and
      35                 :     cancel/close/register logic that is identical across TCP
      36                 :     (reactor_stream_socket) and UDP (reactor_datagram_socket).
      37                 : 
      38                 :     Derived classes provide CRTP callbacks that enumerate their
      39                 :     specific op slots so cancel/close can iterate them generically.
      40                 : 
      41                 :     @tparam Derived   The concrete socket type (CRTP).
      42                 :     @tparam ImplBase  The public vtable base (tcp_socket::implementation
      43                 :                       or udp_socket::implementation).
      44                 :     @tparam Service   The backend's service type.
      45                 :     @tparam DescState The backend's descriptor_state type.
      46                 :     @tparam Endpoint  The endpoint type (endpoint or local_endpoint).
      47                 : */
      48                 : template<
      49                 :     class Derived,
      50                 :     class ImplBase,
      51                 :     class Service,
      52                 :     class DescState,
      53                 :     class Endpoint = endpoint>
      54                 : class reactor_basic_socket
      55                 :     : public native_socket_base<Derived, ImplBase, Endpoint>
      56                 :     , public intrusive_list<Derived>::node
      57                 : {
      58                 :     friend Derived;
      59                 : 
      60                 :     template<class, class, class, class, class, class, class, class, class>
      61                 :     friend class reactor_stream_socket;
      62                 : 
      63                 :     template<class, class, class, class, class, class, class, class, class, class, class>
      64                 :     friend class reactor_datagram_socket;
      65                 : 
      66 HIT       21888 :     explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {}
      67                 : 
      68                 : protected:
      69                 :     // fd_ / local_endpoint_ and the synchronous accessors (native_handle,
      70                 :     // is_open, set_option/get_option, set_socket/set_local_endpoint, do_bind)
      71                 :     // live in native_socket_base — the readiness/completion-agnostic base
      72                 :     // shared with io_uring's sockets. The using-declarations make the
      73                 :     // inherited members visible to this template's own unqualified
      74                 :     // references below (two-phase lookup).
      75                 :     using native_socket_base<Derived, ImplBase, Endpoint>::fd_;
      76                 :     using native_socket_base<Derived, ImplBase, Endpoint>::local_endpoint_;
      77                 : 
      78                 :     Service& svc_;
      79                 : 
      80                 : public:
      81                 :     /// Per-descriptor state for persistent reactor registration.
      82                 :     DescState desc_state_;
      83                 : 
      84           21888 :     ~reactor_basic_socket() override = default;
      85                 : 
      86                 :     /// Assign the fd, initialize descriptor state, and register with the reactor.
      87            7514 :     void init_and_register(int fd) noexcept
      88                 :     {
      89            7514 :         fd_ = fd;
      90            7514 :         desc_state_.fd = fd;
      91                 :         {
      92            7514 :             std::lock_guard lock(desc_state_.mutex);
      93            7514 :             desc_state_.read_op    = nullptr;
      94            7514 :             desc_state_.write_op   = nullptr;
      95            7514 :             desc_state_.connect_op = nullptr;
      96            7514 :         }
      97            7514 :         svc_.scheduler().register_descriptor(fd, &desc_state_);
      98            7514 :     }
      99                 : 
     100                 :     /** Register an op with the reactor.
     101                 : 
     102                 :         Handles cached edge events and deferred cancellation.
     103                 :         Called on the EAGAIN/EINPROGRESS path when speculative
     104                 :         I/O failed.
     105                 :     */
     106                 :     template<class Op>
     107                 :     void register_op(
     108                 :         Op& op,
     109                 :         reactor_op_base*& desc_slot,
     110                 :         bool& ready_flag,
     111                 :         bool& cancel_flag,
     112                 :         bool is_write_direction = false) noexcept;
     113                 : 
     114                 :     /** Cancel a single pending operation.
     115                 : 
     116                 :         Claims the operation from its descriptor_state slot under
     117                 :         the mutex and posts it to the scheduler as cancelled.
     118                 :         Derived must implement:
     119                 :           op_to_desc_slot(Op&) -> reactor_op_base**
     120                 :           op_to_cancel_flag(Op&) -> bool*
     121                 :     */
     122                 :     template<class Op>
     123                 :     void cancel_single_op(Op& op) noexcept;
     124                 : 
     125                 :     /** Cancel all pending operations.
     126                 : 
     127                 :         Invoked by the derived class's cancel() override.
     128                 :         Derived must implement:
     129                 :           for_each_op(auto fn)
     130                 :           for_each_desc_entry(auto fn)
     131                 :     */
     132                 :     void do_cancel() noexcept;
     133                 : 
     134                 :     /** Close the socket and cancel pending operations.
     135                 : 
     136                 :         Invoked by the derived class's close_socket(). The
     137                 :         derived class may add backend-specific cleanup after
     138                 :         calling this method.
     139                 :         Derived must implement:
     140                 :           for_each_op(auto fn)
     141                 :           for_each_desc_entry(auto fn)
     142                 :     */
     143                 :     void do_close_socket() noexcept;
     144                 : 
     145                 :     /** Release the socket without closing the fd.
     146                 : 
     147                 :         Like do_close_socket() but does not call ::close().
     148                 :         Returns the fd so the caller can take ownership.
     149                 :     */
     150                 :     native_handle_type do_release_socket() noexcept;
     151                 : };
     152                 : 
     153                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     154                 : template<class Op>
     155                 : void
     156            7751 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::register_op(
     157                 :     Op& op,
     158                 :     reactor_op_base*& desc_slot,
     159                 :     bool& ready_flag,
     160                 :     bool& cancel_flag,
     161                 :     bool is_write_direction) noexcept
     162                 : {
     163            7751 :     svc_.work_started();
     164                 : 
     165            7751 :     std::lock_guard lock(desc_state_.mutex);
     166            7751 :     bool io_done = false;
     167            7751 :     if (ready_flag)
     168                 :     {
     169             186 :         ready_flag = false;
     170             186 :         op.perform_io();
     171             186 :         io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     172             186 :         if (!io_done)
     173             186 :             op.errn = 0;
     174                 :     }
     175                 : 
     176            7751 :     if (cancel_flag)
     177                 :     {
     178 MIS           0 :         cancel_flag = false;
     179               0 :         op.cancelled.store(true, std::memory_order_relaxed);
     180                 :     }
     181                 : 
     182 HIT        7751 :     if (io_done || op.cancelled.load(std::memory_order_acquire))
     183                 :     {
     184 MIS           0 :         svc_.post(&op);
     185               0 :         svc_.work_finished();
     186                 :     }
     187                 :     else
     188                 :     {
     189 HIT        7751 :         desc_slot = &op;
     190                 : 
     191                 :         // Select must rebuild its fd_sets when a write-direction op
     192                 :         // is parked, so select() watches for writability. Compiled
     193                 :         // away to nothing for epoll and kqueue.
     194                 :         if constexpr (requires { Service::needs_write_notification; })
     195                 :         {
     196                 :             if constexpr (Service::needs_write_notification)
     197                 :             {
     198            3563 :                 if (is_write_direction)
     199            3262 :                     svc_.scheduler().notify_reactor();
     200                 :             }
     201                 :         }
     202                 :     }
     203            7751 : }
     204                 : 
     205                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     206                 : template<class Op>
     207                 : void
     208             200 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::cancel_single_op(
     209                 :     Op& op) noexcept
     210                 : {
     211             200 :     auto self = this->weak_from_this().lock();
     212             200 :     if (!self)
     213 MIS           0 :         return;
     214                 : 
     215 HIT         200 :     op.request_cancel();
     216                 : 
     217             200 :     auto* d                       = static_cast<Derived*>(this);
     218             200 :     reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op);
     219                 : 
     220             200 :     if (desc_op_ptr)
     221                 :     {
     222             200 :         reactor_op_base* claimed = nullptr;
     223                 :         {
     224             200 :             std::lock_guard lock(desc_state_.mutex);
     225             200 :             if (*desc_op_ptr == &op)
     226             200 :                 claimed = std::exchange(*desc_op_ptr, nullptr);
     227                 :             else
     228                 :             {
     229 MIS           0 :                 bool* cflag = d->op_to_cancel_flag(op);
     230               0 :                 if (cflag)
     231               0 :                     *cflag = true;
     232                 :             }
     233 HIT         200 :         }
     234             200 :         if (claimed)
     235                 :         {
     236             200 :             op.impl_ptr = self;
     237             200 :             svc_.post(&op);
     238             200 :             svc_.work_finished();
     239                 :         }
     240                 :     }
     241             200 : }
     242                 : 
     243                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     244                 : void
     245             209 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     246                 :     do_cancel() noexcept
     247                 : {
     248             209 :     auto self = this->weak_from_this().lock();
     249             209 :     if (!self)
     250 MIS           0 :         return;
     251                 : 
     252 HIT         209 :     auto* d = static_cast<Derived*>(this);
     253                 : 
     254            1491 :     d->for_each_op([](auto& op) { op.request_cancel(); });
     255                 : 
     256                 :     // Claim ops under a single lock acquisition
     257                 :     struct claimed_entry
     258                 :     {
     259                 :         reactor_op_base* op   = nullptr;
     260                 :         reactor_op_base* base = nullptr;
     261                 :     };
     262                 :     // Max 8 ops: conn, rd, wr, wait_rd, wait_wr, wait_er, recv_rd, send_wr
     263             209 :     claimed_entry claimed[8];
     264             209 :     int count = 0;
     265                 : 
     266                 :     {
     267             209 :         std::lock_guard lock(desc_state_.mutex);
     268            2773 :         d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
     269            1282 :             if (desc_slot == &op)
     270                 :             {
     271             122 :                 claimed[count].op   = std::exchange(desc_slot, nullptr);
     272             122 :                 claimed[count].base = &op;
     273             122 :                 ++count;
     274                 :             }
     275                 :         });
     276             209 :     }
     277                 : 
     278             331 :     for (int i = 0; i < count; ++i)
     279                 :     {
     280             122 :         claimed[i].base->impl_ptr = self;
     281             122 :         svc_.post(claimed[i].base);
     282             122 :         svc_.work_finished();
     283                 :     }
     284             209 : }
     285                 : 
     286                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     287                 : void
     288           65917 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     289                 :     do_close_socket() noexcept
     290                 : {
     291           65917 :     auto self = this->weak_from_this().lock();
     292           65917 :     if (self)
     293                 :     {
     294           65917 :         auto* d = static_cast<Derived*>(this);
     295                 : 
     296          463731 :         d->for_each_op([](auto& op) { op.request_cancel(); });
     297                 : 
     298                 :         struct claimed_entry
     299                 :         {
     300                 :             reactor_op_base* base = nullptr;
     301                 :         };
     302           65917 :         claimed_entry claimed[8];
     303           65917 :         int count = 0;
     304                 : 
     305                 :         {
     306           65917 :             std::lock_guard lock(desc_state_.mutex);
     307           65917 :             d->for_each_desc_entry(
     308          795628 :                 [&](auto& /*op*/, reactor_op_base*& desc_slot) {
     309          397814 :                     auto* c = std::exchange(desc_slot, nullptr);
     310          397814 :                     if (c)
     311                 :                     {
     312               8 :                         claimed[count].base = c;
     313               8 :                         ++count;
     314                 :                     }
     315                 :                 });
     316           65917 :             desc_state_.read_ready             = false;
     317           65917 :             desc_state_.write_ready            = false;
     318           65917 :             desc_state_.read_cancel_pending       = false;
     319           65917 :             desc_state_.write_cancel_pending      = false;
     320           65917 :             desc_state_.connect_cancel_pending    = false;
     321           65917 :             desc_state_.wait_read_cancel_pending  = false;
     322           65917 :             desc_state_.wait_write_cancel_pending = false;
     323           65917 :             desc_state_.wait_error_cancel_pending = false;
     324                 : 
     325           65917 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     326             409 :                 desc_state_.impl_ref_ = self;
     327           65917 :         }
     328                 : 
     329           65925 :         for (int i = 0; i < count; ++i)
     330                 :         {
     331               8 :             claimed[i].base->impl_ptr = self;
     332               8 :             svc_.post(claimed[i].base);
     333               8 :             svc_.work_finished();
     334                 :         }
     335                 :     }
     336                 : 
     337           65917 :     if (fd_ >= 0)
     338                 :     {
     339           14623 :         if (desc_state_.registered_events != 0)
     340           14623 :             svc_.scheduler().deregister_descriptor(fd_);
     341           14623 :         ::close(fd_);
     342           14623 :         fd_ = -1;
     343                 :     }
     344                 : 
     345           65917 :     desc_state_.fd                = -1;
     346           65917 :     desc_state_.registered_events = 0;
     347                 : 
     348           65917 :     local_endpoint_ = Endpoint{};
     349           65917 : }
     350                 : 
     351                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     352                 : native_handle_type
     353               4 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     354                 :     do_release_socket() noexcept
     355                 : {
     356                 :     // Cancel pending ops (same as do_close_socket)
     357               4 :     auto self = this->weak_from_this().lock();
     358               4 :     if (self)
     359                 :     {
     360               4 :         auto* d = static_cast<Derived*>(this);
     361                 : 
     362              32 :         d->for_each_op([](auto& op) { op.request_cancel(); });
     363                 : 
     364                 :         struct claimed_entry
     365                 :         {
     366                 :             reactor_op_base* base = nullptr;
     367                 :         };
     368               4 :         claimed_entry claimed[8];
     369               4 :         int count = 0;
     370                 : 
     371                 :         {
     372               4 :             std::lock_guard lock(desc_state_.mutex);
     373               4 :             d->for_each_desc_entry(
     374              56 :                 [&](auto& /*op*/, reactor_op_base*& desc_slot) {
     375              28 :                     auto* c = std::exchange(desc_slot, nullptr);
     376              28 :                     if (c)
     377                 :                     {
     378 MIS           0 :                         claimed[count].base = c;
     379               0 :                         ++count;
     380                 :                     }
     381                 :                 });
     382 HIT           4 :             desc_state_.read_ready             = false;
     383               4 :             desc_state_.write_ready            = false;
     384               4 :             desc_state_.read_cancel_pending       = false;
     385               4 :             desc_state_.write_cancel_pending      = false;
     386               4 :             desc_state_.connect_cancel_pending    = false;
     387               4 :             desc_state_.wait_read_cancel_pending  = false;
     388               4 :             desc_state_.wait_write_cancel_pending = false;
     389               4 :             desc_state_.wait_error_cancel_pending = false;
     390                 : 
     391               4 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     392 MIS           0 :                 desc_state_.impl_ref_ = self;
     393 HIT           4 :         }
     394                 : 
     395               4 :         for (int i = 0; i < count; ++i)
     396                 :         {
     397 MIS           0 :             claimed[i].base->impl_ptr = self;
     398               0 :             svc_.post(claimed[i].base);
     399               0 :             svc_.work_finished();
     400                 :         }
     401                 :     }
     402                 : 
     403 HIT           4 :     native_handle_type released = fd_;
     404                 : 
     405               4 :     if (fd_ >= 0)
     406                 :     {
     407               4 :         if (desc_state_.registered_events != 0)
     408               4 :             svc_.scheduler().deregister_descriptor(fd_);
     409                 :         // Do NOT close -- caller takes ownership
     410               4 :         fd_ = -1;
     411                 :     }
     412                 : 
     413               4 :     desc_state_.fd                = -1;
     414               4 :     desc_state_.registered_events = 0;
     415                 : 
     416               4 :     local_endpoint_ = Endpoint{};
     417                 : 
     418               8 :     return released;
     419               4 : }
     420                 : 
     421                 : } // namespace boost::corosio::detail
     422                 : 
     423                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
        

Generated by: LCOV version 2.3