saluki_components/destinations/blackhole/
mod.rs1use std::time::{Duration, Instant};
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_core::components::{destinations::*, ComponentContext};
6use saluki_core::data_model::event::EventType;
7use saluki_error::GenericError;
8use tokio::select;
9use tracing::{debug, info};
10
11#[derive(Default)]
16pub struct BlackholeConfiguration;
17
18#[async_trait]
19impl DestinationBuilder for BlackholeConfiguration {
20 fn input_event_type(&self) -> EventType {
21 EventType::all_bits()
22 }
23
24 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
25 Ok(Box::new(Blackhole))
26 }
27}
28
29impl MemoryBounds for BlackholeConfiguration {
30 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
31 builder.minimum().with_single_value::<Blackhole>("blackhole");
33 }
34}
35
36struct Blackhole;
37
38#[async_trait]
39impl Destination for Blackhole {
40 async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
41 let mut health = context.take_health_handle();
42
43 let mut last_update = Instant::now();
44 let mut event_counter = 0;
45
46 health.mark_ready();
47 debug!("Blackhole destination started.");
48
49 loop {
50 select! {
51 _ = health.live() => continue,
52 result = context.events().next() => match result {
53 Some(events) => {
54 event_counter += events.len();
55
56 if last_update.elapsed() > Duration::from_secs(1) {
57 info!("Received {} events.", event_counter);
58 last_update = Instant::now();
59 event_counter = 0;
60 }
61 },
62 None => break,
63 },
64 }
65 }
66
67 debug!("Blackhole destination stopped.");
68
69 Ok(())
70 }
71}