Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions core/src/main/java/io/substrait/dsl/SubstraitBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import io.substrait.extension.DefaultExtensionCatalog;
import io.substrait.extension.SimpleExtension;
import io.substrait.function.ToTypeString;
import io.substrait.plan.ImmutableExecutionBehavior;
import io.substrait.plan.Plan;
import io.substrait.plan.Plan.ExecutionBehavior.VariableEvaluationMode;
import io.substrait.relation.AbstractWriteRel;
import io.substrait.relation.Aggregate;
import io.substrait.relation.Aggregate.Measure;
Expand Down Expand Up @@ -1540,13 +1542,54 @@ public Plan.Root root(Rel rel) {
}

/**
* Creates a plan from a plan root.
* Creates a plan from a plan root with default execution behavior.
*
* <p>The plan is created with {@link VariableEvaluationMode#VARIABLE_EVALUATION_MODE_PER_PLAN} as
* the default variable evaluation mode. To specify a custom execution behavior, use {@link
* #plan(Plan.ExecutionBehavior, Plan.Root)} instead.
*
* @param root the plan root
* @return a new {@link Plan}
*/
public Plan plan(Plan.Root root) {
return Plan.builder().addRoots(root).build();
return plan(
ImmutableExecutionBehavior.builder()
.variableEvaluationMode(VariableEvaluationMode.VARIABLE_EVALUATION_MODE_PER_PLAN)
Comment thread
benbellick marked this conversation as resolved.
.build(),
root);
}

/**
* Creates a plan from a plan root with custom execution behavior.
*
* @param executionBehavior the execution behavior for the plan
* @param root the plan root
* @return a new {@link Plan}
*/
public Plan plan(Plan.ExecutionBehavior executionBehavior, Plan.Root root) {
return Plan.builder().executionBehavior(executionBehavior).addRoots(root).build();
}

/**
* Creates a plan from multiple plan roots with custom execution behavior.
*
* @param executionBehavior the execution behavior for the plan
* @param roots the plan roots
* @return a new {@link Plan}
*/
public Plan plan(Plan.ExecutionBehavior executionBehavior, Plan.Root... roots) {
return Plan.builder().executionBehavior(executionBehavior).roots(Arrays.asList(roots)).build();
}

/**
* Creates a plan from multiple plan roots with custom execution behavior.
*
* @param executionBehavior the execution behavior for the plan
* @param roots the plan roots as an iterable
* @return a new {@link Plan}
*/
public Plan plan(Plan.ExecutionBehavior executionBehavior, Iterable<Plan.Root> roots) {
return Plan.builder().executionBehavior(executionBehavior).roots(roots).build();
}

/**
Expand Down
44 changes: 44 additions & 0 deletions core/src/main/java/io/substrait/plan/Plan.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,35 @@ public Version getVersion() {

public abstract Optional<AdvancedExtension> getAdvancedExtension();

public abstract ExecutionBehavior getExecutionBehavior();

/**
* Validates that the execution behavior is properly configured.
*
* <p>This validation method ensures that:
*
* <ul>
* <li>The {@link ExecutionBehavior} field is present (not null or empty) - ExecutionBehavior is
* a required field
* <li>The {@link ExecutionBehavior.VariableEvaluationMode} is set to a valid value (not {@link
* ExecutionBehavior.VariableEvaluationMode#VARIABLE_EVALUATION_MODE_UNSPECIFIED})
* </ul>
*
* @throws IllegalArgumentException if the execution behavior is not present, or if the variable
* evaluation mode is set to {@link
* ExecutionBehavior.VariableEvaluationMode#VARIABLE_EVALUATION_MODE_UNSPECIFIED}
*/
@Value.Check
protected void check() {
ExecutionBehavior behavior = getExecutionBehavior();
if (behavior.getVariableEvaluationMode()
== ExecutionBehavior.VariableEvaluationMode.VARIABLE_EVALUATION_MODE_UNSPECIFIED) {
throw new IllegalArgumentException(
"ExecutionBehavior requires a specified VariableEvaluationMode, but got: "
+ behavior.getVariableEvaluationMode());
}
}

public static ImmutablePlan.Builder builder() {
return ImmutablePlan.builder();
}
Expand Down Expand Up @@ -69,4 +98,19 @@ public static ImmutableRoot.Builder builder() {
return ImmutableRoot.builder();
}
}

