saluki_env/workload/collectors/
containerd.rs1use std::time::Duration;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use containerd_protos::services::namespaces::v1::Namespace;
6use futures::{stream::select_all, Stream, StreamExt as _};
7use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
8use saluki_config::GenericConfiguration;
9use saluki_core::health::Health;
10use saluki_error::GenericError;
11use saluki_metrics::static_metrics;
12use stringtheory::interning::{GenericMapInterner, Interner as _};
13use tokio::{select, sync::mpsc, time::sleep};
14use tracing::{debug, error, warn};
15
16use super::MetadataCollector;
17use crate::workload::{
18 entity::EntityId,
19 helpers::containerd::{
20 events::{ContainerdEvent, ContainerdTopic},
21 ContainerdClient,
22 },
23 metadata::MetadataOperation,
24};
25
/// Containerd event topics passed to `ContainerdClient::watch_events` by each namespace watcher.
static CONTAINERD_WATCH_EVENTS: &[ContainerdTopic] = &[ContainerdTopic::TaskStarted, ContainerdTopic::TaskDeleted];
27
// Declares the `Telemetry` handle used by the namespace watchers in this file. It is constructed with a single
// `namespace` label value (see `Telemetry::new(namespace.name.clone())`) and exposes one accessor per counter
// below, e.g. `telemetry.rpc_errors_total().increment(1)`.
//
// NOTE(review): the `prefix` presumably becomes the leading part of each emitted metric name -- confirm against
// the `static_metrics!` definition.
static_metrics!(
    name => Telemetry,
    prefix => containerd_metadata_collector,
    labels => [namespace: String],
    metrics => [
        // Incremented when a containerd RPC fails.
        counter(rpc_errors_total),
        // Incremented when a container ID could not be interned.
        counter(intern_failed_total),
        // Incremented for every `TaskStarted` event processed.
        counter(events_task_started_total),
        // Incremented for every `TaskDeleted` event processed.
        counter(events_task_deleted_total),
    ],
);
39
/// A metadata collector backed by containerd.
///
/// Emits metadata operations that link container task PIDs to container IDs, based on the containers and task
/// events observed in each watched containerd namespace.
pub struct ContainerdMetadataCollector {
    /// Client used for all containerd RPCs; cloned into each namespace watcher.
    client: ContainerdClient,
    /// Namespaces to watch, as returned by `list_namespaces` when the collector was built.
    watched_namespaces: Vec<Namespace>,
    /// Interner used for container IDs when building the initial set of operations.
    tag_interner: GenericMapInterner,
    /// Health handle: marked ready when watching starts and polled via `live()` while watching.
    health: Health,
}
47
48impl ContainerdMetadataCollector {
49 pub async fn from_configuration(
56 config: &GenericConfiguration, health: Health, tag_interner: GenericMapInterner,
57 ) -> Result<Self, GenericError> {
58 let client = ContainerdClient::from_configuration(config).await?;
59 let watched_namespaces = client.list_namespaces().await?;
60
61 Ok(Self {
62 client,
63 watched_namespaces,
64 tag_interner,
65 health,
66 })
67 }
68}
69
70#[async_trait]
71impl MetadataCollector for ContainerdMetadataCollector {
72 fn name(&self) -> &'static str {
73 "containerd"
74 }
75
76 async fn watch(&mut self, operations_tx: &mut mpsc::Sender<MetadataOperation>) -> Result<(), GenericError> {
77 self.health.mark_ready();
78
79 let watchers = self
82 .watched_namespaces
83 .iter()
84 .map(|ns| NamespaceWatcher::new(self.client.clone(), ns.clone(), self.tag_interner.clone()).watch());
85
86 let mut operations_stream = select_all(watchers);
87
88 loop {
89 select! {
90 _ = self.health.live() => {},
91 maybe_operation = operations_stream.next() => match maybe_operation {
92 Some(operation) => {
93 operations_tx.send(operation).await?;
94 },
95 None => break,
96 },
97 }
98 }
99
100 self.health.mark_not_ready();
101
102 Ok(())
103 }
104}
105
106impl MemoryBounds for ContainerdMetadataCollector {
107 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
108 builder
111 .firm()
112 .with_fixed_amount("self struct", std::mem::size_of::<Self>());
113 }
114}
115
/// Watches a single containerd namespace and turns its containers and task events into metadata operations.
struct NamespaceWatcher {
    /// Namespace whose containers and events this watcher handles.
    namespace: Namespace,
    /// Client used to list containers/PIDs and to subscribe to events.
    client: ContainerdClient,
    /// Interner used for container IDs found while listing existing containers.
    tag_interner: GenericMapInterner,
    /// Telemetry handle created with this namespace's name.
    telemetry: Telemetry,
}
122
123impl NamespaceWatcher {
124 fn new(client: ContainerdClient, namespace: Namespace, tag_interner: GenericMapInterner) -> Self {
125 let telemetry = Telemetry::new(namespace.name.clone());
126 Self {
127 client,
128 namespace,
129 tag_interner,
130 telemetry,
131 }
132 }
133
134 async fn process_event(&self, event: ContainerdEvent) -> Option<MetadataOperation> {
135 match event {
136 ContainerdEvent::TaskStarted { id, pid } => {
137 self.telemetry.events_task_started_total().increment(1);
138 let pid_entity_id = EntityId::ContainerPid(pid);
139 let container_entity_id = EntityId::Container(id);
140 Some(MetadataOperation::add_alias(pid_entity_id, container_entity_id))
141 }
142 ContainerdEvent::TaskDeleted { pid, .. } => {
143 self.telemetry.events_task_deleted_total().increment(1);
144 Some(MetadataOperation::delete(EntityId::ContainerPid(pid)))
145 }
146 }
147 }
148
149 async fn build_initial_metadata_operations(&self) -> Option<Vec<MetadataOperation>> {
150 let mut operations = Vec::new();
151
152 let containers = match self.client.list_containers(&self.namespace).await {
154 Ok(containers) => containers,
155 Err(e) => {
156 self.telemetry.rpc_errors_total().increment(1);
157 error!(namespace = self.namespace.name, error = %e, "Error listing containers.");
158 return None;
159 }
160 };
161
162 for container in containers {
163 let pids = match self
164 .client
165 .list_pids_for_container(&self.namespace, container.id.clone())
166 .await
167 {
168 Ok(pids) => pids,
169 Err(e) => {
170 if let Some(status) = e.as_response_error() {
171 if status.code() == tonic::Code::NotFound {
172 continue;
175 }
176 }
177
178 self.telemetry.rpc_errors_total().increment(1);
179 error!(namespace = self.namespace.name, container_id = container.id, error = %e, "Error getting PIDs for container.");
180 continue;
181 }
182 };
183
184 for pid in pids {
185 let pid_entity_id = EntityId::ContainerPid(pid);
186
187 match self.tag_interner.try_intern(container.id.as_str()) {
188 Some(container_id) => {
189 let container_entity_id = EntityId::Container(container_id.into());
190 operations.push(MetadataOperation::add_alias(pid_entity_id, container_entity_id));
191 }
192 None => {
193 self.telemetry.intern_failed_total().increment(1);
194 warn!(
195 namespace = self.namespace.name,
196 container_id = container.id,
197 container_task_pid = pid,
198 "Failed to intern container ID. Container ID/task PID link will not be created."
199 );
200 }
201 }
202 }
203 }
204
205 Some(operations)
206 }
207
208 fn watch(self) -> impl Stream<Item = MetadataOperation> + Unpin {
209 debug!(
210 namespace = self.namespace.name,
211 "Starting containerd namespace watcher."
212 );
213
214 Box::pin(stream! {
217 if let Some(initial_operations) = self.build_initial_metadata_operations().await {
220 for operation in initial_operations {
221 yield operation;
222 }
223 }
224
225 loop {
227 let mut event_stream = match self.client.watch_events(CONTAINERD_WATCH_EVENTS, &self.namespace).await {
230 Ok(stream) => stream,
231 Err(e) => {
232 self.telemetry.rpc_errors_total().increment(1);
233 error!(namespace = self.namespace.name, error = %e, "Error watching container events.");
234
235 sleep(Duration::from_secs(1)).await;
236 continue;
237 },
238 };
239
240 while let Some(event_result) = event_stream.next().await {
241 let event = match event_result {
242 Ok(event) => event,
243 Err(e) => {
244 error!(namespace = self.namespace.name, error = %e, "Error watching container events.");
245 continue;
246 },
247 };
248
249 if let Some(operation) = self.process_event(event).await {
250 yield operation;
251 }
252 }
253 }
254 })
255 }
256}