Skip to main content

stele/traces/
stats.rs

1// serde_with's DeserializeFromStr macro generates code that calls core::str::from_utf8 internally,
2// which we otherwise disallow in favor of simdutf8::basic::from_utf8.
3#![allow(clippy::disallowed_methods)]
4
5use std::{
6    collections::hash_map::Entry,
7    fmt,
8    hash::{Hash as _, Hasher as _},
9    num::ParseIntError,
10    str::FromStr,
11};
12
13use base64::Engine as _;
14use datadog_protos::{
15    sketches::DDSketch as ProtoDDSketch,
16    traces::{self as proto, Trilean},
17};
18use ddsketch::canonical::{mapping::LogarithmicMapping, DDSketch};
19use protobuf::Message as _;
20use saluki_common::{collections::FastHashMap, hash::StableHasher};
21use saluki_error::{generic_error, ErrorContext as _, GenericError};
22use serde::{Deserialize, Serialize};
23use serde_with::{DeserializeFromStr, SerializeDisplay};
24use stringtheory::MetaString;
25
26/// An ergonomic wrapper around `DDSketch` for APM stats.
27///
28/// This wrapper holds an APM stats-specific `DDSketch`, which ensures the relevant DDSketch parameters (relative
29/// accuracy, max bins, etc) are matched to that of the DDSketch configuration used in the Trace Agent. Further, this
30/// wrapper provides ergonomic conversions to/from a string representation based on a base64-encoded wrapper around the
31/// canonical Protobuf representation.
32///
33/// This makes it safe to transport over JSON for use in `datadog-intake` and `panoramic`.
34#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
35#[serde(try_from = "String", into = "String")]
36pub struct EncodedApmStatsDDSketch {
37    inner: DDSketch,
38}
39
40impl EncodedApmStatsDDSketch {
41    /// Returns a reference to the inner `DDSketch`.
42    pub fn inner(&self) -> &DDSketch {
43        &self.inner
44    }
45
46    /// Returns a mutable reference to the inner `DDSketch`.
47    pub fn inner_mut(&mut self) -> &mut DDSketch {
48        &mut self.inner
49    }
50}
51
52impl TryFrom<ProtoDDSketch> for EncodedApmStatsDDSketch {
53    type Error = GenericError;
54
55    fn try_from(value: ProtoDDSketch) -> Result<Self, Self::Error> {
56        // TODO: It'd be good to avoid having to reconstitute the mapping manually, somehow.
57        let mapping = LogarithmicMapping::new(0.01).expect("Relative accuracy should be valid (0 <= 0.01 <= 1)");
58        let inner = DDSketch::from_proto(&value, mapping)
59            .error_context("Failed to convert DDSketch from canonical Protocol Buffers representation.")?;
60
61        Ok(Self { inner })
62    }
63}
64
65impl TryFrom<String> for EncodedApmStatsDDSketch {
66    type Error = GenericError;
67
68    fn try_from(value: String) -> Result<Self, Self::Error> {
69        // We expect the sketch to be encoded in its canonical Protocol Buffers form, additionally wrapped
70        // in a base64 encoding to make it safe for JSON transport.
71        let sketch_raw_proto = base64::engine::general_purpose::STANDARD
72            .decode(value)
73            .error_context("Failed to decode base64-encoded DDSketch.")?;
74
75        let sketch_proto = ProtoDDSketch::parse_from_bytes(&sketch_raw_proto[..])
76            .error_context("Failed to decode DDSketch from canonical Protocol Buffers representation.")?;
77
78        Self::try_from(sketch_proto)
79    }
80}
81
82impl From<EncodedApmStatsDDSketch> for ProtoDDSketch {
83    fn from(value: EncodedApmStatsDDSketch) -> Self {
84        value.inner.to_proto()
85    }
86}
87
88impl From<EncodedApmStatsDDSketch> for String {
89    fn from(value: EncodedApmStatsDDSketch) -> Self {
90        let sketch_proto = ProtoDDSketch::from(value);
91        let sketch_raw_proto = sketch_proto
92            .write_to_bytes()
93            .expect("Should not fail to encode DDSketch to serialized Protocol Buffers representation.");
94
95        base64::engine::general_purpose::STANDARD.encode(sketch_raw_proto)
96    }
97}
98
99#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
100struct AgentMetadata {
101    hostname: MetaString,
102    env: MetaString,
103    agent_version: MetaString,
104    client_computed: bool,
105    split_payload: bool,
106}
107
108impl From<&proto::StatsPayload> for AgentMetadata {
109    fn from(payload: &proto::StatsPayload) -> Self {
110        Self {
111            hostname: (*payload.agentHostname).into(),
112            env: (*payload.agentEnv).into(),
113            agent_version: (*payload.agentVersion).into(),
114            client_computed: payload.clientComputed,
115            split_payload: payload.splitPayload,
116        }
117    }
118}
119
120#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
121struct TracerMetadata {
122    hostname: MetaString,
123    env: MetaString,
124    version: MetaString,
125    service: MetaString,
126    language: MetaString,
127    tracer_version: MetaString,
128    runtime_id: MetaString,
129    sequence: u64,
130    agent_aggregation: MetaString,
131    container_id: MetaString,
132    tags: Vec<MetaString>,
133    git_commit_sha: MetaString,
134    image_tag: MetaString,
135    process_tags_hash: u64,
136    process_tags: MetaString,
137}
138
139impl From<&proto::ClientStatsPayload> for TracerMetadata {
140    fn from(payload: &proto::ClientStatsPayload) -> Self {
141        Self {
142            hostname: (*payload.hostname).into(),
143            env: (*payload.env).into(),
144            version: (*payload.version).into(),
145            service: (*payload.service).into(),
146            language: (*payload.lang).into(),
147            tracer_version: (*payload.tracerVersion).into(),
148            runtime_id: (*payload.runtimeID).into(),
149            sequence: payload.sequence,
150            agent_aggregation: (*payload.agentAggregation).into(),
151            container_id: (*payload.containerID).into(),
152            tags: payload.tags.iter().map(|t| MetaString::from(&**t)).collect(),
153            git_commit_sha: (*payload.git_commit_sha).into(),
154            image_tag: (*payload.image_tag).into(),
155            process_tags_hash: payload.process_tags_hash,
156            process_tags: (*payload.process_tags).into(),
157        }
158    }
159}
160
161/// Time frame covered by a bucket.
162#[derive(Clone, Copy, Debug, DeserializeFromStr, Hash, Eq, PartialEq, SerializeDisplay)]
163pub struct BucketTimeframe {
164    /// Start time of the bucket, in nanoseconds.
165    pub start_time_ns: u64,
166
167    /// Width of the bucket, in nanoseconds.
168    pub duration_ns: u64,
169}
170
171impl fmt::Display for BucketTimeframe {
172    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173        write!(f, "[{},{}]", self.start_time_ns, self.duration_ns)
174    }
175}
176
177impl FromStr for BucketTimeframe {
178    type Err = String;
179
180    fn from_str(s: &str) -> Result<Self, Self::Err> {
181        let parts: Vec<&str> = s.trim_matches(['[', ']']).split(',').collect();
182        if parts.len() != 2 {
183            return Err(format!("expected two elements, found {}", parts.len()));
184        }
185        let start_time_ns = parts[0].parse::<u64>().map_err(|e| e.to_string())?;
186        let duration_ns = parts[1].parse::<u64>().map_err(|e| e.to_string())?;
187        Ok(Self {
188            start_time_ns,
189            duration_ns,
190        })
191    }
192}
193
194impl From<&proto::ClientStatsBucket> for BucketTimeframe {
195    fn from(bucket: &proto::ClientStatsBucket) -> Self {
196        Self {
197            start_time_ns: bucket.start,
198            duration_ns: bucket.duration,
199        }
200    }
201}
202
203/// Client statistics grouped by time frame.
204#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
205pub struct BucketedClientStatistics {
206    agent_metadata: AgentMetadata,
207    tracer_metadata: TracerMetadata,
208    buckets: FastHashMap<BucketTimeframe, ClientStatistics>,
209}
210
211impl BucketedClientStatistics {
212    /// Merge the given stats into the given bucket, creating the bucket if it doesn't already exist.
213    pub fn merge(&mut self, bucket_timeframe: BucketTimeframe, stats: ClientStatistics) -> Result<(), GenericError> {
214        match self.buckets.entry(bucket_timeframe) {
215            Entry::Occupied(mut entry) => {
216                let existing_stats = entry.get_mut();
217                existing_stats.merge(stats)
218            }
219            Entry::Vacant(entry) => {
220                entry.insert(stats);
221                Ok(())
222            }
223        }
224    }
225
226    /// Returns a merged `ClientStatistics` representing all buckets.
227    ///
228    /// If no buckets are present, `Ok(None)` is returned. Otherwise, `Ok(Some)` is returned containing the merged
229    /// client statistics.
230    ///
231    /// # Errors
232    ///
233    /// If the buckets can't be merged for any reason, an error is returned.
234    pub fn merged(&self) -> Result<Option<ClientStatistics>, GenericError> {
235        let mut bucket_stats = self.buckets.values().cloned();
236
237        let mut merged_stats = match bucket_stats.next() {
238            Some(bucket) => bucket,
239            None => return Ok(None),
240        };
241
242        for other in bucket_stats {
243            merged_stats.merge(other)?;
244        }
245
246        Ok(Some(merged_stats))
247    }
248
249    /// Returns an iterator over each bucket.
250    pub fn buckets(&self) -> impl Iterator<Item = (&BucketTimeframe, &ClientStatistics)> {
251        self.buckets.iter()
252    }
253}
254
255/// Client statistics for a given span.
256#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
257pub struct ClientStatistics {
258    service: MetaString,
259    name: MetaString,
260    resource: MetaString,
261    type_: MetaString,
262    hits: u64,
263    errors: u64,
264    duration_ns: u64,
265    ok_summary: EncodedApmStatsDDSketch,
266    error_summary: EncodedApmStatsDDSketch,
267    synthetics: bool,
268    top_level_hits: u64,
269    span_kind: MetaString,
270    peer_tags: Vec<MetaString>,
271    is_trace_root: Option<bool>,
272    db_type: MetaString,
273    grpc_status_code: MetaString,
274    http_status_code: u32,
275    http_method: MetaString,
276    http_endpoint: MetaString,
277}
278
279impl ClientStatistics {
280    /// Merges `other` into `self`.
281    ///
282    /// If `other` doesn't have the same aggregation key as `self` (essentially: if any of the string fields differ),
283    /// an error is returned.
284    pub fn merge(&mut self, other: Self) -> Result<(), GenericError> {
285        // Check all "fixed" fields and ensure they're identical.
286        if self.service != other.service {
287            return Err(generic_error!("failed to merge client statistics: service mismatch"));
288        }
289        if self.name != other.name {
290            return Err(generic_error!("failed to merge client statistics: name mismatch"));
291        }
292        if self.resource != other.resource {
293            return Err(generic_error!("failed to merge client statistics: resource mismatch"));
294        }
295        if self.type_ != other.type_ {
296            return Err(generic_error!("failed to merge client statistics: type mismatch"));
297        }
298        if self.span_kind != other.span_kind {
299            return Err(generic_error!("failed to merge client statistics: span kind mismatch"));
300        }
301        if self.peer_tags != other.peer_tags {
302            return Err(generic_error!("failed to merge client statistics: peer tags mismatch"));
303        }
304        if self.db_type != other.db_type {
305            return Err(generic_error!("failed to merge client statistics: db type mismatch"));
306        }
307        if self.grpc_status_code != other.grpc_status_code {
308            return Err(generic_error!(
309                "failed to merge client statistics: grpc status code mismatch"
310            ));
311        }
312        if self.http_status_code != other.http_status_code {
313            return Err(generic_error!(
314                "failed to merge client statistics: http status code mismatch"
315            ));
316        }
317        if self.http_method != other.http_method {
318            return Err(generic_error!(
319                "failed to merge client statistics: http method mismatch"
320            ));
321        }
322        if self.http_endpoint != other.http_endpoint {
323            return Err(generic_error!(
324                "failed to merge client statistics: http endpoint mismatch"
325            ));
326        }
327
328        // Merge together the things we can actually merge.
329        self.hits += other.hits;
330        self.errors += other.errors;
331        self.duration_ns += other.duration_ns;
332        self.synthetics |= other.synthetics;
333        self.top_level_hits += other.top_level_hits;
334
335        self.ok_summary.inner_mut().merge(other.ok_summary.inner());
336        self.error_summary.inner_mut().merge(other.error_summary.inner());
337
338        Ok(())
339    }
340}
341
342impl From<&proto::ClientGroupedStats> for ClientStatistics {
343    fn from(payload: &proto::ClientGroupedStats) -> Self {
344        let is_trace_root = match payload.is_trace_root.enum_value() {
345            Ok(Trilean::NOT_SET) => None,
346            Ok(Trilean::TRUE) => Some(true),
347            Ok(Trilean::FALSE) => Some(false),
348            Err(_) => None,
349        };
350
351        // Decode the DDSketch entries from their canonial Protocol Buffers representation.
352        let ok_summary_proto = ProtoDDSketch::parse_from_bytes(payload.okSummary())
353            .expect("Should not fail to decode DDSketch from serialized Protocol Buffers representation.");
354        let ok_summary = EncodedApmStatsDDSketch::try_from(ok_summary_proto)
355            .expect("Should not fail to convert DDSketch from canonical Protocol Buffers representation.");
356        let error_summary_proto = ProtoDDSketch::parse_from_bytes(payload.errorSummary())
357            .expect("Should not fail to decode DDSketch from serialized Protocol Buffers representation.");
358        let error_summary = EncodedApmStatsDDSketch::try_from(error_summary_proto)
359            .expect("Should not fail to convert DDSketch from canonical Protocol Buffers representation.");
360
361        Self {
362            service: (*payload.service).into(),
363            name: (*payload.name).into(),
364            resource: (*payload.resource).into(),
365            type_: (*payload.type_).into(),
366            hits: payload.hits,
367            errors: payload.errors,
368            duration_ns: payload.duration,
369            ok_summary,
370            error_summary,
371            synthetics: payload.synthetics,
372            top_level_hits: payload.topLevelHits,
373            span_kind: (*payload.span_kind).into(),
374            peer_tags: payload.peer_tags.iter().map(|t| MetaString::from(&**t)).collect(),
375            is_trace_root,
376            db_type: (*payload.DB_type).into(),
377            grpc_status_code: (*payload.GRPC_status_code).into(),
378            http_status_code: payload.HTTP_status_code,
379            http_method: (*payload.HTTP_method).into(),
380            http_endpoint: (*payload.HTTP_endpoint).into(),
381        }
382    }
383}
384
385/// Aggregation key for client statistics.
386#[derive(Clone, Debug, DeserializeFromStr, Eq, Hash, PartialEq, SerializeDisplay)]
387pub struct AggregationKey(u64);
388
389impl fmt::Display for AggregationKey {
390    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391        write!(f, "{}", self.0)
392    }
393}
394
395impl FromStr for AggregationKey {
396    type Err = ParseIntError;
397
398    fn from_str(s: &str) -> Result<Self, Self::Err> {
399        Ok(Self(s.parse()?))
400    }
401}
402
403impl From<&proto::ClientGroupedStats> for AggregationKey {
404    fn from(payload: &proto::ClientGroupedStats) -> Self {
405        // We manually hash the various fields to come up with our aggregation key.
406        //
407        // TODO: This follows the logic in `libdd-trace-stats` and it would be nice to eventually converge on using
408        // that code directly, but there's a number of changes that would need to be made upstream in order to make
409        // doing so possible.
410        let mut hasher = StableHasher::default();
411
412        payload.resource.hash(&mut hasher);
413        payload.service.hash(&mut hasher);
414        payload.name.hash(&mut hasher);
415        payload.type_.hash(&mut hasher);
416        payload.span_kind.hash(&mut hasher);
417        payload.HTTP_status_code.hash(&mut hasher);
418        payload.synthetics.hash(&mut hasher);
419
420        // TODO: technically, peer tags should only be included if they match the _configured_ peer keys to aggregate on
421        // so we're doing this wrong but we can iterate on it later
422        payload.peer_tags.hash(&mut hasher);
423
424        payload.is_trace_root.value().hash(&mut hasher);
425        payload.HTTP_method.hash(&mut hasher);
426        payload.HTTP_endpoint.hash(&mut hasher);
427
428        Self(hasher.finish())
429    }
430}
431
432/// Aggregator for client statistics.
433///
434/// Client statistics are represented by generating an "aggregation key" for a given span, and then grouping statistics
435/// by that key. The aggregation key includes fields like service, span name, span operation, response code, and more.
436/// This is meant to group like spans together,
437#[derive(Clone, Debug, Default, Deserialize, Serialize)]
438pub struct ClientStatisticsAggregator {
439    groups: FastHashMap<AggregationKey, BucketedClientStatistics>,
440}
441
442impl ClientStatisticsAggregator {
443    /// Merges the given payload into the aggregator.
444    pub fn merge_payload(&mut self, payload: &proto::StatsPayload) -> Result<(), GenericError> {
445        let agent_metadata = AgentMetadata::from(payload);
446        for client_stats_payload in payload.stats() {
447            let tracer_metadata = TracerMetadata::from(client_stats_payload);
448
449            for stats_bucket in client_stats_payload.stats() {
450                let bucket_timeframe = BucketTimeframe::from(stats_bucket);
451
452                for grouped_stat in stats_bucket.stats() {
453                    let aggregation_key = AggregationKey::from(grouped_stat);
454                    let stats_group = self
455                        .groups
456                        .entry(aggregation_key)
457                        .or_insert_with(|| BucketedClientStatistics {
458                            agent_metadata: agent_metadata.clone(),
459                            tracer_metadata: tracer_metadata.clone(),
460                            buckets: FastHashMap::default(),
461                        });
462
463                    if stats_group.agent_metadata != agent_metadata {
464                        return Err(generic_error!("agent metadata mismatch"));
465                    }
466                    if stats_group.tracer_metadata != tracer_metadata {
467                        return Err(generic_error!("tracer metadata mismatch"));
468                    }
469
470                    stats_group.merge(bucket_timeframe, ClientStatistics::from(grouped_stat))?;
471                }
472            }
473        }
474
475        Ok(())
476    }
477
478    /// Returns a reference to the aggregated statistics groups.
479    pub fn groups(&self) -> &FastHashMap<AggregationKey, BucketedClientStatistics> {
480        &self.groups
481    }
482}