Skip to main content

saluki_core/runtime/
supervisor.rs

1use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use saluki_common::collections::FastIndexMap;
5use saluki_common::sync::shutdown::{ShutdownCoordinator, ShutdownHandle};
6use saluki_error::GenericError;
7use snafu::{OptionExt as _, Snafu};
8use tokio::{
9    pin, select,
10    task::{AbortHandle, Id, JoinSet},
11};
12use tracing::{debug, error, warn};
13
14use super::{
15    dedicated::{spawn_dedicated_runtime, RuntimeConfiguration, RuntimeMode},
16    restart::{RestartAction, RestartMode, RestartState, RestartStrategy, RestartType},
17};
18use crate::runtime::{
19    process::{Process, ProcessExt as _},
20    state::DataspaceRegistry,
21};
22
/// A `Future` that represents the execution of a supervised process.
///
/// The future is boxed, pinned, and `Send`. It resolves to `Ok(())` or to a [`GenericError`].
pub type SupervisorFuture = Pin<Box<dyn Future<Output = Result<(), GenericError>> + Send>>;
25
/// A `Future` that represents the full lifecycle of a worker, including initialization.
///
/// Unlike [`SupervisorFuture`], which only represents the runtime phase, this future first performs async
/// initialization and then runs the worker. This allows initialization to happen concurrently when multiple workers are
/// spawned, and keeps the supervisor loop responsive to shutdown signals during initialization.
///
/// On failure it resolves to a [`WorkerError`], which records whether the failure happened during initialization or
/// at runtime.
type WorkerFuture = Pin<Box<dyn Future<Output = Result<(), WorkerError>> + Send>>;
32
/// Worker lifecycle errors.
///
/// Distinguishes between initialization failures (which shouldn't trigger restart logic) and runtime failures (which
/// are eligible for restart).
#[derive(Debug)]
enum WorkerError {
    /// The worker failed during async initialization.
    ///
    /// The optional `child_name` carries the name of the original failing child when the error originates from a
    /// nested supervisor. This allows the parent to include it in its own `FailedToInitialize` error for better
    /// diagnostics across supervision tree levels. It is `None` when the failing child is a plain worker.
    Initialization {
        child_name: Option<String>,
        source: InitializationError,
    },

    /// The worker failed during runtime execution, carrying the underlying error.
    Runtime(GenericError),
}
52
53impl From<SupervisorError> for WorkerError {
54    fn from(err: SupervisorError) -> Self {
55        match err {
56            // Propagate initialization failures so the parent supervisor does NOT attempt to restart.
57            // Preserve the original child name so the parent can include it in diagnostics.
58            SupervisorError::FailedToInitialize { child_name, source } => WorkerError::Initialization {
59                child_name: Some(child_name),
60                source,
61            },
62            // All other supervisor errors (shutdown, no children, invalid name) are runtime-level.
63            other => WorkerError::Runtime(other.into()),
64        }
65    }
66}
67
/// Process errors.
///
/// Describes the abnormal ways in which a child process can stop running.
#[derive(Debug, Snafu)]
pub enum ProcessError {
    /// The child process was aborted by the supervisor.
    #[snafu(display("Child process was aborted by the supervisor."))]
    Aborted,

    /// The child process panicked.
    #[snafu(display("Child process panicked."))]
    Panicked,

    /// The child process terminated with an error.
    #[snafu(display("Child process terminated with an error: {}", source))]
    Terminated {
        /// The error that caused the termination.
        source: GenericError,
    },
}
86
/// Initialization errors.
///
/// Initialization errors are distinct from runtime errors: they indicate that a process couldn't be started at all
/// (for example, failed to bind a port, missing configuration). These errors don't trigger restart logic; instead, they
/// immediately propagate up and fail the supervisor.
///
/// A [`GenericError`] can be converted into this type with `From`/`Into`, yielding the `Failed` variant.
#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
pub enum InitializationError {
    /// The process couldn't be initialized due to an error.
    #[snafu(display("Process failed to initialize: {}", source))]
    Failed {
        /// The underlying error that caused initialization to fail.
        source: GenericError,
    },
}
102
103impl From<GenericError> for InitializationError {
104    fn from(source: GenericError) -> Self {
105        Self::Failed { source }
106    }
107}
108
/// Strategy for shutting down a process.
///
/// Returned by [`Supervisable::shutdown_strategy`] to tell the supervisor how a given process should be stopped.
pub enum ShutdownStrategy {
    /// Waits for the configured duration for the process to exit, and then forcefully aborts it otherwise.
    Graceful(Duration),

    /// Forcefully aborts the process without waiting.
    Brutal,
}
117
/// A supervisable process.
///
/// Implementors describe a worker to a supervisor: its name, how it should be shut down, and how to asynchronously
/// build the future that runs it.
#[async_trait]
pub trait Supervisable: Send + Sync {
    /// Returns the name of the process.
    fn name(&self) -> &str;

    /// Returns the shutdown strategy for the process.
    ///
    /// Defaults to [`ShutdownStrategy::Graceful`] with a five second timeout.
    fn shutdown_strategy(&self) -> ShutdownStrategy {
        ShutdownStrategy::Graceful(Duration::from_secs(5))
    }

    /// Initializes the process asynchronously.
    ///
    /// During initialization, any resources or configuration for the process can be created asynchronously, and the
    /// same runtime that's used for running the process is used for initialization. The resulting future is expected to
    /// complete as soon as reasonably possible after `process_shutdown` resolves.
    ///
    /// **Important:** The `process_shutdown` signal must be moved into the returned [`SupervisorFuture`] so the worker
    /// can respond to supervisor-initiated shutdown. If `process_shutdown` is dropped during initialization, the worker
    /// will be unable to shut down gracefully and will be forcefully aborted after the shutdown timeout.
    ///
    /// # Errors
    ///
    /// If the process can't be initialized, an error is returned.
    async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError>;
}
144
/// Supervisor errors.
#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
pub enum SupervisorError {
    /// Supervisor or worker name is invalid.
    #[snafu(display("Invalid name for supervisor or worker: '{}'", name))]
    InvalidName {
        /// The supervisor or worker name that was rejected as invalid.
        name: String,
    },

    /// The supervisor has no child processes.
    #[snafu(display("Supervisor has no child processes."))]
    NoChildren,

    /// A child process failed to initialize.
    ///
    /// This error indicates that a child couldn't complete its async initialization. This is distinct from runtime
    /// failures and doesn't trigger restart logic.
    #[snafu(display("Child process '{}' failed to initialize: {}", child_name, source))]
    FailedToInitialize {
        /// The name of the child that failed to initialize.
        child_name: String,

        /// The underlying initialization error.
        source: InitializationError,
    },

