1use std::{
2 collections::hash_map::Entry,
3 fmt,
4 hash::{Hash as _, Hasher as _},
5 num::ParseIntError,
6 str::FromStr,
7};
8
9use base64::Engine as _;
10use datadog_protos::{
11 sketches::DDSketch as ProtoDDSketch,
12 traces::{self as proto, Trilean},
13};
14use ddsketch::canonical::{mapping::LogarithmicMapping, DDSketch};
15use protobuf::Message as _;
16use saluki_common::{collections::FastHashMap, hash::StableHasher};
17use saluki_error::{generic_error, ErrorContext as _, GenericError};
18use serde::{Deserialize, Serialize};
19use serde_with::{DeserializeFromStr, SerializeDisplay};
20use stringtheory::MetaString;
21
/// A DDSketch that round-trips through a base64-encoded Protocol Buffers payload.
///
/// Via the `serde(try_from/into)` attributes, this type serializes as a `String`:
/// the sketch is written to its canonical protobuf form and then base64-encoded
/// (see the `TryFrom<String>` / `From<EncodedApmStatsDDSketch> for String` impls).
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(try_from = "String", into = "String")]
pub struct EncodedApmStatsDDSketch {
    // Canonical in-memory sketch representation.
    inner: DDSketch,
}
35
impl EncodedApmStatsDDSketch {
    /// Returns a reference to the underlying sketch.
    pub fn inner(&self) -> &DDSketch {
        &self.inner
    }

