saluki_env/autodiscovery/providers/
remote_agent.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use futures::StreamExt;
5use tokio::sync::broadcast::{self, Receiver, Sender};
6use tokio::sync::OnceCell;
7use tracing::{debug, info, warn};
8
9use crate::autodiscovery::{AutodiscoveryEvent, AutodiscoveryProvider};
10use crate::helpers::remote_agent::RemoteAgentClient;
11
12pub struct RemoteAgentAutodiscoveryProvider {
14 client: RemoteAgentClient,
15 sender: Sender<AutodiscoveryEvent>,
16 listener_init: OnceCell<()>,
17}
18
19impl RemoteAgentAutodiscoveryProvider {
20 pub fn new(client: RemoteAgentClient) -> Self {
22 let (sender, _) = broadcast::channel::<AutodiscoveryEvent>(super::AD_STREAM_CAPACITY);
23
24 Self {
25 client,
26 sender,
27 listener_init: OnceCell::new(),
28 }
29 }
30
31 async fn start_background_listener(&self) {
32 debug!("Starting autodiscovery background listener.");
33
34 let mut client = self.client.clone();
35 let sender = self.sender.clone();
36
37 tokio::spawn(async move {
38 info!("Listening to autodiscovery events from remote agent.");
39
40 loop {
41 let mut autodiscovery_stream = client.get_autodiscovery_stream();
42
43 debug!("Polling autodiscovery event stream.");
44
45 while let Some(result) = autodiscovery_stream.next().await {
46 match result {
47 Ok(response) => {
48 for proto_config in response.configs {
49 let event = AutodiscoveryEvent::from(proto_config);
50 let _ = sender.send(event);
51 }
52 }
53 Err(status) => {
54 warn!(
55 ?status,
56 "Encountered error while listening for autodiscovery events. Retrying in 1 second..."
57 );
58 tokio::time::sleep(Duration::from_secs(1)).await;
59 break;
60 }
61 }
62 }
63 }
64 });
65 }
66}
67
68#[async_trait]
69impl AutodiscoveryProvider for RemoteAgentAutodiscoveryProvider {
70 async fn subscribe(&self) -> Option<Receiver<AutodiscoveryEvent>> {
71 self.listener_init
72 .get_or_init(|| async {
73 self.start_background_listener().await;
74 })
75 .await;
76
77 Some(self.sender.subscribe())
78 }
79}