1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::convert::TryFrom;
4use std::fmt;
5use std::fmt::Debug;
6
7use prost::{Message, UnknownEnumValue};
8use substrait::proto::extensions::AdvancedExtension;
9use substrait::proto::fetch_rel::CountMode;
10use substrait::proto::plan_rel::RelType as PlanRelType;
11use substrait::proto::read_rel::ReadType;
12use substrait::proto::rel::RelType;
13use substrait::proto::rel_common::EmitKind;
14use substrait::proto::sort_field::{SortDirection, SortKind};
15use substrait::proto::{
16 AggregateFunction, AggregateRel, Expression, ExtensionLeafRel, ExtensionMultiRel,
17 ExtensionSingleRel, FetchRel, FilterRel, JoinRel, NamedStruct, PlanRel, ProjectRel, ReadRel,
18 Rel, RelCommon, RelRoot, SortField, SortRel, Type, join_rel,
19};
20
21use super::expressions::Reference;
22use super::types::Name;
23use super::{PlanError, Scope, Textify};
24use crate::FormatError;
25use crate::extensions::any::AnyRef;
26use crate::extensions::{ExtensionColumn, ExtensionValue};
27
/// Something that can report a short, static name for the kind of relation it is.
pub trait NamedRelation {
    /// Returns the static display name for this relation's kind.
    fn name(&self) -> &'static str;
}
31
32impl NamedRelation for Rel {
33 fn name(&self) -> &'static str {
34 match self.rel_type.as_ref() {
35 None => "UnknownRel",
36 Some(RelType::Read(_)) => "Read",
37 Some(RelType::Filter(_)) => "Filter",
38 Some(RelType::Project(_)) => "Project",
39 Some(RelType::Fetch(_)) => "Fetch",
40 Some(RelType::Aggregate(_)) => "Aggregate",
41 Some(RelType::Sort(_)) => "Sort",
42 Some(RelType::HashJoin(_)) => "HashJoin",
43 Some(RelType::Exchange(_)) => "Exchange",
44 Some(RelType::Join(_)) => "Join",
45 Some(RelType::Set(_)) => "Set",
46 Some(RelType::ExtensionLeaf(_)) => "ExtensionLeaf",
47 Some(RelType::Cross(_)) => "Cross",
48 Some(RelType::Reference(_)) => "Reference",
49 Some(RelType::ExtensionSingle(_)) => "ExtensionSingle",
50 Some(RelType::ExtensionMulti(_)) => "ExtensionMulti",
51 Some(RelType::Write(_)) => "Write",
52 Some(RelType::Ddl(_)) => "Ddl",
53 Some(RelType::Update(_)) => "Update",
54 Some(RelType::MergeJoin(_)) => "MergeJoin",
55 Some(RelType::NestedLoopJoin(_)) => "NestedLoopJoin",
56 Some(RelType::Window(_)) => "Window",
57 Some(RelType::Expand(_)) => "Expand",
58 }
59 }
60}
61
62impl Textify for Rel {
63 fn name() -> &'static str {
64 "Rel"
65 }
66
67 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
68 Relation::from_rel(self, ctx).textify(ctx, w)
71 }
72}
73
74pub trait ValueEnum {
80 fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError>;
81}
82
83#[derive(Debug, Clone)]
84pub struct NamedArg<'a> {
85 pub name: Cow<'a, str>,
86 pub value: Value<'a>,
87}
88
89#[derive(Debug, Clone)]
90pub enum Value<'a> {
91 Name(Name<'a>),
92 TableName(Vec<Name<'a>>),
93 Field(Option<Name<'a>>, Option<&'a Type>),
94 Tuple(Vec<Value<'a>>),
95 List(Vec<Value<'a>>),
96 Reference(i32),
97 Expression(&'a Expression),
98 AggregateFunction(&'a AggregateFunction),
99 Missing(PlanError),
101 Enum(Cow<'a, str>),
103 EmptyGroup,
104 Integer(i64),
105 Float(f64),
106 Boolean(bool),
107 ExtValue(ExtensionValue),
109 ExtColumn(ExtensionColumn),
111}
112
113impl<'a> Value<'a> {
114 pub fn expect(maybe_value: Option<Self>, f: impl FnOnce() -> PlanError) -> Self {
115 match maybe_value {
116 Some(s) => s,
117 None => Value::Missing(f()),
118 }
119 }
120}
121
122impl<'a> From<Result<Vec<Name<'a>>, PlanError>> for Value<'a> {
123 fn from(token: Result<Vec<Name<'a>>, PlanError>) -> Self {
124 match token {
125 Ok(value) => Value::TableName(value),
126 Err(err) => Value::Missing(err),
127 }
128 }
129}
130
131impl<'a> Textify for Value<'a> {
132 fn name() -> &'static str {
133 "Value"
134 }
135
136 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
137 match self {
138 Value::Name(name) => write!(w, "{}", ctx.display(name)),
139 Value::TableName(names) => write!(w, "{}", ctx.separated(names, ".")),
140 Value::Field(name, typ) => {
141 write!(w, "{}:{}", ctx.expect(name.as_ref()), ctx.expect(*typ))
142 }
143 Value::Tuple(values) => write!(w, "({})", ctx.separated(values, ", ")),
144 Value::List(values) => write!(w, "[{}]", ctx.separated(values, ", ")),
145 Value::Reference(i) => write!(w, "{}", Reference(*i)),
146 Value::Expression(e) => write!(w, "{}", ctx.display(*e)),
147 Value::AggregateFunction(agg_fn) => agg_fn.textify(ctx, w),
148 Value::Missing(err) => write!(w, "{}", ctx.failure(err.clone())),
149 Value::Enum(res) => write!(w, "&{res}"),
150 Value::Integer(i) => write!(w, "{i}"),
151 Value::EmptyGroup => write!(w, "_"),
152 Value::Float(f) => write!(w, "{f}"),
153 Value::Boolean(b) => write!(w, "{b}"),
154 Value::ExtValue(ev) => ev.textify(ctx, w),
155 Value::ExtColumn(ec) => ec.textify(ctx, w),
156 }
157 }
158}
159
160fn schema_to_values<'a>(schema: &'a NamedStruct) -> Vec<Value<'a>> {
161 let mut fields = schema
162 .r#struct
163 .as_ref()
164 .map(|s| s.types.iter())
165 .into_iter()
166 .flatten();
167 let mut names = schema.names.iter();
168
169 let mut values = Vec::new();
173 loop {
174 let field = fields.next();
175 let name = names.next().map(|n| Name(n));
176 if field.is_none() && name.is_none() {
177 break;
178 }
179
180 values.push(Value::Field(name, field));
181 }
182
183 values
184}
185
186struct Emitted<'a> {
187 pub values: &'a [Value<'a>],
188 pub emit: Option<&'a EmitKind>,
189}
190
191impl<'a> Emitted<'a> {
192 pub fn new(values: &'a [Value<'a>], emit: Option<&'a EmitKind>) -> Self {
193 Self { values, emit }
194 }
195
196 pub fn write_direct<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
197 write!(w, "{}", ctx.separated(self.values.iter(), ", "))
198 }
199}
200
201impl<'a> Textify for Emitted<'a> {
202 fn name() -> &'static str {
203 "Emitted"
204 }
205
206 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
207 if ctx.options().show_emit {
208 return self.write_direct(ctx, w);
209 }
210
211 let indices = match &self.emit {
212 Some(EmitKind::Emit(e)) => &e.output_mapping,
213 Some(EmitKind::Direct(_)) => return self.write_direct(ctx, w),
214 None => return self.write_direct(ctx, w),
215 };
216
217 for (i, &index) in indices.iter().enumerate() {
218 if i > 0 {
219 write!(w, ", ")?;
220 }
221
222 match self.values.get(index as usize) {
223 Some(value) => write!(w, "{}", ctx.display(value))?,
224 None => write!(w, "{}", ctx.failure(PlanError::invalid(
225 "Emitted",
226 Some("output_mapping"),
227 format!(
228 "Output mapping index {} is out of bounds for values collection of size {}",
229 index, self.values.len()
230 )
231 )))?,
232 }
233 }
234
235 Ok(())
236 }
237}
238
239#[derive(Debug, Clone)]
240pub struct Arguments<'a> {
241 pub positional: Vec<Value<'a>>,
243 pub named: Vec<NamedArg<'a>>,
245}
246
247impl<'a> Textify for Arguments<'a> {
248 fn name() -> &'static str {
249 "Arguments"
250 }
251 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
252 if self.positional.is_empty() && self.named.is_empty() {
253 return write!(w, "_");
254 }
255
256 write!(w, "{}", ctx.separated(self.positional.iter(), ", "))?;
257 if !self.positional.is_empty() && !self.named.is_empty() {
258 write!(w, ", ")?;
259 }
260 write!(w, "{}", ctx.separated(self.named.iter(), ", "))
261 }
262}
263
264pub struct Relation<'a> {
265 pub name: Cow<'a, str>,
266 pub arguments: Option<Arguments<'a>>,
275 pub columns: Vec<Value<'a>>,
278 pub emit: Option<&'a EmitKind>,
280 pub advanced_extension: Option<&'a AdvancedExtension>,
286 pub children: Vec<Option<Relation<'a>>>,
288}
289
290impl Textify for Relation<'_> {
291 fn name() -> &'static str {
292 "Relation"
293 }
294
295 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
296 self.write_header(ctx, w)?;
297 let child_scope = ctx.push_indent();
298 if let Some(adv_ext) = self.advanced_extension {
302 adv_ext.textify(&child_scope, w)?;
303 }
304 self.write_children(ctx, w)?;
305 Ok(())
306 }
307}
308
309impl Relation<'_> {
310 pub fn write_header<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
314 let cols = Emitted::new(&self.columns, self.emit);
315 let indent = ctx.indent();
316 let name = &self.name;
317 let cols = ctx.display(&cols);
318 match &self.arguments {
319 None => {
320 write!(w, "{indent}{name}[{cols}]")
321 }
322 Some(args) => {
323 let args = ctx.display(args);
324 write!(w, "{indent}{name}[{args} => {cols}]")
325 }
326 }
327 }
328
329 pub fn write_children<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
332 let child_scope = ctx.push_indent();
333 for child in self.children.iter().flatten() {
334 writeln!(w)?;
335 child.textify(&child_scope, w)?;
336 }
337 Ok(())
338 }
339}
340
341impl<'a> Relation<'a> {
342 pub fn emitted(&self) -> usize {
343 match self.emit {
344 Some(EmitKind::Emit(e)) => e.output_mapping.len(),
345 Some(EmitKind::Direct(_)) => self.columns.len(),
346 None => self.columns.len(),
347 }
348 }
349}
350
/// A borrowed multi-part table name; the parts are written joined by `.`.
#[derive(Debug, Copy, Clone)]
pub struct TableName<'a>(&'a [String]);
353
354impl<'a> Textify for TableName<'a> {
355 fn name() -> &'static str {
356 "TableName"
357 }
358
359 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
360 let names = self.0.iter().map(|n| Name(n)).collect::<Vec<_>>();
361 write!(w, "{}", ctx.separated(names.iter(), "."))
362 }
363}
364
365impl<'a> Relation<'a> {
366 fn from_read<S: Scope>(rel: &'a ReadRel, _ctx: &S) -> Self {
367 let columns = read_columns(rel);
368 let emit = rel.common.as_ref().and_then(|c| c.emit_kind.as_ref());
369
370 match &rel.read_type {
371 Some(ReadType::NamedTable(table)) => {
372 let table_name = Value::TableName(table.names.iter().map(|n| Name(n)).collect());
373 Relation {
374 name: Cow::Borrowed("Read"),
375 arguments: Some(Arguments {
376 positional: vec![table_name],
377 named: vec![],
378 }),
379 columns,
380 emit,
381 advanced_extension: rel.advanced_extension.as_ref(),
382 children: vec![],
383 }
384 }
385 Some(ReadType::VirtualTable(vt)) => {
386 let positional = vt
387 .expressions
388 .iter()
389 .map(|row| Value::Tuple(row.fields.iter().map(Value::Expression).collect()))
390 .collect();
391
392 Relation {
393 name: Cow::Borrowed("Read:Virtual"),
394 arguments: Some(Arguments {
395 positional,
396 named: vec![],
397 }),
398 columns,
399 emit,
400 advanced_extension: rel.advanced_extension.as_ref(),
401 children: vec![],
402 }
403 }
404 other => {
405 let err = PlanError::unimplemented(
406 "ReadRel",
407 Some("read_type"),
408 format!("Unsupported read type {other:?}"),
409 );
410 Relation {
411 name: Cow::Borrowed("Read"),
412 arguments: Some(Arguments {
413 positional: vec![Value::Missing(err)],
414 named: vec![],
415 }),
416 columns,
417 emit,
418 advanced_extension: rel.advanced_extension.as_ref(),
419 children: vec![],
420 }
421 }
422 }
423 }
424}
425
426fn read_columns<'a>(rel: &'a ReadRel) -> Vec<Value<'a>> {
427 match rel.base_schema {
428 Some(ref schema) => schema_to_values(schema),
429 None => {
430 let err =
431 PlanError::unimplemented("ReadRel", Some("base_schema"), "Base schema is required");
432 vec![Value::Missing(err)]
433 }
434 }
435}
436
437pub fn get_emit(rel: Option<&RelCommon>) -> Option<&EmitKind> {
438 rel.as_ref().and_then(|c| c.emit_kind.as_ref())
439}
440
441impl<'a> Relation<'a> {
442 pub fn input_refs(&self) -> Vec<Value<'a>> {
450 let len = self.emitted();
451 (0..len).map(|i| Value::Reference(i as i32)).collect()
452 }
453
454 pub fn convert_children<S: Scope>(
458 refs: Vec<Option<&'a Rel>>,
459 ctx: &S,
460 ) -> (Vec<Option<Relation<'a>>>, usize) {
461 let mut children = vec![];
462 let mut inputs = 0;
463
464 for maybe_rel in refs {
465 match maybe_rel {
466 Some(rel) => {
467 let child = Relation::from_rel(rel, ctx);
468 inputs += child.emitted();
469 children.push(Some(child));
470 }
471 None => children.push(None),
472 }
473 }
474
475 (children, inputs)
476 }
477}
478
479impl<'a> Relation<'a> {
480 fn from_filter<S: Scope>(rel: &'a FilterRel, ctx: &S) -> Self {
481 let condition = rel
482 .condition
483 .as_ref()
484 .map(|c| Value::Expression(c.as_ref()));
485 let condition = Value::expect(condition, || {
486 PlanError::unimplemented("FilterRel", Some("condition"), "Condition is None")
487 });
488 let positional = vec![condition];
489 let arguments = Some(Arguments {
490 positional,
491 named: vec![],
492 });
493 let emit = get_emit(rel.common.as_ref());
494 let (children, columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
495 let columns = (0..columns).map(|i| Value::Reference(i as i32)).collect();
496
497 Relation {
498 name: Cow::Borrowed("Filter"),
499 arguments,
500 columns,
501 emit,
502 advanced_extension: rel.advanced_extension.as_ref(),
503 children,
504 }
505 }
506
507 fn from_project<S: Scope>(rel: &'a ProjectRel, ctx: &S) -> Self {
508 let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
509 let mut columns: Vec<Value> = vec![];
510 for i in 0..input_columns {
511 columns.push(Value::Reference(i as i32));
512 }
513 for expr in &rel.expressions {
514 columns.push(Value::Expression(expr));
515 }
516
517 Relation {
518 name: Cow::Borrowed("Project"),
519 arguments: None,
520 columns,
521 emit: get_emit(rel.common.as_ref()),
522 advanced_extension: rel.advanced_extension.as_ref(),
523 children,
524 }
525 }
526
527 pub fn from_rel<S: Scope>(rel: &'a Rel, ctx: &S) -> Self {
528 match rel.rel_type.as_ref() {
529 Some(RelType::Read(r)) => Relation::from_read(r, ctx),
530 Some(RelType::Filter(r)) => Relation::from_filter(r, ctx),
531 Some(RelType::Project(r)) => Relation::from_project(r, ctx),
532 Some(RelType::Aggregate(r)) => Relation::from_aggregate(r, ctx),
533 Some(RelType::Sort(r)) => Relation::from_sort(r, ctx),
534 Some(RelType::Fetch(r)) => Relation::from_fetch(r, ctx),
535 Some(RelType::Join(r)) => Relation::from_join(r, ctx),
536 Some(RelType::ExtensionLeaf(r)) => Relation::from_extension_leaf(r, ctx),
537 Some(RelType::ExtensionSingle(r)) => Relation::from_extension_single(r, ctx),
538 Some(RelType::ExtensionMulti(r)) => Relation::from_extension_multi(r, ctx),
539 _ => {
540 let name = rel.name();
541 let token = ctx.failure(FormatError::Format(PlanError::unimplemented(
542 "Rel",
543 Some(name),
544 format!("{name} is not yet supported in the text format"),
545 )));
546 Relation {
547 name: Cow::Owned(format!("{token}")),
548 arguments: None,
549 columns: vec![],
550 emit: None,
551 advanced_extension: None,
552 children: vec![],
553 }
554 }
555 }
556 }
557
558 fn from_extension_leaf<S: Scope>(rel: &'a ExtensionLeafRel, ctx: &S) -> Self {
559 let detail_ref = rel.detail.as_ref().map(AnyRef::from);
560 let decoded = match detail_ref {
561 Some(d) => ctx.extension_registry().decode(d),
562 None => Err(crate::extensions::registry::ExtensionError::MissingDetail),
563 };
564 Relation::from_extension("ExtensionLeaf", decoded, vec![], ctx)
565 }
566
567 fn from_extension_single<S: Scope>(rel: &'a ExtensionSingleRel, ctx: &S) -> Self {
568 let detail_ref = rel.detail.as_ref().map(AnyRef::from);
569 let decoded = match detail_ref {
570 Some(d) => ctx.extension_registry().decode(d),
571 None => Err(crate::extensions::registry::ExtensionError::MissingDetail),
572 };
573 Relation::from_extension("ExtensionSingle", decoded, vec![rel.input.as_deref()], ctx)
574 }
575
576 fn from_extension_multi<S: Scope>(rel: &'a ExtensionMultiRel, ctx: &S) -> Self {
577 let detail_ref = rel.detail.as_ref().map(AnyRef::from);
578 let decoded = match detail_ref {
579 Some(d) => ctx.extension_registry().decode(d),
580 None => Err(crate::extensions::registry::ExtensionError::MissingDetail),
581 };
582 let mut child_refs: Vec<Option<&'a Rel>> = vec![];
583 for input in &rel.inputs {
584 child_refs.push(Some(input));
585 }
586 Relation::from_extension("ExtensionMulti", decoded, child_refs, ctx)
587 }
588
589 fn from_extension<S: Scope>(
590 ext_type: &'static str,
591 decoded: Result<
592 (String, crate::extensions::ExtensionArgs),
593 crate::extensions::registry::ExtensionError,
594 >,
595 child_refs: Vec<Option<&'a Rel>>,
596 ctx: &S,
597 ) -> Self {
598 match decoded {
599 Ok((name, args)) => {
600 let (children, _) = Relation::convert_children(child_refs, ctx);
601 let mut positional = vec![];
602 for value in args.positional {
603 positional.push(Value::ExtValue(value));
604 }
605 let mut named = vec![];
606 for (key, value) in args.named {
607 named.push(NamedArg {
608 name: Cow::Owned(key),
609 value: Value::ExtValue(value),
610 });
611 }
612 let mut columns = vec![];
613 for col in args.output_columns {
614 columns.push(Value::ExtColumn(col));
615 }
616 Relation {
617 name: Cow::Owned(format!("{}:{}", ext_type, name)),
618 arguments: Some(Arguments { positional, named }),
619 columns,
620 emit: None,
621 advanced_extension: None,
625 children,
626 }
627 }
628 Err(error) => {
629 let (children, _) = Relation::convert_children(child_refs, ctx);
630 Relation {
631 name: Cow::Borrowed(ext_type),
632 arguments: None,
633 columns: vec![Value::Missing(PlanError::invalid(
634 "extension",
635 None::<&str>,
636 error.to_string(),
637 ))],
638 emit: None,
639 advanced_extension: None,
640 children,
641 }
642 }
643 }
644 }
645
646 fn from_aggregate<S: Scope>(rel: &'a AggregateRel, ctx: &S) -> Self {
656 let mut grouping_sets: Vec<Vec<Value>> = vec![]; let expression_list: Vec<Value>; #[allow(deprecated)]
663 if rel.grouping_expressions.is_empty()
664 && !rel.groupings.is_empty()
665 && !rel.groupings[0].grouping_expressions.is_empty()
666 {
667 (expression_list, grouping_sets) = Relation::get_grouping_sets(rel);
668 } else {
669 expression_list = rel
670 .grouping_expressions
671 .iter()
672 .map(Value::Expression)
673 .collect::<Vec<_>>(); for group in &rel.groupings {
675 let mut grouping_set: Vec<Value> = vec![];
676 for i in &group.expression_references {
677 grouping_set.push(Value::Reference(*i as i32));
678 }
679 grouping_sets.push(grouping_set);
680 }
681 if rel.groupings.is_empty() {
683 grouping_sets.push(vec![]);
684 }
685 }
686
687 let is_single = grouping_sets.len() == 1;
688 let mut positional: Vec<Value> = vec![];
689 for g in grouping_sets {
690 if g.is_empty() {
691 positional.push(Value::EmptyGroup);
692 } else if is_single {
693 positional.extend(g);
695 } else {
696 positional.push(Value::Tuple(g));
697 }
698 }
699
700 let arguments = Some(Arguments {
702 positional,
703 named: vec![],
704 });
705
706 let mut all_outputs: Vec<Value> = expression_list;
708
709 for m in &rel.measures {
712 if let Some(agg_fn) = m.measure.as_ref() {
713 all_outputs.push(Value::AggregateFunction(agg_fn));
714 }
715 }
716 let emit = get_emit(rel.common.as_ref());
717 let (children, _) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
718
719 Relation {
720 name: Cow::Borrowed("Aggregate"),
721 arguments,
722 columns: all_outputs,
723 emit,
724 advanced_extension: rel.advanced_extension.as_ref(),
725 children,
726 }
727 }
728
729 fn get_grouping_sets(rel: &'a AggregateRel) -> (Vec<Value<'a>>, Vec<Vec<Value<'a>>>) {
730 let mut grouping_sets: Vec<Vec<Value>> = vec![];
731 let mut expression_list: Vec<Value> = Vec::new();
732
733 let mut expression_index_map = HashMap::new();
735 let mut i: i32 = 0; for group in &rel.groupings {
738 let mut grouping_set: Vec<Value> = vec![];
739 #[allow(deprecated)]
740 for exp in &group.grouping_expressions {
741 let key = exp.encode_to_vec();
745 expression_index_map.entry(key.clone()).or_insert_with(|| {
746 let value = Value::Expression(exp);
747 expression_list.push(value); let index = i;
750 i += 1;
751 index });
753 grouping_set.push(Value::Reference(expression_index_map[&key]));
754 }
755 grouping_sets.push(grouping_set);
756 }
757 (expression_list, grouping_sets)
758 }
759}
760
761impl Textify for RelRoot {
762 fn name() -> &'static str {
763 "RelRoot"
764 }
765
766 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
767 let names = self.names.iter().map(|n| Name(n)).collect::<Vec<_>>();
768
769 write!(
770 w,
771 "{}Root[{}]",
772 ctx.indent(),
773 ctx.separated(names.iter(), ", ")
774 )?;
775 let child_scope = ctx.push_indent();
776 for child in self.input.iter() {
777 writeln!(w)?;
778 child.textify(&child_scope, w)?;
779 }
780
781 Ok(())
782 }
783}
784
785impl Textify for PlanRelType {
786 fn name() -> &'static str {
787 "PlanRelType"
788 }
789
790 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
791 match self {
792 PlanRelType::Rel(rel) => rel.textify(ctx, w),
793 PlanRelType::Root(root) => root.textify(ctx, w),
794 }
795 }
796}
797
798impl Textify for PlanRel {
799 fn name() -> &'static str {
800 "PlanRel"
801 }
802
803 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
806 write!(w, "{}", ctx.expect(self.rel_type.as_ref()))
807 }
808}
809
810impl<'a> Relation<'a> {
811 fn from_sort<S: Scope>(rel: &'a SortRel, ctx: &S) -> Self {
812 let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
813 let mut positional = vec![];
814 for sort_field in &rel.sorts {
815 positional.push(Value::from(sort_field));
816 }
817 let arguments = Some(Arguments {
818 positional,
819 named: vec![],
820 });
821 let mut col_values = vec![];
823 for i in 0..input_columns {
824 col_values.push(Value::Reference(i as i32));
825 }
826 let emit = get_emit(rel.common.as_ref());
827 Relation {
828 name: Cow::Borrowed("Sort"),
829 arguments,
830 columns: col_values,
831 emit,
832 advanced_extension: rel.advanced_extension.as_ref(),
833 children,
834 }
835 }
836
837 fn from_fetch<S: Scope>(rel: &'a FetchRel, ctx: &S) -> Self {
838 let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
839 let mut named_args: Vec<NamedArg> = vec![];
840 match &rel.count_mode {
841 Some(CountMode::CountExpr(expr)) => {
842 named_args.push(NamedArg {
843 name: Cow::Borrowed("limit"),
844 value: Value::Expression(expr),
845 });
846 }
847 #[allow(deprecated)]
848 Some(CountMode::Count(val)) => {
849 named_args.push(NamedArg {
850 name: Cow::Borrowed("limit"),
851 value: Value::Integer(*val),
852 });
853 }
854 None => {}
855 }
856 if let Some(offset) = &rel.offset_mode {
857 match offset {
858 substrait::proto::fetch_rel::OffsetMode::OffsetExpr(expr) => {
859 named_args.push(NamedArg {
860 name: Cow::Borrowed("offset"),
861 value: Value::Expression(expr),
862 });
863 }
864 #[allow(deprecated)]
865 substrait::proto::fetch_rel::OffsetMode::Offset(val) => {
866 named_args.push(NamedArg {
867 name: Cow::Borrowed("offset"),
868 value: Value::Integer(*val),
869 });
870 }
871 }
872 }
873
874 let emit = get_emit(rel.common.as_ref());
875 let columns: Vec<Value> = (0..input_columns)
877 .map(|i| Value::Reference(i as i32))
878 .collect();
879 Relation {
880 name: Cow::Borrowed("Fetch"),
881 arguments: Some(Arguments {
882 positional: vec![],
883 named: named_args,
884 }),
885 columns,
886 emit,
887 advanced_extension: rel.advanced_extension.as_ref(),
888 children,
889 }
890 }
891}
892
893fn join_output_columns(
894 join_type: join_rel::JoinType,
895 left_columns: usize,
896 right_columns: usize,
897) -> Vec<Value<'static>> {
898 let total_columns = match join_type {
899 join_rel::JoinType::Inner
901 | join_rel::JoinType::Left
902 | join_rel::JoinType::Right
903 | join_rel::JoinType::Outer => left_columns + right_columns,
904
905 join_rel::JoinType::LeftSemi | join_rel::JoinType::LeftAnti => left_columns,
907
908 join_rel::JoinType::RightSemi | join_rel::JoinType::RightAnti => right_columns,
910
911 join_rel::JoinType::LeftSingle => left_columns,
913 join_rel::JoinType::RightSingle => right_columns,
914
915 join_rel::JoinType::LeftMark => left_columns + 1,
917 join_rel::JoinType::RightMark => right_columns + 1,
918
919 join_rel::JoinType::Unspecified => left_columns + right_columns,
921 };
922
923 (0..total_columns)
925 .map(|i| Value::Reference(i as i32))
926 .collect()
927}
928
929impl<'a> Relation<'a> {
930 fn from_join<S: Scope>(rel: &'a JoinRel, ctx: &S) -> Self {
931 let (children, _total_columns) =
932 Relation::convert_children(vec![rel.left.as_deref(), rel.right.as_deref()], ctx);
933
934 assert_eq!(
936 children.len(),
937 2,
938 "convert_children should return same number of elements as input"
939 );
940
941 let left_columns = match &children[0] {
943 Some(child) => child.emitted(),
944 None => 0,
945 };
946 let right_columns = match &children[1] {
947 Some(child) => child.emitted(),
948 None => 0,
949 };
950
951 let (join_type, join_type_value) = match join_rel::JoinType::try_from(rel.r#type) {
954 Ok(join_type) => {
955 let join_type_value = match join_type.as_enum_str() {
956 Ok(s) => Value::Enum(s),
957 Err(e) => Value::Missing(e),
958 };
959 (join_type, join_type_value)
960 }
961 Err(_) => {
962 let join_type_error = Value::Missing(PlanError::invalid(
964 "JoinRel",
965 Some("type"),
966 format!("Unknown join type: {}", rel.r#type),
967 ));
968 (join_rel::JoinType::Unspecified, join_type_error)
969 }
970 };
971
972 let condition = rel
974 .expression
975 .as_ref()
976 .map(|c| Value::Expression(c.as_ref()));
977 let condition = Value::expect(condition, || {
978 PlanError::unimplemented("JoinRel", Some("expression"), "Join condition is None")
979 });
980
981 let positional = vec![join_type_value, condition];
985 let arguments = Some(Arguments {
986 positional,
987 named: vec![],
988 });
989
990 let emit = get_emit(rel.common.as_ref());
991 let columns = join_output_columns(join_type, left_columns, right_columns);
992
993 Relation {
994 name: Cow::Borrowed("Join"),
995 arguments,
996 columns,
997 emit,
998 advanced_extension: rel.advanced_extension.as_ref(),
999 children,
1000 }
1001 }
1002}
1003
1004impl<'a> From<&'a SortField> for Value<'a> {
1005 fn from(sf: &'a SortField) -> Self {
1006 let field = match &sf.expr {
1007 Some(expr) => match &expr.rex_type {
1008 Some(substrait::proto::expression::RexType::Selection(fref)) => {
1009 if let Some(substrait::proto::expression::field_reference::ReferenceType::DirectReference(seg)) = &fref.reference_type {
1010 if let Some(substrait::proto::expression::reference_segment::ReferenceType::StructField(sf)) = &seg.reference_type {
1011 Value::Reference(sf.field)
1012 } else { Value::Missing(PlanError::unimplemented("SortField", Some("expr"), "Not a struct field")) }
1013 } else { Value::Missing(PlanError::unimplemented("SortField", Some("expr"), "Not a direct reference")) }
1014 }
1015 _ => Value::Missing(PlanError::unimplemented(
1016 "SortField",
1017 Some("expr"),
1018 "Not a selection",
1019 )),
1020 },
1021 None => Value::Missing(PlanError::unimplemented(
1022 "SortField",
1023 Some("expr"),
1024 "Missing expr",
1025 )),
1026 };
1027 let direction = match &sf.sort_kind {
1028 Some(kind) => Value::from(kind),
1029 None => Value::Missing(PlanError::invalid(
1030 "SortKind",
1031 Some(Cow::Borrowed("sort_kind")),
1032 "Missing sort_kind",
1033 )),
1034 };
1035 Value::Tuple(vec![field, direction])
1036 }
1037}
1038
1039impl<'a, T: ValueEnum + ?Sized> From<&'a T> for Value<'a> {
1040 fn from(enum_val: &'a T) -> Self {
1041 match enum_val.as_enum_str() {
1042 Ok(s) => Value::Enum(s),
1043 Err(e) => Value::Missing(e),
1044 }
1045 }
1046}
1047
1048impl ValueEnum for SortKind {
1049 fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError> {
1050 let d = match self {
1051 &SortKind::Direction(d) => SortDirection::try_from(d),
1052 SortKind::ComparisonFunctionReference(f) => {
1053 return Err(PlanError::invalid(
1054 "SortKind",
1055 Some(Cow::Owned(format!("function reference{f}"))),
1056 "SortKind::ComparisonFunctionReference unimplemented",
1057 ));
1058 }
1059 };
1060 let s = match d {
1061 Err(UnknownEnumValue(d)) => {
1062 return Err(PlanError::invalid(
1063 "SortKind",
1064 Some(Cow::Owned(format!("unknown variant: {d:?}"))),
1065 "Unknown SortDirection",
1066 ));
1067 }
1068 Ok(SortDirection::AscNullsFirst) => "AscNullsFirst",
1069 Ok(SortDirection::AscNullsLast) => "AscNullsLast",
1070 Ok(SortDirection::DescNullsFirst) => "DescNullsFirst",
1071 Ok(SortDirection::DescNullsLast) => "DescNullsLast",
1072 Ok(SortDirection::Clustered) => "Clustered",
1073 Ok(SortDirection::Unspecified) => {
1074 return Err(PlanError::invalid(
1075 "SortKind",
1076 Option::<Cow<str>>::None,
1077 "Unspecified SortDirection",
1078 ));
1079 }
1080 };
1081 Ok(Cow::Borrowed(s))
1082 }
1083}
1084
1085impl ValueEnum for join_rel::JoinType {
1086 fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError> {
1087 let s = match self {
1088 join_rel::JoinType::Unspecified => {
1089 return Err(PlanError::invalid(
1090 "JoinType",
1091 Option::<Cow<str>>::None,
1092 "Unspecified JoinType",
1093 ));
1094 }
1095 join_rel::JoinType::Inner => "Inner",
1096 join_rel::JoinType::Outer => "Outer",
1097 join_rel::JoinType::Left => "Left",
1098 join_rel::JoinType::Right => "Right",
1099 join_rel::JoinType::LeftSemi => "LeftSemi",
1100 join_rel::JoinType::RightSemi => "RightSemi",
1101 join_rel::JoinType::LeftAnti => "LeftAnti",
1102 join_rel::JoinType::RightAnti => "RightAnti",
1103 join_rel::JoinType::LeftSingle => "LeftSingle",
1104 join_rel::JoinType::RightSingle => "RightSingle",
1105 join_rel::JoinType::LeftMark => "LeftMark",
1106 join_rel::JoinType::RightMark => "RightMark",
1107 };
1108 Ok(Cow::Borrowed(s))
1109 }
1110}
1111
1112impl<'a> Textify for NamedArg<'a> {
1113 fn name() -> &'static str {
1114 "NamedArg"
1115 }
1116 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
1117 write!(w, "{}=", self.name)?;
1118 self.value.textify(ctx, w)
1119 }
1120}
1121
1122#[cfg(test)]
1123mod tests {
1124 use substrait::proto::aggregate_rel::Grouping;
1125 use substrait::proto::expression::literal::LiteralType;
1126 use substrait::proto::expression::{Literal, RexType, ScalarFunction};
1127 use substrait::proto::function_argument::ArgType;
1128 use substrait::proto::read_rel::{NamedTable, ReadType};
1129 use substrait::proto::rel_common::{Direct, Emit};
1130 use substrait::proto::r#type::{self as ptype, Kind, Nullability, Struct};
1131 use substrait::proto::{
1132 Expression, FunctionArgument, NamedStruct, ReadRel, Type, aggregate_rel,
1133 };
1134
1135 use super::*;
1136 use crate::fixtures::TestContext;
1137 use crate::parser::expressions::FieldIndex;
1138
1139 #[test]
1140 fn test_read_rel() {
1141 let ctx = TestContext::new();
1142
1143 let read_rel = ReadRel {
1145 common: None,
1146 base_schema: Some(NamedStruct {
1147 names: vec!["col1".into(), "column 2".into()],
1148 r#struct: Some(Struct {
1149 type_variation_reference: 0,
1150 types: vec![
1151 Type {
1152 kind: Some(Kind::I32(ptype::I32 {
1153 type_variation_reference: 0,
1154 nullability: Nullability::Nullable as i32,
1155 })),
1156 },
1157 Type {
1158 kind: Some(Kind::String(ptype::String {
1159 type_variation_reference: 0,
1160 nullability: Nullability::Nullable as i32,
1161 })),
1162 },
1163 ],
1164 nullability: Nullability::Nullable as i32,
1165 }),
1166 }),
1167 filter: None,
1168 best_effort_filter: None,
1169 projection: None,
1170 advanced_extension: None,
1171 read_type: Some(ReadType::NamedTable(NamedTable {
1172 names: vec!["some_db".into(), "test_table".into()],
1173 advanced_extension: None,
1174 })),
1175 };
1176
1177 let rel = Rel {
1178 rel_type: Some(RelType::Read(Box::new(read_rel))),
1179 };
1180 let (result, errors) = ctx.textify(&rel);
1181 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1182 assert_eq!(
1183 result,
1184 "Read[some_db.test_table => col1:i32?, \"column 2\":string?]"
1185 );
1186 }
1187
/// A `FilterRel` wrapping a `ReadRel` textifies to a `Filter[...]` line with
/// the condition and output references, followed by the input relation on its
/// own indented line.
#[test]
fn test_filter_rel() {
    let ctx = TestContext::new()
        .with_urn(1, "test_urn")
        .with_function(1, 10, "gt");

    // Both input columns are nullable i32, so build the type from one place.
    let nullable_i32 = || Type {
        kind: Some(Kind::I32(ptype::I32 {
            type_variation_reference: 0,
            nullability: Nullability::Nullable as i32,
        })),
    };

    let scan = Rel {
        rel_type: Some(RelType::Read(Box::new(ReadRel {
            common: None,
            base_schema: Some(NamedStruct {
                names: vec!["col1".into(), "col2".into()],
                r#struct: Some(Struct {
                    type_variation_reference: 0,
                    types: vec![nullable_i32(), nullable_i32()],
                    nullability: Nullability::Nullable as i32,
                }),
            }),
            filter: None,
            best_effort_filter: None,
            projection: None,
            advanced_extension: None,
            read_type: Some(ReadType::NamedTable(NamedTable {
                names: vec!["test_table".into()],
                advanced_extension: None,
            })),
        }))),
    };

    // Condition: function anchor 10 ("gt") applied to field 0 and the
    // non-nullable i32 literal 10.
    let field_arg = FunctionArgument {
        arg_type: Some(ArgType::Value(Reference(0).into())),
    };
    let literal_arg = FunctionArgument {
        arg_type: Some(ArgType::Value(Expression {
            rex_type: Some(RexType::Literal(Literal {
                literal_type: Some(LiteralType::I32(10)),
                nullable: false,
                type_variation_reference: 0,
            })),
        })),
    };
    let condition = Expression {
        rex_type: Some(RexType::ScalarFunction(ScalarFunction {
            function_reference: 10,
            arguments: vec![field_arg, literal_arg],
            options: vec![],
            output_type: None,
            #[allow(deprecated)]
            args: vec![],
        })),
    };

    let rel = Rel {
        rel_type: Some(RelType::Filter(Box::new(FilterRel {
            common: None,
            input: Some(Box::new(scan)),
            condition: Some(Box::new(condition)),
            advanced_extension: None,
        }))),
    };

    let (result, errors) = ctx.textify(&rel);
    assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
    // The leading newline only keeps the literal readable; `trim_start` drops it.
    let expected = r#"