    /// Returns a mutable reference to the underlying sketch.
    pub fn inner_mut(&mut self) -> &mut DDSketch {
        &mut self.inner
    }
}
47
48impl TryFrom<ProtoDDSketch> for EncodedApmStatsDDSketch {
49 type Error = GenericError;
50
51 fn try_from(value: ProtoDDSketch) -> Result<Self, Self::Error> {
52 let mapping = LogarithmicMapping::new(0.01).expect("Relative accuracy should be valid (0 <= 0.01 <= 1)");
54 let inner = DDSketch::from_proto(&value, mapping)
55 .error_context("Failed to convert DDSketch from canonical Protocol Buffers representation.")?;
56
57 Ok(Self { inner })
58 }
59}
60
61impl TryFrom<String> for EncodedApmStatsDDSketch {
62 type Error = GenericError;
63
64 fn try_from(value: String) -> Result<Self, Self::Error> {
65 let sketch_raw_proto = base64::engine::general_purpose::STANDARD
68 .decode(value)
69 .error_context("Failed to decode base64-encoded DDSketch.")?;
70
71 let sketch_proto = ProtoDDSketch::parse_from_bytes(&sketch_raw_proto[..])
72 .error_context("Failed to decode DDSketch from canonical Protocol Buffers representation.")?;
73
74 Self::try_from(sketch_proto)
75 }
76}
77
78impl From<EncodedApmStatsDDSketch> for ProtoDDSketch {
79 fn from(value: EncodedApmStatsDDSketch) -> Self {
80 value.inner.to_proto()
81 }
82}
83
84impl From<EncodedApmStatsDDSketch> for String {
85 fn from(value: EncodedApmStatsDDSketch) -> Self {
86 let sketch_proto = ProtoDDSketch::from(value);
87 let sketch_raw_proto = sketch_proto
88 .write_to_bytes()
89 .expect("Should not fail to encode DDSketch to serialized Protocol Buffers representation.");
90
91 base64::engine::general_purpose::STANDARD.encode(sketch_raw_proto)
92 }
93}
94
/// Agent-level metadata extracted from a `StatsPayload`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct AgentMetadata {
    // Hostname of the agent that emitted the payload (proto `agentHostname`).
    hostname: MetaString,
    // Environment of the agent (proto `agentEnv`).
    env: MetaString,
    // Version of the agent (proto `agentVersion`).
    agent_version: MetaString,
    // Mirrors proto `clientComputed`.
    client_computed: bool,
    // Mirrors proto `splitPayload`.
    split_payload: bool,
}
103
104impl From<&proto::StatsPayload> for AgentMetadata {
105 fn from(payload: &proto::StatsPayload) -> Self {
106 Self {
107 hostname: (*payload.agentHostname).into(),
108 env: (*payload.agentEnv).into(),
109 agent_version: (*payload.agentVersion).into(),
110 client_computed: payload.clientComputed,
111 split_payload: payload.splitPayload,
112 }
113 }
114}
115
/// Tracer-level metadata extracted from a `ClientStatsPayload`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct TracerMetadata {
    // Hostname reported by the tracer.
    hostname: MetaString,
    // Environment the traced service runs in.
    env: MetaString,
    // Service version.
    version: MetaString,
    // Service name.
    service: MetaString,
    // Tracer language (proto `lang`).
    language: MetaString,
    // Version of the tracer library (proto `tracerVersion`).
    tracer_version: MetaString,
    // Runtime ID of the traced process (proto `runtimeID`).
    runtime_id: MetaString,
    // Payload sequence number.
    sequence: u64,
    // Mirrors proto `agentAggregation`.
    agent_aggregation: MetaString,
    // Container ID of the traced process (proto `containerID`).
    container_id: MetaString,
    // Tags attached to the payload.
    tags: Vec<MetaString>,
    // Mirrors proto `git_commit_sha`.
    git_commit_sha: MetaString,
    // Mirrors proto `image_tag`.
    image_tag: MetaString,
    // Taken verbatim from the payload; presumably a hash over `process_tags` — TODO confirm.
    process_tags_hash: u64,
    // Mirrors proto `process_tags`.
    process_tags: MetaString,
}
134
135impl From<&proto::ClientStatsPayload> for TracerMetadata {
136 fn from(payload: &proto::ClientStatsPayload) -> Self {
137 Self {
138 hostname: (*payload.hostname).into(),
139 env: (*payload.env).into(),
140 version: (*payload.version).into(),
141 service: (*payload.service).into(),
142 language: (*payload.lang).into(),
143 tracer_version: (*payload.tracerVersion).into(),
144 runtime_id: (*payload.runtimeID).into(),
145 sequence: payload.sequence,
146 agent_aggregation: (*payload.agentAggregation).into(),
147 container_id: (*payload.containerID).into(),
148 tags: payload.tags.iter().map(|t| MetaString::from(&**t)).collect(),
149 git_commit_sha: (*payload.git_commit_sha).into(),
150 image_tag: (*payload.image_tag).into(),
151 process_tags_hash: payload.process_tags_hash,
152 process_tags: (*payload.process_tags).into(),
153 }
154 }
155}
156
/// The timeframe covered by a single stats bucket.
///
/// Serialized as the string `"[start,duration]"` via the `Display`/`FromStr` impls
/// (see the `DeserializeFromStr`/`SerializeDisplay` derives).
#[derive(Clone, Copy, Debug, DeserializeFromStr, Hash, Eq, PartialEq, SerializeDisplay)]
pub struct BucketTimeframe {
    // Start of the bucket, in nanoseconds (presumably relative to the Unix epoch — TODO confirm).
    pub start_time_ns: u64,

