1use std::{
2 collections::hash_map::Entry,
3 fmt,
4 hash::{Hash as _, Hasher as _},
5 num::ParseIntError,
6 str::FromStr,
7};
8
9use datadog_protos::traces::{self as proto, Trilean};
10use saluki_common::{collections::FastHashMap, hash::StableHasher};
11use saluki_error::{generic_error, GenericError};
12use serde::{Deserialize, Serialize};
13use serde_with::{DeserializeFromStr, SerializeDisplay};
14use stringtheory::MetaString;
15
/// Agent-level metadata attached to a single `StatsPayload`.
///
/// Captured once per incoming payload and later compared on merge so that stats
/// coming from differently-configured agents are never conflated.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct AgentMetadata {
    // Hostname reported by the emitting agent (`StatsPayload.agentHostname`).
    hostname: MetaString,
    // Environment configured on the agent (`StatsPayload.agentEnv`).
    env: MetaString,
    // Version string of the emitting agent (`StatsPayload.agentVersion`).
    agent_version: MetaString,
    // Mirrors `StatsPayload.clientComputed`; semantics defined by the protobuf schema.
    client_computed: bool,
    // Mirrors `StatsPayload.splitPayload`; semantics defined by the protobuf schema.
    split_payload: bool,
}
24
impl From<&proto::StatsPayload> for AgentMetadata {
    fn from(payload: &proto::StatsPayload) -> Self {
        Self {
            // `(*field).into()` derefs the protobuf string wrapper to `str` before
            // converting into `MetaString` (presumably copying; depends on the
            // generated field type — confirm against the codegen if it matters).
            hostname: (*payload.agentHostname).into(),
            env: (*payload.agentEnv).into(),
            agent_version: (*payload.agentVersion).into(),
            client_computed: payload.clientComputed,
            split_payload: payload.splitPayload,
        }
    }
}
36
/// Tracer-level metadata attached to a single `ClientStatsPayload`.
///
/// Captured once per client payload and later compared on merge so that stats
/// coming from different tracers/services are never conflated.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct TracerMetadata {
    // Hostname as reported by the tracer.
    hostname: MetaString,
    // Environment the traced service runs in.
    env: MetaString,
    // Version of the traced service.
    version: MetaString,
    // Name of the traced service.
    service: MetaString,
    // Tracer language (mapped from `ClientStatsPayload.lang`).
    language: MetaString,
    // Version of the tracer library itself.
    tracer_version: MetaString,
    // Runtime identifier of the traced process.
    runtime_id: MetaString,
    // Sequence number of the payload from this tracer.
    sequence: u64,
    // Mirrors `ClientStatsPayload.agentAggregation`; semantics defined by the protobuf schema.
    agent_aggregation: MetaString,
    // Container identifier of the traced process, if any.
    container_id: MetaString,
    // Free-form tags carried on the payload.
    tags: Vec<MetaString>,
    // Git commit SHA of the traced service build, if reported.
    git_commit_sha: MetaString,
    // Image tag of the container the service runs in, if reported.
    image_tag: MetaString,
    // Hash of `process_tags`, as computed upstream by the tracer/agent.
    process_tags_hash: u64,
    // Serialized process tags string.
    process_tags: MetaString,
}
55
impl From<&proto::ClientStatsPayload> for TracerMetadata {
    fn from(payload: &proto::ClientStatsPayload) -> Self {
        Self {
            // `(*field).into()` derefs the protobuf string wrapper to `str` before
            // converting into `MetaString`.
            hostname: (*payload.hostname).into(),
            env: (*payload.env).into(),
            version: (*payload.version).into(),
            service: (*payload.service).into(),
            language: (*payload.lang).into(),
            tracer_version: (*payload.tracerVersion).into(),
            runtime_id: (*payload.runtimeID).into(),
            sequence: payload.sequence,
            agent_aggregation: (*payload.agentAggregation).into(),
            container_id: (*payload.containerID).into(),
            // Each repeated tag is deref'd to `str` and copied into a `MetaString`.
            tags: payload.tags.iter().map(|t| MetaString::from(&**t)).collect(),
            git_commit_sha: (*payload.git_commit_sha).into(),
            image_tag: (*payload.image_tag).into(),
            process_tags_hash: payload.process_tags_hash,
            process_tags: (*payload.process_tags).into(),
        }
    }
}
77
/// The time window a stats bucket covers.
///
/// Serialized/deserialized through its `Display`/`FromStr` form (`[start,duration]`)
/// via `SerializeDisplay`/`DeserializeFromStr`, and usable as a hash-map key.
#[derive(Clone, Copy, Debug, DeserializeFromStr, Hash, Eq, PartialEq, SerializeDisplay)]
pub struct BucketTimeframe {
    /// Start of the window, in nanoseconds (presumably Unix epoch — mirrors
    /// `ClientStatsBucket.start`; confirm against the protobuf schema).
    pub start_time_ns: u64,

