[BugFix] Fix chained streamstats with window causing NPE (#4800)#5359

Merged
songkant-aws merged 5 commits into opensearch-project:main from qianheng-aws:worktree-agent-a084b1f7 on Apr 23, 2026

Conversation

@qianheng-aws
Collaborator

Description

Chained streamstats commands with window + group by caused an NPE in Calcite's RelDecorrelator.createValueGenerator during query plan preparation. The global=true + window + group path generated LogicalCorrelate nodes; when two such streamstats commands were chained, the second correlate contained the first inside its right side, producing nested correlates that Calcite's decorrelator could not handle.

Root Cause: The buildStreamWindowJoinPlan method created a LogicalCorrelate where both left and right sides shared the same plan tree. When chaining multiple streamstats, nested correlates triggered NPE in RelDecorrelator.createValueGenerator at line 1272 (frame.oldToNewOutputs.get(oldCorVarOffset) returned null).

Fix: Replaced the correlate-based plan with a self-join (LogicalJoin) plan for the global=true + window + group case. The self-join approach:

  1. Builds a minimal right-side projection with prefixed field names to avoid collisions
  2. Joins left and right on window frame + group equality conditions
  3. Aggregates by all left fields with the window function applied to right-side fields
  4. Sorts by the global sequence number and removes helper columns

This avoids generating LogicalCorrelate nodes for this code path entirely.
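
The four steps above can be pictured outside Calcite with a small in-memory sketch. Everything in it is a hypothetical illustration: the `Row` and `Result` records, the `windowedAvg` method, and the assumption that the frame for `window=N` covers the current row plus the N-1 rows before it in global sequence order. It also assumes group values are non-null. It is not the code in `CalciteRelNodeVisitor`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of the self-join plan; not the actual Calcite code. */
class SelfJoinStreamstatsSketch {

  /** A row tagged with its global sequence number (the helper column). */
  record Row(long seq, String group, double value) {}

  /** A left row plus the aggregate computed from its matching right rows. */
  record Result(Row row, double avg) {}

  static List<Result> windowedAvg(List<Row> input, int window) {
    List<Result> out = new ArrayList<>();
    for (Row left : input) { // left side of the self-join
      double sum = 0;
      int count = 0;
      for (Row right : input) { // right side: only seq, group, and the aggregate input matter
        boolean inFrame = right.seq() > left.seq() - window && right.seq() <= left.seq();
        boolean sameGroup = right.group().equals(left.group());
        if (inFrame && sameGroup) { // join condition: window frame AND group equality
          sum += right.value();
          count++;
        }
      }
      // Aggregate grouped by the left row; count >= 1 because a row always matches itself.
      out.add(new Result(left, sum / count));
    }
    // Restore stream order by the global sequence number.
    out.sort((a, b) -> Long.compare(a.row().seq(), b.row().seq()));
    return out;
  }
}
```

The nested loop makes the cost of the approach visible: every left row is compared against every right row unless the engine can exploit the frame condition.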

Related Issues

Resolves #4800

Check List

  • New functionality includes testing
  • Commits signed per DCO (-s)
  • spotlessCheck passed
  • Unit tests passed
  • Integration tests passed

…ecorrelator (opensearch-project#4800)

Replace the correlate-based plan with a self-join plan for the
global=true + window + group case in streamstats. Nested correlates
(produced when chaining two streamstats with window + group by) caused
NPE in Calcite's RelDecorrelator.createValueGenerator during plan
preparation.

The self-join approach builds a LogicalJoin instead of LogicalCorrelate,
which avoids triggering the decorrelation path that fails with nested
correlation variable references.

Signed-off-by: Heng Qian <qianheng@amazon.com>
@qianheng-aws
Collaborator Author

Decision Log

Root Cause: The global=true + window + group code path in visitStreamWindow used buildStreamWindowJoinPlan which created LogicalCorrelate nodes. When chaining two streamstats commands (both with window + group by), the second correlate's right side contained the first correlate, creating nested correlates. Calcite 1.41.0's RelDecorrelator.createValueGenerator threw NPE because frame.oldToNewOutputs.get(oldCorVarOffset) returned null for the nested correlation variable.

Approach: Replaced the correlate-based plan with a self-join (LogicalJoin) approach for the global=true + window + group case. The new buildStreamWindowSelfJoinPlan method builds a minimal right-side projection with prefixed field names (__r_<name>__), joins on window frame + group conditions, and aggregates by all left fields. This completely avoids LogicalCorrelate for this code path.

Alternatives Rejected:

  • Converting nested correlates to a single correlate: Too complex, would require restructuring the entire plan
  • Pre-decorrelating the inner correlate before building the outer one: Calcite's decorrelator API doesn't support partial decorrelation
  • Using OVER window functions with PARTITION BY group ORDER BY __seq__: Gives global=false semantics (window within partition) rather than global=true (window over global sequence with group filter)
  • Materializing intermediate results: Calcite doesn't support materializing within a logical plan
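
The OVER-window alternative was rejected because of a semantic difference that is easy to miss. The following plain-Java sketch contrasts the two readings described in that bullet. The class, the `Row` record, and both method names are hypothetical, and SUM is used only because it keeps the arithmetic easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical comparison of the two window semantics; not the plugin's code. */
class WindowSemanticsSketch {
  record Row(String group, int value) {}

  /** global=false reading: the window is the last `window` rows of the same group. */
  static List<Integer> sumPerPartition(List<Row> rows, int window) {
    List<Integer> out = new ArrayList<>();
    for (int i = 0; i < rows.size(); i++) {
      int sum = 0;
      int taken = 0;
      for (int j = i; j >= 0 && taken < window; j--) {
        if (rows.get(j).group().equals(rows.get(i).group())) {
          sum += rows.get(j).value();
          taken++;
        }
      }
      out.add(sum);
    }
    return out;
  }

  /** global=true reading: the window is the last `window` rows overall, then filtered by group. */
  static List<Integer> sumGlobalThenFilter(List<Row> rows, int window) {
    List<Integer> out = new ArrayList<>();
    for (int i = 0; i < rows.size(); i++) {
      int sum = 0;
      for (int j = Math.max(0, i - window + 1); j <= i; j++) {
        if (rows.get(j).group().equals(rows.get(i).group())) {
          sum += rows.get(j).value();
        }
      }
      out.add(sum);
    }
    return out;
  }
}
```

For the input `(a,1), (b,2), (a,3)` with `window=2`, the third row differs: the per-partition reading reaches back to the first `a` row and yields 4, while the global reading only sees the last two rows of the stream and yields 3.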

Pitfalls:

  • Field name collisions: The self-join duplicates all field names. Solved by prefixing right-side fields with __r_<name>__.
  • Alias propagation: Alias.getAlias() returns null for PPL streamstats (it's a SQL-only field). Must use Alias.getName() instead.
  • Aggregate function mapping: Built a manual mapping from PPL window function names (AVG, MAX, MIN, etc.) to Calcite aggregate calls, since the existing aggVisitor.analyze() would resolve field references against the wrong (left) side of the join.

Things to Watch:

  • The reset + chained window streamstats case (streamstats reset_before=... | streamstats window=...) still fails because the reset path uses correlate and the self-join copies it into the right side. This is tracked with a TODO in the IT class.
  • The self-join plan may have different performance characteristics than the correlate plan for large datasets, since it performs a full self-join + aggregate instead of a correlated scan.

@github-actions
Contributor

github-actions Bot commented Apr 16, 2026

PR Reviewer Guide 🔍

(Review updated until commit 19d74db)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: Replace LogicalCorrelate with self-join plan for streamstats window+group

Relevant files:

  • core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
  • ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java
  • integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global.yaml
  • integ-test/src/test/resources/expectedOutput/calcite/explain_streamstats_global_null_bucket.yaml
  • integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global.yaml
  • integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_streamstats_global_null_bucket.yaml

Sub-PR theme: Add integration tests for chained streamstats with window (issue

Relevant files:

  • integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteStreamstatsCommandIT.java
  • integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4800.yml

⚡ Recommended focus areas for review

Name Collision Risk

The RIGHT_SIDE_FIELD_PREFIX (__r_) and RIGHT_SIDE_FIELD_SUFFIX (__) are simple strings. If a user's actual field name happens to match the pattern __r_<fieldname>__, the right-side projection will silently collide with or shadow that field. There is no guard or uniqueness check against existing field names in the schema.

private static final String RIGHT_SIDE_FIELD_PREFIX = "__r_";

private static final String RIGHT_SIDE_FIELD_SUFFIX = "__";

/** Name of the right-side sequence column in the streamstats self-join plan. */
private static final String RIGHT_SIDE_SEQ_COLUMN =
    RIGHT_SIDE_FIELD_PREFIX + "seq" + RIGHT_SIDE_FIELD_SUFFIX;
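
One possible guard against the collision described above is to check the candidate name against the existing schema and disambiguate it when it is already taken. This is only a sketch: `uniqueRightSideName`, the numeric disambiguator, and the `existingNames` parameter are hypothetical and do not appear in the PR.

```java
import java.util.Set;

/** Hypothetical uniqueness check for right-side helper names; not part of the PR. */
class RightSideNamingSketch {
  static final String PREFIX = "__r_";
  static final String SUFFIX = "__";

  /** Returns a prefixed name that does not clash with any name already in the schema. */
  static String uniqueRightSideName(String field, Set<String> existingNames) {
    String candidate = PREFIX + field + SUFFIX;
    int attempt = 0;
    while (existingNames.contains(candidate)) {
      attempt++;
      // Assumed disambiguation scheme: insert a counter before the suffix.
      candidate = PREFIX + field + "_" + attempt + SUFFIX;
    }
    return candidate;
  }
}
```

A caller would need to remember the returned name for each field, since it can no longer be recomputed from the field name alone.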
Incomplete Field Rewrite

rewriteFieldNamesToRightSide handles Field, QualifiedName, Alias, and Function, but does not handle other expression types (e.g., arithmetic expressions, nested functions, or literals). If a window function argument contains an unsupported expression type, it will be returned as-is without renaming, potentially causing incorrect field resolution against the joined row type.

private UnresolvedExpression rewriteFieldNamesToRightSide(UnresolvedExpression expr) {
  if (expr instanceof Field f && f.getField() instanceof QualifiedName qn) {
    return new Field(toRightSideQualifiedName(qn), f.getFieldArgs());
  }
  if (expr instanceof QualifiedName qn) {
    return toRightSideQualifiedName(qn);
  }
  if (expr instanceof Alias a) {
    return new Alias(a.getName(), rewriteFieldNamesToRightSide(a.getDelegated()));
  }
  if (expr instanceof Function func) {
    List<UnresolvedExpression> rewrittenArgs =
        func.getFuncArgs().stream().map(this::rewriteFieldNamesToRightSide).toList();
    return new Function(func.getFuncName(), rewrittenArgs);
  }
  return expr;
}
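
To make the concern concrete, here is a sketch of a rewrite that either renames every field reference it can reach or fails loudly, instead of returning unknown nodes unchanged. It uses a tiny stand-in AST (`FieldRef`, `Literal`, `Call`) rather than the plugin's `UnresolvedExpression` classes, so all type and method names are hypothetical.

```java
import java.util.List;

/** Hypothetical stand-in AST showing a rewrite that rejects unknown node types. */
class RewriteSketch {
  interface Expr {}
  record FieldRef(String name) implements Expr {}
  record Literal(Object value) implements Expr {}
  record Call(String name, List<Expr> args) implements Expr {}

  static Expr toRightSide(Expr expr) {
    if (expr instanceof FieldRef f) {
      return new FieldRef("__r_" + f.name() + "__");
    }
    if (expr instanceof Call c) {
      // Arithmetic and nested functions are both modeled as calls, so recursion covers them.
      return new Call(c.name(), c.args().stream().map(RewriteSketch::toRightSide).toList());
    }
    if (expr instanceof Literal) {
      return expr; // literals contain no field references
    }
    // Failing here surfaces an unsupported shape instead of silently skipping the rename.
    throw new IllegalArgumentException("Unsupported expression type: " + expr);
  }
}
```

Whether throwing is preferable to passing the node through depends on which expression types can actually reach this method, which the review comment leaves open.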
Known Limitation (TODO)

The TODO comment in the integration test acknowledges that the reset_before + window chained streamstats path still uses LogicalCorrelate and is broken. This is a known regression/limitation left unresolved in this PR. The old buildStreamWindowJoinPlan method still exists and is still used for the reset path, meaning the original NPE can still be triggered via that code path.

  // For all other nodes, continue traversal
  return super.visit(other);
}
Double Table Scan

The self-join plan pushes leftWithHelpers as the left side and a freshly projected rightProjected (built from leftWithHelpers) as the right side. This means the underlying table is scanned twice (once for left, once for right). For large datasets this could be a significant performance regression compared to the previous correlate-based approach. It should be validated whether the query planner can deduplicate these scans.

// Push left and right
context.relBuilder.push(leftWithHelpers);
context.relBuilder.push(rightProjected);

@github-actions
Contributor

github-actions Bot commented Apr 16, 2026

PR Code Suggestions ✨

Latest suggestions up to 19d74db

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Handle QualifiedName in field name collection

The collectFieldNames method does not handle QualifiedName expressions directly. If
an aggregate argument is a bare QualifiedName (not wrapped in a Field), its name
will be silently skipped, causing the right-side projection to omit that field and
leading to a field-not-found error at join time.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2388-2400]

 private void collectFieldNames(UnresolvedExpression expr, Set<String> fieldNames) {
   if (expr instanceof Field f) {
     fieldNames.add(f.getField().toString());
+  } else if (expr instanceof QualifiedName qn) {
+    fieldNames.add(qn.toString());
   } else if (expr instanceof Alias a) {
     collectFieldNames(a.getDelegated(), fieldNames);
   } else if (expr instanceof WindowFunction wf) {
     collectFieldNames(wf.getFunction(), fieldNames);
   } else if (expr instanceof Function func) {
     for (UnresolvedExpression arg : func.getFuncArgs()) {
       collectFieldNames(arg, fieldNames);
     }
   }
 }
Suggestion importance[1-10]: 6

Why: The collectFieldNames method indeed misses bare QualifiedName expressions, which could cause field-not-found errors if aggregate arguments are represented as QualifiedName rather than Field. The fix is accurate and addresses a real gap in the implementation.

Low
Fix potential single-element AND condition issue

When groupFilters has exactly one element, context.relBuilder.and(groupFilters) is
called with a single-element list, which may behave unexpectedly depending on the
RelBuilder implementation. Use context.relBuilder.and(frameFilter,
context.relBuilder.and(groupFilters)) only when there are multiple group filters, or
flatten the list by prepending frameFilter to groupFilters and calling and once on
the combined list.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2356-2359]

-RexNode joinCondition =
-    groupFilters.isEmpty()
-        ? frameFilter
-        : context.relBuilder.and(frameFilter, context.relBuilder.and(groupFilters));
+List<RexNode> allConditions = new ArrayList<>();
+allConditions.add(frameFilter);
+allConditions.addAll(groupFilters);
+RexNode joinCondition = context.relBuilder.and(allConditions);
Suggestion importance[1-10]: 5

Why: The current code calls context.relBuilder.and(groupFilters) which may work fine with a single-element list in most RelBuilder implementations, but the suggested approach of combining all conditions into one list and calling and once is cleaner and avoids potential edge cases. The improvement is valid but the risk is low.

Low
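
The flattening idea in this suggestion can be shown without Calcite. The sketch below uses a hypothetical `Cond` type in place of `RexNode` and a hypothetical `and` helper in place of `RelBuilder.and`; it only illustrates the shape of the result, not Calcite's own simplification rules.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for building one flat conjunction; not Calcite's RelBuilder. */
class ConjunctionSketch {
  interface Cond {}
  record Atom(String text) implements Cond {}
  record And(List<Cond> operands) implements Cond {}

  /** Combines the frame filter and all group filters in a single, non-nested AND. */
  static Cond and(Cond frameFilter, List<Cond> groupFilters) {
    List<Cond> all = new ArrayList<>();
    all.add(frameFilter);
    all.addAll(groupFilters);
    // A lone condition is returned as-is, so no one-element AND node is created.
    return all.size() == 1 ? all.get(0) : new And(List.copyOf(all));
  }
}
```

With this shape the empty-group case needs no separate branch, which is the main readability gain the suggestion is after.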
General
Handle all Field types during right-side renaming

A Field whose inner name is not a QualifiedName (e.g., a plain UnresolvedAttribute
or other UnresolvedExpression subtype) falls through without renaming, so its
reference will still point to the left-side column name instead of the right-side
prefixed name. Add a fallback branch to handle Field instances regardless of the
inner name type.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2449-2465]

 private UnresolvedExpression rewriteFieldNamesToRightSide(UnresolvedExpression expr) {
   if (expr instanceof Field f && f.getField() instanceof QualifiedName qn) {
     return new Field(toRightSideQualifiedName(qn), f.getFieldArgs());
   }
+  if (expr instanceof Field f) {
+    String renamed = toRightSideFieldName(f.getField().toString());
+    return new Field(new QualifiedName(renamed), f.getFieldArgs());
+  }
   if (expr instanceof QualifiedName qn) {
     return toRightSideQualifiedName(qn);
   }
-  ...
+  if (expr instanceof Alias a) {
+    return new Alias(a.getName(), rewriteFieldNamesToRightSide(a.getDelegated()));
+  }
+  if (expr instanceof Function func) {
+    List<UnresolvedExpression> rewrittenArgs =
+        func.getFuncArgs().stream().map(this::rewriteFieldNamesToRightSide).toList();
+    return new Function(func.getFuncName(), rewrittenArgs);
+  }
+  return expr;
 }
Suggestion importance[1-10]: 5

Why: The suggestion correctly identifies that Field instances with non-QualifiedName inner names won't be renamed, potentially causing incorrect field references. However, in practice PPL field references are typically QualifiedName, so this is a defensive improvement rather than a critical bug fix.

Low

Previous suggestions

Suggestions up to commit a1adf1e
Category | Suggestion | Impact
Possible issue
Rewrite all Field variants to right-side names

The method only rewrites Field nodes whose inner field is a QualifiedName, but Field
can also wrap a plain String-based name. If a Field holds a non-QualifiedName field
reference, it will pass through unrewritten, causing the aggregate to reference the
original (left-side) column name instead of the prefixed right-side column. Add a
fallback branch to handle plain Field nodes.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2449-2465]

 private UnresolvedExpression rewriteFieldNamesToRightSide(UnresolvedExpression expr) {
     if (expr instanceof Field f && f.getField() instanceof QualifiedName qn) {
       return new Field(toRightSideQualifiedName(qn), f.getFieldArgs());
     }
+    if (expr instanceof Field f) {
+      return new Field(
+          new QualifiedName(toRightSideFieldName(f.getField().toString())), f.getFieldArgs());
+    }
     if (expr instanceof QualifiedName qn) {
       return toRightSideQualifiedName(qn);
     }
-    ...
+    if (expr instanceof Alias a) {
+      return new Alias(a.getName(), rewriteFieldNamesToRightSide(a.getDelegated()));
+    }
+    if (expr instanceof Function func) {
+      List<UnresolvedExpression> rewrittenArgs =
+          func.getFuncArgs().stream().map(this::rewriteFieldNamesToRightSide).toList();
+      return new Function(func.getFuncName(), rewrittenArgs);
+    }
+    return expr;
   }
Suggestion importance[1-10]: 7

Why: The rewriteFieldNamesToRightSide method only handles Field nodes wrapping a QualifiedName, missing plain Field nodes. Since PPL field references commonly use plain string-based Field nodes, this gap could cause aggregate resolution failures against the right-side projected columns.

Medium
Handle QualifiedName in field name collection

The collectFieldNames method does not handle QualifiedName expressions, which can
appear as direct arguments to aggregate functions. If a QualifiedName is passed as a
function argument, its field name will be silently skipped, causing the right-side
projection to omit that field and leading to a resolution error when building
aggregate calls. Add a branch to handle QualifiedName similarly to Field.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2388-2400]

 private void collectFieldNames(UnresolvedExpression expr, Set<String> fieldNames) {
     if (expr instanceof Field f) {
       fieldNames.add(f.getField().toString());
+    } else if (expr instanceof QualifiedName qn) {
+      fieldNames.add(qn.toString());
     } else if (expr instanceof Alias a) {
       collectFieldNames(a.getDelegated(), fieldNames);
     } else if (expr instanceof WindowFunction wf) {
       collectFieldNames(wf.getFunction(), fieldNames);
     } else if (expr instanceof Function func) {
       for (UnresolvedExpression arg : func.getFuncArgs()) {
         collectFieldNames(arg, fieldNames);
       }
     }
   }
Suggestion importance[1-10]: 6

Why: The collectFieldNames method indeed doesn't handle QualifiedName directly, which could cause fields to be missed if they appear as direct arguments. However, in practice PPL field references typically come through Field nodes, so this may be a low-frequency edge case. Still a valid defensive improvement.

Low
General
Combine join conditions in a single and-call

When groupFilters has exactly one element, context.relBuilder.and(groupFilters) is
called with a single-element list, which may behave unexpectedly depending on the
RelBuilder implementation. It is safer to combine frameFilter with all group filters
in a single and call to avoid potential issues.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2356-2359]

-RexNode joinCondition =
-        groupFilters.isEmpty()
-            ? frameFilter
-            : context.relBuilder.and(frameFilter, context.relBuilder.and(groupFilters));
+List<RexNode> allConditions = new ArrayList<>();
+allConditions.add(frameFilter);
+allConditions.addAll(groupFilters);
+RexNode joinCondition = context.relBuilder.and(allConditions);
Suggestion importance[1-10]: 3

Why: The nested and call with a single-element list is unlikely to cause issues in practice since RelBuilder.and handles single-element lists correctly, but consolidating into one call is cleaner. This is a minor style/robustness improvement with low impact.

Low
Suggestions up to commit 45cda73
Category | Suggestion | Impact
General
Avoid unnecessary double-wrapping of join conditions

When groupFilters has exactly one element, context.relBuilder.and(groupFilters)
wraps a single condition in an AND node unnecessarily. While functionally correct,
it produces a slightly less clean plan. More importantly,
context.relBuilder.and(frameFilter, context.relBuilder.and(groupFilters)) could be
simplified to
context.relBuilder.and(ImmutableList.builder().add(frameFilter).addAll(groupFilters).build())
to avoid the double wrapping.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2343-2346]

-RexNode joinCondition =
-    groupFilters.isEmpty()
-        ? frameFilter
-        : context.relBuilder.and(frameFilter, context.relBuilder.and(groupFilters));
+RexNode joinCondition;
+if (groupFilters.isEmpty()) {
+  joinCondition = frameFilter;
+} else {
+  List<RexNode> allConditions = new ArrayList<>();
+  allConditions.add(frameFilter);
+  allConditions.addAll(groupFilters);
+  joinCondition = context.relBuilder.and(allConditions);
+}
Suggestion importance[1-10]: 4

Why: The suggestion correctly identifies that context.relBuilder.and(frameFilter, context.relBuilder.and(groupFilters)) creates unnecessary nesting when groupFilters has elements. The improved code flattens all conditions into a single and() call, producing a cleaner query plan. This is a valid optimization but has minor practical impact.

Low
Improve error message for missing aggregate input fields

If an aggregate function references a field that doesn't exist in the left relation
(e.g., a typo or a computed column not yet projected),
context.relBuilder.field(aggField) will throw an exception with a potentially
confusing message. Adding a check or a try-catch with a more descriptive error
message would improve debuggability.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2293-2296]

 for (String aggField : aggInputFields) {
-  rightFields.add(context.relBuilder.field(aggField));
+  try {
+    rightFields.add(context.relBuilder.field(aggField));
+  } catch (Exception e) {
+    throw new IllegalArgumentException(
+        "Aggregate input field '" + aggField + "' not found in the input relation. "
+            + "Available fields: " + leftWithHelpers.getRowType().getFieldNames(), e);
+  }
   rightFieldNames.add("__r_" + aggField + "__");
 }
Suggestion importance[1-10]: 3

Why: This is a minor improvement to error handling that adds a more descriptive error message when an aggregate field is not found. It improves debuggability but doesn't fix a bug or address a critical issue.

Low
Suggestions up to commit 7c8f991
Category | Suggestion | Impact
General
Validate aggregate input fields exist before projection

The collectFieldNames method recursively collects all Field references from the
expression tree, including alias names that may not correspond to actual input
columns. If a WindowFunction wraps a Function that references a computed alias from
a previous step (not a raw column), the field lookup
context.relBuilder.field(aggField) will fail at runtime. Consider adding error
handling or validating that each collected field name exists in the current row type
before adding it to the right projection.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2283-2296]

-// Include aggregate input fields (extract field names from window functions)
 Set<String> aggInputFields = new LinkedHashSet<>();
 for (UnresolvedExpression wfExpr : node.getWindowFunctionList()) {
   collectFieldNames(wfExpr, aggInputFields);
 }
 // Remove already-included fields
 aggInputFields.remove(seqCol);
 for (UnresolvedExpression groupExpr : groupList) {
   aggInputFields.remove(extractGroupFieldName(groupExpr));
 }
+// Validate that all collected fields exist in the current row type
+List<String> availableFields = leftWithHelpers.getRowType().getFieldNames();
+for (String aggField : aggInputFields) {
+  if (!availableFields.contains(aggField)) {
+    throw new IllegalArgumentException(
+        "Aggregate input field '" + aggField + "' not found in input row type: " + availableFields);
+  }
+}
Suggestion importance[1-10]: 4

Why: Adding early validation that collected field names exist in the input row type is a reasonable defensive measure that would produce clearer error messages. The improved_code accurately reflects the suggested change and the concern is legitimate for computed aliases from prior steps.

Low
Validate single-argument assumption for aggregate functions

The EARLIEST and LATEST mappings to min/max may produce incorrect results when the
input has multiple arguments or when ordering semantics differ from min/max. More
critically, if a function has multiple arguments (e.g., COUNT(DISTINCT x, y)), only
args.get(0) is used and the rest are silently ignored. Consider validating the
argument count or documenting this limitation explicitly.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2456-2484]

 aggCall =
     switch (funcName) {
       case "AVG" -> context.relBuilder.avg(false, alias, argRef);
       case "SUM" -> context.relBuilder.sum(false, alias, argRef);
       case "MIN" -> context.relBuilder.min(alias, argRef);
       case "MAX" -> context.relBuilder.max(alias, argRef);
       case "COUNT" -> context.relBuilder.count(false, alias, argRef);
       case "DC", "DISTINCT_COUNT" -> context.relBuilder.count(true, alias, argRef);
       ...
       case "EARLIEST" -> context.relBuilder.min(alias, argRef);
       case "LATEST" -> context.relBuilder.max(alias, argRef);
       default ->
           throw new UnsupportedOperationException("Unexpected window function: " + funcName);
     };
+if (args.size() > 1) {
+  throw new UnsupportedOperationException(
+      "Window function '" + funcName + "' with multiple arguments is not supported in self-join plan");
+}
Suggestion importance[1-10]: 3

Why: The suggestion raises a valid concern about silently ignoring extra arguments, but the improved_code places the validation after the switch statement rather than before args.get(0) is accessed, making it logically misplaced. The concern about EARLIEST/LATEST semantics is minor since they are intentionally mapped to min/max in the existing codebase.

Low
Guard against field resolution on wrong join side

The field lookup is performed on context.relBuilder.peek(), but at this point the
builder's top node is the join result (before the aggregate is built). However,
buildAggCallsFromJoinedRight is called before context.relBuilder.aggregate(...), so
peek() correctly reflects the join. The issue is that leftFieldCount is captured
before the join is pushed, but the join output has leftFieldCount + rightFieldCount
fields. If the right side has duplicate field names with the left side, the lookup
could match a left-side field instead. Consider asserting that rightFieldIndex >=
leftFieldCount to guard against this.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2441-2453]

-// Find it in the right side (after left fields)
-int rightFieldIndex = -1;
-RelDataType joinedRowType = context.relBuilder.peek().getRowType();
-List<String> fieldNames = joinedRowType.getFieldNames();
 for (int i = leftFieldCount; i < fieldNames.size(); i++) {
   if (fieldNames.get(i).equals(rightArgFieldName)) {
     rightFieldIndex = i;
     break;
   }
 }
 if (rightFieldIndex == -1) {
   throw new IllegalArgumentException(
       "Cannot find aggregate input field '" + rightArgFieldName + "' in right side of join");
 }
+assert rightFieldIndex >= leftFieldCount
+    : "Field '" + rightArgFieldName + "' resolved to left side of join at index " + rightFieldIndex;
Suggestion importance[1-10]: 3

Why: The loop already starts at leftFieldCount, so it can never match a left-side field. The assertion would always be true and adds no real protection. The suggestion misunderstands the existing code's loop bounds.

Low
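
The point about loop bounds can be checked in isolation. The sketch below reproduces the lookup over a plain list of field names; `indexOnRightSide` and its parameters are hypothetical names chosen for illustration.

```java
import java.util.List;

/** Hypothetical lookup that searches only the right-side columns of a joined row type. */
class RightSideLookupSketch {
  /** Returns the index of `name` among the right-side fields, or -1 if it is absent. */
  static int indexOnRightSide(List<String> joinedFieldNames, int leftFieldCount, String name) {
    // Starting at leftFieldCount means a left-side column can never be returned,
    // even if it has the same name as a right-side column.
    for (int i = leftFieldCount; i < joinedFieldNames.size(); i++) {
      if (joinedFieldNames.get(i).equals(name)) {
        return i;
      }
    }
    return -1;
  }
}
```

Because any non-negative result is at least `leftFieldCount` by construction, an extra assertion on that bound adds nothing.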
Suggestions up to commit b5e5995
Category | Suggestion | Impact
Possible issue
Guard zero-argument branch to COUNT only

When args is empty and the function name is not COUNT, this branch silently produces
a COUNT(*) for any zero-argument function. The function name should be checked
before defaulting to count(), or an unsupported-operation exception should be thrown
for unrecognized zero-argument functions to avoid silent incorrect results.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2425-2431]

 if (args.isEmpty()) {
-  // COUNT()
+  if (!funcName.equals("COUNT")) {
+    throw new UnsupportedOperationException(
+        "Zero-argument window function not supported: " + funcName);
+  }
   aggCall = context.relBuilder.count();
   if (alias != null) {
     aggCall = aggCall.as(alias);
   }
 } else {
Suggestion importance[1-10]: 5

Why: The suggestion is valid - silently defaulting to COUNT(*) for any zero-argument function is a potential correctness issue. However, in practice, streamstats window functions with zero arguments are unlikely to be anything other than COUNT, so the impact is moderate.

Low
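
A sketch of how the zero-argument guard, and the multi-argument check raised in an earlier suggestion on this page, could both run before any dispatch on the function name. It works on plain doubles rather than Calcite `AggCall`s; `aggregate`, `argCount`, and the supported function list are hypothetical simplifications.

```java
import java.util.List;

/** Hypothetical arity validation ahead of a function-name switch; not the PR's code. */
class ArityGuardSketch {
  /** Applies a named aggregate to `values`; `argCount` is the number of declared arguments. */
  static double aggregate(String funcName, int argCount, List<Double> values) {
    // Validate arity first, so no branch below can run with an unexpected argument list.
    if (argCount == 0 && !funcName.equals("COUNT")) {
      throw new UnsupportedOperationException(
          "Zero-argument window function not supported: " + funcName);
    }
    if (argCount > 1) {
      throw new UnsupportedOperationException(
          "Window function '" + funcName + "' with multiple arguments is not supported");
    }
    return switch (funcName) {
      case "COUNT" -> (double) values.size();
      case "SUM" -> values.stream().mapToDouble(Double::doubleValue).sum();
      case "MIN" -> values.stream().mapToDouble(Double::doubleValue).min().orElse(Double.NaN);
      case "MAX" -> values.stream().mapToDouble(Double::doubleValue).max().orElse(Double.NaN);
      default -> throw new UnsupportedOperationException("Unexpected window function: " + funcName);
    };
  }
}
```

Putting both checks at the top addresses the bot's own critique that a guard placed after the switch runs too late to protect the first-argument access.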
General
Use deterministic set ordering for aggregate fields

Iterating over a HashSet gives non-deterministic ordering of the right-side
projected fields. Since buildSingleAggCall later searches by name this is safe for
correctness today, but if two aggregate input fields share a mangled name (e.g. a
field literally named __r_X__) the lookup could match the wrong column. Using a
LinkedHashSet instead of HashSet for aggInputFields would at least make the ordering
deterministic and easier to debug.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2283-2295]

-for (String aggField : aggInputFields) {
-  rightFields.add(context.relBuilder.field(aggField));
-  rightFieldNames.add("__r_" + aggField + "__");
-}
+Set<String> aggInputFields = new LinkedHashSet<>();
Suggestion importance[1-10]: 4

Why: Using LinkedHashSet instead of HashSet for aggInputFields improves determinism and debuggability. The suggestion correctly identifies the non-deterministic ordering issue, though the improved_code only shows the declaration change and not the full context of the loop.

Low
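
The ordering guarantee behind this suggestion is a documented property of `java.util.LinkedHashSet`, which iterates in insertion order. The sketch below shows the effect on the projected right-side names; `rightSideNames` is a hypothetical helper, not a method from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper showing insertion-ordered de-duplication of aggregate input fields. */
class FieldOrderSketch {
  static List<String> rightSideNames(List<String> referencedFields) {
    // LinkedHashSet drops duplicates but keeps first-seen order, unlike HashSet.
    Set<String> unique = new LinkedHashSet<>(referencedFields);
    List<String> names = new ArrayList<>();
    for (String field : unique) {
      names.add("__r_" + field + "__");
    }
    return names;
  }
}
```

Stable ordering also keeps the explain-plan YAML fixtures reproducible across runs, since the projection columns always appear in the same sequence.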
Handle non-field aggregate argument expressions

Aggregate function arguments can be arbitrary expressions (e.g. age * 2, a literal,
or a nested function call), not just Field or Alias. When such an expression is
encountered, extractFieldNameFromExpr throws an IllegalArgumentException, causing
the entire query to fail. The method should handle at least Function and literal
cases, or buildSingleAggCall should fall back gracefully.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [2484-2492]

 private String extractFieldNameFromExpr(UnresolvedExpression expr) {
   if (expr instanceof Field f) {
     return f.getField().toString();
   } else if (expr instanceof Alias a) {
     return extractFieldNameFromExpr(a.getDelegated());
+  } else if (expr instanceof Function func) {
+    // For expressions like func(field), recurse into first arg
+    if (!func.getFuncArgs().isEmpty()) {
+      return extractFieldNameFromExpr(func.getFuncArgs().get(0));
+    }
+    throw new IllegalArgumentException("Cannot extract field name from function with no args: " + expr);
   } else {
     throw new IllegalArgumentException("Cannot extract field name from: " + expr);
   }
 }
Suggestion importance[1-10]: 4

Why: The suggestion correctly identifies that extractFieldNameFromExpr only handles Field and Alias, which could fail for expressions like age * 2. However, the proposed fix of recursing into the first argument of a Function is a simplistic heuristic that may not correctly handle all cases.

Low
Add assertions to the new unit test

The test only verifies that getRelNode does not throw, but makes no assertions about
the resulting logical plan or SQL output. Without at least a verifyLogical or
verifyPPLToSparkSQL call, regressions in the generated plan will go undetected.

ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStreamstatsTest.java [206-212]

 @Test
 public void testMultipleStreamstatsWithWindow() {
   String ppl =
       "source=EMP | streamstats window=2 avg(SAL) as avg_sal by DEPTNO"
           + " | streamstats window=2 avg(avg_sal) as avg_dept_sal by DEPTNO";
   RelNode root = getRelNode(ppl);
+  // Verify the plan can be built without NPE and produces a valid logical plan
+  assertNotNull(root);
+  verifyLogical(root, root.explain()); // replace with expected string once stable
 }
Suggestion importance[1-10]: 3

Why: The test indeed lacks assertions beyond not throwing an exception. However, the improved_code uses root.explain() as the expected string which is self-referential and not a meaningful assertion, making the suggested improvement questionable in practice.

Low

- Update explain_streamstats_global.yaml and explain_streamstats_global_null_bucket.yaml
  for both pushdown and no-pushdown cases to reflect the new self-join plan
  (LogicalJoin + LogicalAggregate) instead of the old LogicalCorrelate plan
- Guard zero-argument branch in buildSingleAggCall to only allow COUNT()
- Use LinkedHashSet for aggInputFields for deterministic field ordering
- Add assertions to testMultipleStreamstatsWithWindow unit test

Signed-off-by: Heng Qian <qianheng@amazon.com>
@github-actions
Contributor

Persistent review updated to latest commit 7c8f991

Merged origin/main which includes version bump to OpenSearch 3.7
(Jackson 2 to 3 parser API change that removes empty _source.excludes
arrays from serialization) and the unified SQL language spec PR.

Two conflicts in streamstats global explain YAMLs were resolved by:
- Keeping this PR's new LogicalJoin-based plan (replaces LogicalCorrelate)
- Adopting the new serialization format that omits "excludes":[]

Signed-off-by: Heng Qian <qianheng@amazon.com>
@github-actions
Contributor

Persistent review updated to latest commit 45cda73

}
RexNode argRef = context.relBuilder.field(rightFieldIndex);

aggCall =
Member

Can this aggCall = ... code reuse the CalciteAggCallVisitor? Otherwise we have to maintain similar logic in multiple places.

Collaborator Author

@qianheng-aws Apr 23, 2026

Good call. Refactored in 19d74db to reuse aggVisitor (CalciteAggCallVisitor). Instead of switching on the PPL function name to build AggCalls, the new code rewrites the window function's field references to the prefixed right-side names (SAL -> __r_SAL__), unwraps the WindowFunction to its inner Function, and delegates to aggVisitor.analyze(). This drops ~100 lines of duplicated AVG/SUM/MIN/MAX/COUNT/DC/STDDEV_*/VAR_*/EARLIEST/LATEST mapping and keeps the streamstats self-join path consistent with regular stats/eventstats aggregation handling.

@github-actions
Contributor

Persistent review updated to latest commit a1adf1e

Address review feedback from @LantaoJin: instead of duplicating the
PPL-window-function to Calcite-AggCall mapping inside
buildSingleAggCall, rewrite the window function's field references to
the prefixed right-side column names and delegate aggregate resolution
to the shared aggVisitor. This eliminates ~100 lines of parallel
function-name switching (AVG/SUM/MIN/MAX/COUNT/DC/STDDEV_*/VAR_*/
EARLIEST/LATEST) and keeps the streamstats self-join path consistent
with regular stats/eventstats aggregation handling.

Also extracts the "__r_<name>__" naming convention into named constants
(RIGHT_SIDE_FIELD_PREFIX / RIGHT_SIDE_FIELD_SUFFIX /
RIGHT_SIDE_SEQ_COLUMN) and a toRightSideFieldName helper so the prefix
is defined in one place.

Harness: add a reminder under Path B (AST / Function Implementation)
in ppl-bugfix-reference.md to reuse aggVisitor / rexVisitor before
hand-rolling a new function-name switch.

Signed-off-by: Heng Qian <qianheng@amazon.com>
@qianheng-aws qianheng-aws force-pushed the worktree-agent-a084b1f7 branch from a1adf1e to 19d74db Compare April 23, 2026 05:52
@github-actions
Contributor

Persistent review updated to latest commit 19d74db

@songkant-aws songkant-aws merged commit 3b16951 into opensearch-project:main Apr 23, 2026
58 of 59 checks passed
Development

Successfully merging this pull request may close these issues.

[BUG] Some complex use cases of streamstats failed after "Bump Calcite to 1.41.0"

3 participants