saluki_io/net/
listener.rs

1//! Network listeners.
2use std::{future::pending, io};
3
4use snafu::{ResultExt as _, Snafu};
5use tokio::net::{TcpListener, UdpSocket};
6
7use super::{
8    addr::ListenAddress,
9    stream::{Connection, Stream},
10    unix::{enable_uds_socket_credentials, ensure_unix_socket_free, set_unix_socket_write_only},
11};
12
13/// A listener error.
14#[derive(Debug, Snafu)]
15#[snafu(context(suffix(false)))]
16pub enum ListenerError {
17    /// An invalid configuration was given when creating the listener.
18    #[snafu(display("invalid configuration: {}", reason))]
19    InvalidConfiguration {
20        /// Cause of the invalid configuration.
21        reason: &'static str,
22    },
23
24    /// Failed to bind to the listen address.
25    #[snafu(display("failed to bind to listen address {}: {}", address, source))]
26    FailedToBind {
27        /// Listen address.
28        address: ListenAddress,
29
30        /// Source of the error.
31        source: io::Error,
32    },
33
34    /// Failed to configure a setting on the listening socket.
35    #[snafu(display("failed to configure {} for listener on address {}: {}", setting, address, source))]
36    FailedToConfigureListener {
37        /// Listen address.
38        address: ListenAddress,
39
40        /// Name of the setting.
41        setting: &'static str,
42
43        /// Source of the error.
44        source: io::Error,
45    },
46
47    /// Failed to configure a setting on an accepted stream.
48    #[snafu(display("failed to configure {} for {} stream: {}", setting, stream_type, source))]
49    FailedToConfigureStream {
50        /// Name of the setting.
51        setting: &'static str,
52
53        /// Type of stream.
54        stream_type: &'static str,
55
56        /// Source of the error.
57        source: io::Error,
58    },
59
60    /// Failed to accept a new stream from the listener.
61    #[snafu(display("failed to accept new stream for listener on address {}: {}", address, source))]
62    FailedToAccept {
63        /// Listen address.
64        address: ListenAddress,
65
66        /// Source of the error.
67        source: io::Error,
68    },
69}
70
71enum ListenerInner {
72    Tcp(TcpListener),
73    Udp(Option<UdpSocket>),
74    #[cfg(unix)]
75    Unixgram(Option<tokio::net::UnixDatagram>),
76    #[cfg(unix)]
77    Unix(tokio::net::UnixListener),
78}
79
80/// A network listener.
81///
82/// `Listener` is a abstract listener that works in conjunction with `Stream`, providing the ability to listen on
83/// arbitrary addresses and accept new streams of that address family.
84///
85/// ## Connection-oriented vs connectionless listeners
86///
87/// For listeners on connection-oriented address families (e.g. TCP, Unix domain sockets in stream mode), the listener
88/// will listen for an accept new connections in the typical fashion. However, for connectionless address families
89/// (e.g. UDP, Unix domain sockets in datagram mode), there is no concept of a "connection" and so nothing to be
90/// continually "accepted". Instead, `Listener` will emit a single `Stream` that can be used to send and receive data
91/// from multiple remote peers.
92///
93/// ## Missing
94///
95/// - Ability to configure `Listener` to emit multiple streams for connectionless address families, allowing for load
96///   balancing. (Only possible for UDP via SO_REUSEPORT, as UDS does not support SO_REUSEPORT.)
97pub struct Listener {
98    listen_address: ListenAddress,
99    inner: ListenerInner,
100}
101
102impl Listener {
103    /// Creates a new `Listener` from the given listen address.
104    ///
105    /// ## Errors
106    ///
107    /// If the listen address cannot be bound, or if the listener cannot be configured correctly, an error is returned.
108    pub async fn from_listen_address(listen_address: ListenAddress) -> Result<Self, ListenerError> {
109        let inner = match &listen_address {
110            ListenAddress::Tcp(addr) => {
111                TcpListener::bind(addr)
112                    .await
113                    .map(ListenerInner::Tcp)
114                    .context(FailedToBind {
115                        address: listen_address.clone(),
116                    })?
117            }
118            ListenAddress::Udp(addr) => {
119                UdpSocket::bind(addr)
120                    .await
121                    .map(Some)
122                    .map(ListenerInner::Udp)
123                    .context(FailedToBind {
124                        address: listen_address.clone(),
125                    })?
126            }
127            #[cfg(unix)]
128            ListenAddress::Unixgram(addr) => {
129                ensure_unix_socket_free(addr).await.context(FailedToBind {
130                    address: listen_address.clone(),
131                })?;
132
133                let listener = tokio::net::UnixDatagram::bind(addr)
134                    .map(Some)
135                    .map(ListenerInner::Unixgram)
136                    .context(FailedToBind {
137                        address: listen_address.clone(),
138                    })?;
139
140                set_unix_socket_write_only(addr)
141                    .await
142                    .context(FailedToConfigureListener {
143                        address: listen_address.clone(),
144                        setting: "read/write permissions",
145                    })?;
146
147                listener
148            }
149            #[cfg(unix)]
150            ListenAddress::Unix(addr) => {
151                ensure_unix_socket_free(addr).await.context(FailedToBind {
152                    address: listen_address.clone(),
153                })?;
154
155                let listener = tokio::net::UnixListener::bind(addr)
156                    .map(ListenerInner::Unix)
157                    .context(FailedToBind {
158                        address: listen_address.clone(),
159                    })?;
160                set_unix_socket_write_only(addr)
161                    .await
162                    .context(FailedToConfigureListener {
163                        address: listen_address.clone(),
164                        setting: "read/write permissions",
165                    })?;
166
167                listener
168            }
169        };
170
171        Ok(Self { listen_address, inner })
172    }
173
174    /// Gets a reference to the listen address.
175    pub fn listen_address(&self) -> &ListenAddress {
176        &self.listen_address
177    }
178
179    /// Accepts a new stream from the listener.
180    ///
181    /// For connection-oriented address families, this will accept a new connection and return a `Stream` that is bound
182    /// to that remote peer. For connectionless address families, this will return a single `Stream` that will receive
183    /// from multiple remote peers, and no further streams will be emitted.
184    ///
185    /// ## Errors
186    ///
187    /// If the listener fails to accept a new stream, or if the accepted stream cannot be configured correctly, an error
188    /// is returned.
189    pub async fn accept(&mut self) -> Result<Stream, ListenerError> {
190        match &mut self.inner {
191            ListenerInner::Tcp(tcp) => tcp.accept().await.map(Into::into).context(FailedToAccept {
192                address: self.listen_address.clone(),
193            }),
194            ListenerInner::Udp(udp) => {
195                // TODO: We only emit a single stream here, but we _could_ do something like an internal configuration
196                // to allow for multiple streams to be emitted, where the socket is bound via SO_REUSEPORT and then we
197                // get load balancing between the sockets.... basically make it possible to parallelize UDP handling if
198                // that's a thing we want to do.
199                if let Some(socket) = udp.take() {
200                    Ok(socket.into())
201                } else {
202                    pending().await
203                }
204            }
205            #[cfg(unix)]
206            ListenerInner::Unixgram(unix) => {
207                if let Some(socket) = unix.take() {
208                    enable_uds_socket_credentials(&socket).context(FailedToConfigureStream {
209                        setting: "SO_PASSCRED",
210                        stream_type: "UDS (datagram)",
211                    })?;
212                    Ok(socket.into())
213                } else {
214                    pending().await
215                }
216            }
217            #[cfg(unix)]
218            ListenerInner::Unix(unix) => unix
219                .accept()
220                .await
221                .context(FailedToAccept {
222                    address: self.listen_address.clone(),
223                })
224                .and_then(|(socket, _)| {
225                    enable_uds_socket_credentials(&socket).context(FailedToConfigureStream {
226                        setting: "SO_PASSCRED",
227                        stream_type: "UDS (stream)",
228                    })?;
229                    Ok(socket.into())
230                }),
231        }
232    }
233}
234
235enum ConnectionOrientedListenerInner {
236    Tcp(TcpListener),
237    #[cfg(unix)]
238    Unix(tokio::net::UnixListener),
239}
240
241/// A connection-oriented network listener.
242///
243/// `ConnectionOrientedListener` is conceptually the same as `Listener`, but specifically works with connection-oriented
244/// protocols. This variant is provided to facilitate usages where only a connection-oriented stream makes sense, such
245/// as an HTTP server.
246pub struct ConnectionOrientedListener {
247    listen_address: ListenAddress,
248    inner: ConnectionOrientedListenerInner,
249}
250
251impl ConnectionOrientedListener {
252    /// Creates a new `ConnectionOrientedListener` from the given listen address.
253    ///
254    /// ## Errors
255    ///
256    /// If the listen address is not a connection-oriented address family, or if the listen address cannot be bound, or
257    /// if the listener cannot be configured correctly, an error is returned.
258    pub async fn from_listen_address(listen_address: ListenAddress) -> Result<Self, ListenerError> {
259        let inner = match &listen_address {
260            ListenAddress::Tcp(addr) => TcpListener::bind(addr)
261                .await
262                .map(ConnectionOrientedListenerInner::Tcp)
263                .context(FailedToBind {
264                    address: listen_address.clone(),
265                })?,
266            #[cfg(unix)]
267            ListenAddress::Unix(addr) => {
268                ensure_unix_socket_free(addr).await.context(FailedToBind {
269                    address: listen_address.clone(),
270                })?;
271
272                let listener = tokio::net::UnixListener::bind(addr)
273                    .map(ConnectionOrientedListenerInner::Unix)
274                    .context(FailedToBind {
275                        address: listen_address.clone(),
276                    })?;
277                set_unix_socket_write_only(addr)
278                    .await
279                    .context(FailedToConfigureListener {
280                        address: listen_address.clone(),
281                        setting: "read/write permissions",
282                    })?;
283
284                listener
285            }
286            _ => {
287                return Err(ListenerError::InvalidConfiguration {
288                    reason: "only TCP and Unix listen addresses are supported",
289                })
290            }
291        };
292
293        Ok(Self { listen_address, inner })
294    }
295
296    /// Gets a reference to the listen address.
297    pub fn listen_address(&self) -> &ListenAddress {
298        &self.listen_address
299    }
300
301    /// Accepts a new connection from the listener.
302    ///
303    /// ## Errors
304    ///
305    /// If the listener fails to accept a new connection, or if the accepted connection cannot be configured correctly,
306    /// an error is returned.
307    pub async fn accept(&mut self) -> Result<Connection, ListenerError> {
308        match &mut self.inner {
309            ConnectionOrientedListenerInner::Tcp(tcp) => tcp
310                .accept()
311                .await
312                .map(|(stream, addr)| Connection::Tcp(stream, addr))
313                .context(FailedToAccept {
314                    address: self.listen_address.clone(),
315                }),
316            #[cfg(unix)]
317            ConnectionOrientedListenerInner::Unix(unix) => unix
318                .accept()
319                .await
320                .context(FailedToAccept {
321                    address: self.listen_address.clone(),
322                })
323                .and_then(|(socket, _)| {
324                    enable_uds_socket_credentials(&socket).context(FailedToConfigureStream {
325                        setting: "SO_PASSCRED",
326                        stream_type: "UDS (stream)",
327                    })?;
328                    Ok(Connection::Unix(socket))
329                }),
330        }
331    }
332}