saluki_env/workload/providers/remote_agent/
api.rs1use saluki_api::{
2 extract::State,
3 response::IntoResponse,
4 routing::{get, Router},
5 APIHandler,
6};
7use saluki_common::collections::{FastHashMap, FastHashSet};
8use saluki_context::{
9 origin::OriginTagCardinality,
10 tags::{SharedTagSet, TagSet},
11};
12use serde::{ser::SerializeSeq as _, Serialize};
13
14use crate::workload::{
15 entity::HighestPrecedenceEntityIdRef,
16 stores::{ExternalDataStoreResolver, TagStoreQuerier},
17 EntityId,
18};
19
20#[derive(Serialize)]
21struct EntityTags {
22 low_cardinality: SharedTagSet,
23 orchestrator_cardinality: SharedTagSet,
24 high_cardinality: SharedTagSet,
25}
26
27#[derive(Serialize)]
28struct EntityInformation<'a> {
29 entity_id: &'a EntityId,
30
31 #[serde(skip_serializing_if = "Option::is_none")]
32 alias: Option<&'a EntityId>,
33
34 tags: EntityTags,
35}
36
37#[derive(Serialize)]
38struct ExternalDataInformation<'a> {
39 pod_uid: &'a str,
40 container_name: &'a str,
41 init_container: bool,
42 container_id: &'a EntityId,
43}
44
45#[derive(Clone)]
47pub struct RemoteAgentWorkloadState {
48 tag_querier: TagStoreQuerier,
49 eds_resolver: ExternalDataStoreResolver,
50}
51
52impl RemoteAgentWorkloadState {
53 fn get_tags_dump_response(&self) -> String {
54 let mut active_entities = FastHashSet::default();
55 let mut entity_aliases = FastHashMap::default();
56 let mut entity_info_map = FastHashMap::default();
57 let empty_tagset = TagSet::default().into_shared();
58
59 self.tag_querier.visit_active_entities(|entity_id| {
62 active_entities.insert(entity_id.clone());
63 });
64
65 self.tag_querier.visit_entity_aliases(|entity_id, target_entity_id| {
66 active_entities.insert(entity_id.clone());
67 entity_aliases.insert(entity_id.clone(), target_entity_id.clone());
68 });
69
70 for entity_id in active_entities.iter() {
72 let alias = entity_aliases.get(entity_id);
73 let low_cardinality_tags = self
74 .tag_querier
75 .get_exact_entity_tags(entity_id, OriginTagCardinality::Low)
76 .unwrap_or_else(|| empty_tagset.clone());
77 let orchestrator_cardinality_tags = self
78 .tag_querier
79 .get_exact_entity_tags(entity_id, OriginTagCardinality::Orchestrator)
80 .unwrap_or_else(|| empty_tagset.clone());
81 let high_cardinality_tags = self
82 .tag_querier
83 .get_exact_entity_tags(entity_id, OriginTagCardinality::High)
84 .unwrap_or_else(|| empty_tagset.clone());
85
86 let tags = EntityTags {
87 low_cardinality: low_cardinality_tags,
88 orchestrator_cardinality: orchestrator_cardinality_tags,
89 high_cardinality: high_cardinality_tags,
90 };
91
92 entity_info_map.insert(entity_id, EntityInformation { entity_id, alias, tags });
93 }
94
95 let mut entity_info = entity_info_map.into_values().collect::<Vec<_>>();
98 entity_info.sort_by_cached_key(|entity_info| HighestPrecedenceEntityIdRef::from(entity_info.entity_id));
99
100 serde_json::to_string(&entity_info).unwrap()
101 }
102
103 fn get_eds_dump_response(&self) -> String {
104 let eds_serializer = ExternalDataSerializer {
105 eds_resolver: &self.eds_resolver,
106 };
107 serde_json::to_string(&eds_serializer).unwrap()
108 }
109}
110
111pub struct RemoteAgentWorkloadAPIHandler {
125 state: RemoteAgentWorkloadState,
126}
127
128impl RemoteAgentWorkloadAPIHandler {
129 pub(crate) fn from_state(tag_querier: TagStoreQuerier, eds_resolver: ExternalDataStoreResolver) -> Self {
130 Self {
131 state: RemoteAgentWorkloadState {
132 tag_querier,
133 eds_resolver,
134 },
135 }
136 }
137
138 async fn tags_dump_handler(State(state): State<RemoteAgentWorkloadState>) -> impl IntoResponse {
139 state.get_tags_dump_response()
140 }
141
142 async fn eds_dump_handler(State(state): State<RemoteAgentWorkloadState>) -> impl IntoResponse {
143 state.get_eds_dump_response()
144 }
145}
146
147impl APIHandler for RemoteAgentWorkloadAPIHandler {
148 type State = RemoteAgentWorkloadState;
149
150 fn generate_initial_state(&self) -> Self::State {
151 self.state.clone()
152 }
153
154 fn generate_routes(&self) -> Router<Self::State> {
155 Router::new()
156 .route("/workload/remote_agent/tags/dump", get(Self::tags_dump_handler))
157 .route("/workload/remote_agent/external_data/dump", get(Self::eds_dump_handler))
158 }
159}
160
161struct ExternalDataSerializer<'a> {
162 eds_resolver: &'a ExternalDataStoreResolver,
163}
164
165impl<'a> Serialize for ExternalDataSerializer<'a> {
166 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
167 where
168 S: serde::Serializer,
169 {
170 let mut last_error = None;
171
172 let mut seq = serializer.serialize_seq(None)?;
173 self.eds_resolver.with_latest_snapshot(|ed, cid| {
174 if last_error.is_some() {
175 return;
176 }
177
178 let eds_info = ExternalDataInformation {
179 pod_uid: ed.pod_uid(),
180 container_name: ed.container_name(),
181 init_container: ed.is_init_container(),
182 container_id: cid,
183 };
184
185 if let Err(e) = seq.serialize_element(&eds_info) {
186 last_error = Some(e);
187 }
188 });
189
190 if let Some(e) = last_error {
191 return Err(e);
192 }
193
194 seq.end()
195 }
196}