    /// Length of the window, in nanoseconds.
    pub duration_ns: u64,
}
87
88impl fmt::Display for BucketTimeframe {
89 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90 write!(f, "[{},{}]", self.start_time_ns, self.duration_ns)
91 }
92}
93
94impl FromStr for BucketTimeframe {
95 type Err = String;
96
97 fn from_str(s: &str) -> Result<Self, Self::Err> {
98 let parts: Vec<&str> = s.trim_matches(['[', ']']).split(',').collect();
99 if parts.len() != 2 {
100 return Err(format!("expected two elements, found {}", parts.len()));
101 }
102 let start_time_ns = parts[0].parse::<u64>().map_err(|e| e.to_string())?;
103 let duration_ns = parts[1].parse::<u64>().map_err(|e| e.to_string())?;
104 Ok(Self {
105 start_time_ns,
106 duration_ns,
107 })
108 }
109}
110
impl From<&proto::ClientStatsBucket> for BucketTimeframe {
    fn from(bucket: &proto::ClientStatsBucket) -> Self {
        Self {
            // Direct mapping of the bucket's start/duration (both nanoseconds per the
            // field names used throughout this module).
            start_time_ns: bucket.start,
            duration_ns: bucket.duration,
        }
    }
}
119
/// Client statistics for a single aggregation group, partitioned by time bucket.
///
/// Carries the agent/tracer metadata the group was first created with; callers
/// (see `ClientStatisticsAggregator::merge_payload`) reject merges whose metadata differs.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct BucketedClientStatistics {
    // Metadata of the agent that produced the stats in this group.
    agent_metadata: AgentMetadata,
    // Metadata of the tracer that produced the stats in this group.
    tracer_metadata: TracerMetadata,
    // Per-timeframe statistics for this group.
    buckets: FastHashMap<BucketTimeframe, ClientStatistics>,
}
127
128impl BucketedClientStatistics {
129 pub fn merge(&mut self, bucket_timeframe: BucketTimeframe, stats: ClientStatistics) -> Result<(), GenericError> {
131 match self.buckets.entry(bucket_timeframe) {
132 Entry::Occupied(mut entry) => {
133 let existing_stats = entry.get_mut();
134 existing_stats.merge(stats)
135 }
136 Entry::Vacant(entry) => {
137 entry.insert(stats);
138 Ok(())
139 }
140 }
141 }
142
143 pub fn buckets(&self) -> impl Iterator<Item = (&BucketTimeframe, &ClientStatistics)> {
145 self.buckets.iter()
146 }
147}
148
/// Aggregated statistics for one group of spans (mapped from `proto::ClientGroupedStats`).
///
/// The string/tag fields identify the group; the numeric fields are the counters
/// accumulated by [`ClientStatistics::merge`].
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ClientStatistics {
    // --- identity fields: must match exactly for two values to be mergeable ---
    service: MetaString,
    name: MetaString,
    resource: MetaString,
    // Span type (`type` is a keyword, hence the trailing underscore).
    type_: MetaString,
    // --- counters: summed on merge ---
    hits: u64,
    errors: u64,
    duration_ns: u64,
    // Raw bytes of the OK latency summary (presumably a serialized sketch, e.g.
    // DDSketch — TODO confirm). Not combined on merge; see `merge` below.
    ok_summary: Vec<u8>,
    // Raw bytes of the error latency summary; same caveat as `ok_summary`.
    error_summary: Vec<u8>,
    // OR'd on merge: true if any merged contribution was synthetic traffic.
    synthetics: bool,
    top_level_hits: u64,
    // --- further identity fields ---
    span_kind: MetaString,
    peer_tags: Vec<MetaString>,
    // Tri-state root flag decoded from the protobuf trilean; `None` = not set/unknown.
    is_trace_root: Option<bool>,
    db_type: MetaString,
    grpc_status_code: MetaString,
    http_status_code: u32,
    http_method: MetaString,
    http_endpoint: MetaString,
}
174
impl ClientStatistics {
    /// Merges `other` into `self`, accumulating counters.
    ///
    /// All identity fields (service, name, resource, type, span kind, peer tags,
    /// db type, grpc/http status codes, http method/endpoint) must match exactly;
    /// otherwise an error is returned before any field of `self` is modified.
    ///
    /// NOTE(review): `other.ok_summary` / `other.error_summary` are silently dropped
    /// rather than merged (combining them would require decoding the serialized
    /// summaries), and `is_trace_root` is neither checked nor reconciled — confirm
    /// both are intentional.
    ///
    /// # Errors
    ///
    /// Returns an error naming the first identity field that differs.
    pub fn merge(&mut self, other: Self) -> Result<(), GenericError> {
        if self.service != other.service {
            return Err(generic_error!("failed to merge client statistics: service mismatch"));
        }
        if self.name != other.name {
            return Err(generic_error!("failed to merge client statistics: name mismatch"));
        }
        if self.resource != other.resource {
            return Err(generic_error!("failed to merge client statistics: resource mismatch"));
        }
        if self.type_ != other.type_ {
            return Err(generic_error!("failed to merge client statistics: type mismatch"));
        }
        if self.span_kind != other.span_kind {
            return Err(generic_error!("failed to merge client statistics: span kind mismatch"));
        }
        if self.peer_tags != other.peer_tags {
            return Err(generic_error!("failed to merge client statistics: peer tags mismatch"));
        }
        if self.db_type != other.db_type {
            return Err(generic_error!("failed to merge client statistics: db type mismatch"));
        }
        if self.grpc_status_code != other.grpc_status_code {
            return Err(generic_error!(
                "failed to merge client statistics: grpc status code mismatch"
            ));
        }
        if self.http_status_code != other.http_status_code {
            return Err(generic_error!(
                "failed to merge client statistics: http status code mismatch"
            ));
        }
        if self.http_method != other.http_method {
            return Err(generic_error!(
                "failed to merge client statistics: http method mismatch"
            ));
        }
        if self.http_endpoint != other.http_endpoint {
            return Err(generic_error!(
                "failed to merge client statistics: http endpoint mismatch"
            ));
        }

        // Counter accumulation. Plain `+=` panics on overflow in debug builds;
        // presumably real-world counts stay far below `u64::MAX` — TODO confirm.
        self.hits += other.hits;
        self.errors += other.errors;
        self.duration_ns += other.duration_ns;
        // Sticky flag: once any contribution is synthetic, the group stays synthetic.
        self.synthetics |= other.synthetics;
        self.top_level_hits += other.top_level_hits;

        Ok(())
    }
}
236
impl From<&proto::ClientGroupedStats> for ClientStatistics {
    fn from(payload: &proto::ClientGroupedStats) -> Self {
        // Decode the protobuf trilean into `Option<bool>`. An out-of-range wire
        // value (`Err(_)`) is treated the same as NOT_SET.
        let is_trace_root = match payload.is_trace_root.enum_value() {
            Ok(Trilean::NOT_SET) => None,
            Ok(Trilean::TRUE) => Some(true),
            Ok(Trilean::FALSE) => Some(false),
            Err(_) => None,
        };

        Self {
            // `(*field).into()` derefs the protobuf string wrapper to `str` before
            // converting into `MetaString`.
            service: (*payload.service).into(),
            name: (*payload.name).into(),
            resource: (*payload.resource).into(),
            type_: (*payload.type_).into(),
            hits: payload.hits,
            errors: payload.errors,
            duration_ns: payload.duration,
            // Summary bytes are copied out of the protobuf buffers verbatim.
            ok_summary: payload.okSummary.to_vec(),
            error_summary: payload.errorSummary.to_vec(),
            synthetics: payload.synthetics,
            top_level_hits: payload.topLevelHits,
            span_kind: (*payload.span_kind).into(),
            peer_tags: payload.peer_tags.iter().map(|t| MetaString::from(&**t)).collect(),
            is_trace_root,
            db_type: (*payload.DB_type).into(),
            grpc_status_code: (*payload.GRPC_status_code).into(),
            http_status_code: payload.HTTP_status_code,
            http_method: (*payload.HTTP_method).into(),
            http_endpoint: (*payload.HTTP_endpoint).into(),
        }
    }
}
269
/// Stable 64-bit key identifying an aggregation group.
///
/// Derived by hashing the identity fields of a `ClientGroupedStats` (see the
/// `From` impl below). Serialized as its decimal value via `Display`/`FromStr`.
#[derive(Clone, Debug, DeserializeFromStr, Eq, Hash, PartialEq, SerializeDisplay)]
pub struct AggregationKey(u64);
273
274impl fmt::Display for AggregationKey {
275 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
276 write!(f, "{}", self.0)
277 }
278}
279
280impl FromStr for AggregationKey {
281 type Err = ParseIntError;
282
283 fn from_str(s: &str) -> Result<Self, Self::Err> {
284 Ok(Self(s.parse()?))
285 }
286}
287
impl From<&proto::ClientGroupedStats> for AggregationKey {
    fn from(payload: &proto::ClientGroupedStats) -> Self {
        // `StableHasher` keeps keys stable across processes/runs, which matters
        // because keys are serialized through `Display`/`FromStr`.
        let mut hasher = StableHasher::default();

        // Field order matters: reordering these changes every produced key.
        payload.resource.hash(&mut hasher);
        payload.service.hash(&mut hasher);
        payload.name.hash(&mut hasher);
        payload.type_.hash(&mut hasher);
        payload.span_kind.hash(&mut hasher);
        payload.HTTP_status_code.hash(&mut hasher);
        payload.synthetics.hash(&mut hasher);

        // Hashing the whole `Vec` includes its length, so tag order and count are
        // part of the key.
        payload.peer_tags.hash(&mut hasher);

        // `.value()` hashes the raw wire integer of the trilean, so unknown
        // out-of-range values still contribute distinctly.
        payload.is_trace_root.value().hash(&mut hasher);
        payload.HTTP_method.hash(&mut hasher);
        payload.HTTP_endpoint.hash(&mut hasher);

        // NOTE(review): `DB_type` and `GRPC_status_code` are excluded from the key,
        // yet `ClientStatistics::merge` rejects mismatches on them — stats differing
        // only in those fields collide here and then fail to merge. Confirm intended.
        Self(hasher.finish())
    }
}
316
/// Aggregates incoming `StatsPayload`s into per-key, per-timeframe statistics.
///
/// Groups are keyed by [`AggregationKey`]; each group carries the agent/tracer
/// metadata it was created with and rejects later contributions whose metadata differs.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ClientStatisticsAggregator {
    // All aggregation groups seen so far.
    groups: FastHashMap<AggregationKey, BucketedClientStatistics>,
}
326
327impl ClientStatisticsAggregator {
328 pub fn merge_payload(&mut self, payload: &proto::StatsPayload) -> Result<(), GenericError> {
330 let agent_metadata = AgentMetadata::from(payload);
331 for client_stats_payload in payload.stats() {
332 let tracer_metadata = TracerMetadata::from(client_stats_payload);
333
334 for stats_bucket in client_stats_payload.stats() {
335 let bucket_timeframe = BucketTimeframe::from(stats_bucket);
336
337 for grouped_stat in stats_bucket.stats() {
338 let aggregation_key = AggregationKey::from(grouped_stat);
339 let stats_group = self
340 .groups
341 .entry(aggregation_key)
342 .or_insert_with(|| BucketedClientStatistics {
343 agent_metadata: agent_metadata.clone(),
344 tracer_metadata: tracer_metadata.clone(),
345 buckets: FastHashMap::default(),
346 });
347
348 if stats_group.agent_metadata != agent_metadata {
349 return Err(generic_error!("agent metadata mismatch"));
350 }
351 if stats_group.tracer_metadata != tracer_metadata {
352 return Err(generic_error!("tracer metadata mismatch"));
353 }
354
355 stats_group.merge(bucket_timeframe, ClientStatistics::from(grouped_stat))?;
356 }
357 }
358 }
359
360 Ok(())
361 }
362
363 pub fn groups(&self) -> &FastHashMap<AggregationKey, BucketedClientStatistics> {
368 &self.groups
369 }
370}