saluki_core/state/reflector.rs
1//! Mechanisms for processing a data source and sharing the processed results.
2use std::sync::Arc;
3
4use futures::{Stream, StreamExt};
5use tokio::sync::Notify;
6
7/// Processes input data and modifies shared state based on the result.
8pub trait Processor: Send + Sync {
9 /// The type of input to the processor.
10 type Input;
11
12 /// The state that the processor acts on.
13 type State: Send + Sync;
14
15 /// Builds the initial state for the processor.
16 fn build_initial_state(&self) -> Self::State;
17
18 /// Processes the input, potentially updating the reflector state.
19 fn process(&self, input: Self::Input, state: &Self::State);
20}
21
22struct StoreInner<P: Processor> {
23 processor: P,
24 state: P::State,
25 notify_update: Notify,
26}
27
28/// Shared state based on a processor.
29///
30/// `Store` acts as the glue between a processor and the data that it processes. It acts as the entrypoint for taking in
31/// a a group of items from a data source, running them through the configured processor, and then notifying callers
32/// that an update has taken place.
33struct Store<P: Processor> {
34 inner: Arc<StoreInner<P>>,
35}
36
37impl<P: Processor> Store<P> {
38 fn from_processor(processor: P) -> Self {
39 let state = processor.build_initial_state();
40 Self {
41 inner: Arc::new(StoreInner {
42 processor,
43 state,
44 notify_update: Notify::const_new(),
45 }),
46 }
47 }
48
49 pub fn process<I>(&self, inputs: I)
50 where
51 I: IntoIterator<Item = P::Input>,
52 {
53 for input in inputs {
54 self.inner.processor.process(input, &self.inner.state);
55 }
56 self.inner.notify_update.notify_waiters();
57 }
58
59 pub async fn wait_for_update(&self) {
60 self.inner.notify_update.notified().await;
61 }
62
63 pub fn state(&self) -> &P::State {
64 &self.inner.state
65 }
66}
67
68impl<P: Processor> Clone for Store<P> {
69 fn clone(&self) -> Self {
70 Self {
71 inner: Arc::clone(&self.inner),
72 }
73 }
74}
75
76/// `Reflector` composes a source of data with a processor that is used to transform the data, and then stores the
77/// results and allows for shared access by multiple callers.
78///
79/// Reflectors are a term often found in the context of custom Kubernetes controllers, where they are used to reduce the
80/// load on the Kubernetes API server by caching the state of resources in memory. `Reflector` provides comparable
81/// functionality, allowing for a single data source to be consumed, and then shared amongst multiple callers. However,
82///
83/// `Reflector` utilizes the concept of a "processor", which dictates both the type of data that can be consumed and
84/// data that gets stored. This means that `Reflector` is more than just a cache of the data source, but also
85/// potentially a mapped version of it, allowing for transforming the data in whatever way is necessary.
86#[derive(Clone)]
87pub struct Reflector<P: Processor> {
88 store: Store<P>,
89}
90
91impl<P: Processor> Reflector<P> {
92 /// Creates a new reflector with the given data source and processor.
93 ///
94 /// A reflector composes a source of data with a processor that is used to transform the data, and then stores the
95 /// the processed results. It can be listened to for updates, and cheaply shared. This allows multiple interested
96 /// components to subscribe to the same data source without having to duplicate the processing or storage of the
97 /// data.
98 ///
99 /// A task will be spawned that drives consumption of the data source and processes the items, feeding them into the
100 /// reflector state, which can be queried from the returned `Reflector.`
101 ///
102 /// `Reflector` is cheaply cloneable and can either be cloned for each caller or shared between them (e.g., via
103 /// `Arc<T>`).
104 pub async fn new<S, I>(mut source: S, processor: P) -> Self
105 where
106 S: Stream<Item = I> + Unpin + Send + 'static,
107 I: IntoIterator<Item = P::Input> + Send,
108 P: 'static,
109 {
110 let store = Store::from_processor(processor);
111
112 let sender_store = store.clone();
113 tokio::spawn(async move {
114 while let Some(inputs) = source.next().await {
115 sender_store.process(inputs);
116 }
117 });
118
119 Self { store }
120 }
121}
122
123impl<P: Processor> Reflector<P> {
124 /// Waits for the next update to the reflector.
125 ///
126 /// When this method completes, callers must query the reflector to acquire the latest state.
127 pub async fn wait_for_update(&self) {
128 self.store.wait_for_update().await;
129 }
130
131 /// Returns a reference a to the reflector's state.
132 pub fn state(&self) -> &P::State {
133 self.store.state()
134 }
135}