1use datadog_protos::traces as proto;
2use ordered_float::OrderedFloat;
3use saluki_common::collections::FastHashMap;
4use serde::{Deserialize, Serialize};
5use stringtheory::MetaString;
6
7#[derive(Clone, Debug, Eq, PartialEq)]
8struct WrappedFloat(OrderedFloat<f64>);
9
10impl From<f64> for WrappedFloat {
11 fn from(value: f64) -> Self {
12 WrappedFloat(OrderedFloat(value))
13 }
14}
15
16impl<'de> Deserialize<'de> for WrappedFloat {
17 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
18 where
19 D: serde::Deserializer<'de>,
20 {
21 let value = f64::deserialize(deserializer)?;
22 Ok(WrappedFloat(OrderedFloat(value)))
23 }
24}
25
26impl Serialize for WrappedFloat {
27 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
28 where
29 S: serde::Serializer,
30 {
31 self.0 .0.serialize(serializer)
32 }
33}
34
35#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
36struct AgentMetadata {
37 hostname: MetaString,
38 env: MetaString,
39 tags: FastHashMap<MetaString, MetaString>,
40 agent_version: MetaString,
41 target_tps: WrappedFloat,
42 error_tps: WrappedFloat,
43 rare_sampler_enabled: bool,
44}
45
46impl From<&proto::AgentPayload> for AgentMetadata {
47 fn from(payload: &proto::AgentPayload) -> Self {
48 Self {
49 hostname: (*payload.hostName).into(),
50 env: (*payload.env).into(),
51 tags: payload.tags.iter().map(|(k, v)| ((**k).into(), (**v).into())).collect(),
52 agent_version: (*payload.agentVersion).into(),
53 target_tps: payload.targetTPS.into(),
54 error_tps: payload.errorTPS.into(),
55 rare_sampler_enabled: payload.rareSamplerEnabled,
56 }
57 }
58}
59
60#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
61struct TracerMetadata {
62 container_id: MetaString,
63 language_name: MetaString,
64 language_version: MetaString,
65 tracer_version: MetaString,
66 runtime_id: MetaString,
67 tags: FastHashMap<MetaString, MetaString>,
68 env: MetaString,
69 hostname: MetaString,
70 app_version: MetaString,
71}
72
73impl From<&proto::TracerPayload> for TracerMetadata {
74 fn from(payload: &proto::TracerPayload) -> Self {
75 Self {
76 container_id: (*payload.containerID).into(),
77 language_name: (*payload.languageName).into(),
78 language_version: (*payload.languageVersion).into(),
79 tracer_version: (*payload.tracerVersion).into(),
80 runtime_id: (*payload.runtimeID).into(),
81 tags: payload.tags.iter().map(|(k, v)| ((**k).into(), (**v).into())).collect(),
82 env: (*payload.env).into(),
83 hostname: (*payload.hostname).into(),
84 app_version: (*payload.appVersion).into(),
85 }
86 }
87}
88
89#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
90struct TraceChunkMetadata {
91 priority: i32,
92 origin: MetaString,
93 tags: FastHashMap<MetaString, MetaString>,
94 dropped_trace: bool,
95}
96
97impl From<&proto::TraceChunk> for TraceChunkMetadata {
98 fn from(value: &proto::TraceChunk) -> Self {
99 Self {
100 priority: value.priority,
101 origin: (*value.origin).into(),
102 tags: value.tags.iter().map(|(k, v)| ((**k).into(), (**v).into())).collect(),
103 dropped_trace: value.droppedTrace,
104 }
105 }
106}
107
108#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
110pub struct Span {
111 agent_metadata: AgentMetadata,
112 tracer_metadata: TracerMetadata,
113 trace_chunk_metadata: TraceChunkMetadata,
114 service: MetaString,
115 name: MetaString,
116 resource: MetaString,
117 trace_id: u64,
118 span_id: u64,
119 parent_id: u64,
120 start: i64,
121 duration: i64,
122 error: i32,
123 meta: FastHashMap<MetaString, MetaString>,
124 metrics: FastHashMap<MetaString, WrappedFloat>,
125 type_: MetaString,
126 meta_struct: FastHashMap<MetaString, Vec<u8>>,
127 span_links: Vec<SpanLink>,
128 span_events: Vec<SpanEvent>,
129}
130
131impl Span {
132 pub fn trace_id(&self) -> u64 {
134 self.trace_id
135 }
136
137 pub fn span_id(&self) -> u64 {
139 self.span_id
140 }
141
142 pub fn get_meta_field(&self, meta_key: &str) -> Option<&str> {
144 self.meta.get(meta_key).map(|s| &**s)
145 }
146
147 pub fn get_spans_from_agent_payload(payload: &proto::AgentPayload) -> Vec<Self> {
149 let agent_metadata = AgentMetadata::from(payload);
150
151 let mut spans = Vec::new();
152 for tracer_payload in payload.tracerPayloads() {
153 let tracer_metadata = TracerMetadata::from(tracer_payload);
154
155 for trace_chunk in tracer_payload.chunks() {
156 let trace_chunk_metadata = TraceChunkMetadata::from(trace_chunk);
157
158 for span in trace_chunk.spans() {
159 let span = Self::from_proto(
160 agent_metadata.clone(),
161 tracer_metadata.clone(),
162 trace_chunk_metadata.clone(),
163 span,
164 );
165 spans.push(span);
166 }
167 }
168 }
169
170 spans
171 }
172
173 fn from_proto(
174 agent_metadata: AgentMetadata, tracer_metadata: TracerMetadata, trace_chunk_metadata: TraceChunkMetadata,
175 value: &proto::Span,
176 ) -> Self {
177 let mut span_links = value.spanLinks.iter().map(SpanLink::from).collect::<Vec<_>>();
178 span_links.sort_by_key(|link| (link.trace_id, link.trace_id_high, link.span_id));
179
180 let mut span_events = value.spanEvents.iter().map(SpanEvent::from).collect::<Vec<_>>();
181 span_events.sort_by_key(|event| event.time_unix_nano);
182
183 Self {
184 agent_metadata,
185 tracer_metadata,
186 trace_chunk_metadata,
187 service: (*value.service).into(),
188 name: (*value.name).into(),
189 resource: (*value.resource).into(),
190 trace_id: value.traceID,
191 span_id: value.spanID,
192 parent_id: value.parentID,
193 start: value.start,
194 duration: value.duration,
195 error: value.error,
196 meta: value.meta.iter().map(|(k, v)| ((**k).into(), (**v).into())).collect(),
197 metrics: value.metrics.iter().map(|(k, v)| ((**k).into(), (*v).into())).collect(),
198 type_: (*value.type_).into(),
199 meta_struct: value
200 .meta_struct
201 .iter()
202 .map(|(k, v)| ((**k).into(), v.to_vec()))
203 .collect(),
204 span_links,
205 span_events,
206 }
207 }
208}
209
210#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
211struct SpanLink {
212 trace_id: u64,
213 trace_id_high: u64,
214 span_id: u64,
215 attributes: FastHashMap<MetaString, MetaString>,
216 tracestate: MetaString,
217 flags: u32,
218}
219
220impl From<&proto::SpanLink> for SpanLink {
221 fn from(value: &proto::SpanLink) -> Self {
222 Self {
223 trace_id: value.traceID,
224 trace_id_high: value.traceID_high,
225 span_id: value.spanID,
226 attributes: value
227 .attributes
228 .iter()
229 .map(|(k, v)| ((**k).into(), (**v).into()))
230 .collect(),
231 tracestate: (*value.tracestate).into(),
232 flags: value.flags,
233 }
234 }
235}
236
237#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
238struct SpanEvent {
239 time_unix_nano: u64,
240 name: MetaString,
241 attributes: FastHashMap<MetaString, AttributeAnyValue>,
242}
243
244impl From<&proto::SpanEvent> for SpanEvent {
245 fn from(value: &proto::SpanEvent) -> Self {
246 Self {
247 time_unix_nano: value.time_unix_nano,
248 name: (*value.name).into(),
249 attributes: value
250 .attributes
251 .iter()
252 .map(|(k, v)| ((**k).into(), AttributeAnyValue::from(v)))
253 .collect(),
254 }
255 }
256}
257
258#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
259enum AttributeAnyValue {
260 String(MetaString),
261 Boolean(bool),
262 Integer(i64),
263 Double(WrappedFloat),
264 Array(Vec<AttributeArrayValue>),
265}
266
267impl From<&proto::AttributeAnyValue> for AttributeAnyValue {
268 fn from(value: &proto::AttributeAnyValue) -> Self {
269 let maybe_proto_value_type = value.type_.enum_value().expect("unknown/invalid anyvalue type");
270 match maybe_proto_value_type {
271 proto::AttributeAnyValueType::STRING_VALUE => AttributeAnyValue::String((*value.string_value).into()),
272 proto::AttributeAnyValueType::BOOL_VALUE => AttributeAnyValue::Boolean(value.bool_value),
273 proto::AttributeAnyValueType::INT_VALUE => AttributeAnyValue::Integer(value.int_value),
274 proto::AttributeAnyValueType::DOUBLE_VALUE => AttributeAnyValue::Double(value.double_value.into()),
275 proto::AttributeAnyValueType::ARRAY_VALUE => {
276 let mut values = Vec::new();
277 if let Some(array_value) = value.array_value.as_ref() {
278 for value in array_value.values.iter() {
279 values.push(AttributeArrayValue::from(value));
280 }
281 }
282 AttributeAnyValue::Array(values)
283 }
284 }
285 }
286}
287
288#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
289enum AttributeArrayValue {
290 String(MetaString),
291 Boolean(bool),
292 Integer(i64),
293 Double(WrappedFloat),
294}
295
296impl From<&proto::AttributeArrayValue> for AttributeArrayValue {
297 fn from(value: &proto::AttributeArrayValue) -> Self {
298 let maybe_proto_value_type = value.type_.enum_value().expect("unknown/invalid arrayvalue type");
299 match maybe_proto_value_type {
300 proto::AttributeArrayValueType::STRING_VALUE => AttributeArrayValue::String((*value.string_value).into()),
301 proto::AttributeArrayValueType::BOOL_VALUE => AttributeArrayValue::Boolean(value.bool_value),
302 proto::AttributeArrayValueType::INT_VALUE => AttributeArrayValue::Integer(value.int_value),
303 proto::AttributeArrayValueType::DOUBLE_VALUE => AttributeArrayValue::Double(value.double_value.into()),
304 }
305 }
306}