1#![allow(clippy::disallowed_methods)]
4
5use std::{
6 collections::hash_map::Entry,
7 fmt,
8 hash::{Hash as _, Hasher as _},
9 num::ParseIntError,
10 str::FromStr,
11};
12
13use base64::Engine as _;
14use datadog_protos::{
15 sketches::DDSketch as ProtoDDSketch,
16 traces::{self as proto, Trilean},
17};
18use ddsketch::canonical::{mapping::LogarithmicMapping, DDSketch};
19use protobuf::Message as _;
20use saluki_common::{collections::FastHashMap, hash::StableHasher};
21use saluki_error::{generic_error, ErrorContext as _, GenericError};
22use serde::{Deserialize, Serialize};
23use serde_with::{DeserializeFromStr, SerializeDisplay};
24use stringtheory::MetaString;
25
/// A [`DDSketch`] wrapper that passes through serde as a single string.
///
/// The `try_from`/`into` serde attributes route (de)serialization through the `String` conversions
/// implemented for this type in this file, which use base64-encoded Protocol Buffers bytes.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(try_from = "String", into = "String")]
pub struct EncodedApmStatsDDSketch {
    /// The decoded, in-memory sketch.
    inner: DDSketch,
}
39
40impl EncodedApmStatsDDSketch {
41 pub fn inner(&self) -> &DDSketch {
43 &self.inner
44 }
45
46 pub fn inner_mut(&mut self) -> &mut DDSketch {
48 &mut self.inner
49 }
50}
51
52impl TryFrom<ProtoDDSketch> for EncodedApmStatsDDSketch {
53 type Error = GenericError;
54
55 fn try_from(value: ProtoDDSketch) -> Result<Self, Self::Error> {
56 let mapping = LogarithmicMapping::new(0.01).expect("Relative accuracy should be valid (0 <= 0.01 <= 1)");
58 let inner = DDSketch::from_proto(&value, mapping)
59 .error_context("Failed to convert DDSketch from canonical Protocol Buffers representation.")?;
60
61 Ok(Self { inner })
62 }
63}
64
65impl TryFrom<String> for EncodedApmStatsDDSketch {
66 type Error = GenericError;
67
68 fn try_from(value: String) -> Result<Self, Self::Error> {
69 let sketch_raw_proto = base64::engine::general_purpose::STANDARD
72 .decode(value)
73 .error_context("Failed to decode base64-encoded DDSketch.")?;
74
75 let sketch_proto = ProtoDDSketch::parse_from_bytes(&sketch_raw_proto[..])
76 .error_context("Failed to decode DDSketch from canonical Protocol Buffers representation.")?;
77
78 Self::try_from(sketch_proto)
79 }
80}
81
82impl From<EncodedApmStatsDDSketch> for ProtoDDSketch {
83 fn from(value: EncodedApmStatsDDSketch) -> Self {
84 value.inner.to_proto()
85 }
86}
87
88impl From<EncodedApmStatsDDSketch> for String {
89 fn from(value: EncodedApmStatsDDSketch) -> Self {
90 let sketch_proto = ProtoDDSketch::from(value);
91 let sketch_raw_proto = sketch_proto
92 .write_to_bytes()
93 .expect("Should not fail to encode DDSketch to serialized Protocol Buffers representation.");
94
95 base64::engine::general_purpose::STANDARD.encode(sketch_raw_proto)
96 }
97}
98
/// Agent-level metadata copied from a `StatsPayload`.
///
/// Two values compare equal only when every field matches.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct AgentMetadata {
    /// Value of the payload's `agentHostname` field.
    hostname: MetaString,
    /// Value of the payload's `agentEnv` field.
    env: MetaString,
    /// Value of the payload's `agentVersion` field.
    agent_version: MetaString,
    /// Value of the payload's `clientComputed` field.
    client_computed: bool,
    /// Value of the payload's `splitPayload` field.
    split_payload: bool,
}
107
108impl From<&proto::StatsPayload> for AgentMetadata {
109 fn from(payload: &proto::StatsPayload) -> Self {
110 Self {
111 hostname: (*payload.agentHostname).into(),
112 env: (*payload.agentEnv).into(),
113 agent_version: (*payload.agentVersion).into(),
114 client_computed: payload.clientComputed,
115 split_payload: payload.splitPayload,
116 }
117 }
118}
119
/// Tracer-level metadata copied from a `ClientStatsPayload`.
///
/// Two values compare equal only when every field matches, including `sequence`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct TracerMetadata {
    /// Value of the payload's `hostname` field.
    hostname: MetaString,
    /// Value of the payload's `env` field.
    env: MetaString,
    /// Value of the payload's `version` field.
    version: MetaString,
    /// Value of the payload's `service` field.
    service: MetaString,
    /// Value of the payload's `lang` field.
    language: MetaString,
    /// Value of the payload's `tracerVersion` field.
    tracer_version: MetaString,
    /// Value of the payload's `runtimeID` field.
    runtime_id: MetaString,
    /// Value of the payload's `sequence` field.
    sequence: u64,
    /// Value of the payload's `agentAggregation` field.
    agent_aggregation: MetaString,
    /// Value of the payload's `containerID` field.
    container_id: MetaString,
    /// Values of the payload's `tags` field, in payload order.
    tags: Vec<MetaString>,
    /// Value of the payload's `git_commit_sha` field.
    git_commit_sha: MetaString,
    /// Value of the payload's `image_tag` field.
    image_tag: MetaString,
    /// Value of the payload's `process_tags_hash` field.
    process_tags_hash: u64,
    /// Value of the payload's `process_tags` field.
    process_tags: MetaString,
}
138
139impl From<&proto::ClientStatsPayload> for TracerMetadata {
140 fn from(payload: &proto::ClientStatsPayload) -> Self {
141 Self {
142 hostname: (*payload.hostname).into(),
143 env: (*payload.env).into(),
144 version: (*payload.version).into(),
145 service: (*payload.service).into(),
146 language: (*payload.lang).into(),
147 tracer_version: (*payload.tracerVersion).into(),
148 runtime_id: (*payload.runtimeID).into(),
149 sequence: payload.sequence,
150 agent_aggregation: (*payload.agentAggregation).into(),
151 container_id: (*payload.containerID).into(),
152 tags: payload.tags.iter().map(|t| MetaString::from(&**t)).collect(),
153 git_commit_sha: (*payload.git_commit_sha).into(),
154 image_tag: (*payload.image_tag).into(),
155 process_tags_hash: payload.process_tags_hash,
156 process_tags: (*payload.process_tags).into(),
157 }
158 }
159}
160
/// The time window covered by a single stats bucket.
///
/// Serialized through its `Display` form and deserialized through `FromStr` (see the
/// `SerializeDisplay`/`DeserializeFromStr` derives).
#[derive(Clone, Copy, Debug, DeserializeFromStr, Hash, Eq, PartialEq, SerializeDisplay)]
pub struct BucketTimeframe {
    /// Start of the bucket, taken from `ClientStatsBucket.start`.
    ///
    /// The `_ns` suffix suggests nanoseconds; the unit is not verifiable from this file.
    pub start_time_ns: u64,

