stele/traces/stats.rs

use std::{
    collections::hash_map::Entry,
    fmt,
    hash::{Hash as _, Hasher as _},
    num::ParseIntError,
    str::FromStr,
};

use base64::Engine as _;
use datadog_protos::{
    sketches::DDSketch as ProtoDDSketch,
    traces::{self as proto, Trilean},
};
use ddsketch::canonical::{mapping::LogarithmicMapping, DDSketch};
use protobuf::Message as _;
use saluki_common::{collections::FastHashMap, hash::StableHasher};
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use serde::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};
use stringtheory::MetaString;

/// An ergonomic wrapper around `DDSketch` for APM stats.
///
/// This wrapper holds an APM stats-specific `DDSketch`, which ensures the relevant DDSketch parameters (relative
/// accuracy, max bins, etc.) match those of the DDSketch configuration used in the Trace Agent. Further, this
/// wrapper provides ergonomic conversions to/from a string representation based on a base64-encoded wrapper around the
/// canonical Protobuf representation.
///
/// This makes it safe to transport over JSON for use in `datadog-intake` and `ground-truth`.
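///
/// # Example
///
/// A minimal round-trip sketch, marked `ignore` since it assumes `encoded` already holds a valid base64-encoded,
/// Protobuf-serialized DDSketch:
///
/// ```ignore
/// let sketch = EncodedApmStatsDDSketch::try_from(encoded)?;
/// let reencoded: String = sketch.clone().into();
/// assert_eq!(EncodedApmStatsDDSketch::try_from(reencoded)?, sketch);
/// ```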
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(try_from = "String", into = "String")]
pub struct EncodedApmStatsDDSketch {
    inner: DDSketch,
}

impl EncodedApmStatsDDSketch {
    /// Returns a reference to the inner `DDSketch`.
    pub fn inner(&self) -> &DDSketch {
        &self.inner
    }

    /// Returns a mutable reference to the inner `DDSketch`.
    pub fn inner_mut(&mut self) -> &mut DDSketch {
        &mut self.inner
    }
}

impl TryFrom<ProtoDDSketch> for EncodedApmStatsDDSketch {
    type Error = GenericError;

    fn try_from(value: ProtoDDSketch) -> Result<Self, Self::Error> {
        // TODO: It'd be good to avoid having to reconstitute the mapping manually, somehow.
        let mapping = LogarithmicMapping::new(0.01).expect("Relative accuracy should be valid (0 <= 0.01 <= 1)");
        let inner = DDSketch::from_proto(&value, mapping)
            .error_context("Failed to convert DDSketch from canonical Protocol Buffers representation.")?;

        Ok(Self { inner })
    }
}

impl TryFrom<String> for EncodedApmStatsDDSketch {
    type Error = GenericError;

    fn try_from(value: String) -> Result<Self, Self::Error> {
        // We expect the sketch to be encoded in its canonical Protocol Buffers form, additionally wrapped
        // in a base64 encoding to make it safe for JSON transport.
        let sketch_raw_proto = base64::engine::general_purpose::STANDARD
            .decode(value)
            .error_context("Failed to decode base64-encoded DDSketch.")?;

        let sketch_proto = ProtoDDSketch::parse_from_bytes(&sketch_raw_proto[..])
            .error_context("Failed to decode DDSketch from canonical Protocol Buffers representation.")?;

        Self::try_from(sketch_proto)
    }
}

impl From<EncodedApmStatsDDSketch> for ProtoDDSketch {
    fn from(value: EncodedApmStatsDDSketch) -> Self {
        value.inner.to_proto()
    }
}

impl From<EncodedApmStatsDDSketch> for String {
    fn from(value: EncodedApmStatsDDSketch) -> Self {
        let sketch_proto = ProtoDDSketch::from(value);
        let sketch_raw_proto = sketch_proto
            .write_to_bytes()
            .expect("Should not fail to encode DDSketch to serialized Protocol Buffers representation.");

        base64::engine::general_purpose::STANDARD.encode(sketch_raw_proto)
    }
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct AgentMetadata {
    hostname: MetaString,
    env: MetaString,
    agent_version: MetaString,
    client_computed: bool,
    split_payload: bool,
}

impl From<&proto::StatsPayload> for AgentMetadata {
    fn from(payload: &proto::StatsPayload) -> Self {
        Self {
            hostname: (*payload.agentHostname).into(),
            env: (*payload.agentEnv).into(),
            agent_version: (*payload.agentVersion).into(),
            client_computed: payload.clientComputed,
            split_payload: payload.splitPayload,
        }
    }
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct TracerMetadata {
    hostname: MetaString,
    env: MetaString,
    version: MetaString,
    service: MetaString,
    language: MetaString,
    tracer_version: MetaString,
    runtime_id: MetaString,
    sequence: u64,
    agent_aggregation: MetaString,
    container_id: MetaString,
    tags: Vec<MetaString>,
    git_commit_sha: MetaString,
    image_tag: MetaString,
    process_tags_hash: u64,
    process_tags: MetaString,
}

impl From<&proto::ClientStatsPayload> for TracerMetadata {
    fn from(payload: &proto::ClientStatsPayload) -> Self {
        Self {
            hostname: (*payload.hostname).into(),
            env: (*payload.env).into(),
            version: (*payload.version).into(),
            service: (*payload.service).into(),
            language: (*payload.lang).into(),
            tracer_version: (*payload.tracerVersion).into(),
            runtime_id: (*payload.runtimeID).into(),
            sequence: payload.sequence,
            agent_aggregation: (*payload.agentAggregation).into(),
            container_id: (*payload.containerID).into(),
            tags: payload.tags.iter().map(|t| MetaString::from(&**t)).collect(),
            git_commit_sha: (*payload.git_commit_sha).into(),
            image_tag: (*payload.image_tag).into(),
            process_tags_hash: payload.process_tags_hash,
            process_tags: (*payload.process_tags).into(),
        }
    }
}

/// Time frame covered by a bucket.
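///
/// Serialized to and from the string form `"[start_time_ns,duration_ns]"` (e.g. `"[1000,500]"`) via the `Display`
/// and `FromStr` implementations below.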
#[derive(Clone, Copy, Debug, DeserializeFromStr, Hash, Eq, PartialEq, SerializeDisplay)]
pub struct BucketTimeframe {
    /// Start time of the bucket, in nanoseconds.
    pub start_time_ns: u64,

    /// Width of the bucket, in nanoseconds.
    pub duration_ns: u64,
}

impl fmt::Display for BucketTimeframe {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "[{},{}]", self.start_time_ns, self.duration_ns)
    }
}

impl FromStr for BucketTimeframe {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parts: Vec<&str> = s.trim_matches(['[', ']']).split(',').collect();
        if parts.len() != 2 {
            return Err(format!("expected two elements, found {}", parts.len()));
        }
        let start_time_ns = parts[0].parse::<u64>().map_err(|e| e.to_string())?;
        let duration_ns = parts[1].parse::<u64>().map_err(|e| e.to_string())?;
        Ok(Self {
            start_time_ns,
            duration_ns,
        })
    }
}

impl From<&proto::ClientStatsBucket> for BucketTimeframe {
    fn from(bucket: &proto::ClientStatsBucket) -> Self {
        Self {
            start_time_ns: bucket.start,
            duration_ns: bucket.duration,
        }
    }
}

/// Client statistics grouped by time frame.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct BucketedClientStatistics {
    agent_metadata: AgentMetadata,
    tracer_metadata: TracerMetadata,
    buckets: FastHashMap<BucketTimeframe, ClientStatistics>,
}

impl BucketedClientStatistics {
    /// Merges the given stats into the given bucket, creating the bucket if it does not already exist.
    pub fn merge(&mut self, bucket_timeframe: BucketTimeframe, stats: ClientStatistics) -> Result<(), GenericError> {
        match self.buckets.entry(bucket_timeframe) {
            Entry::Occupied(mut entry) => {
                let existing_stats = entry.get_mut();
                existing_stats.merge(stats)
            }
            Entry::Vacant(entry) => {
                entry.insert(stats);
                Ok(())
            }
        }
    }

    /// Returns a merged `ClientStatistics` representing all buckets.
    ///
    /// If no buckets are present, `Ok(None)` is returned. Otherwise, `Ok(Some)` is returned containing the merged
    /// client statistics.
    ///
    /// # Errors
    ///
    /// If the buckets cannot be merged for any reason, an error is returned.
    pub fn merged(&self) -> Result<Option<ClientStatistics>, GenericError> {
        let mut bucket_stats = self.buckets.values().cloned();

        let mut merged_stats = match bucket_stats.next() {
            Some(bucket) => bucket,
            None => return Ok(None),
        };

        for other in bucket_stats {
            merged_stats.merge(other)?;
        }

        Ok(Some(merged_stats))
    }

    /// Returns an iterator over each bucket.
    pub fn buckets(&self) -> impl Iterator<Item = (&BucketTimeframe, &ClientStatistics)> {
        self.buckets.iter()
    }
}

/// Client statistics for a given span.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ClientStatistics {
    service: MetaString,
    name: MetaString,
    resource: MetaString,
    type_: MetaString,
    hits: u64,
    errors: u64,
    duration_ns: u64,
    ok_summary: EncodedApmStatsDDSketch,
    error_summary: EncodedApmStatsDDSketch,
    synthetics: bool,
    top_level_hits: u64,
    span_kind: MetaString,
    peer_tags: Vec<MetaString>,
    is_trace_root: Option<bool>,
    db_type: MetaString,
    grpc_status_code: MetaString,
    http_status_code: u32,
    http_method: MetaString,
    http_endpoint: MetaString,
}

impl ClientStatistics {
    /// Merges `other` into `self`.
    ///
    /// If `other` does not have the same aggregation key as `self` (essentially: if any of the string fields differ),
    /// an error is returned.
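    ///
    /// # Example
    ///
    /// A sketch (marked `ignore`, as it assumes two existing `ClientStatistics` values with matching keys):
    ///
    /// ```ignore
    /// a.merge(b)?;
    /// // `a` now reflects the combined hits, errors, durations, and latency sketches.
    /// ```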
    pub fn merge(&mut self, other: Self) -> Result<(), GenericError> {
        // Check all "fixed" fields and ensure they're identical.
        if self.service != other.service {
            return Err(generic_error!("failed to merge client statistics: service mismatch"));
        }
        if self.name != other.name {
            return Err(generic_error!("failed to merge client statistics: name mismatch"));
        }
        if self.resource != other.resource {
            return Err(generic_error!("failed to merge client statistics: resource mismatch"));
        }
        if self.type_ != other.type_ {
            return Err(generic_error!("failed to merge client statistics: type mismatch"));
        }
        if self.span_kind != other.span_kind {
            return Err(generic_error!("failed to merge client statistics: span kind mismatch"));
        }
        if self.peer_tags != other.peer_tags {
            return Err(generic_error!("failed to merge client statistics: peer tags mismatch"));
        }
        if self.db_type != other.db_type {
            return Err(generic_error!("failed to merge client statistics: db type mismatch"));
        }
        if self.grpc_status_code != other.grpc_status_code {
            return Err(generic_error!(
                "failed to merge client statistics: grpc status code mismatch"
            ));
        }
        if self.http_status_code != other.http_status_code {
            return Err(generic_error!(
                "failed to merge client statistics: http status code mismatch"
            ));
        }
        if self.http_method != other.http_method {
            return Err(generic_error!(
                "failed to merge client statistics: http method mismatch"
            ));
        }
        if self.http_endpoint != other.http_endpoint {
            return Err(generic_error!(
                "failed to merge client statistics: http endpoint mismatch"
            ));
        }

        // Merge together the things we can actually merge.
        self.hits += other.hits;
        self.errors += other.errors;
        self.duration_ns += other.duration_ns;
        self.synthetics |= other.synthetics;
        self.top_level_hits += other.top_level_hits;

        self.ok_summary.inner_mut().merge(other.ok_summary.inner());
        self.error_summary.inner_mut().merge(other.error_summary.inner());

        Ok(())
    }
}

impl From<&proto::ClientGroupedStats> for ClientStatistics {
    fn from(payload: &proto::ClientGroupedStats) -> Self {
        let is_trace_root = match payload.is_trace_root.enum_value() {
            Ok(Trilean::NOT_SET) => None,
            Ok(Trilean::TRUE) => Some(true),
            Ok(Trilean::FALSE) => Some(false),
            Err(_) => None,
        };

        // Decode the DDSketch entries from their canonical Protocol Buffers representation.
        let ok_summary_proto = ProtoDDSketch::parse_from_bytes(payload.okSummary())
            .expect("Should not fail to decode DDSketch from serialized Protocol Buffers representation.");
        let ok_summary = EncodedApmStatsDDSketch::try_from(ok_summary_proto)
            .expect("Should not fail to convert DDSketch from canonical Protocol Buffers representation.");
        let error_summary_proto = ProtoDDSketch::parse_from_bytes(payload.errorSummary())
            .expect("Should not fail to decode DDSketch from serialized Protocol Buffers representation.");
        let error_summary = EncodedApmStatsDDSketch::try_from(error_summary_proto)
            .expect("Should not fail to convert DDSketch from canonical Protocol Buffers representation.");

        Self {
            service: (*payload.service).into(),
            name: (*payload.name).into(),
            resource: (*payload.resource).into(),
            type_: (*payload.type_).into(),
            hits: payload.hits,
            errors: payload.errors,
            duration_ns: payload.duration,
            ok_summary,
            error_summary,
            synthetics: payload.synthetics,
            top_level_hits: payload.topLevelHits,
            span_kind: (*payload.span_kind).into(),
            peer_tags: payload.peer_tags.iter().map(|t| MetaString::from(&**t)).collect(),
            is_trace_root,
            db_type: (*payload.DB_type).into(),
            grpc_status_code: (*payload.GRPC_status_code).into(),
            http_status_code: payload.HTTP_status_code,
            http_method: (*payload.HTTP_method).into(),
            http_endpoint: (*payload.HTTP_endpoint).into(),
        }
    }
}

/// Aggregation key for client statistics.
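///
/// Serialized to and from its decimal string form (the underlying 64-bit hash value) via the `Display` and `FromStr`
/// implementations below.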
#[derive(Clone, Debug, DeserializeFromStr, Eq, Hash, PartialEq, SerializeDisplay)]
pub struct AggregationKey(u64);

impl fmt::Display for AggregationKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl FromStr for AggregationKey {
    type Err = ParseIntError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(Self(s.parse()?))
    }
}

impl From<&proto::ClientGroupedStats> for AggregationKey {
    fn from(payload: &proto::ClientGroupedStats) -> Self {
        // We manually hash the various fields to come up with our aggregation key.
        //
        // TODO: This follows the logic in `libdd-trace-stats` and it would be nice to eventually converge on using
        // that code directly, but there's a number of changes that would need to be made upstream in order to make
        // doing so possible.
        let mut hasher = StableHasher::default();

        payload.resource.hash(&mut hasher);
        payload.service.hash(&mut hasher);
        payload.name.hash(&mut hasher);
        payload.type_.hash(&mut hasher);
        payload.span_kind.hash(&mut hasher);
        payload.HTTP_status_code.hash(&mut hasher);
        payload.synthetics.hash(&mut hasher);

        // TODO: Technically, peer tags should only be included if they match the _configured_ peer keys to aggregate
        // on, so this isn't quite right, but we can iterate on it later.
        payload.peer_tags.hash(&mut hasher);

        payload.is_trace_root.value().hash(&mut hasher);
        payload.HTTP_method.hash(&mut hasher);
        payload.HTTP_endpoint.hash(&mut hasher);

        Self(hasher.finish())
    }
}

/// Aggregator for client statistics.
///
/// Client statistics are aggregated by a number of fields that correspond to a specific span: service, name, and
/// operation. Additional fields are used to further group the statistics, such as response codes and tags. This is
/// referred to as the "aggregation key".
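///
/// # Example
///
/// A sketch of intended usage, marked `ignore` since it assumes `payload` is an already-decoded
/// `proto::StatsPayload`:
///
/// ```ignore
/// let mut aggregator = ClientStatisticsAggregator::default();
/// aggregator.merge_payload(&payload)?;
/// for (key, group) in aggregator.groups() {
///     // Inspect per-key, per-bucket statistics here.
/// }
/// ```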
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ClientStatisticsAggregator {
    groups: FastHashMap<AggregationKey, BucketedClientStatistics>,
}

impl ClientStatisticsAggregator {
    /// Merges the given payload into the aggregator.
    pub fn merge_payload(&mut self, payload: &proto::StatsPayload) -> Result<(), GenericError> {
        let agent_metadata = AgentMetadata::from(payload);
        for client_stats_payload in payload.stats() {
            let tracer_metadata = TracerMetadata::from(client_stats_payload);

            for stats_bucket in client_stats_payload.stats() {
                let bucket_timeframe = BucketTimeframe::from(stats_bucket);

                for grouped_stat in stats_bucket.stats() {
                    let aggregation_key = AggregationKey::from(grouped_stat);
                    let stats_group = self
                        .groups
                        .entry(aggregation_key)
                        .or_insert_with(|| BucketedClientStatistics {
                            agent_metadata: agent_metadata.clone(),
                            tracer_metadata: tracer_metadata.clone(),
                            buckets: FastHashMap::default(),
                        });

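                    // All payloads merged into the same group must agree on agent and tracer metadata; otherwise
                    // we'd silently conflate statistics from different agents or tracers.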
                    if stats_group.agent_metadata != agent_metadata {
                        return Err(generic_error!("agent metadata mismatch"));
                    }
                    if stats_group.tracer_metadata != tracer_metadata {
                        return Err(generic_error!("tracer metadata mismatch"));
                    }

                    stats_group.merge(bucket_timeframe, ClientStatistics::from(grouped_stat))?;
                }
            }
        }

        Ok(())
    }

    /// Returns a reference to the aggregated statistics groups.
    ///
    /// Groups are split by "aggregation key", which is a combination of select fields in each client stats payload,
    /// roughly corresponding to a specific span: name, operation, kind, tags, and so on.
    pub fn groups(&self) -> &FastHashMap<AggregationKey, BucketedClientStatistics> {
        &self.groups
    }
}
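
// A couple of round-trip tests for the string representations defined above. These are minimal sketches that only
// exercise code in this module; the DDSketch and Protobuf paths are left untested here since they require real
// payloads to construct.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bucket_timeframe_string_roundtrip() {
        let timeframe = BucketTimeframe {
            start_time_ns: 1_000,
            duration_ns: 500,
        };

        // `Display` writes the `[start,duration]` form that `FromStr` parses back.
        let serialized = timeframe.to_string();
        assert_eq!(serialized, "[1000,500]");

        let deserialized = serialized.parse::<BucketTimeframe>().expect("should parse");
        assert_eq!(deserialized, timeframe);
    }

    #[test]
    fn aggregation_key_string_roundtrip() {
        let key = AggregationKey(42);

        // `Display` writes the raw hash value as a decimal string that `FromStr` parses back.
        let serialized = key.to_string();
        assert_eq!(serialized, "42");

        let deserialized = serialized.parse::<AggregationKey>().expect("should parse");
        assert_eq!(deserialized, key);
    }
}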