saluki_core/state/
reflector.rs

1//! Mechanisms for processing a data source and sharing the processed results.
2use std::sync::Arc;
3
4use futures::{Stream, StreamExt};
5use tokio::sync::Notify;
6
7/// Processes input data and modifies shared state based on the result.
8pub trait Processor: Send + Sync {
9    /// The type of input to the processor.
10    type Input;
11
12    /// The state that the processor acts on.
13    type State: Send + Sync;
14
15    /// Builds the initial state for the processor.
16    fn build_initial_state(&self) -> Self::State;
17
18    /// Processes the input, potentially updating the reflector state.
19    fn process(&self, input: Self::Input, state: &Self::State);
20}
21
22struct StoreInner<P: Processor> {
23    processor: P,
24    state: P::State,
25    notify_update: Notify,
26}
27
28/// Shared state based on a processor.
29///
30/// `Store` acts as the glue between a processor and the data that it processes. It acts as the entrypoint for taking in
31/// a a group of items from a data source, running them through the configured processor, and then notifying callers
32/// that an update has taken place.
33struct Store<P: Processor> {
34    inner: Arc<StoreInner<P>>,
35}
36
37impl<P: Processor> Store<P> {
38    fn from_processor(processor: P) -> Self {
39        let state = processor.build_initial_state();
40        Self {
41            inner: Arc::new(StoreInner {
42                processor,
43                state,
44                notify_update: Notify::const_new(),
45            }),
46        }
47    }
48
49    pub fn process<I>(&self, inputs: I)
50    where
51        I: IntoIterator<Item = P::Input>,
52    {
53        for input in inputs {
54            self.inner.processor.process(input, &self.inner.state);
55        }
56        self.inner.notify_update.notify_waiters();
57    }
58
59    pub async fn wait_for_update(&self) {
60        self.inner.notify_update.notified().await;
61    }
62
63    pub fn state(&self) -> &P::State {
64        &self.inner.state
65    }
66}
67
68impl<P: Processor> Clone for Store<P> {
69    fn clone(&self) -> Self {
70        Self {
71            inner: Arc::clone(&self.inner),
72        }
73    }
74}
75
76/// `Reflector` composes a source of data with a processor that is used to transform the data, and then stores the
77/// results and allows for shared access by multiple callers.
78///
79/// Reflectors are a term often found in the context of custom Kubernetes controllers, where they are used to reduce the
80/// load on the Kubernetes API server by caching the state of resources in memory. `Reflector` provides comparable
81/// functionality, allowing for a single data source to be consumed, and then shared amongst multiple callers. However,
82///
83/// `Reflector` utilizes the concept of a "processor", which dictates both the type of data that can be consumed and
84/// data that gets stored. This means that `Reflector` is more than just a cache of the data source, but also
85/// potentially a mapped version of it, allowing for transforming the data in whatever way is necessary.
86#[derive(Clone)]
87pub struct Reflector<P: Processor> {
88    store: Store<P>,
89}
90
91impl<P: Processor> Reflector<P> {
92    /// Creates a new reflector with the given data source and processor.
93    ///
94    /// A reflector composes a source of data with a processor that is used to transform the data, and then stores the
95    /// the processed results. It can be listened to for updates, and cheaply shared. This allows multiple interested
96    /// components to subscribe to the same data source without having to duplicate the processing or storage of the
97    /// data.
98    ///
99    /// A task will be spawned that drives consumption of the data source and processes the items, feeding them into the
100    /// reflector state, which can be queried from the returned `Reflector.`
101    ///
102    /// `Reflector` is cheaply cloneable and can either be cloned for each caller or shared between them (e.g., via
103    /// `Arc<T>`).
104    pub async fn new<S, I>(mut source: S, processor: P) -> Self
105    where
106        S: Stream<Item = I> + Unpin + Send + 'static,
107        I: IntoIterator<Item = P::Input> + Send,
108        P: 'static,
109    {
110        let store = Store::from_processor(processor);
111
112        let sender_store = store.clone();
113        tokio::spawn(async move {
114            while let Some(inputs) = source.next().await {
115                sender_store.process(inputs);
116            }
117        });
118
119        Self { store }
120    }
121}
122
123impl<P: Processor> Reflector<P> {
124    /// Waits for the next update to the reflector.
125    ///
126    /// When this method completes, callers must query the reflector to acquire the latest state.
127    pub async fn wait_for_update(&self) {
128        self.store.wait_for_update().await;
129    }
130
131    /// Returns a reference a to the reflector's state.
132    pub fn state(&self) -> &P::State {
133        self.store.state()
134    }
135}