Skip to main content

saluki_env/helpers/remote_agent/
session.rs

1use std::{
2    fmt,
3    sync::{Arc, Mutex},
4};
5
6use saluki_error::{ErrorContext as _, GenericError};
7use tokio::sync::Notify;
8use tonic::metadata::{Ascii, MetadataValue};
9
10/// A session ID for a register remote agent.
11///
12/// Session IDs are acquired when registering as a remote agent with the Datadog Agent. This session ID must be provided
13/// when refreshing a remote agent's registration, as well as when streaming configuration updates. To ease passing this
14/// in the necessary format, and with the required validation, this struct provides a type-safe to carry around the
15/// session ID compared to a raw string.
16#[derive(Debug)]
17pub struct SessionId(MetadataValue<Ascii>);
18
19impl SessionId {
20    /// Creates a new `SessionId` from the given string.
21    ///
22    /// # Errors
23    ///
24    /// If the given string is not valid ASCII, an error is returned.
25    pub fn new(session_id: &str) -> Result<Self, GenericError> {
26        MetadataValue::try_from(session_id)
27            .map(Self)
28            .error_context("Session ID is not valid ASCII")
29    }
30
31    /// Returns a reference to the string representation of the session ID.
32    pub fn as_str(&self) -> &str {
33        self.0
34            .to_str()
35            .expect("session ID is ensured to be valid ASCII on creation")
36    }
37
38    /// Returns the session ID as a metadata header value.
39    pub fn to_grpc_header_value(&self) -> MetadataValue<Ascii> {
40        self.0.clone()
41    }
42}
43
44impl fmt::Display for SessionId {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        write!(f, "{}", self.as_str())
47    }
48}
49
50#[derive(Debug, Default)]
51struct SessionIdHandleInner {
52    session_id: Mutex<Option<SessionId>>,
53    change_notify: Notify,
54}
55
56/// A handle for a dynamically-updated session ID.
57///
58/// This handle allows sharing a session ID across multiple components while allowing it to be centrally updated in the
59/// case of refresh/re-registration.
60#[derive(Clone, Debug)]
61pub struct SessionIdHandle {
62    inner: Arc<SessionIdHandleInner>,
63}
64
65impl SessionIdHandle {
66    /// Creates a new `SessionIdHandle` with no session ID.
67    pub fn empty() -> Self {
68        Self {
69            inner: Arc::new(SessionIdHandleInner::default()),
70        }
71    }
72
73    /// Updates the current session ID to the given value.
74    pub fn update(&self, new_session_id: Option<SessionId>) {
75        if let Ok(mut session_id) = self.inner.session_id.lock() {
76            *session_id = new_session_id;
77            self.inner.change_notify.notify_waiters();
78        }
79    }
80
81    /// Gets the current session ID.
82    pub fn get(&self) -> Option<SessionId> {
83        self.inner
84            .session_id
85            .lock()
86            .ok()
87            .and_then(|s| (*s).as_ref().map(|session_id| SessionId(session_id.0.clone())))
88    }
89
90    /// Waits until the session ID is set to a non-empty value and returns it.
91    pub async fn wait_for_update(&self) -> SessionId {
92        loop {
93            let updated = self.inner.change_notify.notified();
94            if let Some(session_id) = self.get() {
95                return session_id;
96            }
97
98            updated.await;
99        }
100    }
101}