saluki_env/workload/providers/remote_agent/
api.rs

1use saluki_api::{
2    extract::State,
3    response::IntoResponse,
4    routing::{get, Router},
5    APIHandler,
6};
7use saluki_common::collections::{FastHashMap, FastHashSet};
8use saluki_context::{
9    origin::OriginTagCardinality,
10    tags::{SharedTagSet, TagSet},
11};
12use serde::{ser::SerializeSeq as _, Serialize};
13
14use crate::workload::{
15    entity::HighestPrecedenceEntityIdRef,
16    stores::{ExternalDataStoreResolver, TagStoreQuerier},
17    EntityId,
18};
19
20#[derive(Serialize)]
21struct EntityTags {
22    low_cardinality: SharedTagSet,
23    orchestrator_cardinality: SharedTagSet,
24    high_cardinality: SharedTagSet,
25}
26
27#[derive(Serialize)]
28struct EntityInformation<'a> {
29    entity_id: &'a EntityId,
30
31    #[serde(skip_serializing_if = "Option::is_none")]
32    alias: Option<&'a EntityId>,
33
34    tags: EntityTags,
35}
36
37#[derive(Serialize)]
38struct ExternalDataInformation<'a> {
39    pod_uid: &'a str,
40    container_name: &'a str,
41    init_container: bool,
42    container_id: &'a EntityId,
43}
44
45/// State used for the Remote Agent workload API handler.
46#[derive(Clone)]
47pub struct RemoteAgentWorkloadState {
48    tag_querier: TagStoreQuerier,
49    eds_resolver: ExternalDataStoreResolver,
50}
51
52impl RemoteAgentWorkloadState {
53    fn get_tags_dump_response(&self) -> String {
54        let mut active_entities = FastHashSet::default();
55        let mut entity_aliases = FastHashMap::default();
56        let mut entity_info_map = FastHashMap::default();
57        let empty_tagset = TagSet::default().into_shared();
58
59        // First, collect a list of all entities presently in the tag store, and then also go through and collect the
60        // entity mappings for each entity.
61        self.tag_querier.visit_active_entities(|entity_id| {
62            active_entities.insert(entity_id.clone());
63        });
64
65        self.tag_querier.visit_entity_aliases(|entity_id, target_entity_id| {
66            active_entities.insert(entity_id.clone());
67            entity_aliases.insert(entity_id.clone(), target_entity_id.clone());
68        });
69
70        // For each entity, build its information.
71        for entity_id in active_entities.iter() {
72            let alias = entity_aliases.get(entity_id);
73            let low_cardinality_tags = self
74                .tag_querier
75                .get_exact_entity_tags(entity_id, OriginTagCardinality::Low)
76                .unwrap_or_else(|| empty_tagset.clone());
77            let orchestrator_cardinality_tags = self
78                .tag_querier
79                .get_exact_entity_tags(entity_id, OriginTagCardinality::Orchestrator)
80                .unwrap_or_else(|| empty_tagset.clone());
81            let high_cardinality_tags = self
82                .tag_querier
83                .get_exact_entity_tags(entity_id, OriginTagCardinality::High)
84                .unwrap_or_else(|| empty_tagset.clone());
85
86            let tags = EntityTags {
87                low_cardinality: low_cardinality_tags,
88                orchestrator_cardinality: orchestrator_cardinality_tags,
89                high_cardinality: high_cardinality_tags,
90            };
91
92            entity_info_map.insert(entity_id, EntityInformation { entity_id, alias, tags });
93        }
94
95        // Collapse the entity information map into sorted vector of entity information, which is sorted in precedence
96        // order of the entity ID.
97        let mut entity_info = entity_info_map.into_values().collect::<Vec<_>>();
98        entity_info.sort_by_cached_key(|entity_info| HighestPrecedenceEntityIdRef::from(entity_info.entity_id));
99
100        serde_json::to_string(&entity_info).unwrap()
101    }
102
103    fn get_eds_dump_response(&self) -> String {
104        let eds_serializer = ExternalDataSerializer {
105            eds_resolver: &self.eds_resolver,
106        };
107        serde_json::to_string(&eds_serializer).unwrap()
108    }
109}
110
111/// An API handler for interacting with the underlying data stores that comprise the Remote Agent workload provider.
112///
113/// This handler registers a number of routes that allow for introspecting the state of the underlying data stores to
114/// understand exactly what entities are being tracked and what tags are associated with them.
115///
116/// # Routes
117///
118/// ## GET `/workload/remote_agent/tags/dump`
119///
120/// This route will dump the contents of the associated tag store in a human-readable form.
121///
122/// All entities present in the tag store will be listed, along with their ancestry chain and the tags associated with
123/// the entity at each tag cardinality level. Entities are sorted in the output from highest to lowest precedence.
124pub struct RemoteAgentWorkloadAPIHandler {
125    state: RemoteAgentWorkloadState,
126}
127
128impl RemoteAgentWorkloadAPIHandler {
129    pub(crate) fn from_state(tag_querier: TagStoreQuerier, eds_resolver: ExternalDataStoreResolver) -> Self {
130        Self {
131            state: RemoteAgentWorkloadState {
132                tag_querier,
133                eds_resolver,
134            },
135        }
136    }
137
138    async fn tags_dump_handler(State(state): State<RemoteAgentWorkloadState>) -> impl IntoResponse {
139        state.get_tags_dump_response()
140    }
141
142    async fn eds_dump_handler(State(state): State<RemoteAgentWorkloadState>) -> impl IntoResponse {
143        state.get_eds_dump_response()
144    }
145}
146
147impl APIHandler for RemoteAgentWorkloadAPIHandler {
148    type State = RemoteAgentWorkloadState;
149
150    fn generate_initial_state(&self) -> Self::State {
151        self.state.clone()
152    }
153
154    fn generate_routes(&self) -> Router<Self::State> {
155        Router::new()
156            .route("/workload/remote_agent/tags/dump", get(Self::tags_dump_handler))
157            .route("/workload/remote_agent/external_data/dump", get(Self::eds_dump_handler))
158    }
159}
160
161struct ExternalDataSerializer<'a> {
162    eds_resolver: &'a ExternalDataStoreResolver,
163}
164
165impl<'a> Serialize for ExternalDataSerializer<'a> {
166    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
167    where
168        S: serde::Serializer,
169    {
170        let mut last_error = None;
171
172        let mut seq = serializer.serialize_seq(None)?;
173        self.eds_resolver.with_latest_snapshot(|ed, cid| {
174            if last_error.is_some() {
175                return;
176            }
177
178            let eds_info = ExternalDataInformation {
179                pod_uid: ed.pod_uid(),
180                container_name: ed.container_name(),
181                init_container: ed.is_init_container(),
182                container_id: cid,
183            };
184
185            if let Err(e) = seq.serialize_element(&eds_info) {
186                last_error = Some(e);
187            }
188        });
189
190        if let Some(e) = last_error {
191            return Err(e);
192        }
193
194        seq.end()
195    }
196}