TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
12 :
13 : #include <boost/corosio/detail/dispatch_coro.hpp>
14 : #include <boost/corosio/native/detail/coro_op_complete.hpp>
15 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
16 : #include <boost/corosio/native/detail/make_err.hpp>
17 : #include <boost/corosio/io/io_object.hpp>
18 :
19 : #include <coroutine>
20 : #include <mutex>
21 : #include <utility>
22 :
23 : #include <netinet/in.h>
24 : #include <sys/socket.h>
25 : #include <unistd.h>
26 :
27 : namespace boost::corosio::detail {
28 :
/** Finish a stream read or write operation.

    Turns the recorded errno and the cancellation flag into the
    caller's error_code, publishes the transferred byte count, and
    finally hands the operation to coro_resume.

    @tparam Op The concrete operation type.
    @param op The operation to complete.
*/
template<typename Op>
void
complete_io_op(Op& op)
{
    op.stop_cb.reset();
    op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();

    auto const was_cancelled = op.cancelled.load(std::memory_order_acquire);

    std::error_code io_error;
    if (op.errn != 0)
        io_error = make_err(op.errn);

    // The empty-buffer case is already accounted for by
    // is_read_operation() (a zero-length read reports false), so the
    // empty_buffer argument is always false here and the EOF test in
    // decode_io_result boils down to `is_read && bytes == 0`.
    decode_io_result(
        op.ec_out,
        was_cancelled,
        io_error,
        op.is_read_operation(),
        op.bytes_transferred,
        /*empty_buffer=*/false);

    *op.bytes_out = op.bytes_transferred;

    coro_resume(&op);
}
59 :
/** Finish a datagram receive on a connected socket.

    Same sequence as complete_io_op, except that zero bytes is never
    turned into EOF: an empty datagram is a legitimate message and is
    reported as success with 0 bytes transferred.

    @tparam Op The concrete operation type.
    @param op The operation to complete.
*/
template<typename Op>
void
complete_dgram_recv_op(Op& op)
{
    op.stop_cb.reset();
    op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();

    auto const was_cancelled = op.cancelled.load(std::memory_order_acquire);

    std::error_code io_error;
    if (op.errn != 0)
        io_error = make_err(op.errn);

    // is_read is passed as false so that no EOF translation happens.
    decode_io_result(
        op.ec_out,
        was_cancelled,
        io_error,
        /*is_read=*/false,
        /*bytes=*/0,
        /*empty_buffer=*/false);

    *op.bytes_out = op.bytes_transferred;

    coro_resume(&op);
}
86 :
/** Finish a wait operation.

    A wait yields nothing but an error_code: there is no byte count
    and no EOF translation. The same routine serves socket and
    acceptor wait() awaitables; the scheduler is reached through
    socket_impl_ when it is set and through acceptor_impl_ otherwise.

    @tparam Op The concrete wait operation type.
    @param op The operation to complete.
*/
template<typename Op>
void
complete_wait_op(Op& op)
{
    op.stop_cb.reset();

    if (!op.socket_impl_)
        op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
    else
        op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();

    auto const was_cancelled = op.cancelled.load(std::memory_order_acquire);

    std::error_code wait_error;
    if (op.errn != 0)
        wait_error = make_err(op.errn);

    // Only success, cancellation or an error can come out of a wait.
    decode_io_result(
        op.ec_out,
        was_cancelled,
        wait_error,
        /*is_read=*/false,
        /*bytes=*/0,
        /*empty_buffer=*/false);

    coro_resume(&op);
}
115 :
/** Complete a connect operation with endpoint caching.

    On success, queries the local endpoint via getsockname and
    caches both endpoints in the socket impl. The error_code is then
    decoded from the recorded errno and cancellation state, and the
    caller is resumed through coro_resume.

    The cancellation flag is sampled exactly once, so the decision
    to cache endpoints and the reported error_code always agree.

    @tparam Op The concrete connect operation type.
    @param op The operation to complete.
*/
template<typename Op>
void
complete_connect_op(Op& op)
{
    op.stop_cb.reset();
    op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();

    // Read the flag once: loading it separately for `success` and for
    // decode_io_result would let a cancel request that lands in between
    // produce cached endpoints together with a cancellation error.
    bool const cancelled = op.cancelled.load(std::memory_order_acquire);
    bool const success = (op.errn == 0 && !cancelled);

    if (success && op.socket_impl_)
    {
        using ep_type = decltype(op.target_endpoint);
        ep_type local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        // When getsockname fails the local endpoint is left
        // default-constructed; the remote endpoint is cached regardless.
        if (::getsockname(
                op.fd, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_ep =
                from_sockaddr_as(local_storage, local_len, ep_type{});
        op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
    }

    decode_io_result(
        op.ec_out,
        cancelled,
        op.errn != 0 ? make_err(op.errn) : std::error_code{},
        /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);

    coro_resume(&op);
}
157 :
158 : /** Construct and register a peer socket from an accepted fd.
159 :
160 : Creates a new socket impl via the acceptor's associated
161 : socket service, registers it with the scheduler, and caches
162 : the local and remote endpoints.
163 :
164 : @tparam SocketImpl The concrete socket implementation type.
165 : @tparam AcceptorImpl The concrete acceptor implementation type.
166 : @param acceptor_impl The acceptor that accepted the connection.
167 : @param accepted_fd The accepted file descriptor (set to -1 on success).
168 : @param peer_storage The peer address from accept().
169 : @param impl_out Output pointer for the new socket impl.
170 : @param ec_out Output pointer for any error.
171 : @return True on success, false on failure.
172 : */
173 : template<typename SocketImpl, typename AcceptorImpl>
174 : bool
175 7109 : setup_accepted_socket(
176 : AcceptorImpl* acceptor_impl,
177 : int& accepted_fd,
178 : sockaddr_storage const& peer_storage,
179 : socklen_t peer_addrlen,
180 : io_object::implementation** impl_out,
181 : std::error_code* ec_out)
182 : {
183 7109 : auto* socket_svc = acceptor_impl->service().stream_service();
184 7109 : if (!socket_svc)
185 : {
186 MIS 0 : *ec_out = make_err(ENOENT);
187 0 : return false;
188 : }
189 :
190 HIT 7109 : auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
191 7109 : impl.set_socket(accepted_fd);
192 :
193 7109 : impl.desc_state_.fd = accepted_fd;
194 : {
195 7109 : std::lock_guard lock(impl.desc_state_.mutex);
196 7109 : impl.desc_state_.read_op = nullptr;
197 7109 : impl.desc_state_.write_op = nullptr;
198 7109 : impl.desc_state_.connect_op = nullptr;
199 7109 : }
200 7109 : socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
201 :
202 : using ep_type = decltype(acceptor_impl->local_endpoint());
203 7109 : impl.set_endpoints(
204 : acceptor_impl->local_endpoint(),
205 7109 : from_sockaddr_as(
206 : peer_storage,
207 : peer_addrlen,
208 : ep_type{}));
209 :
210 7109 : if (impl_out)
211 7109 : *impl_out = &impl;
212 7109 : accepted_fd = -1;
213 7109 : return true;
214 : }
215 :
216 : /** Complete an accept operation.
217 :
218 : Sets up the peer socket on success, or closes the accepted
219 : fd on failure. Then resumes the caller via symmetric transfer.
220 :
221 : @tparam SocketImpl The concrete socket implementation type.
222 : @tparam Op The concrete accept operation type.
223 : @param op The operation to complete.
224 : */
225 : template<typename SocketImpl, typename Op>
226 : void
227 7129 : complete_accept_op(Op& op)
228 : {
229 7129 : op.stop_cb.reset();
230 7129 : op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
231 :
232 7129 : bool success =
233 7129 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
234 :
235 14258 : decode_io_result(
236 : op.ec_out,
237 7129 : op.cancelled.load(std::memory_order_acquire),
238 7129 : op.errn != 0 ? make_err(op.errn) : std::error_code{},
239 : /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
240 :
241 7129 : if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
242 : {
243 7109 : if (!setup_accepted_socket<SocketImpl>(
244 7109 : op.acceptor_impl_, op.accepted_fd, op.peer_storage,
245 : op.peer_addrlen, op.impl_out, op.ec_out))
246 MIS 0 : success = false;
247 : }
248 :
249 HIT 7129 : if (!success || !op.acceptor_impl_)
250 : {
251 20 : if (op.accepted_fd >= 0)
252 : {
253 MIS 0 : ::close(op.accepted_fd);
254 0 : op.accepted_fd = -1;
255 : }
256 HIT 20 : if (op.impl_out)
257 20 : *op.impl_out = nullptr;
258 : }
259 :
260 7129 : coro_resume(&op);
261 7129 : }
262 :
/** Finish a datagram operation that has no source endpoint to report.

    Decodes the recorded errno and cancellation flag into the
    caller's error_code, publishes the transferred byte count, and
    hands the operation to coro_resume. This overload never writes
    an endpoint; the two-argument overload does that.

    @tparam Op The concrete datagram operation type.
    @param op The operation to complete.
*/
template<typename Op>
void
complete_datagram_op(Op& op)
{
    op.stop_cb.reset();
    op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();

    auto const was_cancelled = op.cancelled.load(std::memory_order_acquire);

    std::error_code io_error;
    if (op.errn != 0)
        io_error = make_err(op.errn);

    // Empty datagrams are legitimate, so is_read is passed as false and
    // zero bytes is reported as plain success rather than EOF.
    decode_io_result(
        op.ec_out,
        was_cancelled,
        io_error,
        /*is_read=*/false,
        /*bytes=*/0,
        /*empty_buffer=*/false);

    *op.bytes_out = op.bytes_transferred;

    coro_resume(&op);
}
290 :
/** Complete a datagram operation with source endpoint capture.

    Decodes the recorded errno and cancellation state into the
    caller's error_code and stores the byte count. When the
    operation succeeded and source_out is non-null, the source
    endpoint is decoded from the recorded sockaddr_storage into
    *source_out. Then resumes the caller through coro_resume.

    The cancellation flag is sampled exactly once, so the reported
    error_code and the decision to write *source_out always agree.

    @tparam Op The concrete datagram operation type.
    @tparam Endpoint The endpoint type written to source_out.
    @param op The operation to complete.
    @param source_out Optional pointer to store source endpoint
        (non-null for recv_from, null for send_to).
*/
template<typename Op, typename Endpoint>
void
complete_datagram_op(Op& op, Endpoint* source_out)
{
    op.stop_cb.reset();
    op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();

    // Read the flag once: with two separate loads a cancel request that
    // lands in between would report success yet skip the endpoint write.
    bool const cancelled = op.cancelled.load(std::memory_order_acquire);

    // No EOF: a zero-length datagram is valid (success with 0 bytes).
    decode_io_result(
        op.ec_out,
        cancelled,
        op.errn != 0 ? make_err(op.errn) : std::error_code{},
        /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);

    *op.bytes_out = op.bytes_transferred;

    // The recorded source address is only used after a successful,
    // non-cancelled receive.
    if (source_out && !cancelled && op.errn == 0)
        *source_out = from_sockaddr_as(
            op.source_storage,
            op.source_addrlen,
            Endpoint{});

    coro_resume(&op);
}
327 :
328 : } // namespace boost::corosio::detail
329 :
330 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
|