// saluki_env/workload/collectors/containerd.rs

1use std::time::Duration;
2
3use async_stream::stream;
4use async_trait::async_trait;
5use containerd_protos::services::namespaces::v1::Namespace;
6use futures::{stream::select_all, Stream, StreamExt as _};
7use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
8use saluki_config::GenericConfiguration;
9use saluki_core::health::Health;
10use saluki_error::GenericError;
11use saluki_metrics::static_metrics;
12use stringtheory::interning::{GenericMapInterner, Interner as _};
13use tokio::{select, sync::mpsc, time::sleep};
14use tracing::{debug, error, warn};
15
16use super::MetadataCollector;
17use crate::workload::{
18    entity::EntityId,
19    helpers::containerd::{
20        events::{ContainerdEvent, ContainerdTopic},
21        ContainerdClient,
22    },
23    metadata::MetadataOperation,
24};
25
26static CONTAINERD_WATCH_EVENTS: &[ContainerdTopic] = &[ContainerdTopic::TaskStarted, ContainerdTopic::TaskDeleted];
27
28static_metrics!(
29   name => Telemetry,
30   prefix => containerd_metadata_collector,
31   labels => [namespace: String],
32   metrics => [
33       counter(rpc_errors_total),
34       counter(intern_failed_total),
35       counter(events_task_started_total),
36       counter(events_task_deleted_total),
37   ],
38);
39
40/// A metadata collector that watches for updates from containerd.
41pub struct ContainerdMetadataCollector {
42    client: ContainerdClient,
43    watched_namespaces: Vec<Namespace>,
44    tag_interner: GenericMapInterner,
45    health: Health,
46}
47
48impl ContainerdMetadataCollector {
49    /// Creates a new `ContainerdMetadataCollector` from the given configuration.
50    ///
51    /// # Errors
52    ///
53    /// If the containerd gRPC client can't be created, or listing the namespaces in the containerd runtime fails, an
54    /// error will be returned.
55    pub async fn from_configuration(
56        config: &GenericConfiguration, health: Health, tag_interner: GenericMapInterner,
57    ) -> Result<Self, GenericError> {
58        let client = ContainerdClient::from_configuration(config).await?;
59        let watched_namespaces = client.list_namespaces().await?;
60
61        Ok(Self {
62            client,
63            watched_namespaces,
64            tag_interner,
65            health,
66        })
67    }
68}
69
70#[async_trait]
71impl MetadataCollector for ContainerdMetadataCollector {
72    fn name(&self) -> &'static str {
73        "containerd"
74    }
75
76    async fn watch(&mut self, operations_tx: &mut mpsc::Sender<MetadataOperation>) -> Result<(), GenericError> {
77        self.health.mark_ready();
78
79        // Create a watcher for each namespace, and then join all of their watch streams, which then we'll just funnel
80        // back to the operations channel.
81        let watchers = self
82            .watched_namespaces
83            .iter()
84            .map(|ns| NamespaceWatcher::new(self.client.clone(), ns.clone(), self.tag_interner.clone()).watch());
85
86        let mut operations_stream = select_all(watchers);
87
88        loop {
89            select! {
90                _ = self.health.live() => {},
91                maybe_operation = operations_stream.next() => match maybe_operation {
92                    Some(operation) => {
93                        operations_tx.send(operation).await?;
94                    },
95                    None => break,
96                },
97            }
98        }
99
100        self.health.mark_not_ready();
101
102        Ok(())
103    }
104}
105
106impl MemoryBounds for ContainerdMetadataCollector {
107    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
108        // TODO: Kind of a throwaway calculation because nothing about the gRPC client can really be bounded at the
109        // moment, and we also don't have any way to know the number of namespaces we'll be monitoring a priori.
110        builder
111            .firm()
112            .with_fixed_amount("self struct", std::mem::size_of::<Self>());
113    }
114}
115
116struct NamespaceWatcher {
117    namespace: Namespace,
118    client: ContainerdClient,
119    tag_interner: GenericMapInterner,
120    telemetry: Telemetry,
121}
122
123impl NamespaceWatcher {
124    fn new(client: ContainerdClient, namespace: Namespace, tag_interner: GenericMapInterner) -> Self {
125        let telemetry = Telemetry::new(namespace.name.clone());
126        Self {
127            client,
128            namespace,
129            tag_interner,
130            telemetry,
131        }
132    }
133
134    async fn process_event(&self, event: ContainerdEvent) -> Option<MetadataOperation> {
135        match event {
136            ContainerdEvent::TaskStarted { id, pid } => {
137                self.telemetry.events_task_started_total().increment(1);
138                let pid_entity_id = EntityId::ContainerPid(pid);
139                let container_entity_id = EntityId::Container(id);
140                Some(MetadataOperation::add_alias(pid_entity_id, container_entity_id))
141            }
142            ContainerdEvent::TaskDeleted { pid, .. } => {
143                self.telemetry.events_task_deleted_total().increment(1);
144                Some(MetadataOperation::delete(EntityId::ContainerPid(pid)))
145            }
146        }
147    }
148
149    async fn build_initial_metadata_operations(&self) -> Option<Vec<MetadataOperation>> {
150        let mut operations = Vec::new();
151
152        // Get a list of all containers in the namespace.
153        let containers = match self.client.list_containers(&self.namespace).await {
154            Ok(containers) => containers,
155            Err(e) => {
156                self.telemetry.rpc_errors_total().increment(1);
157                error!(namespace = self.namespace.name, error = %e, "Error listing containers.");
158                return None;
159            }
160        };
161
162        for container in containers {
163            let pids = match self
164                .client
165                .list_pids_for_container(&self.namespace, container.id.clone())
166                .await
167            {
168                Ok(pids) => pids,
169                Err(e) => {
170                    if let Some(status) = e.as_response_error() {
171                        if status.code() == tonic::Code::NotFound {
172                            // The container may have been deleted before we could get the PIDs for it, so we'll just
173                            // skip it without making a fuss.
174                            continue;
175                        }
176                    }
177
178                    self.telemetry.rpc_errors_total().increment(1);
179                    error!(namespace = self.namespace.name, container_id = container.id, error = %e, "Error getting PIDs for container.");
180                    continue;
181                }
182            };
183
184            for pid in pids {
185                let pid_entity_id = EntityId::ContainerPid(pid);
186
187                match self.tag_interner.try_intern(container.id.as_str()) {
188                    Some(container_id) => {
189                        let container_entity_id = EntityId::Container(container_id.into());
190                        operations.push(MetadataOperation::add_alias(pid_entity_id, container_entity_id));
191                    }
192                    None => {
193                        self.telemetry.intern_failed_total().increment(1);
194                        warn!(
195                            namespace = self.namespace.name,
196                            container_id = container.id,
197                            container_task_pid = pid,
198                            "Failed to intern container ID. Container ID/task PID link will not be created."
199                        );
200                    }
201                }
202            }
203        }
204
205        Some(operations)
206    }
207
208    fn watch(self) -> impl Stream<Item = MetadataOperation> + Unpin {
209        debug!(
210            namespace = self.namespace.name,
211            "Starting containerd namespace watcher."
212        );
213
214        // We watch the given namespace for all of the relevant events, and convert those into metadata operations that
215        // we pass back to be collected by the parent watcher task, which then forwards them to the metadata aggregator.
216        Box::pin(stream! {
217            // Do an initial scan of the namespace to get all of the existing containers, their tasks and images, and
218            // so on, and generate metadata operations from that as a way to prime the store.
219            if let Some(initial_operations) = self.build_initial_metadata_operations().await {
220                for operation in initial_operations {
221                    yield operation;
222                }
223            }
224
225            // Now watch for events.
226            loop {
227                // TODO: We should be creating this stream -- and polling it! -- before we build our initial metadata
228                // operations in order to ensure that we have an overlap between new events and the initial scan.
229                let mut event_stream = match self.client.watch_events(CONTAINERD_WATCH_EVENTS, &self.namespace).await {
230                    Ok(stream) => stream,
231                    Err(e) => {
232                        self.telemetry.rpc_errors_total().increment(1);
233                        error!(namespace = self.namespace.name, error = %e, "Error watching container events.");
234
235                        sleep(Duration::from_secs(1)).await;
236                        continue;
237                    },
238                };
239
240                while let Some(event_result) = event_stream.next().await {
241                    let event = match event_result {
242                        Ok(event) => event,
243                        Err(e) => {
244                            error!(namespace = self.namespace.name, error = %e, "Error watching container events.");
245                            continue;
246                        },
247                    };
248
249                    if let Some(operation) = self.process_event(event).await {
250                        yield operation;
251                    }
252                }
253            }
254        })
255    }
256}