saluki_core/topology/
running.rs1use std::{
2 collections::HashMap,
3 time::{Duration, Instant},
4};
5
6use saluki_error::{generic_error, GenericError};
7use tokio::{
8 pin, select,
9 task::{Id, JoinError, JoinSet},
10 time::{interval, sleep},
11};
12use tracing::{debug, error, info, warn};
13
14use super::{shutdown::ComponentShutdownCoordinator, ComponentId};
15
16pub struct RunningTopology {
18 shutdown_coordinator: ComponentShutdownCoordinator,
19 component_tasks: JoinSet<Result<(), GenericError>>,
20 component_task_map: HashMap<Id, ComponentId>,
21}
22
23impl RunningTopology {
24 pub(super) fn from_parts(
26 shutdown_coordinator: ComponentShutdownCoordinator, component_tasks: JoinSet<Result<(), GenericError>>,
27 component_task_map: HashMap<Id, ComponentId>,
28 ) -> Self {
29 Self {
30 shutdown_coordinator,
31 component_tasks,
32 component_task_map,
33 }
34 }
35
36 pub async fn wait_for_unexpected_finish(&mut self) {
41 let task_result = self
42 .component_tasks
43 .join_next_with_id()
44 .await
45 .expect("no components to wait for");
46
47 handle_task_result(&mut self.component_task_map, task_result, true);
51 }
52
53 pub async fn shutdown(self) -> Result<(), GenericError> {
62 self.shutdown_with_timeout(Duration::MAX).await
63 }
64
65 pub async fn shutdown_with_timeout(mut self, timeout: Duration) -> Result<(), GenericError> {
72 let shutdown_deadline = Instant::now() + timeout;
73
74 let shutdown_timeout = sleep(timeout);
75 pin!(shutdown_timeout);
76
77 let mut progress_interval = interval(Duration::from_secs(5));
78 progress_interval.tick().await;
79
80 self.shutdown_coordinator.shutdown();
83
84 let mut stopped_cleanly = true;
85
86 loop {
87 select! {
88 maybe_task_result = self.component_tasks.join_next_with_id() => match maybe_task_result {
90 None => {
91 info!("All components stopped.");
92 break;
93 },
94 Some(component_result) => if !handle_task_result(&mut self.component_task_map, component_result, false) {
95 stopped_cleanly = false;
96 },
97 },
98
99 _ = progress_interval.tick() => {
101 let mut remaining_components = self.component_task_map.values()
102 .map(|id| id.to_string())
103 .collect::<Vec<_>>();
104 remaining_components.sort();
105 let remaining_time = shutdown_deadline.saturating_duration_since(Instant::now());
106
107 info!("Waiting for the remaining component(s) to stop: {}. {} seconds remaining.", remaining_components.join(", "), remaining_time.as_secs_f64().round() as u64);
108 },
109
110 _ = &mut shutdown_timeout => {
112 warn!("Forcefully stopping topology after shutdown grace period.");
113 stopped_cleanly = false;
114 break;
115 },
116 }
117 }
118
119 if stopped_cleanly {
120 Ok(())
121 } else {
122 Err(generic_error!("Topology failed to shutdown cleanly."))
123 }
124 }
125}
126
127fn handle_task_result(
131 component_task_map: &mut HashMap<Id, ComponentId>, task_result: Result<(Id, Result<(), GenericError>), JoinError>,
132 unexpected: bool,
133) -> bool {
134 let (task_id, stopped_successfully) = match task_result {
135 Ok((id, component_result)) => {
136 let component_id = component_task_map.get(&id).expect("component ID not found");
137 match component_result {
138 Ok(()) => {
139 if unexpected {
140 warn!(%component_id, "Component unexpectedly finished.");
141 } else {
142 debug!(%component_id, "Component stopped.");
143 }
144 (id, true)
145 }
146 Err(e) => {
147 error!(%component_id, error = %e, "Component stopped with error.");
148 (id, false)
149 }
150 }
151 }
152 Err(e) => {
153 let id = e.id();
154 let component_id = component_task_map.get(&id).expect("component ID not found");
155 error!(%component_id, error = %e, "Component task failed unexpectedly.");
156 (id, false)
157 }
158 };
159
160 component_task_map.remove(&task_id);
161 stopped_successfully
162}