Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public Optimizer(CascadesContext cascadesContext) {
*/
public void execute() {
MoreFieldsThread.keepFunctionSignature(() -> {
cascadesContext.setRewritePlan(normalizeCtePlan(cascadesContext.getRewritePlan()));
// generate inlined CTE alternative for CBO comparison
Plan cboInlinedPlan = generateCTEInlineAlternative();
// init memo
Expand Down Expand Up @@ -151,7 +152,7 @@ private Plan generateFullCTEInline() {
CTEInliner cteInliner = new CTEInliner(cascadesContext.getStatementContext());
Plan inlinedPlan = cteInliner.generateInlinedPlan(rewritePlan);
if (inlinedPlan != null) {
return rewriteInlinedPlan(inlinedPlan);
return normalizeCtePlan(rewriteInlinedPlan(inlinedPlan));
}
return null;
}
Expand All @@ -166,14 +167,29 @@ private Plan generateSelectiveCTEInline() {
if (inlinedPlan != null) {
inlinedPlan = rewriteInlinedPlan(inlinedPlan);
if (inlinedPlan.anyMatch(p -> p instanceof LogicalEmptyRelation)) {
inlinedPlan = eliminateEmptyRelation(inlinedPlan);
inlinedPlan = normalizeCtePlan(inlinedPlan);
cascadesContext.setRewritePlan(inlinedPlan);
return null;
}
}
return null;
}

private Plan normalizeCtePlan(Plan plan) {
Plan currentPlan = plan;
while (true) {
CTEInliner cteInliner = new CTEInliner(cascadesContext.getStatementContext());
Plan normalizedPlan = cteInliner.inlineByCurrentConsumerCount(currentPlan);
if (normalizedPlan.anyMatch(p -> p instanceof LogicalEmptyRelation)) {
normalizedPlan = eliminateEmptyRelation(normalizedPlan);
}
if (currentPlan.equals(normalizedPlan)) {
return normalizedPlan;
}
currentPlan = normalizedPlan;
}
}

private Plan eliminateEmptyRelation(Plan plan) {
CascadesContext ctx = CascadesContext.initContext(
cascadesContext.getStatementContext(), plan, PhysicalProperties.ANY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
Expand All @@ -41,8 +42,10 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Generate an inlined alternative plan for CTE optimization.
Expand All @@ -65,6 +68,7 @@ public class CTEInliner extends DefaultPlanRewriter<Void> {
private final StatementContext statementContext;
// Map from CTEId to the CTE producer node (extracted from CTEAnchor.left())
private final Map<CTEId, LogicalCTEProducer<?>> cteProducers = new HashMap<>();
private final Set<CTEId> cteIdsToRemove = new HashSet<>();
private final boolean unionAllOnly;

public CTEInliner(StatementContext statementContext) {
Expand All @@ -81,6 +85,7 @@ public CTEInliner(StatementContext statementContext, boolean unionAllOnly) {
* Returns null if no CTEs can be inlined.
*/
public Plan generateInlinedPlan(Plan plan) {
clearRewriteCandidates();
// First pass: collect all CTE producers that can be inlined
collectCTEProducers(plan);

Expand All @@ -92,6 +97,23 @@ public Plan generateInlinedPlan(Plan plan) {
return plan.accept(this, null);
}

/**
* Recursively remove unused CTE anchors and inline CTEs whose live consumer count
* is small enough after rewrite rules change the plan shape.
*/
public Plan inlineByCurrentConsumerCount(Plan plan) {
Plan currentPlan = plan;
while (collectConsumerDrivenCandidates(currentPlan)) {
currentPlan = currentPlan.accept(this, null);
}
return currentPlan;
}

private void clearRewriteCandidates() {
cteProducers.clear();
cteIdsToRemove.clear();
}

private void collectCTEProducers(Plan plan) {
plan.foreach(p -> {
if (p instanceof LogicalCTEAnchor) {
Expand All @@ -113,6 +135,40 @@ private void collectCTEProducers(Plan plan) {
});
}

private boolean collectConsumerDrivenCandidates(Plan plan) {
clearRewriteCandidates();
Map<CTEId, LogicalCTEProducer<?>> allCteProducers = new HashMap<>();
Map<CTEId, Integer> cteConsumerCounts = new HashMap<>();
plan.foreach(p -> {
if (p instanceof LogicalCTEAnchor) {
LogicalCTEAnchor<?, ?> anchor = (LogicalCTEAnchor<?, ?>) p;
allCteProducers.put(anchor.getCteId(), (LogicalCTEProducer<?>) anchor.left());
} else if (p instanceof LogicalCTEConsumer) {
LogicalCTEConsumer consumer = (LogicalCTEConsumer) p;
cteConsumerCounts.merge(consumer.getCteId(), 1, Integer::sum);
}
});

int threshold = statementContext.getConnectContext().getSessionVariable().inlineCTEReferencedThreshold;
for (Map.Entry<CTEId, LogicalCTEProducer<?>> entry : allCteProducers.entrySet()) {
CTEId cteId = entry.getKey();
LogicalCTEProducer<?> producer = entry.getValue();
int consumerCount = cteConsumerCounts.getOrDefault(cteId, 0);
if (consumerCount == 0) {
cteIdsToRemove.add(cteId);
} else if (producer.child() instanceof LogicalEmptyRelation
|| (consumerCount <= threshold && canInline(producer))) {
cteProducers.put(cteId, producer);
}
}
return !cteProducers.isEmpty() || !cteIdsToRemove.isEmpty();
}

private boolean canInline(LogicalCTEProducer<?> producer) {
return !statementContext.isForceMaterializeCTE(producer.getCteId())
&& !containsNondeterministicFunction(producer);
}

private boolean containsNondeterministicFunction(LogicalCTEProducer<?> producer) {
List<Expression> nondeterministicFunctions = new ArrayList<>();
producer.accept(NondeterministicFunctionCollector.INSTANCE, nondeterministicFunctions);
Expand All @@ -127,13 +183,14 @@ private boolean containsUnionAll(LogicalCTEProducer<?> producer) {
@Override
public Plan visitLogicalCTEAnchor(LogicalCTEAnchor<? extends Plan, ? extends Plan> cteAnchor, Void context) {
CTEId cteId = cteAnchor.getCteId();
if (cteProducers.containsKey(cteId)) {
// Inline: skip anchor and producer, process the right (consumer) subtree
if (cteProducers.containsKey(cteId) || cteIdsToRemove.contains(cteId)) {
// Inline or remove: skip anchor and producer, process the right (consumer) subtree
return cteAnchor.right().accept(this, null);
} else {
// Force materialize: keep the structure, only process the right subtree
// Keep the structure and continue trimming nested CTEs in both children.
Plan left = cteAnchor.left().accept(this, null);
Plan right = cteAnchor.right().accept(this, null);
return cteAnchor.withChildren(cteAnchor.left(), right);
return cteAnchor.withChildren(left, right);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2552,7 +2552,7 @@ public static boolean isEagerAggregationOnJoin() {
@VarAttrDef.VarAttr(name = ENABLE_ORDERED_SCAN_RANGE_LOCATIONS)
public boolean enableOrderedScanRangeLocations = false;

@VarAttrDef.VarAttr(name = CTE_INLINE_MODE, alias = "cbo_cte_inline_mode", description = {
@VarAttrDef.VarAttr(name = CTE_INLINE_MODE, description = {
"CTE内联模式。<0:禁用; =0:仅当CTE体含UNION ALL且filter可消除部分分支时内联; >=1:CBO比较物化与内联",
"CTE inline mode. <0: disable; =0: only inline when CTE body contains UNION ALL "
+ "and consumer filters can eliminate some union branches; "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_cbo_cte_inline_prune") {
sql "DROP TABLE IF EXISTS cte_cbo_inline_tbl"
sql """
CREATE TABLE cte_cbo_inline_tbl (
id INT,
val INT
) ENGINE=OLAP
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ("replication_num" = "1")
"""
sql "INSERT INTO cte_cbo_inline_tbl VALUES (1, 10), (2, 20), (3, 30)"

sql "SET cte_inline_mode=1"
sql "SET inline_cte_referenced_threshold=1"

explain {
sql """
shape plan
WITH cte_base AS (
SELECT id, val, 1 AS tag FROM cte_cbo_inline_tbl
)
SELECT * FROM cte_base WHERE tag = 2
UNION ALL
SELECT * FROM cte_base WHERE tag = 3
"""
contains("PhysicalEmptyRelation")
notContains("PhysicalCteProducer")
}

explain {
sql """
shape plan
WITH cte_base AS (
SELECT id, val, 1 AS tag FROM cte_cbo_inline_tbl
),
cte_keep AS (
SELECT id, val FROM cte_base WHERE tag = 1
),
cte_drop AS (
SELECT id, val FROM cte_base WHERE tag = 2
)
SELECT * FROM cte_keep
UNION ALL
SELECT * FROM cte_keep
UNION ALL
SELECT * FROM cte_drop WHERE 1 = 0
"""
multiContains("PhysicalCteProducer", 0)
}
}
Loading