Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/dht/BootStrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public Future<StreamState> bootstrap(StreamStateStore stateStore, boolean useStr
DatabaseDescriptor.getStreamingConnectionsPerHost(),
movements,
strictMovements,
true);
true,
null);

if (beingReplaced != null)
streamer.addSourceFilter(new RangeStreamer.ExcludedSourcesFilter(Collections.singleton(beingReplaced)));
Expand Down
17 changes: 13 additions & 4 deletions src/java/org/apache/cassandra/dht/RangeStreamer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.cassandra.dht;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -103,6 +104,7 @@ public class RangeStreamer
private final MovementMap movements;
private final MovementMap strictMovements;
private final boolean excludeAccordTables;
private final Set<String> tables;

public static class FetchReplica
{
Expand Down Expand Up @@ -304,10 +306,11 @@ public RangeStreamer(ClusterMetadata metadata,
int connectionsPerHost,
MovementMap movements,
MovementMap strictMovements,
boolean excludeAccordTables)
boolean excludeAccordTables,
Set<String> tables)

@smiklosovic smiklosovic Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe annotate this with @Nullable, or make it work with an empty set or similar.

{
this(metadata, streamOperation, useStrictConsistency, proximity, stateStore,
FailureDetector.instance, connectSequentially, connectionsPerHost, movements, strictMovements, excludeAccordTables);
FailureDetector.instance, connectSequentially, connectionsPerHost, movements, strictMovements, excludeAccordTables, tables);
}

RangeStreamer(ClusterMetadata metadata,
Expand All @@ -320,7 +323,8 @@ public RangeStreamer(ClusterMetadata metadata,
int connectionsPerHost,
MovementMap movements,
MovementMap strictMovements,
boolean excludeAccordTables)
boolean excludeAccordTables,
Set<String> tables)
{
this.excludeAccordTables = excludeAccordTables;
Preconditions.checkArgument(streamOperation == StreamOperation.BOOTSTRAP || streamOperation == StreamOperation.REBUILD, streamOperation);
Expand All @@ -333,6 +337,7 @@ public RangeStreamer(ClusterMetadata metadata,
this.movements = movements;
this.strictMovements = strictMovements;
streamPlan.listeners(this.stateStore);
this.tables = tables;

// We're _always_ filtering out a local node and down sources
addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(failureDetector));
Expand Down Expand Up @@ -772,11 +777,15 @@ public StreamResultFuture fetchAsync()
{
String[] cfNames = StreamPlan.nonAccordTablesForKeyspace(ksm);
if (cfNames != null)
{
cfNames = Arrays.stream(cfNames).filter(table -> tables == null || tables.contains(table)).toArray(String[]::new);

@smiklosovic smiklosovic Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if this.tables is an empty set? Then cfNames will be an empty array, no? Somebody might think that an empty set means "all tables".

In "rebuild" you do

tables == null || tables.isEmpty() ? "(All tables)" : tables

so null and empty are both treated as "everything", which is not the case here ...

streamPlan.requestRanges(source, keyspace, full, transientReplicas, cfNames);
}
}
else
{
streamPlan.requestRanges(source, keyspace, full, transientReplicas);
String[] cfNames = tables == null ? new String[0] : tables.toArray(String[]::new);
streamPlan.requestRanges(source, keyspace, full, transientReplicas, cfNames);
}
});
});
Expand Down
16 changes: 12 additions & 4 deletions src/java/org/apache/cassandra/service/Rebuild.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static void unsafeResetRebuilding()
isRebuilding.set(false);
}

public static void rebuild(String sourceDc, String keyspace, String tokens, String specificSources, boolean excludeLocalDatacenterNodes)
public static void rebuild(String sourceDc, String keyspace, String tokens, Set<String> tables, String specificSources, boolean excludeLocalDatacenterNodes)
{
// check ongoing rebuild
if (!isRebuilding.compareAndSet(false, true))
Expand Down Expand Up @@ -100,9 +100,16 @@ public static void rebuild(String sourceDc, String keyspace, String tokens, Stri
throw new IllegalArgumentException("Cannot specify tokens without keyspace.");
}

logger.info("rebuild from dc: {}, {}, {}", sourceDc == null ? "(any dc)" : sourceDc,
// check the arguments
if (keyspace == null && (tables != null && !tables.isEmpty()))
{
throw new IllegalArgumentException("Cannot specify tables without keyspace.");
}

logger.info("rebuild from dc: {}, {}, {}, {}", sourceDc == null ? "(any dc)" : sourceDc,
keyspace == null ? "(All keyspaces)" : keyspace,
tokens == null ? "(All tokens)" : tokens);
tokens == null ? "(All tokens)" : tokens,
tables == null || tables.isEmpty() ? "(All tables)" : tables);

StorageService.instance.repairPaxosForTopologyChange("rebuild");
ClusterMetadata metadata = ClusterMetadata.current();
Expand All @@ -117,7 +124,8 @@ public static void rebuild(String sourceDc, String keyspace, String tokens, Stri
DatabaseDescriptor.getStreamingConnectionsPerHost(),
rebuildMovements,
null,
true);
true,
tables);
if (sourceDc != null)
streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(metadata.locator, sourceDc));

