diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowAdapter.java index 874529bbe1..8a53f7fcc0 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/RowAdapter.java @@ -35,6 +35,8 @@ public interface RowAdapter { ByteString getKey(RowT row); + default void onLargeRow(ByteString rowKey) {} + /** * A SAX style row factory. It is responsible for creating two types of rows: standard data rows * and special marker rows. Marker rows are emitted when skipping lots of rows due to filters. The diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java index 93b6b548dd..8d36d1b088 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeReadRowsResumptionStrategy.java @@ -91,6 +91,7 @@ public RowT processResponse(RowT response) { public Throwable processError(Throwable throwable) { ByteString rowKeyExtracted = extractLargeRowKey(throwable); if (rowKeyExtracted != null) { + rowAdapter.onLargeRow(rowKeyExtracted); LOGGER.warning("skipping large row " + rowKeyExtracted); this.largeRowKey = rowKeyExtracted; numProcessed = numProcessed + 1; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeRowPaginator.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeRowPaginator.java new file mode 100644 index 0000000000..fdc580fb06 --- /dev/null +++ 
b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeRowPaginator.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.readrows;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.data.v2.models.Filters;
+import javax.annotation.Nullable;
+
+/**
+ * A paginator for fetching large rows from Bigtable chunk by chunk to avoid the 256MB size limit.
+ * It yields Filters that chunk the row by cell limits and handles limit halving if
+ * FAILED_PRECONDITION occurs.
+ *
+ * <p>This class is not thread safe.
+ */
+@InternalApi("For internal usage only")
+public class LargeRowPaginator {
+  // Maximum number of cells to request in the next chunk; halved on FAILED_PRECONDITION.
+  private int currentLimit;
+  // Number of cells of the row that have already been consumed.
+  private int currentOffset;
+  // Whether another chunk may still remain for this row.
+  private boolean hasMore;
+  // Optional user filter applied before the chunking offset/limit.
+  @Nullable private final Filters.Filter baseFilter;
+
+  /**
+   * @param initialLimit the initial number of cells to request per chunk; must be positive
+   * @param filter an optional base filter combined with the paging offset/limit filters
+   */
+  public LargeRowPaginator(int initialLimit, @Nullable Filters.Filter filter) {
+    if (initialLimit <= 0) {
+      throw new IllegalArgumentException("initialLimit must be positive, got: " + initialLimit);
+    }
+    this.currentLimit = initialLimit;
+    this.currentOffset = 0;
+    this.hasMore = true;
+    this.baseFilter = filter;
+  }
+
+  /** Yields the filter required to fetch the next chunk of cells for the large row. */
+  public Filters.Filter getNextFilter() {
+    Filters.ChainFilter chain = Filters.FILTERS.chain();
+    if (baseFilter != null) {
+      chain.filter(baseFilter);
+    }
+    if (currentOffset > 0) {
+      chain.filter(Filters.FILTERS.offset().cellsPerRow(currentOffset));
+    }
+    chain.filter(Filters.FILTERS.limit().cellsPerRow(currentLimit));
+    return chain;
+  }
+
+  /**
+   * Advances the internal offset. Call this after a successful Bigtable API call.
+   *
+   * @param cellsReadInLastChunk The number of cells returned in the last chunk.
+   * @return true if there are potentially more cells to fetch.
+   */
+  public boolean advance(int cellsReadInLastChunk) {
+    this.currentOffset += cellsReadInLastChunk;
+
+    // If we read fewer cells than requested, we've hit the end of the row.
+    if (cellsReadInLastChunk < currentLimit) {
+      this.hasMore = false;
+    }
+    return this.hasMore;
+  }
+
+  /**
+   * Call this if the Bigtable API call fails with a FAILED_PRECONDITION due to size limits. It
+   * reduces the batch size to fetch a smaller chunk on the next attempt.
+   *
+   * @throws IllegalStateException if the limit cannot be reduced any further
+   */
+  public void halveLimit() {
+    // Validate before mutating: otherwise a failed halving would leave currentLimit at 0 and a
+    // caller that catches the exception would build a bogus cellsPerRow(0) filter next.
+    if (this.currentLimit <= 1) {
+      throw new IllegalStateException(
+          "Cannot divide limit further. A single cell might be too large.");
+    }
+    this.currentLimit /= 2;
+  }
+
+  /** Returns true if there are potentially more cells to fetch for this row. */
+  public boolean hasNext() {
+    return this.hasMore;
+  }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeRowPaginatorTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeRowPaginatorTest.java
new file mode 100644
index 0000000000..da5e14673b
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/LargeRowPaginatorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.readrows;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import com.google.cloud.bigtable.data.v2.models.Filters;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class LargeRowPaginatorTest {
+
+  @Test
+  public void testPaginatorAdvancesProperly() {
+    LargeRowPaginator paginator = new LargeRowPaginator(10, null);
+
+    assertThat(paginator.hasNext()).isTrue();
+
+    // Simulate reading exactly the limit (10 cells). Paginator assumes more data exists.
+    boolean hasMore = paginator.advance(10);
+    assertThat(hasMore).isTrue();
+    assertThat(paginator.hasNext()).isTrue();
+
+    // Simulate reading 5 cells (less than limit, indicating end of row)
+    hasMore = paginator.advance(5);
+    assertThat(hasMore).isFalse();
+    assertThat(paginator.hasNext()).isFalse();
+  }
+
+  @Test
+  public void testPaginatorHalvesLimit() {
+    LargeRowPaginator paginator = new LargeRowPaginator(10, null);
+
+    paginator.halveLimit(); // Internal limit becomes 5
+
+    // Simulate reading exactly the new limit (5 cells)
+    boolean hasMore = paginator.advance(5);
+    assertThat(hasMore).isTrue();
+
+    paginator.halveLimit(); // Internal limit becomes 2
+
+    // Simulate reading 1 cell (less than the new limit of 2)
+    hasMore = paginator.advance(1);
+    assertThat(hasMore).isFalse();
+  }
+
+  @Test
+  public void testPaginatorThrowsOnZeroLimit() {
+    LargeRowPaginator paginator = new LargeRowPaginator(1, null);
+
+    IllegalStateException exception =
+        assertThrows(IllegalStateException.class, () -> paginator.halveLimit());
+    assertThat(exception).hasMessageThat().contains("Cannot divide limit further");
+  }
+
+  @Test
+  public void testPaginatorWithBaseFilter() {
+    LargeRowPaginator paginator =
+        new LargeRowPaginator(10, Filters.FILTERS.family().exactMatch("cf"));
+
+    Filters.Filter nextFilter = paginator.getNextFilter();
+
+    Filters.Filter expectedFilter =
+        Filters.FILTERS
+            .chain()
+            .filter(Filters.FILTERS.family().exactMatch("cf"))
+            .filter(Filters.FILTERS.limit().cellsPerRow(10));
+
+    assertThat(nextFilter.toProto()).isEqualTo(expectedFilter.toProto());
+  }
+}