Filter[gt($0, 10:i32) => $0, $1]
  Read[test_table => col1:i32?, col2:i32?]"#
        .trim_start();
    assert_eq!(result, expected);
}
1274
/// A standalone aggregate function value textifies as `name(args)`, with the
/// name resolved through the functions registered on the test context.
#[test]
fn test_aggregate_function_textify() {
    let ctx = TestContext::new()
        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
        .with_function(1, 10, "sum")
        .with_function(1, 11, "count");

    // Function anchor 10 ("sum") applied to column 1.
    let sum_of_column = get_aggregate_func(10, 1);
    let wrapped = Value::AggregateFunction(&sum_of_column);

    let (text, errors) = ctx.textify(&wrapped);

    assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
    assert_eq!(text, "sum($1)");
}
1291
/// An `AggregateRel` with one grouping expression, two measures and an
/// explicit emit mapping textifies to an `Aggregate[...]` line listing only
/// the emitted measures.
#[test]
fn test_aggregate_relation_textify() {
    let ctx = TestContext::new()
        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
        .with_function(1, 10, "sum")
        .with_function(1, 11, "count");

    // sum($1) and count($1), in that order.
    let measures = vec![
        aggregate_rel::Measure {
            measure: Some(get_aggregate_func(10, 1)),
            filter: None,
        },
        aggregate_rel::Measure {
            measure: Some(get_aggregate_func(11, 1)),
            filter: None,
        },
    ];

    // Emit output indices 1 and 2 only; the expected text below contains just
    // the two measures.
    let common = Some(RelCommon {
        emit_kind: Some(EmitKind::Emit(Emit {
            output_mapping: vec![1, 2],
        })),
        ..Default::default()
    });

    // Reuse the shared `create_exp` helper for the grouping field reference
    // instead of spelling the expression out by hand.
    let aggregate = create_aggregate_rel(vec![create_exp(0)], vec![], measures, common);

    let rel = Rel {
        rel_type: Some(RelType::Aggregate(Box::new(aggregate))),
    };
    let (result, errors) = ctx.textify(&rel);

    assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
    // Include the rendered text so a mismatch is diagnosable from the failure.
    assert!(
        result.contains("Aggregate[_ => sum($1), count($1)]"),
        "Unexpected aggregate output: {result}"
    );
}
1338
/// Grouping sets expressed through the deprecated per-grouping
/// `grouping_expressions` field (with no `expression_references`) still
/// textify as parenthesised grouping sets.
#[test]
fn test_multiple_groupings_on_aggregate_deprecated() {
    let ctx = TestContext::new()
        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
        .with_function(1, 11, "count");

    // Each grouping owns freshly built expressions, so no clones are needed.
    let grouping_sets = vec![
        aggregate_rel::Grouping {
            #[allow(deprecated)]
            grouping_expressions: vec![create_exp(0)],
            expression_references: vec![],
        },
        aggregate_rel::Grouping {
            #[allow(deprecated)]
            grouping_expressions: vec![create_exp(0), create_exp(1)],
            expression_references: vec![],
        },
    ];

    let aggregate = create_aggregate_rel(vec![], grouping_sets, vec![], None);

    let rel = Rel {
        rel_type: Some(RelType::Aggregate(Box::new(aggregate))),
    };
    let (result, errors) = ctx.textify(&rel);

    assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
    // Include the rendered text so a mismatch is diagnosable from the failure.
    assert!(
        result.contains("Aggregate[($0), ($0, $1) => $0, $1]"),
        "Unexpected aggregate output: {result}"
    );
}
1373
/// Same as the deprecated grouping-set test, but with one measure added: the
/// measure is expected to appear after the grouping columns in the output.
#[test]
fn test_multiple_groupings_with_measure_deprecated() {
    let ctx = TestContext::new()
        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
        .with_function(1, 11, "count");

    // Each grouping owns freshly built expressions, so no clones are needed.
    let grouping_sets = vec![
        aggregate_rel::Grouping {
            #[allow(deprecated)]
            grouping_expressions: vec![create_exp(0)],
            expression_references: vec![],
        },
        aggregate_rel::Grouping {
            #[allow(deprecated)]
            grouping_expressions: vec![create_exp(0), create_exp(1)],
            expression_references: vec![],
        },
    ];

    // count($2)
    let measures = vec![aggregate_rel::Measure {
        measure: Some(get_aggregate_func(11, 2)),
        filter: None,
    }];

    let aggregate = create_aggregate_rel(vec![], grouping_sets, measures, None);

    let rel = Rel {
        rel_type: Some(RelType::Aggregate(Box::new(aggregate))),
    };
    let (result, errors) = ctx.textify(&rel);

    assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
    // Include the rendered text so a mismatch is diagnosable from the failure.
    assert!(
        result.contains("($0), ($0, $1) => $0, $1, count($2)"),
        "Unexpected aggregate output: {result}"
    );
}
1415
/// Grouping sets expressed through `expression_references` (indices into the
/// relation-level `grouping_expressions`) textify as parenthesised sets, with
/// an empty set rendered as `_`. Repeated sets and repeated references inside
/// a set are kept as written.
#[test]
fn test_multiple_groupings_on_aggregate() {
    let ctx = TestContext::new()
        .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
        .with_function(1, 11, "count");

    // Relation-level grouping expressions: $0 and $1, built with the shared helper.
    let grouping_expressions = vec![create_exp(0), create_exp(1)];

    // Every grouping differs only in its reference list; the deprecated
    // per-grouping expression list stays empty.
    let grouping_of = |references| Grouping {
        #[allow(deprecated)]
        grouping_expressions: vec![],
        expression_references: references,
    };
    let grouping_sets = vec![
        grouping_of(vec![0, 1]),
        grouping_of(vec![0, 1]),
        grouping_of(vec![1]),
        grouping_of(vec![1, 1]),
        grouping_of(vec![]),
    ];

    // count($2)
    let measures = vec![aggregate_rel::Measure {
        measure: Some(get_aggregate_func(11, 2)),
        filter: None,
    }];

    let aggregate = create_aggregate_rel(grouping_expressions, grouping_sets, measures, None);

    let rel = Rel {
        rel_type: Some(RelType::Aggregate(Box::new(aggregate))),
    };
    let (result, errors) = ctx.textify(&rel);

    assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
    // Include the rendered text so a mismatch is diagnosable from the failure.
    assert!(
        result.contains("Aggregate[($0, $1), ($0, $1), ($1), ($1, $1), _ => $0, $1, count($2)]"),
        "Unexpected aggregate output: {result}"
    );
}
1484
/// Positional-only arguments textify as a comma-separated list.
#[test]
fn test_arguments_textify_positional_only() {
    let ctx = TestContext::new();

    let positional = vec![Value::Integer(42), Value::Integer(7)];
    let args = Arguments {
        positional,
        named: vec![],
    };

    let (text, errors) = ctx.textify(&args);
    assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
    assert_eq!(text, "42, 7");
}
1496
/// Named-only arguments textify as comma-separated `name=value` pairs, in the
/// order they were supplied.
#[test]
fn test_arguments_textify_named_only() {
    let ctx = TestContext::new();

    let limit = NamedArg {
        name: "limit".into(),
        value: Value::Integer(10),
    };
    let offset = NamedArg {
        name: "offset".into(),
        value: Value::Integer(5),
    };
    let args = Arguments {
        positional: vec![],
        named: vec![limit, offset],
    };

    let (text, errors) = ctx.textify(&args);
    assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
    assert_eq!(text, "limit=10, offset=5");
}
1517
/// A `JoinRel` whose raw `type` value is not a recognised join type must
/// still be rendered as a `Join[...]` relation, with an error token in the
/// output and at least one collected error.
#[test]
fn test_join_relation_unknown_type() {
    let ctx = TestContext::new();

    // Both inputs are default (empty) read relations; only the join type
    // matters for this test.
    let empty_read = || Rel {
        rel_type: Some(RelType::Read(Box::default())),
    };

    let join = JoinRel {
        left: Some(Box::new(empty_read())),
        right: Some(Box::new(empty_read())),
        expression: Some(Box::new(Expression::default())),
        // Deliberately out-of-range join type value.
        r#type: 999,
        common: None,
        post_join_filter: None,
        advanced_extension: None,
    };

    let rel = Rel {
        rel_type: Some(RelType::Join(Box::new(join))),
    };
    let (text, errors) = ctx.textify(&rel);

    assert!(!errors.is_empty(), "Expected errors for unknown join type");
    assert!(
        text.contains("!{JoinRel}"),
        "Expected error token for unknown join type"
    );
    assert!(
        text.contains("Join["),
        "Expected Join relation to be formatted"
    );
}
1553
/// With both kinds present, positional arguments are written first and named
/// arguments follow, all separated by commas.
#[test]
fn test_arguments_textify_both() {
    let ctx = TestContext::new();

    let foo = NamedArg {
        name: Cow::Borrowed("foo"),
        value: Value::Integer(2),
    };
    let args = Arguments {
        positional: vec![Value::Integer(1)],
        named: vec![foo],
    };

    let (text, errors) = ctx.textify(&args);
    assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
    assert_eq!(text, "1, foo=2");
}
1568
/// An argument list with neither positional nor named entries textifies as
/// the placeholder `_`.
#[test]
fn test_arguments_textify_empty() {
    let ctx = TestContext::new();

    let no_args = Arguments {
        positional: vec![],
        named: vec![],
    };

    let (text, errors) = ctx.textify(&no_args);
    assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
    assert_eq!(text, "_");
}
1580
/// A named argument whose value is `Value::Missing` is written as
/// `name=!{...}` and the underlying error is reported to the caller.
#[test]
fn test_named_arg_textify_error_token() {
    let ctx = TestContext::new();

    let missing = Value::Missing(PlanError::invalid(
        "my_enum",
        Some(Cow::Borrowed("my_enum")),
        Cow::Borrowed("my_enum"),
    ));
    let named_arg = NamedArg {
        name: Cow::Borrowed("foo"),
        value: missing,
    };

    let (result, errors) = ctx.textify(&named_arg);

    // The error token must appear in place of the value...
    assert!(result.contains("foo=!{my_enum}"), "Output: {result}");
    // ...and the error itself must have been collected.
    assert!(!errors.is_empty(), "Expected error for error token");
}
1598
/// `as_enum_str` on a `JoinType` yields the expected short variant name for
/// each of the variants checked here.
#[test]
fn test_join_type_enum_textify() {
    // (variant, expected text) pairs, checked in order.
    let expectations = [
        (join_rel::JoinType::Inner, "Inner"),
        (join_rel::JoinType::Left, "Left"),
        (join_rel::JoinType::LeftSemi, "LeftSemi"),
        (join_rel::JoinType::LeftAnti, "LeftAnti"),
    ];

    for (join_type, expected) in expectations {
        assert_eq!(join_type.as_enum_str().unwrap(), expected);
    }
}
1613
/// Checks how many output column references `join_output_columns` produces
/// for several join types, and that the checked positions hold sequential
/// `Value::Reference` indices.
///
/// NOTE(review): the two numeric arguments are presumably the left and right
/// input column counts (2 and 3 here) — confirm against `join_output_columns`.
#[test]
fn test_join_output_columns() {
    // Inner: 5 columns; only the first and last are spot-checked.
    let inner = super::join_output_columns(join_rel::JoinType::Inner, 2, 3);
    assert_eq!(inner.len(), 5);
    assert!(matches!(inner[0], Value::Reference(0)));
    assert!(matches!(inner[4], Value::Reference(4)));

    // LeftSemi: 2 columns.
    let left_semi = super::join_output_columns(join_rel::JoinType::LeftSemi, 2, 3);
    assert_eq!(left_semi.len(), 2);
    assert!(matches!(left_semi[0], Value::Reference(0)));
    assert!(matches!(left_semi[1], Value::Reference(1)));

    // RightSemi: 3 columns.
    let right_semi = super::join_output_columns(join_rel::JoinType::RightSemi, 2, 3);
    assert_eq!(right_semi.len(), 3);
    assert!(matches!(right_semi[0], Value::Reference(0)));
    assert!(matches!(right_semi[1], Value::Reference(1)));
    assert!(matches!(right_semi[2], Value::Reference(2)));

    // LeftMark: 3 columns.
    let left_mark = super::join_output_columns(join_rel::JoinType::LeftMark, 2, 3);
    assert_eq!(left_mark.len(), 3);
    assert!(matches!(left_mark[0], Value::Reference(0)));
    assert!(matches!(left_mark[1], Value::Reference(1)));
    assert!(matches!(left_mark[2], Value::Reference(2)));

    // RightMark: 4 columns.
    let right_mark = super::join_output_columns(join_rel::JoinType::RightMark, 2, 3);
    assert_eq!(right_mark.len(), 4);
    assert!(matches!(right_mark[0], Value::Reference(0)));
    assert!(matches!(right_mark[1], Value::Reference(1)));
    assert!(matches!(right_mark[2], Value::Reference(2)));
    assert!(matches!(right_mark[3], Value::Reference(3)));
}
1650
1651 fn get_aggregate_func(func_ref: u32, column_ind: i32) -> AggregateFunction {
1652 AggregateFunction {
1653 function_reference: func_ref,
1654 arguments: vec![FunctionArgument {
1655 arg_type: Some(ArgType::Value(Expression {
1656 rex_type: Some(RexType::Selection(Box::new(
1657 FieldIndex(column_ind).to_field_reference(),
1658 ))),
1659 })),
1660 }],
1661 options: vec![],
1662 output_type: None,
1663 invocation: 0,
1664 phase: 0,
1665 sorts: vec![],
1666 #[allow(deprecated)]
1667 args: vec![],
1668 }
1669 }
1670
1671 fn create_aggregate_rel(
1672 grouping_expressions: Vec<Expression>,
1673 grouping_sets: Vec<Grouping>,
1674 measures: Vec<aggregate_rel::Measure>,
1675 common: Option<RelCommon>,
1676 ) -> AggregateRel {
1677 let common = common.or_else(|| {
1678 Some(RelCommon {
1679 emit_kind: Some(EmitKind::Direct(Direct {})),
1680 ..Default::default()
1681 })
1682 });
1683 AggregateRel {
1684 input: Some(Box::new(Rel {
1685 rel_type: Some(RelType::Read(Box::new(ReadRel {
1686 common: None,
1687 base_schema: Some(get_basic_schema()),
1688 filter: None,
1689 best_effort_filter: None,
1690 projection: None,
1691 advanced_extension: None,
1692 read_type: Some(ReadType::NamedTable(NamedTable {
1693 names: vec!["orders".into()],
1694 advanced_extension: None,
1695 })),
1696 }))),
1697 })),
1698 grouping_expressions,
1699 groupings: grouping_sets,
1700 measures,
1701 common,
1702 advanced_extension: None,
1703 }
1704 }
1705
1706 fn get_basic_schema() -> NamedStruct {
1707 NamedStruct {
1708 names: vec!["category".into(), "amount".into(), "value".into()],
1709 r#struct: Some(Struct {
1710 type_variation_reference: 0,
1711 types: vec![
1712 Type {
1713 kind: Some(Kind::String(ptype::String {
1714 type_variation_reference: 0,
1715 nullability: Nullability::Nullable as i32,
1716 })),
1717 },
1718 Type {
1719 kind: Some(Kind::Fp64(ptype::Fp64 {
1720 type_variation_reference: 0,
1721 nullability: Nullability::Nullable as i32,
1722 })),
1723 },
1724 Type {
1725 kind: Some(Kind::I32(ptype::I32 {
1726 type_variation_reference: 0,
1727 nullability: Nullability::Nullable as i32,
1728 })),
1729 },
1730 ],
1731 nullability: Nullability::Nullable as i32,
1732 }),
1733 }
1734 }
1735
1736 fn create_exp(column_ind: i32) -> Expression {
1737 Expression {
1738 rex_type: Some(RexType::Selection(Box::new(
1739 FieldIndex(column_ind).to_field_reference(),
1740 ))),
1741 }
1742 }
1743
/// Textifying a relation type the textifier does not handle (a `CrossRel`
/// here) must write a `!{Rel}` failure token and report exactly one
/// `Unimplemented` format error whose lookup mentions the relation kind.
#[test]
fn test_unsupported_rel_type_produces_failure_token() {
    use substrait::proto::CrossRel;

    let ctx = TestContext::new();

    let cross = CrossRel {
        common: None,
        left: None,
        right: None,
        advanced_extension: None,
    };
    let rel = Rel {
        rel_type: Some(RelType::Cross(Box::new(cross))),
    };

    let (result, errors) = ctx.textify(&rel);

    // The output carries the failure token in place of the relation.
    assert!(
        result.contains("!{Rel}"),
        "Expected '!{{Rel}}' in output, got: {result}"
    );

    assert_eq!(errors.0.len(), 1, "Expected exactly one error: {errors:?}");

    // The single error must be a format error describing the unsupported kind.
    let first_error = &errors.0[0];
    let FormatError::Format(plan_err) = first_error else {
        panic!("Expected FormatError::Format, got: {first_error:?}");
    };

    assert_eq!(plan_err.message, "Rel");
    assert_eq!(
        plan_err.error_type,
        crate::textify::foundation::FormatErrorType::Unimplemented
    );
    assert!(
        plan_err.lookup.as_deref().unwrap_or("").contains("Cross"),
        "Expected lookup to mention 'Cross', got: {:?}",
        plan_err.lookup
    );
}
1790}