1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::convert::TryFrom;
4use std::fmt;
5use std::fmt::Debug;
6
7use prost::{Message, UnknownEnumValue};
8use substrait::proto::fetch_rel::CountMode;
9use substrait::proto::plan_rel::RelType as PlanRelType;
10use substrait::proto::read_rel::ReadType;
11use substrait::proto::rel::RelType;
12use substrait::proto::rel_common::EmitKind;
13use substrait::proto::sort_field::{SortDirection, SortKind};
14use substrait::proto::{
15 AggregateFunction, AggregateRel, Expression, ExtensionLeafRel, ExtensionMultiRel,
16 ExtensionSingleRel, FetchRel, FilterRel, JoinRel, NamedStruct, PlanRel, ProjectRel, ReadRel,
17 Rel, RelCommon, RelRoot, SortField, SortRel, Type, join_rel,
18};
19
20use super::addenda::AddendumLines;
21use super::expressions::Reference;
22use super::types::Name;
23use super::{PlanError, Scope, Textify};
24use crate::FormatError;
25use crate::extensions::any::AnyRef;
26use crate::extensions::{ExtensionArgs, ExtensionColumn, ExtensionError, ExtensionValue};
27
28pub trait NamedRelation {
29 fn name(&self) -> &'static str;
30}
31
32impl NamedRelation for Rel {
33 fn name(&self) -> &'static str {
34 match self.rel_type.as_ref() {
35 None => "UnknownRel",
36 Some(RelType::Read(_)) => "Read",
37 Some(RelType::Filter(_)) => "Filter",
38 Some(RelType::Project(_)) => "Project",
39 Some(RelType::Fetch(_)) => "Fetch",
40 Some(RelType::Aggregate(_)) => "Aggregate",
41 Some(RelType::Sort(_)) => "Sort",
42 Some(RelType::HashJoin(_)) => "HashJoin",
43 Some(RelType::Exchange(_)) => "Exchange",
44 Some(RelType::Join(_)) => "Join",
45 Some(RelType::Set(_)) => "Set",
46 Some(RelType::ExtensionLeaf(_)) => "ExtensionLeaf",
47 Some(RelType::Cross(_)) => "Cross",
48 Some(RelType::Reference(_)) => "Reference",
49 Some(RelType::ExtensionSingle(_)) => "ExtensionSingle",
50 Some(RelType::ExtensionMulti(_)) => "ExtensionMulti",
51 Some(RelType::Write(_)) => "Write",
52 Some(RelType::Ddl(_)) => "Ddl",
53 Some(RelType::Update(_)) => "Update",
54 Some(RelType::MergeJoin(_)) => "MergeJoin",
55 Some(RelType::NestedLoopJoin(_)) => "NestedLoopJoin",
56 Some(RelType::Window(_)) => "Window",
57 Some(RelType::Expand(_)) => "Expand",
58 }
59 }
60}
61
62impl Textify for Rel {
63 fn name() -> &'static str {
64 "Rel"
65 }
66
67 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
68 Relation::from_rel(self, ctx).textify(ctx, w)
71 }
72}
73
74pub trait ValueEnum {
80 fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError>;
81}
82
83#[derive(Debug, Clone)]
84pub struct NamedArg<'a> {
85 pub name: Cow<'a, str>,
86 pub value: Value<'a>,
87}
88
89#[derive(Debug, Clone)]
90pub enum Value<'a> {
91 TableName(Vec<Name<'a>>),
92 Field(Option<Name<'a>>, Option<&'a Type>),
93 Tuple(Vec<Value<'a>>),
94 Reference(i32),
95 Expression(&'a Expression),
96 AggregateFunction(&'a AggregateFunction),
97 Missing(PlanError),
99 Enum(Cow<'a, str>),
101 EmptyGroup,
102 Integer(i64),
103 ExtensionArgument(ExtensionValue),
105 ExtColumn(ExtensionColumn),
107}
108
109impl<'a> Value<'a> {
110 pub fn expect(maybe_value: Option<Self>, f: impl FnOnce() -> PlanError) -> Self {
111 match maybe_value {
112 Some(s) => s,
113 None => Value::Missing(f()),
114 }
115 }
116}
117
118impl<'a> From<Result<Vec<Name<'a>>, PlanError>> for Value<'a> {
119 fn from(token: Result<Vec<Name<'a>>, PlanError>) -> Self {
120 match token {
121 Ok(value) => Value::TableName(value),
122 Err(err) => Value::Missing(err),
123 }
124 }
125}
126
127impl<'a> Textify for Value<'a> {
128 fn name() -> &'static str {
129 "Value"
130 }
131
132 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
133 match self {
134 Value::TableName(names) => write!(w, "{}", ctx.separated(names, ".")),
135 Value::Field(name, typ) => {
136 write!(w, "{}:{}", ctx.expect(name.as_ref()), ctx.expect(*typ))
137 }
138 Value::Tuple(values) => write!(w, "({})", ctx.separated(values, ", ")),
139 Value::Reference(i) => write!(w, "{}", Reference(*i)),
140 Value::Expression(e) => write!(w, "{}", ctx.display(*e)),
141 Value::AggregateFunction(agg_fn) => agg_fn.textify(ctx, w),
142 Value::Missing(err) => write!(w, "{}", ctx.failure(err.clone())),
143 Value::Enum(res) => write!(w, "&{res}"),
144 Value::Integer(i) => write!(w, "{i}"),
145 Value::EmptyGroup => write!(w, "_"),
146 Value::ExtensionArgument(ev) => ev.textify(ctx, w),
147 Value::ExtColumn(ec) => ec.textify(ctx, w),
148 }
149 }
150}
151
152fn schema_to_values<'a>(schema: &'a NamedStruct) -> Vec<Value<'a>> {
153 let mut fields = schema
154 .r#struct
155 .as_ref()
156 .map(|s| s.types.iter())
157 .into_iter()
158 .flatten();
159 let mut names = schema.names.iter();
160
161 let mut values = Vec::new();
165 loop {
166 let field = fields.next();
167 let name = names.next().map(|n| Name(n));
168 if field.is_none() && name.is_none() {
169 break;
170 }
171
172 values.push(Value::Field(name, field));
173 }
174
175 values
176}
177
178struct Emitted<'a> {
179 pub values: &'a [Value<'a>],
180 pub emit: Option<&'a EmitKind>,
181}
182
183impl<'a> Emitted<'a> {
184 pub fn new(values: &'a [Value<'a>], emit: Option<&'a EmitKind>) -> Self {
185 Self { values, emit }
186 }
187
188 pub fn write_direct<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
189 write!(w, "{}", ctx.separated(self.values.iter(), ", "))
190 }
191}
192
193impl<'a> Textify for Emitted<'a> {
194 fn name() -> &'static str {
195 "Emitted"
196 }
197
198 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
199 if ctx.options().show_emit {
200 return self.write_direct(ctx, w);
201 }
202
203 let indices = match &self.emit {
204 Some(EmitKind::Emit(e)) => &e.output_mapping,
205 Some(EmitKind::Direct(_)) => return self.write_direct(ctx, w),
206 None => return self.write_direct(ctx, w),
207 };
208
209 for (i, &index) in indices.iter().enumerate() {
210 if i > 0 {
211 write!(w, ", ")?;
212 }
213
214 match self.values.get(index as usize) {
215 Some(value) => write!(w, "{}", ctx.display(value))?,
216 None => write!(w, "{}", ctx.failure(PlanError::invalid(
217 "Emitted",
218 Some("output_mapping"),
219 format!(
220 "Output mapping index {} is out of bounds for values collection of size {}",
221 index, self.values.len()
222 )
223 )))?,
224 }
225 }
226
227 Ok(())
228 }
229}
230
231#[derive(Debug, Clone)]
232pub struct Arguments<'a> {
233 pub positional: Vec<Value<'a>>,
235 pub named: Vec<NamedArg<'a>>,
237}
238
239impl<'a> Textify for Arguments<'a> {
240 fn name() -> &'static str {
241 "Arguments"
242 }
243 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
244 if self.positional.is_empty() && self.named.is_empty() {
245 return write!(w, "_");
246 }
247
248 write!(w, "{}", ctx.separated(self.positional.iter(), ", "))?;
249 if !self.positional.is_empty() && !self.named.is_empty() {
250 write!(w, ", ")?;
251 }
252 write!(w, "{}", ctx.separated(self.named.iter(), ", "))
253 }
254}
255
256pub struct Relation<'a> {
257 pub name: Cow<'a, str>,
258 pub arguments: Option<Arguments<'a>>,
267 pub columns: Vec<Value<'a>>,
270 pub emit: Option<&'a EmitKind>,
272 addenda: AddendumLines,
277 pub children: Vec<Option<Relation<'a>>>,
279}
280
281impl Textify for Relation<'_> {
282 fn name() -> &'static str {
283 "Relation"
284 }
285
286 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
287 self.write_header(ctx, w)?;
288 let child_scope = ctx.push_indent();
289 self.addenda.textify(&child_scope, w)?;
290 self.write_children(ctx, w)?;
291 Ok(())
292 }
293}
294
295impl Relation<'_> {
296 pub fn write_header<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
300 let cols = Emitted::new(&self.columns, self.emit);
301 let indent = ctx.indent();
302 let name = &self.name;
303 let cols = ctx.display(&cols);
304 match &self.arguments {
305 None => {
306 write!(w, "{indent}{name}[{cols}]")
307 }
308 Some(args) => {
309 let args = ctx.display(args);
310 write!(w, "{indent}{name}[{args} => {cols}]")
311 }
312 }
313 }
314
315 pub fn write_children<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
318 let child_scope = ctx.push_indent();
319 for child in self.children.iter().flatten() {
320 writeln!(w)?;
321 child.textify(&child_scope, w)?;
322 }
323 Ok(())
324 }
325}
326
327impl<'a> Relation<'a> {
328 pub fn emitted(&self) -> usize {
329 match self.emit {
330 Some(EmitKind::Emit(e)) => e.output_mapping.len(),
331 Some(EmitKind::Direct(_)) => self.columns.len(),
332 None => self.columns.len(),
333 }
334 }
335}
336
337impl<'a> Relation<'a> {
338 fn from_read<S: Scope>(rel: &'a ReadRel, ctx: &S) -> Self {
339 let columns = read_columns(rel);
340 let emit = rel.common.as_ref().and_then(|c| c.emit_kind.as_ref());
341
342 match &rel.read_type {
343 Some(ReadType::NamedTable(table)) => {
344 let table_name = Value::TableName(table.names.iter().map(|n| Name(n)).collect());
345 Relation {
346 name: Cow::Borrowed("Read"),
347 arguments: Some(Arguments {
348 positional: vec![table_name],
349 named: vec![],
350 }),
351 columns,
352 emit,
353 addenda: AddendumLines::from_advanced_extension(
354 ctx,
355 rel.advanced_extension.as_ref(),
356 ),
357 children: vec![],
358 }
359 }
360 Some(ReadType::VirtualTable(vt)) => {
361 let positional = vt
362 .expressions
363 .iter()
364 .map(|row| Value::Tuple(row.fields.iter().map(Value::Expression).collect()))
365 .collect();
366
367 Relation {
368 name: Cow::Borrowed("Read:Virtual"),
369 arguments: Some(Arguments {
370 positional,
371 named: vec![],
372 }),
373 columns,
374 emit,
375 addenda: AddendumLines::from_advanced_extension(
376 ctx,
377 rel.advanced_extension.as_ref(),
378 ),
379 children: vec![],
380 }
381 }
382 Some(ReadType::ExtensionTable(table)) => {
383 let decoded = match table.detail.as_ref().map(AnyRef::from) {
384 Some(detail) => ctx.extension_registry().decode_extension_table(detail),
385 None => Err(ExtensionError::MissingDetail),
386 };
387
388 Relation {
389 name: Cow::Borrowed("Read:Extension"),
390 arguments: None,
391 columns,
392 emit,
393 addenda: AddendumLines::extension_table(
394 ctx,
395 decoded,
396 rel.advanced_extension.as_ref(),
397 ),
398 children: vec![],
399 }
400 }
401 other => {
402 let err = PlanError::unimplemented(
403 "ReadRel",
404 Some("read_type"),
405 format!("Unsupported read type {other:?}"),
406 );
407 Relation {
408 name: Cow::Borrowed("Read"),
409 arguments: Some(Arguments {
410 positional: vec![Value::Missing(err)],
411 named: vec![],
412 }),
413 columns,
414 emit,
415 addenda: AddendumLines::from_advanced_extension(
416 ctx,
417 rel.advanced_extension.as_ref(),
418 ),
419 children: vec![],
420 }
421 }
422 }
423 }
424}
425
426fn read_columns<'a>(rel: &'a ReadRel) -> Vec<Value<'a>> {
427 match rel.base_schema {
428 Some(ref schema) => schema_to_values(schema),
429 None => {
430 let err =
431 PlanError::unimplemented("ReadRel", Some("base_schema"), "Base schema is required");
432 vec![Value::Missing(err)]
433 }
434 }
435}
436
437pub fn get_emit(rel: Option<&RelCommon>) -> Option<&EmitKind> {
438 rel.as_ref().and_then(|c| c.emit_kind.as_ref())
439}
440
441impl<'a> Relation<'a> {
442 pub fn convert_children<S: Scope>(
446 refs: Vec<Option<&'a Rel>>,
447 ctx: &S,
448 ) -> (Vec<Option<Relation<'a>>>, usize) {
449 let mut children = vec![];
450 let mut inputs = 0;
451
452 for maybe_rel in refs {
453 match maybe_rel {
454 Some(rel) => {
455 let child = Relation::from_rel(rel, ctx);
456 inputs += child.emitted();
457 children.push(Some(child));
458 }
459 None => children.push(None),
460 }
461 }
462
463 (children, inputs)
464 }
465}
466
467impl<'a> Relation<'a> {
468 fn from_filter<S: Scope>(rel: &'a FilterRel, ctx: &S) -> Self {
469 let condition = rel
470 .condition
471 .as_ref()
472 .map(|c| Value::Expression(c.as_ref()));
473 let condition = Value::expect(condition, || {
474 PlanError::unimplemented("FilterRel", Some("condition"), "Condition is None")
475 });
476 let positional = vec![condition];
477 let arguments = Some(Arguments {
478 positional,
479 named: vec![],
480 });
481 let emit = get_emit(rel.common.as_ref());
482 let (children, columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
483 let columns = (0..columns).map(|i| Value::Reference(i as i32)).collect();
484
485 Relation {
486 name: Cow::Borrowed("Filter"),
487 arguments,
488 columns,
489 emit,
490 addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
491 children,
492 }
493 }
494
495 fn from_project<S: Scope>(rel: &'a ProjectRel, ctx: &S) -> Self {
496 let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
497 let mut columns: Vec<Value> = vec![];
498 for i in 0..input_columns {
499 columns.push(Value::Reference(i as i32));
500 }
501 for expr in &rel.expressions {
502 columns.push(Value::Expression(expr));
503 }
504
505 Relation {
506 name: Cow::Borrowed("Project"),
507 arguments: None,
508 columns,
509 emit: get_emit(rel.common.as_ref()),
510 addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
511 children,
512 }
513 }
514
515 pub fn from_rel<S: Scope>(rel: &'a Rel, ctx: &S) -> Self {
516 match rel.rel_type.as_ref() {
517 Some(RelType::Read(r)) => Relation::from_read(r, ctx),
518 Some(RelType::Filter(r)) => Relation::from_filter(r, ctx),
519 Some(RelType::Project(r)) => Relation::from_project(r, ctx),
520 Some(RelType::Aggregate(r)) => Relation::from_aggregate(r, ctx),
521 Some(RelType::Sort(r)) => Relation::from_sort(r, ctx),
522 Some(RelType::Fetch(r)) => Relation::from_fetch(r, ctx),
523 Some(RelType::Join(r)) => Relation::from_join(r, ctx),
524 Some(RelType::ExtensionLeaf(r)) => Relation::from_extension_leaf(r, ctx),
525 Some(RelType::ExtensionSingle(r)) => Relation::from_extension_single(r, ctx),
526 Some(RelType::ExtensionMulti(r)) => Relation::from_extension_multi(r, ctx),
527 _ => {
528 let name = rel.name();
529 let token = ctx.failure(FormatError::Format(PlanError::unimplemented(
530 "Rel",
531 Some(name),
532 format!("{name} is not yet supported in the text format"),
533 )));
534 Relation {
535 name: Cow::Owned(format!("{token}")),
536 arguments: None,
537 columns: vec![],
538 emit: None,
539 addenda: AddendumLines::none(),
540 children: vec![],
541 }
542 }
543 }
544 }
545
546 fn from_extension_leaf<S: Scope>(rel: &'a ExtensionLeafRel, ctx: &S) -> Self {
547 let detail_ref = rel.detail.as_ref().map(AnyRef::from);
548 let decoded = match detail_ref {
549 Some(d) => ctx.extension_registry().decode(d),
550 None => Err(ExtensionError::MissingDetail),
551 };
552 Relation::from_extension("ExtensionLeaf", decoded, vec![], ctx)
553 }
554
555 fn from_extension_single<S: Scope>(rel: &'a ExtensionSingleRel, ctx: &S) -> Self {
556 let detail_ref = rel.detail.as_ref().map(AnyRef::from);
557 let decoded = match detail_ref {
558 Some(d) => ctx.extension_registry().decode(d),
559 None => Err(ExtensionError::MissingDetail),
560 };
561 Relation::from_extension("ExtensionSingle", decoded, vec![rel.input.as_deref()], ctx)
562 }
563
564 fn from_extension_multi<S: Scope>(rel: &'a ExtensionMultiRel, ctx: &S) -> Self {
565 let detail_ref = rel.detail.as_ref().map(AnyRef::from);
566 let decoded = match detail_ref {
567 Some(d) => ctx.extension_registry().decode(d),
568 None => Err(ExtensionError::MissingDetail),
569 };
570 let mut child_refs: Vec<Option<&'a Rel>> = vec![];
571 for input in &rel.inputs {
572 child_refs.push(Some(input));
573 }
574 Relation::from_extension("ExtensionMulti", decoded, child_refs, ctx)
575 }
576
577 fn from_extension<S: Scope>(
578 ext_type: &'static str,
579 decoded: Result<(String, ExtensionArgs), ExtensionError>,
580 child_refs: Vec<Option<&'a Rel>>,
581 ctx: &S,
582 ) -> Self {
583 match decoded {
584 Ok((name, args)) => {
585 let (children, _) = Relation::convert_children(child_refs, ctx);
586 let mut positional = vec![];
587 for value in args.positional {
588 positional.push(Value::ExtensionArgument(value));
589 }
590 let mut named = vec![];
591 for (key, value) in args.named {
592 named.push(NamedArg {
593 name: Cow::Owned(key),
594 value: Value::ExtensionArgument(value),
595 });
596 }
597 let mut columns = vec![];
598 for col in args.output_columns {
599 columns.push(Value::ExtColumn(col));
600 }
601 Relation {
602 name: Cow::Owned(format!("{}:{}", ext_type, name)),
603 arguments: Some(Arguments { positional, named }),
604 columns,
605 emit: None,
606 addenda: AddendumLines::none(),
610 children,
611 }
612 }
613 Err(error) => {
614 let (children, _) = Relation::convert_children(child_refs, ctx);
615 Relation {
616 name: Cow::Borrowed(ext_type),
617 arguments: None,
618 columns: vec![Value::Missing(PlanError::invalid(
619 "extension",
620 None::<&str>,
621 error.to_string(),
622 ))],
623 emit: None,
624 addenda: AddendumLines::none(),
625 children,
626 }
627 }
628 }
629 }
630
631 fn from_aggregate<S: Scope>(rel: &'a AggregateRel, ctx: &S) -> Self {
641 let mut grouping_sets: Vec<Vec<Value>> = vec![]; let expression_list: Vec<Value>; #[allow(deprecated)]
648 if rel.grouping_expressions.is_empty()
649 && !rel.groupings.is_empty()
650 && !rel.groupings[0].grouping_expressions.is_empty()
651 {
652 (expression_list, grouping_sets) = Relation::get_grouping_sets(rel);
653 } else {
654 expression_list = rel
655 .grouping_expressions
656 .iter()
657 .map(Value::Expression)
658 .collect::<Vec<_>>(); for group in &rel.groupings {
660 let mut grouping_set: Vec<Value> = vec![];
661 for i in &group.expression_references {
662 grouping_set.push(Value::Reference(*i as i32));
663 }
664 grouping_sets.push(grouping_set);
665 }
666 if rel.groupings.is_empty() {
668 grouping_sets.push(vec![]);
669 }
670 }
671
672 let is_single = grouping_sets.len() == 1;
673 let mut positional: Vec<Value> = vec![];
674 for g in grouping_sets {
675 if g.is_empty() {
676 positional.push(Value::EmptyGroup);
677 } else if is_single {
678 positional.extend(g);
680 } else {
681 positional.push(Value::Tuple(g));
682 }
683 }
684
685 let arguments = Some(Arguments {
687 positional,
688 named: vec![],
689 });
690
691 let mut all_outputs: Vec<Value> = expression_list;
693
694 for m in &rel.measures {
697 if let Some(agg_fn) = m.measure.as_ref() {
698 all_outputs.push(Value::AggregateFunction(agg_fn));
699 }
700 }
701 let emit = get_emit(rel.common.as_ref());
702 let (children, _) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
703
704 Relation {
705 name: Cow::Borrowed("Aggregate"),
706 arguments,
707 columns: all_outputs,
708 emit,
709 addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
710 children,
711 }
712 }
713
714 fn get_grouping_sets(rel: &'a AggregateRel) -> (Vec<Value<'a>>, Vec<Vec<Value<'a>>>) {
715 let mut grouping_sets: Vec<Vec<Value>> = vec![];
716 let mut expression_list: Vec<Value> = Vec::new();
717
718 let mut expression_index_map = HashMap::new();
720 let mut i: i32 = 0; for group in &rel.groupings {
723 let mut grouping_set: Vec<Value> = vec![];
724 #[allow(deprecated)]
725 for exp in &group.grouping_expressions {
726 let key = exp.encode_to_vec();
730 expression_index_map.entry(key.clone()).or_insert_with(|| {
731 let value = Value::Expression(exp);
732 expression_list.push(value); let index = i;
735 i += 1;
736 index });
738 grouping_set.push(Value::Reference(expression_index_map[&key]));
739 }
740 grouping_sets.push(grouping_set);
741 }
742 (expression_list, grouping_sets)
743 }
744}
745
746impl Textify for RelRoot {
747 fn name() -> &'static str {
748 "RelRoot"
749 }
750
751 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
752 let names = self.names.iter().map(|n| Name(n)).collect::<Vec<_>>();
753
754 write!(
755 w,
756 "{}Root[{}]",
757 ctx.indent(),
758 ctx.separated(names.iter(), ", ")
759 )?;
760 let child_scope = ctx.push_indent();
761 for child in self.input.iter() {
762 writeln!(w)?;
763 child.textify(&child_scope, w)?;
764 }
765
766 Ok(())
767 }
768}
769
770impl Textify for PlanRelType {
771 fn name() -> &'static str {
772 "PlanRelType"
773 }
774
775 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
776 match self {
777 PlanRelType::Rel(rel) => rel.textify(ctx, w),
778 PlanRelType::Root(root) => root.textify(ctx, w),
779 }
780 }
781}
782
783impl Textify for PlanRel {
784 fn name() -> &'static str {
785 "PlanRel"
786 }
787
788 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
791 write!(w, "{}", ctx.expect(self.rel_type.as_ref()))
792 }
793}
794
795impl<'a> Relation<'a> {
796 fn from_sort<S: Scope>(rel: &'a SortRel, ctx: &S) -> Self {
797 let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
798 let mut positional = vec![];
799 for sort_field in &rel.sorts {
800 positional.push(Value::from(sort_field));
801 }
802 let arguments = Some(Arguments {
803 positional,
804 named: vec![],
805 });
806 let mut col_values = vec![];
808 for i in 0..input_columns {
809 col_values.push(Value::Reference(i as i32));
810 }
811 let emit = get_emit(rel.common.as_ref());
812 Relation {
813 name: Cow::Borrowed("Sort"),
814 arguments,
815 columns: col_values,
816 emit,
817 addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
818 children,
819 }
820 }
821
822 fn from_fetch<S: Scope>(rel: &'a FetchRel, ctx: &S) -> Self {
823 let (children, input_columns) = Relation::convert_children(vec![rel.input.as_deref()], ctx);
824 let mut named_args: Vec<NamedArg> = vec![];
825 match &rel.count_mode {
826 Some(CountMode::CountExpr(expr)) => {
827 named_args.push(NamedArg {
828 name: Cow::Borrowed("limit"),
829 value: Value::Expression(expr),
830 });
831 }
832 #[allow(deprecated)]
833 Some(CountMode::Count(val)) => {
834 named_args.push(NamedArg {
835 name: Cow::Borrowed("limit"),
836 value: Value::Integer(*val),
837 });
838 }
839 None => {}
840 }
841 if let Some(offset) = &rel.offset_mode {
842 match offset {
843 substrait::proto::fetch_rel::OffsetMode::OffsetExpr(expr) => {
844 named_args.push(NamedArg {
845 name: Cow::Borrowed("offset"),
846 value: Value::Expression(expr),
847 });
848 }
849 #[allow(deprecated)]
850 substrait::proto::fetch_rel::OffsetMode::Offset(val) => {
851 named_args.push(NamedArg {
852 name: Cow::Borrowed("offset"),
853 value: Value::Integer(*val),
854 });
855 }
856 }
857 }
858
859 let emit = get_emit(rel.common.as_ref());
860 let columns: Vec<Value> = (0..input_columns)
862 .map(|i| Value::Reference(i as i32))
863 .collect();
864 Relation {
865 name: Cow::Borrowed("Fetch"),
866 arguments: Some(Arguments {
867 positional: vec![],
868 named: named_args,
869 }),
870 columns,
871 emit,
872 addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
873 children,
874 }
875 }
876}
877
878fn join_output_columns(
879 join_type: join_rel::JoinType,
880 left_columns: usize,
881 right_columns: usize,
882) -> Vec<Value<'static>> {
883 let total_columns = match join_type {
884 join_rel::JoinType::Inner
886 | join_rel::JoinType::Left
887 | join_rel::JoinType::Right
888 | join_rel::JoinType::Outer => left_columns + right_columns,
889
890 join_rel::JoinType::LeftSemi | join_rel::JoinType::LeftAnti => left_columns,
892
893 join_rel::JoinType::RightSemi | join_rel::JoinType::RightAnti => right_columns,
895
896 join_rel::JoinType::LeftSingle => left_columns,
898 join_rel::JoinType::RightSingle => right_columns,
899
900 join_rel::JoinType::LeftMark => left_columns + 1,
902 join_rel::JoinType::RightMark => right_columns + 1,
903
904 join_rel::JoinType::Unspecified => left_columns + right_columns,
906 };
907
908 (0..total_columns)
910 .map(|i| Value::Reference(i as i32))
911 .collect()
912}
913
914impl<'a> Relation<'a> {
915 fn from_join<S: Scope>(rel: &'a JoinRel, ctx: &S) -> Self {
916 let (children, _total_columns) =
917 Relation::convert_children(vec![rel.left.as_deref(), rel.right.as_deref()], ctx);
918
919 assert_eq!(
921 children.len(),
922 2,
923 "convert_children should return same number of elements as input"
924 );
925
926 let left_columns = match &children[0] {
928 Some(child) => child.emitted(),
929 None => 0,
930 };
931 let right_columns = match &children[1] {
932 Some(child) => child.emitted(),
933 None => 0,
934 };
935
936 let (join_type, join_type_value) = match join_rel::JoinType::try_from(rel.r#type) {
939 Ok(join_type) => {
940 let join_type_value = match join_type.as_enum_str() {
941 Ok(s) => Value::Enum(s),
942 Err(e) => Value::Missing(e),
943 };
944 (join_type, join_type_value)
945 }
946 Err(_) => {
947 let join_type_error = Value::Missing(PlanError::invalid(
949 "JoinRel",
950 Some("type"),
951 format!("Unknown join type: {}", rel.r#type),
952 ));
953 (join_rel::JoinType::Unspecified, join_type_error)
954 }
955 };
956
957 let condition = rel
959 .expression
960 .as_ref()
961 .map(|c| Value::Expression(c.as_ref()));
962 let condition = Value::expect(condition, || {
963 PlanError::unimplemented("JoinRel", Some("expression"), "Join condition is None")
964 });
965
966 let positional = vec![join_type_value, condition];
970 let arguments = Some(Arguments {
971 positional,
972 named: vec![],
973 });
974
975 let emit = get_emit(rel.common.as_ref());
976 let columns = join_output_columns(join_type, left_columns, right_columns);
977
978 Relation {
979 name: Cow::Borrowed("Join"),
980 arguments,
981 columns,
982 emit,
983 addenda: AddendumLines::from_advanced_extension(ctx, rel.advanced_extension.as_ref()),
984 children,
985 }
986 }
987}
988
989impl<'a> From<&'a SortField> for Value<'a> {
990 fn from(sf: &'a SortField) -> Self {
991 let field = match &sf.expr {
992 Some(expr) => match &expr.rex_type {
993 Some(substrait::proto::expression::RexType::Selection(fref)) => {
994 if let Some(substrait::proto::expression::field_reference::ReferenceType::DirectReference(seg)) = &fref.reference_type {
995 if let Some(substrait::proto::expression::reference_segment::ReferenceType::StructField(sf)) = &seg.reference_type {
996 Value::Reference(sf.field)
997 } else { Value::Missing(PlanError::unimplemented("SortField", Some("expr"), "Not a struct field")) }
998 } else { Value::Missing(PlanError::unimplemented("SortField", Some("expr"), "Not a direct reference")) }
999 }
1000 _ => Value::Missing(PlanError::unimplemented(
1001 "SortField",
1002 Some("expr"),
1003 "Not a selection",
1004 )),
1005 },
1006 None => Value::Missing(PlanError::unimplemented(
1007 "SortField",
1008 Some("expr"),
1009 "Missing expr",
1010 )),
1011 };
1012 let direction = match &sf.sort_kind {
1013 Some(kind) => Value::from(kind),
1014 None => Value::Missing(PlanError::invalid(
1015 "SortKind",
1016 Some(Cow::Borrowed("sort_kind")),
1017 "Missing sort_kind",
1018 )),
1019 };
1020 Value::Tuple(vec![field, direction])
1021 }
1022}
1023
1024impl<'a, T: ValueEnum + ?Sized> From<&'a T> for Value<'a> {
1025 fn from(enum_val: &'a T) -> Self {
1026 match enum_val.as_enum_str() {
1027 Ok(s) => Value::Enum(s),
1028 Err(e) => Value::Missing(e),
1029 }
1030 }
1031}
1032
1033impl ValueEnum for SortKind {
1034 fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError> {
1035 let d = match self {
1036 &SortKind::Direction(d) => SortDirection::try_from(d),
1037 SortKind::ComparisonFunctionReference(f) => {
1038 return Err(PlanError::invalid(
1039 "SortKind",
1040 Some(Cow::Owned(format!("function reference{f}"))),
1041 "SortKind::ComparisonFunctionReference unimplemented",
1042 ));
1043 }
1044 };
1045 let s = match d {
1046 Err(UnknownEnumValue(d)) => {
1047 return Err(PlanError::invalid(
1048 "SortKind",
1049 Some(Cow::Owned(format!("unknown variant: {d:?}"))),
1050 "Unknown SortDirection",
1051 ));
1052 }
1053 Ok(SortDirection::AscNullsFirst) => "AscNullsFirst",
1054 Ok(SortDirection::AscNullsLast) => "AscNullsLast",
1055 Ok(SortDirection::DescNullsFirst) => "DescNullsFirst",
1056 Ok(SortDirection::DescNullsLast) => "DescNullsLast",
1057 Ok(SortDirection::Clustered) => "Clustered",
1058 Ok(SortDirection::Unspecified) => {
1059 return Err(PlanError::invalid(
1060 "SortKind",
1061 Option::<Cow<str>>::None,
1062 "Unspecified SortDirection",
1063 ));
1064 }
1065 };
1066 Ok(Cow::Borrowed(s))
1067 }
1068}
1069
1070impl ValueEnum for join_rel::JoinType {
1071 fn as_enum_str(&self) -> Result<Cow<'static, str>, PlanError> {
1072 let s = match self {
1073 join_rel::JoinType::Unspecified => {
1074 return Err(PlanError::invalid(
1075 "JoinType",
1076 Option::<Cow<str>>::None,
1077 "Unspecified JoinType",
1078 ));
1079 }
1080 join_rel::JoinType::Inner => "Inner",
1081 join_rel::JoinType::Outer => "Outer",
1082 join_rel::JoinType::Left => "Left",
1083 join_rel::JoinType::Right => "Right",
1084 join_rel::JoinType::LeftSemi => "LeftSemi",
1085 join_rel::JoinType::RightSemi => "RightSemi",
1086 join_rel::JoinType::LeftAnti => "LeftAnti",
1087 join_rel::JoinType::RightAnti => "RightAnti",
1088 join_rel::JoinType::LeftSingle => "LeftSingle",
1089 join_rel::JoinType::RightSingle => "RightSingle",
1090 join_rel::JoinType::LeftMark => "LeftMark",
1091 join_rel::JoinType::RightMark => "RightMark",
1092 };
1093 Ok(Cow::Borrowed(s))
1094 }
1095}
1096
1097impl<'a> Textify for NamedArg<'a> {
1098 fn name() -> &'static str {
1099 "NamedArg"
1100 }
1101 fn textify<S: Scope, W: fmt::Write>(&self, ctx: &S, w: &mut W) -> fmt::Result {
1102 write!(w, "{}=", self.name)?;
1103 self.value.textify(ctx, w)
1104 }
1105}
1106
1107#[cfg(test)]
1108mod tests {
1109 use substrait::proto::aggregate_rel::Grouping;
1110 use substrait::proto::expression::literal::LiteralType;
1111 use substrait::proto::expression::{Literal, RexType, ScalarFunction};
1112 use substrait::proto::function_argument::ArgType;
1113 use substrait::proto::read_rel::{NamedTable, ReadType};
1114 use substrait::proto::rel_common::{Direct, Emit};
1115 use substrait::proto::r#type::{self as ptype, Boolean, I64, Kind, Nullability, Struct};
1116 use substrait::proto::{
1117 Expression, FunctionArgument, NamedStruct, ReadRel, Type, aggregate_rel,
1118 };
1119
1120 use super::*;
1121 use crate::fixtures::TestContext;
1122 use crate::parser::expressions::FieldIndex;
1123
1124 #[test]
1125 fn test_read_rel() {
1126 let ctx = TestContext::new();
1127
1128 let read_rel = ReadRel {
1130 common: None,
1131 base_schema: Some(NamedStruct {
1132 names: vec!["col1".into(), "column 2".into()],
1133 r#struct: Some(Struct {
1134 type_variation_reference: 0,
1135 types: vec![
1136 Type {
1137 kind: Some(Kind::I32(ptype::I32 {
1138 type_variation_reference: 0,
1139 nullability: Nullability::Nullable as i32,
1140 })),
1141 },
1142 Type {
1143 kind: Some(Kind::String(ptype::String {
1144 type_variation_reference: 0,
1145 nullability: Nullability::Nullable as i32,
1146 })),
1147 },
1148 ],
1149 nullability: Nullability::Nullable as i32,
1150 }),
1151 }),
1152 filter: None,
1153 best_effort_filter: None,
1154 projection: None,
1155 advanced_extension: None,
1156 read_type: Some(ReadType::NamedTable(NamedTable {
1157 names: vec!["some_db".into(), "test_table".into()],
1158 advanced_extension: None,
1159 })),
1160 };
1161
1162 let rel = Rel {
1163 rel_type: Some(RelType::Read(Box::new(read_rel))),
1164 };
1165 let (result, errors) = ctx.textify(&rel);
1166 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1167 assert_eq!(
1168 result,
1169 "Read[some_db.test_table => col1:i32?, \"column 2\":string?]"
1170 );
1171 }
1172
1173 #[test]
1174 fn test_filter_rel() {
1175 let ctx = TestContext::new()
1176 .with_urn(1, "test_urn")
1177 .with_function(1, 10, "gt");
1178
1179 let read_rel = ReadRel {
1181 common: None,
1182 base_schema: Some(NamedStruct {
1183 names: vec!["col1".into(), "col2".into()],
1184 r#struct: Some(Struct {
1185 type_variation_reference: 0,
1186 types: vec![
1187 Type {
1188 kind: Some(Kind::I32(ptype::I32 {
1189 type_variation_reference: 0,
1190 nullability: Nullability::Nullable as i32,
1191 })),
1192 },
1193 Type {
1194 kind: Some(Kind::I32(ptype::I32 {
1195 type_variation_reference: 0,
1196 nullability: Nullability::Nullable as i32,
1197 })),
1198 },
1199 ],
1200 nullability: Nullability::Nullable as i32,
1201 }),
1202 }),
1203 filter: None,
1204 best_effort_filter: None,
1205 projection: None,
1206 advanced_extension: None,
1207 read_type: Some(ReadType::NamedTable(NamedTable {
1208 names: vec!["test_table".into()],
1209 advanced_extension: None,
1210 })),
1211 };
1212
1213 let filter_expr = Expression {
1215 rex_type: Some(RexType::ScalarFunction(ScalarFunction {
1216 function_reference: 10, arguments: vec![
1218 FunctionArgument {
1219 arg_type: Some(ArgType::Value(Reference(0).into())),
1220 },
1221 FunctionArgument {
1222 arg_type: Some(ArgType::Value(Expression {
1223 rex_type: Some(RexType::Literal(Literal {
1224 literal_type: Some(LiteralType::I32(10)),
1225 nullable: false,
1226 type_variation_reference: 0,
1227 })),
1228 })),
1229 },
1230 ],
1231 options: vec![],
1232 output_type: Some(Type {
1233 kind: Some(Kind::Bool(Boolean {
1234 nullability: Nullability::Required as i32,
1235 type_variation_reference: 0,
1236 })),
1237 }),
1238 #[allow(deprecated)]
1239 args: vec![],
1240 })),
1241 };
1242
1243 let filter_rel = FilterRel {
1244 common: None,
1245 input: Some(Box::new(Rel {
1246 rel_type: Some(RelType::Read(Box::new(read_rel))),
1247 })),
1248 condition: Some(Box::new(filter_expr)),
1249 advanced_extension: None,
1250 };
1251
1252 let rel = Rel {
1253 rel_type: Some(RelType::Filter(Box::new(filter_rel))),
1254 };
1255
1256 let (result, errors) = ctx.textify(&rel);
1257 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1258 let expected = r#"
1259Filter[gt($0, 10:i32):boolean => $0, $1]
1260 Read[test_table => col1:i32?, col2:i32?]"#
1261 .trim_start();
1262 assert_eq!(result, expected);
1263 }
1264
1265 #[test]
1266 fn test_aggregate_function_textify() {
1267 let ctx = TestContext::new()
1268 .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1269 .with_function(1, 10, "sum")
1270 .with_function(1, 11, "count");
1271
1272 let agg_fn = get_aggregate_func(10, 1);
1274
1275 let value = Value::AggregateFunction(&agg_fn);
1276 let (result, errors) = ctx.textify(&value);
1277
1278 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1279 assert_eq!(result, "sum($1):i64");
1280 }
1281
1282 #[test]
1283 fn test_aggregate_relation_textify() {
1284 let ctx = TestContext::new()
1285 .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1286 .with_function(1, 10, "sum")
1287 .with_function(1, 11, "count");
1288
1289 let agg_fn1 = get_aggregate_func(10, 1);
1291 let agg_fn2 = get_aggregate_func(11, 1);
1292
1293 let grouping_expressions = vec![Expression {
1294 rex_type: Some(RexType::Selection(Box::new(
1295 FieldIndex(0).to_field_reference(),
1296 ))),
1297 }];
1298
1299 let measures = vec![
1300 aggregate_rel::Measure {
1301 measure: Some(agg_fn1),
1302 filter: None,
1303 },
1304 aggregate_rel::Measure {
1305 measure: Some(agg_fn2),
1306 filter: None,
1307 },
1308 ];
1309
1310 let common = Some(RelCommon {
1311 emit_kind: Some(EmitKind::Emit(Emit {
1312 output_mapping: vec![1, 2], })),
1314 ..Default::default()
1315 });
1316
1317 let aggregate_rel = create_aggregate_rel(grouping_expressions, vec![], measures, common);
1318
1319 let rel = Rel {
1320 rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1321 };
1322 let (result, errors) = ctx.textify(&rel);
1323
1324 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1325 assert!(result.contains("Aggregate[_ => sum($1):i64, count($1):i64]"));
1327 }
1328
1329 #[test]
1330 fn test_multiple_groupings_on_aggregate_deprecated() {
1331 let ctx = TestContext::new()
1334 .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1335 .with_function(1, 11, "count");
1336
1337 let grouping_expr_0 = create_exp(0);
1338 let grouping_expr_1 = create_exp(1);
1339
1340 let grouping_sets = vec![
1341 aggregate_rel::Grouping {
1342 #[allow(deprecated)]
1343 grouping_expressions: vec![grouping_expr_0.clone()],
1344 expression_references: vec![],
1345 },
1346 aggregate_rel::Grouping {
1347 #[allow(deprecated)]
1348 grouping_expressions: vec![grouping_expr_0.clone(), grouping_expr_1.clone()],
1349 expression_references: vec![],
1350 },
1351 ];
1352
1353 let aggregate_rel = create_aggregate_rel(vec![], grouping_sets, vec![], None);
1354
1355 let rel = Rel {
1356 rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1357 };
1358 let (result, errors) = ctx.textify(&rel);
1359
1360 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1361 assert!(result.contains("Aggregate[($0), ($0, $1) => $0, $1]"));
1362 }
1363
1364 #[test]
1365 fn test_multiple_groupings_with_measure_deprecated() {
1366 let ctx = TestContext::new()
1369 .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1370 .with_function(1, 11, "count");
1371
1372 let agg_fn1 = get_aggregate_func(11, 2);
1373
1374 let grouping_expr_0 = create_exp(0);
1375 let grouping_expr_1 = create_exp(1);
1376
1377 let grouping_sets = vec![
1378 aggregate_rel::Grouping {
1379 #[allow(deprecated)]
1380 grouping_expressions: vec![grouping_expr_0.clone()],
1381 expression_references: vec![],
1382 },
1383 aggregate_rel::Grouping {
1384 #[allow(deprecated)]
1385 grouping_expressions: vec![grouping_expr_0.clone(), grouping_expr_1.clone()],
1386 expression_references: vec![],
1387 },
1388 ];
1389
1390 let measures = vec![aggregate_rel::Measure {
1391 measure: Some(agg_fn1),
1392 filter: None,
1393 }];
1394
1395 let aggregate_rel = create_aggregate_rel(vec![], grouping_sets, measures, None);
1396
1397 let rel = Rel {
1398 rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1399 };
1400 let (result, errors) = ctx.textify(&rel);
1401
1402 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1403 assert!(result.contains("($0), ($0, $1) => $0, $1, count($2):i64"));
1404 }
1405
1406 #[test]
1407 fn test_multiple_groupings_on_aggregate() {
1408 let ctx = TestContext::new()
1409 .with_urn(1, "https://github.com/substrait-io/substrait/blob/main/extensions/functions_aggregate.yaml")
1410 .with_function(1, 11, "count");
1411
1412 let agg_fn2 = get_aggregate_func(11, 2);
1413
1414 let grouping_expressions = vec![
1415 Expression {
1416 rex_type: Some(RexType::Selection(Box::new(
1417 FieldIndex(0).to_field_reference(),
1418 ))),
1419 },
1420 Expression {
1421 rex_type: Some(RexType::Selection(Box::new(
1422 FieldIndex(1).to_field_reference(),
1423 ))),
1424 },
1425 ];
1426
1427 let grouping_sets = vec![
1428 Grouping {
1429 #[allow(deprecated)]
1430 grouping_expressions: vec![],
1431 expression_references: vec![0, 1],
1432 },
1433 Grouping {
1434 #[allow(deprecated)]
1435 grouping_expressions: vec![],
1436 expression_references: vec![0, 1],
1437 },
1438 Grouping {
1439 #[allow(deprecated)]
1440 grouping_expressions: vec![],
1441 expression_references: vec![1],
1442 },
1443 Grouping {
1444 #[allow(deprecated)]
1445 grouping_expressions: vec![],
1446 expression_references: vec![1, 1],
1447 },
1448 Grouping {
1449 #[allow(deprecated)]
1450 grouping_expressions: vec![],
1451 expression_references: vec![],
1452 },
1453 ];
1454
1455 let measures = vec![aggregate_rel::Measure {
1456 measure: Some(agg_fn2),
1457 filter: None,
1458 }];
1459
1460 let aggregate_rel =
1461 create_aggregate_rel(grouping_expressions, grouping_sets, measures, None);
1462
1463 let rel = Rel {
1464 rel_type: Some(RelType::Aggregate(Box::new(aggregate_rel))),
1465 };
1466 let (result, errors) = ctx.textify(&rel);
1467
1468 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1469 assert!(
1470 result.contains(
1471 "Aggregate[($0, $1), ($0, $1), ($1), ($1, $1), _ => $0, $1, count($2):i64]"
1472 )
1473 );
1474 }
1475
1476 #[test]
1477 fn test_arguments_textify_positional_only() {
1478 let ctx = TestContext::new();
1479 let args = Arguments {
1480 positional: vec![Value::Integer(42), Value::Integer(7)],
1481 named: vec![],
1482 };
1483 let (result, errors) = ctx.textify(&args);
1484 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1485 assert_eq!(result, "42, 7");
1486 }
1487
1488 #[test]
1489 fn test_arguments_textify_named_only() {
1490 let ctx = TestContext::new();
1491 let args = Arguments {
1492 positional: vec![],
1493 named: vec![
1494 NamedArg {
1495 name: Cow::Borrowed("limit"),
1496 value: Value::Integer(10),
1497 },
1498 NamedArg {
1499 name: Cow::Borrowed("offset"),
1500 value: Value::Integer(5),
1501 },
1502 ],
1503 };
1504 let (result, errors) = ctx.textify(&args);
1505 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1506 assert_eq!(result, "limit=10, offset=5");
1507 }
1508
1509 #[test]
1510 fn test_join_relation_unknown_type() {
1511 let ctx = TestContext::new();
1512
1513 let join_rel = JoinRel {
1515 left: Some(Box::new(Rel {
1516 rel_type: Some(RelType::Read(Box::default())),
1517 })),
1518 right: Some(Box::new(Rel {
1519 rel_type: Some(RelType::Read(Box::default())),
1520 })),
1521 expression: Some(Box::new(Expression::default())),
1522 r#type: 999, common: None,
1524 post_join_filter: None,
1525 advanced_extension: None,
1526 };
1527
1528 let rel = Rel {
1529 rel_type: Some(RelType::Join(Box::new(join_rel))),
1530 };
1531 let (result, errors) = ctx.textify(&rel);
1532
1533 assert!(!errors.is_empty(), "Expected errors for unknown join type");
1535 assert!(
1536 result.contains("!{JoinRel}"),
1537 "Expected error token for unknown join type"
1538 );
1539 assert!(
1540 result.contains("Join["),
1541 "Expected Join relation to be formatted"
1542 );
1543 }
1544
1545 #[test]
1546 fn test_arguments_textify_both() {
1547 let ctx = TestContext::new();
1548 let args = Arguments {
1549 positional: vec![Value::Integer(1)],
1550 named: vec![NamedArg {
1551 name: "foo".into(),
1552 value: Value::Integer(2),
1553 }],
1554 };
1555 let (result, errors) = ctx.textify(&args);
1556 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1557 assert_eq!(result, "1, foo=2");
1558 }
1559
1560 #[test]
1561 fn test_arguments_textify_empty() {
1562 let ctx = TestContext::new();
1563 let args = Arguments {
1564 positional: vec![],
1565 named: vec![],
1566 };
1567 let (result, errors) = ctx.textify(&args);
1568 assert!(errors.is_empty(), "Expected no errors, got: {errors:?}");
1569 assert_eq!(result, "_");
1570 }
1571
1572 #[test]
1573 fn test_named_arg_textify_error_token() {
1574 let ctx = TestContext::new();
1575 let named_arg = NamedArg {
1576 name: "foo".into(),
1577 value: Value::Missing(PlanError::invalid(
1578 "my_enum",
1579 Some(Cow::Borrowed("my_enum")),
1580 Cow::Borrowed("my_enum"),
1581 )),
1582 };
1583 let (result, errors) = ctx.textify(&named_arg);
1584 assert!(result.contains("foo=!{my_enum}"), "Output: {result}");
1586 assert!(!errors.is_empty(), "Expected error for error token");
1588 }
1589
1590 #[test]
1591 fn test_join_type_enum_textify() {
1592 assert_eq!(join_rel::JoinType::Inner.as_enum_str().unwrap(), "Inner");
1594 assert_eq!(join_rel::JoinType::Left.as_enum_str().unwrap(), "Left");
1595 assert_eq!(
1596 join_rel::JoinType::LeftSemi.as_enum_str().unwrap(),
1597 "LeftSemi"
1598 );
1599 assert_eq!(
1600 join_rel::JoinType::LeftAnti.as_enum_str().unwrap(),
1601 "LeftAnti"
1602 );
1603 }
1604
1605 #[test]
1606 fn test_join_output_columns() {
1607 let inner_cols = super::join_output_columns(join_rel::JoinType::Inner, 2, 3);
1609 assert_eq!(inner_cols.len(), 5); assert!(matches!(inner_cols[0], Value::Reference(0)));
1611 assert!(matches!(inner_cols[4], Value::Reference(4)));
1612
1613 let left_semi_cols = super::join_output_columns(join_rel::JoinType::LeftSemi, 2, 3);
1615 assert_eq!(left_semi_cols.len(), 2); assert!(matches!(left_semi_cols[0], Value::Reference(0)));
1617 assert!(matches!(left_semi_cols[1], Value::Reference(1)));
1618
1619 let right_semi_cols = super::join_output_columns(join_rel::JoinType::RightSemi, 2, 3);
1621 assert_eq!(right_semi_cols.len(), 3); assert!(matches!(right_semi_cols[0], Value::Reference(0))); assert!(matches!(right_semi_cols[1], Value::Reference(1)));
1624 assert!(matches!(right_semi_cols[2], Value::Reference(2))); let left_mark_cols = super::join_output_columns(join_rel::JoinType::LeftMark, 2, 3);
1628 assert_eq!(left_mark_cols.len(), 3); assert!(matches!(left_mark_cols[0], Value::Reference(0)));
1630 assert!(matches!(left_mark_cols[1], Value::Reference(1)));
1631 assert!(matches!(left_mark_cols[2], Value::Reference(2))); let right_mark_cols = super::join_output_columns(join_rel::JoinType::RightMark, 2, 3);
1635 assert_eq!(right_mark_cols.len(), 4); assert!(matches!(right_mark_cols[0], Value::Reference(0))); assert!(matches!(right_mark_cols[1], Value::Reference(1)));
1638 assert!(matches!(right_mark_cols[2], Value::Reference(2))); assert!(matches!(right_mark_cols[3], Value::Reference(3))); }
1641
1642 fn get_aggregate_func(func_ref: u32, column_ind: i32) -> AggregateFunction {
1643 AggregateFunction {
1644 function_reference: func_ref,
1645 arguments: vec![FunctionArgument {
1646 arg_type: Some(ArgType::Value(Expression {
1647 rex_type: Some(RexType::Selection(Box::new(
1648 FieldIndex(column_ind).to_field_reference(),
1649 ))),
1650 })),
1651 }],
1652 options: vec![],
1653 output_type: Some(Type {
1654 kind: Some(Kind::I64(I64 {
1655 nullability: Nullability::Required as i32,
1656 type_variation_reference: 0,
1657 })),
1658 }),
1659 invocation: 0,
1660 phase: 0,
1661 sorts: vec![],
1662 #[allow(deprecated)]
1663 args: vec![],
1664 }
1665 }
1666
1667 fn create_aggregate_rel(
1668 grouping_expressions: Vec<Expression>,
1669 grouping_sets: Vec<Grouping>,
1670 measures: Vec<aggregate_rel::Measure>,
1671 common: Option<RelCommon>,
1672 ) -> AggregateRel {
1673 let common = common.or_else(|| {
1674 Some(RelCommon {
1675 emit_kind: Some(EmitKind::Direct(Direct {})),
1676 ..Default::default()
1677 })
1678 });
1679 AggregateRel {
1680 input: Some(Box::new(Rel {
1681 rel_type: Some(RelType::Read(Box::new(ReadRel {
1682 common: None,
1683 base_schema: Some(get_basic_schema()),
1684 filter: None,
1685 best_effort_filter: None,
1686 projection: None,
1687 advanced_extension: None,
1688 read_type: Some(ReadType::NamedTable(NamedTable {
1689 names: vec!["orders".into()],
1690 advanced_extension: None,
1691 })),
1692 }))),
1693 })),
1694 grouping_expressions,
1695 groupings: grouping_sets,
1696 measures,
1697 common,
1698 advanced_extension: None,
1699 }
1700 }
1701
1702 fn get_basic_schema() -> NamedStruct {
1703 NamedStruct {
1704 names: vec!["category".into(), "amount".into(), "value".into()],
1705 r#struct: Some(Struct {
1706 type_variation_reference: 0,
1707 types: vec![
1708 Type {
1709 kind: Some(Kind::String(ptype::String {
1710 type_variation_reference: 0,
1711 nullability: Nullability::Nullable as i32,
1712 })),
1713 },
1714 Type {
1715 kind: Some(Kind::Fp64(ptype::Fp64 {
1716 type_variation_reference: 0,
1717 nullability: Nullability::Nullable as i32,
1718 })),
1719 },
1720 Type {
1721 kind: Some(Kind::I32(ptype::I32 {
1722 type_variation_reference: 0,
1723 nullability: Nullability::Nullable as i32,
1724 })),
1725 },
1726 ],
1727 nullability: Nullability::Nullable as i32,
1728 }),
1729 }
1730 }
1731
1732 fn create_exp(column_ind: i32) -> Expression {
1733 Expression {
1734 rex_type: Some(RexType::Selection(Box::new(
1735 FieldIndex(column_ind).to_field_reference(),
1736 ))),
1737 }
1738 }
1739
1740 #[test]
1741 fn test_unsupported_rel_type_produces_failure_token() {
1742 use substrait::proto::CrossRel;
1743
1744 let ctx = TestContext::new();
1745
1746 let rel = Rel {
1750 rel_type: Some(RelType::Cross(Box::new(CrossRel {
1751 common: None,
1752 left: None,
1753 right: None,
1754 advanced_extension: None,
1755 }))),
1756 };
1757
1758 let (result, errors) = ctx.textify(&rel);
1759
1760 assert!(
1762 result.contains("!{Rel}"),
1763 "Expected '!{{Rel}}' in output, got: {result}"
1764 );
1765
1766 assert_eq!(errors.0.len(), 1, "Expected exactly one error: {errors:?}");
1768
1769 match &errors.0[0] {
1771 FormatError::Format(plan_err) => {
1772 assert_eq!(plan_err.message, "Rel");
1773 assert_eq!(
1774 plan_err.error_type,
1775 crate::textify::foundation::FormatErrorType::Unimplemented
1776 );
1777 assert!(
1778 plan_err.lookup.as_deref().unwrap_or("").contains("Cross"),
1779 "Expected lookup to mention 'Cross', got: {:?}",
1780 plan_err.lookup
1781 );
1782 }
1783 other => panic!("Expected FormatError::Format, got: {other:?}"),
1784 }
1785 }
1786}