    /// The supervisor exceeded its restart limits and was forced to shutdown.
    #[snafu(display("Supervisor has exceeded restart limits and was forced to shutdown."))]
    Shutdown,
}
177
178/// A specification for a process to be added to a [`Supervisor`].
179///
180/// A child specification describes how the supervisor should create and manage a child: the underlying future that
181/// represents the process, along with metadata such as its name and shutdown strategy. All processes in a supervisor,
182/// whether a worker or a (nested) supervisor, are represented by a [`ChildSpecification`].
183///
184/// Generally, callers should prefer to use [`add_worker`][Supervisor::add_worker] directly, which can accept either
185/// [`Supervisor`] or any value that implements [`Supervisable`], without needing to explicitly create a
186/// [`ChildSpecification`]. This is preferred as it is more concise but also will ensure that relevant settings are
187/// configured properly for the given worker type, such as using the proper shutdown strategy for supervisors to allow
188/// for complete, graceful shutdown.
189///
190/// If more control is needed, [`ChildSpecification::worker`] can be used to create a specification directly, allowing
191/// access to configuring those more advanced settings. This is currently only valid for worker processes, as
192/// supervisors have no additional user-configurable settings.
pub struct ChildSpecification<S = WorkerSpec> {
    // Kind-specific state: a `WorkerSpec` (the default) or a `SupervisorSpec`.
    spec_inner: S,
}
196
/// Child specification state for a worker.
pub struct WorkerSpec {
    // The worker itself, shared so it can be re-initialized each time the child is (re)spawned.
    worker: Arc<dyn Supervisable>,
    // Restart policy applied to this worker; set via `ChildSpecification::with_restart_type`.
    restart_type: RestartType,
}
202
/// Child specification state for a supervisor.
pub struct SupervisorSpec {
    // The nested supervisor to be run as a child.
    supervisor: Supervisor,
}
207
208impl ChildSpecification<WorkerSpec> {
209    /// Creates a specification for the given worker.
210    pub fn worker<T: Supervisable + 'static>(worker: T) -> Self {
211        Self {
212            spec_inner: WorkerSpec {
213                worker: Arc::new(worker),
214                restart_type: RestartType::Permanent,
215            },
216        }
217    }
218
219    /// Sets the restart policy for this worker.
220    ///
221    /// Defaults to [`RestartType::Permanent`].
222    #[must_use]
223    pub fn with_restart_type(mut self, restart_type: RestartType) -> Self {
224        self.spec_inner.restart_type = restart_type;
225        self
226    }
227}
228
229impl<T> From<T> for ChildSpecification<WorkerSpec>
230where
231    T: Supervisable + 'static,
232{
233    fn from(worker: T) -> Self {
234        Self::worker(worker)
235    }
236}
237
238impl From<Supervisor> for ChildSpecification<SupervisorSpec> {
239    fn from(supervisor: Supervisor) -> Self {
240        Self {
241            spec_inner: SupervisorSpec { supervisor },
242        }
243    }
244}
245
// Private module holding the marker trait used as a supertrait of `ChildState`. Because the module itself is
// private, code outside this module tree cannot name `Sealed`, and therefore cannot implement `ChildState`.
mod sealed {
    pub trait Sealed {}
}

