
[fix](streaming-job) fix filteredRows always 0 on single-table S3 streaming#62816

Open
JNSimba wants to merge 1 commit into apache:master from JNSimba:fix/streaming-filtered-rows-single-table

Conversation

Member

@JNSimba JNSimba commented Apr 24, 2026

What problem does this PR solve?

Problem Summary:

For single-table S3 streaming insert jobs running under session.enable_insert_strict=false + session.insert_max_filter_ratio>0, BE correctly filters bad rows but jobStatistic.filteredRows stays at 0 in the jobs("type"="insert") view. The issue reproduces in the live path (before any restart) and also after FE EditLog replay, because the whole commit chain never carried filteredRows end-to-end for the txn path.

Root cause — the single-table txn commit pipeline has no filteredRows channel:

  1. StreamingTaskCommitAttachmentPB has no filtered_rows field (only scanned_rows / load_bytes / num_files / file_bytes).
  2. LoadStatistic has no filteredRows field; the value only lives as a local int in AbstractInsertExecutor for strict-mode / insert_max_filter_ratio checks and is never pushed anywhere persistent.
  3. StreamingInsertJob.beforeCommitted() builds the attachment from loadStatistic.get*() — so there is nothing to read even if the attachment class had a field.
  4. updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment, boolean) and updateCloudJobStatisticAndOffset() accumulate every other stat but skip filteredRows.
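The second point is the crux: before the fix, the filter count only ever existed as a local variable. A minimal self-contained sketch of that pre-fix flow (names are simplified stand-ins for AbstractInsertExecutor internals, not the actual Doris code):

```java
// Hypothetical condensed sketch of the pre-fix flow: filteredRows is parsed
// from the BE load counters, used once for the max-filter-ratio check, and
// then dropped -- nothing copies it into the job's LoadStatistic.
public class PreFixSketch {
    // Stand-in for reading DPP counters via coordinator.getLoadCounters().
    static long filteredRowsFromLoadCounters() {
        return 20; // BE reports 20 filtered rows
    }

    // The only pre-fix consumer of the value: decide commit vs abort.
    static boolean withinFilterRatio(long filteredRows, long scannedRows,
                                     double maxFilterRatio) {
        return filteredRows <= maxFilterRatio * scannedRows;
    }

    public static void main(String[] args) {
        long filteredRows = filteredRowsFromLoadCounters(); // local variable only
        long scannedRows = 20;
        double insertMaxFilterRatio = 1.0; // ratio permits the commit

        System.out.println(
            withinFilterRatio(filteredRows, scannedRows, insertMaxFilterRatio)
                ? "commit" : "abort");

        // After this check the value goes out of scope; jobStatistic.filteredRows
        // was never written, so jobs("type"="insert") keeps reporting 0.
    }
}
```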

The multi-table CDC non-txn path (CommitOffsetRequest → updateNoTxnJobStatisticAndOffset()) already accumulates filteredRows into nonTxnJobStatistic correctly; the single-table txn path needed the same wiring.

Fix

Thread filteredRows along the same channel scanned/loadBytes/fileNumber/fileSize already use:

  1. cloud.proto: add optional int64 filtered_rows = 7 to StreamingTaskCommitAttachmentPB.
  2. StreamingTaskTxnCommitAttachment: add filteredRows field (@SerializedName("fr")), extend full-args constructor, read from PB in the PB constructor, include in toString().
  3. TxnUtil.streamingTaskTxnCommitAttachmentToPb: populate the new PB field.
  4. LoadStatistic: add filteredRows field + getter/setter; surface it in toJson() so the loadStatistic column of jobs("type"="insert") shows it too.
  5. AbstractInsertExecutor.execImpl: after reading filteredRows from coordinator.getLoadCounters(), persist it into insertLoadJob.getLoadStatistic().setFilteredRows(...) (symmetric to how BrokerLoadJob pushes DPP_ABNORMAL_ALL into its own job state).
  6. StreamingInsertJob.beforeCommitted: pass loadStatistic.getFilteredRows() into the new attachment constructor arg.
  7. StreamingInsertJob.updateJobStatisticAndOffset(attachment, isReplay) — live + FE EditLog replay accumulate: jobStatistic.setFilteredRows(old + attachment.getFilteredRows()).
  8. StreamingInsertJob.updateCloudJobStatisticAndOffset — cloud MS replay overwrite: jobStatistic.setFilteredRows(attachment.getFilteredRows()), matching the existing latest-snapshot semantics of that method.
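Steps 7 and 8 intentionally differ: the live/EditLog path accumulates per task commit, while the cloud MS path overwrites with the latest snapshot. A minimal model of the two semantics (class and method names are simplified stand-ins for StreamingInsertJob internals, not the real API):

```java
// Hypothetical sketch: accumulate (step 7) vs latest-snapshot overwrite (step 8).
public class FilteredRowsSemantics {
    static class JobStatistic {
        private long filteredRows;
        long getFilteredRows() { return filteredRows; }
        void setFilteredRows(long v) { filteredRows = v; }
    }

    static class TxnCommitAttachment {
        private final long filteredRows; // carried in the new filtered_rows = 7 PB field
        TxnCommitAttachment(long filteredRows) { this.filteredRows = filteredRows; }
        long getFilteredRows() { return filteredRows; }
    }

    final JobStatistic jobStatistic = new JobStatistic();

    // Step 7: live path and FE EditLog replay accumulate across task commits.
    void updateJobStatisticAndOffset(TxnCommitAttachment a) {
        jobStatistic.setFilteredRows(
            jobStatistic.getFilteredRows() + a.getFilteredRows());
    }

    // Step 8: cloud MS replay uses latest-snapshot semantics -- plain overwrite.
    void updateCloudJobStatisticAndOffset(TxnCommitAttachment a) {
        jobStatistic.setFilteredRows(a.getFilteredRows());
    }

    public static void main(String[] args) {
        FilteredRowsSemantics live = new FilteredRowsSemantics();
        live.updateJobStatisticAndOffset(new TxnCommitAttachment(5));
        live.updateJobStatisticAndOffset(new TxnCommitAttachment(15));
        System.out.println(live.jobStatistic.getFilteredRows()); // accumulates to 20

        FilteredRowsSemantics cloud = new FilteredRowsSemantics();
        cloud.updateCloudJobStatisticAndOffset(new TxnCommitAttachment(20));
        System.out.println(cloud.jobStatistic.getFilteredRows()); // snapshot total, 20
    }
}
```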

After the fix all three read paths (live accumulate, replayOnCommitted, replayOnCloudMode) see the same PB field, so filteredRows is correct whether BE or FE is restarted.

Added regression test test_streaming_insert_job_filtered_rows: loads example_[0-1].csv into a table with c2 INT NOT NULL. Non-parseable name strings on a NOT NULL int column force every row to be filtered (mirrors the age int NOT NULL + 'abc' pattern from test_streaming_mysql_job_errormsg). Asserts scannedRows=20, filteredRows=20, fileNumber=2, and the target table ends up empty. Before this fix the test fails at filteredRows == 20 (observed 0); after the fix it passes.
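The test's filtering mechanism can be illustrated with a small sketch (the row values below are invented; the real data lives in example_[0-1].csv): a name string cannot be cast to an INT NOT NULL column, so BE counts the row as filtered.

```java
// Hypothetical illustration of why every row in the regression test is filtered.
public class NotNullIntFilterSketch {
    // Returns true when a CSV value would be rejected for a c2 INT NOT NULL column.
    static boolean isFiltered(String value) {
        if (value == null) {
            return true; // NOT NULL also rejects missing values
        }
        try {
            Integer.parseInt(value.trim());
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String[] c2Column = {"alice", "bob", "carol"}; // invented sample values
        long filtered = 0;
        for (String v : c2Column) {
            if (isFiltered(v)) {
                filtered++;
            }
        }
        // Every non-numeric value is rejected, which is how the test drives
        // scannedRows == filteredRows and leaves the target table empty.
        System.out.println(filtered + " of " + c2Column.length + " rows filtered");
    }
}
```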

Release note

Fix filteredRows always reported as 0 in jobs("type"="insert") for single-table S3 streaming insert jobs under enable_insert_strict=false + insert_max_filter_ratio>0. The filter count is now propagated from BE through the txn commit attachment into job statistics, and survives FE EditLog replay and cloud meta-service round-trip.

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes. jobs("type"="insert").jobStatistic.filteredRows (and loadStatistic.FilteredRows) now report the actual number of rows filtered by BE on the single-table streaming commit path, instead of always 0.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

…xn commit

For single-table S3 streaming insert jobs running with
enable_insert_strict=false + insert_max_filter_ratio>0, BE correctly
filters bad rows but jobStatistic.filteredRows stays at 0 because the
txn commit path had no channel for filteredRows end-to-end:

- StreamingTaskCommitAttachmentPB has no filtered_rows field.
- LoadStatistic has no filteredRows field; the value only lives as a
  local int in AbstractInsertExecutor for strict/ratio checks.
- beforeCommitted() builds the attachment from loadStatistic.get*() so
  there is nothing to read.
- updateJobStatisticAndOffset() / updateCloudJobStatisticAndOffset()
  accumulate every other stat but skip filteredRows.

The multi-table CDC non-txn path via CommitOffsetRequest already does
the symmetric accumulate into nonTxnJobStatistic correctly; this PR
wires the same channel for the single-table txn path:

1. cloud.proto: add optional int64 filtered_rows = 7 to
   StreamingTaskCommitAttachmentPB.
2. StreamingTaskTxnCommitAttachment: add filteredRows field
   (@SerializedName("fr")), extend full-args constructor, read from PB
   in PB constructor, include in toString().
3. TxnUtil.streamingTaskTxnCommitAttachmentToPb: populate the new PB
   field.
4. LoadStatistic: add filteredRows field + getter/setter; expose it in
   toJson() so the loadStatistic column surfaces it too.
5. AbstractInsertExecutor.execImpl: after reading filteredRows from
   coordinator.getLoadCounters(), persist it into
   insertLoadJob.loadStatistic (symmetric to how BrokerLoadJob pushes
   DPP_ABNORMAL_ALL into its own job state).
6. StreamingInsertJob.beforeCommitted: pass loadStatistic.getFilteredRows()
   into the new attachment constructor arg.
7. StreamingInsertJob.updateJobStatisticAndOffset: accumulate
   jobStatistic.setFilteredRows(old + attachment.getFilteredRows()).
8. StreamingInsertJob.updateCloudJobStatisticAndOffset: overwrite
   jobStatistic.setFilteredRows(attachment.getFilteredRows()) to match
   the existing latest-snapshot semantics of that method.

After this fix, filteredRows is correct for live accumulation, FE
EditLog replay (replayOnCommitted) and cloud MS replay
(replayOnCloudMode), all three paths reading the same PB.

Added regression test test_streaming_insert_job_filtered_rows which
loads example_[0-1].csv into a table with c2 INT NOT NULL (non-parseable
names force every row to be filtered, mirroring the pattern in
test_streaming_mysql_job_errormsg), and asserts
scannedRows=20, filteredRows=20, fileNumber=2 and an empty table.
Contributor

Thearas commented Apr 24, 2026

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?
