Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions be/src/exec/runtime_filter/runtime_filter_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/runtime_filter/runtime_filter_consumer.h"

#include "exec/runtime_filter/runtime_filter_selectivity.h"
#include "exprs/minmax_predicate.h"
#include "exprs/vbitmap_predicate.h"
#include "exprs/vbloom_predicate.h"
Expand Down Expand Up @@ -83,11 +84,11 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
auto real_filter_type = _wrapper->get_real_type();
bool null_aware = _wrapper->contain_null();

// Set sampling frequency based on disable_always_true_logic status
// Determine sampling frequency for the always_true optimization.
// This will be propagated to VExprContext in VRuntimeFilterWrapper::open().
int sampling_frequency = _wrapper->disable_always_true_logic()
? RuntimeFilterSelectivity::DISABLE_SAMPLING
: config::runtime_filter_sampling_frequency;
probe_ctx->get_runtime_filter_selectivity().set_sampling_frequency(sampling_frequency);

switch (real_filter_type) {
case RuntimeFilterType::IN_FILTER: {
Expand All @@ -105,7 +106,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
in_pred->add_child(probe_ctx->root());
auto wrapper = VRuntimeFilterWrapper::create_shared(
node, in_pred, get_in_list_ignore_thredhold(_wrapper->hybrid_set()->size()),
null_aware, _wrapper->filter_id());
null_aware, _wrapper->filter_id(), sampling_frequency);
container.push_back(wrapper);
break;
}
Expand All @@ -123,7 +124,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
DCHECK(null_aware == false) << "only min predicate do not support null aware";
container.push_back(VRuntimeFilterWrapper::create_shared(
min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware,
_wrapper->filter_id()));
_wrapper->filter_id(), sampling_frequency));
break;
}
case RuntimeFilterType::MAX_FILTER: {
Expand All @@ -140,7 +141,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
DCHECK(null_aware == false) << "only max predicate do not support null aware";
container.push_back(VRuntimeFilterWrapper::create_shared(
max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware,
_wrapper->filter_id()));
_wrapper->filter_id(), sampling_frequency));
break;
}
case RuntimeFilterType::MINMAX_FILTER: {
Expand All @@ -156,7 +157,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
max_pred->add_child(max_literal);
container.push_back(VRuntimeFilterWrapper::create_shared(
max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware,
_wrapper->filter_id()));
_wrapper->filter_id(), sampling_frequency));

VExprContextSPtr new_probe_ctx;
RETURN_IF_ERROR(VExpr::create_expr_tree(probe_expr, new_probe_ctx));
Expand All @@ -173,7 +174,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
min_pred->add_child(min_literal);
container.push_back(VRuntimeFilterWrapper::create_shared(
min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware,
_wrapper->filter_id()));
_wrapper->filter_id(), sampling_frequency));
break;
}
case RuntimeFilterType::BLOOM_FILTER: {
Expand All @@ -188,9 +189,9 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
auto bloom_pred = VBloomPredicate::create_shared(node);
bloom_pred->set_filter(_wrapper->bloom_filter_func());
bloom_pred->add_child(probe_ctx->root());
auto wrapper = VRuntimeFilterWrapper::create_shared(node, bloom_pred,
get_bloom_filter_ignore_thredhold(),
null_aware, _wrapper->filter_id());
auto wrapper = VRuntimeFilterWrapper::create_shared(
node, bloom_pred, get_bloom_filter_ignore_thredhold(), null_aware,
_wrapper->filter_id(), sampling_frequency);
container.push_back(wrapper);
break;
}
Expand All @@ -207,8 +208,8 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
bitmap_pred->set_filter(_wrapper->bitmap_filter_func());
bitmap_pred->add_child(probe_ctx->root());
DCHECK(null_aware == false) << "bitmap predicate do not support null aware";
auto wrapper = VRuntimeFilterWrapper::create_shared(node, bitmap_pred, 0, null_aware,
_wrapper->filter_id());
auto wrapper = VRuntimeFilterWrapper::create_shared(
node, bitmap_pred, 0, null_aware, _wrapper->filter_id(), sampling_frequency);
container.push_back(wrapper);
break;
}
Expand Down
6 changes: 4 additions & 2 deletions be/src/exprs/vruntimefilter_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ class VExprContext;

VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr impl,
double ignore_thredhold, bool null_aware,
int filter_id)
int filter_id, int sampling_frequency)
: VExpr(node),
_impl(std::move(impl)),
_ignore_thredhold(ignore_thredhold),
_null_aware(null_aware),
_filter_id(filter_id) {}
_filter_id(filter_id),
_sampling_frequency(sampling_frequency) {}

Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) {
Expand All @@ -76,6 +77,7 @@ Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
DCHECK(_prepare_finished);
RETURN_IF_ERROR(_impl->open(state, context, scope));
context->get_runtime_filter_selectivity().set_sampling_frequency(_sampling_frequency);
_open_finished = true;
return Status::OK();
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/exprs/vruntimefilter_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "common/config.h"
#include "common/status.h"
#include "exec/runtime_filter/runtime_filter_selectivity.h"
#include "exprs/function_context.h"
#include "exprs/vexpr.h"
#include "runtime/runtime_profile.h"
Expand All @@ -51,7 +52,8 @@ class VRuntimeFilterWrapper final : public VExpr {

public:
VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr impl, double ignore_thredhold,
bool null_aware, int filter_id);
bool null_aware, int filter_id,
int sampling_frequency = RuntimeFilterSelectivity::DISABLE_SAMPLING);
~VRuntimeFilterWrapper() override = default;
Status execute_column(VExprContext* context, const Block* block, Selector* selector,
size_t count, ColumnPtr& result_column) const override;
Expand Down Expand Up @@ -126,6 +128,7 @@ class VRuntimeFilterWrapper final : public VExpr {
double _ignore_thredhold;
bool _null_aware;
int _filter_id;
int _sampling_frequency;
};

using VRuntimeFilterPtr = std::shared_ptr<VRuntimeFilterWrapper>;
Expand Down
46 changes: 46 additions & 0 deletions be/test/exec/runtime_filter/runtime_filter_selectivity_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,50 @@ TEST_F(RuntimeFilterSelectivityTest, different_thresholds) {
}
}

// Regression test: with default sampling_frequency (-1), update_judge_counter()
// always resets because (_judge_counter++) >= -1 is always true.
// This was the root cause of the selectivity accumulation bug.
TEST_F(RuntimeFilterSelectivityTest, default_sampling_frequency_always_resets) {
RuntimeFilterSelectivity selectivity;
// Don't set sampling_frequency — defaults to DISABLE_SAMPLING (-1)

// Accumulate selectivity data: low filter rate -> should be always_true
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
// With default -1, maybe_always_true_can_ignore returns false (disabled)
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());

// Now call update_judge_counter — with -1, it immediately resets
selectivity.update_judge_counter();
// Verify: accumulated data has been wiped out by the reset
// Even after setting a valid sampling_frequency, the previously accumulated
// selectivity data is gone
selectivity.set_sampling_frequency(100);
// always_true was reset to false by the premature reset
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
}

// Verify that setting sampling_frequency correctly prevents premature reset
TEST_F(RuntimeFilterSelectivityTest, proper_sampling_frequency_preserves_accumulation) {
RuntimeFilterSelectivity selectivity;
selectivity.set_sampling_frequency(32);

// Accumulate selectivity: low filter rate
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());

// Counter increments don't reset before reaching sampling_frequency.
// Post-increment semantics: check uses old value, so need 33 calls total
// to trigger reset (counter must reach 32 before comparison fires).
for (int i = 0; i < 32; i++) {
selectivity.update_judge_counter();
}
// Still always_true because counter value 31 was compared last (31 >= 32 → false)
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());

// 33rd call: counter=32, 32 >= 32 → true → triggers reset
selectivity.update_judge_counter();
// After reset, needs re-evaluation
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
}

} // namespace doris
181 changes: 181 additions & 0 deletions be/test/exec/runtime_filter/vruntimefilter_wrapper_sampling_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <glog/logging.h>
#include <gtest/gtest.h>

#include "exec/runtime_filter/runtime_filter_selectivity.h"
#include "exec/runtime_filter/runtime_filter_test_utils.h"
#include "exprs/vexpr_context.h"
#include "exprs/vruntimefilter_wrapper.h"

namespace doris {

// Minimal VExpr implementation for testing VRuntimeFilterWrapper in isolation.
class StubVExpr : public VExpr {
public:
StubVExpr() : VExpr(make_texpr_node()) {}

const std::string& expr_name() const override {
static const std::string name = "StubVExpr";
return name;
}

Status execute(VExprContext*, Block*, int*) const override { return Status::OK(); }

Status execute_column(VExprContext*, const Block*, Selector*, size_t,
ColumnPtr&) const override {
return Status::OK();
}

// SLOT_REF is not a constant — without this override, VExpr::is_constant()
// returns true for a leaf node (no children), causing get_const_col() to
// DCHECK-fail on the second open() call.
bool is_constant() const override { return false; }

private:
static TExprNode make_texpr_node() {
return TExprNodeBuilder(TExprNodeType::SLOT_REF,
TTypeDescBuilder()
.set_types(TTypeNodeBuilder()
.set_type(TTypeNodeType::SCALAR)
.set_scalar_type(TPrimitiveType::INT)
.build())
.build(),
0)
.build();
}
};

class VRuntimeFilterWrapperSamplingTest : public RuntimeFilterTest {};

// Test that VRuntimeFilterWrapper stores and propagates sampling_frequency
// through open() to VExprContext. This is the core fix for the bug where
// sampling_frequency was lost when _append_rf_into_conjuncts creates a new
// VExprContext via VExprContext::create_shared(expr).
TEST_F(VRuntimeFilterWrapperSamplingTest, open_propagates_sampling_frequency) {
auto stub = std::make_shared<StubVExpr>();
auto node = TExprNodeBuilder(TExprNodeType::SLOT_REF,
TTypeDescBuilder()
.set_types(TTypeNodeBuilder()
.set_type(TTypeNodeType::SCALAR)
.set_scalar_type(TPrimitiveType::INT)
.build())
.build(),
0)
.build();

const int expected_frequency = 32;
auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4, false, /*filter_id=*/1,
expected_frequency);

// Simulate the VExprContext recreation that happens in _append_rf_into_conjuncts.
// A fresh VExprContext has default sampling_frequency = DISABLE_SAMPLING (-1).
auto context = std::make_shared<VExprContext>(wrapper);
ASSERT_EQ(context->get_runtime_filter_selectivity().maybe_always_true_can_ignore(), false);

RowDescriptor row_desc;
ASSERT_TRUE(wrapper->prepare(_runtime_states[0].get(), row_desc, context.get()).ok());
ASSERT_TRUE(
wrapper->open(_runtime_states[0].get(), context.get(), FunctionContext::FRAGMENT_LOCAL)
.ok());

// After open(), sampling_frequency should be propagated from VRuntimeFilterWrapper
// to VExprContext. Verify by accumulating low-selectivity data and checking
// that always_true can now be detected.
auto& selectivity = context->get_runtime_filter_selectivity();
selectivity.update_judge_selectivity(1, 2000, 50000, 0.1);
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
}

