stele/traces/stats.rs

use std::{
    collections::hash_map::Entry,
    fmt,
    hash::{Hash as _, Hasher as _},
    num::ParseIntError,
    str::FromStr,
};

use datadog_protos::traces::{self as proto, Trilean};
use saluki_common::{collections::FastHashMap, hash::StableHasher};
use saluki_error::{generic_error, GenericError};
use serde::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};
use stringtheory::MetaString;

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct AgentMetadata {
    hostname: MetaString,
    env: MetaString,
    agent_version: MetaString,
    client_computed: bool,
    split_payload: bool,
}

impl From<&proto::StatsPayload> for AgentMetadata {
    fn from(payload: &proto::StatsPayload) -> Self {
        Self {
            hostname: (*payload.agentHostname).into(),
            env: (*payload.agentEnv).into(),
            agent_version: (*payload.agentVersion).into(),
            client_computed: payload.clientComputed,
            split_payload: payload.splitPayload,
        }
    }
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct TracerMetadata {
    hostname: MetaString,
    env: MetaString,
    version: MetaString,
    service: MetaString,
    language: MetaString,
    tracer_version: MetaString,
    runtime_id: MetaString,
    sequence: u64,
    agent_aggregation: MetaString,
    container_id: MetaString,
    tags: Vec<MetaString>,
    git_commit_sha: MetaString,
    image_tag: MetaString,
    process_tags_hash: u64,
    process_tags: MetaString,
}

impl From<&proto::ClientStatsPayload> for TracerMetadata {
    fn from(payload: &proto::ClientStatsPayload) -> Self {
        Self {
            hostname: (*payload.hostname).into(),
            env: (*payload.env).into(),
            version: (*payload.version).into(),
            service: (*payload.service).into(),
            language: (*payload.lang).into(),
            tracer_version: (*payload.tracerVersion).into(),
            runtime_id: (*payload.runtimeID).into(),
            sequence: payload.sequence,
            agent_aggregation: (*payload.agentAggregation).into(),
            container_id: (*payload.containerID).into(),
            tags: payload.tags.iter().map(|t| MetaString::from(&**t)).collect(),
            git_commit_sha: (*payload.git_commit_sha).into(),
            image_tag: (*payload.image_tag).into(),
            process_tags_hash: payload.process_tags_hash,
            process_tags: (*payload.process_tags).into(),
        }
    }
}

/// Time frame covered by a bucket.
#[derive(Clone, Copy, Debug, DeserializeFromStr, Hash, Eq, PartialEq, SerializeDisplay)]
pub struct BucketTimeframe {
    /// Start time of the bucket, in nanoseconds.
    pub start_time_ns: u64,

    /// Width of the bucket, in nanoseconds.
    pub duration_ns: u64,
}

impl fmt::Display for BucketTimeframe {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "[{},{}]", self.start_time_ns, self.duration_ns)
    }
}

impl FromStr for BucketTimeframe {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parts: Vec<&str> = s.trim_matches(['[', ']']).split(',').collect();
        if parts.len() != 2 {
            return Err(format!("expected two elements, found {}", parts.len()));
        }
        let start_time_ns = parts[0].parse::<u64>().map_err(|e| e.to_string())?;
        let duration_ns = parts[1].parse::<u64>().map_err(|e| e.to_string())?;
        Ok(Self {
            start_time_ns,
            duration_ns,
        })
    }
}
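
// A minimal, test-only sketch of the `Display`/`FromStr` round trip for `BucketTimeframe`: values render as
// `[start,duration]` and parse back losslessly. The timestamp values used here are arbitrary placeholders.
#[cfg(test)]
mod bucket_timeframe_tests {
    use super::BucketTimeframe;

    #[test]
    fn display_from_str_round_trip() {
        let timeframe = BucketTimeframe {
            start_time_ns: 1_000,
            duration_ns: 10_000_000_000,
        };

        // Rendering and re-parsing should yield the original value.
        let rendered = timeframe.to_string();
        assert_eq!(rendered, "[1000,10000000000]");
        assert_eq!(rendered.parse::<BucketTimeframe>(), Ok(timeframe));
    }
}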

impl From<&proto::ClientStatsBucket> for BucketTimeframe {
    fn from(bucket: &proto::ClientStatsBucket) -> Self {
        Self {
            start_time_ns: bucket.start,
            duration_ns: bucket.duration,
        }
    }
}

/// Client statistics grouped by time frame.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct BucketedClientStatistics {
    agent_metadata: AgentMetadata,
    tracer_metadata: TracerMetadata,
    buckets: FastHashMap<BucketTimeframe, ClientStatistics>,
}

impl BucketedClientStatistics {
    /// Merges the given stats into the given bucket, creating the bucket if it does not already exist.
    pub fn merge(&mut self, bucket_timeframe: BucketTimeframe, stats: ClientStatistics) -> Result<(), GenericError> {
        match self.buckets.entry(bucket_timeframe) {
            Entry::Occupied(mut entry) => {
                let existing_stats = entry.get_mut();
                existing_stats.merge(stats)
            }
            Entry::Vacant(entry) => {
                entry.insert(stats);
                Ok(())
            }
        }
    }

    /// Returns an iterator over each bucket.
    pub fn buckets(&self) -> impl Iterator<Item = (&BucketTimeframe, &ClientStatistics)> {
        self.buckets.iter()
    }
}
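
// A test-only sketch of the bucket-creation behavior of `BucketedClientStatistics::merge`: the first merge into a
// given timeframe creates the bucket, and a second merge with matching fields accumulates into it rather than adding
// a new one. All metadata and stats values below are arbitrary placeholders.
#[cfg(test)]
mod bucketed_stats_tests {
    use super::*;

    pub(super) fn sample_stats() -> ClientStatistics {
        ClientStatistics {
            service: "web".into(),
            name: "http.request".into(),
            resource: "GET /".into(),
            type_: "web".into(),
            hits: 1,
            errors: 0,
            duration_ns: 100,
            ok_summary: Vec::new(),
            error_summary: Vec::new(),
            synthetics: false,
            top_level_hits: 1,
            span_kind: "server".into(),
            peer_tags: Vec::new(),
            is_trace_root: Some(true),
            db_type: "".into(),
            grpc_status_code: "".into(),
            http_status_code: 200,
            http_method: "GET".into(),
            http_endpoint: "/".into(),
        }
    }

    #[test]
    fn merge_creates_then_accumulates_bucket() {
        let mut bucketed = BucketedClientStatistics {
            agent_metadata: AgentMetadata {
                hostname: "agent-host".into(),
                env: "test".into(),
                agent_version: "7.0.0".into(),
                client_computed: false,
                split_payload: false,
            },
            tracer_metadata: TracerMetadata {
                hostname: "tracer-host".into(),
                env: "test".into(),
                version: "1.0.0".into(),
                service: "web".into(),
                language: "rust".into(),
                tracer_version: "1.0.0".into(),
                runtime_id: "runtime".into(),
                sequence: 1,
                agent_aggregation: "".into(),
                container_id: "".into(),
                tags: Vec::new(),
                git_commit_sha: "".into(),
                image_tag: "".into(),
                process_tags_hash: 0,
                process_tags: "".into(),
            },
            buckets: FastHashMap::default(),
        };

        let timeframe = BucketTimeframe {
            start_time_ns: 0,
            duration_ns: 10_000_000_000,
        };

        // First merge creates the bucket...
        bucketed.merge(timeframe, sample_stats()).unwrap();
        assert_eq!(bucketed.buckets().count(), 1);

        // ...and a second merge folds into the same bucket rather than adding another.
        bucketed.merge(timeframe, sample_stats()).unwrap();
        assert_eq!(bucketed.buckets().count(), 1);
        let (_, stats) = bucketed.buckets().next().unwrap();
        assert_eq!(stats.hits, 2);
    }
}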

/// Client statistics for a given span.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ClientStatistics {
    service: MetaString,
    name: MetaString,
    resource: MetaString,
    type_: MetaString,
    hits: u64,
    errors: u64,
    duration_ns: u64,
    // TODO: decode these to native DDSketch
    ok_summary: Vec<u8>,
    // TODO: decode these to native DDSketch
    error_summary: Vec<u8>,
    synthetics: bool,
    top_level_hits: u64,
    span_kind: MetaString,
    peer_tags: Vec<MetaString>,
    is_trace_root: Option<bool>,
    db_type: MetaString,
    grpc_status_code: MetaString,
    http_status_code: u32,
    http_method: MetaString,
    http_endpoint: MetaString,
}

impl ClientStatistics {
    /// Merges `other` into `self`.
    ///
    /// If `other` does not have the same aggregation key as `self` (essentially: if any of the string fields differ),
    /// an error is returned.
    pub fn merge(&mut self, other: Self) -> Result<(), GenericError> {
        // Check all "fixed" fields and ensure they're identical.
        if self.service != other.service {
            return Err(generic_error!("failed to merge client statistics: service mismatch"));
        }
        if self.name != other.name {
            return Err(generic_error!("failed to merge client statistics: name mismatch"));
        }
        if self.resource != other.resource {
            return Err(generic_error!("failed to merge client statistics: resource mismatch"));
        }
        if self.type_ != other.type_ {
            return Err(generic_error!("failed to merge client statistics: type mismatch"));
        }
        if self.span_kind != other.span_kind {
            return Err(generic_error!("failed to merge client statistics: span kind mismatch"));
        }
        if self.peer_tags != other.peer_tags {
            return Err(generic_error!("failed to merge client statistics: peer tags mismatch"));
        }
        if self.db_type != other.db_type {
            return Err(generic_error!("failed to merge client statistics: db type mismatch"));
        }
        if self.grpc_status_code != other.grpc_status_code {
            return Err(generic_error!(
                "failed to merge client statistics: grpc status code mismatch"
            ));
        }
        if self.http_status_code != other.http_status_code {
            return Err(generic_error!(
                "failed to merge client statistics: http status code mismatch"
            ));
        }
        if self.http_method != other.http_method {
            return Err(generic_error!(
                "failed to merge client statistics: http method mismatch"
            ));
        }
        if self.http_endpoint != other.http_endpoint {
            return Err(generic_error!(
                "failed to merge client statistics: http endpoint mismatch"
            ));
        }

        // Merge together the things we can actually merge.
        self.hits += other.hits;
        self.errors += other.errors;
        self.duration_ns += other.duration_ns;
        self.synthetics |= other.synthetics;
        self.top_level_hits += other.top_level_hits;

        // TODO: Handle decoding the DDSketch entries and merging them together logically.

        Ok(())
    }
}
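
// A test-only sketch of the `merge` semantics above: counters add, `synthetics` ORs, and any mismatch in the
// aggregation-key fields is rejected. Reuses the sample stats helper from the bucket tests; all values are arbitrary.
#[cfg(test)]
mod client_statistics_merge_tests {
    use super::bucketed_stats_tests::sample_stats;

    #[test]
    fn merge_accumulates_counters() {
        let mut stats = sample_stats();
        let mut other = sample_stats();
        other.hits = 4;
        other.errors = 2;
        other.duration_ns = 50;
        other.synthetics = true;
        other.top_level_hits = 3;

        stats.merge(other).unwrap();
        assert_eq!(stats.hits, 5);
        assert_eq!(stats.errors, 2);
        assert_eq!(stats.duration_ns, 150);
        assert!(stats.synthetics);
        assert_eq!(stats.top_level_hits, 4);
    }

    #[test]
    fn merge_rejects_mismatched_key() {
        let mut stats = sample_stats();
        let mut other = sample_stats();
        other.service = "not-web".into();

        // Differing string fields imply a different aggregation key, so the merge must
        // fail rather than silently combine unrelated stats.
        assert!(stats.merge(other).is_err());
    }
}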

impl From<&proto::ClientGroupedStats> for ClientStatistics {
    fn from(payload: &proto::ClientGroupedStats) -> Self {
        let is_trace_root = match payload.is_trace_root.enum_value() {
            Ok(Trilean::NOT_SET) => None,
            Ok(Trilean::TRUE) => Some(true),
            Ok(Trilean::FALSE) => Some(false),
            Err(_) => None,
        };

        Self {
            service: (*payload.service).into(),
            name: (*payload.name).into(),
            resource: (*payload.resource).into(),
            type_: (*payload.type_).into(),
            hits: payload.hits,
            errors: payload.errors,
            duration_ns: payload.duration,
            ok_summary: payload.okSummary.to_vec(),
            error_summary: payload.errorSummary.to_vec(),
            synthetics: payload.synthetics,
            top_level_hits: payload.topLevelHits,
            span_kind: (*payload.span_kind).into(),
            peer_tags: payload.peer_tags.iter().map(|t| MetaString::from(&**t)).collect(),
            is_trace_root,
            db_type: (*payload.DB_type).into(),
            grpc_status_code: (*payload.GRPC_status_code).into(),
            http_status_code: payload.HTTP_status_code,
            http_method: (*payload.HTTP_method).into(),
            http_endpoint: (*payload.HTTP_endpoint).into(),
        }
    }
}

/// Aggregation key for client statistics.
#[derive(Clone, Debug, DeserializeFromStr, Eq, Hash, PartialEq, SerializeDisplay)]
pub struct AggregationKey(u64);

impl fmt::Display for AggregationKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl FromStr for AggregationKey {
    type Err = ParseIntError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(Self(s.parse()?))
    }
}
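
// Minimal, test-only check that `AggregationKey` survives the `Display`/`FromStr` round trip backing its serde
// implementations. The key value is arbitrary.
#[cfg(test)]
mod aggregation_key_tests {
    use super::AggregationKey;

    #[test]
    fn display_from_str_round_trip() {
        let key = AggregationKey(12345);
        assert_eq!(key.to_string().parse::<AggregationKey>(), Ok(key.clone()));
    }
}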

impl From<&proto::ClientGroupedStats> for AggregationKey {
    fn from(payload: &proto::ClientGroupedStats) -> Self {
        // We manually hash the various fields to come up with our aggregation key.
        //
        // TODO: This follows the logic in `libdd-trace-stats` and it would be nice to eventually converge on using
        // that code directly, but there are a number of changes that would need to be made upstream in order to make
        // doing so possible.
        let mut hasher = StableHasher::default();

        payload.resource.hash(&mut hasher);
        payload.service.hash(&mut hasher);
        payload.name.hash(&mut hasher);
        payload.type_.hash(&mut hasher);
        payload.span_kind.hash(&mut hasher);
        payload.HTTP_status_code.hash(&mut hasher);
        payload.synthetics.hash(&mut hasher);
        // TODO: Technically, peer tags should only be included if they match the _configured_ peer keys to aggregate
        // on, so this is not quite right, but we can iterate on it later.
        payload.peer_tags.hash(&mut hasher);

        payload.is_trace_root.value().hash(&mut hasher);
        payload.HTTP_method.hash(&mut hasher);
        payload.HTTP_endpoint.hash(&mut hasher);

        Self(hasher.finish())
    }
}
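
// Illustrative, test-only sketch of what the hash above does and does not cover: counter fields like `hits` are not
// part of the key, while dimension fields like `synthetics` are. Assumes the generated `proto::ClientGroupedStats`
// message implements `Default` and exposes public, writable fields, as the field accesses above suggest.
#[cfg(test)]
mod aggregation_key_hash_tests {
    use super::{proto, AggregationKey};

    #[test]
    fn key_ignores_counters_but_not_dimensions() {
        let mut grouped = proto::ClientGroupedStats::default();
        let base_key = AggregationKey::from(&grouped);

        // `hits` is a counter, not an aggregation dimension, so changing it leaves the key unchanged.
        grouped.hits = 42;
        assert_eq!(AggregationKey::from(&grouped), base_key);

        // `synthetics` is hashed into the key, so flipping it changes the key.
        grouped.synthetics = true;
        assert_ne!(AggregationKey::from(&grouped), base_key);
    }
}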

/// Aggregator for client statistics.
///
/// Client statistics are aggregated by a number of fields that correspond to a specific span: service, name, and
/// operation. Additional fields are used to further group the statistics, such as response codes and tags. This is
/// referred to as the "aggregation key".
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ClientStatisticsAggregator {
    groups: FastHashMap<AggregationKey, BucketedClientStatistics>,
}

impl ClientStatisticsAggregator {
    /// Merges the given payload into the aggregator.
    pub fn merge_payload(&mut self, payload: &proto::StatsPayload) -> Result<(), GenericError> {
        let agent_metadata = AgentMetadata::from(payload);
        for client_stats_payload in payload.stats() {
            let tracer_metadata = TracerMetadata::from(client_stats_payload);

            for stats_bucket in client_stats_payload.stats() {
                let bucket_timeframe = BucketTimeframe::from(stats_bucket);

                for grouped_stat in stats_bucket.stats() {
                    let aggregation_key = AggregationKey::from(grouped_stat);
                    let stats_group = self
                        .groups
                        .entry(aggregation_key)
                        .or_insert_with(|| BucketedClientStatistics {
                            agent_metadata: agent_metadata.clone(),
                            tracer_metadata: tracer_metadata.clone(),
                            buckets: FastHashMap::default(),
                        });

                    if stats_group.agent_metadata != agent_metadata {
                        return Err(generic_error!("agent metadata mismatch"));
                    }
                    if stats_group.tracer_metadata != tracer_metadata {
                        return Err(generic_error!("tracer metadata mismatch"));
                    }

                    stats_group.merge(bucket_timeframe, ClientStatistics::from(grouped_stat))?;
                }
            }
        }

        Ok(())
    }

    /// Returns a reference to the aggregated statistics groups.
    ///
    /// Groups are split by "aggregation key", which is a combination of select fields in each client stats payload,
    /// roughly corresponding to a specific span: name, operation, kind, tags, and so on.
    pub fn groups(&self) -> &FastHashMap<AggregationKey, BucketedClientStatistics> {
        &self.groups
    }
}
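
// A small, test-only usage sketch for `ClientStatisticsAggregator`: merging an empty stats payload is a no-op,
// leaving no aggregated groups behind. Assumes the generated `proto::StatsPayload` implements `Default`.
#[cfg(test)]
mod aggregator_tests {
    use super::{proto, ClientStatisticsAggregator};

    #[test]
    fn empty_payload_produces_no_groups() {
        let mut aggregator = ClientStatisticsAggregator::default();
        let payload = proto::StatsPayload::default();

        // An empty payload contains no client stats, so nothing should be aggregated.
        aggregator.merge_payload(&payload).unwrap();
        assert!(aggregator.groups().is_empty());
    }
}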