@Value.Immutable
public abstract static class ExecutionBehavior {
public abstract VariableEvaluationMode getVariableEvaluationMode();

public static ImmutableExecutionBehavior.Builder builder() {
return ImmutableExecutionBehavior.builder();
}

public enum VariableEvaluationMode {
VARIABLE_EVALUATION_MODE_UNSPECIFIED,
VARIABLE_EVALUATION_MODE_PER_PLAN,
VARIABLE_EVALUATION_MODE_PER_RECORD
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an established pattern for representing these enums in the library that we should follow here for consistency:

/** Defines the type of window bounds (rows or range) for window functions. */
enum WindowBoundsType {
/** Unspecified window bounds type. */
UNSPECIFIED(io.substrait.proto.Expression.WindowFunction.BoundsType.BOUNDS_TYPE_UNSPECIFIED),
/** Window bounds based on row count. */
ROWS(io.substrait.proto.Expression.WindowFunction.BoundsType.BOUNDS_TYPE_ROWS),
/** Window bounds based on value range. */
RANGE(io.substrait.proto.Expression.WindowFunction.BoundsType.BOUNDS_TYPE_RANGE);
private final io.substrait.proto.Expression.WindowFunction.BoundsType proto;
WindowBoundsType(io.substrait.proto.Expression.WindowFunction.BoundsType proto) {
this.proto = proto;
}

/** Defines how aggregation functions are invoked (ALL, DISTINCT). */
enum AggregationInvocation {
/** Unspecified aggregation invocation. */
UNSPECIFIED(AggregateFunction.AggregationInvocation.AGGREGATION_INVOCATION_UNSPECIFIED),
/** Aggregate over all values. */
ALL(AggregateFunction.AggregationInvocation.AGGREGATION_INVOCATION_ALL),
/** Aggregate over distinct values only. */
DISTINCT(AggregateFunction.AggregationInvocation.AGGREGATION_INVOCATION_DISTINCT);
private final io.substrait.proto.AggregateFunction.AggregationInvocation proto;
AggregationInvocation(io.substrait.proto.AggregateFunction.AggregationInvocation proto) {
this.proto = proto;
}

}
66 changes: 66 additions & 0 deletions core/src/main/java/io/substrait/plan/PlanProtoConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.substrait.extension.ExtensionCollector;
import io.substrait.extension.ExtensionProtoConverter;
import io.substrait.extension.SimpleExtension.ExtensionCollection;
import io.substrait.proto.ExecutionBehavior;
import io.substrait.proto.ExecutionBehavior.VariableEvaluationMode;
import io.substrait.proto.Plan;
import io.substrait.proto.PlanRel;
import io.substrait.proto.Rel;
Expand Down Expand Up @@ -61,6 +63,13 @@ public PlanProtoConverter(
this.extensionProtoConverter = extensionProtoConverter;
}

/**
* Converts a {@link io.substrait.plan.Plan} object to its protobuf representation.
*
* @param plan the Plan object to convert, must not be null
* @return the protobuf Plan representation
* @throws IllegalArgumentException if the plan contains invalid data
*/
public Plan toProto(final io.substrait.plan.Plan plan) {
final List<PlanRel> planRels = new ArrayList<>();
final ExtensionCollector functionCollector = new ExtensionCollector(extensionCollection);
Expand Down Expand Up @@ -97,6 +106,63 @@ public Plan toProto(final io.substrait.plan.Plan plan) {

builder.setVersion(versionBuilder);

// Set execution behavior
builder.setExecutionBehavior(toProtoExecutionBehavior(plan.getExecutionBehavior()));
Comment thread
vbarua marked this conversation as resolved.

return builder.build();
}

/**
* Converts an {@link io.substrait.plan.Plan.ExecutionBehavior} to its protobuf representation.
*
* <p>This method converts the execution behavior configuration, including the variable evaluation
* mode, from the POJO representation to the protobuf format.
*
* @param executionBehavior the ExecutionBehavior to convert, must not be null
* @return the protobuf ExecutionBehavior representation
* @throws IllegalArgumentException if the variable evaluation mode is unknown
*/
private ExecutionBehavior toProtoExecutionBehavior(
final io.substrait.plan.Plan.ExecutionBehavior executionBehavior) {
return ExecutionBehavior.newBuilder()
.setVariableEvalMode(
toProtoVariableEvaluationMode(executionBehavior.getVariableEvaluationMode()))
.build();
}

/**
* Converts a {@link io.substrait.plan.Plan.ExecutionBehavior.VariableEvaluationMode} to its
* protobuf representation.
*
* <p>Supported modes:
*
* <ul>
* <li>{@link
* io.substrait.plan.Plan.ExecutionBehavior.VariableEvaluationMode#VARIABLE_EVALUATION_MODE_UNSPECIFIED}
* - Unspecified mode (should be avoided in valid plans)
* <li>{@link
* io.substrait.plan.Plan.ExecutionBehavior.VariableEvaluationMode#VARIABLE_EVALUATION_MODE_PER_PLAN}
* - Variables are evaluated once per plan execution
* <li>{@link
* io.substrait.plan.Plan.ExecutionBehavior.VariableEvaluationMode#VARIABLE_EVALUATION_MODE_PER_RECORD}
* - Variables are evaluated for each record
* </ul>
*
* @param mode the VariableEvaluationMode to convert, must not be null
* @return the protobuf VariableEvaluationMode representation
* @throws IllegalArgumentException if the mode is unknown or not supported
*/
private VariableEvaluationMode toProtoVariableEvaluationMode(
final io.substrait.plan.Plan.ExecutionBehavior.VariableEvaluationMode mode) {
switch (mode) {
case VARIABLE_EVALUATION_MODE_UNSPECIFIED:
return VariableEvaluationMode.VARIABLE_EVALUATION_MODE_UNSPECIFIED;
case VARIABLE_EVALUATION_MODE_PER_PLAN:
return VariableEvaluationMode.VARIABLE_EVALUATION_MODE_PER_PLAN;
case VARIABLE_EVALUATION_MODE_PER_RECORD:
return VariableEvaluationMode.VARIABLE_EVALUATION_MODE_PER_RECORD;
default:
throw new IllegalArgumentException("Unknown VariableEvaluationMode: " + mode);
}
}
}
107 changes: 98 additions & 9 deletions core/src/main/java/io/substrait/plan/ProtoPlanConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.substrait.extension.ImmutableExtensionLookup;
import io.substrait.extension.ProtoExtensionConverter;
import io.substrait.extension.SimpleExtension.ExtensionCollection;
import io.substrait.plan.Plan.ExecutionBehavior;
import io.substrait.plan.Plan.ExecutionBehavior.VariableEvaluationMode;
import io.substrait.proto.PlanRel;
import io.substrait.relation.ProtoRelConverter;
import io.substrait.relation.Rel;
Expand Down Expand Up @@ -66,6 +68,21 @@ protected ProtoRelConverter getProtoRelConverter(final ExtensionLookup functionL
return new ProtoRelConverter(functionLookup, this.extensionCollection, protoExtensionConverter);
}

/**
Comment thread
nielspardon marked this conversation as resolved.
* Converts a protobuf {@link io.substrait.proto.Plan} to a {@link Plan} POJO.
*
* <p><b>Note:</b> Execution behavior is optional in the protobuf message, but the {@link Plan}
* POJO requires it. Conversion handles a missing execution behavior based on the plan version:
*
* <p><b>Note:</b> If {@code Plan.ExecutionBehavior.VariableEvaluationMode} is not set, it will be
* defaulted to {@code VARIABLE_EVALUATION_MODE_PER_PLAN}. Once other producers populate this
* field correctly, this compatibility workaround will be removed.
*
* @param plan the protobuf Plan to convert, must not be null
* @return the converted Plan POJO
* @throws IllegalArgumentException if the plan contains invalid data or if the execution behavior
* validation fails
*/
public Plan from(io.substrait.proto.Plan plan) {
ExtensionLookup functionLookup = ImmutableExtensionLookup.builder().from(plan).build();
ProtoRelConverter relConverter = getProtoRelConverter(functionLookup);
Expand All @@ -92,15 +109,87 @@ public Plan from(io.substrait.proto.Plan plan) {
versionBuilder.producer(Optional.of(plan.getVersion().getProducer()));
}

return Plan.builder()
.roots(roots)
.expectedTypeUrls(plan.getExpectedTypeUrlsList())
.advancedExtension(
Optional.ofNullable(
plan.hasAdvancedExtensions()
? protoExtensionConverter.fromProto(plan.getAdvancedExtensions())
: null))
.version(versionBuilder.build())
ImmutablePlan.Builder planBuilder =
Plan.builder()
.roots(roots)
.expectedTypeUrls(plan.getExpectedTypeUrlsList())
.advancedExtension(
Optional.ofNullable(
plan.hasAdvancedExtensions()
? protoExtensionConverter.fromProto(plan.getAdvancedExtensions())
: null))
.version(versionBuilder.build());

// Set execution behavior (required field)
if (plan.hasExecutionBehavior()) {
planBuilder.executionBehavior(fromProtoExecutionBehavior(plan.getExecutionBehavior()));
} else {
// Set default ExecutionBehavior for older plans that don't have it
planBuilder.executionBehavior(
ExecutionBehavior.builder()
.variableEvaluationMode(VariableEvaluationMode.VARIABLE_EVALUATION_MODE_PER_PLAN)
.build());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually the only piece we need for backwards compatability IMO.

}

return planBuilder.build();
}

/**
* Converts a protobuf {@link io.substrait.proto.ExecutionBehavior} to its POJO representation.
*
* <p>This method converts the execution behavior configuration from the protobuf format to the
* POJO representation, including the variable evaluation mode.
*
* @param executionBehavior the protobuf ExecutionBehavior to convert, must not be null
* @return the POJO ExecutionBehavior representation
* @throws IllegalArgumentException if the variable evaluation mode is unknown or UNRECOGNIZED
*/
private io.substrait.plan.Plan.ExecutionBehavior fromProtoExecutionBehavior(
final io.substrait.proto.ExecutionBehavior executionBehavior) {
return io.substrait.plan.Plan.ExecutionBehavior.builder()
.variableEvaluationMode(
fromProtoVariableEvaluationMode(executionBehavior.getVariableEvalMode()))
.build();
}

/**
* Converts a protobuf {@link io.substrait.proto.ExecutionBehavior.VariableEvaluationMode} to its
* POJO representation.
*
* <p>Supported modes:
*
* <ul>
* <li>{@link
* io.substrait.proto.ExecutionBehavior.VariableEvaluationMode#VARIABLE_EVALUATION_MODE_UNSPECIFIED}
* - Unspecified mode (will cause validation failure in Plan)
* <li>{@link
* io.substrait.proto.ExecutionBehavior.VariableEvaluationMode#VARIABLE_EVALUATION_MODE_PER_PLAN}
* - Variables are evaluated once per plan execution
* <li>{@link
* io.substrait.proto.ExecutionBehavior.VariableEvaluationMode#VARIABLE_EVALUATION_MODE_PER_RECORD}
* - Variables are evaluated for each record
* </ul>
*
* @param mode the protobuf VariableEvaluationMode to convert, must not be null
* @return the POJO VariableEvaluationMode representation
* @throws IllegalArgumentException if the mode is UNRECOGNIZED or not supported
*/
private io.substrait.plan.Plan.ExecutionBehavior.VariableEvaluationMode
fromProtoVariableEvaluationMode(
final io.substrait.proto.ExecutionBehavior.VariableEvaluationMode mode) {
switch (mode) {
case VARIABLE_EVALUATION_MODE_UNSPECIFIED:
return io.substrait.plan.Plan.ExecutionBehavior.VariableEvaluationMode
.VARIABLE_EVALUATION_MODE_UNSPECIFIED;
case VARIABLE_EVALUATION_MODE_PER_PLAN:
return io.substrait.plan.Plan.ExecutionBehavior.VariableEvaluationMode
.VARIABLE_EVALUATION_MODE_PER_PLAN;
case VARIABLE_EVALUATION_MODE_PER_RECORD:
return io.substrait.plan.Plan.ExecutionBehavior.VariableEvaluationMode
.VARIABLE_EVALUATION_MODE_PER_RECORD;
case UNRECOGNIZED:
default:
throw new IllegalArgumentException("Unknown VariableEvaluationMode: " + mode);
}
}
}
Loading
Loading