// Test that default sampling_frequency (DISABLE_SAMPLING) disables the always_true
// optimization, matching the behavior when disable_always_true_logic is set.
TEST_F(VRuntimeFilterWrapperSamplingTest, default_sampling_frequency_disables_optimization) {
auto stub = std::make_shared<StubVExpr>();
auto node = TExprNodeBuilder(TExprNodeType::SLOT_REF,
TTypeDescBuilder()
.set_types(TTypeNodeBuilder()
.set_type(TTypeNodeType::SCALAR)
.set_scalar_type(TPrimitiveType::INT)
.build())
.build(),
0)
.build();

// No sampling_frequency argument - uses default DISABLE_SAMPLING
auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4, false, /*filter_id=*/1);

auto context = std::make_shared<VExprContext>(wrapper);
RowDescriptor row_desc;
ASSERT_TRUE(wrapper->prepare(_runtime_states[0].get(), row_desc, context.get()).ok());
ASSERT_TRUE(
wrapper->open(_runtime_states[0].get(), context.get(), FunctionContext::FRAGMENT_LOCAL)
.ok());

// Even with low-selectivity data, always_true should NOT be detected
// because sampling is disabled
auto& selectivity = context->get_runtime_filter_selectivity();
selectivity.update_judge_selectivity(1, 2000, 50000, 0.1);
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
}

// Test that sampling_frequency survives VExprContext recreation, which is the
// exact scenario that caused the original bug.
TEST_F(VRuntimeFilterWrapperSamplingTest, sampling_frequency_survives_context_recreation) {
auto stub = std::make_shared<StubVExpr>();
auto node = TExprNodeBuilder(TExprNodeType::SLOT_REF,
TTypeDescBuilder()
.set_types(TTypeNodeBuilder()
.set_type(TTypeNodeType::SCALAR)
.set_scalar_type(TPrimitiveType::INT)
.build())
.build(),
0)
.build();

const int expected_frequency = 32;
auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4, false, /*filter_id=*/1,
expected_frequency);

// First context - prepare and open work
auto context1 = std::make_shared<VExprContext>(wrapper);
RowDescriptor row_desc;
ASSERT_TRUE(wrapper->prepare(_runtime_states[0].get(), row_desc, context1.get()).ok());
ASSERT_TRUE(
wrapper->open(_runtime_states[0].get(), context1.get(), FunctionContext::FRAGMENT_LOCAL)
.ok());

// Create a brand new non-clone VExprContext with the same VRuntimeFilterWrapper,
// matching the production path in _append_rf_into_conjuncts which calls
// VExprContext::create_shared(expr) then conjunct->prepare() and conjunct->open().
auto context2 = std::make_shared<VExprContext>(wrapper);
EXPECT_FALSE(context2->get_runtime_filter_selectivity().maybe_always_true_can_ignore());

// Drive the recreated context through prepare/open via VExprContext (not the
// wrapper directly), matching the production _append_rf_into_conjuncts lifecycle.
ASSERT_TRUE(context2->prepare(_runtime_states[0].get(), row_desc).ok());
ASSERT_TRUE(context2->open(_runtime_states[0].get()).ok());

// After open(), sampling_frequency should be propagated from VRuntimeFilterWrapper
// to context2. Verify by accumulating low-selectivity data and checking that
// always_true can be detected — this is the actual behavior the fix protects.
auto& selectivity = context2->get_runtime_filter_selectivity();
selectivity.update_judge_selectivity(1, 2000, 50000, 0.1);
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
}

} // namespace doris
Loading