saluki_core/components/encoders/mod.rs
1//! Encoder component basics.
2
3use async_trait::async_trait;
4use saluki_error::GenericError;
5
6use crate::{data_model::event::Event, topology::PayloadsDispatcher};
7
8mod builder;
9pub use self::builder::{EncoderBuilder, IncrementalEncoderBuilder};
10
11mod context;
12pub use self::context::EncoderContext;
13
14/// Outcome of asking an encoder to process a single event.
15#[allow(clippy::large_enum_variant)] // NOTE(review): presumably allowed because `Event` dwarfs the unit variant; confirm.
16pub enum ProcessResult {
17 /// The encoder processed the event successfully and is ready to process more events.
18 Continue,
19
20 /// The encoder cannot process the event without flushing first.
21 ///
22 /// The rejected event is carried in this variant; the caller should flush the encoder and then retry it.
23 FlushRequired(Event),
24}
25
26/// An encoder.
27///
28/// Encoders are the bridge between forwarders and the rest of the topology. They are responsible for encoding
29/// telemetry events into output payloads that can then be forwarded. Most encoders are specific to a particular system
30/// and not simply equivalent to a certain encoding or serialization format: while two encoders may both produce JSON,
31/// they may produce different JSON formats such that one format only works for product A and the other only works for
32/// product B. In essence, the process of taking telemetry events and sending them to product A ends up becoming the sum
33/// of a specific encoder and forwarder combination.
34///
35/// Examples of typical encoders include Datadog Metrics, Events, and Service Checks.
36#[async_trait]
37pub trait Encoder {
38 /// Runs the encoder, consuming the boxed encoder instance.
39 ///
40 /// The encoder context provides access primarily to the event stream, used to receive events sent to the encoder,
41 /// and the forwarder, used to send payloads to the downstream forwarder in the topology, as well as other
42 /// information such as the component context.
43 ///
44 /// Encoders are expected to run indefinitely until their event stream is terminated, or an error occurs.
45 ///
46 /// # Errors
47 ///
48 /// If an unrecoverable error occurs while running, an error is returned.
49 async fn run(self: Box<Self>, context: EncoderContext) -> Result<(), GenericError>;
50}
51
52/// An incremental encoder.
53///
54/// Incremental encoders represent the essential operations of an encoder: adding events and flushing the resulting
55/// payloads. Generally, encoders should not need to concern themselves with the high-level details of being a
56/// component within a topology, such as responding to health probes, handling graceful shutdown, and so on. Through
57/// separating out the core encoder functionality from the component functionality, we can make it easier to implement
58/// various encoder implementations while only maintaining a few common encoder components that actually _drive_ the
59/// underlying encoder logic.
60#[async_trait]
61pub trait IncrementalEncoder {
62 /// Processes a single event, which is passed by value.
63 ///
64 /// The encoder will process the event, attempting to add it to the current payload. If the encoder is unable to
65 /// process the event without flushing first, `Ok(ProcessResult::FlushRequired(event))` is returned, containing the
66 /// event that could not be processed. Otherwise, `Ok(ProcessResult::Continue)` is returned.
67 ///
68 /// # Errors
69 ///
70 /// If the encoder cannot process the event due to an unrecoverable error, an error is returned.
71 async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError>;
72
73 /// Flushes the encoder, finalizing all current payloads and sending them to the given `dispatcher`.
74 ///
75 /// # Errors
76 ///
77 /// If the encoder cannot flush the payloads, an error is returned.
78 async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError>;
79}