substrait_explain/textify/
rels.rs

1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::convert::TryFrom;
4use std::fmt;
5use std::fmt::Debug;
6
7use prost::{Message, UnknownEnumValue};
8use substrait::proto::extensions::AdvancedExtension;
9use substrait::proto::fetch_rel::CountMode;
10use substrait::proto::plan_rel::RelType as PlanRelType;
11use substrait::proto::read_rel::ReadType;
12use substrait::proto::rel::RelType;
13use substrait::proto::rel_common::EmitKind;
14use substrait::proto::sort_field::{SortDirection, SortKind};
15use substrait::proto::{
16    AggregateFunction, AggregateRel, Expression, ExtensionLeafRel, ExtensionMultiRel,
17    ExtensionSingleRel, FetchRel, FilterRel, JoinRel, NamedStruct, PlanRel, ProjectRel, ReadRel,
18    Rel, RelCommon, RelRoot, SortField, SortRel, Type, join_rel,
19};
20
21use super::expressions::Reference;
22use super::types::Name;
23use super::{PlanError, Scope, Textify};
24use crate::FormatError;
25use crate::extensions::any::AnyRef;
26use crate::extensions::{ExtensionColumn, ExtensionValue};
27
/// A relation-like value that can report a short, static display name.
pub trait NamedRelation {
    /// Returns the static display name of this relation.
    fn name(&self) -> &'static str;
}
31
32impl NamedRelation for Rel {
33    fn name(&self) -> &'static str {
34        match self.rel_type.as_ref() {
35            None => "UnknownRel",
36            Some(RelType::Read(_)) => "Read",
37            Some(RelType::Filter(_)) => "Filter",
38            Some(RelType::Project(_)) => "Project",
39            Some(RelType::Fetch(_)) => "Fetch",
40            Some(RelType::Aggregate(_)) => "Aggregate",
41            Some(RelType::Sort(_)) => "Sort",
42            Some(RelType::HashJoin(_)) => "HashJoin",
43            Some(RelType::Exchange(_)) => "Exchange",
44            Some(RelType::Join(_)) => "Join",
45            Some(RelType::Set(_)) => "Set",
46            Some(RelType::ExtensionLeaf(_)) => "ExtensionLeaf",
47            Some(RelType::Cross(_)) => "Cross",
48            Some(RelType::Reference(_)) => "Reference",
49            Some(RelType::ExtensionSingle(_)) => "ExtensionSingle",
50            Some(RelType::ExtensionMulti(_)) => "ExtensionMulti",
51            Some(RelType::Write(_)) => "Write",
52            Some(RelType::Ddl(_)) => "Ddl",
53            Some(RelType::Update(_)) => "Update",
54            Some(RelType::MergeJoin(_)) => "MergeJoin",
55            Some(RelType::NestedLoopJoin(_)) => "NestedLoopJoin",
56            Some(RelType::Window(_)) => "Window",
57            Some(RelType::Expand(_)) => "Expand",
58        }
59    }
60}
61
62impl Textify for Rel {
63    fn name() -> &'static str {
64        "Rel"
65    }
66
67    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
68        // delegates to `Relation` which carries `advanced_extension`, so the full
69        // header → enhancement → children sequence is handled uniformly there.
70        Relation::from_rel(self, ctx).textify(ctx, w)
71    }
72}
73
/// Trait for enums that can be converted to a string representation for
/// textification.
///
/// Implementors return `Ok(str)` for valid enum values, or
/// `Err(`[`PlanError`]`)` for invalid or unknown values.
pub trait ValueEnum {
    /// Returns the textual form of this enum value, or a [`PlanError`]
    /// describing why the value cannot be rendered.
    fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError>;
}
82
/// A single named argument of a relation: a name paired with its value.
#[derive(Debug, Clone)]
pub struct NamedArg<'a> {
    /// The argument name; borrowed when static, owned when built at runtime.
    pub name: Cow<'a, str>,
    /// The argument value.
    pub value: Value<'a>,
}
88
/// A value that can appear in a relation's argument list or column list.
#[derive(Debug, Clone)]
pub enum Value<'a> {
    /// A single name.
    Name(Name<'a>),
    /// A multi-part table name; rendered with its parts joined by `.`.
    TableName(Vec<Name<'a>>),
    /// A field, rendered as `name:type`. Either part may be absent.
    Field(Option<Name<'a>>, Option<&'a Type>),
    /// A group of values rendered in parentheses, comma-separated.
    Tuple(Vec<Value<'a>>),
    /// A group of values rendered in square brackets, comma-separated.
    List(Vec<Value<'a>>),
    /// A reference by index, rendered via [`Reference`].
    Reference(i32),
    /// A borrowed expression.
    Expression(&'a Expression),
    /// A borrowed aggregate function.
    AggregateFunction(&'a AggregateFunction),
    /// Represents a missing, invalid, or unspecified value.
    Missing(PlanError),
    /// Represents a valid enum value as a string for textification; rendered
    /// with a leading `&`.
    Enum(Cow<'a, str>),
    /// An empty group; rendered as `_`.
    EmptyGroup,
    /// An integer literal.
    Integer(i64),
    /// A floating-point literal.
    Float(f64),
    /// A boolean literal.
    Boolean(bool),
    /// A decoded extension argument value.
    ExtValue(ExtensionValue),
    /// A decoded extension output column.
    ExtColumn(ExtensionColumn),
}
112
113impl<'a> Value<'a> {
114    pub fn expect(maybe_value: Option<Self>, f: impl FnOnce() -> PlanError) -> Self {
115        match maybe_value {
116            Some(s) => s,
117            None => Value::Missing(f()),
118        }
119    }
120}
121
122impl<'a> From<Result<Vec<Name<'a>>, PlanError>> for Value<'a> {
123    fn from(token: Result<Vec<Name<'a>>, PlanError>) -> Self {
124        match token {
125            Ok(value) => Value::TableName(value),
126            Err(err) => Value::Missing(err),
127        }
128    }
129}
130
131impl<'a> Textify for Value<'a> {
132    fn name() -> &'static str {
133        "Value"
134    }
135
136    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
137        match self {
138            Value::Name(name) => write!(w, "{}", ctx.display(name)),
139            Value::TableName(names) => write!(w, "{}", ctx.separated(names, ".")),
140            Value::Field(name, typ) => {
141                write!(w, "{}:{}", ctx.expect(name.as_ref()), ctx.expect(*typ))
142            }
143            Value::Tuple(values) => write!(w, "({})", ctx.separated(values, ", ")),
144            Value::List(values) => write!(w, "[{}]", ctx.separated(values, ", ")),
145            Value::Reference(i) => write!(w, "{}", Reference(*i)),
146            Value::Expression(e) => write!(w, "{}", ctx.display(*e)),
147            Value::AggregateFunction(agg_fn) => agg_fn.textify(ctx, w),
148            Value::Missing(err) => write!(w, "{}", ctx.failure(err.clone())),
149            Value::Enum(res) => write!(w, "&{res}"),
150            Value::Integer(i) => write!(w, "{i}"),
151            Value::EmptyGroup => write!(w, "_"),
152            Value::Float(f) => write!(w, "{f}"),
153            Value::Boolean(b) => write!(w, "{b}"),
154            Value::ExtValue(ev) => ev.textify(ctx, w),
155            Value::ExtColumn(ec) => ec.textify(ctx, w),
156        }
157    }
158}
159
160fn schema_to_values<'a>(schema: &'a NamedStruct) -> Vec<Value<'a>> {
161    let mut fields = schema
162        .r#struct
163        .as_ref()
164        .map(|s| s.types.iter())
165        .into_iter()
166        .flatten();
167    let mut names = schema.names.iter();
168
169    // let field_count = schema.r#struct.as_ref().map(|s| s.types.len()).unwrap_or(0);
170    // let name_count = schema.names.len();
171
172    let mut values = Vec::new();
173    loop {
174        let field = fields.next();
175        let name = names.next().map(|n| Name(n));
176        if field.is_none() && name.is_none() {
177            break;
178        }
179
180        values.push(Value::Field(name, field));
181    }
182
183    values
184}
185
/// A view over a relation's direct output values together with its optional
/// emit kind, used to render the output column list.
struct Emitted<'a> {
    /// The direct (pre-emit) output values.
    pub values: &'a [Value<'a>],
    /// The emit kind, if any.
    pub emit: Option<&'a EmitKind>,
}
190
191impl<'a> Emitted<'a> {
192    pub fn new(values: &'a [Value<'a>], emit: Option<&'a EmitKind>) -> Self {
193        Self { values, emit }
194    }
195
196    pub fn write_direct<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
197        write!(w, "{}", ctx.separated(self.values.iter(), ", "))
198    }
199}
200
201impl<'a> Textify for Emitted<'a> {
202    fn name() -> &'static str {
203        "Emitted"
204    }
205
206    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
207        if ctx.options().show_emit {
208            return self.write_direct(ctx, w);
209        }
210
211        let indices = match &self.emit {
212            Some(EmitKind::Emit(e)) => &e.output_mapping,
213            Some(EmitKind::Direct(_)) => return self.write_direct(ctx, w),
214            None => return self.write_direct(ctx, w),
215        };
216
217        for (i, &index) in indices.iter().enumerate() {
218            if i > 0 {
219                write!(w, ", ")?;
220            }
221
222            match self.values.get(index as usize) {
223                Some(value) => write!(w, "{}", ctx.display(value))?,
224                None => write!(w, "{}", ctx.failure(PlanError::invalid(
225                    "Emitted",
226                    Some("output_mapping"),
227                    format!(
228                        "Output mapping index {} is out of bounds for values collection of size {}",
229                        index, self.values.len()
230                    )
231                )))?,
232            }
233        }
234
235        Ok(())
236    }
237}
238
/// The arguments of a relation: positional values followed by named values.
#[derive(Debug, Clone)]
pub struct Arguments<'a> {
    /// Positional arguments (e.g., a filter condition, group-bys, etc.)
    pub positional: Vec<Value<'a>>,
    /// Named arguments (e.g., limit=10, offset=5)
    pub named: Vec<NamedArg<'a>>,
}
246
247impl<'a> Textify for Arguments<'a> {
248    fn name() -> &'static str {
249        "Arguments"
250    }
251    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
252        if self.positional.is_empty() && self.named.is_empty() {
253            return write!(w, "_");
254        }
255
256        write!(w, "{}", ctx.separated(self.positional.iter(), ", "))?;
257        if !self.positional.is_empty() && !self.named.is_empty() {
258            write!(w, ", ")?;
259        }
260        write!(w, "{}", ctx.separated(self.named.iter(), ", "))
261    }
262}
263
/// The structured, ready-to-print form of a relation: its name, arguments,
/// output columns, optional advanced extension, and child relations.
pub struct Relation<'a> {
    /// The name written at the start of the header line.
    pub name: Cow<'a, str>,
    /// Arguments to the relation, if any.
    ///
    /// - `None` means this relation does not take arguments, and the argument
    ///   section is omitted entirely.
    /// - `Some(args)` with both vectors empty means the relation takes
    ///   arguments, but none are provided; this will print as `_ => ...`.
    /// - `Some(args)` with non-empty vectors will print as usual, with
    ///   positional arguments first, then named arguments, separated by commas.
    pub arguments: Option<Arguments<'a>>,
    /// The columns emitted by this relation, pre-emit - the 'direct' column
    /// output.
    pub columns: Vec<Value<'a>>,
    /// The emit kind, if any. If none, use the columns directly.
    pub emit: Option<&'a EmitKind>,
    /// The advanced extension (enhancement and/or optimizations) attached to
    /// this relation, if any. Mirrors the `advanced_extension` field carried
    /// by standard relation types in the protobuf (Read, Filter, Project,
    /// etc.). Extension relations (`ExtensionLeaf`, `ExtensionSingle`,
    /// `ExtensionMulti`) do not carry this field and always set it to `None`.
    pub advanced_extension: Option<&'a AdvancedExtension>,
    /// The input relations. A `None` entry is a missing input; it is skipped
    /// when the children are written.
    pub children: Vec<Option<Relation<'a>>>,
}
289
290impl Textify for Relation<'_> {
291    fn name() -> &'static str {
292        "Relation"
293    }
294
295    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
296        self.write_header(ctx, w)?;
297        let child_scope = ctx.push_indent();
298        // Emit any enhancement / optimizations between the header line and the
299        // child relations, indented one level deeper than this relation — the
300        // same position they occupy in the text format when parsed.
301        if let Some(adv_ext) = self.advanced_extension {
302            adv_ext.textify(&child_scope, w)?;
303        }
304        self.write_children(ctx, w)?;
305        Ok(())
306    }
307}
308
309impl Relation<'_> {
310    /// Write the single header line for this relation, e.g. `Filter[$0 => $0]`.
311    /// Does not write a trailing newline; callers are responsible for any
312    /// newline that follows (either from adv_ext or from the next child).
313    pub fn write_header<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
314        let cols = Emitted::new(&self.columns, self.emit);
315        let indent = ctx.indent();
316        let name = &self.name;
317        let cols = ctx.display(&cols);
318        match &self.arguments {
319            None => {
320                write!(w, "{indent}{name}[{cols}]")
321            }
322            Some(args) => {
323                let args = ctx.display(args);
324                write!(w, "{indent}{name}[{args} => {cols}]")
325            }
326        }
327    }
328
329    /// Write each child relation at one indent level deeper than `ctx`.
330    /// Each child is preceded by a newline.
331    pub fn write_children<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
332        let child_scope = ctx.push_indent();
333        for child in self.children.iter().flatten() {
334            writeln!(w)?;
335            child.textify(&child_scope, w)?;
336        }
337        Ok(())
338    }
339}
340
341impl<'a> Relation<'a> {
342    pub fn emitted(&self) -> usize {
343        match self.emit {
344            Some(EmitKind::Emit(e)) => e.output_mapping.len(),
345            Some(EmitKind::Direct(_)) => self.columns.len(),
346            None => self.columns.len(),
347        }
348    }
349}
350
/// A borrowed multi-part table name; rendered with its parts joined by `.`.
#[derive(Debug, Copy, Clone)]
pub struct TableName<'a>(&'a [String]);
353
354impl<'a> Textify for TableName<'a> {
355    fn name() -> &'static str {
356        "TableName"
357    }
358
359    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
360        let names = self.0.iter().map(|n| Name(n)).collect::<Vec<_>>();
361        write!(w, "{}", ctx.separated(names.iter(), "."))
362    }
363}
364
365impl<'a> Relation<'a> {
366    fn from_read<S: Scope>(rel: &'a ReadRel, _ctx: &S) -> Self {
367        let columns = read_columns(rel);
368        let emit = rel.common.as_ref().and_then(|c| c.emit_kind.as_ref());
369
370        match &rel.read_type {
371            Some(ReadType::NamedTable(table)) => {
372                let table_name = Value::TableName(table.names.iter().map(|n| Name(n)).collect());
373                Relation {
374                    name: Cow::Borrowed("Read"),
375                    arguments: Some(Arguments {
376                        positional: vec![table_name],
377                        named: vec![],
378                    }),
379                    columns,
380                    emit,
381                    advanced_extension: rel.advanced_extension.as_ref(),
382                    children: vec![],
383                }
384            }
385            Some(ReadType::VirtualTable(vt)) => {
386                let positional = vt
387                    .expressions
388                    .iter()
389                    .map(|row| Value::Tuple(row.fields.iter().map(Value::Expression).collect()))
390                    .collect();
391
392                Relation {
393                    name: Cow::Borrowed("Read:Virtual"),
394                    arguments: Some(Arguments {
395                        positional,
396                        named: vec![],
397                    }),
398                    columns,
399                    emit,
400                    advanced_extension: rel.advanced_extension.as_ref(),
401                    children: vec![],
402                }
403            }
404            other => {
405                let err = PlanError::unimplemented(
406                    "ReadRel",
407                    Some("read_type"),
408                    format!("Unsupported read type {other:?}"),
409                );
410                Relation {
411                    name: Cow::Borrowed("Read"),
412                    arguments: Some(Arguments {
413                        positional: vec![Value::Missing(err)],
414                        named: vec![],
415                    }),
416                    columns,
417                    emit,
418                    advanced_extension: rel.advanced_extension.as_ref(),
419                    children: vec![],
420                }
421            }
422        }
423    }
424}
425
426fn read_columns<'a>(rel: &'a ReadRel) -> Vec<Value<'a>> {
427    match rel.base_schema {
428        Some(ref schema) => schema_to_values(schema),
429        None => {
430            let err =
431                PlanError::unimplemented("ReadRel", Some("base_schema"), "Base schema is required");
432            vec![Value::Missing(err)]
433        }
434    }
435}
436
437pub fn get_emit(rel: Option<&RelCommon>) -> Option<&EmitKind> {
438    rel.as_ref().and_then(|c| c.emit_kind.as_ref())
439}
440
441impl<'a> Relation<'a> {
442    /// Create a vector of values that are references to the emitted outputs of
443    /// this relation. "Emitted" here meaning the outputs of this relation after
444    /// the emit kind has been applied.
445    ///
446    /// This is useful for relations like Filter and Limit whose direct outputs
447    /// are primarily those of its children (direct here meaning before the emit
448    /// has been applied).
449    pub fn input_refs(&self) -> Vec<Value<'a>> {
450        let len = self.emitted();
451        (0..len).map(|i| Value::Reference(i as i32)).collect()
452    }
453
454    /// Convert a vector of relation references into their structured form.
455    ///
456    /// Returns a list of children (with None for ones missing), and a count of input columns.
457    pub fn convert_children<S: Scope>(
458        refs: Vec<Option<&'a Rel>>,
459        ctx: &S,
460    ) -> (Vec<Option<Relation<'a>>>, usize) {
461        let mut children = vec![];
462        let mut inputs = 0;
463
464        for maybe_rel in refs {
465            match maybe_rel {
466                Some(rel) => {
467                    let child = Relation::from_rel(rel, ctx);
468                    inputs += child.emitted();
469                    children.push(Some(child));
470                }
471                None => children.push(None),
472            }
473        }
474
475        (children, inputs)
476    }
477}
478
479impl<'a> Relation<'a> {
480    fn from_filter<S: Scope>(rel: &'a FilterRel, ctx: &S) -> Self {
481        let condition = rel
482            .condition
483            .as_ref()
484            .map(|c| Value::Expression(c.as_ref()));
485        let condition = Value::expect(condition, || {
486            PlanError::unimplemented("FilterRel", Some("condition"), "Condition is None")
487        });
488        let positional = vec![condition];
489        let arguments = Some(Arguments {
490            positional,
491            named: vec![],
492        });
493        let emit = get_emit(rel.common.as_ref());
494        let (children, columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
495        let columns = (0..columns).map(|i| Value::Reference(i as i32)).collect();
496
497        Relation {
498            name: Cow::Borrowed("Filter"),
499            arguments,
500            columns,
501            emit,
502            advanced_extension: rel.advanced_extension.as_ref(),
503            children,
504        }
505    }
506
507    fn from_project<S: Scope>(rel: &'a ProjectRel, ctx: &S) -> Self {
508        let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
509        let mut columns: Vec<Value> = vec![];
510        for i in 0..input_columns {
511            columns.push(Value::Reference(i as i32));
512        }
513        for expr in &rel.expressions {
514            columns.push(Value::Expression(expr));
515        }
516
517        Relation {
518            name: Cow::Borrowed("Project"),
519            arguments: None,
520            columns,
521            emit: get_emit(rel.common.as_ref()),
522            advanced_extension: rel.advanced_extension.as_ref(),
523            children,
524        }
525    }
526
527    pub fn from_rel<S: Scope>(rel: &'a Rel, ctx: &S) -> Self {
528        match rel.rel_type.as_ref() {
529            Some(RelType::Read(r)) => Relation::from_read(r, ctx),
530            Some(RelType::Filter(r)) => Relation::from_filter(r, ctx),
531            Some(RelType::Project(r)) => Relation::from_project(r, ctx),
532            Some(RelType::Aggregate(r)) => Relation::from_aggregate(r, ctx),
533            Some(RelType::Sort(r)) => Relation::from_sort(r, ctx),
534            Some(RelType::Fetch(r)) => Relation::from_fetch(r, ctx),
535            Some(RelType::Join(r)) => Relation::from_join(r, ctx),
536            Some(RelType::ExtensionLeaf(r)) => Relation::from_extension_leaf(r, ctx),
537            Some(RelType::ExtensionSingle(r)) => Relation::from_extension_single(r, ctx),
538            Some(RelType::ExtensionMulti(r)) => Relation::from_extension_multi(r, ctx),
539            _ => {
540                let name = rel.name();
541                let token = ctx.failure(FormatError::Format(PlanError::unimplemented(
542                    "Rel",
543                    Some(name),
544                    format!("{name} is not yet supported in the text format"),
545                )));
546                Relation {
547                    name: Cow::Owned(format!("{token}")),
548                    arguments: None,
549                    columns: vec![],
550                    emit: None,
551                    advanced_extension: None,
552                    children: vec![],
553                }
554            }
555        }
556    }
557
558    fn from_extension_leaf<S: Scope>(rel: &'a ExtensionLeafRel, ctx: &S) -> Self {
559        let detail_ref = rel.detail.as_ref().map(AnyRef::from);
560        let decoded = match detail_ref {
561            Some(d) => ctx.extension_registry().decode(d),
562            None => Err(crate::extensions::registry::ExtensionError::MissingDetail),
563        };
564        Relation::from_extension("ExtensionLeaf", decoded, vec![], ctx)
565    }
566
567    fn from_extension_single<S: Scope>(rel: &'a ExtensionSingleRel, ctx: &S) -> Self {
568        let detail_ref = rel.detail.as_ref().map(AnyRef::from);
569        let decoded = match detail_ref {
570            Some(d) => ctx.extension_registry().decode(d),
571            None => Err(crate::extensions::registry::ExtensionError::MissingDetail),
572        };
573        Relation::from_extension("ExtensionSingle", decoded, vec![rel.input.as_deref()], ctx)
574    }
575
576    fn from_extension_multi<S: Scope>(rel: &'a ExtensionMultiRel, ctx: &S) -> Self {
577        let detail_ref = rel.detail.as_ref().map(AnyRef::from);
578        let decoded = match detail_ref {
579            Some(d) => ctx.extension_registry().decode(d),
580            None => Err(crate::extensions::registry::ExtensionError::MissingDetail),
581        };
582        let mut child_refs: Vec<Option<&'a Rel>> = vec![];
583        for input in &rel.inputs {
584            child_refs.push(Some(input));
585        }
586        Relation::from_extension("ExtensionMulti", decoded, child_refs, ctx)
587    }
588
589    fn from_extension<S: Scope>(
590        ext_type: &'static str,
591        decoded: Result<
592            (String, crate::extensions::ExtensionArgs),
593            crate::extensions::registry::ExtensionError,
594        >,
595        child_refs: Vec<Option<&'a Rel>>,
596        ctx: &S,
597    ) -> Self {
598        match decoded {
599            Ok((name, args)) => {
600                let (children, _) = Relation::convert_children(child_refs, ctx);
601                let mut positional = vec![];
602                for value in args.positional {
603                    positional.push(Value::ExtValue(value));
604                }
605                let mut named = vec![];
606                for (key, value) in args.named {
607                    named.push(NamedArg {
608                        name: Cow::Owned(key),
609                        value: Value::ExtValue(value),
610                    });
611                }
612                let mut columns = vec![];
613                for col in args.output_columns {
614                    columns.push(Value::ExtColumn(col));
615                }
616                Relation {
617                    name: Cow::Owned(format!("{}:{}", ext_type, name)),
618                    arguments: Some(Arguments { positional, named }),
619                    columns,
620                    emit: None,
621                    // Extension relations use `detail` rather than
622                    // `advanced_extension`; the field does not exist on these
623                    // proto types.
624                    advanced_extension: None,
625                    children,
626                }
627            }
628            Err(error) => {
629                let (children, _) = Relation::convert_children(child_refs, ctx);
630                Relation {
631                    name: Cow::Borrowed(ext_type),
632                    arguments: None,
633                    columns: vec![Value::Missing(PlanError::invalid(
634                        "extension",
635                        None::<&str>,
636                        error.to_string(),
637                    ))],
638                    emit: None,
639                    advanced_extension: None,
640                    children,
641                }
642            }
643        }
644    }
645
646    /// Convert an AggregateRel to a Relation for textification.
647    ///
648    /// The conversion follows this logic:
649    /// 1. Arguments: Group-by expressions (as Value::Expression)
650    /// 2. Columns: All possible outputs in order:
651    ///    - First: Group-by field references (Value::Reference)
652    ///    - Then: Aggregate function measures (Value::AggregateFunction)
653    /// 3. Emit: Uses the relation's emit mapping to select which outputs to display
654    /// 4. Children: The input relation
655    fn from_aggregate<S: Scope>(rel: &'a AggregateRel, ctx: &S) -> Self {
656        let mut grouping_sets: Vec<Vec<Value>> = vec![]; // the Groupings in the Aggregate
657        let expression_list: Vec<Value>; // grouping_expressions defined on Aggregate
658
659        // if rel.grouping_expressions is empty, the deprecated rel.groupings.grouping_expressions might be set
660        // If *both* the deprecated `rel.groupings.grouping_expressions` and `rel.grouping_expressions` are
661        // set, then we silently ignore the deprecated one.
662        #[allow(deprecated)]
663        if rel.grouping_expressions.is_empty()
664            && !rel.groupings.is_empty()
665            && !rel.groupings[0].grouping_expressions.is_empty()
666        {
667            (expression_list, grouping_sets) = Relation::get_grouping_sets(rel);
668        } else {
669            expression_list = rel
670                .grouping_expressions
671                .iter()
672                .map(Value::Expression)
673                .collect::<Vec<_>>(); // already a list of the unique expressions
674            for group in &rel.groupings {
675                let mut grouping_set: Vec<Value> = vec![];
676                for i in &group.expression_references {
677                    grouping_set.push(Value::Reference(*i as i32));
678                }
679                grouping_sets.push(grouping_set);
680            }
681            // no defined groupings means there is global group by
682            if rel.groupings.is_empty() {
683                grouping_sets.push(vec![]);
684            }
685        }
686
687        let is_single = grouping_sets.len() == 1;
688        let mut positional: Vec<Value> = vec![];
689        for g in grouping_sets {
690            if g.is_empty() {
691                positional.push(Value::EmptyGroup);
692            } else if is_single {
693                // Single non-empty grouping set: spread expressions directly without parens
694                positional.extend(g);
695            } else {
696                positional.push(Value::Tuple(g));
697            }
698        }
699
700        // adding the grouping_sets as a list of Arguments to Aggregate Rel
701        let arguments = Some(Arguments {
702            positional,
703            named: vec![],
704        });
705
706        // The columns are the direct outputs of this relation (before emit)
707        let mut all_outputs: Vec<Value> = expression_list;
708
709        // Then, add all measures (aggregate functions)
710        // These are indexed after the group-by fields
711        for m in &rel.measures {
712            if let Some(agg_fn) = m.measure.as_ref() {
713                all_outputs.push(Value::AggregateFunction(agg_fn));
714            }
715        }
716        let emit = get_emit(rel.common.as_ref());
717        let (children, _) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
718
719        Relation {
720            name: Cow::Borrowed("Aggregate"),
721            arguments,
722            columns: all_outputs,
723            emit,
724            advanced_extension: rel.advanced_extension.as_ref(),
725            children,
726        }
727    }
728
729    fn get_grouping_sets(rel: &'a AggregateRel) -> (Vec<Value<'a>>, Vec<Vec<Value<'a>>>) {
730        let mut grouping_sets: Vec<Vec<Value>> = vec![];
731        let mut expression_list: Vec<Value> = Vec::new();
732
733        // groupings might have the same expressions in their set so we use a map to get unique expressions
734        let mut expression_index_map = HashMap::new();
735        let mut i: i32 = 0; // index for the unique expression in the grouping_expressions list
736
737        for group in &rel.groupings {
738            let mut grouping_set: Vec<Value> = vec![];
739            #[allow(deprecated)]
740            for exp in &group.grouping_expressions {
741                // TODO: use a better key here than encoding to bytes.
742                // Ideally, substrait-rs would support `PartialEq` and `Hash`,
743                // but as there isn't an easy way to do that now, we'll skip.
744                let key = exp.encode_to_vec();
745                expression_index_map.entry(key.clone()).or_insert_with(|| {
746                    let value = Value::Expression(exp);
747                    expression_list.push(value); // new unique expression found
748                    // mapping the byte encoded expression to its index in the group_expression list
749                    let index = i;
750                    i += 1;
751                    index // is expression returned by this closure and inserted into map
752                });
753                grouping_set.push(Value::Reference(expression_index_map[&key]));
754            }
755            grouping_sets.push(grouping_set);
756        }
757        (expression_list, grouping_sets)
758    }
759}
760
761impl Textify for RelRoot {
762    fn name() -> &'static str {
763        "RelRoot"
764    }
765
766    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
767        let names = self.names.iter().map(|n| Name(n)).collect::<Vec<_>>();
768
769        write!(
770            w,
771            "{}Root[{}]",
772            ctx.indent(),
773            ctx.separated(names.iter(), ", ")
774        )?;
775        let child_scope = ctx.push_indent();
776        for child in self.input.iter() {
777            writeln!(w)?;
778            child.textify(&child_scope, w)?;
779        }
780
781        Ok(())
782    }
783}
784
785impl Textify for PlanRelType {
786    fn name() -> &'static str {
787        "PlanRelType"
788    }
789
790    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
791        match self {
792            PlanRelType::Rel(rel) => rel.textify(ctx, w),
793            PlanRelType::Root(root) => root.textify(ctx, w),
794        }
795    }
796}
797
798impl Textify for PlanRel {
799    fn name() -> &'static str {
800        "PlanRel"
801    }
802
803    /// Write the relation as a string. Inputs are ignored - those are handled
804    /// separately.
805    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
806        write!(w, "{}", ctx.expect(self.rel_type.as_ref()))
807    }
808}
809
810impl<'a> Relation<'a> {
811    fn from_sort<S: Scope>(rel: &'a SortRel, ctx: &S) -> Self {
812        let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
813        let mut positional = vec![];
814        for sort_field in &rel.sorts {
815            positional.push(Value::from(sort_field));
816        }
817        let arguments = Some(Arguments {
818            positional,
819            named: vec![],
820        });
821        // The columns are the direct outputs of this relation (before emit)
822        let mut col_values = vec![];
823        for i in 0..input_columns {
824            col_values.push(Value::Reference(i as i32));
825        }
826        let emit = get_emit(rel.common.as_ref());
827        Relation {
828            name: Cow::Borrowed("Sort"),
829            arguments,
830            columns: col_values,
831            emit,
832            advanced_extension: rel.advanced_extension.as_ref(),
833            children,
834        }
835    }
836
837    fn from_fetch<S: Scope>(rel: &'a FetchRel, ctx: &S) -> Self {
838        let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
839        let mut named_args: Vec<NamedArg> = vec![];
840        match &rel.count_mode {
841            Some(CountMode::CountExpr(expr)) => {
842                named_args.push(NamedArg {
843                    name: Cow::Borrowed("limit"),
844                    value: Value::Expression(expr),
845                });
846            }
847            #[allow(deprecated)]
848            Some(CountMode::Count(val)) => {
849                named_args.push(NamedArg {
850                    name: Cow::Borrowed("limit"),
851                    value: Value::Integer(*val),
852                });
853            }
854            None => {}
855        }
856        if let Some(offset) = &rel.offset_mode {
857            match offset {
858                substrait::proto::fetch_rel::OffsetMode::OffsetExpr(expr) => {
859                    named_args.push(NamedArg {
860                        name: Cow::Borrowed("offset"),
861                        value: Value::Expression(expr),
862                    });
863                }
864                #[allow(deprecated)]
865                substrait::proto::fetch_rel::OffsetMode::Offset(val) => {
866                    named_args.push(NamedArg {
867                        name: Cow::Borrowed("offset"),
868                        value: Value::Integer(*val),
869                    });
870                }
871            }
872        }
873
874        let emit = get_emit(rel.common.as_ref());
875        // Fetch is passthrough — direct output is all input columns.
876        let columns: Vec<Value> = (0..input_columns)
877            .map(|i| Value::Reference(i as i32))
878            .collect();
879        Relation {
880            name: Cow::Borrowed("Fetch"),
881            arguments: Some(Arguments {
882                positional: vec![],
883                named: named_args,
884            }),
885            columns,
886            emit,
887            advanced_extension: rel.advanced_extension.as_ref(),
888            children,
889        }
890    }
891}
892
893fn join_output_columns(
894    join_type: join_rel::JoinType,
895    left_columns: usize,
896    right_columns: usize,
897) -> Vec<Value<'static>> {
898    let total_columns = match join_type {
899        // Inner, Left, Right, Outer joins output columns from both sides
900        join_rel::JoinType::Inner
901        | join_rel::JoinType::Left
902        | join_rel::JoinType::Right
903        | join_rel::JoinType::Outer => left_columns + right_columns,
904
905        // Left semi/anti joins only output columns from the left side
906        join_rel::JoinType::LeftSemi | join_rel::JoinType::LeftAnti => left_columns,
907
908        // Right semi/anti joins output columns from the right side
909        join_rel::JoinType::RightSemi | join_rel::JoinType::RightAnti => right_columns,
910
911        // Single joins behave like semi joins
912        join_rel::JoinType::LeftSingle => left_columns,
913        join_rel::JoinType::RightSingle => right_columns,
914
915        // Mark joins output base columns plus one mark column
916        join_rel::JoinType::LeftMark => left_columns + 1,
917        join_rel::JoinType::RightMark => right_columns + 1,
918
919        // Unspecified - fallback to all columns
920        join_rel::JoinType::Unspecified => left_columns + right_columns,
921    };
922
923    // Output is always a contiguous range starting from $0
924    (0..total_columns)
925        .map(|i| Value::Reference(i as i32))
926        .collect()
927}
928
929impl<'a> Relation<'a> {
930    fn from_join<S: Scope>(rel: &'a JoinRel, ctx: &S) -> Self {
931        let (children, _total_columns) =
932            Relation::convert_children(vec![rel.left.as_deref(), rel.right.as_deref()], ctx);
933
934        // convert_children should preserve input vector length
935        assert_eq!(
936            children.len(),
937            2,
938            "convert_children should return same number of elements as input"
939        );
940
941        // Calculate left and right column counts separately
942        let left_columns = match &children[0] {
943            Some(child) => child.emitted(),
944            None => 0,
945        };
946        let right_columns = match &children[1] {
947            Some(child) => child.emitted(),
948            None => 0,
949        };
950
951        // Convert join type from protobuf i32 to enum value
952        // JoinType is stored as i32 in protobuf, convert to typed enum for processing
953        let (join_type, join_type_value) = match join_rel::JoinType::try_from(rel.r#type) {
954            Ok(join_type) => {
955                let join_type_value = match join_type.as_enum_str() {
956                    Ok(s) => Value::Enum(s),
957                    Err(e) => Value::Missing(e),
958                };
959                (join_type, join_type_value)
960            }
961            Err(_) => {
962                // Use Unspecified for the join_type but create an error for the join_type_value
963                let join_type_error = Value::Missing(PlanError::invalid(
964                    "JoinRel",
965                    Some("type"),
966                    format!("Unknown join type: {}", rel.r#type),
967                ));
968                (join_rel::JoinType::Unspecified, join_type_error)
969            }
970        };
971
972        // Join condition
973        let condition = rel
974            .expression
975            .as_ref()
976            .map(|c| Value::Expression(c.as_ref()));
977        let condition = Value::expect(condition, || {
978            PlanError::unimplemented("JoinRel", Some("expression"), "Join condition is None")
979        });
980
981        // TODO: Add support for post_join_filter when grammar is extended
982        // Currently post_join_filter is not supported in the text format
983        // grammar
984        let positional = vec![join_type_value, condition];
985        let arguments = Some(Arguments {
986            positional,
987            named: vec![],
988        });
989
990        let emit = get_emit(rel.common.as_ref());
991        let columns = join_output_columns(join_type, left_columns, right_columns);
992
993        Relation {
994            name: Cow::Borrowed("Join"),
995            arguments,
996            columns,
997            emit,
998            advanced_extension: rel.advanced_extension.as_ref(),
999            children,
1000        }
1001    }
1002}
1003
1004impl<'a> From<&'a SortField> for Value<'a> {
1005    fn from(sf: &'a SortField) -> Self {
1006        let field = match &sf.expr {
1007            Some(expr) => match &expr.rex_type {
1008                Some(substrait::proto::expression::RexType::Selection(fref)) => {
1009                    if let Some(substrait::proto::expression::field_reference::ReferenceType::DirectReference(seg)) = &fref.reference_type {
1010                        if let Some(substrait::proto::expression::reference_segment::ReferenceType::StructField(sf)) = &seg.reference_type {
1011                            Value::Reference(sf.field)
1012                        } else { Value::Missing(PlanError::unimplemented("SortField", Some("expr"), "Not a struct field")) }
1013                    } else { Value::Missing(PlanError::unimplemented("SortField", Some("expr"), "Not a direct reference")) }
1014                }
1015                _ => Value::Missing(PlanError::unimplemented(
1016                    "SortField",
1017                    Some("expr"),
1018                    "Not a selection",
1019                )),
1020            },
1021            None => Value::Missing(PlanError::unimplemented(
1022                "SortField",
1023                Some("expr"),
1024                "Missing expr",
1025            )),
1026        };
1027        let direction = match &sf.sort_kind {
1028            Some(kind) => Value::from(kind),
1029            None => Value::Missing(PlanError::invalid(
1030                "SortKind",
1031                Some(Cow::Borrowed("sort_kind")),
1032                "Missing sort_kind",
1033            )),
1034        };
1035        Value::Tuple(vec![field, direction])
1036    }
1037}
1038
1039impl<'a, T: ValueEnum + ?Sized> From<&'a T> for Value<'a> {
1040    fn from(enum_val: &'a T) -> Self {
1041        match enum_val.as_enum_str() {
1042            Ok(s) => Value::Enum(s),
1043            Err(e) => Value::Missing(e),
1044        }
1045    }
1046}
1047
1048impl ValueEnum for SortKind {
1049    fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError> {
1050        let d = match self {
1051            &SortKind::Direction(d) => SortDirection::try_from(d),
1052            SortKind::ComparisonFunctionReference(f) => {
1053                return Err(PlanError::invalid(
1054                    "SortKind",
1055                    Some(Cow::Owned(format!("function reference{f}"))),
1056                    "SortKind::ComparisonFunctionReference unimplemented",
1057                ));
1058            }
1059        };
1060        let s = match d {
1061            Err(UnknownEnumValue(d)) => {
1062                return Err(PlanError::invalid(
1063                    "SortKind",
1064                    Some(Cow::Owned(format!("unknown variant: {d:?}"))),
1065                    "Unknown SortDirection",
1066                ));
1067            }
1068            Ok(SortDirection::AscNullsFirst) => "AscNullsFirst",
1069            Ok(SortDirection::AscNullsLast) => "AscNullsLast",
1070            Ok(SortDirection::DescNullsFirst) => "DescNullsFirst",
1071            Ok(SortDirection::DescNullsLast) => "DescNullsLast",
1072            Ok(SortDirection::Clustered) => "Clustered",
1073            Ok(SortDirection::Unspecified) => {
1074                return Err(PlanError::invalid(
1075                    "SortKind",
1076                    Option::<Cow<str>>::None,
1077                    "Unspecified SortDirection",
1078                ));
1079            }
1080        };
1081        Ok(Cow::Borrowed(s))
1082    }
1083}
1084
1085impl ValueEnum for join_rel::JoinType {
1086    fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError> {
1087        let s = match self {
1088            join_rel::JoinType::Unspecified => {
1089                return Err(PlanError::invalid(
1090                    "JoinType",
1091                    Option::<Cow<str>>::None,
1092                    "Unspecified JoinType",
1093                ));
1094            }
1095            join_rel::JoinType::Inner => "Inner",
1096            join_rel::JoinType::Outer => "Outer",
1097            join_rel::JoinType::Left => "Left",
1098            join_rel::JoinType::Right => "Right",
1099            join_rel::JoinType::LeftSemi => "LeftSemi",
1100            join_rel::JoinType::RightSemi => "RightSemi",
1101            join_rel::JoinType::LeftAnti => "LeftAnti",
1102            join_rel::JoinType::RightAnti => "RightAnti",
1103            join_rel::JoinType::LeftSingle => "LeftSingle",
1104            join_rel::JoinType::RightSingle => "RightSingle",
1105            join_rel::JoinType::LeftMark => "LeftMark",
1106            join_rel::JoinType::RightMark => "RightMark",
1107        };
1108        Ok(Cow::Borrowed(s))
1109    }
1110}
1111
1112impl<'a> Textify for NamedArg<'a> {
1113    fn name() -> &'static str {
1114        "NamedArg"
1115    }
1116    fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
1117        write!(w, "{}=", self.name)?;
1118        self.value.textify(ctx, w)
1119    }
1120}
1121
1122#[cfg(test)]
1123mod tests {
1124    use substrait::proto::aggregate_rel::Grouping;
1125    use substrait::proto::expression::literal::LiteralType;
1126    use substrait::proto::expression::{Literal, RexType, ScalarFunction};
1127    use substrait::proto::function_argument::ArgType;
1128    use substrait::proto::read_rel::{NamedTable, ReadType};
1129    use substrait::proto::rel_common::{Direct, Emit};
1130    use substrait::proto::r#type::{self as ptype, Kind, Nullability, Struct};
1131    use substrait::proto::{
1132        Expression, FunctionArgument, NamedStruct, ReadRel, Type, aggregate_rel,
1133    };
1134
1135    use super::*;
1136    use crate::fixtures::TestContext;
1137    use crate::parser::expressions::FieldIndex;
1138
1139    #[test]
1140    fn test_read_rel() {
1141        let ctx = TestContext::new();
1142
1143        // Create a simple ReadRel with a NamedStruct schema
1144        let read_rel = ReadRel {
1145            common: None,
1146            base_schema: Some(NamedStruct {
1147                names: vec!["col1".into(), "column 2".into()],
1148                r#struct: Some(Struct {
1149                    type_variation_reference: 0,
1150                    types: vec![
1151                        Type {
1152                            kind: Some(Kind::I32(ptype::I32 {
1153                                type_variation_reference: 0,
1154                                nullability: Nullability::Nullable as i32,
1155                            })),
1156                        },
1157                        Type {
1158                            kind: Some(Kind::String(ptype::String {
1159                                type_variation_reference: 0,
1160                                nullability: Nullability::Nullable as i32,
1161                            })),
1162                        },
1163                    ],
1164                    nullability: Nullability::Nullable as i32,
1165                }),
1166            }),
1167            filter: None,
1168            best_effort_filter: None,
1169            projection: None,
1170            advanced_extension: None,
1171            read_type: Some(ReadType::NamedTable(NamedTable {
1172                names: vec!["some_db".into(), "test_table".into()],
1173                advanced_extension: None,
1174            })),
1175        };
1176
1177        let rel = Rel {
1178            rel_type: Some(RelType::Read(Box::new(read_rel))),
1179        };
1180        let (result, errors) = ctx.textify(&rel);
1181        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1182        assert_eq!(
1183            result,
1184            "Read[some_db.test_table => col1:i32?, \"column 2\":string?]"
1185        );
1186    }
1187
1188    #[test]
1189    fn test_filter_rel() {
1190        let ctx = TestContext::new()
1191            .with_urn(1, "test_urn")
1192            .with_function(1, 10, "gt");
1193
1194        // Create a simple FilterRel with a ReadRel input and a filter expression
1195        let read_rel = ReadRel {
1196            common: None,
1197            base_schema: Some(NamedStruct {
1198                names: vec!["col1".into(), "col2".into()],
1199                r#struct: Some(Struct {
1200                    type_variation_reference: 0,
1201                    types: vec![
1202                        Type {
1203                            kind: Some(Kind::I32(ptype::I32 {
1204                                type_variation_reference: 0,
1205                                nullability: Nullability::Nullable as i32,
1206                            })),
1207                        },
1208                        Type {
1209                            kind: Some(Kind::I32(ptype::I32 {
1210                                type_variation_reference: 0,
1211                                nullability: Nullability::Nullable as i32,
1212                            })),
1213                        },
1214                    ],
1215                    nullability: Nullability::Nullable as i32,
1216                }),
1217            }),
1218            filter: None,
1219            best_effort_filter: None,
1220            projection: None,
1221            advanced_extension: None,
1222            read_type: Some(ReadType::NamedTable(NamedTable {
1223                names: vec!["test_table".into()],
1224                advanced_extension: None,
1225            })),
1226        };
1227
1228        // Create a filter expression: col1 > 10
1229        let filter_expr = Expression {
1230            rex_type: Some(RexType::ScalarFunction(ScalarFunction {
1231                function_reference: 10, // gt function
1232                arguments: vec![
1233                    FunctionArgument {
1234                        arg_type: Some(ArgType::Value(Reference(0).into())),
1235                    },
1236                    FunctionArgument {
1237                        arg_type: Some(ArgType::Value(Expression {
1238                            rex_type: Some(RexType::Literal(Literal {
1239                                literal_type: Some(LiteralType::I32(10)),
1240                                nullable: false,
1241                                type_variation_reference: 0,
1242                            })),
1243                        })),
1244                    },
1245                ],
1246                options: vec![],
1247                output_type: None,
1248                #[allow(deprecated)]
1249                args: vec![],
1250            })),
1251        };
1252
1253        let filter_rel = FilterRel {
1254            common: None,
1255            input: Some(Box::new(Rel {
1256                rel_type: Some(RelType::Read(Box::new(read_rel))),
1257            })),
1258            condition: Some(Box::new(filter_expr)),
1259            advanced_extension: None,
1260        };
1261
1262        let rel = Rel {
1263            rel_type: Some(RelType::Filter(Box::new(filter_rel))),
1264        };
1265
1266        let (result, errors) = ctx.textify(&rel);
1267        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1268        let expected = r#"
1269Filter[gt($0, 10:i32) => $0, $1]
1270  Read[test_table => col1:i32?, col2:i32?]"#
1271            .trim_start();
1272        assert_eq!(result, expected);
1273    }
1274
1275    #[test]
1276    fn test_aggregate_function_textify() {
1277        let ctx = TestContext::new()
1278        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1279        .with_function(1, 10, "sum")
1280        .with_function(1, 11, "count");
1281
1282        // Create a simple AggregateFunction
1283        let agg_fn = get_aggregate_func(10, 1);
1284
1285        let value = Value::AggregateFunction(&agg_fn);
1286        let (result, errors) = ctx.textify(&value);
1287
1288        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1289        assert_eq!(result, "sum($1)");
1290    }
1291
1292    #[test]
1293    fn test_aggregate_relation_textify() {
1294        let ctx = TestContext::new()
1295        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1296        .with_function(1, 10, "sum")
1297        .with_function(1, 11, "count");
1298
1299        // Create a simple AggregateRel
1300        let agg_fn1 = get_aggregate_func(10, 1);
1301        let agg_fn2 = get_aggregate_func(11, 1);
1302
1303        let grouping_expressions = vec![Expression {
1304            rex_type: Some(RexType::Selection(Box::new(
1305                FieldIndex(0).to_field_reference(),
1306            ))),
1307        }];
1308
1309        let measures = vec![
1310            aggregate_rel::Measure {
1311                measure: Some(agg_fn1),
1312                filter: None,
1313            },
1314            aggregate_rel::Measure {
1315                measure: Some(agg_fn2),
1316                filter: None,
1317            },
1318        ];
1319
1320        let common = Some(RelCommon {
1321            emit_kind: Some(EmitKind::Emit(Emit {
1322                output_mapping: vec![1, 2], // measures only
1323            })),
1324            ..Default::default()
1325        });
1326
1327        let aggregate_rel = create_aggregate_rel(grouping_expressions, vec![], measures, common);
1328
1329        let rel = Rel {
1330            rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1331        };
1332        let (result, errors) = ctx.textify(&rel);
1333
1334        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1335        // Expected: Aggregate[_ => sum($1), count($1)] we chose to emit only measures
1336        assert!(result.contains("Aggregate[_ => sum($1), count($1)]"));
1337    }
1338
1339    #[test]
1340    fn test_multiple_groupings_on_aggregate_deprecated() {
1341        // Protobuf plan that uses AggregateRel.groupings with deprecated
1342        // grouping_expressions, leaving AggregateRel.grouping_expressions empty.
1343        let ctx = TestContext::new()
1344        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1345        .with_function(1, 11, "count");
1346
1347        let grouping_expr_0 = create_exp(0);
1348        let grouping_expr_1 = create_exp(1);
1349
1350        let grouping_sets = vec![
1351            aggregate_rel::Grouping {
1352                #[allow(deprecated)]
1353                grouping_expressions: vec![grouping_expr_0.clone()],
1354                expression_references: vec![],
1355            },
1356            aggregate_rel::Grouping {
1357                #[allow(deprecated)]
1358                grouping_expressions: vec![grouping_expr_0.clone(), grouping_expr_1.clone()],
1359                expression_references: vec![],
1360            },
1361        ];
1362
1363        let aggregate_rel = create_aggregate_rel(vec![], grouping_sets, vec![], None);
1364
1365        let rel = Rel {
1366            rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1367        };
1368        let (result, errors) = ctx.textify(&rel);
1369
1370        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1371        assert!(result.contains("Aggregate[($0), ($0, $1) => $0, $1]"));
1372    }
1373
1374    #[test]
1375    fn test_multiple_groupings_with_measure_deprecated() {
1376        // Protobuf plan that uses AggregateRel.groupings with deprecated
1377        // grouping_expressions, leaving AggregateRel.grouping_expressions empty.
1378        let ctx = TestContext::new()
1379        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1380        .with_function(1, 11, "count");
1381
1382        let agg_fn1 = get_aggregate_func(11, 2);
1383
1384        let grouping_expr_0 = create_exp(0);
1385        let grouping_expr_1 = create_exp(1);
1386
1387        let grouping_sets = vec![
1388            aggregate_rel::Grouping {
1389                #[allow(deprecated)]
1390                grouping_expressions: vec![grouping_expr_0.clone()],
1391                expression_references: vec![],
1392            },
1393            aggregate_rel::Grouping {
1394                #[allow(deprecated)]
1395                grouping_expressions: vec![grouping_expr_0.clone(), grouping_expr_1.clone()],
1396                expression_references: vec![],
1397            },
1398        ];
1399
1400        let measures = vec![aggregate_rel::Measure {
1401            measure: Some(agg_fn1),
1402            filter: None,
1403        }];
1404
1405        let aggregate_rel = create_aggregate_rel(vec![], grouping_sets, measures, None);
1406
1407        let rel = Rel {
1408            rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1409        };
1410        let (result, errors) = ctx.textify(&rel);
1411
1412        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1413        assert!(result.contains("($0), ($0, $1) => $0, $1, count($2)"));
1414    }
1415
1416    #[test]
1417    fn test_multiple_groupings_on_aggregate() {
1418        let ctx = TestContext::new()
1419        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1420        .with_function(1, 11, "count");
1421
1422        let agg_fn2 = get_aggregate_func(11, 2);
1423
1424        let grouping_expressions = vec![
1425            Expression {
1426                rex_type: Some(RexType::Selection(Box::new(
1427                    FieldIndex(0).to_field_reference(),
1428                ))),
1429            },
1430            Expression {
1431                rex_type: Some(RexType::Selection(Box::new(
1432                    FieldIndex(1).to_field_reference(),
1433                ))),
1434            },
1435        ];
1436
1437        let grouping_sets = vec![
1438            Grouping {
1439                #[allow(deprecated)]
1440                grouping_expressions: vec![],
1441                expression_references: vec![0, 1],
1442            },
1443            Grouping {
1444                #[allow(deprecated)]
1445                grouping_expressions: vec![],
1446                expression_references: vec![0, 1],
1447            },
1448            Grouping {
1449                #[allow(deprecated)]
1450                grouping_expressions: vec![],
1451                expression_references: vec![1],
1452            },
1453            Grouping {
1454                #[allow(deprecated)]
1455                grouping_expressions: vec![],
1456                expression_references: vec![1, 1],
1457            },
1458            Grouping {
1459                #[allow(deprecated)]
1460                grouping_expressions: vec![],
1461                expression_references: vec![],
1462            },
1463        ];
1464
1465        let measures = vec![aggregate_rel::Measure {
1466            measure: Some(agg_fn2),
1467            filter: None,
1468        }];
1469
1470        let aggregate_rel =
1471            create_aggregate_rel(grouping_expressions, grouping_sets, measures, None);
1472
1473        let rel = Rel {
1474            rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1475        };
1476        let (result, errors) = ctx.textify(&rel);
1477
1478        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1479        assert!(
1480            result
1481                .contains("Aggregate[($0, $1), ($0, $1), ($1), ($1, $1), _ => $0, $1, count($2)]")
1482        );
1483    }
1484
1485    #[test]
1486    fn test_arguments_textify_positional_only() {
1487        let ctx = TestContext::new();
1488        let args = Arguments {
1489            positional: vec![Value::Integer(42), Value::Integer(7)],
1490            named: vec![],
1491        };
1492        let (result, errors) = ctx.textify(&args);
1493        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1494        assert_eq!(result, "42, 7");
1495    }
1496
1497    #[test]
1498    fn test_arguments_textify_named_only() {
1499        let ctx = TestContext::new();
1500        let args = Arguments {
1501            positional: vec![],
1502            named: vec![
1503                NamedArg {
1504                    name: Cow::Borrowed("limit"),
1505                    value: Value::Integer(10),
1506                },
1507                NamedArg {
1508                    name: Cow::Borrowed("offset"),
1509                    value: Value::Integer(5),
1510                },
1511            ],
1512        };
1513        let (result, errors) = ctx.textify(&args);
1514        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1515        assert_eq!(result, "limit=10, offset=5");
1516    }
1517
1518    #[test]
1519    fn test_join_relation_unknown_type() {
1520        let ctx = TestContext::new();
1521
1522        // Create a join with an unknown/invalid type
1523        let join_rel = JoinRel {
1524            left: Some(Box::new(Rel {
1525                rel_type: Some(RelType::Read(Box::default())),
1526            })),
1527            right: Some(Box::new(Rel {
1528                rel_type: Some(RelType::Read(Box::default())),
1529            })),
1530            expression: Some(Box::new(Expression::default())),
1531            r#type: 999, // Invalid join type
1532            common: None,
1533            post_join_filter: None,
1534            advanced_extension: None,
1535        };
1536
1537        let rel = Rel {
1538            rel_type: Some(RelType::Join(Box::new(join_rel))),
1539        };
1540        let (result, errors) = ctx.textify(&rel);
1541
1542        // Should contain error for unknown join type but still show condition and columns
1543        assert!(!errors.is_empty(), "Expected errors for unknown join type");
1544        assert!(
1545            result.contains("!{JoinRel}"),
1546            "Expected error token for unknown join type"
1547        );
1548        assert!(
1549            result.contains("Join["),
1550            "Expected Join relation to be formatted"
1551        );
1552    }
1553
1554    #[test]
1555    fn test_arguments_textify_both() {
1556        let ctx = TestContext::new();
1557        let args = Arguments {
1558            positional: vec![Value::Integer(1)],
1559            named: vec![NamedArg {
1560                name: "foo".into(),
1561                value: Value::Integer(2),
1562            }],
1563        };
1564        let (result, errors) = ctx.textify(&args);
1565        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1566        assert_eq!(result, "1, foo=2");
1567    }
1568
1569    #[test]
1570    fn test_arguments_textify_empty() {
1571        let ctx = TestContext::new();
1572        let args = Arguments {
1573            positional: vec![],
1574            named: vec![],
1575        };
1576        let (result, errors) = ctx.textify(&args);
1577        assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1578        assert_eq!(result, "_");
1579    }
1580
1581    #[test]
1582    fn test_named_arg_textify_error_token() {
1583        let ctx = TestContext::new();
1584        let named_arg = NamedArg {
1585            name: "foo".into(),
1586            value: Value::Missing(PlanError::invalid(
1587                "my_enum",
1588                Some(Cow::Borrowed("my_enum")),
1589                Cow::Borrowed("my_enum"),
1590            )),
1591        };
1592        let (result, errors) = ctx.textify(&named_arg);
1593        // Should show !{my_enum} in the output
1594        assert!(result.contains("foo=!{my_enum}"), "Output: {result}");
1595        // Should also accumulate an error
1596        assert!(!errors.is_empty(), "Expected error for error token");
1597    }
1598
/// `JoinType` variants convert to the expected string via `as_enum_str`.
#[test]
fn test_join_type_enum_textify() {
    // (variant, expected string) pairs, checked in order.
    let cases = [
        (join_rel::JoinType::Inner, "Inner"),
        (join_rel::JoinType::Left, "Left"),
        (join_rel::JoinType::LeftSemi, "LeftSemi"),
        (join_rel::JoinType::LeftAnti, "LeftAnti"),
    ];

    for (join_type, expected) in cases {
        assert_eq!(join_type.as_enum_str().unwrap(), expected);
    }
}
1613
/// Checks the column references produced by `join_output_columns` for a
/// join with 2 left columns and 3 right columns, across several join types.
#[test]
fn test_join_output_columns() {
    // Inner: all columns from both sides (2 + 3 = 5); spot-check both ends.
    let inner = super::join_output_columns(join_rel::JoinType::Inner, 2, 3);
    assert_eq!(inner.len(), 5);
    assert!(matches!(inner.first(), Some(Value::Reference(0))));
    assert!(matches!(inner.last(), Some(Value::Reference(4))));

    // LeftSemi: only the left columns.
    let left_semi = super::join_output_columns(join_rel::JoinType::LeftSemi, 2, 3);
    assert_eq!(left_semi.len(), 2);
    assert!(matches!(
        &left_semi[..],
        [Value::Reference(0), Value::Reference(1)]
    ));

    // RightSemi: only the right columns, as a contiguous range starting at $0.
    let right_semi = super::join_output_columns(join_rel::JoinType::RightSemi, 2, 3);
    assert_eq!(right_semi.len(), 3);
    assert!(matches!(
        &right_semi[..],
        [
            Value::Reference(0),
            Value::Reference(1),
            Value::Reference(2)
        ]
    ));

    // LeftMark: the 2 left columns plus one mark column at the next position.
    let left_mark = super::join_output_columns(join_rel::JoinType::LeftMark, 2, 3);
    assert_eq!(left_mark.len(), 3);
    assert!(matches!(
        &left_mark[..],
        [
            Value::Reference(0),
            Value::Reference(1),
            Value::Reference(2)
        ]
    ));

    // RightMark: the 3 right columns (starting at $0) plus one mark column
    // at the next position.
    let right_mark = super::join_output_columns(join_rel::JoinType::RightMark, 2, 3);
    assert_eq!(right_mark.len(), 4);
    assert!(matches!(
        &right_mark[..],
        [
            Value::Reference(0),
            Value::Reference(1),
            Value::Reference(2),
            Value::Reference(3)
        ]
    ));
}
1650
1651    fn get_aggregate_func(func_ref: u32, column_ind: i32) -> AggregateFunction {
1652        AggregateFunction {
1653            function_reference: func_ref,
1654            arguments: vec![FunctionArgument {
1655                arg_type: Some(ArgType::Value(Expression {
1656                    rex_type: Some(RexType::Selection(Box::new(
1657                        FieldIndex(column_ind).to_field_reference(),
1658                    ))),
1659                })),
1660            }],
1661            options: vec![],
1662            output_type: None,
1663            invocation: 0,
1664            phase: 0,
1665            sorts: vec![],
1666            #[allow(deprecated)]
1667            args: vec![],
1668        }
1669    }
1670
1671    fn create_aggregate_rel(
1672        grouping_expressions: Vec<Expression>,
1673        grouping_sets: Vec<Grouping>,
1674        measures: Vec<aggregate_rel::Measure>,
1675        common: Option<RelCommon>,
1676    ) -> AggregateRel {
1677        let common = common.or_else(|| {
1678            Some(RelCommon {
1679                emit_kind: Some(EmitKind::Direct(Direct {})),
1680                ..Default::default()
1681            })
1682        });
1683        AggregateRel {
1684            input: Some(Box::new(Rel {
1685                rel_type: Some(RelType::Read(Box::new(ReadRel {
1686                    common: None,
1687                    base_schema: Some(get_basic_schema()),
1688                    filter: None,
1689                    best_effort_filter: None,
1690                    projection: None,
1691                    advanced_extension: None,
1692                    read_type: Some(ReadType::NamedTable(NamedTable {
1693                        names: vec!["orders".into()],
1694                        advanced_extension: None,
1695                    })),
1696                }))),
1697            })),
1698            grouping_expressions,
1699            groupings: grouping_sets,
1700            measures,
1701            common,
1702            advanced_extension: None,
1703        }
1704    }
1705
1706    fn get_basic_schema() -> NamedStruct {
1707        NamedStruct {
1708            names: vec!["category".into(), "amount".into(), "value".into()],
1709            r#struct: Some(Struct {
1710                type_variation_reference: 0,
1711                types: vec![
1712                    Type {
1713                        kind: Some(Kind::String(ptype::String {
1714                            type_variation_reference: 0,
1715                            nullability: Nullability::Nullable as i32,
1716                        })),
1717                    },
1718                    Type {
1719                        kind: Some(Kind::Fp64(ptype::Fp64 {
1720                            type_variation_reference: 0,
1721                            nullability: Nullability::Nullable as i32,
1722                        })),
1723                    },
1724                    Type {
1725                        kind: Some(Kind::I32(ptype::I32 {
1726                            type_variation_reference: 0,
1727                            nullability: Nullability::Nullable as i32,
1728                        })),
1729                    },
1730                ],
1731                nullability: Nullability::Nullable as i32,
1732            }),
1733        }
1734    }
1735
1736    fn create_exp(column_ind: i32) -> Expression {
1737        Expression {
1738            rex_type: Some(RexType::Selection(Box::new(
1739                FieldIndex(column_ind).to_field_reference(),
1740            ))),
1741        }
1742    }
1743
/// Textifying a `Rel` that wraps a `CrossRel` must emit the `!{Rel}` failure
/// token and collect exactly one `Unimplemented` format error whose lookup
/// mentions `Cross`.
#[test]
fn test_unsupported_rel_type_produces_failure_token() {
    use substrait::proto::CrossRel;

    // An otherwise empty CrossRel is enough: this test expects the textifier
    // to report the relation type as unimplemented rather than panic.
    let cross = CrossRel {
        common: None,
        left: None,
        right: None,
        advanced_extension: None,
    };
    let rel = Rel {
        rel_type: Some(RelType::Cross(Box::new(cross))),
    };

    let ctx = TestContext::new();
    let (result, errors) = ctx.textify(&rel);

    // The failure token must appear in the rendered text.
    assert!(
        result.contains("!{Rel}"),
        "Expected '!{{Rel}}' in output, got: {result}"
    );

    // Exactly one error is collected...
    assert_eq!(errors.0.len(), 1, "Expected exactly one error: {errors:?}");

    // ...and it has to be the `Format` variant.
    let other = &errors.0[0];
    let FormatError::Format(plan_err) = other else {
        panic!("Expected FormatError::Format, got: {other:?}");
    };

    assert_eq!(plan_err.message, "Rel");
    assert_eq!(
        plan_err.error_type,
        crate::textify::foundation::FormatErrorType::Unimplemented
    );
    // A missing lookup counts as "does not mention Cross".
    assert!(
        plan_err
            .lookup
            .as_deref()
            .is_some_and(|lookup| lookup.contains("Cross")),
        "Expected lookup to mention 'Cross', got: {:?}",
        plan_err.lookup
    );
}
1790}