    // Length of the bucket, in nanoseconds.
    pub duration_ns: u64,
}
166
167impl fmt::Display for BucketTimeframe {
168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169 write!(f, "[{},{}]", self.start_time_ns, self.duration_ns)
170 }
171}
172
173impl FromStr for BucketTimeframe {
174 type Err = String;
175
176 fn from_str(s: &str) -> Result<Self, Self::Err> {
177 let parts: Vec<&str> = s.trim_matches(['[', ']']).split(',').collect();
178 if parts.len() != 2 {
179 return Err(format!("expected two elements, found {}", parts.len()));
180 }
181 let start_time_ns = parts[0].parse::<u64>().map_err(|e| e.to_string())?;
182 let duration_ns = parts[1].parse::<u64>().map_err(|e| e.to_string())?;
183 Ok(Self {
184 start_time_ns,
185 duration_ns,
186 })
187 }
188}
189
impl From<&proto::ClientStatsBucket> for BucketTimeframe {
    /// Extracts the bucket's start/duration (both in nanoseconds) from the protobuf bucket.
    fn from(bucket: &proto::ClientStatsBucket) -> Self {
        Self {
            start_time_ns: bucket.start,
            duration_ns: bucket.duration,
        }
    }
}
198
/// Client statistics for a single aggregation group, split out by bucket timeframe.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct BucketedClientStatistics {
    // Agent metadata shared by all statistics in this group.
    agent_metadata: AgentMetadata,
    // Tracer metadata shared by all statistics in this group.
    tracer_metadata: TracerMetadata,
    // Per-timeframe statistics for this group.
    buckets: FastHashMap<BucketTimeframe, ClientStatistics>,
}
206
207impl BucketedClientStatistics {
208 pub fn merge(&mut self, bucket_timeframe: BucketTimeframe, stats: ClientStatistics) -> Result<(), GenericError> {
210 match self.buckets.entry(bucket_timeframe) {
211 Entry::Occupied(mut entry) => {
212 let existing_stats = entry.get_mut();
213 existing_stats.merge(stats)
214 }
215 Entry::Vacant(entry) => {
216 entry.insert(stats);
217 Ok(())
218 }
219 }
220 }
221
222 pub fn merged(&self) -> Result<Option<ClientStatistics>, GenericError> {
231 let mut bucket_stats = self.buckets.values().cloned();
232
233 let mut merged_stats = match bucket_stats.next() {
234 Some(bucket) => bucket,
235 None => return Ok(None),
236 };
237
238 for other in bucket_stats {
239 merged_stats.merge(other)?;
240 }
241
242 Ok(Some(merged_stats))
243 }
244
245 pub fn buckets(&self) -> impl Iterator<Item = (&BucketTimeframe, &ClientStatistics)> {
247 self.buckets.iter()
248 }
249}
250
/// A single group of client statistics.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ClientStatistics {
    // Identity fields — `merge` rejects two values that differ in any of these.
    service: MetaString,
    name: MetaString,
    resource: MetaString,
    type_: MetaString,
    // Counters, summed across merges.
    hits: u64,
    errors: u64,
    duration_ns: u64,
    // Latency distributions for successful and errored spans.
    ok_summary: EncodedApmStatsDDSketch,
    error_summary: EncodedApmStatsDDSketch,
    // True if any merged payload was flagged as synthetics traffic (OR-ed on merge).
    synthetics: bool,
    // Counter, summed across merges.
    top_level_hits: u64,
    // More identity fields (also checked by `merge`).
    span_kind: MetaString,
    peer_tags: Vec<MetaString>,
    // Tri-state from the proto `Trilean`; `None` when unset or unknown.
    is_trace_root: Option<bool>,
    db_type: MetaString,
    grpc_status_code: MetaString,
    http_status_code: u32,
    http_method: MetaString,
    http_endpoint: MetaString,
}
274
impl ClientStatistics {
    /// Merges `other` into `self`.
    ///
    /// Both statistics must describe the same aggregation group: every identity field
    /// (service, name, resource, type, span kind, peer tags, db type, gRPC status
    /// code, and the HTTP status code/method/endpoint) must match exactly. When they
    /// do, counters are summed, `synthetics` is OR-ed together, and the OK/error
    /// latency sketches are merged.
    ///
    /// All identity checks run before any mutation, so `self` is left unchanged when
    /// an error is returned.
    ///
    /// NOTE(review): `is_trace_root` is neither compared nor merged here — `self`'s
    /// value silently wins; confirm that is intentional.
    ///
    /// # Errors
    ///
    /// Returns an error naming the first identity field that differs.
    pub fn merge(&mut self, other: Self) -> Result<(), GenericError> {
        // Identity checks: a mismatch means the two values were grouped incorrectly.
        if self.service != other.service {
            return Err(generic_error!("failed to merge client statistics: service mismatch"));
        }
        if self.name != other.name {
            return Err(generic_error!("failed to merge client statistics: name mismatch"));
        }
        if self.resource != other.resource {
            return Err(generic_error!("failed to merge client statistics: resource mismatch"));
        }
        if self.type_ != other.type_ {
            return Err(generic_error!("failed to merge client statistics: type mismatch"));
        }
        if self.span_kind != other.span_kind {
            return Err(generic_error!("failed to merge client statistics: span kind mismatch"));
        }
        if self.peer_tags != other.peer_tags {
            return Err(generic_error!("failed to merge client statistics: peer tags mismatch"));
        }
        if self.db_type != other.db_type {
            return Err(generic_error!("failed to merge client statistics: db type mismatch"));
        }
        if self.grpc_status_code != other.grpc_status_code {
            return Err(generic_error!(
                "failed to merge client statistics: grpc status code mismatch"
            ));
        }
        if self.http_status_code != other.http_status_code {
            return Err(generic_error!(
                "failed to merge client statistics: http status code mismatch"
            ));
        }
        if self.http_method != other.http_method {
            return Err(generic_error!(
                "failed to merge client statistics: http method mismatch"
            ));
        }
        if self.http_endpoint != other.http_endpoint {
            return Err(generic_error!(
                "failed to merge client statistics: http endpoint mismatch"
            ));
        }

        // Accumulate the additive fields.
        self.hits += other.hits;
        self.errors += other.errors;
        self.duration_ns += other.duration_ns;
        self.synthetics |= other.synthetics;
        self.top_level_hits += other.top_level_hits;

        // Merge the latency distributions.
        self.ok_summary.inner_mut().merge(other.ok_summary.inner());
        self.error_summary.inner_mut().merge(other.error_summary.inner());

        Ok(())
    }
}
337
impl From<&proto::ClientGroupedStats> for ClientStatistics {
    /// Converts a protobuf `ClientGroupedStats` into the internal representation.
    ///
    /// # Panics
    ///
    /// Panics if `okSummary`/`errorSummary` do not contain a valid serialized
    /// DDSketch, or if the decoded sketch cannot be converted to the canonical
    /// representation. NOTE(review): these bytes originate from the client payload,
    /// so a malformed payload panics here instead of surfacing an error — consider a
    /// fallible `TryFrom` conversion instead.
    fn from(payload: &proto::ClientGroupedStats) -> Self {
        // `Trilean` models a protobuf "optional bool"; unknown enum values are
        // treated the same as unset.
        let is_trace_root = match payload.is_trace_root.enum_value() {
            Ok(Trilean::NOT_SET) => None,
            Ok(Trilean::TRUE) => Some(true),
            Ok(Trilean::FALSE) => Some(false),
            Err(_) => None,
        };

        // The latency summaries are embedded as serialized DDSketch protobuf bytes.
        let ok_summary_proto = ProtoDDSketch::parse_from_bytes(payload.okSummary())
            .expect("Should not fail to decode DDSketch from serialized Protocol Buffers representation.");
        let ok_summary = EncodedApmStatsDDSketch::try_from(ok_summary_proto)
            .expect("Should not fail to convert DDSketch from canonical Protocol Buffers representation.");
        let error_summary_proto = ProtoDDSketch::parse_from_bytes(payload.errorSummary())
            .expect("Should not fail to decode DDSketch from serialized Protocol Buffers representation.");
        let error_summary = EncodedApmStatsDDSketch::try_from(error_summary_proto)
            .expect("Should not fail to convert DDSketch from canonical Protocol Buffers representation.");

        Self {
            service: (*payload.service).into(),
            name: (*payload.name).into(),
            resource: (*payload.resource).into(),
            type_: (*payload.type_).into(),
            hits: payload.hits,
            errors: payload.errors,
            duration_ns: payload.duration,
            ok_summary,
            error_summary,
            synthetics: payload.synthetics,
            top_level_hits: payload.topLevelHits,
            span_kind: (*payload.span_kind).into(),
            peer_tags: payload.peer_tags.iter().map(|t| MetaString::from(&**t)).collect(),
            is_trace_root,
            db_type: (*payload.DB_type).into(),
            grpc_status_code: (*payload.GRPC_status_code).into(),
            http_status_code: payload.HTTP_status_code,
            http_method: (*payload.HTTP_method).into(),
            http_endpoint: (*payload.HTTP_endpoint).into(),
        }
    }
}
380
/// A hash-based key identifying an aggregation group of client statistics.
///
/// Built by hashing identity fields of a `ClientGroupedStats` (see the `From` impl);
/// serialized as the decimal string form of the hash via `Display`/`FromStr`.
#[derive(Clone, Debug, DeserializeFromStr, Eq, Hash, PartialEq, SerializeDisplay)]
pub struct AggregationKey(u64);
384
385impl fmt::Display for AggregationKey {
386 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
387 write!(f, "{}", self.0)
388 }
389}
390
391impl FromStr for AggregationKey {
392 type Err = ParseIntError;
393
394 fn from_str(s: &str) -> Result<Self, Self::Err> {
395 Ok(Self(s.parse()?))
396 }
397}
398
impl From<&proto::ClientGroupedStats> for AggregationKey {
    /// Computes the aggregation key by hashing identity fields of the grouped stats.
    ///
    /// Uses `StableHasher` — presumably stable across processes/restarts (hence the
    /// name; TODO confirm). Since keys may be serialized (see `Display`/`FromStr`),
    /// the set and order of hashed fields must not change without care.
    ///
    /// NOTE(review): `DB_type` and `GRPC_status_code` are compared during
    /// `ClientStatistics::merge` but are NOT hashed here, so two payloads differing
    /// only in those fields collide on the same key and later fail to merge. Also,
    /// `is_trace_root` is hashed here but not compared in `merge`. Confirm this
    /// asymmetry is intentional.
    fn from(payload: &proto::ClientGroupedStats) -> Self {
        let mut hasher = StableHasher::default();

        payload.resource.hash(&mut hasher);
        payload.service.hash(&mut hasher);
        payload.name.hash(&mut hasher);
        payload.type_.hash(&mut hasher);
        payload.span_kind.hash(&mut hasher);
        payload.HTTP_status_code.hash(&mut hasher);
        payload.synthetics.hash(&mut hasher);

        payload.peer_tags.hash(&mut hasher);

        // Hash the raw protobuf enum value so unknown variants still contribute.
        payload.is_trace_root.value().hash(&mut hasher);
        payload.HTTP_method.hash(&mut hasher);
        payload.HTTP_endpoint.hash(&mut hasher);

        Self(hasher.finish())
    }
}
427
/// Aggregates APM client statistics payloads into groups keyed by `AggregationKey`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ClientStatisticsAggregator {
    // All aggregation groups accumulated so far.
    groups: FastHashMap<AggregationKey, BucketedClientStatistics>,
}
437
438impl ClientStatisticsAggregator {
439 pub fn merge_payload(&mut self, payload: &proto::StatsPayload) -> Result<(), GenericError> {
441 let agent_metadata = AgentMetadata::from(payload);
442 for client_stats_payload in payload.stats() {
443 let tracer_metadata = TracerMetadata::from(client_stats_payload);
444
445 for stats_bucket in client_stats_payload.stats() {
446 let bucket_timeframe = BucketTimeframe::from(stats_bucket);
447
448 for grouped_stat in stats_bucket.stats() {
449 let aggregation_key = AggregationKey::from(grouped_stat);
450 let stats_group = self
451 .groups
452 .entry(aggregation_key)
453 .or_insert_with(|| BucketedClientStatistics {
454 agent_metadata: agent_metadata.clone(),
455 tracer_metadata: tracer_metadata.clone(),
456 buckets: FastHashMap::default(),
457 });
458
459 if stats_group.agent_metadata != agent_metadata {
460 return Err(generic_error!("agent metadata mismatch"));
461 }
462 if stats_group.tracer_metadata != tracer_metadata {
463 return Err(generic_error!("tracer metadata mismatch"));
464 }
465
466 stats_group.merge(bucket_timeframe, ClientStatistics::from(grouped_stat))?;
467 }
468 }
469 }
470
471 Ok(())
472 }
473
474 pub fn groups(&self) -> &FastHashMap<AggregationKey, BucketedClientStatistics> {
479 &self.groups
480 }
481}