From a5dbc42dbda5f68c1f25b3100290d249d3d713c5 Mon Sep 17 00:00:00 2001 From: Matt Byrd Date: Tue, 11 Mar 2025 16:54:13 -0700 Subject: [PATCH 1/2] work in progress plumbing tables through the interface into rebuild --- .../apache/cassandra/dht/BootStrapper.java | 3 ++- .../apache/cassandra/dht/RangeStreamer.java | 17 ++++++++++---- .../org/apache/cassandra/service/Rebuild.java | 16 ++++++++++---- .../cassandra/service/StorageService.java | 7 +++++- .../service/StorageServiceMBean.java | 15 +++++++++++++ .../org/apache/cassandra/tools/NodeProbe.java | 4 ++-- .../cassandra/tools/nodetool/Rebuild.java | 10 ++++++++- .../test/streaming/RebuildStreamingTest.java | 22 ++++++++++++++----- .../cassandra/dht/BootStrapperTest.java | 3 ++- 9 files changed, 77 insertions(+), 20 deletions(-) diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index 940d7b1370d2..67858e7b28d5 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -128,7 +128,8 @@ public Future bootstrap(StreamStateStore stateStore, boolean useStr DatabaseDescriptor.getStreamingConnectionsPerHost(), movements, strictMovements, - true); + true, + null); if (beingReplaced != null) streamer.addSourceFilter(new RangeStreamer.ExcludedSourcesFilter(Collections.singleton(beingReplaced))); diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index cbdb42e60fdd..28ecc1f8a0ff 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -18,6 +18,7 @@ package org.apache.cassandra.dht; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -103,6 +104,7 @@ public class RangeStreamer private final MovementMap movements; private final MovementMap strictMovements; private final 
boolean excludeAccordTables; + private final Set tables; public static class FetchReplica { @@ -304,10 +306,11 @@ public RangeStreamer(ClusterMetadata metadata, int connectionsPerHost, MovementMap movements, MovementMap strictMovements, - boolean excludeAccordTables) + boolean excludeAccordTables, + Set tables) { this(metadata, streamOperation, useStrictConsistency, proximity, stateStore, - FailureDetector.instance, connectSequentially, connectionsPerHost, movements, strictMovements, excludeAccordTables); + FailureDetector.instance, connectSequentially, connectionsPerHost, movements, strictMovements, excludeAccordTables, tables); } RangeStreamer(ClusterMetadata metadata, @@ -320,7 +323,8 @@ public RangeStreamer(ClusterMetadata metadata, int connectionsPerHost, MovementMap movements, MovementMap strictMovements, - boolean excludeAccordTables) + boolean excludeAccordTables, + Set tables) { this.excludeAccordTables = excludeAccordTables; Preconditions.checkArgument(streamOperation == StreamOperation.BOOTSTRAP || streamOperation == StreamOperation.REBUILD, streamOperation); @@ -333,6 +337,7 @@ public RangeStreamer(ClusterMetadata metadata, this.movements = movements; this.strictMovements = strictMovements; streamPlan.listeners(this.stateStore); + this.tables = tables; // We're _always_ filtering out a local node and down sources addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(failureDetector)); @@ -772,11 +777,15 @@ public StreamResultFuture fetchAsync() { String[] cfNames = StreamPlan.nonAccordTablesForKeyspace(ksm); if (cfNames != null) + { + cfNames = Arrays.stream(cfNames).filter(table -> tables == null || tables.contains(table)).toArray(String[]::new); streamPlan.requestRanges(source, keyspace, full, transientReplicas, cfNames); + } } else { - streamPlan.requestRanges(source, keyspace, full, transientReplicas); + String[] cfNames = tables == null ? 
new String[0] : tables.toArray(String[]::new); + streamPlan.requestRanges(source, keyspace, full, transientReplicas, cfNames); } }); }); diff --git a/src/java/org/apache/cassandra/service/Rebuild.java b/src/java/org/apache/cassandra/service/Rebuild.java index 7c28960b96bf..2c34ad7d69d3 100644 --- a/src/java/org/apache/cassandra/service/Rebuild.java +++ b/src/java/org/apache/cassandra/service/Rebuild.java @@ -72,7 +72,7 @@ public static void unsafeResetRebuilding() isRebuilding.set(false); } - public static void rebuild(String sourceDc, String keyspace, String tokens, String specificSources, boolean excludeLocalDatacenterNodes) + public static void rebuild(String sourceDc, String keyspace, String tokens, Set tables, String specificSources, boolean excludeLocalDatacenterNodes) { // check ongoing rebuild if (!isRebuilding.compareAndSet(false, true)) @@ -100,9 +100,16 @@ public static void rebuild(String sourceDc, String keyspace, String tokens, Stri throw new IllegalArgumentException("Cannot specify tokens without keyspace."); } - logger.info("rebuild from dc: {}, {}, {}", sourceDc == null ? "(any dc)" : sourceDc, + // check the arguments + if (keyspace == null && (tables != null && !tables.isEmpty())) + { + throw new IllegalArgumentException("Cannot specify tables without keyspace."); + } + + logger.info("rebuild from dc: {}, {}, {}, {}", sourceDc == null ? "(any dc)" : sourceDc, keyspace == null ? "(All keyspaces)" : keyspace, - tokens == null ? "(All tokens)" : tokens); + tokens == null ? "(All tokens)" : tokens, + tables == null || tables.isEmpty() ? 
"(All tables)" : tables); StorageService.instance.repairPaxosForTopologyChange("rebuild"); ClusterMetadata metadata = ClusterMetadata.current(); @@ -117,7 +124,8 @@ public static void rebuild(String sourceDc, String keyspace, String tokens, Stri DatabaseDescriptor.getStreamingConnectionsPerHost(), rebuildMovements, null, - true); + true, + tables); if (sourceDc != null) streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(metadata.locator, sourceDc)); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 2446dd7fa866..deaeb4eff2e5 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1185,7 +1185,12 @@ public void rebuild(String sourceDc, String keyspace, String tokens, String spec public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources, boolean excludeLocalDatacenterNodes) { - Rebuild.rebuild(sourceDc, keyspace, tokens, specificSources, excludeLocalDatacenterNodes); + Rebuild.rebuild(sourceDc, keyspace, tokens, null, specificSources, excludeLocalDatacenterNodes); + } + + public void rebuild(String sourceDc, String keyspace, String tokens, Set tables, String specificSources, boolean excludeLocalDatacenterNodes) + { + Rebuild.rebuild(sourceDc, keyspace, tokens, tables, specificSources, excludeLocalDatacenterNodes); } public void setRpcTimeout(long value) diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index e6f7a18c8eb5..d9cc29477b51 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -944,6 +944,21 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, */ public void rebuild(String sourceDc, String keyspace, String tokens, String 
specificSources, boolean excludeLocalDatacenterNodes); + + /** + * Same as {@link #rebuild(String)}, but only for the specified keyspace, token ranges and tables. Local data center nodes can optionally be excluded as sources. + * + * @param sourceDc Name of DC from which to select sources for streaming or null to pick any node + * @param keyspace Name of the keyspace which to rebuild or null to rebuild all keyspaces. + * @param tables Name of tables to rebuild or null/empty set to rebuild all tables. + * @param tokens Range of tokens to rebuild or null to rebuild all token ranges. In the format of: + * "(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]" + * @param specificSources list of sources that can be used for rebuilding. Mostly other nodes in the cluster. + * @param excludeLocalDatacenterNodes Flag to indicate whether local data center nodes should be excluded as sources for streaming. + */ + public void rebuild(String sourceDc, String keyspace, String tokens, Set tables, String specificSources, boolean excludeLocalDatacenterNodes); + + /** Starts a bulk load and blocks until it completes. */ public void bulkLoad(String directory); diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 353324e0eadd..12da86dc5cbd 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1826,9 +1826,9 @@ public List describeRing(String keyspaceName, boolean withPort) throws I return withPort ?
ssProxy.describeRingWithPortJMX(keyspaceName) : ssProxy.describeRingJMX(keyspaceName); } - public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources, boolean excludeLocalDatacenterNodes) + public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources, boolean excludeLocalDatacenterNodes, Set tables) { - ssProxy.rebuild(sourceDc, keyspace, tokens, specificSources, excludeLocalDatacenterNodes); + ssProxy.rebuild(sourceDc, keyspace, tokens, tables, specificSources, excludeLocalDatacenterNodes); } public List sampleKeyRange() diff --git a/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java b/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java index 326e7e968c1d..18585f9fb98f 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java @@ -17,6 +17,9 @@ */ package org.apache.cassandra.tools.nodetool; +import java.util.HashSet; +import java.util.Set; + import org.apache.cassandra.tools.NodeProbe; import picocli.CommandLine.Command; @@ -52,6 +55,11 @@ public class Rebuild extends AbstractCommand description = "Use --exclude-local-dc to exclude nodes in local data center as source for streaming.") private boolean excludeLocalDatacenterNodes = false; + @Option(paramLabel = "specific_tables", + names = {"-tb", "--table"}, + description = "Use -tb to scope the rebuild to particular table") + private Set tables = new HashSet<>(); + @Override public void execute(NodeProbe probe) { @@ -61,6 +69,6 @@ public void execute(NodeProbe probe) throw new IllegalArgumentException("Cannot specify tokens without keyspace."); } - probe.rebuild(sourceDataCenterName, keyspace, tokens, specificSources, excludeLocalDatacenterNodes); + probe.rebuild(sourceDataCenterName, keyspace, tokens, specificSources, excludeLocalDatacenterNodes, tables); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java index a26e8ac9299e..ca6fd4fa8374 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java @@ -39,20 +39,27 @@ public class RebuildStreamingTest extends TestBaseImpl private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]); // zero copy streaming sends all components, so the events will include non-Data files as well private static final int NUM_COMPONENTS = 7; + private String[] args; @Test public void zeroCopy() throws IOException { - test(true); + test(true, false); + } + + @Test + public void specifyTable() throws IOException + { + test(true, true); } @Test public void notZeroCopy() throws IOException { - test(false); + test(false, false); } - private void test(boolean zeroCopyStreaming) throws IOException + private void test(boolean zeroCopyStreaming, boolean specifyTable) throws IOException { try (Cluster cluster = init(Cluster.build(2) .withConfig(c -> c.with(Feature.values()) @@ -61,20 +68,23 @@ private void test(boolean zeroCopyStreaming) throws IOException { // streaming sends events every 65k, so need to make sure that the files are larger than this to hit // all cases of the vtable - cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id varchar, spacing blob, PRIMARY KEY (user_id)) WITH compression = { 'enabled' : false };")); + String tableName = "users"; + cluster.schemaChange(withKeyspace("CREATE TABLE %s." 
+ tableName + " (user_id varchar, spacing blob, PRIMARY KEY (user_id)) WITH compression = { 'enabled' : false };")); cluster.stream().forEach(i -> i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success()); IInvokableInstance first = cluster.get(1); IInvokableInstance second = cluster.get(2); long expectedFiles = 10; for (int i = 0; i < expectedFiles; i++) { - first.executeInternal(withKeyspace("insert into %s.users(user_id, spacing) values (?, ? )"), "dcapwell" + i, BLOB); + first.executeInternal(withKeyspace("insert into %s." + tableName + " (user_id, spacing) values (?, ? )"), "dcapwell" + i, BLOB); first.flush(KEYSPACE); } if (zeroCopyStreaming) // will include all components so need to account for expectedFiles *= NUM_COMPONENTS; - second.nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success(); + args = specifyTable ? new String[]{ "rebuild", "--keyspace", KEYSPACE, "--table", tableName } : + new String[]{ "rebuild", "--keyspace", KEYSPACE }; + second.nodetoolResult(args).asserts().success(); SimpleQueryResult qr = first.executeInternalWithResult("SELECT * FROM system_views.streaming"); String txt = QueryResultUtil.expand(qr); diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index 32e040f965e3..c13b91bf85e2 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -226,7 +226,8 @@ private RangeStreamer getRangeStreamer() throws UnknownHostException 1, movements.left, movements.right, - true); + true, + null); } private boolean includesWraparound(Collection> toFetch) From fdb4c87b7a10ff76d2c6fa61ee8c09dc0670067f Mon Sep 17 00:00:00 2001 From: Matt Byrd Date: Sun, 7 Jun 2026 19:20:51 +0100 Subject: [PATCH 2/2] fix NodetoolHelpCommandsOutputTest test --- test/resources/nodetool/help/rebuild | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/resources/nodetool/help/rebuild 
b/test/resources/nodetool/help/rebuild index 8a67f6477b51..d49a93c4e8dc 100644 --- a/test/resources/nodetool/help/rebuild +++ b/test/resources/nodetool/help/rebuild @@ -9,6 +9,7 @@ SYNOPSIS [(-u | --username )] rebuild [--exclude-local-dc] [(-ks | --keyspace )] [(-s | --sources )] + [(-tb | --table )...] [(-ts | --tokens )] [--] @@ -37,6 +38,9 @@ OPTIONS is used. Multiple hosts should be separated using commas (e.g. 127.0.0.1,127.0.0.2,...) + -tb , --table + Use -tb to scope the rebuild to particular table + -ts , --tokens Use -ts to rebuild specific token ranges, in the format of "(start_token_1,end_token_1],(start_token_2,end_token_2],...