From f2dd66968910d846aca321c1c3644b895414882b Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Fri, 12 Jun 2026 14:26:26 +0100 Subject: [PATCH] Add policy for selecting CMS host when submitting commit request Patch by Jon Meredith; reviewed by Sam Tunnicliffe for CASSANDRA-21456 --- .../org/apache/cassandra/config/Config.java | 28 ++- .../cassandra/config/DatabaseDescriptor.java | 16 ++ .../cassandra/service/StorageService.java | 13 ++ .../service/StorageServiceMBean.java | 12 ++ .../apache/cassandra/tcm/RemoteProcessor.java | 74 +++++++- .../config/DatabaseDescriptorRefTest.java | 1 + .../cassandra/tcm/RemoteProcessorTest.java | 166 ++++++++++++++++++ 7 files changed, 308 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 3a485fddc5b2..aca2bbcdde30 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -191,6 +191,7 @@ public static Set splitCommaDelimited(String src) public volatile DurationSpec.IntMillisecondsBound cms_default_max_retry_backoff = null; public String cms_retry_delay = "50ms*attempts <= 500ms ... 100ms*attempts <= 1s,retries=10"; + public volatile CMSCommitMemberPreferencePolicy cms_commit_member_preference_policy = CMSCommitMemberPreferencePolicy.random; public volatile int epoch_aware_debounce_inflight_tracker_max_size = 100; /** @@ -1254,6 +1255,31 @@ public enum CommitLogSync group } + /** + * Strategy for selecting which CMS member to contact for TCM commits. 
+ */ + public enum CMSCommitMemberPreferencePolicy + { + /** Shuffle candidates randomly (original behavior) */ + random, + + /** Shuffle local DC candidates randomly, followed by shuffled non-local */ + local_random, + + /** Sort by address - all nodes converge on same member with the goal + * of reducing Paxos contention, but will increase hot-spotting on the + * determined member + */ + deterministic, + + /** Sort local members first by address, then non-local. Nodes in each DC + * will converge on same member with the goal of reducing Paxos contention + * below that of random but with lower latency, though higher contention than + * globally deterministic. + */ + local_deterministic + } + public enum FlushCompression { none, @@ -1560,4 +1586,4 @@ public enum CQLStartTime * 6.0 and later. */ public volatile boolean gossip_quarantine_disabled = false; -} +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 9ee09db70709..e03f13723f06 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -175,6 +175,7 @@ import static org.apache.cassandra.io.util.FileUtils.ONE_MIB; import static org.apache.cassandra.journal.Params.FlushMode.PERIODIC; import static org.apache.cassandra.utils.Clock.Global.logInitializationOutcome; +import static org.apache.cassandra.utils.LocalizeString.toLowerCaseLocalized; public class DatabaseDescriptor { @@ -6092,6 +6093,21 @@ public static DurationSpec getCmsAwaitTimeout() return conf.cms_await_timeout; } + public static Config.CMSCommitMemberPreferencePolicy getCmsCommitMemberPreferencePolicy() + { + return conf.cms_commit_member_preference_policy; + } + + public static void setCmsCommitMemberPreferencePolicy(Config.CMSCommitMemberPreferencePolicy policy) + { + conf.cms_commit_member_preference_policy = policy; + } + + public static void 
setCmsCommitMemberPreferencePolicy(String policy) + { + setCmsCommitMemberPreferencePolicy(Config.CMSCommitMemberPreferencePolicy.valueOf(toLowerCaseLocalized(policy))); + } + public static int getEpochAwareDebounceInFlightTrackerMaxSize() { return conf.epoch_aware_debounce_inflight_tracker_max_size; diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d32d19460d68..0b461093348d 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1761,6 +1761,19 @@ public String listConsensusMigrations(@Nullable Set keyspaceNames, return pojoMapToString(snapshotAsMap, format); } + @Override + public String getCmsCommitMemberPreferencePolicy() + { + return DatabaseDescriptor.getCmsCommitMemberPreferencePolicy().name(); + } + + @Override + public void setCmsCommitMemberPreferencePolicy(String policy) + { + DatabaseDescriptor.setCmsCommitMemberPreferencePolicy(policy); + logger.info("Set cms_commit_member_preference_policy to {}", policy); + } + public Map> getConcurrency(List stageNames) { Stream stageStream = stageNames.isEmpty() ? 
stream(Stage.values()) : stageNames.stream().map(Stage::fromPoolName); diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index b9f47b3b2dd6..34db8b418057 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -1170,6 +1170,18 @@ Integer finishConsensusMigration(@Nonnull String keyspace, List getAccordManagedKeyspaces(); List getAccordManagedTables(); + /** Get the CMS commit member preference policy + * + * @return how to choose the cms member preference order for commits + */ + public String getCmsCommitMemberPreferencePolicy(); + + /** Update the CMS commit member preference policy + * + * @param policy see Config.CMSCommitMemberPreferencePolicy + */ + public void setCmsCommitMemberPreferencePolicy(String policy); + /** Gets the concurrency settings for processing stages*/ static class StageConcurrency implements Serializable { diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index b14e73feb851..69830b64da04 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -31,16 +31,19 @@ import java.util.function.Supplier; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.RequestFailure; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Locator; import org.apache.cassandra.metrics.TCMMetrics; import 
org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageDelivery; @@ -52,6 +55,7 @@ import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.LogState; +import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; @@ -128,11 +132,79 @@ private List candidates(boolean allowDiscovery) } } - Collections.shuffle(candidates); + sortCandidates(candidates); return candidates; } + private static void sortCandidates(List candidates) + { + Config.CMSCommitMemberPreferencePolicy policy = DatabaseDescriptor.getCmsCommitMemberPreferencePolicy(); + sortCandidates(candidates, policy, DatabaseDescriptor.getLocator()); + } + + @VisibleForTesting + static void sortCandidates(List candidates, + Config.CMSCommitMemberPreferencePolicy policy, + Locator locator) + { + switch (policy) + { + case random: + Collections.shuffle(candidates); + break; + case local_random: + shuffleLocalDcFirstThenShuffleRest(candidates, locator); + break; + case deterministic: + Collections.sort(candidates); + break; + case local_deterministic: + sortLocalDcFirstThenByAddress(candidates, locator); + break; + default: + throw new IllegalStateException(policy.toString()); + } + } + + @VisibleForTesting + static void shuffleLocalDcFirstThenShuffleRest(List candidates, Locator locator) + { + Location local = locator.local(); + + List localDc = new ArrayList<>(); + List remoteDc = new ArrayList<>(); + + for (InetAddressAndPort ep : candidates) + { + if (local.sameDatacenter(locator.location(ep))) + localDc.add(ep); + else + remoteDc.add(ep); + } + + Collections.shuffle(localDc); + Collections.shuffle(remoteDc); + + candidates.clear(); + candidates.addAll(localDc); + candidates.addAll(remoteDc); + } + + @VisibleForTesting + static void sortLocalDcFirstThenByAddress(List candidates, Locator locator) + { 
+ Location local = locator.local(); + + candidates.sort((a, b) -> { + boolean aLocal = local.sameDatacenter(locator.location(a)); + boolean bLocal = local.sameDatacenter(locator.location(b)); + if (aLocal != bLocal) + return aLocal ? -1 : 1; + return a.compareTo(b); + }); + } + @Override public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) { diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index 73a58b75a490..33b5c6f51352 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -96,6 +96,7 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.config.CassandraRelevantProperties$PropertyConverter", "org.apache.cassandra.config.Config", "org.apache.cassandra.config.Config$1", + "org.apache.cassandra.config.Config$CMSCommitMemberPreferencePolicy", "org.apache.cassandra.config.Config$CommitFailurePolicy", "org.apache.cassandra.config.Config$CQLStartTime", "org.apache.cassandra.config.Config$CommitLogSync", diff --git a/test/unit/org/apache/cassandra/tcm/RemoteProcessorTest.java b/test/unit/org/apache/cassandra/tcm/RemoteProcessorTest.java index 61b1b37e5132..85dcfac8a1bd 100644 --- a/test/unit/org/apache/cassandra/tcm/RemoteProcessorTest.java +++ b/test/unit/org/apache/cassandra/tcm/RemoteProcessorTest.java @@ -19,13 +19,19 @@ package org.apache.cassandra.tcm; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.junit.Test; +import org.apache.cassandra.config.Config; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Locator; +import org.apache.cassandra.tcm.membership.Location; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; 
@@ -107,4 +113,164 @@ private List eps(int endpointCount) } return allEndpoints; } + + // ========== CMS Member Selection Tests ========== + + @Test + public void testCandidatesDeterministic() + { + // Create endpoints in non-sorted order + List candidates = new ArrayList<>(Arrays.asList( + InetAddressAndPort.getByNameUnchecked("127.0.0.5"), + InetAddressAndPort.getByNameUnchecked("127.0.0.1"), + InetAddressAndPort.getByNameUnchecked("127.0.0.9"), + InetAddressAndPort.getByNameUnchecked("127.0.0.3") + )); + + // Deterministic policy should sort by address + RemoteProcessor.sortCandidates(candidates, Config.CMSCommitMemberPreferencePolicy.deterministic, null); + + assertEquals(InetAddressAndPort.getByNameUnchecked("127.0.0.1"), candidates.get(0)); + assertEquals(InetAddressAndPort.getByNameUnchecked("127.0.0.3"), candidates.get(1)); + assertEquals(InetAddressAndPort.getByNameUnchecked("127.0.0.5"), candidates.get(2)); + assertEquals(InetAddressAndPort.getByNameUnchecked("127.0.0.9"), candidates.get(3)); + } + + @Test + public void testCandidatesRandom() + { + // Create endpoints + List original = Arrays.asList( + InetAddressAndPort.getByNameUnchecked("127.0.0.1"), + InetAddressAndPort.getByNameUnchecked("127.0.0.2"), + InetAddressAndPort.getByNameUnchecked("127.0.0.3"), + InetAddressAndPort.getByNameUnchecked("127.0.0.4") + ); + List candidates = new ArrayList<>(original); + + // Random policy should shuffle (but contain same elements) + RemoteProcessor.sortCandidates(candidates, Config.CMSCommitMemberPreferencePolicy.random, null); + + // Same elements, possibly different order + assertEquals(new HashSet<>(original), new HashSet<>(candidates)); + assertEquals(original.size(), candidates.size()); + } + + @Test + public void testCandidatesLocalDeterministic() + { + // DC1 endpoints (local) + InetAddressAndPort dc1_1 = InetAddressAndPort.getByNameUnchecked("127.0.0.5"); + InetAddressAndPort dc1_2 = InetAddressAndPort.getByNameUnchecked("127.0.0.1"); + // DC2 endpoints 
(remote) + InetAddressAndPort dc2_1 = InetAddressAndPort.getByNameUnchecked("127.0.0.3"); + InetAddressAndPort dc2_2 = InetAddressAndPort.getByNameUnchecked("127.0.0.2"); + + List candidates = new ArrayList<>(Arrays.asList(dc2_1, dc1_1, dc2_2, dc1_2)); + + // Create a test locator + Location dc1 = new Location("DC1", "rack1"); + Location dc2 = new Location("DC2", "rack1"); + Map locationMap = new HashMap<>(); + locationMap.put(dc1_1, dc1); + locationMap.put(dc1_2, dc1); + locationMap.put(dc2_1, dc2); + locationMap.put(dc2_2, dc2); + + TestLocator locator = new TestLocator(dc1, locationMap); + + // local_deterministic: local DC sorted first, then remote DC sorted + RemoteProcessor.sortCandidates(candidates, Config.CMSCommitMemberPreferencePolicy.local_deterministic, locator); + + // Local DC first (sorted), then remote DC (sorted) + assertEquals(dc1_2, candidates.get(0)); // 127.0.0.1 (DC1) + assertEquals(dc1_1, candidates.get(1)); // 127.0.0.5 (DC1) + assertEquals(dc2_2, candidates.get(2)); // 127.0.0.2 (DC2) + assertEquals(dc2_1, candidates.get(3)); // 127.0.0.3 (DC2) + } + + @Test + public void testCandidatesLocalRandom() + { + // DC1 endpoints (local) + InetAddressAndPort dc1_1 = InetAddressAndPort.getByNameUnchecked("127.0.0.1"); + InetAddressAndPort dc1_2 = InetAddressAndPort.getByNameUnchecked("127.0.0.2"); + // DC2 endpoints (remote) + InetAddressAndPort dc2_1 = InetAddressAndPort.getByNameUnchecked("127.0.0.3"); + InetAddressAndPort dc2_2 = InetAddressAndPort.getByNameUnchecked("127.0.0.4"); + + List candidates = new ArrayList<>(Arrays.asList(dc2_1, dc1_1, dc2_2, dc1_2)); + + // Create a test locator + Location dc1 = new Location("DC1", "rack1"); + Location dc2 = new Location("DC2", "rack1"); + Map locationMap = new HashMap<>(); + locationMap.put(dc1_1, dc1); + locationMap.put(dc1_2, dc1); + locationMap.put(dc2_1, dc2); + locationMap.put(dc2_2, dc2); + + TestLocator locator = new TestLocator(dc1, locationMap); + + // local_random: local DC shuffled first, 
then remote DC shuffled + RemoteProcessor.sortCandidates(candidates, Config.CMSCommitMemberPreferencePolicy.local_random, locator); + + // Local DC should be in first 2 positions, remote DC in last 2 + Set localDcEndpoints = new HashSet<>(Arrays.asList(dc1_1, dc1_2)); + Set remoteDcEndpoints = new HashSet<>(Arrays.asList(dc2_1, dc2_2)); + + assertTrue(localDcEndpoints.contains(candidates.get(0))); + assertTrue(localDcEndpoints.contains(candidates.get(1))); + assertTrue(remoteDcEndpoints.contains(candidates.get(2))); + assertTrue(remoteDcEndpoints.contains(candidates.get(3))); + } + + @Test + public void testDeterministicPolicyProducesSameOrderAcrossCalls() + { + List candidates1 = new ArrayList<>(Arrays.asList( + InetAddressAndPort.getByNameUnchecked("127.0.0.5"), + InetAddressAndPort.getByNameUnchecked("127.0.0.1"), + InetAddressAndPort.getByNameUnchecked("127.0.0.3") + )); + List candidates2 = new ArrayList<>(Arrays.asList( + InetAddressAndPort.getByNameUnchecked("127.0.0.3"), + InetAddressAndPort.getByNameUnchecked("127.0.0.5"), + InetAddressAndPort.getByNameUnchecked("127.0.0.1") + )); + + RemoteProcessor.sortCandidates(candidates1, Config.CMSCommitMemberPreferencePolicy.deterministic, null); + RemoteProcessor.sortCandidates(candidates2, Config.CMSCommitMemberPreferencePolicy.deterministic, null); + + // Both should produce identical ordering regardless of initial order + assertEquals(candidates1, candidates2); + } + + /** + * Test Locator implementation for unit testing + */ + private static class TestLocator extends Locator + { + private final Location localLocation; + private final Map locationMap; + + public TestLocator(Location localLocation, Map locationMap) + { + super(null, null, () -> localLocation, null); + this.localLocation = localLocation; + this.locationMap = locationMap; + } + + @Override + public Location local() + { + return localLocation; + } + + @Override + public Location location(InetAddressAndPort endpoint) + { + return 
locationMap.getOrDefault(endpoint, Location.UNKNOWN); + } + } }