saluki_components/destinations/blackhole/
mod.rs

1use std::time::{Duration, Instant};
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_core::components::{destinations::*, ComponentContext};
6use saluki_core::data_model::event::EventType;
7use saluki_error::GenericError;
8use tokio::select;
9use tracing::{debug, info};
10
/// Blackhole destination.
///
/// Does nothing with the events it receives. It's useful for testing: it provides a valid destination
/// implementation and, while events are flowing, periodically logs the number of events it has received.
///
/// The configuration carries no settings, so it is a unit struct that can be created with
/// [`Default::default`].
#[derive(Clone, Debug, Default)]
pub struct BlackholeConfiguration;
17
18#[async_trait]
19impl DestinationBuilder for BlackholeConfiguration {
20    fn input_event_type(&self) -> EventType {
21        EventType::all_bits()
22    }
23
24    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
25        Ok(Box::new(Blackhole))
26    }
27}
28
29impl MemoryBounds for BlackholeConfiguration {
30    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
31        // Capture the size of the heap allocation when the component is built.
32        builder.minimum().with_single_value::<Blackhole>("blackhole");
33    }
34}
35
36struct Blackhole;
37
38#[async_trait]
39impl Destination for Blackhole {
40    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
41        let mut health = context.take_health_handle();
42
43        let mut last_update = Instant::now();
44        let mut event_counter = 0;
45
46        health.mark_ready();
47        debug!("Blackhole destination started.");
48
49        loop {
50            select! {
51                _ = health.live() => continue,
52                result = context.events().next() => match result {
53                    Some(events) => {
54                        event_counter += events.len();
55
56                        if last_update.elapsed() > Duration::from_secs(1) {
57                            info!("Received {} events.", event_counter);
58                            last_update = Instant::now();
59                            event_counter = 0;
60                        }
61                    },
62                    None => break,
63                },
64            }
65        }
66
67        debug!("Blackhole destination stopped.");
68
69        Ok(())
70    }
71}