// saluki_core/components/encoders/mod.rs

1//! Encoder component basics.
2
3use async_trait::async_trait;
4use saluki_error::GenericError;
5
6use crate::{data_model::event::Event, topology::PayloadsDispatcher};
7
8mod builder;
9pub use self::builder::{EncoderBuilder, IncrementalEncoderBuilder};
10
11mod context;
12pub use self::context::EncoderContext;
13
14/// Encoder process result.
15#[allow(clippy::large_enum_variant)]
16pub enum ProcessResult {
17    /// The encoder processed the event successfully and is ready to process more events.
18    Continue,
19
20    /// The encoder cannot process the event without flushing first.
21    ///
22    /// The caller should flush the encoder and try again to process the event.
23    FlushRequired(Event),
24}
25
26/// A encoder.
27///
28/// Encoders are the bridge between forwarders and the rest of the topology. They are responsible for encoding
29/// telemetry events into output payloads that can then be forwarded. Most encoders are specific to a particular system
30/// and not simply equivalent to a certain encoding or serialization format: while two encoders may both produce JSON,
31/// they may produce different JSON formats such that one format only works for product A and the other only works for
32/// product B. In essence, the process of taking telemetry events and sending them to product A ends up becoming the sum
33/// of a specific encoder and forwarder combination.
34///
35/// Examples of typical encoders include Datadog Metrics, Events, and Service Checks.
36#[async_trait]
37pub trait Encoder {
38    /// Runs the encoder.
39    ///
40    /// The encoder context provides access primarily to the event stream, used to receive events sent to the encoder,
41    /// and the forwarder, used to send payloads to the downstream forwarder in the topology, as well as other
42    /// information such as the component context.
43    ///
44    /// Encoders are expected to run indefinitely until their event stream is terminated, or an error occurs.
45    ///
46    /// # Errors
47    ///
48    /// If an unrecoverable error occurs while running, an error is returned.
49    async fn run(self: Box<Self>, context: EncoderContext) -> Result<(), GenericError>;
50}
51
52/// An incremental encoder.
53///
54/// Incremental encoders represent the essential operations of a encoder: adding events and flushing the resulting
55/// payloads. Generally, encoders should not need to concern themselves with the high-level details of being a
56/// component within a topology, such as responding to health probes, handling graceful shutdown, and so on. Through
57/// separating out the core encoder functionality from the component functionality, we can make it easier to implement
58/// various encoder implementations while only maintaining a few common encoder components that actually _drive_ the
59/// underlying encoder logic.
60#[async_trait]
61pub trait IncrementalEncoder {
62    /// Process a single event.
63    ///
64    /// The encoder will process the event, attempting to add it to the current payload. If the encoder is unable to
65    /// process the event without flushing first, `Ok(ProcessResult::FlushRequired(event))` is returned, containing the
66    /// event that could not be processed. Otherwise, `Ok(ProcessResult::Continue)` is returned.
67    ///
68    /// # Errors
69    ///
70    /// If the encoder cannot process the event due to an unrecoverable error, an error is returned.
71    async fn process_event(&mut self, event: Event) -> Result<ProcessResult, GenericError>;
72
73    /// Flush the encoder, finalizing all current payloads and sending them to the dispatcher.
74    ///
75    /// # Errors
76    ///
77    /// If the encoder cannot flush the payloads, an error is returned.
78    async fn flush(&mut self, dispatcher: &PayloadsDispatcher) -> Result<(), GenericError>;
79}