saluki_env/autodiscovery/providers/
remote_agent.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use futures::StreamExt;
5use tokio::sync::broadcast::{self, Receiver, Sender};
6use tokio::sync::OnceCell;
7use tracing::{debug, info, warn};
8
9use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider};
10use crate::helpers::remote_agent::RemoteAgentClient;
11
12/// An autodiscovery provider that uses the Datadog Agent's internal gRPC API to receive autodiscovery updates.
13pub struct RemoteAgentAutodiscoveryProvider {
14    client: RemoteAgentClient,
15    sender: Sender<AutodiscoveryEvent>,
16    listener_init: OnceCell<()>,
17}
18
19impl RemoteAgentAutodiscoveryProvider {
20    /// Creates a new `RemoteAgentAutodiscoveryProvider` that uses the remote client to receive autodiscovery updates.
21    pub fn new(client: RemoteAgentClient) -> Self {
22        let (sender, _) = broadcast::channel::<AutodiscoveryEvent>(super::AD_STREAM_CAPACITY);
23
24        Self {
25            client,
26            sender,
27            listener_init: OnceCell::new(),
28        }
29    }
30
31    async fn start_background_listener(&self) {
32        debug!("Starting autodiscovery background listener.");
33
34        let mut client = self.client.clone();
35        let sender = self.sender.clone();
36
37        tokio::spawn(async move {
38            info!("Listening to autodiscovery events from remote agent.");
39
40            loop {
41                let mut autodiscovery_stream = client.get_autodiscovery_stream();
42
43                debug!("Polling autodiscovery event stream.");
44
45                while let Some(result) = autodiscovery_stream.next().await {
46                    match result {
47                        Ok(response) => {
48                            for proto_config in response.configs {
49                                let event = AutodiscoveryEvent::from(proto_config);
50                                let _ = sender.send(event);
51                            }
52                        }
53                        Err(status) => {
54                            warn!(
55                                ?status,
56                                "Encountered error while listening for autodiscovery events.  Retrying in 1 second..."
57                            );
58                            tokio::time::sleep(Duration::from_secs(1)).await;
59                            break;
60                        }
61                    }
62                }
63            }
64        });
65    }
66}
67
68#[async_trait]
69impl AutodiscoveryProvider for RemoteAgentAutodiscoveryProvider {
70    async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
71        self.listener_init
72            .get_or_init(|| async {
73                self.start_background_listener().await;
74            })
75            .await;
76
77        Some(self.sender.subscribe())
78    }
79}