Skip to main content

datadog_agent_commons/ipc/
session.rs

1//! IPC session management.
2
3use std::{
4    fmt,
5    sync::{Arc, Mutex},
6};
7
8use saluki_error::{ErrorContext as _, GenericError};
9use tokio::sync::Notify;
10use tonic::metadata::{Ascii, MetadataValue};
11
12/// Remote agent session ID.
13///
14/// Session IDs are acquired when registering as a remote agent with the Datadog Agent. This session ID must be provided
15/// when refreshing a remote agent's registration, as well as when streaming configuration updates. To ease passing this
16/// in the necessary format, and with the required validation, this struct provides a type-safe to carry around the
17/// session ID compared to a raw string.
18#[derive(Debug)]
19pub struct SessionId(MetadataValue<Ascii>);
20
21impl SessionId {
22    /// Creates a new `SessionId` from the given string.
23    ///
24    /// # Errors
25    ///
26    /// If the given string isn't valid ASCII, an error is returned.
27    pub fn new(session_id: &str) -> Result<Self, GenericError> {
28        MetadataValue::try_from(session_id)
29            .map(Self)
30            .error_context("Session ID is not valid ASCII")
31    }
32
33    /// Returns a reference to the string representation of the session ID.
34    pub fn as_str(&self) -> &str {
35        self.0
36            .to_str()
37            .expect("session ID is ensured to be valid ASCII on creation")
38    }
39
40    /// Returns the session ID as a metadata header value.
41    pub fn to_grpc_header_value(&self) -> MetadataValue<Ascii> {
42        self.0.clone()
43    }
44}
45
46impl fmt::Display for SessionId {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        write!(f, "{}", self.as_str())
49    }
50}
51
52#[derive(Debug, Default)]
53struct SessionIdHandleInner {
54    session_id: Mutex<Option<SessionId>>,
55    change_notify: Notify,
56}
57
58/// A handle for a dynamically updated session ID.
59///
60/// This handle allows sharing a session ID across multiple components while allowing it to be centrally updated in the
61/// case of refresh/re-registration.
62#[derive(Clone, Debug)]
63pub struct SessionIdHandle {
64    inner: Arc<SessionIdHandleInner>,
65}
66
67impl SessionIdHandle {
68    /// Creates a new `SessionIdHandle` with no session ID.
69    pub fn empty() -> Self {
70        Self {
71            inner: Arc::new(SessionIdHandleInner::default()),
72        }
73    }
74
75    /// Updates the current session ID to the given value.
76    pub fn update(&self, new_session_id: Option<SessionId>) {
77        if let Ok(mut session_id) = self.inner.session_id.lock() {
78            *session_id = new_session_id;
79            self.inner.change_notify.notify_waiters();
80        }
81    }
82
83    /// Gets the current session ID.
84    pub fn get(&self) -> Option<SessionId> {
85        self.inner
86            .session_id
87            .lock()
88            .ok()
89            .and_then(|s| (*s).as_ref().map(|session_id| SessionId(session_id.0.clone())))
90    }
91
92    /// Waits until the session ID is set to a non-empty value and returns it.
93    pub async fn wait_for_update(&self) -> SessionId {
94        loop {
95            let updated = self.inner.change_notify.notified();
96            if let Some(session_id) = self.get() {
97                return session_id;
98            }
99
100            updated.await;
101        }
102    }
103}