    /// Length of the bucket, taken from `ClientStatsBucket.duration`.
    ///
    /// The `_ns` suffix suggests nanoseconds; the unit is not verifiable from this file.
    pub duration_ns: u64,
}
170
171impl fmt::Display for BucketTimeframe {
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 write!(f, "[{},{}]", self.start_time_ns, self.duration_ns)
174 }
175}
176
177impl FromStr for BucketTimeframe {
178 type Err = String;
179
180 fn from_str(s: &str) -> Result<Self, Self::Err> {
181 let parts: Vec<&str> = s.trim_matches(['[', ']']).split(',').collect();
182 if parts.len() != 2 {
183 return Err(format!("expected two elements, found {}", parts.len()));
184 }
185 let start_time_ns = parts[0].parse::<u64>().map_err(|e| e.to_string())?;
186 let duration_ns = parts[1].parse::<u64>().map_err(|e| e.to_string())?;
187 Ok(Self {
188 start_time_ns,
189 duration_ns,
190 })
191 }
192}
193
194impl From<&proto::ClientStatsBucket> for BucketTimeframe {
195 fn from(bucket: &proto::ClientStatsBucket) -> Self {
196 Self {
197 start_time_ns: bucket.start,
198 duration_ns: bucket.duration,
199 }
200 }
201}
202
/// Client statistics for one aggregation group, kept separately per bucket timeframe.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct BucketedClientStatistics {
    /// Agent metadata recorded for this group.
    agent_metadata: AgentMetadata,
    /// Tracer metadata recorded for this group.
    tracer_metadata: TracerMetadata,
    /// Statistics keyed by the timeframe of the bucket they were reported in.
    buckets: FastHashMap<BucketTimeframe, ClientStatistics>,
}
210
211impl BucketedClientStatistics {
212 pub fn merge(&mut self, bucket_timeframe: BucketTimeframe, stats: ClientStatistics) -> Result<(), GenericError> {
214 match self.buckets.entry(bucket_timeframe) {
215 Entry::Occupied(mut entry) => {
216 let existing_stats = entry.get_mut();
217 existing_stats.merge(stats)
218 }
219 Entry::Vacant(entry) => {
220 entry.insert(stats);
221 Ok(())
222 }
223 }
224 }
225
226 pub fn merged(&self) -> Result<Option<ClientStatistics>, GenericError> {
235 let mut bucket_stats = self.buckets.values().cloned();
236
237 let mut merged_stats = match bucket_stats.next() {
238 Some(bucket) => bucket,
239 None => return Ok(None),
240 };
241
242 for other in bucket_stats {
243 merged_stats.merge(other)?;
244 }
245
246 Ok(Some(merged_stats))
247 }
248
249 pub fn buckets(&self) -> impl Iterator<Item = (&BucketTimeframe, &ClientStatistics)> {
251 self.buckets.iter()
252 }
253}
254
/// Statistics for one group of spans, built from a `ClientGroupedStats` message.
///
/// When two values are merged, `hits`, `errors`, `duration_ns` and `top_level_hits` are summed,
/// `synthetics` is OR-ed, and the two sketches are merged. Most of the remaining fields must be
/// equal for a merge to succeed.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ClientStatistics {
    /// Value of the message's `service` field.
    service: MetaString,
    /// Value of the message's `name` field.
    name: MetaString,
    /// Value of the message's `resource` field.
    resource: MetaString,
    /// Value of the message's `type_` field.
    type_: MetaString,
    /// Value of the message's `hits` field; summed on merge.
    hits: u64,
    /// Value of the message's `errors` field; summed on merge.
    errors: u64,
    /// Value of the message's `duration` field; summed on merge.
    duration_ns: u64,
    /// Sketch decoded from the message's `okSummary` bytes.
    ok_summary: EncodedApmStatsDDSketch,
    /// Sketch decoded from the message's `errorSummary` bytes.
    error_summary: EncodedApmStatsDDSketch,
    /// Value of the message's `synthetics` field; OR-ed on merge.
    synthetics: bool,
    /// Value of the message's `topLevelHits` field; summed on merge.
    top_level_hits: u64,
    /// Value of the message's `span_kind` field.
    span_kind: MetaString,
    /// Values of the message's `peer_tags` field, in message order.
    peer_tags: Vec<MetaString>,
    /// The message's `is_trace_root` trilean: `None` when it is `NOT_SET` or not a known value.
    is_trace_root: Option<bool>,
    /// Value of the message's `DB_type` field.
    db_type: MetaString,
    /// Value of the message's `GRPC_status_code` field.
    grpc_status_code: MetaString,
    /// Value of the message's `HTTP_status_code` field.
    http_status_code: u32,
    /// Value of the message's `HTTP_method` field.
    http_method: MetaString,
    /// Value of the message's `HTTP_endpoint` field.
    http_endpoint: MetaString,
}
278
279impl ClientStatistics {
280 pub fn merge(&mut self, other: Self) -> Result<(), GenericError> {
285 if self.service != other.service {
287 return Err(generic_error!("failed to merge client statistics: service mismatch"));
288 }
289 if self.name != other.name {
290 return Err(generic_error!("failed to merge client statistics: name mismatch"));
291 }
292 if self.resource != other.resource {
293 return Err(generic_error!("failed to merge client statistics: resource mismatch"));
294 }
295 if self.type_ != other.type_ {
296 return Err(generic_error!("failed to merge client statistics: type mismatch"));
297 }
298 if self.span_kind != other.span_kind {
299 return Err(generic_error!("failed to merge client statistics: span kind mismatch"));
300 }
301 if self.peer_tags != other.peer_tags {
302 return Err(generic_error!("failed to merge client statistics: peer tags mismatch"));
303 }
304 if self.db_type != other.db_type {
305 return Err(generic_error!("failed to merge client statistics: db type mismatch"));
306 }
307 if self.grpc_status_code != other.grpc_status_code {
308 return Err(generic_error!(
309 "failed to merge client statistics: grpc status code mismatch"
310 ));
311 }
312 if self.http_status_code != other.http_status_code {
313 return Err(generic_error!(
314 "failed to merge client statistics: http status code mismatch"
315 ));
316 }
317 if self.http_method != other.http_method {
318 return Err(generic_error!(
319 "failed to merge client statistics: http method mismatch"
320 ));
321 }
322 if self.http_endpoint != other.http_endpoint {
323 return Err(generic_error!(
324 "failed to merge client statistics: http endpoint mismatch"
325 ));
326 }
327
328 self.hits += other.hits;
330 self.errors += other.errors;
331 self.duration_ns += other.duration_ns;
332 self.synthetics |= other.synthetics;
333 self.top_level_hits += other.top_level_hits;
334
335 self.ok_summary.inner_mut().merge(other.ok_summary.inner());
336 self.error_summary.inner_mut().merge(other.error_summary.inner());
337
338 Ok(())
339 }
340}
341
342impl From<&proto::ClientGroupedStats> for ClientStatistics {
343 fn from(payload: &proto::ClientGroupedStats) -> Self {
344 let is_trace_root = match payload.is_trace_root.enum_value() {
345 Ok(Trilean::NOT_SET) => None,
346 Ok(Trilean::TRUE) => Some(true),
347 Ok(Trilean::FALSE) => Some(false),
348 Err(_) => None,
349 };
350
351 let ok_summary_proto = ProtoDDSketch::parse_from_bytes(payload.okSummary())
353 .expect("Should not fail to decode DDSketch from serialized Protocol Buffers representation.");
354 let ok_summary = EncodedApmStatsDDSketch::try_from(ok_summary_proto)
355 .expect("Should not fail to convert DDSketch from canonical Protocol Buffers representation.");
356 let error_summary_proto = ProtoDDSketch::parse_from_bytes(payload.errorSummary())
357 .expect("Should not fail to decode DDSketch from serialized Protocol Buffers representation.");
358 let error_summary = EncodedApmStatsDDSketch::try_from(error_summary_proto)
359 .expect("Should not fail to convert DDSketch from canonical Protocol Buffers representation.");
360
361 Self {
362 service: (*payload.service).into(),
363 name: (*payload.name).into(),
364 resource: (*payload.resource).into(),
365 type_: (*payload.type_).into(),
366 hits: payload.hits,
367 errors: payload.errors,
368 duration_ns: payload.duration,
369 ok_summary,
370 error_summary,
371 synthetics: payload.synthetics,
372 top_level_hits: payload.topLevelHits,
373 span_kind: (*payload.span_kind).into(),
374 peer_tags: payload.peer_tags.iter().map(|t| MetaString::from(&**t)).collect(),
375 is_trace_root,
376 db_type: (*payload.DB_type).into(),
377 grpc_status_code: (*payload.GRPC_status_code).into(),
378 http_status_code: payload.HTTP_status_code,
379 http_method: (*payload.HTTP_method).into(),
380 http_endpoint: (*payload.HTTP_endpoint).into(),
381 }
382 }
383}
384
385#[derive(Clone, Debug, DeserializeFromStr, Eq, Hash, PartialEq, SerializeDisplay)]
387pub struct AggregationKey(u64);
388
389impl fmt::Display for AggregationKey {
390 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391 write!(f, "{}", self.0)
392 }
393}
394
395impl FromStr for AggregationKey {
396 type Err = ParseIntError;
397
398 fn from_str(s: &str) -> Result<Self, Self::Err> {
399 Ok(Self(s.parse()?))
400 }
401}
402
403impl From<&proto::ClientGroupedStats> for AggregationKey {
404 fn from(payload: &proto::ClientGroupedStats) -> Self {
405 let mut hasher = StableHasher::default();
411
412 payload.resource.hash(&mut hasher);
413 payload.service.hash(&mut hasher);
414 payload.name.hash(&mut hasher);
415 payload.type_.hash(&mut hasher);
416 payload.span_kind.hash(&mut hasher);
417 payload.HTTP_status_code.hash(&mut hasher);
418 payload.synthetics.hash(&mut hasher);
419
420 payload.peer_tags.hash(&mut hasher);
423
424 payload.is_trace_root.value().hash(&mut hasher);
425 payload.HTTP_method.hash(&mut hasher);
426 payload.HTTP_endpoint.hash(&mut hasher);
427
428 Self(hasher.finish())
429 }
430}
431
432#[derive(Clone, Debug, Default, Deserialize, Serialize)]
438pub struct ClientStatisticsAggregator {
439 groups: FastHashMap<AggregationKey, BucketedClientStatistics>,
440}
441
442impl ClientStatisticsAggregator {
443 pub fn merge_payload(&mut self, payload: &proto::StatsPayload) -> Result<(), GenericError> {
445 let agent_metadata = AgentMetadata::from(payload);
446 for client_stats_payload in payload.stats() {
447 let tracer_metadata = TracerMetadata::from(client_stats_payload);
448
449 for stats_bucket in client_stats_payload.stats() {
450 let bucket_timeframe = BucketTimeframe::from(stats_bucket);
451
452 for grouped_stat in stats_bucket.stats() {
453 let aggregation_key = AggregationKey::from(grouped_stat);
454 let stats_group = self
455 .groups
456 .entry(aggregation_key)
457 .or_insert_with(|| BucketedClientStatistics {
458 agent_metadata: agent_metadata.clone(),
459 tracer_metadata: tracer_metadata.clone(),
460 buckets: FastHashMap::default(),
461 });
462
463 if stats_group.agent_metadata != agent_metadata {
464 return Err(generic_error!("agent metadata mismatch"));
465 }
466 if stats_group.tracer_metadata != tracer_metadata {
467 return Err(generic_error!("tracer metadata mismatch"));
468 }
469
470 stats_group.merge(bucket_timeframe, ClientStatistics::from(grouped_stat))?;
471 }
472 }
473 }
474
475 Ok(())
476 }
477
478 pub fn groups(&self) -> &FastHashMap<AggregationKey, BucketedClientStatistics> {
480 &self.groups
481 }
482}