saluki_io/net/listener.rs
1//! Network listeners.
2use std::{future::pending, io};
3
4use snafu::{ResultExt as _, Snafu};
5use tokio::net::{TcpListener, UdpSocket};
6
7use super::{
8 addr::ListenAddress,
9 stream::{Connection, Stream},
10 unix::{enable_uds_socket_credentials, ensure_unix_socket_free, set_unix_socket_write_only},
11};
12
13/// A listener error.
14#[derive(Debug, Snafu)]
15#[snafu(context(suffix(false)))]
16pub enum ListenerError {
17 /// An invalid configuration was given when creating the listener.
18 #[snafu(display("invalid configuration: {}", reason))]
19 InvalidConfiguration {
20 /// Cause of the invalid configuration.
21 reason: &'static str,
22 },
23
24 /// Failed to bind to the listen address.
25 #[snafu(display("failed to bind to listen address {}: {}", address, source))]
26 FailedToBind {
27 /// Listen address.
28 address: ListenAddress,
29
30 /// Source of the error.
31 source: io::Error,
32 },
33
34 /// Failed to configure a setting on the listening socket.
35 #[snafu(display("failed to configure {} for listener on address {}: {}", setting, address, source))]
36 FailedToConfigureListener {
37 /// Listen address.
38 address: ListenAddress,
39
40 /// Name of the setting.
41 setting: &'static str,
42
43 /// Source of the error.
44 source: io::Error,
45 },
46
47 /// Failed to configure a setting on an accepted stream.
48 #[snafu(display("failed to configure {} for {} stream: {}", setting, stream_type, source))]
49 FailedToConfigureStream {
50 /// Name of the setting.
51 setting: &'static str,
52
53 /// Type of stream.
54 stream_type: &'static str,
55
56 /// Source of the error.
57 source: io::Error,
58 },
59
60 /// Failed to accept a new stream from the listener.
61 #[snafu(display("failed to accept new stream for listener on address {}: {}", address, source))]
62 FailedToAccept {
63 /// Listen address.
64 address: ListenAddress,
65
66 /// Source of the error.
67 source: io::Error,
68 },
69}
70
71enum ListenerInner {
72 Tcp(TcpListener),
73 Udp(Option<UdpSocket>),
74 #[cfg(unix)]
75 Unixgram(Option<tokio::net::UnixDatagram>),
76 #[cfg(unix)]
77 Unix(tokio::net::UnixListener),
78}
79
80/// A network listener.
81///
82/// `Listener` is a abstract listener that works in conjunction with `Stream`, providing the ability to listen on
83/// arbitrary addresses and accept new streams of that address family.
84///
85/// ## Connection-oriented vs connectionless listeners
86///
87/// For listeners on connection-oriented address families (e.g. TCP, Unix domain sockets in stream mode), the listener
88/// will listen for an accept new connections in the typical fashion. However, for connectionless address families
89/// (e.g. UDP, Unix domain sockets in datagram mode), there is no concept of a "connection" and so nothing to be
90/// continually "accepted". Instead, `Listener` will emit a single `Stream` that can be used to send and receive data
91/// from multiple remote peers.
92///
93/// ## Missing
94///
95/// - Ability to configure `Listener` to emit multiple streams for connectionless address families, allowing for load
96/// balancing. (Only possible for UDP via SO_REUSEPORT, as UDS does not support SO_REUSEPORT.)
97pub struct Listener {
98 listen_address: ListenAddress,
99 inner: ListenerInner,
100}
101
102impl Listener {
103 /// Creates a new `Listener` from the given listen address.
104 ///
105 /// ## Errors
106 ///
107 /// If the listen address cannot be bound, or if the listener cannot be configured correctly, an error is returned.
108 pub async fn from_listen_address(listen_address: ListenAddress) -> Result<Self, ListenerError> {
109 let inner = match &listen_address {
110 ListenAddress::Tcp(addr) => {
111 TcpListener::bind(addr)
112 .await
113 .map(ListenerInner::Tcp)
114 .context(FailedToBind {
115 address: listen_address.clone(),
116 })?
117 }
118 ListenAddress::Udp(addr) => {
119 UdpSocket::bind(addr)
120 .await
121 .map(Some)
122 .map(ListenerInner::Udp)
123 .context(FailedToBind {
124 address: listen_address.clone(),
125 })?
126 }
127 #[cfg(unix)]
128 ListenAddress::Unixgram(addr) => {
129 ensure_unix_socket_free(addr).await.context(FailedToBind {
130 address: listen_address.clone(),
131 })?;
132
133 let listener = tokio::net::UnixDatagram::bind(addr)
134 .map(Some)
135 .map(ListenerInner::Unixgram)
136 .context(FailedToBind {
137 address: listen_address.clone(),
138 })?;
139
140 set_unix_socket_write_only(addr)
141 .await
142 .context(FailedToConfigureListener {
143 address: listen_address.clone(),
144 setting: "read/write permissions",
145 })?;
146
147 listener
148 }
149 #[cfg(unix)]
150 ListenAddress::Unix(addr) => {
151 ensure_unix_socket_free(addr).await.context(FailedToBind {
152 address: listen_address.clone(),
153 })?;
154
155 let listener = tokio::net::UnixListener::bind(addr)
156 .map(ListenerInner::Unix)
157 .context(FailedToBind {
158 address: listen_address.clone(),
159 })?;
160 set_unix_socket_write_only(addr)
161 .await
162 .context(FailedToConfigureListener {
163 address: listen_address.clone(),
164 setting: "read/write permissions",
165 })?;
166
167 listener
168 }
169 };
170
171 Ok(Self { listen_address, inner })
172 }
173
174 /// Gets a reference to the listen address.
175 pub fn listen_address(&self) -> &ListenAddress {
176 &self.listen_address
177 }
178
179 /// Accepts a new stream from the listener.
180 ///
181 /// For connection-oriented address families, this will accept a new connection and return a `Stream` that is bound
182 /// to that remote peer. For connectionless address families, this will return a single `Stream` that will receive
183 /// from multiple remote peers, and no further streams will be emitted.
184 ///
185 /// ## Errors
186 ///
187 /// If the listener fails to accept a new stream, or if the accepted stream cannot be configured correctly, an error
188 /// is returned.
189 pub async fn accept(&mut self) -> Result<Stream, ListenerError> {
190 match &mut self.inner {
191 ListenerInner::Tcp(tcp) => tcp.accept().await.map(Into::into).context(FailedToAccept {
192 address: self.listen_address.clone(),
193 }),
194 ListenerInner::Udp(udp) => {
195 // TODO: We only emit a single stream here, but we _could_ do something like an internal configuration
196 // to allow for multiple streams to be emitted, where the socket is bound via SO_REUSEPORT and then we
197 // get load balancing between the sockets.... basically make it possible to parallelize UDP handling if
198 // that's a thing we want to do.
199 if let Some(socket) = udp.take() {
200 Ok(socket.into())
201 } else {
202 pending().await
203 }
204 }
205 #[cfg(unix)]
206 ListenerInner::Unixgram(unix) => {
207 if let Some(socket) = unix.take() {
208 enable_uds_socket_credentials(&socket).context(FailedToConfigureStream {
209 setting: "SO_PASSCRED",
210 stream_type: "UDS (datagram)",
211 })?;
212 Ok(socket.into())
213 } else {
214 pending().await
215 }
216 }
217 #[cfg(unix)]
218 ListenerInner::Unix(unix) => unix
219 .accept()
220 .await
221 .context(FailedToAccept {
222 address: self.listen_address.clone(),
223 })
224 .and_then(|(socket, _)| {
225 enable_uds_socket_credentials(&socket).context(FailedToConfigureStream {
226 setting: "SO_PASSCRED",
227 stream_type: "UDS (stream)",
228 })?;
229 Ok(socket.into())
230 }),
231 }
232 }
233}
234
235enum ConnectionOrientedListenerInner {
236 Tcp(TcpListener),
237 #[cfg(unix)]
238 Unix(tokio::net::UnixListener),
239}
240
241/// A connection-oriented network listener.
242///
243/// `ConnectionOrientedListener` is conceptually the same as `Listener`, but specifically works with connection-oriented
244/// protocols. This variant is provided to facilitate usages where only a connection-oriented stream makes sense, such
245/// as an HTTP server.
246pub struct ConnectionOrientedListener {
247 listen_address: ListenAddress,
248 inner: ConnectionOrientedListenerInner,
249}
250
251impl ConnectionOrientedListener {
252 /// Creates a new `ConnectionOrientedListener` from the given listen address.
253 ///
254 /// ## Errors
255 ///
256 /// If the listen address is not a connection-oriented address family, or if the listen address cannot be bound, or
257 /// if the listener cannot be configured correctly, an error is returned.
258 pub async fn from_listen_address(listen_address: ListenAddress) -> Result<Self, ListenerError> {
259 let inner = match &listen_address {
260 ListenAddress::Tcp(addr) => TcpListener::bind(addr)
261 .await
262 .map(ConnectionOrientedListenerInner::Tcp)
263 .context(FailedToBind {
264 address: listen_address.clone(),
265 })?,
266 #[cfg(unix)]
267 ListenAddress::Unix(addr) => {
268 ensure_unix_socket_free(addr).await.context(FailedToBind {
269 address: listen_address.clone(),
270 })?;
271
272 let listener = tokio::net::UnixListener::bind(addr)
273 .map(ConnectionOrientedListenerInner::Unix)
274 .context(FailedToBind {
275 address: listen_address.clone(),
276 })?;
277 set_unix_socket_write_only(addr)
278 .await
279 .context(FailedToConfigureListener {
280 address: listen_address.clone(),
281 setting: "read/write permissions",
282 })?;
283
284 listener
285 }
286 _ => {
287 return Err(ListenerError::InvalidConfiguration {
288 reason: "only TCP and Unix listen addresses are supported",
289 })
290 }
291 };
292
293 Ok(Self { listen_address, inner })
294 }
295
296 /// Gets a reference to the listen address.
297 pub fn listen_address(&self) -> &ListenAddress {
298 &self.listen_address
299 }
300
301 /// Accepts a new connection from the listener.
302 ///
303 /// ## Errors
304 ///
305 /// If the listener fails to accept a new connection, or if the accepted connection cannot be configured correctly,
306 /// an error is returned.
307 pub async fn accept(&mut self) -> Result<Connection, ListenerError> {
308 match &mut self.inner {
309 ConnectionOrientedListenerInner::Tcp(tcp) => tcp
310 .accept()
311 .await
312 .map(|(stream, addr)| Connection::Tcp(stream, addr))
313 .context(FailedToAccept {
314 address: self.listen_address.clone(),
315 }),
316 #[cfg(unix)]
317 ConnectionOrientedListenerInner::Unix(unix) => unix
318 .accept()
319 .await
320 .context(FailedToAccept {
321 address: self.listen_address.clone(),
322 })
323 .and_then(|(socket, _)| {
324 enable_uds_socket_credentials(&socket).context(FailedToConfigureStream {
325 setting: "SO_PASSCRED",
326 stream_type: "UDS (stream)",
327 })?;
328 Ok(Connection::Unix(socket))
329 }),
330 }
331 }
332}