Skip to main content

substrait_explain/textify/
rels.rs

1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::convert::TryFrom;
4use std::fmt;
5use std::fmt::Debug;
6
7use prost::{Message, UnknownEnumValue};
8use substrait::proto::fetch_rel::CountMode;
9use substrait::proto::plan_rel::RelType as PlanRelType;
10use substrait::proto::read_rel::ReadType;
11use substrait::proto::rel::RelType;
12use substrait::proto::rel_common::EmitKind;
13use substrait::proto::sort_field::{SortDirection, SortKind};
14use substrait::proto::{
15    AggregateFunction, AggregateRel, Expression, ExtensionLeafRel, ExtensionMultiRel,
16    ExtensionSingleRel, FetchRel, FilterRel, JoinRel, NamedStruct, PlanRel, ProjectRel, ReadRel,
17    Rel, RelCommon, RelRoot, SortField, SortRel, Type, join_rel,
18};
19
20use super::addenda::AddendumLines;
21use super::expressions::Reference;
22use super::types::Name;
23use super::{PlanError, Scope, Textify};
24use crate::FormatError;
25use crate::extensions::any::AnyRef;
26use crate::extensions::{ExtensionArgs, ExtensionColumn, ExtensionError, ExtensionValue};
27
28pub trait NamedRelation {
29    fn name(&self) -> &'static str;
30}
31
32impl NamedRelation for Rel {
33    fn name(&self) -> &'static str {
34        match self.rel_type.as_ref() {
35            None => "UnknownRel",
36            Some(RelType::Read(_)) => "Read",
37            Some(RelType::Filter(_)) => "Filter",
38            Some(RelType::Project(_)) => "Project",
39            Some(RelType::Fetch(_)) => "Fetch",
40            Some(RelType::Aggregate(_)) => "Aggregate",
41            Some(RelType::Sort(_)) => "Sort",
42            Some(RelType::HashJoin(_)) => "HashJoin",
43            Some(RelType::Exchange(_)) => "Exchange",
44            Some(RelType::Join(_)) => "Join",
45            Some(RelType::Set(_)) => "Set",
46            Some(RelType::ExtensionLeaf(_)) => "ExtensionLeaf",
47            Some(RelType::Cross(_)) => "Cross",
48            Some(RelType::Reference(_)) => "Reference",
49            Some(RelType::ExtensionSingle(_)) => "ExtensionSingle",
50            Some(RelType::ExtensionMulti(_)) => "ExtensionMulti",
51            Some(RelType::Write(_)) => "Write",
52            Some(RelType::Ddl(_)) => "Ddl",
53            Some(RelType::Update(_)) => "Update",
54            Some(RelType::MergeJoin(_)) => "MergeJoin",
55            Some(RelType::NestedLoopJoin(_)) => "NestedLoopJoin",
56            Some(RelType::Window(_)) => "Window",
57            Some(RelType::Expand(_)) => "Expand",
58        }
59    }
60}
61
62impl Textify for Rel {
63    fn name() -> &'static str {
64        "Rel"
65    }
66
67    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
68        // delegates to `Relation` which carries `advanced_extension`, so the full
69        // header → enhancement → children sequence is handled uniformly there.
70        Relation::from_rel(self, ctx).textify(ctx, w)
71    }
72}
73
74/// Trait for enums that can be converted to a string representation for
75/// textification.
76///
77/// Returns Ok(str) for valid enum values, or Err([PlanError]) for invalid or
78/// unknown values.
79pub trait ValueEnum {
80    fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError>;
81}
82
83#[derive(Debug, Clone)]
84pub struct NamedArg<'a> {
85    pub name: Cow<'a, str>,
86    pub value: Value<'a>,
87}
88
89#[derive(Debug, Clone)]
90pub enum Value<'a> {
91    TableName(Vec<Name<'a>>),
92    Field(Option<Name<'a>>, Option<&'a Type>),
93    Tuple(Vec<Value<'a>>),
94    Reference(i32),
95    Expression(&'a Expression),
96    AggregateFunction(&'a AggregateFunction),
97    /// Represents a missing, invalid, or unspecified value.
98    Missing(PlanError),
99    /// Represents a valid enum value as a string for textification.
100    Enum(Cow<'a, str>),
101    EmptyGroup,
102    Integer(i64),
103    /// A decoded extension argument value.
104    ExtensionArgument(ExtensionValue),
105    /// A decoded extension output column.
106    ExtColumn(ExtensionColumn),
107}
108
109impl<'a> Value<'a> {
110    pub fn expect(maybe_value: Option<Self>, f: impl FnOnce() -> PlanError) -> Self {
111        match maybe_value {
112            Some(s) => s,
113            None => Value::Missing(f()),
114        }
115    }
116}
117
118impl<'a> From<Result<Vec<Name<'a>>, PlanError>> for Value<'a> {
119    fn from(token: Result<Vec<Name<'a>>, PlanError>) -> Self {
120        match token {
121            Ok(value) => Value::TableName(value),
122            Err(err) => Value::Missing(err),
123        }
124    }
125}
126
127impl<'a> Textify for Value<'a> {
128    fn name() -> &'static str {
129        "Value"
130    }
131
132    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
133        match self {
134            Value::TableName(names) => write!(w, "{}", ctx.separated(names, ".")),
135            Value::Field(name, typ) => {
136                write!(w, "{}:{}", ctx.expect(name.as_ref()), ctx.expect(*typ))
137            }
138            Value::Tuple(values) => write!(w, "({})", ctx.separated(values, ", ")),
139            Value::Reference(i) => write!(w, "{}", Reference(*i)),
140            Value::Expression(e) => write!(w, "{}", ctx.display(*e)),
141            Value::AggregateFunction(agg_fn) => agg_fn.textify(ctx, w),
142            Value::Missing(err) => write!(w, "{}", ctx.failure(err.clone())),
143            Value::Enum(res) => write!(w, "&{res}"),
144            Value::Integer(i) => write!(w, "{i}"),
145            Value::EmptyGroup => write!(w, "_"),
146            Value::ExtensionArgument(ev) => ev.textify(ctx, w),
147            Value::ExtColumn(ec) => ec.textify(ctx, w),
148        }
149    }
150}
151
152fn schema_to_values<'a>(schema: &'a NamedStruct) -> Vec<Value<'a>> {
153    let mut fields = schema
154        .r#struct
155        .as_ref()
156        .map(|s| s.types.iter())
157        .into_iter()
158        .flatten();
159    let mut names = schema.names.iter();
160
161    // let field_count = schema.r#struct.as_ref().map(|s| s.types.len()).unwrap_or(0);
162    // let name_count = schema.names.len();
163
164    let mut values = Vec::new();
165    loop {
166        let field = fields.next();
167        let name = names.next().map(|n| Name(n));
168        if field.is_none() && name.is_none() {
169            break;
170        }
171
172        values.push(Value::Field(name, field));
173    }
174
175    values
176}
177
178struct Emitted<'a> {
179    pub values: &'a [Value<'a>],
180    pub emit: Option<&'a EmitKind>,
181}
182
183impl<'a> Emitted<'a> {
184    pub fn new(values: &'a [Value<'a>], emit: Option<&'a EmitKind>) -> Self {
185        Self { values, emit }
186    }
187
188    pub fn write_direct<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
189        write!(w, "{}", ctx.separated(self.values.iter(), ", "))
190    }
191}
192
193impl<'a> Textify for Emitted<'a> {
194    fn name() -> &'static str {
195        "Emitted"
196    }
197
198    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
199        if ctx.options().show_emit {
200            return self.write_direct(ctx, w);
201        }
202
203        let indices = match &self.emit {
204            Some(EmitKind::Emit(e)) => &e.output_mapping,
205            Some(EmitKind::Direct(_)) => return self.write_direct(ctx, w),
206            None => return self.write_direct(ctx, w),
207        };
208
209        for (i, &index) in indices.iter().enumerate() {
210            if i > 0 {
211                write!(w, ", ")?;
212            }
213
214            match self.values.get(index as usize) {
215                Some(value) => write!(w, "{}", ctx.display(value))?,
216                None => write!(w, "{}", ctx.failure(PlanError::invalid(
217                    "Emitted",
218                    Some("output_mapping"),
219                    format!(
220                        "Output mapping index {} is out of bounds for values collection of size {}",
221                        index, self.values.len()
222                    )
223                )))?,
224            }
225        }
226
227        Ok(())
228    }
229}
230
231#[derive(Debug, Clone)]
232pub struct Arguments<'a> {
233    /// Positional arguments (e.g., a filter condition, group-bys, etc.)
234    pub positional: Vec<Value<'a>>,
235    /// Named arguments (e.g., limit=10, offset=5)
236    pub named: Vec<NamedArg<'a>>,
237}
238
239impl<'a> Textify for Arguments<'a> {
240    fn name() -> &'static str {
241        "Arguments"
242    }
243    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
244        if self.positional.is_empty() && self.named.is_empty() {
245            return write!(w, "_");
246        }
247
248        write!(w, "{}", ctx.separated(self.positional.iter(), ", "))?;
249        if !self.positional.is_empty() && !self.named.is_empty() {
250            write!(w, ", ")?;
251        }
252        write!(w, "{}", ctx.separated(self.named.iter(), ", "))
253    }
254}
255
256pub struct Relation<'a> {
257    pub name: Cow<'a, str>,
258    /// Arguments to the relation, if any.
259    ///
260    /// - `None` means this relation does not take arguments, and the argument
261    ///   section is omitted entirely.
262    /// - `Some(args)` with both vectors empty means the relation takes
263    ///   arguments, but none are provided; this will print as `_ => ...`.
264    /// - `Some(args)` with non-empty vectors will print as usual, with
265    ///   positional arguments first, then named arguments, separated by commas.
266    pub arguments: Option<Arguments<'a>>,
267    /// The columns emitted by this relation, pre-emit - the 'direct' column
268    /// output.
269    pub columns: Vec<Value<'a>>,
270    /// The emit kind, if any. If none, use the columns directly.
271    pub emit: Option<&'a EmitKind>,
272    /// `+`-prefixed addendum lines to emit between this relation's header and
273    /// children.  This owns the canonical ordering for `+ Ext`, `+ Enh`, and
274    /// `+ Opt` lines rather than making the generic relation shape grow one
275    /// field per addendum kind.
276    addenda: AddendumLines,
277    /// The input relations.
278    pub children: Vec<Option<Relation<'a>>>,
279}
280
281impl Textify for Relation<'_> {
282    fn name() -> &'static str {
283        "Relation"
284    }
285
286    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
287        self.write_header(ctx, w)?;
288        let child_scope = ctx.push_indent();
289        self.addenda.textify(&child_scope, w)?;
290        self.write_children(ctx, w)?;
291        Ok(())
292    }
293}
294
295impl Relation<'_> {
296    /// Write the single header line for this relation, e.g. `Filter[$0 => $0]`.
297    /// Does not write a trailing newline; callers are responsible for any
298    /// newline that follows (either from an addendum or from the next child).
299    pub fn write_header<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
300        let cols = Emitted::new(&self.columns, self.emit);
301        let indent = ctx.indent();
302        let name = &self.name;
303        let cols = ctx.display(&cols);
304        match &self.arguments {
305            None => {
306                write!(w, "{indent}{name}[{cols}]")
307            }
308            Some(args) => {
309                let args = ctx.display(args);
310                write!(w, "{indent}{name}[{args} => {cols}]")
311            }
312        }
313    }
314
315    /// Write each child relation at one indent level deeper than `ctx`.
316    /// Each child is preceded by a newline.
317    pub fn write_children<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
318        let child_scope = ctx.push_indent();
319        for child in self.children.iter().flatten() {
320            writeln!(w)?;
321            child.textify(&child_scope, w)?;
322        }
323        Ok(())
324    }
325}
326
327impl<'a> Relation<'a> {
328    pub fn emitted(&self) -> usize {
329        match self.emit {
330            Some(EmitKind::Emit(e)) => e.output_mapping.len(),
331            Some(EmitKind::Direct(_)) => self.columns.len(),
332            None => self.columns.len(),
333        }
334    }
335}
336
337impl<'a> Relation<'a> {
338    fn from_read<S: Scope>(rel: &'a ReadRel, ctx: &S) -> Self {
339        let columns = read_columns(rel);
340        let emit = rel.common.as_ref().and_then(|c| c.emit_kind.as_ref());
341
342        match &rel.read_type {
343            Some(ReadType::NamedTable(table)) => {
344                let table_name = Value::TableName(table.names.iter().map(|n| Name(n)).collect());
345                Relation {
346                    name: Cow::Borrowed("Read"),
347                    arguments: Some(Arguments {
348                        positional: vec![table_name],
349                        named: vec![],
350                    }),
351                    columns,
352                    emit,
353                    addenda: AddendumLines::from_advanced_extension(
354                        ctx,
355                        rel.advanced_extension.as_ref(),
356                    ),
357                    children: vec![],
358                }
359            }
360            Some(ReadType::VirtualTable(vt)) => {
361                let positional = vt
362                    .expressions
363                    .iter()
364                    .map(|row| Value::Tuple(row.fields.iter().map(Value::Expression).collect()))
365                    .collect();
366
367                Relation {
368                    name: Cow::Borrowed("Read:Virtual"),
369                    arguments: Some(Arguments {
370                        positional,
371                        named: vec![],
372                    }),
373                    columns,
374                    emit,
375                    addenda: AddendumLines::from_advanced_extension(
376                        ctx,
377                        rel.advanced_extension.as_ref(),
378                    ),
379                    children: vec![],
380                }
381            }
382            Some(ReadType::ExtensionTable(table)) => {
383                let decoded = match table.detail.as_ref().map(AnyRef::from) {
384                    Some(detail) => ctx.extension_registry().decode_extension_table(detail),
385                    None => Err(ExtensionError::MissingDetail),
386                };
387
388                Relation {
389                    name: Cow::Borrowed("Read:Extension"),
390                    arguments: None,
391                    columns,
392                    emit,
393                    addenda: AddendumLines::extension_table(
394                        ctx,
395                        decoded,
396                        rel.advanced_extension.as_ref(),
397                    ),
398                    children: vec![],
399                }
400            }
401            other => {
402                let err = PlanError::unimplemented(
403                    "ReadRel",
404                    Some("read_type"),
405                    format!("Unsupported read type {other:?}"),
406                );
407                Relation {
408                    name: Cow::Borrowed("Read"),
409                    arguments: Some(Arguments {
410                        positional: vec![Value::Missing(err)],
411                        named: vec![],
412                    }),
413                    columns,
414                    emit,
415                    addenda: AddendumLines::from_advanced_extension(
416                        ctx,
417                        rel.advanced_extension.as_ref(),
418                    ),
419                    children: vec![],
420                }
421            }
422        }
423    }
424}
425
426fn read_columns<'a>(rel: &'a ReadRel) -> Vec<Value<'a>> {
427    match rel.base_schema {
428        Some(ref schema) => schema_to_values(schema),
429        None => {
430            let err =
431                PlanError::unimplemented("ReadRel", Some("base_schema"), "Base schema is required");
432            vec![Value::Missing(err)]
433        }
434    }
435}
436
437pub fn get_emit(rel: Option<&RelCommon>) -> Option<&EmitKind> {
438    rel.as_ref().and_then(|c| c.emit_kind.as_ref())
439}
440
441impl<'a> Relation<'a> {
442    /// Convert a vector of relation references into their structured form.
443    ///
444    /// Returns a list of children (with None for ones missing), and a count of input columns.
445    pub fn convert_children<S: Scope>(
446        refs: Vec<Option<&'a Rel>>,
447        ctx: &S,
448    ) -> (Vec<Option<Relation<'a>>>, usize) {
449        let mut children = vec![];
450        let mut inputs = 0;
451
452        for maybe_rel in refs {
453            match maybe_rel {
454                Some(rel) => {
455                    let child = Relation::from_rel(rel, ctx);
456                    inputs += child.emitted();
457                    children.push(Some(child));
458                }
459                None => children.push(None),
460            }
461        }
462
463        (children, inputs)
464    }
465}
466
467impl<'a> Relation<'a> {
468    fn from_filter<S: Scope>(rel: &'a FilterRel, ctx: &S) -> Self {
469        let condition = rel
470            .condition
471            .as_ref()
472            .map(|c| Value::Expression(c.as_ref()));
473        let condition = Value::expect(condition, || {
474            PlanError::unimplemented("FilterRel", Some("condition"), "Condition is None")
475        });
476        let positional = vec![condition];
477        let arguments = Some(Arguments {
478            positional,
479            named: vec![],
480        });
481        let emit = get_emit(rel.common.as_ref());
482        let (children, columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
483        let columns = (0..columns).map(|i| Value::Reference(i as i32)).collect();
484
485        Relation {
486            name: Cow::Borrowed("Filter"),
487            arguments,
488            columns,
489            emit,
490            addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
491            children,
492        }
493    }
494
495    fn from_project<S: Scope>(rel: &'a ProjectRel, ctx: &S) -> Self {
496        let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
497        let mut columns: Vec<Value> = vec![];
498        for i in 0..input_columns {
499            columns.push(Value::Reference(i as i32));
500        }
501        for expr in &rel.expressions {
502            columns.push(Value::Expression(expr));
503        }
504
505        Relation {
506            name: Cow::Borrowed("Project"),
507            arguments: None,
508            columns,
509            emit: get_emit(rel.common.as_ref()),
510            addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
511            children,
512        }
513    }
514
515    pub fn from_rel<S: Scope>(rel: &'a Rel, ctx: &S) -> Self {
516        match rel.rel_type.as_ref() {
517            Some(RelType::Read(r)) => Relation::from_read(r, ctx),
518            Some(RelType::Filter(r)) => Relation::from_filter(r, ctx),
519            Some(RelType::Project(r)) => Relation::from_project(r, ctx),
520            Some(RelType::Aggregate(r)) => Relation::from_aggregate(r, ctx),
521            Some(RelType::Sort(r)) => Relation::from_sort(r, ctx),
522            Some(RelType::Fetch(r)) => Relation::from_fetch(r, ctx),
523            Some(RelType::Join(r)) => Relation::from_join(r, ctx),
524            Some(RelType::ExtensionLeaf(r)) => Relation::from_extension_leaf(r, ctx),
525            Some(RelType::ExtensionSingle(r)) => Relation::from_extension_single(r, ctx),
526            Some(RelType::ExtensionMulti(r)) => Relation::from_extension_multi(r, ctx),
527            _ => {
528                let name = rel.name();
529                let token = ctx.failure(FormatError::Format(PlanError::unimplemented(
530                    "Rel",
531                    Some(name),
532                    format!("{name} is not yet supported in the text format"),
533                )));
534                Relation {
535                    name: Cow::Owned(format!("{token}")),
536                    arguments: None,
537                    columns: vec![],
538                    emit: None,
539                    addenda: AddendumLines::none(),
540                    children: vec![],
541                }
542            }
543        }
544    }
545
546    fn from_extension_leaf<S: Scope>(rel: &'a ExtensionLeafRel, ctx: &S) -> Self {
547        let detail_ref = rel.detail.as_ref().map(AnyRef::from);
548        let decoded = match detail_ref {
549            Some(d) => ctx.extension_registry().decode(d),
550            None => Err(ExtensionError::MissingDetail),
551        };
552        Relation::from_extension("ExtensionLeaf", decoded, vec![], ctx)
553    }
554
555    fn from_extension_single<S: Scope>(rel: &'a ExtensionSingleRel, ctx: &S) -> Self {
556        let detail_ref = rel.detail.as_ref().map(AnyRef::from);
557        let decoded = match detail_ref {
558            Some(d) => ctx.extension_registry().decode(d),
559            None => Err(ExtensionError::MissingDetail),
560        };
561        Relation::from_extension("ExtensionSingle", decoded, vec![rel.input.as_deref()], ctx)
562    }
563
564    fn from_extension_multi<S: Scope>(rel: &'a ExtensionMultiRel, ctx: &S) -> Self {
565        let detail_ref = rel.detail.as_ref().map(AnyRef::from);
566        let decoded = match detail_ref {
567            Some(d) => ctx.extension_registry().decode(d),
568            None => Err(ExtensionError::MissingDetail),
569        };
570        let mut child_refs: Vec<Option<&'a Rel>> = vec![];
571        for input in &rel.inputs {
572            child_refs.push(Some(input));
573        }
574        Relation::from_extension("ExtensionMulti", decoded, child_refs, ctx)
575    }
576
577    fn from_extension<S: Scope>(
578        ext_type: &'static str,
579        decoded: Result<(String, ExtensionArgs), ExtensionError>,
580        child_refs: Vec<Option<&'a Rel>>,
581        ctx: &S,
582    ) -> Self {
583        match decoded {
584            Ok((name, args)) => {
585                let (children, _) = Relation::convert_children(child_refs, ctx);
586                let mut positional = vec![];
587                for value in args.positional {
588                    positional.push(Value::ExtensionArgument(value));
589                }
590                let mut named = vec![];
591                for (key, value) in args.named {
592                    named.push(NamedArg {
593                        name: Cow::Owned(key),
594                        value: Value::ExtensionArgument(value),
595                    });
596                }
597                let mut columns = vec![];
598                for col in args.output_columns {
599                    columns.push(Value::ExtColumn(col));
600                }
601                Relation {
602                    name: Cow::Owned(format!("{}:{}", ext_type, name)),
603                    arguments: Some(Arguments { positional, named }),
604                    columns,
605                    emit: None,
606                    // Extension relations use `detail` rather than
607                    // `advanced_extension`; the field does not exist on these
608                    // proto types.
609                    addenda: AddendumLines::none(),
610                    children,
611                }
612            }
613            Err(error) => {
614                let (children, _) = Relation::convert_children(child_refs, ctx);
615                Relation {
616                    name: Cow::Borrowed(ext_type),
617                    arguments: None,
618                    columns: vec![Value::Missing(PlanError::invalid(
619                        "extension",
620                        None::<&str>,
621                        error.to_string(),
622                    ))],
623                    emit: None,
624                    addenda: AddendumLines::none(),
625                    children,
626                }
627            }
628        }
629    }
630
631    /// Convert an AggregateRel to a Relation for textification.
632    ///
633    /// The conversion follows this logic:
634    /// 1. Arguments: Group-by expressions (as Value::Expression)
635    /// 2. Columns: All possible outputs in order:
636    ///    - First: Group-by field references (Value::Reference)
637    ///    - Then: Aggregate function measures (Value::AggregateFunction)
638    /// 3. Emit: Uses the relation's emit mapping to select which outputs to display
639    /// 4. Children: The input relation
640    fn from_aggregate<S: Scope>(rel: &'a AggregateRel, ctx: &S) -> Self {
641        let mut grouping_sets: Vec<Vec<Value>> = vec![]; // the Groupings in the Aggregate
642        let expression_list: Vec<Value>; // grouping_expressions defined on Aggregate
643
644        // if rel.grouping_expressions is empty, the deprecated rel.groupings.grouping_expressions might be set
645        // If *both* the deprecated `rel.groupings.grouping_expressions` and `rel.grouping_expressions` are
646        // set, then we silently ignore the deprecated one.
647        #[allow(deprecated)]
648        if rel.grouping_expressions.is_empty()
649            && !rel.groupings.is_empty()
650            && !rel.groupings[0].grouping_expressions.is_empty()
651        {
652            (expression_list, grouping_sets) = Relation::get_grouping_sets(rel);
653        } else {
654            expression_list = rel
655                .grouping_expressions
656                .iter()
657                .map(Value::Expression)
658                .collect::<Vec<_>>(); // already a list of the unique expressions
659            for group in &rel.groupings {
660                let mut grouping_set: Vec<Value> = vec![];
661                for i in &group.expression_references {
662                    grouping_set.push(Value::Reference(*i as i32));
663                }
664                grouping_sets.push(grouping_set);
665            }
666            // no defined groupings means there is global group by
667            if rel.groupings.is_empty() {
668                grouping_sets.push(vec![]);
669            }
670        }
671
672        let is_single = grouping_sets.len() == 1;
673        let mut positional: Vec<Value> = vec![];
674        for g in grouping_sets {
675            if g.is_empty() {
676                positional.push(Value::EmptyGroup);
677            } else if is_single {
678                // Single non-empty grouping set: spread expressions directly without parens
679                positional.extend(g);
680            } else {
681                positional.push(Value::Tuple(g));
682            }
683        }
684
685        // adding the grouping_sets as a list of Arguments to Aggregate Rel
686        let arguments = Some(Arguments {
687            positional,
688            named: vec![],
689        });
690
691        // The columns are the direct outputs of this relation (before emit)
692        let mut all_outputs: Vec<Value> = expression_list;
693
694        // Then, add all measures (aggregate functions)
695        // These are indexed after the group-by fields
696        for m in &rel.measures {
697            if let Some(agg_fn) = m.measure.as_ref() {
698                all_outputs.push(Value::AggregateFunction(agg_fn));
699            }
700        }
701        let emit = get_emit(rel.common.as_ref());
702        let (children, _) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
703
704        Relation {
705            name: Cow::Borrowed("Aggregate"),
706            arguments,
707            columns: all_outputs,
708            emit,
709            addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
710            children,
711        }
712    }
713
714    fn get_grouping_sets(rel: &'a AggregateRel) -> (Vec<Value<'a>>, Vec<Vec<Value<'a>>>) {
715        let mut grouping_sets: Vec<Vec<Value>> = vec![];
716        let mut expression_list: Vec<Value> = Vec::new();
717
718        // groupings might have the same expressions in their set so we use a map to get unique expressions
719        let mut expression_index_map = HashMap::new();
720        let mut i: i32 = 0; // index for the unique expression in the grouping_expressions list
721
722        for group in &rel.groupings {
723            let mut grouping_set: Vec<Value> = vec![];
724            #[allow(deprecated)]
725            for exp in &group.grouping_expressions {
726                // TODO: use a better key here than encoding to bytes.
727                // Ideally, substrait-rs would support `PartialEq` and `Hash`,
728                // but as there isn't an easy way to do that now, we'll skip.
729                let key = exp.encode_to_vec();
730                expression_index_map.entry(key.clone()).or_insert_with(|| {
731                    let value = Value::Expression(exp);
732                    expression_list.push(value); // new unique expression found
733                    // mapping the byte encoded expression to its index in the group_expression list
734                    let index = i;
735                    i += 1;
736                    index // is expression returned by this closure and inserted into map
737                });
738                grouping_set.push(Value::Reference(expression_index_map[&key]));
739            }
740            grouping_sets.push(grouping_set);
741        }
742        (expression_list, grouping_sets)
743    }
744}
745
746impl Textify for RelRoot {
747    fn name() -> &'static str {
748        "RelRoot"
749    }
750
751    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
752        let names = self.names.iter().map(|n| Name(n)).collect::<Vec<_>>();
753
754        write!(
755            w,
756            "{}Root[{}]",
757            ctx.indent(),
758            ctx.separated(names.iter(), ", ")
759        )?;
760        let child_scope = ctx.push_indent();
761        for child in self.input.iter() {
762            writeln!(w)?;
763            child.textify(&child_scope, w)?;
764        }
765
766        Ok(())
767    }
768}
769
770impl Textify for PlanRelType {
771    fn name() -> &'static str {
772        "PlanRelType"
773    }
774
775    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
776        match self {
777            PlanRelType::Rel(rel) => rel.textify(ctx, w),
778            PlanRelType::Root(root) => root.textify(ctx, w),
779        }
780    }
781}
782
783impl Textify for PlanRel {
784    fn name() -> &'static str {
785        "PlanRel"
786    }
787
788    /// Write the relation as a string. Inputs are ignored - those are handled
789    /// separately.
790    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
791        write!(w, "{}", ctx.expect(self.rel_type.as_ref()))
792    }
793}
794
795impl<'a> Relation<'a> {
796    fn from_sort<S: Scope>(rel: &'a SortRel, ctx: &S) -> Self {
797        let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
798        let mut positional = vec![];
799        for sort_field in &rel.sorts {
800            positional.push(Value::from(sort_field));
801        }
802        let arguments = Some(Arguments {
803            positional,
804            named: vec![],
805        });
806        // The columns are the direct outputs of this relation (before emit)
807        let mut col_values = vec![];
808        for i in 0..input_columns {
809            col_values.push(Value::Reference(i as i32));
810        }
811        let emit = get_emit(rel.common.as_ref());
812        Relation {
813            name: Cow::Borrowed("Sort"),
814            arguments,
815            columns: col_values,
816            emit,
817            addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
818            children,
819        }
820    }
821
822    fn from_fetch<S: Scope>(rel: &'a FetchRel, ctx: &S) -> Self {
823        let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
824        let mut named_args: Vec<NamedArg> = vec![];
825        match &rel.count_mode {
826            Some(CountMode::CountExpr(expr)) => {
827                named_args.push(NamedArg {
828                    name: Cow::Borrowed("limit"),
829                    value: Value::Expression(expr),
830                });
831            }
832            #[allow(deprecated)]
833            Some(CountMode::Count(val)) => {
834                named_args.push(NamedArg {
835                    name: Cow::Borrowed("limit"),
836                    value: Value::Integer(*val),
837                });
838            }
839            None => {}
840        }
841        if let Some(offset) = &rel.offset_mode {
842            match offset {
843                substrait::proto::fetch_rel::OffsetMode::OffsetExpr(expr) => {
844                    named_args.push(NamedArg {
845                        name: Cow::Borrowed("offset"),
846                        value: Value::Expression(expr),
847                    });
848                }
849                #[allow(deprecated)]
850                substrait::proto::fetch_rel::OffsetMode::Offset(val) => {
851                    named_args.push(NamedArg {
852                        name: Cow::Borrowed("offset"),
853                        value: Value::Integer(*val),
854                    });
855                }
856            }
857        }
858
859        let emit = get_emit(rel.common.as_ref());
860        // Fetch is passthrough — direct output is all input columns.
861        let columns: Vec<Value> = (0..input_columns)
862            .map(|i| Value::Reference(i as i32))
863            .collect();
864        Relation {
865            name: Cow::Borrowed("Fetch"),
866            arguments: Some(Arguments {
867                positional: vec![],
868                named: named_args,
869            }),
870            columns,
871            emit,
872            addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
873            children,
874        }
875    }
876}
877
878fn join_output_columns(
879    join_type: join_rel::JoinType,
880    left_columns: usize,
881    right_columns: usize,
882) -> Vec<Value<'static>> {
883    let total_columns = match join_type {
884        // Inner, Left, Right, Outer joins output columns from both sides
885        join_rel::JoinType::Inner
886        | join_rel::JoinType::Left
887        | join_rel::JoinType::Right
888        | join_rel::JoinType::Outer => left_columns + right_columns,
889
890        // Left semi/anti joins only output columns from the left side
891        join_rel::JoinType::LeftSemi | join_rel::JoinType::LeftAnti => left_columns,
892
893        // Right semi/anti joins output columns from the right side
894        join_rel::JoinType::RightSemi | join_rel::JoinType::RightAnti => right_columns,
895
896        // Single joins behave like semi joins
897        join_rel::JoinType::LeftSingle => left_columns,
898        join_rel::JoinType::RightSingle => right_columns,
899
900        // Mark joins output base columns plus one mark column
901        join_rel::JoinType::LeftMark => left_columns + 1,
902        join_rel::JoinType::RightMark => right_columns + 1,
903
904        // Unspecified - fallback to all columns
905        join_rel::JoinType::Unspecified => left_columns + right_columns,
906    };
907
908    // Output is always a contiguous range starting from $0
909    (0..total_columns)
910        .map(|i| Value::Reference(i as i32))
911        .collect()
912}
913
914impl<'a> Relation<'a> {
915    fn from_join<S: Scope>(rel: &'a JoinRel, ctx: &S) -> Self {
916        let (children, _total_columns) =
917            Relation::convert_children(vec![rel.left.as_deref(), rel.right.as_deref()], ctx);
918
919        // convert_children should preserve input vector length
920        assert_eq!(
921            children.len(),
922            2,
923            "convert_children should return same number of elements as input"
924        );
925
926        // Calculate left and right column counts separately
927        let left_columns = match &children[0] {
928            Some(child) => child.emitted(),
929            None => 0,
930        };
931        let right_columns = match &children[1] {
932            Some(child) => child.emitted(),
933            None => 0,
934        };
935
936        // Convert join type from protobuf i32 to enum value
937        // JoinType is stored as i32 in protobuf, convert to typed enum for processing
938        let (join_type, join_type_value) = match join_rel::JoinType::try_from(rel.r#type) {
939            Ok(join_type) => {
940                let join_type_value = match join_type.as_enum_str() {
941                    Ok(s) => Value::Enum(s),
942                    Err(e) => Value::Missing(e),
943                };
944                (join_type, join_type_value)
945            }
946            Err(_) => {
947                // Use Unspecified for the join_type but create an error for the join_type_value
948                let join_type_error = Value::Missing(PlanError::invalid(
949                    "JoinRel",
950                    Some("type"),
951                    format!("Unknown join type: {}", rel.r#type),
952                ));
953                (join_rel::JoinType::Unspecified, join_type_error)
954            }
955        };
956
957        // Join condition
958        let condition = rel
959            .expression
960            .as_ref()
961            .map(|c| Value::Expression(c.as_ref()));
962        let condition = Value::expect(condition, || {
963            PlanError::unimplemented("JoinRel", Some("expression"), "Join condition is None")
964        });
965
966        // TODO: Add support for post_join_filter when grammar is extended
967        // Currently post_join_filter is not supported in the text format
968        // grammar
969        let positional = vec![join_type_value, condition];
970        let arguments = Some(Arguments {
971            positional,
972            named: vec![],
973        });
974
975        let emit = get_emit(rel.common.as_ref());
976        let columns = join_output_columns(join_type, left_columns, right_columns);
977
978        Relation {
979            name: Cow::Borrowed("Join"),
980            arguments,
981            columns,
982            emit,
983            addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
984            children,
985        }
986    }
987}
988
989impl<'a> From<&'a SortField> for Value<'a> {
990    fn from(sf: &'a SortField) -> Self {
991        let field = match &sf.expr {
992            Some(expr) => match &expr.rex_type {
993                Some(substrait::proto::expression::RexType::Selection(fref)) => {
994                    if let Some(substrait::proto::expression::field_reference::ReferenceType::DirectReference(seg)) = &fref.reference_type {
995                        if let Some(substrait::proto::expression::reference_segment::ReferenceType::StructField(sf)) = &seg.reference_type {
996                            Value::Reference(sf.field)
997                        } else { Value::Missing(PlanError::unimplemented("SortField", Some("expr"), "Not a struct field")) }
998                    } else { Value::Missing(PlanError::unimplemented("SortField", Some("expr"), "Not a direct reference")) }
999                }
1000                _ => Value::Missing(PlanError::unimplemented(
1001                    "SortField",
1002                    Some("expr"),
1003                    "Not a selection",
1004                )),
1005            },
1006            None => Value::Missing(PlanError::unimplemented(
1007                "SortField",
1008                Some("expr"),
1009                "Missing expr",
1010            )),
1011        };
1012        let direction = match &sf.sort_kind {
1013            Some(kind) => Value::from(kind),
1014            None => Value::Missing(PlanError::invalid(
1015                "SortKind",
1016                Some(Cow::Borrowed("sort_kind")),
1017                "Missing sort_kind",
1018            )),
1019        };
1020        Value::Tuple(vec![field, direction])
1021    }
1022}
1023
1024impl<'a, T: ValueEnum + ?Sized> From<&'a T> for Value<'a> {
1025    fn from(enum_val: &'a T) -> Self {
1026        match enum_val.as_enum_str() {
1027            Ok(s) => Value::Enum(s),
1028            Err(e) => Value::Missing(e),
1029        }
1030    }
1031}
1032
1033impl ValueEnum for SortKind {
1034    fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError> {
1035        let d = match self {
1036            &SortKind::Direction(d) => SortDirection::try_from(d),
1037            SortKind::ComparisonFunctionReference(f) => {
1038                return Err(PlanError::invalid(
1039                    "SortKind",
1040                    Some(Cow::Owned(format!("function reference{f}"))),
1041                    "SortKind::ComparisonFunctionReference unimplemented",
1042                ));
1043            }
1044        };
1045        let s = match d {
1046            Err(UnknownEnumValue(d)) => {
1047                return Err(PlanError::invalid(
1048                    "SortKind",
1049                    Some(Cow::Owned(format!("unknown variant: {d:?}"))),
1050                    "Unknown SortDirection",
1051                ));
1052            }
1053            Ok(SortDirection::AscNullsFirst) => "AscNullsFirst",
1054            Ok(SortDirection::AscNullsLast) => "AscNullsLast",
1055            Ok(SortDirection::DescNullsFirst) => "DescNullsFirst",
1056            Ok(SortDirection::DescNullsLast) => "DescNullsLast",
1057            Ok(SortDirection::Clustered) => "Clustered",
1058            Ok(SortDirection::Unspecified) => {
1059                return Err(PlanError::invalid(
1060                    "SortKind",
1061                    Option::<Cow<str>>::None,
1062                    "Unspecified SortDirection",
1063                ));
1064            }
1065        };
1066        Ok(Cow::Borrowed(s))
1067    }
1068}
1069
1070impl ValueEnum for join_rel::JoinType {
1071    fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError> {
1072        let s = match self {
1073            join_rel::JoinType::Unspecified => {
1074                return Err(PlanError::invalid(
1075                    "JoinType",
1076                    Option::<Cow<str>>::None,
1077                    "Unspecified JoinType",
1078                ));
1079            }
1080            join_rel::JoinType::Inner => "Inner",
1081            join_rel::JoinType::Outer => "Outer",
1082            join_rel::JoinType::Left => "Left",
1083            join_rel::JoinType::Right => "Right",
1084            join_rel::JoinType::LeftSemi => "LeftSemi",
1085            join_rel::JoinType::RightSemi => "RightSemi",
1086            join_rel::JoinType::LeftAnti => "LeftAnti",
1087            join_rel::JoinType::RightAnti => "RightAnti",
1088            join_rel::JoinType::LeftSingle => "LeftSingle",
1089            join_rel::JoinType::RightSingle => "RightSingle",
1090            join_rel::JoinType::LeftMark => "LeftMark",
1091            join_rel::JoinType::RightMark => "RightMark",
1092        };
1093        Ok(Cow::Borrowed(s))
1094    }
1095}
1096
1097impl<'a> Textify for NamedArg<'a> {
1098    fn name() -> &'static str {
1099        "NamedArg"
1100    }
1101    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
1102        write!(w, "{}=", self.name)?;
1103        self.value.textify(ctx, w)
1104    }
1105}
1106
1107#[cfg(test)]
1108mod tests {
1109    use substrait::proto::aggregate_rel::Grouping;
1110    use substrait::proto::expression::literal::LiteralType;
1111    use substrait::proto::expression::{Literal, RexType, ScalarFunction};
1112    use substrait::proto::function_argument::ArgType;
1113    use substrait::proto::read_rel::{NamedTable, ReadType};
1114    use substrait::proto::rel_common::{Direct, Emit};
1115    use substrait::proto::r#type::{self as ptype, Boolean, I64, Kind, Nullability, Struct};
1116    use substrait::proto::{
1117        Expression, FunctionArgument, NamedStruct, ReadRel, Type, aggregate_rel,
1118    };
1119
1120    use super::*;
1121    use crate::fixtures::TestContext;
1122    use crate::parser::expressions::FieldIndex;
1123
1124    #[test]
1125    fn test_read_rel() {
1126        let ctx = TestContext::new();
1127
1128        // Create a simple ReadRel with a NamedStruct schema
1129        let read_rel = ReadRel {
1130            common: None,
1131            base_schema: Some(NamedStruct {
1132                names: vec!["col1".into(), "column 2".into()],
1133                r#struct: Some(Struct {
1134                    type_variation_reference: 0,
1135                    types: vec![
1136                        Type {
1137                            kind: Some(Kind::I32(ptype::I32 {
1138                                type_variation_reference: 0,
1139                                nullability: Nullability::Nullable as i32,
1140                            })),
1141                        },
1142                        Type {
1143                            kind: Some(Kind::String(ptype::String {
1144                                type_variation_reference: 0,
1145                                nullability: Nullability::Nullable as i32,
1146                            })),
1147                        },
1148                    ],
1149                    nullability: Nullability::Nullable as i32,
1150                }),
1151            }),
1152            filter: None,
1153            best_effort_filter: None,
1154            projection: None,
1155            advanced_extension: None,
1156            read_type: Some(ReadType::NamedTable(NamedTable {
1157                names: vec!["some_db".into(), "test_table".into()],
1158                advanced_extension: None,
1159            })),
1160        };
1161
1162        let rel = Rel {
1163            rel_type: Some(RelType::Read(Box::new(read_rel))),
1164        };
1165        let (result, errors) = ctx.textify(&rel);
1166        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1167        assert_eq!(
1168            result,
1169            "Read[some_db.test_table => col1:i32?, \"column 2\":string?]"
1170        );
1171    }
1172
1173    #[test]
1174    fn test_filter_rel() {
1175        let ctx = TestContext::new()
1176            .with_urn(1, "test_urn")
1177            .with_function(1, 10, "gt");
1178
1179        // Create a simple FilterRel with a ReadRel input and a filter expression
1180        let read_rel = ReadRel {
1181            common: None,
1182            base_schema: Some(NamedStruct {
1183                names: vec!["col1".into(), "col2".into()],
1184                r#struct: Some(Struct {
1185                    type_variation_reference: 0,
1186                    types: vec![
1187                        Type {
1188                            kind: Some(Kind::I32(ptype::I32 {
1189                                type_variation_reference: 0,
1190                                nullability: Nullability::Nullable as i32,
1191                            })),
1192                        },
1193                        Type {
1194                            kind: Some(Kind::I32(ptype::I32 {
1195                                type_variation_reference: 0,
1196                                nullability: Nullability::Nullable as i32,
1197                            })),
1198                        },
1199                    ],
1200                    nullability: Nullability::Nullable as i32,
1201                }),
1202            }),
1203            filter: None,
1204            best_effort_filter: None,
1205            projection: None,
1206            advanced_extension: None,
1207            read_type: Some(ReadType::NamedTable(NamedTable {
1208                names: vec!["test_table".into()],
1209                advanced_extension: None,
1210            })),
1211        };
1212
1213        // Create a filter expression: col1 > 10
1214        let filter_expr = Expression {
1215            rex_type: Some(RexType::ScalarFunction(ScalarFunction {
1216                function_reference: 10, // gt function
1217                arguments: vec![
1218                    FunctionArgument {
1219                        arg_type: Some(ArgType::Value(Reference(0).into())),
1220                    },
1221                    FunctionArgument {
1222                        arg_type: Some(ArgType::Value(Expression {
1223                            rex_type: Some(RexType::Literal(Literal {
1224                                literal_type: Some(LiteralType::I32(10)),
1225                                nullable: false,
1226                                type_variation_reference: 0,
1227                            })),
1228                        })),
1229                    },
1230                ],
1231                options: vec![],
1232                output_type: Some(Type {
1233                    kind: Some(Kind::Bool(Boolean {
1234                        nullability: Nullability::Required as i32,
1235                        type_variation_reference: 0,
1236                    })),
1237                }),
1238                #[allow(deprecated)]
1239                args: vec![],
1240            })),
1241        };
1242
1243        let filter_rel = FilterRel {
1244            common: None,
1245            input: Some(Box::new(Rel {
1246                rel_type: Some(RelType::Read(Box::new(read_rel))),
1247            })),
1248            condition: Some(Box::new(filter_expr)),
1249            advanced_extension: None,
1250        };
1251
1252        let rel = Rel {
1253            rel_type: Some(RelType::Filter(Box::new(filter_rel))),
1254        };
1255
1256        let (result, errors) = ctx.textify(&rel);
1257        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1258        let expected = r#"
1259Filter[gt($0, 10:i32):boolean => $0, $1]
1260  Read[test_table => col1:i32?, col2:i32?]"#
1261            .trim_start();
1262        assert_eq!(result, expected);
1263    }
1264
1265    #[test]
1266    fn test_aggregate_function_textify() {
1267        let ctx = TestContext::new()
1268        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1269        .with_function(1, 10, "sum")
1270        .with_function(1, 11, "count");
1271
1272        // Create a simple AggregateFunction
1273        let agg_fn = get_aggregate_func(10, 1);
1274
1275        let value = Value::AggregateFunction(&agg_fn);
1276        let (result, errors) = ctx.textify(&value);
1277
1278        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1279        assert_eq!(result, "sum($1):i64");
1280    }
1281
1282    #[test]
1283    fn test_aggregate_relation_textify() {
1284        let ctx = TestContext::new()
1285        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1286        .with_function(1, 10, "sum")
1287        .with_function(1, 11, "count");
1288
1289        // Create a simple AggregateRel
1290        let agg_fn1 = get_aggregate_func(10, 1);
1291        let agg_fn2 = get_aggregate_func(11, 1);
1292
1293        let grouping_expressions = vec![Expression {
1294            rex_type: Some(RexType::Selection(Box::new(
1295                FieldIndex(0).to_field_reference(),
1296            ))),
1297        }];
1298
1299        let measures = vec![
1300            aggregate_rel::Measure {
1301                measure: Some(agg_fn1),
1302                filter: None,
1303            },
1304            aggregate_rel::Measure {
1305                measure: Some(agg_fn2),
1306                filter: None,
1307            },
1308        ];
1309
1310        let common = Some(RelCommon {
1311            emit_kind: Some(EmitKind::Emit(Emit {
1312                output_mapping: vec![1, 2], // measures only
1313            })),
1314            ..Default::default()
1315        });
1316
1317        let aggregate_rel = create_aggregate_rel(grouping_expressions, vec![], measures, common);
1318
1319        let rel = Rel {
1320            rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1321        };
1322        let (result, errors) = ctx.textify(&rel);
1323
1324        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1325        // Expected: Aggregate[_ => sum($1):i64, count($1):i64] we chose to emit only measures
1326        assert!(result.contains("Aggregate[_ => sum($1):i64, count($1):i64]"));
1327    }
1328
1329    #[test]
1330    fn test_multiple_groupings_on_aggregate_deprecated() {
1331        // Protobuf plan that uses AggregateRel.groupings with deprecated
1332        // grouping_expressions, leaving AggregateRel.grouping_expressions empty.
1333        let ctx = TestContext::new()
1334        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1335        .with_function(1, 11, "count");
1336
1337        let grouping_expr_0 = create_exp(0);
1338        let grouping_expr_1 = create_exp(1);
1339
1340        let grouping_sets = vec![
1341            aggregate_rel::Grouping {
1342                #[allow(deprecated)]
1343                grouping_expressions: vec![grouping_expr_0.clone()],
1344                expression_references: vec![],
1345            },
1346            aggregate_rel::Grouping {
1347                #[allow(deprecated)]
1348                grouping_expressions: vec![grouping_expr_0.clone(), grouping_expr_1.clone()],
1349                expression_references: vec![],
1350            },
1351        ];
1352
1353        let aggregate_rel = create_aggregate_rel(vec![], grouping_sets, vec![], None);
1354
1355        let rel = Rel {
1356            rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1357        };
1358        let (result, errors) = ctx.textify(&rel);
1359
1360        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1361        assert!(result.contains("Aggregate[($0), ($0, $1) => $0, $1]"));
1362    }
1363
1364    #[test]
1365    fn test_multiple_groupings_with_measure_deprecated() {
1366        // Protobuf plan that uses AggregateRel.groupings with deprecated
1367        // grouping_expressions, leaving AggregateRel.grouping_expressions empty.
1368        let ctx = TestContext::new()
1369        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1370        .with_function(1, 11, "count");
1371
1372        let agg_fn1 = get_aggregate_func(11, 2);
1373
1374        let grouping_expr_0 = create_exp(0);
1375        let grouping_expr_1 = create_exp(1);
1376
1377        let grouping_sets = vec![
1378            aggregate_rel::Grouping {
1379                #[allow(deprecated)]
1380                grouping_expressions: vec![grouping_expr_0.clone()],
1381                expression_references: vec![],
1382            },
1383            aggregate_rel::Grouping {
1384                #[allow(deprecated)]
1385                grouping_expressions: vec![grouping_expr_0.clone(), grouping_expr_1.clone()],
1386                expression_references: vec![],
1387            },
1388        ];
1389
1390        let measures = vec![aggregate_rel::Measure {
1391            measure: Some(agg_fn1),
1392            filter: None,
1393        }];
1394
1395        let aggregate_rel = create_aggregate_rel(vec![], grouping_sets, measures, None);
1396
1397        let rel = Rel {
1398            rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1399        };
1400        let (result, errors) = ctx.textify(&rel);
1401
1402        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1403        assert!(result.contains("($0), ($0, $1) => $0, $1, count($2):i64"));
1404    }
1405
1406    #[test]
1407    fn test_multiple_groupings_on_aggregate() {
1408        let ctx = TestContext::new()
1409        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1410        .with_function(1, 11, "count");
1411
1412        let agg_fn2 = get_aggregate_func(11, 2);
1413
1414        let grouping_expressions = vec![
1415            Expression {
1416                rex_type: Some(RexType::Selection(Box::new(
1417                    FieldIndex(0).to_field_reference(),
1418                ))),
1419            },
1420            Expression {
1421                rex_type: Some(RexType::Selection(Box::new(
1422                    FieldIndex(1).to_field_reference(),
1423                ))),
1424            },
1425        ];
1426
1427        let grouping_sets = vec![
1428            Grouping {
1429                #[allow(deprecated)]
1430                grouping_expressions: vec![],
1431                expression_references: vec![0, 1],
1432            },
1433            Grouping {
1434                #[allow(deprecated)]
1435                grouping_expressions: vec![],
1436                expression_references: vec![0, 1],
1437            },
1438            Grouping {
1439                #[allow(deprecated)]
1440                grouping_expressions: vec![],
1441                expression_references: vec![1],
1442            },
1443            Grouping {
1444                #[allow(deprecated)]
1445                grouping_expressions: vec![],
1446                expression_references: vec![1, 1],
1447            },
1448            Grouping {
1449                #[allow(deprecated)]
1450                grouping_expressions: vec![],
1451                expression_references: vec![],
1452            },
1453        ];
1454
1455        let measures = vec![aggregate_rel::Measure {
1456            measure: Some(agg_fn2),
1457            filter: None,
1458        }];
1459
1460        let aggregate_rel =
1461            create_aggregate_rel(grouping_expressions, grouping_sets, measures, None);
1462
1463        let rel = Rel {
1464            rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1465        };
1466        let (result, errors) = ctx.textify(&rel);
1467
1468        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1469        assert!(
1470            result.contains(
1471                "Aggregate[($0, $1), ($0, $1), ($1), ($1, $1), _ => $0, $1, count($2):i64]"
1472            )
1473        );
1474    }
1475
1476    #[test]
1477    fn test_arguments_textify_positional_only() {
1478        let ctx = TestContext::new();
1479        let args = Arguments {
1480            positional: vec![Value::Integer(42), Value::Integer(7)],
1481            named: vec![],
1482        };
1483        let (result, errors) = ctx.textify(&args);
1484        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1485        assert_eq!(result, "42, 7");
1486    }
1487
1488    #[test]
1489    fn test_arguments_textify_named_only() {
1490        let ctx = TestContext::new();
1491        let args = Arguments {
1492            positional: vec![],
1493            named: vec![
1494                NamedArg {
1495                    name: Cow::Borrowed("limit"),
1496                    value: Value::Integer(10),
1497                },
1498                NamedArg {
1499                    name: Cow::Borrowed("offset"),
1500                    value: Value::Integer(5),
1501                },
1502            ],
1503        };
1504        let (result, errors) = ctx.textify(&args);
1505        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1506        assert_eq!(result, "limit=10, offset=5");
1507    }
1508
1509    #[test]
1510    fn test_join_relation_unknown_type() {
1511        let ctx = TestContext::new();
1512
1513        // Create a join with an unknown/invalid type
1514        let join_rel = JoinRel {
1515            left: Some(Box::new(Rel {
1516                rel_type: Some(RelType::Read(Box::default())),
1517            })),
1518            right: Some(Box::new(Rel {
1519                rel_type: Some(RelType::Read(Box::default())),
1520            })),
1521            expression: Some(Box::new(Expression::default())),
1522            r#type: 999, // Invalid join type
1523            common: None,
1524            post_join_filter: None,
1525            advanced_extension: None,
1526        };
1527
1528        let rel = Rel {
1529            rel_type: Some(RelType::Join(Box::new(join_rel))),
1530        };
1531        let (result, errors) = ctx.textify(&rel);
1532
1533        // Should contain error for unknown join type but still show condition and columns
1534        assert!(!errors.is_empty(), "Expected errors for unknown join type");
1535        assert!(
1536            result.contains("!{JoinRel}"),
1537            "Expected error token for unknown join type"
1538        );
1539        assert!(
1540            result.contains("Join["),
1541            "Expected Join relation to be formatted"
1542        );
1543    }
1544
1545    #[test]
1546    fn test_arguments_textify_both() {
1547        let ctx = TestContext::new();
1548        let args = Arguments {
1549            positional: vec![Value::Integer(1)],
1550            named: vec![NamedArg {
1551                name: "foo".into(),
1552                value: Value::Integer(2),
1553            }],
1554        };
1555        let (result, errors) = ctx.textify(&args);
1556        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1557        assert_eq!(result, "1, foo=2");
1558    }
1559
1560    #[test]
1561    fn test_arguments_textify_empty() {
1562        let ctx = TestContext::new();
1563        let args = Arguments {
1564            positional: vec![],
1565            named: vec![],
1566        };
1567        let (result, errors) = ctx.textify(&args);
1568        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1569        assert_eq!(result, "_");
1570    }
1571
1572    #[test]
1573    fn test_named_arg_textify_error_token() {
1574        let ctx = TestContext::new();
1575        let named_arg = NamedArg {
1576            name: "foo".into(),
1577            value: Value::Missing(PlanError::invalid(
1578                "my_enum",
1579                Some(Cow::Borrowed("my_enum")),
1580                Cow::Borrowed("my_enum"),
1581            )),
1582        };
1583        let (result, errors) = ctx.textify(&named_arg);
1584        // Should show !{my_enum} in the output
1585        assert!(result.contains("foo=!{my_enum}"), "Output: {result}");
1586        // Should also accumulate an error
1587        assert!(!errors.is_empty(), "Expected error for error token");
1588    }
1589
1590    #[test]
1591    fn test_join_type_enum_textify() {
1592        // Test that JoinType enum values convert correctly to their string representation
1593        assert_eq!(join_rel::JoinType::Inner.as_enum_str().unwrap(), "Inner");
1594        assert_eq!(join_rel::JoinType::Left.as_enum_str().unwrap(), "Left");
1595        assert_eq!(
1596            join_rel::JoinType::LeftSemi.as_enum_str().unwrap(),
1597            "LeftSemi"
1598        );
1599        assert_eq!(
1600            join_rel::JoinType::LeftAnti.as_enum_str().unwrap(),
1601            "LeftAnti"
1602        );
1603    }
1604
1605    #[test]
1606    fn test_join_output_columns() {
1607        // Test Inner join - outputs all columns from both sides
1608        let inner_cols = super::join_output_columns(join_rel::JoinType::Inner, 2, 3);
1609        assert_eq!(inner_cols.len(), 5); // 2 + 3 = 5 columns
1610        assert!(matches!(inner_cols[0], Value::Reference(0)));
1611        assert!(matches!(inner_cols[4], Value::Reference(4)));
1612
1613        // Test LeftSemi join - outputs only left columns
1614        let left_semi_cols = super::join_output_columns(join_rel::JoinType::LeftSemi, 2, 3);
1615        assert_eq!(left_semi_cols.len(), 2); // Only left columns
1616        assert!(matches!(left_semi_cols[0], Value::Reference(0)));
1617        assert!(matches!(left_semi_cols[1], Value::Reference(1)));
1618
1619        // Test RightSemi join - outputs right columns as contiguous range starting from $0
1620        let right_semi_cols = super::join_output_columns(join_rel::JoinType::RightSemi, 2, 3);
1621        assert_eq!(right_semi_cols.len(), 3); // Only right columns
1622        assert!(matches!(right_semi_cols[0], Value::Reference(0))); // Contiguous range starts at $0
1623        assert!(matches!(right_semi_cols[1], Value::Reference(1)));
1624        assert!(matches!(right_semi_cols[2], Value::Reference(2))); // Last right column
1625
1626        // Test LeftMark join - outputs left columns plus a mark column as contiguous range
1627        let left_mark_cols = super::join_output_columns(join_rel::JoinType::LeftMark, 2, 3);
1628        assert_eq!(left_mark_cols.len(), 3); // 2 left + 1 mark
1629        assert!(matches!(left_mark_cols[0], Value::Reference(0)));
1630        assert!(matches!(left_mark_cols[1], Value::Reference(1)));
1631        assert!(matches!(left_mark_cols[2], Value::Reference(2))); // Mark column at contiguous position
1632
1633        // Test RightMark join - outputs right columns plus a mark column as contiguous range
1634        let right_mark_cols = super::join_output_columns(join_rel::JoinType::RightMark, 2, 3);
1635        assert_eq!(right_mark_cols.len(), 4); // 3 right + 1 mark
1636        assert!(matches!(right_mark_cols[0], Value::Reference(0))); // Contiguous range starts at $0
1637        assert!(matches!(right_mark_cols[1], Value::Reference(1)));
1638        assert!(matches!(right_mark_cols[2], Value::Reference(2))); // Last right column
1639        assert!(matches!(right_mark_cols[3], Value::Reference(3))); // Mark column at contiguous position
1640    }
1641
1642    fn get_aggregate_func(func_ref: u32, column_ind: i32) -> AggregateFunction {
1643        AggregateFunction {
1644            function_reference: func_ref,
1645            arguments: vec![FunctionArgument {
1646                arg_type: Some(ArgType::Value(Expression {
1647                    rex_type: Some(RexType::Selection(Box::new(
1648                        FieldIndex(column_ind).to_field_reference(),
1649                    ))),
1650                })),
1651            }],
1652            options: vec![],
1653            output_type: Some(Type {
1654                kind: Some(Kind::I64(I64 {
1655                    nullability: Nullability::Required as i32,
1656                    type_variation_reference: 0,
1657                })),
1658            }),
1659            invocation: 0,
1660            phase: 0,
1661            sorts: vec![],
1662            #[allow(deprecated)]
1663            args: vec![],
1664        }
1665    }
1666
1667    fn create_aggregate_rel(
1668        grouping_expressions: Vec<Expression>,
1669        grouping_sets: Vec<Grouping>,
1670        measures: Vec<aggregate_rel::Measure>,
1671        common: Option<RelCommon>,
1672    ) -> AggregateRel {
1673        let common = common.or_else(|| {
1674            Some(RelCommon {
1675                emit_kind: Some(EmitKind::Direct(Direct {})),
1676                ..Default::default()
1677            })
1678        });
1679        AggregateRel {
1680            input: Some(Box::new(Rel {
1681                rel_type: Some(RelType::Read(Box::new(ReadRel {
1682                    common: None,
1683                    base_schema: Some(get_basic_schema()),
1684                    filter: None,
1685                    best_effort_filter: None,
1686                    projection: None,
1687                    advanced_extension: None,
1688                    read_type: Some(ReadType::NamedTable(NamedTable {
1689                        names: vec!["orders".into()],
1690                        advanced_extension: None,
1691                    })),
1692                }))),
1693            })),
1694            grouping_expressions,
1695            groupings: grouping_sets,
1696            measures,
1697            common,
1698            advanced_extension: None,
1699        }
1700    }
1701
1702    fn get_basic_schema() -> NamedStruct {
1703        NamedStruct {
1704            names: vec!["category".into(), "amount".into(), "value".into()],
1705            r#struct: Some(Struct {
1706                type_variation_reference: 0,
1707                types: vec![
1708                    Type {
1709                        kind: Some(Kind::String(ptype::String {
1710                            type_variation_reference: 0,
1711                            nullability: Nullability::Nullable as i32,
1712                        })),
1713                    },
1714                    Type {
1715                        kind: Some(Kind::Fp64(ptype::Fp64 {
1716                            type_variation_reference: 0,
1717                            nullability: Nullability::Nullable as i32,
1718                        })),
1719                    },
1720                    Type {
1721                        kind: Some(Kind::I32(ptype::I32 {
1722                            type_variation_reference: 0,
1723                            nullability: Nullability::Nullable as i32,
1724                        })),
1725                    },
1726                ],
1727                nullability: Nullability::Nullable as i32,
1728            }),
1729        }
1730    }
1731
1732    fn create_exp(column_ind: i32) -> Expression {
1733        Expression {
1734            rex_type: Some(RexType::Selection(Box::new(
1735                FieldIndex(column_ind).to_field_reference(),
1736            ))),
1737        }
1738    }
1739
1740    #[test]
1741    fn test_unsupported_rel_type_produces_failure_token() {
1742        use substrait::proto::CrossRel;
1743
1744        let ctx = TestContext::new();
1745
1746        // CrossRel is a valid Substrait relation type that the textifier
1747        // does not yet support.  Wrapping it in a Rel and textifying should
1748        // produce a `!{Rel}` failure token rather than panicking.
1749        let rel = Rel {
1750            rel_type: Some(RelType::Cross(Box::new(CrossRel {
1751                common: None,
1752                left: None,
1753                right: None,
1754                advanced_extension: None,
1755            }))),
1756        };
1757
1758        let (result, errors) = ctx.textify(&rel);
1759
1760        // The output should contain the failure token, not an empty string.
1761        assert!(
1762            result.contains("!{Rel}"),
1763            "Expected '!{{Rel}}' in output, got: {result}"
1764        );
1765
1766        // Exactly one error should have been collected.
1767        assert_eq!(errors.0.len(), 1, "Expected exactly one error: {errors:?}");
1768
1769        // The error should be a Format / Unimplemented error mentioning CrossRel.
1770        match &errors.0[0] {
1771            FormatError::Format(plan_err) => {
1772                assert_eq!(plan_err.message, "Rel");
1773                assert_eq!(
1774                    plan_err.error_type,
1775                    crate::textify::foundation::FormatErrorType::Unimplemented
1776                );
1777                assert!(
1778                    plan_err.lookup.as_deref().unwrap_or("").contains("Cross"),
1779                    "Expected lookup to mention 'Cross', got: {:?}",
1780                    plan_err.lookup
1781                );
1782            }
1783            other => panic!("Expected FormatError::Format, got: {other:?}"),
1784        }
1785    }
1786}