Expand Down
7 changes: 6 additions & 1 deletion src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,12 @@ public void rebuild(String sourceDc, String keyspace, String tokens, String spec

public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources, boolean excludeLocalDatacenterNodes)
{
Rebuild.rebuild(sourceDc, keyspace, tokens, specificSources, excludeLocalDatacenterNodes);
Rebuild.rebuild(sourceDc, keyspace, tokens, null, specificSources, excludeLocalDatacenterNodes);
}

/**
 * Rebuild overload that additionally accepts a set of table names.
 * Delegates directly to {@link Rebuild#rebuild}, passing every argument through unchanged.
 */
public void rebuild(String sourceDc, String keyspace, String tokens, Set<String> tables, String specificSources, boolean excludeLocalDatacenterNodes)
{
Rebuild.rebuild(sourceDc, keyspace, tokens, tables, specificSources, excludeLocalDatacenterNodes);
}

public void setRpcTimeout(long value)
Expand Down
15 changes: 15 additions & 0 deletions src/java/org/apache/cassandra/service/StorageServiceMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,21 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion,
*/
public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources, boolean excludeLocalDatacenterNodes);


/**
* Same as {@link #rebuild(String)}, but only for the specified keyspace, token ranges and tables. Local data center
* nodes are excluded as streaming sources only when {@code excludeLocalDatacenterNodes} is set.
*
* @param sourceDc Name of DC from which to select sources for streaming or null to pick any node
* @param keyspace Name of the keyspace which to rebuild or null to rebuild all keyspaces.
* @param tables Names of the tables to rebuild, or null/empty set to rebuild all tables.
* @param tokens Range of tokens to rebuild or null to rebuild all token ranges. In the format of:
* "(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]"
* @param specificSources list of sources that can be used for rebuilding. Mostly other nodes in the cluster.
* @param excludeLocalDatacenterNodes Flag to indicate whether local data center nodes should be excluded as sources for streaming.
*/
public void rebuild(String sourceDc, String keyspace, String tokens, Set<String> tables, String specificSources, boolean excludeLocalDatacenterNodes);


/** Starts a bulk load and blocks until it completes. */
public void bulkLoad(String directory);

Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1826,9 +1826,9 @@ public List<String> describeRing(String keyspaceName, boolean withPort) throws I
return withPort ? ssProxy.describeRingWithPortJMX(keyspaceName) : ssProxy.describeRingJMX(keyspaceName);
}

public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources, boolean excludeLocalDatacenterNodes)
public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources, boolean excludeLocalDatacenterNodes, Set<String> tables)
{
ssProxy.rebuild(sourceDc, keyspace, tokens, specificSources, excludeLocalDatacenterNodes);
ssProxy.rebuild(sourceDc, keyspace, tokens, tables, specificSources, excludeLocalDatacenterNodes);
}

public List<String> sampleKeyRange()
Expand Down
10 changes: 9 additions & 1 deletion src/java/org/apache/cassandra/tools/nodetool/Rebuild.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.cassandra.tools.nodetool;

import java.util.HashSet;
import java.util.Set;

import org.apache.cassandra.tools.NodeProbe;

import picocli.CommandLine.Command;
Expand Down Expand Up @@ -52,6 +55,11 @@ public class Rebuild extends AbstractCommand
description = "Use --exclude-local-dc to exclude nodes in local data center as source for streaming.")
private boolean excludeLocalDatacenterNodes = false;

@Option(paramLabel = "specific_tables",
names = {"-tb", "--table"},

@smiklosovic smiklosovic Jun 10, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I do this: --table tb1 --table tb2? Or is it meant to be --table tb1 tb2?

description = "Use -tb to scope the rebuild to particular table")
private Set<String> tables = new HashSet<>();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So with an empty --tables, all code paths will be executed with an empty set instead of null, right?


