saluki_core/topology/
running.rs

1use std::{
2    collections::HashMap,
3    time::{Duration, Instant},
4};
5
6use saluki_error::{generic_error, GenericError};
7use tokio::{
8    pin, select,
9    task::{Id, JoinError, JoinSet},
10    time::{interval, sleep},
11};
12use tracing::{debug, error, info, warn};
13
14use super::{shutdown::ComponentShutdownCoordinator, ComponentId};
15
/// A running topology.
///
/// Owns the spawned component tasks along with the coordinator used to trigger their shutdown.
pub struct RunningTopology {
    /// Coordinator used to trigger shutdown of the topology's components.
    shutdown_coordinator: ComponentShutdownCoordinator,
    /// Spawned component tasks, each resolving to the result returned by its component.
    component_tasks: JoinSet<Result<(), GenericError>>,
    /// Maps the ID of each spawned task to the ID of its component. Entries are removed as tasks finish.
    component_task_map: HashMap<Id, ComponentId>,
}
22
23impl RunningTopology {
24    /// Creates a new `RunningTopology`.
25    pub(super) fn from_parts(
26        shutdown_coordinator: ComponentShutdownCoordinator, component_tasks: JoinSet<Result<(), GenericError>>,
27        component_task_map: HashMap<Id, ComponentId>,
28    ) -> Self {
29        Self {
30            shutdown_coordinator,
31            component_tasks,
32            component_task_map,
33        }
34    }
35
36    /// Waits for any spawned component to unexpectedly finish.
37    ///
38    /// This can be used in a loop to determine if any component has unexpectedly finished before shutdown was
39    /// initiated, which usually represents an error or bug with the component.
40    pub async fn wait_for_unexpected_finish(&mut self) {
41        let task_result = self
42            .component_tasks
43            .join_next_with_id()
44            .await
45            .expect("no components to wait for");
46
47        // We call `handle_task_result` to log everything the same as when calling `shutdown`/`shutdown_with_timeout`,
48        // with an "unexpected" flag to indicate that this should be considered an unexpected finish if it did happen to
49        // finish "successfully", which adjusts the logging accordingly.
50        handle_task_result(&mut self.component_task_map, task_result, true);
51    }
52
53    /// Triggers the topology to shutdown, waiting until all components have stopped.
54    ///
55    /// This will wait indefinitely for all components to stop. If graceful shutdown with an upper bound is desired, use
56    /// [`shutdown_with_timeout`][Self::shutdown_with_timeout] instead.
57    ///
58    /// # Errors
59    ///
60    /// If the topology fails to shutdown cleanly due to an error in a component, an error will be returned.
61    pub async fn shutdown(self) -> Result<(), GenericError> {
62        self.shutdown_with_timeout(Duration::MAX).await
63    }
64
65    /// Triggers the topology to shutdown, waiting until all components have stopped or the timeout has elapsed.
66    ///
67    /// # Errors
68    ///
69    /// If the topology fails to shutdown cleanly, either due to an error in a component or the timeout being reached,
70    /// an error will be returned.
71    pub async fn shutdown_with_timeout(mut self, timeout: Duration) -> Result<(), GenericError> {
72        let shutdown_deadline = Instant::now() + timeout;
73
74        let shutdown_timeout = sleep(timeout);
75        pin!(shutdown_timeout);
76
77        let mut progress_interval = interval(Duration::from_secs(5));
78        progress_interval.tick().await;
79
80        // Trigger shutdown of sources, which will then cascade to the downstream components connected to those sources,
81        // eventually leading to all components shutting down.
82        self.shutdown_coordinator.shutdown();
83
84        let mut stopped_cleanly = true;
85
86        loop {
87            select! {
88                // A task finished.
89                maybe_task_result = self.component_tasks.join_next_with_id() => match maybe_task_result {
90                    None => {
91                        info!("All components stopped.");
92                        break;
93                    },
94                    Some(component_result) => if !handle_task_result(&mut self.component_task_map, component_result, false) {
95                        stopped_cleanly = false;
96                    },
97                },
98
99                // Emit some information about which components we're still waiting on to shutdown.
100                _ = progress_interval.tick() => {
101                    let mut remaining_components = self.component_task_map.values()
102                        .map(|id| id.to_string())
103                        .collect::<Vec<_>>();
104                    remaining_components.sort();
105                    let remaining_time = shutdown_deadline.saturating_duration_since(Instant::now());
106
107                    info!("Waiting for the remaining component(s) to stop: {}. {} seconds remaining.", remaining_components.join(", "), remaining_time.as_secs_f64().round() as u64);
108                },
109
110                // Shutdown timeout was reached.
111                _ = &mut shutdown_timeout => {
112                    warn!("Forcefully stopping topology after shutdown grace period.");
113                    stopped_cleanly = false;
114                    break;
115                },
116            }
117        }
118
119        if stopped_cleanly {
120            Ok(())
121        } else {
122            Err(generic_error!("Topology failed to shutdown cleanly."))
123        }
124    }
125}
126
127/// Handles the result of a component task finishing, logging the result and removing the task from our map of running components.
128///
129/// If the task stopped successfully, `true` is returned.
130fn handle_task_result(
131    component_task_map: &mut HashMap<Id, ComponentId>, task_result: Result<(Id, Result<(), GenericError>), JoinError>,
132    unexpected: bool,
133) -> bool {
134    let (task_id, stopped_successfully) = match task_result {
135        Ok((id, component_result)) => {
136            let component_id = component_task_map.get(&id).expect("component ID not found");
137            match component_result {
138                Ok(()) => {
139                    if unexpected {
140                        warn!(%component_id, "Component unexpectedly finished.");
141                    } else {
142                        debug!(%component_id, "Component stopped.");
143                    }
144                    (id, true)
145                }
146                Err(e) => {
147                    error!(%component_id, error = %e, "Component stopped with error.");
148                    (id, false)
149                }
150            }
151        }
152        Err(e) => {
153            let id = e.id();
154            let component_id = component_task_map.get(&id).expect("component ID not found");
155            error!(%component_id, error = %e, "Component task failed unexpectedly.");
156            (id, false)
157        }
158    };
159
160    component_task_map.remove(&task_id);
161    stopped_successfully
162}