[fix](streaming-job) fix filteredRows always 0 on single-table S3 streaming#62816
Open
JNSimba wants to merge 1 commit into apache:master from
…xn commit
For single-table S3 streaming insert jobs running with
enable_insert_strict=false + insert_max_filter_ratio>0, BE correctly
filters bad rows but jobStatistic.filteredRows stays at 0 because the
txn commit path had no channel for filteredRows end-to-end:
- StreamingTaskCommitAttachmentPB has no filtered_rows field.
- LoadStatistic has no filteredRows field; the value only lives as a
local int in AbstractInsertExecutor for strict/ratio checks.
- beforeCommitted() builds the attachment from loadStatistic.get*() so
there is nothing to read.
- updateJobStatisticAndOffset() / updateCloudJobStatisticAndOffset()
accumulate every other stat but skip filteredRows.
The multi-table CDC non-txn path via CommitOffsetRequest already does
the symmetric accumulate into nonTxnJobStatistic correctly; this PR
wires the same channel for the single-table txn path:
1. cloud.proto: add optional int64 filtered_rows = 7 to
StreamingTaskCommitAttachmentPB.
2. StreamingTaskTxnCommitAttachment: add filteredRows field
(@SerializedName("fr")), extend full-args constructor, read from PB
in PB constructor, include in toString().
3. TxnUtil.streamingTaskTxnCommitAttachmentToPb: populate the new PB
field.
4. LoadStatistic: add filteredRows field + getter/setter; expose it in
toJson() so the loadStatistic column surfaces it too.
5. AbstractInsertExecutor.execImpl: after reading filteredRows from
coordinator.getLoadCounters(), persist it into
insertLoadJob.loadStatistic (symmetric to how BrokerLoadJob pushes
DPP_ABNORMAL_ALL into its own job state).
6. StreamingInsertJob.beforeCommitted: pass loadStatistic.getFilteredRows()
into the new attachment constructor arg.
7. StreamingInsertJob.updateJobStatisticAndOffset: accumulate
jobStatistic.setFilteredRows(old + attachment.getFilteredRows()).
8. StreamingInsertJob.updateCloudJobStatisticAndOffset: overwrite
jobStatistic.setFilteredRows(attachment.getFilteredRows()) to match
the existing latest-snapshot semantics of that method.
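Step 1 in proto form, as a sketch of `StreamingTaskCommitAttachmentPB` after the change. Only `optional int64 filtered_rows = 7` is stated by this PR; the tag numbers shown for the pre-existing fields are assumptions for illustration.

```protobuf
// Sketch of StreamingTaskCommitAttachmentPB in cloud.proto.
// Only filtered_rows = 7 comes from this PR; the other tags are assumed.
message StreamingTaskCommitAttachmentPB {
    optional int64 scanned_rows = 3;
    optional int64 load_bytes = 4;
    optional int64 num_files = 5;
    optional int64 file_bytes = 6;
    optional int64 filtered_rows = 7; // new: carries BE's filter count
}
```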
After this fix, filteredRows is correct for live accumulation, FE
EditLog replay (replayOnCommitted), and cloud MS replay
(replayOnCloudMode), with all three paths reading the same PB field.
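The accumulate-vs-overwrite split in steps 7 and 8 can be sketched with plain longs; the class and method names here are illustrative stand-ins, not the Doris code:

```java
// Sketch of the two statistic-merge semantics described above.
// accumulate(): live path and FE EditLog replay add each task's delta.
// overwrite(): cloud MS replay takes the attachment as the latest snapshot.
public class FilteredRowsMerge {
    static long accumulate(long jobFilteredRows, long attachmentFilteredRows) {
        return jobFilteredRows + attachmentFilteredRows;
    }

    static long overwrite(long jobFilteredRows, long attachmentFilteredRows) {
        return attachmentFilteredRows;
    }

    public static void main(String[] args) {
        long job = 0;
        // three committed tasks, each reporting its own filtered count
        for (long delta : new long[] {5, 0, 15}) {
            job = accumulate(job, delta);
        }
        System.out.println(job);              // 20
        System.out.println(overwrite(7, 20)); // 20
    }
}
```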
Added regression test test_streaming_insert_job_filtered_rows which
loads example_[0-1].csv into a table with c2 INT NOT NULL (non-parseable
names force every row to be filtered, mirroring the pattern in
test_streaming_mysql_job_errormsg), and asserts
scannedRows=20, filteredRows=20, fileNumber=2 and an empty table.
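The test setup leans on the non-strict filter gate mentioned above: with enable_insert_strict=false, a load may still commit as long as the filtered fraction stays within insert_max_filter_ratio, so a table where every row is filtered can still end up empty with a successful job. A minimal sketch of that check, with names that are assumptions rather than the Doris implementation:

```java
// Illustrative sketch of the strict-mode / max-filter-ratio decision the
// text above refers to; method and parameter names are assumed.
public class FilterRatioGate {
    static boolean commitAllowed(boolean insertStrict, double maxFilterRatio,
                                 long filteredRows, long loadedRows) {
        if (insertStrict) {
            // strict mode tolerates no filtered rows at all
            return filteredRows == 0;
        }
        long total = filteredRows + loadedRows;
        return total == 0 || (double) filteredRows / total <= maxFilterRatio;
    }

    public static void main(String[] args) {
        // the regression test's shape: every row filtered, ratio high enough
        System.out.println(commitAllowed(false, 1.0, 20, 0)); // true
        System.out.println(commitAllowed(true, 1.0, 20, 0));  // false
    }
}
```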
What problem does this PR solve?
Problem Summary:
For single-table S3 streaming insert jobs running under
`session.enable_insert_strict=false` + `session.insert_max_filter_ratio>0`, BE correctly filters bad rows but `jobStatistic.filteredRows` stays at `0` in the `jobs("type"="insert")` view. The issue reproduces in the live path (before any restart) and also after FE EditLog replay, because the whole commit chain never carried `filteredRows` end-to-end for the txn path.
Root cause: the single-table txn commit pipeline has no `filteredRows` channel.
- `StreamingTaskCommitAttachmentPB` has no `filtered_rows` field (only `scanned_rows / load_bytes / num_files / file_bytes`).
- `LoadStatistic` has no `filteredRows` field; the value only lives as a local `int` in `AbstractInsertExecutor` for strict-mode / `insert_max_filter_ratio` checks and is never pushed anywhere persistent.
- `StreamingInsertJob.beforeCommitted()` builds the attachment from `loadStatistic.get*()`, so there is nothing to read even if the attachment class had a field.
- `updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment, boolean)` and `updateCloudJobStatisticAndOffset()` accumulate every other stat but skip `filteredRows`.
The multi-table CDC non-txn path (`CommitOffsetRequest` → `updateNoTxnJobStatisticAndOffset()`) already accumulates `filteredRows` into `nonTxnJobStatistic` correctly; the single-table txn path needed the same wiring.
Fix
Thread `filteredRows` along the same channel that scanned/loadBytes/fileNumber/fileSize already use:
1. `cloud.proto`: add `optional int64 filtered_rows = 7` to `StreamingTaskCommitAttachmentPB`.
2. `StreamingTaskTxnCommitAttachment`: add `filteredRows` field (`@SerializedName("fr")`), extend the full-args constructor, read from PB in the PB constructor, include in `toString()`.
3. `TxnUtil.streamingTaskTxnCommitAttachmentToPb`: populate the new PB field.
4. `LoadStatistic`: add `filteredRows` field + getter/setter; surface it in `toJson()` so the `loadStatistic` column of `jobs("type"="insert")` shows it too.
5. `AbstractInsertExecutor.execImpl`: after reading `filteredRows` from `coordinator.getLoadCounters()`, persist it into `insertLoadJob.getLoadStatistic().setFilteredRows(...)` (symmetric to how `BrokerLoadJob` pushes `DPP_ABNORMAL_ALL` into its own job state).
6. `StreamingInsertJob.beforeCommitted`: pass `loadStatistic.getFilteredRows()` into the new attachment constructor arg.
7. `StreamingInsertJob.updateJobStatisticAndOffset(attachment, isReplay)`: live + FE EditLog replay accumulate, `jobStatistic.setFilteredRows(old + attachment.getFilteredRows())`.
8. `StreamingInsertJob.updateCloudJobStatisticAndOffset`: cloud MS replay overwrite, `jobStatistic.setFilteredRows(attachment.getFilteredRows())`, matching the existing latest-snapshot semantics of that method.
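Step 5's read from `coordinator.getLoadCounters()` can be sketched as below. The counter-map shape and the `dpp.abnorm.ALL` key are assumptions based on the `DPP_ABNORMAL_ALL` constant the PR mentions; the classes are simplified stand-ins, not the Doris code:

```java
import java.util.Map;

// Sketch of pulling the filtered-row counter out of a load-counters map
// and persisting it into a job-level statistic. The key name mirrors the
// DPP_ABNORMAL_ALL constant referenced above; everything else is assumed.
public class PersistFilteredRows {
    static final String DPP_ABNORMAL_ALL = "dpp.abnorm.ALL"; // assumed key

    static long filteredRowsFrom(Map<String, String> loadCounters) {
        String v = loadCounters.get(DPP_ABNORMAL_ALL);
        return v == null ? 0L : Long.parseLong(v);
    }

    public static void main(String[] args) {
        Map<String, String> counters =
                Map.of(DPP_ABNORMAL_ALL, "20", "dpp.norm.ALL", "0");
        long filtered = filteredRowsFrom(counters);
        // real code would then call something like
        // insertLoadJob.getLoadStatistic().setFilteredRows(filtered)
        System.out.println(filtered); // 20
    }
}
```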
replayOnCommitted,replayOnCloudMode) see the same PB field, sofilteredRowsis correct whether BE or FE is restarted.Added regression test
test_streaming_insert_job_filtered_rows: loadsexample_[0-1].csvinto a table withc2 INT NOT NULL. Non-parseable name strings on a NOT NULL int column force every row to be filtered (mirrors theage int NOT NULL + 'abc'pattern fromtest_streaming_mysql_job_errormsg). AssertsscannedRows=20, filteredRows=20, fileNumber=2, and the target table ends up empty. Before this fix the test fails atfilteredRows == 20(observed0); after the fix it passes.Release note
Fix `filteredRows` always reported as 0 in `jobs("type"="insert")` for single-table S3 streaming insert jobs under `enable_insert_strict=false` + `insert_max_filter_ratio>0`. The filter count is now propagated from BE through the txn commit attachment into job statistics, and survives FE EditLog replay and cloud meta-service round-trip.
Check List (For Author)
Test
Behavior changed:
`jobs("type"="insert").jobStatistic.filteredRows` (and `loadStatistic.FilteredRows`) now report the actual number of rows filtered by BE on the single-table streaming commit path, instead of always 0.
Does this need documentation?
Check List (For Reviewer who merge this PR)