@Override
public void execute(NodeProbe probe)
{
Expand All @@ -61,6 +69,6 @@ public void execute(NodeProbe probe)
throw new IllegalArgumentException("Cannot specify tokens without keyspace.");
}

probe.rebuild(sourceDataCenterName, keyspace, tokens, specificSources, excludeLocalDatacenterNodes);
probe.rebuild(sourceDataCenterName, keyspace, tokens, specificSources, excludeLocalDatacenterNodes, tables);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,27 @@ public class RebuildStreamingTest extends TestBaseImpl
private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
// zero copy streaming sends all components, so the events will include non-Data files as well
private static final int NUM_COMPONENTS = 7;
private String[] args;

@Test
public void zeroCopy() throws IOException
{
test(true);
test(true, false);
}

// Runs the shared rebuild scenario with zero-copy streaming enabled and the
// rebuild command restricted to the test table via the --table option.
@Test
public void specifyTable() throws IOException
{
test(true, true);
}

@Test
public void notZeroCopy() throws IOException
{
test(false);
test(false, false);
}

private void test(boolean zeroCopyStreaming) throws IOException
private void test(boolean zeroCopyStreaming, boolean specifyTable) throws IOException
{
try (Cluster cluster = init(Cluster.build(2)
.withConfig(c -> c.with(Feature.values())
Expand All @@ -61,20 +68,23 @@ private void test(boolean zeroCopyStreaming) throws IOException
{
// streaming sends events every 65k, so need to make sure that the files are larger than this to hit
// all cases of the vtable
cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id varchar, spacing blob, PRIMARY KEY (user_id)) WITH compression = { 'enabled' : false };"));
String tableName = "users";
cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (user_id varchar, spacing blob, PRIMARY KEY (user_id)) WITH compression = { 'enabled' : false };"));
cluster.stream().forEach(i -> i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success());
IInvokableInstance first = cluster.get(1);
IInvokableInstance second = cluster.get(2);
long expectedFiles = 10;
for (int i = 0; i < expectedFiles; i++)
{
first.executeInternal(withKeyspace("insert into %s.users(user_id, spacing) values (?, ? )"), "dcapwell" + i, BLOB);
first.executeInternal(withKeyspace("insert into %s." + tableName + " (user_id, spacing) values (?, ? )"), "dcapwell" + i, BLOB);
first.flush(KEYSPACE);
}
if (zeroCopyStreaming) // will include all components so need to account for
expectedFiles *= NUM_COMPONENTS;

second.nodetoolResult("rebuild", "--keyspace", KEYSPACE).asserts().success();
args = specifyTable ? new String[]{ "rebuild", "--keyspace", KEYSPACE, "--table", tableName } :
new String[]{ "rebuild", "--keyspace", KEYSPACE };
second.nodetoolResult(args).asserts().success();

SimpleQueryResult qr = first.executeInternalWithResult("SELECT * FROM system_views.streaming");
String txt = QueryResultUtil.expand(qr);
Expand Down
4 changes: 4 additions & 0 deletions test/resources/nodetool/help/rebuild
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ SYNOPSIS
[(-u <username> | --username <username>)] rebuild [--exclude-local-dc]
[(-ks <specific_keyspace> | --keyspace <specific_keyspace>)]
[(-s <specific_sources> | --sources <specific_sources>)]
[(-tb <specific_tables> | --table <specific_tables>)...]
[(-ts <specific_tokens> | --tokens <specific_tokens>)] [--]
<src-dc-name>

Expand Down Expand Up @@ -37,6 +38,9 @@ OPTIONS
is used. Multiple hosts should be separated using commas (e.g.
127.0.0.1,127.0.0.2,...)

-tb <specific_tables>, --table <specific_tables>
Use -tb to scope the rebuild to particular table or set of tables

-ts <specific_tokens>, --tokens <specific_tokens>
Use -ts to rebuild specific token ranges, in the format of
"(start_token_1,end_token_1],(start_token_2,end_token_2],...
Expand Down
3 changes: 2 additions & 1 deletion test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ private RangeStreamer getRangeStreamer() throws UnknownHostException
1,
movements.left,
movements.right,
true);
true,
null);
}

private boolean includesWraparound(Collection<Range<Token>> toFetch)
Expand Down