// The two child specification states that are allowed to implement `ChildState`.
impl sealed::Sealed for WorkerSpec {}
impl sealed::Sealed for SupervisorSpec {}
252
/// Child specification state.
///
/// This trait is implemented only for [`WorkerSpec`] and [`SupervisorSpec`], and is sealed: it cannot be implemented
/// outside of this crate. It exists so that [`Supervisor::add_worker`] can accept a [`ChildSpecification`] in either
/// state (as well as bare workers and supervisors) while lowering each into the supervisor's internal representation.
pub trait ChildState: sealed::Sealed + Sized {
    // Lowers `spec` into the supervisor's internal child representation and appends it to `supervisor`.
    #[doc(hidden)]
    fn register(spec: ChildSpecification<Self>, supervisor: &mut Supervisor);
}
262
263impl ChildState for WorkerSpec {
264    fn register(spec: ChildSpecification<Self>, supervisor: &mut Supervisor) {
265        supervisor.push_child(ChildEntry {
266            child: SupervisedChild::Worker(spec.spec_inner.worker),
267            restart: spec.spec_inner.restart_type,
268        });
269    }
270}
271
272impl ChildState for SupervisorSpec {
273    fn register(spec: ChildSpecification<Self>, supervisor: &mut Supervisor) {
274        supervisor.push_child(ChildEntry {
275            child: SupervisedChild::Supervisor(spec.spec_inner.supervisor),
276            restart: RestartType::Permanent,
277        });
278    }
279}
280
/// The type-erased, runnable form of a child: either a worker or a nested supervisor.
///
/// This carries the behavior shared by both kinds of child -- creating the process and worker future, naming, and
/// shutdown strategy. Public [`ChildSpecification`]s are lowered into this type when registered via
/// [`ChildState::register`].
enum SupervisedChild {
    /// A worker, held behind an `Arc` so the same worker can be shared with each spawned task.
    Worker(Arc<dyn Supervisable>),
    /// A nested supervisor that is run as a child of this one.
    Supervisor(Supervisor),
}
290
291impl SupervisedChild {
292    fn process_type(&self) -> &'static str {
293        match self {
294            Self::Worker(_) => "worker",
295            Self::Supervisor(_) => "supervisor",
296        }
297    }
298
299    fn name(&self) -> &str {
300        match self {
301            Self::Worker(worker) => worker.name(),
302            Self::Supervisor(supervisor) => &supervisor.supervisor_id,
303        }
304    }
305
306    fn shutdown_strategy(&self) -> ShutdownStrategy {
307        match self {
308            Self::Worker(worker) => worker.shutdown_strategy(),
309
310            // Supervisors should always be given as much time as necessary shutdown down gracefully to ensure that the
311            // entire supervision subtree can be shutdown cleanly.
312            Self::Supervisor(_) => ShutdownStrategy::Graceful(Duration::MAX),
313        }
314    }
315
316    fn create_process(&self, parent_process: &Process) -> Result<Process, SupervisorError> {
317        match self {
318            Self::Worker(worker) => Process::worker(worker.name(), parent_process).context(InvalidName {
319                name: worker.name().to_string(),
320            }),
321            Self::Supervisor(sup) => {
322                Process::supervisor(&sup.supervisor_id, Some(parent_process)).context(InvalidName {
323                    name: sup.supervisor_id.to_string(),
324                })
325            }
326        }
327    }
328
329    fn create_worker_future(
330        &self, process: Process, process_shutdown: ShutdownHandle,
331    ) -> Result<WorkerFuture, SupervisorError> {
332        match self {
333            Self::Worker(worker) => {
334                let worker = Arc::clone(worker);
335                Ok(Box::pin(async move {
336                    let run_future =
337                        worker
338                            .initialize(process_shutdown)
339                            .await
340                            .map_err(|source| WorkerError::Initialization {
341                                child_name: None,
342                                source,
343                            })?;
344                    run_future.await.map_err(WorkerError::Runtime)
345                }))
346            }
347            Self::Supervisor(sup) => {
348                match sup.runtime_mode() {
349                    RuntimeMode::Ambient => {
350                        // Run on the parent's ambient runtime.
351                        Ok(sup.as_nested_process(process, process_shutdown))
352                    }
353                    RuntimeMode::Dedicated(config) => {
354                        // Spawn in a dedicated runtime on a new OS thread, passing the parent's
355                        // dataspace so the nested supervisor inherits it across the thread boundary.
356                        let child_name = sup.supervisor_id.to_string();
357                        let dataspace = process.dataspace().clone();
358                        let handle =
359                            spawn_dedicated_runtime(sup.inner_clone(), config.clone(), process_shutdown, dataspace)
360                                .map_err(|e| SupervisorError::FailedToInitialize {
361                                    child_name,
362                                    source: e.into(),
363                                })?;
364
365                        Ok(Box::pin(async move { handle.await.map_err(WorkerError::from) }))
366                    }
367                }
368            }
369        }
370    }
371}
372
373impl Clone for SupervisedChild {
374    fn clone(&self) -> Self {
375        match self {
376            Self::Worker(worker) => Self::Worker(Arc::clone(worker)),
377            Self::Supervisor(supervisor) => Self::Supervisor(supervisor.inner_clone()),
378        }
379    }
380}
381
/// A registered child: its specification together with the restart policy chosen at registration time.
#[derive(Clone)]
struct ChildEntry {
    // The runnable child (worker or nested supervisor).
    child: SupervisedChild,
    // The restart policy consulted when this child exits.
    restart: RestartType,
}
388
389/// Supervises a set of workers.
390///
391/// # Workers
392///
393/// All workers are defined through implementation of the [`Supervisable`] trait, which provides the logic for both
394/// creating the underlying worker future that's spawned, as well as other metadata, such as the worker's name, how the
395/// worker should be shutdown, and so on.
396///
397/// Supervisors can themselves be supervised by other supervisors, allowing _supervision trees_ to be constructed by
398/// adding one supervisor as a child of another.
399///
400/// # Instrumentation
401///
402/// Supervisors automatically create their own allocation group
403/// ([`TrackingAllocator`][resource_accounting::TrackingAllocator]), which is used to track both the memory usage of the
404/// supervisor itself and its children. Additionally, individual worker processes are wrapped in a dedicated
405/// [`tracing::Span`] to allow tracing the causal relationship between arbitrary code and the worker executing it.
406///
407/// # Restart Strategies
408///
409/// As the main purpose of a supervisor, restart behavior is fully configurable. A number of restart strategies are
410/// available, which generally relate to the purpose of the supervisor: whether the workers being managed are
411/// independent or interdependent.
412///
413/// All restart strategies are configured through [`RestartStrategy`], which has more information on the available
414/// strategies and configuration settings.
pub struct Supervisor {
    // Identifier of this supervisor; also used as its name when it runs as a nested child.
    supervisor_id: Arc<str>,
    // Statically registered children, in registration order. A child's index in this list identifies it.
    child_specs: Vec<ChildEntry>,
    // Strategy used to decide how to react when a restartable child exits.
    restart_strategy: RestartStrategy,
    // Whether this supervisor runs on the ambient runtime or in a dedicated one when nested.
    runtime_mode: RuntimeMode,
}
421
422impl Supervisor {
423    /// Creates an empty `Supervisor` with the default restart strategy.
424    pub fn new<S: AsRef<str>>(supervisor_id: S) -> Result<Self, SupervisorError> {
425        // We try to throw an error about invalid names as early as possible. This is a manual check, so we might still
426        // encounter an error later when actually running the supervisor, but this is a good first step to catch the
427        // bulk of invalid names.
428        if supervisor_id.as_ref().is_empty() {
429            return Err(SupervisorError::InvalidName {
430                name: supervisor_id.as_ref().to_string(),
431            });
432        }
433
434        Ok(Self {
435            supervisor_id: supervisor_id.as_ref().into(),
436            child_specs: Vec::new(),
437            restart_strategy: RestartStrategy::default(),
438            runtime_mode: RuntimeMode::default(),
439        })
440    }
441
442    /// Returns the supervisor's ID.
443    pub fn id(&self) -> &str {
444        &self.supervisor_id
445    }
446
447    /// Sets the restart strategy for the supervisor.
448    pub fn with_restart_strategy(mut self, strategy: RestartStrategy) -> Self {
449        self.restart_strategy = strategy;
450        self
451    }
452
453    /// Configures this supervisor to run in a dedicated runtime.
454    ///
455    /// When this supervisor is added as a child to another supervisor, it will spawn its own OS threads and Tokio
456    /// runtime instead of running on the parent's ambient runtime.
457    ///
458    /// This provides runtime isolation, which can be useful for:
459    /// - CPU-bound work that shouldn't block the parent's runtime
460    /// - Isolating failures in one part of the system
461    /// - Using different runtime configurations (for example, single-threaded vs multi-threaded)
462    pub fn with_dedicated_runtime(mut self, config: RuntimeConfiguration) -> Self {
463        self.runtime_mode = RuntimeMode::Dedicated(config);
464        self
465    }
466
    /// Returns the runtime mode for this supervisor.
    ///
    /// The mode is the default from [`Supervisor::new`] unless [`Supervisor::with_dedicated_runtime`] was called.
    pub(crate) fn runtime_mode(&self) -> &RuntimeMode {
        &self.runtime_mode
    }
471
472    /// Adds a worker (or nested supervisor) to the supervisor.
473    ///
474    /// A worker can be anything that implements the [`Supervisable`] trait. A [`Supervisor`] can also be added as a
475    /// worker and managed in a nested fashion, known as a supervision tree.
476    ///
477    /// See [`ChildSpecification`] for more details on how workers are represented internally and what options are
478    /// available to configure.
479    pub fn add_worker<S, T>(&mut self, child: T)
480    where
481        S: ChildState,
482        T: Into<ChildSpecification<S>>,
483    {
484        S::register(child.into(), self);
485    }
486
487    fn push_child(&mut self, entry: ChildEntry) {
488        debug!(
489            supervisor_id = %self.supervisor_id,
490            "Adding new static child process #{}. ({}, {}, {:?})",
491            self.child_specs.len(),
492            entry.child.process_type(),
493            entry.child.name(),
494            entry.restart,
495        );
496        self.child_specs.push(entry);
497    }
498
499    fn get_child_spec(&self, child_spec_idx: usize) -> &SupervisedChild {
500        match self.child_specs.get(child_spec_idx) {
501            Some(entry) => &entry.child,
502            None => unreachable!("child spec index should never be out of bounds"),
503        }
504    }
505
506    fn get_restart_type(&self, child_spec_idx: usize) -> RestartType {
507        match self.child_specs.get(child_spec_idx) {
508            Some(entry) => entry.restart,
509            None => unreachable!("child spec index should never be out of bounds"),
510        }
511    }
512
513    fn spawn_child(&self, child_spec_idx: usize, worker_state: &mut WorkerState) -> Result<(), SupervisorError> {
514        let child_spec = self.get_child_spec(child_spec_idx);
515        debug!(supervisor_id = %self.supervisor_id, "Spawning static child process #{} ({}).", child_spec_idx, child_spec.name());
516        worker_state.add_worker(child_spec_idx, child_spec)
517    }
518
519    fn spawn_all_children(&self, worker_state: &mut WorkerState) -> Result<(), SupervisorError> {
520        debug!(supervisor_id = %self.supervisor_id, "Spawning all static child processes.");
521        for child_spec_idx in 0..self.child_specs.len() {
522            self.spawn_child(child_spec_idx, worker_state)?;
523        }
524
525        Ok(())
526    }
527
528    /// Respawns children after a one-for-all restart, honoring each child's [`RestartType`].
529    ///
530    /// Every child except [`RestartType::Temporary`] is restarted, matching Erlang/OTP: a group restart restarts all
531    /// permanent and transient children -- regardless of how they last exited, including a transient child that had
532    /// already exited cleanly -- but never temporary children, which are shut down with the group and not brought back.
533    /// A transient child's "restart only on abnormal exit" rule governs its _own_ termination, not a group restart
534    /// driven by a sibling.
535    fn respawn_children_one_for_all(&self, worker_state: &mut WorkerState) -> Result<(), SupervisorError> {
536        debug!(supervisor_id = %self.supervisor_id, "Restarting all eligible static child processes.");
537        for child_spec_idx in 0..self.child_specs.len() {
538            if self.get_restart_type(child_spec_idx) != RestartType::Temporary {
539                self.spawn_child(child_spec_idx, worker_state)?;
540            }
541        }
542
543        Ok(())
544    }
545
    /// Core supervision loop: spawns all children, then reacts to shutdown and to child exits until done.
    ///
    /// Returns `Ok(())` once `process_shutdown` has resolved and `shutdown_workers` has completed.
    ///
    /// # Errors
    ///
    /// - [`SupervisorError::NoChildren`] if no children are registered.
    /// - [`SupervisorError::FailedToInitialize`] if any child reports an initialization failure.
    /// - [`SupervisorError::Shutdown`] if the restart state decides the supervisor must shut down.
    /// - Any error propagated from spawning or respawning a child.
    async fn run_inner(&self, process: Process, process_shutdown: ShutdownHandle) -> Result<(), SupervisorError> {
        if self.child_specs.is_empty() {
            return Err(SupervisorError::NoChildren);
        }

        let mut restart_state = RestartState::new(self.restart_strategy);
        let mut worker_state = WorkerState::new(process);

        // Spawn all child processes. Since initialization is folded into each worker's task, this returns immediately
        // after spawning -- children initialize concurrently in the background.
        //
        // NOTE(review): if spawning fails partway through, this returns without calling `shutdown_workers`, unlike
        // the other error paths below. Confirm that dropping `worker_state` is sufficient to stop the children that
        // were already spawned.
        self.spawn_all_children(&mut worker_state)?;

        // Now we supervise.
        pin!(process_shutdown);

        loop {
            select! {
                // Shutdown has been triggered.
                //
                // Propagate shutdown to all child processes and wait for them to exit.
                _ = &mut process_shutdown => {
                    debug!(supervisor_id = %self.supervisor_id, "Shutdown triggered, shutting down all child processes.");
                    worker_state.shutdown_workers().await;
                    break;
                },
                // A child has exited; the index identifies which registered child it was.
                (child_spec_idx, worker_result) = worker_state.wait_for_next_worker() => {
                    let child_spec = self.get_child_spec(child_spec_idx);

                    // Initialization failures are not eligible for restart -- they propagate immediately.
                    if let Err(WorkerError::Initialization { child_name, source }) = worker_result {
                        // If the error came from a nested supervisor, include the original child name
                        // to make the error chain more informative (e.g., "ctrl-pln/privileged-api").
                        let full_name = match child_name {
                            Some(inner) => format!("{}/{}", child_spec.name(), inner),
                            None => child_spec.name().to_string(),
                        };

                        error!(supervisor_id = %self.supervisor_id, worker_name = full_name, "Child process failed to initialize: {}", source);
                        worker_state.shutdown_workers().await;
                        return Err(SupervisorError::FailedToInitialize {
                            child_name: full_name,
                            source,
                        });
                    }

                    // A worker exited abnormally if it returned an error, panicked, or was aborted; a clean exit is
                    // `Ok(())`. Together with the worker's restart policy, this determines whether we restart it.
                    let abnormal = worker_result.is_err();
                    let restart_type = self.get_restart_type(child_spec_idx);

                    // Convert the worker result to a process error for restart evaluation / logging.
                    let worker_result = worker_result.map_err(|e| match e {
                        WorkerError::Runtime(e) => ProcessError::Terminated { source: e },
                        WorkerError::Initialization { .. } => unreachable!("handled above"),
                    });

                    if !restart_type.should_restart(abnormal) {
                        // The worker isn't eligible for restart given how it exited. It has already been removed from the
                        // worker map by `wait_for_next_worker`, so we simply continue supervising the rest. Crucially, we
                        // do NOT consult `evaluate_restart` here: non-restarts must not consume the restart-intensity
                        // budget, otherwise a steady stream of terminating temporary/transient children would eventually
                        // trip the supervisor's restart limit and tear it (and its siblings) down.
                        debug!(supervisor_id = %self.supervisor_id, worker_name = child_spec.name(), ?restart_type, ?worker_result, "Child process exited and is not eligible for restart.");
                    } else {
                        match restart_state.evaluate_restart() {
                            RestartAction::Restart(mode) => match mode {
                                // Only the child that exited is respawned; its siblings keep running.
                                RestartMode::OneForOne => {
                                    warn!(supervisor_id = %self.supervisor_id, worker_name = child_spec.name(), ?worker_result, "Child process terminated, restarting.");
                                    // NOTE(review): a respawn failure returns here without `shutdown_workers`.
                                    self.spawn_child(child_spec_idx, &mut worker_state)?;
                                }
                                // All children are shut down first, then every non-temporary child is respawned.
                                RestartMode::OneForAll => {
                                    warn!(supervisor_id = %self.supervisor_id, worker_name = child_spec.name(), ?worker_result, "Child process terminated, restarting all processes.");
                                    worker_state.shutdown_workers().await;
                                    // NOTE(review): a respawn failure returns here without `shutdown_workers`.
                                    self.respawn_children_one_for_all(&mut worker_state)?;
                                }
                            },
                            RestartAction::Shutdown => {
                                error!(supervisor_id = %self.supervisor_id, worker_name = child_spec.name(), ?worker_result, "Supervisor shutting down due to restart limits.");
                                worker_state.shutdown_workers().await;
                                return Err(SupervisorError::Shutdown);
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }
635
636    fn as_nested_process(&self, process: Process, process_shutdown: ShutdownHandle) -> WorkerFuture {
637        // Simple wrapper around `run_inner` to satisfy the return type signature needed when running the supervisor as
638        // a nested child process in another supervisor.
639        debug!(supervisor_id = %self.supervisor_id, "Nested supervisor starting.");
640
641        // Create a standalone clone of ourselves so we can fulfill the future signature.
642        let sup = self.inner_clone();
643
644        Box::pin(async move {
645            sup.run_inner(process, process_shutdown)
646                .await
647                .map_err(WorkerError::from)
648        })
649    }
650
651    /// Runs the supervisor forever.
652    ///
653    /// # Errors
654    ///
655    /// If the supervisor exceeds its restart limits, or fails to initialize a child process, an error is returned.
656    pub async fn run(&mut self) -> Result<(), SupervisorError> {
657        // Create a no-op `ShutdownHandle` to satisfy the `run_inner` function. This is never used since we want to run
658        // forever, but we need to satisfy the signature.
659        let process_shutdown = ShutdownHandle::noop();
660        let process = Process::supervisor(&self.supervisor_id, None).context(InvalidName {
661            name: self.supervisor_id.to_string(),
662        })?;
663
664        debug!(supervisor_id = %self.supervisor_id, "Supervisor starting.");
665        self.run_inner(process.clone(), process_shutdown)
666            .into_process_future(process)
667            .await
668    }
669
    /// Runs the supervisor until shutdown is triggered.
    ///
    /// When `shutdown` resolves, the supervisor will shutdown all child processes according to their shutdown strategy,
    /// and then return.
    ///
    /// # Errors
    ///
    /// If the supervisor exceeds its restart limits, or fails to initialize a child process, an error is returned.
    pub async fn run_with_shutdown<F: Future + Send + 'static>(&mut self, shutdown: F) -> Result<(), SupervisorError> {
        // Drive the caller-provided shutdown future into a trigger so the supervisor can begin shutting down its
        // children once `shutdown` resolves. The trigger fires at most once (guarded), and otherwise fires on drop if
        // the supervisor returns on its own first.
        let (shutdown_coordinator, shutdown_handle) = ShutdownHandle::paired();
        let run = self.run_with_shutdown_inner(shutdown_handle, None);
        pin!(run, shutdown);

        // Held in an `Option` so the coordinator can be taken out exactly once; `None` doubles as the "already
        // triggered" flag that disables the `shutdown` branch below.
        let mut shutdown_coordinator = Some(shutdown_coordinator);
        loop {
            select! {
                // The supervisor finished (successfully or not): hand its result straight back to the caller.
                result = &mut run => return result,
                // `shutdown` resolved: trigger shutdown, then keep looping so `run` is driven to completion. The guard
                // ensures the already-completed `shutdown` future is never polled again.
                _ = &mut shutdown, if shutdown_coordinator.is_some() => {
                    shutdown_coordinator.take().expect("coordinator present per select guard").shutdown();
                }
            }
        }
    }
696
697    /// Runs the supervisor until the given `ShutdownHandle` signal is received.
698    ///
699    /// This is an internal variant of `run_with_shutdown` that takes a `ShutdownHandle` directly, used when spawning
700    /// supervisors in dedicated runtimes where the shutdown signal is already wrapped in a `ShutdownHandle`.
701    ///
702    /// If `dataspace` is provided, the supervisor will use it instead of creating a new one. This is used to propagate
703    /// the parent's dataspace across OS thread boundaries for dedicated runtimes.
704    ///
705    /// # Errors
706    ///
707    /// If the supervisor exceeds its restart limits, or fails to initialize a child process, an error is returned.
708    pub(crate) async fn run_with_shutdown_inner(
709        &mut self, process_shutdown: ShutdownHandle, dataspace: Option<DataspaceRegistry>,
710    ) -> Result<(), SupervisorError> {
711        let process =
712            Process::supervisor_with_dataspace(&self.supervisor_id, None, dataspace).context(InvalidName {
713                name: self.supervisor_id.to_string(),
714            })?;
715
716        debug!(supervisor_id = %self.supervisor_id, "Supervisor starting.");
717        self.run_inner(process.clone(), process_shutdown)
718            .into_process_future(process)
719            .await
720    }
721
722    fn inner_clone(&self) -> Self {
723        // This is no different than if we just implemented `Clone` directly, but it allows us to avoid exposing a
724        // _public_ implementation of `Clone`, which we don't want normal users to be able to do. We only need this
725        // internally to support nested supervisors.
726        Self {
727            supervisor_id: Arc::clone(&self.supervisor_id),
728            child_specs: self.child_specs.clone(),
729            restart_strategy: self.restart_strategy,
730            runtime_mode: self.runtime_mode.clone(),
731        }
732    }
733}
734
/// Bookkeeping kept for a single spawned worker task.
///
/// Entries live in `WorkerState::worker_map`, keyed by the Tokio task ID of the worker.
struct ProcessState {
    /// Identifier passed to `WorkerState::add_worker`; handed back by `wait_for_next_worker` when the task finishes.
    worker_id: usize,
    /// How the worker is stopped by `shutdown_workers`: gracefully with a timeout, or aborted outright.
    shutdown_strategy: ShutdownStrategy,
    /// Coordinator half of the worker's shutdown pair; triggered when shutting the worker down gracefully.
    shutdown_coordinator: ShutdownCoordinator,
    /// Handle used to abort the worker's task.
    abort_handle: AbortHandle,
}
741
/// The set of running worker tasks, plus the per-task state needed to identify and stop them.
struct WorkerState {
    /// Process passed to each child specification when creating the child's own process.
    process: Process,
    /// Join set that drives every spawned worker future.
    worker_tasks: JoinSet<Result<(), WorkerError>>,
    /// Per-task state keyed by Tokio task ID. `shutdown_workers` pops entries from the end of this map.
    worker_map: FastIndexMap<Id, ProcessState>,
}
747
748impl WorkerState {
749    fn new(process: Process) -> Self {
750        Self {
751            process,
752            worker_tasks: JoinSet::new(),
753            worker_map: FastIndexMap::default(),
754        }
755    }
756
757    fn add_worker(&mut self, worker_id: usize, child_spec: &SupervisedChild) -> Result<(), SupervisorError> {
758        let (shutdown_coordinator, shutdown_handle) = ShutdownHandle::paired();
759        let process = child_spec.create_process(&self.process)?;
760        let worker_future = child_spec.create_worker_future(process.clone(), shutdown_handle)?;
761        let shutdown_strategy = child_spec.shutdown_strategy();
762        let abort_handle = self.worker_tasks.spawn(worker_future.into_process_future(process));
763        self.worker_map.insert(
764            abort_handle.id(),
765            ProcessState {
766                worker_id,
767                shutdown_strategy,
768                shutdown_coordinator,
769                abort_handle,
770            },
771        );
772        Ok(())
773    }
774
775    async fn wait_for_next_worker(&mut self) -> (usize, Result<(), WorkerError>) {
776        debug!("Waiting for next process to complete.");
777
778        // If there are no workers to wait on, park indefinitely so the supervisor's select loop only proceeds via its
779        // other arms (shutdown, or -- for dynamic supervisors -- a newly-added worker). Without this guard,
780        // `join_next_with_id` would return `None` immediately on an empty set and the supervisor would busy-loop. The
781        // set legitimately empties when all children are non-restartable (e.g. `RestartType::Temporary`) and have
782        // exited.
783        if self.worker_tasks.is_empty() {
784            std::future::pending::<()>().await;
785        }
786
787        match self.worker_tasks.join_next_with_id().await {
788            Some(Ok((worker_task_id, worker_result))) => {
789                let process_state = self
790                    .worker_map
791                    .swap_remove(&worker_task_id)
792                    .expect("worker task ID not found");
793                (process_state.worker_id, worker_result)
794            }
795            Some(Err(e)) => {
796                let worker_task_id = e.id();
797                let process_state = self
798                    .worker_map
799                    .swap_remove(&worker_task_id)
800                    .expect("worker task ID not found");
801                let e = if e.is_cancelled() {
802                    ProcessError::Aborted
803                } else {
804                    ProcessError::Panicked
805                };
806                (process_state.worker_id, Err(WorkerError::Runtime(e.into())))
807            }
808            None => unreachable!(
809                "join set is non-empty here: we park above while empty, and only this method removes workers"
810            ),
811        }
812    }
813
814    async fn shutdown_workers(&mut self) {
815        debug!("Shutting down all processes.");
816
817        // Pop entries from the worker map, which grabs us workers in the reverse order they were added. This lets us
818        // ensure we're shutting down any _dependent_ processes (processes which depend on previously-started processes)
819        // first.
820        //
821        // For each entry, we trigger shutdown in whatever way necessary, and then wait for the process to exit by
822        // driving the `JoinSet`. If other workers complete while we're waiting, we'll simply remove them from the
823        // worker map and continue waiting for the current worker we're shutting down.
824        //
825        // We do this until the worker map is empty, at which point we can be sure that all processes have exited.
826        while let Some((current_worker_task_id, process_state)) = self.worker_map.pop() {
827            let ProcessState {
828                worker_id,
829                shutdown_strategy,
830                shutdown_coordinator,
831                abort_handle,
832            } = process_state;
833
834            // Trigger the process to shutdown based on the configured shutdown strategy.
835            let shutdown_deadline = match shutdown_strategy {
836                ShutdownStrategy::Graceful(timeout) => {
837                    debug!(worker_id, shutdown_timeout = ?timeout, "Gracefully shutting down process.");
838                    shutdown_coordinator.shutdown();
839
840                    tokio::time::sleep(timeout)
841                }
842                ShutdownStrategy::Brutal => {
843                    debug!(worker_id, "Forcefully aborting process.");
844                    abort_handle.abort();
845
846                    // We have to return a future that never resolves, since we're already aborting it. This is a little
847                    // hacky but it's also difficult to do an optional future, so this is what we're going with for now.
848                    tokio::time::sleep(Duration::MAX)
849                }
850            };
851            pin!(shutdown_deadline);
852
853            // Wait for the process to exit by driving the `JoinSet`. If other workers complete while we're waiting,
854            // we'll simply remove them from the worker map and continue waiting.
855            loop {
856                select! {
857                    worker_result = self.worker_tasks.join_next_with_id() => {
858                        match worker_result {
859                            Some(Ok((worker_task_id, _))) => {
860                                if worker_task_id == current_worker_task_id {
861                                    debug!(?worker_task_id, "Target process exited successfully.");
862                                    break;
863                                } else {
864                                    debug!(?worker_task_id, "Non-target process exited successfully. Continuing to wait.");
865                                    self.worker_map.swap_remove(&worker_task_id);
866                                }
867                            },
868                            Some(Err(e)) => {
869                                let worker_task_id = e.id();
870                                if worker_task_id == current_worker_task_id {
871                                    debug!(?worker_task_id, "Target process exited with error.");
872                                    break;
873                                } else {
874                                    debug!(?worker_task_id, "Non-target process exited with error. Continuing to wait.");
875                                    self.worker_map.swap_remove(&worker_task_id);
876                                }
877                            }
878                            None => unreachable!("worker task must exist in join set if we are waiting for it"),
879                        }
880                    },
881                    // We've exceeded the shutdown timeout, so we need to abort the process.
882                    _ = &mut shutdown_deadline => {
883                        debug!(worker_id, "Shutdown timeout expired, forcefully aborting process.");
884                        abort_handle.abort();
885                    }
886                }
887            }
888        }
889
890        debug_assert!(self.worker_map.is_empty(), "worker map should be empty after shutdown");
891        debug_assert!(
892            self.worker_tasks.is_empty(),
893            "worker tasks should be empty after shutdown"
894        );
895    }
896}
897
898#[cfg(test)]
899mod tests {
900    use std::sync::atomic::{AtomicUsize, Ordering};
901
902    use async_trait::async_trait;
903    use tokio::{
904        sync::oneshot,
905        task::JoinHandle,
906        time::{sleep, timeout},
907    };
908
909    use super::*;
910
    /// Behavior for a mock worker during initialization.
    ///
    /// Consulted by `MockWorker::initialize` before the worker's runtime future is built.
    #[derive(Clone)]
    enum InitBehavior {
        /// Initialization succeeds immediately, without awaiting anything.
        Instant,

        /// Initialization sleeps for the given duration and then succeeds.
        Slow(Duration),

        /// Initialization fails with an `InitializationError::Failed` carrying the given message.
        Fail(&'static str),
    }
923
    /// Behavior for a mock worker during runtime (after initialization).
    #[derive(Clone)]
    enum RunBehavior {
        /// Runs until shutdown is received, then exits cleanly.
        UntilShutdown,

        /// Fails with the given error message after the given delay, unless shutdown is received first, in which case
        /// the worker exits cleanly.
        FailAfter(Duration, &'static str),

        /// Completes successfully after the given delay, or as soon as shutdown is received.
        CompleteAfter(Duration),
    }
936
    /// A configurable mock worker for testing supervisor behavior.
    struct MockWorker {
        /// Name reported through `Supervisable::name`.
        name: &'static str,
        /// What happens when the worker is initialized.
        init_behavior: InitBehavior,
        /// What the worker's runtime future does once it starts running.
        run_behavior: RunBehavior,
        /// Shared counter incremented each time the worker's runtime future starts running; tests use it to observe
        /// how many times the worker was (re)started.
        start_count: Arc<AtomicUsize>,
    }
944
945    impl MockWorker {
946        /// Creates a worker that runs until shutdown.
947        fn long_running(name: &'static str) -> Self {
948            Self {
949                name,
950                init_behavior: InitBehavior::Instant,
951                run_behavior: RunBehavior::UntilShutdown,
952                start_count: Arc::new(AtomicUsize::new(0)),
953            }
954        }
955
956        /// Creates a worker that fails after the given delay.
957        fn failing(name: &'static str, delay: Duration) -> Self {
958            Self {
959                name,
960                init_behavior: InitBehavior::Instant,
961                run_behavior: RunBehavior::FailAfter(delay, "worker failed"),
962                start_count: Arc::new(AtomicUsize::new(0)),
963            }
964        }
965
966        /// Creates a worker that completes successfully after the given delay.
967        fn completing(name: &'static str, delay: Duration) -> Self {
968            Self {
969                name,
970                init_behavior: InitBehavior::Instant,
971                run_behavior: RunBehavior::CompleteAfter(delay),
972                start_count: Arc::new(AtomicUsize::new(0)),
973            }
974        }
975
976        /// Creates a worker that fails during initialization.
977        fn init_failure(name: &'static str) -> Self {
978            Self {
979                name,
980                init_behavior: InitBehavior::Fail("init failed"),
981                run_behavior: RunBehavior::UntilShutdown,
982                start_count: Arc::new(AtomicUsize::new(0)),
983            }
984        }
985
986        /// Creates a worker with slow initialization.
987        fn slow_init(name: &'static str, init_delay: Duration) -> Self {
988            Self {
989                name,
990                init_behavior: InitBehavior::Slow(init_delay),
991                run_behavior: RunBehavior::UntilShutdown,
992                start_count: Arc::new(AtomicUsize::new(0)),
993            }
994        }
995
996        /// Returns a shared handle to the start count for this worker.
997        fn start_count(&self) -> Arc<AtomicUsize> {
998            Arc::clone(&self.start_count)
999        }
1000    }
1001
1002    #[async_trait]
1003    impl Supervisable for MockWorker {
1004        fn name(&self) -> &str {
1005            self.name
1006        }
1007
1008        fn shutdown_strategy(&self) -> ShutdownStrategy {
1009            ShutdownStrategy::Graceful(Duration::from_millis(500))
1010        }
1011
1012        async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
1013            match &self.init_behavior {
1014                InitBehavior::Instant => {}
1015                InitBehavior::Slow(delay) => {
1016                    sleep(*delay).await;
1017                }
1018                InitBehavior::Fail(msg) => {
1019                    return Err(InitializationError::Failed {
1020                        source: GenericError::msg(*msg),
1021                    });
1022                }
1023            }
1024
1025            let start_count = Arc::clone(&self.start_count);
1026            let run_behavior = self.run_behavior.clone();
1027
1028            Ok(Box::pin(async move {
1029                start_count.fetch_add(1, Ordering::SeqCst);
1030
1031                match run_behavior {
1032                    RunBehavior::UntilShutdown => {
1033                        process_shutdown.await;
1034                        Ok(())
1035                    }
1036                    RunBehavior::FailAfter(delay, msg) => {
1037                        select! {
1038                            _ = sleep(delay) => {
1039                                Err(GenericError::msg(msg))
1040                            }
1041                            _ = process_shutdown => {
1042                                Ok(())
1043                            }
1044                        }
1045                    }
1046                    RunBehavior::CompleteAfter(delay) => {
1047                        select! {
1048                            _ = sleep(delay) => Ok(()),
1049                            _ = process_shutdown => Ok(()),
1050                        }
1051                    }
1052                }
1053            }))
1054        }
1055    }
1056
1057    /// Helper: run a supervisor with a oneshot-based shutdown trigger.
1058    /// Returns the supervisor result and provides the shutdown sender.
1059    async fn run_supervisor_with_trigger(
1060        mut supervisor: Supervisor,
1061    ) -> (oneshot::Sender<()>, JoinHandle<Result<(), SupervisorError>>) {
1062        let (tx, rx) = oneshot::channel();
1063        let handle = tokio::spawn(async move { supervisor.run_with_shutdown(rx).await });
1064        // Give the supervisor a moment to start and spawn children.
1065        sleep(Duration::from_millis(50)).await;
1066        (tx, handle)
1067    }
1068
1069    // -- Supervisor run mode tests ---------------------------------------------------------
1070
1071    #[tokio::test]
1072    async fn standalone_supervisor_shuts_down_cleanly() {
1073        let mut sup = Supervisor::new("test-sup").unwrap();
1074        sup.add_worker(MockWorker::long_running("worker1"));
1075        sup.add_worker(MockWorker::long_running("worker2"));
1076
1077        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1078        tx.send(()).unwrap();
1079
1080        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1081        assert!(result.is_ok());
1082    }
1083
1084    #[tokio::test]
1085    async fn nested_supervisor_shuts_down_cleanly() {
1086        let mut child_sup = Supervisor::new("child-sup").unwrap();
1087        child_sup.add_worker(MockWorker::long_running("inner-worker"));
1088
1089        let mut parent_sup = Supervisor::new("parent-sup").unwrap();
1090        parent_sup.add_worker(MockWorker::long_running("outer-worker"));
1091        parent_sup.add_worker(child_sup);
1092
1093        let (tx, handle) = run_supervisor_with_trigger(parent_sup).await;
1094        tx.send(()).unwrap();
1095
1096        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1097        assert!(result.is_ok());
1098    }
1099
1100    #[tokio::test]
1101    async fn supervisor_with_no_children_returns_error() {
1102        let mut sup = Supervisor::new("empty-sup").unwrap();
1103
1104        let (tx, rx) = oneshot::channel::<()>();
1105        let result = sup.run_with_shutdown(rx).await;
1106        drop(tx);
1107
1108        assert!(matches!(result, Err(SupervisorError::NoChildren)));
1109    }
1110
1111    // -- Child restart behavior tests ------------------------------------------------------
1112
1113    #[tokio::test]
1114    async fn one_for_one_restarts_only_failed_child() {
1115        let failing = MockWorker::failing("failing-worker", Duration::from_millis(50));
1116        let failing_count = failing.start_count();
1117
1118        let stable = MockWorker::long_running("stable-worker");
1119        let stable_count = stable.start_count();
1120
1121        let mut sup = Supervisor::new("test-sup").unwrap().with_restart_strategy(
1122            RestartStrategy::one_to_one().with_intensity_and_period(20, Duration::from_secs(10)),
1123        );
1124        sup.add_worker(stable);
1125        sup.add_worker(failing);
1126
1127        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1128
1129        // Wait for a few restarts to happen.
1130        sleep(Duration::from_millis(300)).await;
1131        let _ = tx.send(());
1132
1133        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1134        assert!(result.is_ok());
1135
1136        // The failing worker should have been started multiple times.
1137        assert!(
1138            failing_count.load(Ordering::SeqCst) >= 2,
1139            "failing worker should have been restarted"
1140        );
1141        // The stable worker should only have been started once (never restarted).
1142        assert_eq!(
1143            stable_count.load(Ordering::SeqCst),
1144            1,
1145            "stable worker should not have been restarted"
1146        );
1147    }
1148
1149    #[tokio::test]
1150    async fn one_for_all_restarts_all_children() {
1151        let failing = MockWorker::failing("failing-worker", Duration::from_millis(50));
1152        let failing_count = failing.start_count();
1153
1154        let stable = MockWorker::long_running("stable-worker");
1155        let stable_count = stable.start_count();
1156
1157        let mut sup = Supervisor::new("test-sup").unwrap().with_restart_strategy(
1158            RestartStrategy::one_for_all().with_intensity_and_period(20, Duration::from_secs(10)),
1159        );
1160        sup.add_worker(stable);
1161        sup.add_worker(failing);
1162
1163        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1164
1165        // Wait for at least one restart cycle.
1166        sleep(Duration::from_millis(300)).await;
1167        let _ = tx.send(());
1168
1169        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1170        assert!(result.is_ok());
1171
1172        // Both workers should have been started multiple times.
1173        assert!(
1174            failing_count.load(Ordering::SeqCst) >= 2,
1175            "failing worker should have been restarted"
1176        );
1177        assert!(
1178            stable_count.load(Ordering::SeqCst) >= 2,
1179            "stable worker should also have been restarted"
1180        );
1181    }
1182
1183    #[tokio::test]
1184    async fn one_for_all_does_not_restart_temporary_children() {
1185        // A permanent worker that fails repeatedly drives one-for-all restarts; a temporary sibling is shut down with
1186        // the group on each cycle but, per OTP semantics, must never be brought back.
1187        let failing = MockWorker::failing("failing-worker", Duration::from_millis(50));
1188        let failing_count = failing.start_count();
1189
1190        let temp = MockWorker::long_running("temp-worker");
1191        let temp_count = temp.start_count();
1192
1193        let mut sup = Supervisor::new("test-sup").unwrap().with_restart_strategy(
1194            RestartStrategy::one_for_all().with_intensity_and_period(20, Duration::from_secs(10)),
1195        );
1196        sup.add_worker(ChildSpecification::worker(temp).with_restart_type(RestartType::Temporary));
1197        sup.add_worker(failing);
1198
1199        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1200
1201        // Let several one-for-all cycles occur.
1202        sleep(Duration::from_millis(300)).await;
1203        let _ = tx.send(());
1204
1205        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1206        assert!(result.is_ok());
1207        assert!(
1208            failing_count.load(Ordering::SeqCst) >= 2,
1209            "permanent worker should have been restarted by one-for-all"
1210        );
1211        assert_eq!(
1212            temp_count.load(Ordering::SeqCst),
1213            1,
1214            "temporary child must not be restarted by a one-for-all group restart"
1215        );
1216    }
1217
1218    #[tokio::test]
1219    async fn one_for_all_restarts_transient_children() {
1220        // A transient child that exits cleanly is not restarted on its own, but a one-for-all restart triggered by a
1221        // sibling restarts it anyway -- matching OTP, where only temporary children are exempt from group restarts.
1222        let transient = MockWorker::completing("transient-worker", Duration::from_millis(30));
1223        let transient_count = transient.start_count();
1224
1225        // Fails after the transient has already exited cleanly, so the group restart is what brings the transient back.
1226        let failing = MockWorker::failing("failing-worker", Duration::from_millis(80));
1227
1228        let mut sup = Supervisor::new("test-sup").unwrap().with_restart_strategy(
1229            RestartStrategy::one_for_all().with_intensity_and_period(20, Duration::from_secs(10)),
1230        );
1231        sup.add_worker(ChildSpecification::worker(transient).with_restart_type(RestartType::Transient));
1232        sup.add_worker(failing);
1233
1234        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1235
1236        sleep(Duration::from_millis(300)).await;
1237        let _ = tx.send(());
1238
1239        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1240        assert!(result.is_ok());
1241        assert!(
1242            transient_count.load(Ordering::SeqCst) >= 2,
1243            "transient child must be restarted by a one-for-all group restart, even after a clean exit"
1244        );
1245    }
1246
1247    #[tokio::test]
1248    async fn restart_limit_exceeded_shuts_down_supervisor() {
1249        let mut sup = Supervisor::new("test-sup")
1250            .unwrap()
1251            .with_restart_strategy(RestartStrategy::one_to_one().with_intensity_and_period(1, Duration::from_secs(10)));
1252        // This worker fails immediately, which will exhaust the restart budget quickly.
1253        sup.add_worker(MockWorker::failing("fast-fail", Duration::ZERO));
1254
1255        let (tx, rx) = oneshot::channel::<()>();
1256        let handle = tokio::spawn(async move { sup.run_with_shutdown(rx).await });
1257
1258        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1259        drop(tx);
1260
1261        assert!(matches!(result, Err(SupervisorError::Shutdown)));
1262    }
1263
1264    // -- Restart type tests ----------------------------------------------------------------
1265
1266    #[tokio::test]
1267    async fn temporary_child_is_not_restarted() {
1268        // A temporary worker that fails quickly, alongside a long-running worker that keeps the supervisor alive.
1269        let temp = MockWorker::failing("temp-worker", Duration::from_millis(50));
1270        let temp_count = temp.start_count();
1271
1272        let stable = MockWorker::long_running("stable-worker");
1273
1274        let mut sup = Supervisor::new("test-sup").unwrap().with_restart_strategy(
1275            RestartStrategy::one_to_one().with_intensity_and_period(20, Duration::from_secs(10)),
1276        );
1277        sup.add_worker(stable);
1278        sup.add_worker(ChildSpecification::worker(temp).with_restart_type(RestartType::Temporary));
1279
1280        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1281
1282        // Give the temporary worker time to fail; it must not be restarted.
1283        sleep(Duration::from_millis(300)).await;
1284        let _ = tx.send(());
1285
1286        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1287        assert!(result.is_ok());
1288        assert_eq!(
1289            temp_count.load(Ordering::SeqCst),
1290            1,
1291            "temporary worker must not be restarted"
1292        );
1293    }
1294
1295    #[tokio::test]
1296    async fn transient_child_is_not_restarted_on_clean_exit() {
1297        let transient = MockWorker::completing("transient-worker", Duration::from_millis(50));
1298        let transient_count = transient.start_count();
1299
1300        let stable = MockWorker::long_running("stable-worker");
1301
1302        let mut sup = Supervisor::new("test-sup").unwrap().with_restart_strategy(
1303            RestartStrategy::one_to_one().with_intensity_and_period(20, Duration::from_secs(10)),
1304        );
1305        sup.add_worker(stable);
1306        sup.add_worker(ChildSpecification::worker(transient).with_restart_type(RestartType::Transient));
1307
1308        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1309
1310        sleep(Duration::from_millis(300)).await;
1311        let _ = tx.send(());
1312
1313        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1314        assert!(result.is_ok());
1315        assert_eq!(
1316            transient_count.load(Ordering::SeqCst),
1317            1,
1318            "transient worker must not be restarted after a clean exit"
1319        );
1320    }
1321
1322    #[tokio::test]
1323    async fn transient_child_is_restarted_on_failure() {
1324        let transient = MockWorker::failing("transient-worker", Duration::from_millis(50));
1325        let transient_count = transient.start_count();
1326
1327        let mut sup = Supervisor::new("test-sup").unwrap().with_restart_strategy(
1328            RestartStrategy::one_to_one().with_intensity_and_period(20, Duration::from_secs(10)),
1329        );
1330        sup.add_worker(ChildSpecification::worker(transient).with_restart_type(RestartType::Transient));
1331
1332        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1333
1334        sleep(Duration::from_millis(300)).await;
1335        let _ = tx.send(());
1336
1337        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1338        assert!(result.is_ok());
1339        assert!(
1340            transient_count.load(Ordering::SeqCst) >= 2,
1341            "transient worker must be restarted after an abnormal exit"
1342        );
1343    }
1344
1345    #[tokio::test]
1346    async fn permanent_child_is_restarted_on_clean_exit() {
1347        // A permanent worker that completes cleanly must still be restarted -- this is what distinguishes
1348        // `Permanent` from `Transient`, which is left stopped after a clean exit.
1349        let permanent = MockWorker::completing("permanent-worker", Duration::from_millis(50));
1350        let permanent_count = permanent.start_count();
1351
1352        let mut sup = Supervisor::new("test-sup").unwrap().with_restart_strategy(
1353            RestartStrategy::one_to_one().with_intensity_and_period(20, Duration::from_secs(10)),
1354        );
1355        // Added with the default restart policy, which is `Permanent`.
1356        sup.add_worker(permanent);
1357
1358        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1359
1360        sleep(Duration::from_millis(300)).await;
1361        let _ = tx.send(());
1362
1363        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1364        assert!(result.is_ok());
1365        assert!(
1366            permanent_count.load(Ordering::SeqCst) >= 2,
1367            "permanent worker must be restarted even after a clean exit"
1368        );
1369    }
1370
1371    #[tokio::test]
1372    async fn temporary_failures_do_not_consume_restart_intensity() {
1373        // With intensity=1, two *restartable* failures within the period would shut the supervisor down. Here several
1374        // temporary workers all fail quickly. Because temporary exits aren't eligible for restart, they must not consume
1375        // the restart-intensity budget, and the supervisor must stay up.
1376        let mut sup = Supervisor::new("test-sup")
1377            .unwrap()
1378            .with_restart_strategy(RestartStrategy::one_to_one().with_intensity_and_period(1, Duration::from_secs(10)));
1379
1380        let workers = [
1381            MockWorker::failing("temp-0", Duration::from_millis(20)),
1382            MockWorker::failing("temp-1", Duration::from_millis(20)),
1383            MockWorker::failing("temp-2", Duration::from_millis(20)),
1384            MockWorker::failing("temp-3", Duration::from_millis(20)),
1385            MockWorker::failing("temp-4", Duration::from_millis(20)),
1386        ];
1387        let counts: Vec<_> = workers.iter().map(|w| w.start_count()).collect();
1388        for worker in workers {
1389            sup.add_worker(ChildSpecification::worker(worker).with_restart_type(RestartType::Temporary));
1390        }
1391        // A long-running worker so the supervisor doesn't simply idle once the temporaries are gone.
1392        sup.add_worker(MockWorker::long_running("stable-worker"));
1393
1394        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1395        sleep(Duration::from_millis(300)).await;
1396        let _ = tx.send(());
1397
1398        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1399        assert!(
1400            result.is_ok(),
1401            "supervisor must not trip its restart limit on temporary exits"
1402        );
1403        for count in counts {
1404            assert_eq!(
1405                count.load(Ordering::SeqCst),
1406                1,
1407                "each temporary worker runs exactly once"
1408            );
1409        }
1410    }
1411
1412    #[tokio::test]
1413    async fn supervisor_idles_when_all_temporary_children_exit() {
1414        // When every child is temporary and they all exit, the worker set drains. The supervisor must not panic or exit
1415        // on its own; it should keep waiting until shutdown is triggered.
1416        let mut sup = Supervisor::new("test-sup").unwrap();
1417        sup.add_worker(
1418            ChildSpecification::worker(MockWorker::completing("temp-a", Duration::from_millis(30)))
1419                .with_restart_type(RestartType::Temporary),
1420        );
1421        sup.add_worker(
1422            ChildSpecification::worker(MockWorker::completing("temp-b", Duration::from_millis(30)))
1423                .with_restart_type(RestartType::Temporary),
1424        );
1425
1426        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1427
1428        // Both children complete well within this window; the supervisor should still be running (idling).
1429        sleep(Duration::from_millis(200)).await;
1430        assert!(
1431            !handle.is_finished(),
1432            "supervisor must keep running after all children exit"
1433        );
1434
1435        let _ = tx.send(());
1436        let result = timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
1437        assert!(result.is_ok());
1438    }
1439
1440    // -- Initialization failure tests ------------------------------------------------------
1441
1442    #[tokio::test]
1443    async fn init_failure_propagates_with_child_name() {
1444        let mut sup = Supervisor::new("test-sup").unwrap();
1445        sup.add_worker(MockWorker::long_running("good-worker"));
1446        sup.add_worker(MockWorker::init_failure("bad-worker"));
1447
1448        let (_tx, rx) = oneshot::channel::<()>();
1449        let result = timeout(Duration::from_secs(2), sup.run_with_shutdown(rx))
1450            .await
1451            .unwrap();
1452
1453        match result {
1454            Err(SupervisorError::FailedToInitialize { child_name, .. }) => {
1455                assert_eq!(child_name, "bad-worker");
1456            }
1457            other => panic!("expected FailedToInitialize, got: {:?}", other),
1458        }
1459    }
1460
1461    #[tokio::test]
1462    async fn init_failure_does_not_trigger_restart() {
1463        let init_fail = MockWorker::init_failure("bad-worker");
1464        let start_count = init_fail.start_count();
1465
1466        let mut sup = Supervisor::new("test-sup").unwrap().with_restart_strategy(
1467            RestartStrategy::one_to_one().with_intensity_and_period(10, Duration::from_secs(10)),
1468        );
1469        sup.add_worker(init_fail);
1470
1471        let (_tx, rx) = oneshot::channel::<()>();
1472        let result = timeout(Duration::from_secs(2), sup.run_with_shutdown(rx))
1473            .await
1474            .unwrap();
1475
1476        assert!(matches!(result, Err(SupervisorError::FailedToInitialize { .. })));
1477        // The worker never got past init, so start_count should be 0.
1478        assert_eq!(start_count.load(Ordering::SeqCst), 0);
1479    }
1480
1481    // -- Shutdown responsiveness tests -----------------------------------------------------
1482
1483    #[tokio::test]
1484    async fn shutdown_completes_promptly_in_steady_state() {
1485        let mut sup = Supervisor::new("test-sup").unwrap();
1486        sup.add_worker(MockWorker::long_running("worker1"));
1487        sup.add_worker(MockWorker::long_running("worker2"));
1488
1489        let (tx, handle) = run_supervisor_with_trigger(sup).await;
1490        tx.send(()).unwrap();
1491
1492        // Shutdown should complete well within 1 second (workers respond to shutdown signal immediately).
1493        let result = timeout(Duration::from_secs(1), handle).await;
1494        assert!(result.is_ok(), "shutdown should complete promptly");
1495    }
1496
1497    #[tokio::test]
1498    async fn shutdown_during_slow_init_completes_promptly() {
1499        let mut sup = Supervisor::new("test-sup").unwrap();
1500        // This worker takes 30 seconds to initialize — but we'll trigger shutdown immediately.
1501        sup.add_worker(MockWorker::slow_init("slow-worker", Duration::from_secs(30)));
1502
1503        let (tx, rx) = oneshot::channel();
1504        let handle = tokio::spawn(async move { sup.run_with_shutdown(rx).await });
1505
1506        // Give the supervisor just enough time to spawn the task, then trigger shutdown.
1507        sleep(Duration::from_millis(20)).await;
1508        tx.send(()).unwrap();
1509
1510        // Shutdown should complete quickly even though the worker hasn't finished initializing.
1511        // The supervisor loop sees the shutdown signal and aborts the still-initializing task.
1512        let result = timeout(Duration::from_secs(2), handle).await;
1513        assert!(result.is_ok(), "shutdown during slow init should complete promptly");
1514    }
1515}