stele/
traces.rs

1use datadog_protos::traces as proto;
2use ordered_float::OrderedFloat;
3use saluki_common::collections::FastHashMap;
4use serde::{Deserialize, Serialize};
5use stringtheory::MetaString;
6
7#[derive(Clone, Debug, Eq, PartialEq)]
8struct WrappedFloat(OrderedFloat<f64>);
9
10impl From<f64> for WrappedFloat {
11    fn from(value: f64) -> Self {
12        WrappedFloat(OrderedFloat(value))
13    }
14}
15
16impl<'de> Deserialize<'de> for WrappedFloat {
17    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
18    where
19        D: serde::Deserializer<'de>,
20    {
21        let value = f64::deserialize(deserializer)?;
22        Ok(WrappedFloat(OrderedFloat(value)))
23    }
24}
25
26impl Serialize for WrappedFloat {
27    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
28    where
29        S: serde::Serializer,
30    {
31        self.0 .0.serialize(serializer)
32    }
33}
34
35#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
36struct AgentMetadata {
37    hostname: MetaString,
38    env: MetaString,
39    tags: FastHashMap<MetaString, MetaString>,
40    agent_version: MetaString,
41    target_tps: WrappedFloat,
42    error_tps: WrappedFloat,
43    rare_sampler_enabled: bool,
44}
45
46impl From<&proto::AgentPayload> for AgentMetadata {
47    fn from(payload: &proto::AgentPayload) -> Self {
48        Self {
49            hostname: (*payload.hostName).into(),
50            env: (*payload.env).into(),
51            tags: payload.tags.iter().map(|(k, v)| ((**k).into(), (**v).into())).collect(),
52            agent_version: (*payload.agentVersion).into(),
53            target_tps: payload.targetTPS.into(),
54            error_tps: payload.errorTPS.into(),
55            rare_sampler_enabled: payload.rareSamplerEnabled,
56        }
57    }
58}
59
60#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
61struct TracerMetadata {
62    container_id: MetaString,
63    language_name: MetaString,
64    language_version: MetaString,
65    tracer_version: MetaString,
66    runtime_id: MetaString,
67    tags: FastHashMap<MetaString, MetaString>,
68    env: MetaString,
69    hostname: MetaString,
70    app_version: MetaString,
71}
72
73impl From<&proto::TracerPayload> for TracerMetadata {
74    fn from(payload: &proto::TracerPayload) -> Self {
75        Self {
76            container_id: (*payload.containerID).into(),
77            language_name: (*payload.languageName).into(),
78            language_version: (*payload.languageVersion).into(),
79            tracer_version: (*payload.tracerVersion).into(),
80            runtime_id: (*payload.runtimeID).into(),
81            tags: payload.tags.iter().map(|(k, v)| ((**k).into(), (**v).into())).collect(),
82            env: (*payload.env).into(),
83            hostname: (*payload.hostname).into(),
84            app_version: (*payload.appVersion).into(),
85        }
86    }
87}
88
89#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
90struct TraceChunkMetadata {
91    priority: i32,
92    origin: MetaString,
93    tags: FastHashMap<MetaString, MetaString>,
94    dropped_trace: bool,
95}
96
97impl From<&proto::TraceChunk> for TraceChunkMetadata {
98    fn from(value: &proto::TraceChunk) -> Self {
99        Self {
100            priority: value.priority,
101            origin: (*value.origin).into(),
102            tags: value.tags.iter().map(|(k, v)| ((**k).into(), (**v).into())).collect(),
103            dropped_trace: value.droppedTrace,
104        }
105    }
106}
107
108/// A simplified span representation.
109#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
110pub struct Span {
111    agent_metadata: AgentMetadata,
112    tracer_metadata: TracerMetadata,
113    trace_chunk_metadata: TraceChunkMetadata,
114    service: MetaString,
115    name: MetaString,
116    resource: MetaString,
117    trace_id: u64,
118    span_id: u64,
119    parent_id: u64,
120    start: i64,
121    duration: i64,
122    error: i32,
123    meta: FastHashMap<MetaString, MetaString>,
124    metrics: FastHashMap<MetaString, WrappedFloat>,
125    type_: MetaString,
126    meta_struct: FastHashMap<MetaString, Vec<u8>>,
127    span_links: Vec<SpanLink>,
128    span_events: Vec<SpanEvent>,
129}
130
131impl Span {
132    /// Returns the trace ID this span belongs to.
133    pub fn trace_id(&self) -> u64 {
134        self.trace_id
135    }
136
137    /// Returns the ID of this span.
138    pub fn span_id(&self) -> u64 {
139        self.span_id
140    }
141
142    /// Returns the value of the metadata entry of the given key, if it exists.
143    pub fn get_meta_field(&self, meta_key: &str) -> Option<&str> {
144        self.meta.get(meta_key).map(|s| &**s)
145    }
146
147    /// Gets all spans from the given `AgentPayload`.
148    pub fn get_spans_from_agent_payload(payload: &proto::AgentPayload) -> Vec<Self> {
149        let agent_metadata = AgentMetadata::from(payload);
150
151        let mut spans = Vec::new();
152        for tracer_payload in payload.tracerPayloads() {
153            let tracer_metadata = TracerMetadata::from(tracer_payload);
154
155            for trace_chunk in tracer_payload.chunks() {
156                let trace_chunk_metadata = TraceChunkMetadata::from(trace_chunk);
157
158                for span in trace_chunk.spans() {
159                    let span = Self::from_proto(
160                        agent_metadata.clone(),
161                        tracer_metadata.clone(),
162                        trace_chunk_metadata.clone(),
163                        span,
164                    );
165                    spans.push(span);
166                }
167            }
168        }
169
170        spans
171    }
172
173    fn from_proto(
174        agent_metadata: AgentMetadata, tracer_metadata: TracerMetadata, trace_chunk_metadata: TraceChunkMetadata,
175        value: &proto::Span,
176    ) -> Self {
177        let mut span_links = value.spanLinks.iter().map(SpanLink::from).collect::<Vec<_>>();
178        span_links.sort_by_key(|link| (link.trace_id, link.trace_id_high, link.span_id));
179
180        let mut span_events = value.spanEvents.iter().map(SpanEvent::from).collect::<Vec<_>>();
181        span_events.sort_by_key(|event| event.time_unix_nano);
182
183        Self {
184            agent_metadata,
185            tracer_metadata,
186            trace_chunk_metadata,
187            service: (*value.service).into(),
188            name: (*value.name).into(),
189            resource: (*value.resource).into(),
190            trace_id: value.traceID,
191            span_id: value.spanID,
192            parent_id: value.parentID,
193            start: value.start,
194            duration: value.duration,
195            error: value.error,
196            meta: value.meta.iter().map(|(k, v)| ((**k).into(), (**v).into())).collect(),
197            metrics: value.metrics.iter().map(|(k, v)| ((**k).into(), (*v).into())).collect(),
198            type_: (*value.type_).into(),
199            meta_struct: value
200                .meta_struct
201                .iter()
202                .map(|(k, v)| ((**k).into(), v.to_vec()))
203                .collect(),
204            span_links,
205            span_events,
206        }
207    }
208}
209
210#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
211struct SpanLink {
212    trace_id: u64,
213    trace_id_high: u64,
214    span_id: u64,
215    attributes: FastHashMap<MetaString, MetaString>,
216    tracestate: MetaString,
217    flags: u32,
218}
219
220impl From<&proto::SpanLink> for SpanLink {
221    fn from(value: &proto::SpanLink) -> Self {
222        Self {
223            trace_id: value.traceID,
224            trace_id_high: value.traceID_high,
225            span_id: value.spanID,
226            attributes: value
227                .attributes
228                .iter()
229                .map(|(k, v)| ((**k).into(), (**v).into()))
230                .collect(),
231            tracestate: (*value.tracestate).into(),
232            flags: value.flags,
233        }
234    }
235}
236
237#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
238struct SpanEvent {
239    time_unix_nano: u64,
240    name: MetaString,
241    attributes: FastHashMap<MetaString, AttributeAnyValue>,
242}
243
244impl From<&proto::SpanEvent> for SpanEvent {
245    fn from(value: &proto::SpanEvent) -> Self {
246        Self {
247            time_unix_nano: value.time_unix_nano,
248            name: (*value.name).into(),
249            attributes: value
250                .attributes
251                .iter()
252                .map(|(k, v)| ((**k).into(), AttributeAnyValue::from(v)))
253                .collect(),
254        }
255    }
256}
257
258#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
259enum AttributeAnyValue {
260    String(MetaString),
261    Boolean(bool),
262    Integer(i64),
263    Double(WrappedFloat),
264    Array(Vec<AttributeArrayValue>),
265}
266
267impl From<&proto::AttributeAnyValue> for AttributeAnyValue {
268    fn from(value: &proto::AttributeAnyValue) -> Self {
269        let maybe_proto_value_type = value.type_.enum_value().expect("unknown/invalid anyvalue type");
270        match maybe_proto_value_type {
271            proto::AttributeAnyValueType::STRING_VALUE => AttributeAnyValue::String((*value.string_value).into()),
272            proto::AttributeAnyValueType::BOOL_VALUE => AttributeAnyValue::Boolean(value.bool_value),
273            proto::AttributeAnyValueType::INT_VALUE => AttributeAnyValue::Integer(value.int_value),
274            proto::AttributeAnyValueType::DOUBLE_VALUE => AttributeAnyValue::Double(value.double_value.into()),
275            proto::AttributeAnyValueType::ARRAY_VALUE => {
276                let mut values = Vec::new();
277                if let Some(array_value) = value.array_value.as_ref() {
278                    for value in array_value.values.iter() {
279                        values.push(AttributeArrayValue::from(value));
280                    }
281                }
282                AttributeAnyValue::Array(values)
283            }
284        }
285    }
286}
287
288#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
289enum AttributeArrayValue {
290    String(MetaString),
291    Boolean(bool),
292    Integer(i64),
293    Double(WrappedFloat),
294}
295
296impl From<&proto::AttributeArrayValue> for AttributeArrayValue {
297    fn from(value: &proto::AttributeArrayValue) -> Self {
298        let maybe_proto_value_type = value.type_.enum_value().expect("unknown/invalid arrayvalue type");
299        match maybe_proto_value_type {
300            proto::AttributeArrayValueType::STRING_VALUE => AttributeArrayValue::String((*value.string_value).into()),
301            proto::AttributeArrayValueType::BOOL_VALUE => AttributeArrayValue::Boolean(value.bool_value),
302            proto::AttributeArrayValueType::INT_VALUE => AttributeArrayValue::Integer(value.int_value),
303            proto::AttributeArrayValueType::DOUBLE_VALUE => AttributeArrayValue::Double(value.double_value.into()),
304        }
305    }
306}