Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public static Set<String> splitCommaDelimited(String src)
public volatile DurationSpec.IntMillisecondsBound cms_default_max_retry_backoff = null;
public String cms_retry_delay = "50ms*attempts <= 500ms ... 100ms*attempts <= 1s,retries=10";

public volatile CMSCommitMemberPreferencePolicy cms_commit_member_preference_policy = CMSCommitMemberPreferencePolicy.random;
public volatile int epoch_aware_debounce_inflight_tracker_max_size = 100;

/**
Expand Down Expand Up @@ -1254,6 +1255,31 @@ public enum CommitLogSync
group
}

/**
* Strategy for selecting which CMS member to contact for TCM commits.
*/
public enum CMSCommitMemberPreferencePolicy
{
/** Shuffle candidates randomly (original behavior) */
random,

/** Shuffle local DC candidates randomly, followed by shuffled non-local */
local_random,

/** Sort by address - all nodes converge on same member with the goal
* of reducing Paxos contention, but will increase hot-spotting on the
* determined member
*/
deterministic,

/** Sort local members first by name, then non-local. Nodes in each DC
* will converge on same member with the goal of reducing Paxos contention d
* below the random but with lower latency, though higher contention than
* globally deterministic.
*/
local_deterministic
}

public enum FlushCompression
{
none,
Expand Down Expand Up @@ -1560,4 +1586,4 @@ public enum CQLStartTime
* 6.0 and later.
*/
public volatile boolean gossip_quarantine_disabled = false;
}
}
16 changes: 16 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@
import static org.apache.cassandra.io.util.FileUtils.ONE_MIB;
import static org.apache.cassandra.journal.Params.FlushMode.PERIODIC;
import static org.apache.cassandra.utils.Clock.Global.logInitializationOutcome;
import static org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized;

public class DatabaseDescriptor
{
Expand Down Expand Up @@ -6092,6 +6093,21 @@ public static DurationSpec getCmsAwaitTimeout()
return conf.cms_await_timeout;
}

public static Config.CMSCommitMemberPreferencePolicy getCmsCommitMemberPreferencePolicy()
{
return conf.cms_commit_member_preference_policy;
}

public static void setCmsCommitMemberPreferencePolicy(Config.CMSCommitMemberPreferencePolicy policy)
{
conf.cms_commit_member_preference_policy = policy;
}

public static void setCmsCommitMemberPreferencePolicy(String policy)
{
setCmsCommitMemberPreferencePolicy(Config.CMSCommitMemberPreferencePolicy.valueOf(toLowerCaseLocalized(policy)));
}

public static int getEpochAwareDebounceInFlightTrackerMaxSize()
{
return conf.epoch_aware_debounce_inflight_tracker_max_size;
Expand Down
13 changes: 13 additions & 0 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1761,6 +1761,19 @@ public String listConsensusMigrations(@Nullable Set<String> keyspaceNames,
return pojoMapToString(snapshotAsMap, format);
}

@Override
public String getCmsCommitMemberPreferencePolicy()
{
return DatabaseDescriptor.getCmsCommitMemberPreferencePolicy().name();
}

@Override
public void setCmsCommitMemberPreferencePolicy(String policy)
{
DatabaseDescriptor.setCmsCommitMemberPreferencePolicy(policy);
logger.info("Set cms_commit_member_preference_policy to {}", policy);
}

public Map<String,List<Integer>> getConcurrency(List<String> stageNames)
{
Stream<Stage> stageStream = stageNames.isEmpty() ? stream(Stage.values()) : stageNames.stream().map(Stage::fromPoolName);
Expand Down
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/service/StorageServiceMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -1170,6 +1170,18 @@ Integer finishConsensusMigration(@Nonnull String keyspace,
List<String> getAccordManagedKeyspaces();
List<String> getAccordManagedTables();

/** Get the CMS commit member preference policy
*
* @return how to choose the cms member preference order for commits
*/
public String getCmsCommitMemberPreferencePolicy();

/** Update the CMS commit member preference policy
*
* @param policy see Config.CMSCommitMemberPreferencePolicy
*/
public void setCmsCommitMemberPreferencePolicy(String policy);

/** Gets the concurrency settings for processing stages*/
static class StageConcurrency implements Serializable
{
Expand Down
74 changes: 73 additions & 1 deletion src/java/org/apache/cassandra/tcm/RemoteProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,19 @@
import java.util.function.Supplier;

import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.RequestFailure;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Locator;
import org.apache.cassandra.metrics.TCMMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
Expand All @@ -52,6 +55,7 @@
import org.apache.cassandra.tcm.log.Entry;
import org.apache.cassandra.tcm.log.LocalLog;
import org.apache.cassandra.tcm.log.LogState;
import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
Expand Down Expand Up @@ -128,11 +132,79 @@ private List<InetAddressAndPort> candidates(boolean allowDiscovery)
}
}

Collections.shuffle(candidates);
sortCandidates(candidates);

return candidates;
}

private static void sortCandidates(List<InetAddressAndPort> candidates)
{
Config.CMSCommitMemberPreferencePolicy policy = DatabaseDescriptor.getCmsCommitMemberPreferencePolicy();
sortCandidates(candidates, policy, DatabaseDescriptor.getLocator());
}

@VisibleForTesting
static void sortCandidates(List<InetAddressAndPort> candidates,
Config.CMSCommitMemberPreferencePolicy policy,
Locator locator)
{
switch (policy)
{
case random:
Collections.shuffle(candidates);
break;
case local_random:
shuffleLocalDcFirstThenShuffleRest(candidates, locator);
break;
case deterministic:
Collections.sort(candidates);
break;
case local_deterministic:
sortLocalDcFirstThenByAddress(candidates, locator);
break;
default:
throw new IllegalStateException(policy.toString());
}
}

@VisibleForTesting
static void shuffleLocalDcFirstThenShuffleRest(List<InetAddressAndPort> candidates, Locator locator)
{
Location local = locator.local();

List<InetAddressAndPort> localDc = new ArrayList<>();
List<InetAddressAndPort> remoteDc = new ArrayList<>();

for (InetAddressAndPort ep : candidates)
{
if (local.sameDatacenter(locator.location(ep)))
localDc.add(ep);
else
remoteDc.add(ep);
}

Collections.shuffle(localDc);
Collections.shuffle(remoteDc);

candidates.clear();
candidates.addAll(localDc);
candidates.addAll(remoteDc);
}

@VisibleForTesting
static void sortLocalDcFirstThenByAddress(List<InetAddressAndPort> candidates, Locator locator)
{
Location local = locator.local();

candidates.sort((a, b) -> {
boolean aLocal = local.sameDatacenter(locator.location(a));
boolean bLocal = local.sameDatacenter(locator.location(b));
if (aLocal != bLocal)
return aLocal ? -1 : 1;
return a.compareTo(b);
});
}

@Override
public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.CassandraRelevantProperties$PropertyConverter",
"org.apache.cassandra.config.Config",
"org.apache.cassandra.config.Config$1",
"org.apache.cassandra.config.Config$CMSCommitMemberPreferencePolicy",
"org.apache.cassandra.config.Config$CommitFailurePolicy",
"org.apache.cassandra.config.Config$CQLStartTime",
"org.apache.cassandra.config.Config$CommitLogSync",
Expand Down
Loading