TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/intrusive.hpp>
14 : #include <boost/corosio/detail/native_handle.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/native/detail/native_socket_base.hpp>
17 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
18 : #include <boost/corosio/native/detail/make_err.hpp>
19 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
20 :
21 : #include <memory>
22 : #include <mutex>
23 : #include <utility>
24 :
25 : #include <errno.h>
26 : #include <netinet/in.h>
27 : #include <sys/socket.h>
28 : #include <unistd.h>
29 :
30 : namespace boost::corosio::detail {
31 :
32 : /** CRTP base for reactor-backed socket implementations.
33 :
34 : Extracts the shared data members, virtual overrides, and
35 : cancel/close/register logic that is identical across TCP
36 : (reactor_stream_socket) and UDP (reactor_datagram_socket).
37 :
38 : Derived classes provide CRTP callbacks that enumerate their
39 : specific op slots so cancel/close can iterate them generically.
40 :
41 : @tparam Derived The concrete socket type (CRTP).
42 : @tparam ImplBase The public vtable base (tcp_socket::implementation
43 : or udp_socket::implementation).
44 : @tparam Service The backend's service type.
45 : @tparam DescState The backend's descriptor_state type.
46 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
47 : */
48 : template<
49 : class Derived,
50 : class ImplBase,
51 : class Service,
52 : class DescState,
53 : class Endpoint = endpoint>
54 : class reactor_basic_socket
55 : : public native_socket_base<Derived, ImplBase, Endpoint>
56 : , public intrusive_list<Derived>::node
57 : {
58 : friend Derived;
59 :
60 : template<class, class, class, class, class, class, class, class, class>
61 : friend class reactor_stream_socket;
62 :
63 : template<class, class, class, class, class, class, class, class, class, class, class>
64 : friend class reactor_datagram_socket;
65 :
66 HIT 21888 : explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {}
67 :
68 : protected:
69 : // fd_ / local_endpoint_ and the synchronous accessors (native_handle,
70 : // is_open, set_option/get_option, set_socket/set_local_endpoint, do_bind)
71 : // live in native_socket_base — the readiness/completion-agnostic base
72 : // shared with io_uring's sockets. The using-declarations make the
73 : // inherited members visible to this template's own unqualified
74 : // references below (two-phase lookup).
75 : using native_socket_base<Derived, ImplBase, Endpoint>::fd_;
76 : using native_socket_base<Derived, ImplBase, Endpoint>::local_endpoint_;
77 :
78 : Service& svc_;
79 :
80 : public:
81 : /// Per-descriptor state for persistent reactor registration.
82 : DescState desc_state_;
83 :
84 21888 : ~reactor_basic_socket() override = default;
85 :
86 : /// Assign the fd, initialize descriptor state, and register with the reactor.
87 7514 : void init_and_register(int fd) noexcept
88 : {
89 7514 : fd_ = fd;
90 7514 : desc_state_.fd = fd;
91 : {
92 7514 : std::lock_guard lock(desc_state_.mutex);
93 7514 : desc_state_.read_op = nullptr;
94 7514 : desc_state_.write_op = nullptr;
95 7514 : desc_state_.connect_op = nullptr;
96 7514 : }
97 7514 : svc_.scheduler().register_descriptor(fd, &desc_state_);
98 7514 : }
99 :
100 : /** Register an op with the reactor.
101 :
102 : Handles cached edge events and deferred cancellation.
103 : Called on the EAGAIN/EINPROGRESS path when speculative
104 : I/O failed.
105 : */
106 : template<class Op>
107 : void register_op(
108 : Op& op,
109 : reactor_op_base*& desc_slot,
110 : bool& ready_flag,
111 : bool& cancel_flag,
112 : bool is_write_direction = false) noexcept;
113 :
114 : /** Cancel a single pending operation.
115 :
116 : Claims the operation from its descriptor_state slot under
117 : the mutex and posts it to the scheduler as cancelled.
118 : Derived must implement:
119 : op_to_desc_slot(Op&) -> reactor_op_base**
120 : op_to_cancel_flag(Op&) -> bool*
121 : */
122 : template<class Op>
123 : void cancel_single_op(Op& op) noexcept;
124 :
125 : /** Cancel all pending operations.
126 :
127 : Invoked by the derived class's cancel() override.
128 : Derived must implement:
129 : for_each_op(auto fn)
130 : for_each_desc_entry(auto fn)
131 : */
132 : void do_cancel() noexcept;
133 :
134 : /** Close the socket and cancel pending operations.
135 :
136 : Invoked by the derived class's close_socket(). The
137 : derived class may add backend-specific cleanup after
138 : calling this method.
139 : Derived must implement:
140 : for_each_op(auto fn)
141 : for_each_desc_entry(auto fn)
142 : */
143 : void do_close_socket() noexcept;
144 :
145 : /** Release the socket without closing the fd.
146 :
147 : Like do_close_socket() but does not call ::close().
148 : Returns the fd so the caller can take ownership.
149 : */
150 : native_handle_type do_release_socket() noexcept;
151 : };
152 :
153 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
154 : template<class Op>
155 : void
156 7751 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::register_op(
157 : Op& op,
158 : reactor_op_base*& desc_slot,
159 : bool& ready_flag,
160 : bool& cancel_flag,
161 : bool is_write_direction) noexcept
162 : {
163 7751 : svc_.work_started();
164 :
165 7751 : std::lock_guard lock(desc_state_.mutex);
166 7751 : bool io_done = false;
167 7751 : if (ready_flag)
168 : {
169 186 : ready_flag = false;
170 186 : op.perform_io();
171 186 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
172 186 : if (!io_done)
173 186 : op.errn = 0;
174 : }
175 :
176 7751 : if (cancel_flag)
177 : {
178 MIS 0 : cancel_flag = false;
179 0 : op.cancelled.store(true, std::memory_order_relaxed);
180 : }
181 :
182 HIT 7751 : if (io_done || op.cancelled.load(std::memory_order_acquire))
183 : {
184 MIS 0 : svc_.post(&op);
185 0 : svc_.work_finished();
186 : }
187 : else
188 : {
189 HIT 7751 : desc_slot = &op;
190 :
191 : // Select must rebuild its fd_sets when a write-direction op
192 : // is parked, so select() watches for writability. Compiled
193 : // away to nothing for epoll and kqueue.
194 : if constexpr (requires { Service::needs_write_notification; })
195 : {
196 : if constexpr (Service::needs_write_notification)
197 : {
198 3563 : if (is_write_direction)
199 3262 : svc_.scheduler().notify_reactor();
200 : }
201 : }
202 : }
203 7751 : }
204 :
205 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
206 : template<class Op>
207 : void
208 200 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::cancel_single_op(
209 : Op& op) noexcept
210 : {
211 200 : auto self = this->weak_from_this().lock();
212 200 : if (!self)
213 MIS 0 : return;
214 :
215 HIT 200 : op.request_cancel();
216 :
217 200 : auto* d = static_cast<Derived*>(this);
218 200 : reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op);
219 :
220 200 : if (desc_op_ptr)
221 : {
222 200 : reactor_op_base* claimed = nullptr;
223 : {
224 200 : std::lock_guard lock(desc_state_.mutex);
225 200 : if (*desc_op_ptr == &op)
226 200 : claimed = std::exchange(*desc_op_ptr, nullptr);
227 : else
228 : {
229 MIS 0 : bool* cflag = d->op_to_cancel_flag(op);
230 0 : if (cflag)
231 0 : *cflag = true;
232 : }
233 HIT 200 : }
234 200 : if (claimed)
235 : {
236 200 : op.impl_ptr = self;
237 200 : svc_.post(&op);
238 200 : svc_.work_finished();
239 : }
240 : }
241 200 : }
242 :
243 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
244 : void
245 209 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
246 : do_cancel() noexcept
247 : {
248 209 : auto self = this->weak_from_this().lock();
249 209 : if (!self)
250 MIS 0 : return;
251 :
252 HIT 209 : auto* d = static_cast<Derived*>(this);
253 :
254 1491 : d->for_each_op([](auto& op) { op.request_cancel(); });
255 :
256 : // Claim ops under a single lock acquisition
257 : struct claimed_entry
258 : {
259 : reactor_op_base* op = nullptr;
260 : reactor_op_base* base = nullptr;
261 : };
262 : // Max 8 ops: conn, rd, wr, wait_rd, wait_wr, wait_er, recv_rd, send_wr
263 209 : claimed_entry claimed[8];
264 209 : int count = 0;
265 :
266 : {
267 209 : std::lock_guard lock(desc_state_.mutex);
268 2773 : d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
269 1282 : if (desc_slot == &op)
270 : {
271 122 : claimed[count].op = std::exchange(desc_slot, nullptr);
272 122 : claimed[count].base = &op;
273 122 : ++count;
274 : }
275 : });
276 209 : }
277 :
278 331 : for (int i = 0; i < count; ++i)
279 : {
280 122 : claimed[i].base->impl_ptr = self;
281 122 : svc_.post(claimed[i].base);
282 122 : svc_.work_finished();
283 : }
284 209 : }
285 :
286 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
287 : void
288 65917 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
289 : do_close_socket() noexcept
290 : {
291 65917 : auto self = this->weak_from_this().lock();
292 65917 : if (self)
293 : {
294 65917 : auto* d = static_cast<Derived*>(this);
295 :
296 463731 : d->for_each_op([](auto& op) { op.request_cancel(); });
297 :
298 : struct claimed_entry
299 : {
300 : reactor_op_base* base = nullptr;
301 : };
302 65917 : claimed_entry claimed[8];
303 65917 : int count = 0;
304 :
305 : {
306 65917 : std::lock_guard lock(desc_state_.mutex);
307 65917 : d->for_each_desc_entry(
308 795628 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
309 397814 : auto* c = std::exchange(desc_slot, nullptr);
310 397814 : if (c)
311 : {
312 8 : claimed[count].base = c;
313 8 : ++count;
314 : }
315 : });
316 65917 : desc_state_.read_ready = false;
317 65917 : desc_state_.write_ready = false;
318 65917 : desc_state_.read_cancel_pending = false;
319 65917 : desc_state_.write_cancel_pending = false;
320 65917 : desc_state_.connect_cancel_pending = false;
321 65917 : desc_state_.wait_read_cancel_pending = false;
322 65917 : desc_state_.wait_write_cancel_pending = false;
323 65917 : desc_state_.wait_error_cancel_pending = false;
324 :
325 65917 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
326 409 : desc_state_.impl_ref_ = self;
327 65917 : }
328 :
329 65925 : for (int i = 0; i < count; ++i)
330 : {
331 8 : claimed[i].base->impl_ptr = self;
332 8 : svc_.post(claimed[i].base);
333 8 : svc_.work_finished();
334 : }
335 : }
336 :
337 65917 : if (fd_ >= 0)
338 : {
339 14623 : if (desc_state_.registered_events != 0)
340 14623 : svc_.scheduler().deregister_descriptor(fd_);
341 14623 : ::close(fd_);
342 14623 : fd_ = -1;
343 : }
344 :
345 65917 : desc_state_.fd = -1;
346 65917 : desc_state_.registered_events = 0;
347 :
348 65917 : local_endpoint_ = Endpoint{};
349 65917 : }
350 :
351 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
352 : native_handle_type
353 4 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
354 : do_release_socket() noexcept
355 : {
356 : // Cancel pending ops (same as do_close_socket)
357 4 : auto self = this->weak_from_this().lock();
358 4 : if (self)
359 : {
360 4 : auto* d = static_cast<Derived*>(this);
361 :
362 32 : d->for_each_op([](auto& op) { op.request_cancel(); });
363 :
364 : struct claimed_entry
365 : {
366 : reactor_op_base* base = nullptr;
367 : };
368 4 : claimed_entry claimed[8];
369 4 : int count = 0;
370 :
371 : {
372 4 : std::lock_guard lock(desc_state_.mutex);
373 4 : d->for_each_desc_entry(
374 56 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
375 28 : auto* c = std::exchange(desc_slot, nullptr);
376 28 : if (c)
377 : {
378 MIS 0 : claimed[count].base = c;
379 0 : ++count;
380 : }
381 : });
382 HIT 4 : desc_state_.read_ready = false;
383 4 : desc_state_.write_ready = false;
384 4 : desc_state_.read_cancel_pending = false;
385 4 : desc_state_.write_cancel_pending = false;
386 4 : desc_state_.connect_cancel_pending = false;
387 4 : desc_state_.wait_read_cancel_pending = false;
388 4 : desc_state_.wait_write_cancel_pending = false;
389 4 : desc_state_.wait_error_cancel_pending = false;
390 :
391 4 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
392 MIS 0 : desc_state_.impl_ref_ = self;
393 HIT 4 : }
394 :
395 4 : for (int i = 0; i < count; ++i)
396 : {
397 MIS 0 : claimed[i].base->impl_ptr = self;
398 0 : svc_.post(claimed[i].base);
399 0 : svc_.work_finished();
400 : }
401 : }
402 :
403 HIT 4 : native_handle_type released = fd_;
404 :
405 4 : if (fd_ >= 0)
406 : {
407 4 : if (desc_state_.registered_events != 0)
408 4 : svc_.scheduler().deregister_descriptor(fd_);
409 : // Do NOT close -- caller takes ownership
410 4 : fd_ = -1;
411 : }
412 :
413 4 : desc_state_.fd = -1;
414 4 : desc_state_.registered_events = 0;
415 :
416 4 : local_endpoint_ = Endpoint{};
417 :
418 8 : return released;
419 4 : }
420 :
421 : } // namespace boost::corosio::detail
422 :
423 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
|