Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ public final class ProcedureMessages {
public static final String PID_REMOVEREGIONGROUP_STATE_FAILED =
"[pid{}][RemoveRegionGroup] state {} failed";
public static final String PID_REMOVEREGIONGROUP_DELETE_REPLICA_FAILED =
"[pid{}][RemoveRegionGroup] failed to delete a replica of region {} (attempt {}/{}), will retry. reason: {}";
"[pid{}][RemoveRegionGroup] failed to delete a replica of region {} (attempt {}), will keep retrying until it is deleted. reason: {}";
public static final String PID_REMOVEREGIONGROUP_SUCCESS_PROCEDURE_TOOK =
"[pid{}][RemoveRegionGroup] success, region group {} has been deleted. Procedure took {} (started at {}).";
public static final String PID_MIGRATEREGION_STARTED_WILL_BE_MIGRATED_FROM_DATANODE_TO =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ public final class ProcedureMessages {
public static final String PID_REMOVEREGIONGROUP_STATE_FAILED =
"[pid{}][RemoveRegionGroup] 状态 {} 失败";
public static final String PID_REMOVEREGIONGROUP_DELETE_REPLICA_FAILED =
"[pid{}][RemoveRegionGroup] 删除 region {} 的一个副本失败(第 {}/{} 次尝试),将重试。原因:{}";
"[pid{}][RemoveRegionGroup] 删除 region {} 的一个副本失败(第 {} 次尝试),将持续重试直到删除成功。原因:{}";
public static final String PID_REMOVEREGIONGROUP_SUCCESS_PROCEDURE_TOOK =
"[pid{}][RemoveRegionGroup] 成功,region group {} 已删除。过程耗时 {}(开始于 {})。";
public static final String PID_MIGRATEREGION_STARTED_WILL_BE_MIGRATED_FROM_DATANODE_TO =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ protected Flow executeFromState(
case SHUNT_REGION_REPLICAS:
persistPlan = new CreateRegionGroupsPlan();
final OfferRegionMaintainTasksPlan offerPlan = new OfferRegionMaintainTasksPlan();
// RegionGroups that failed to reach a serving quorum are removed via a child
// RemoveRegionGroupProcedure, which deletes every replica that did get created.
// RegionGroups that failed to reach a serving quorum have their redundant (already-created)
// replicas removed via an independent root RemoveRegionGroupProcedure. Submitting them as
// root procedures (instead of children) keeps this procedure from waiting for or being
// failed by the cleanup: each one retries forever until those replicas are deleted, while
// this procedure proceeds to activate the region groups that did form a quorum.
final List<RemoveRegionGroupProcedure> removeRegionGroupProcedures = new ArrayList<>();
// Filter those RegionGroups that created successfully
createRegionGroupsPlan
Expand Down Expand Up @@ -197,7 +200,26 @@ protected Flow executeFromState(
LOGGER.warn(
ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
}
removeRegionGroupProcedures.forEach(this::addChildProcedure);
// Submit the redundant-replica cleanups as independent root procedures. This is
// intentionally NOT guarded by isStateDeserialized(): the executor persists a procedure at
// a state BEFORE that state's body has run (it advances the state on the previous cycle,
// then may stop at the inter-state boundary on a leader switch — see
// ProcedureExecutor#executeProcedure), so a recovery that lands on SHUNT_REGION_REPLICAS
// means the submissions have NOT happened yet. Skipping them would leave the
// already-created replicas of sub-quorum region groups on disk with no cleanup and no
// partition-table record (the else branch above never persisted them). Re-submitting on
// recovery is safe instead: the cleanups are recomputed from the serialized
// failedRegionReplicaSets, each gets a fresh procId and performs an idempotent delete, so
// a duplicate is harmless whereas a skip leaks.
removeRegionGroupProcedures.forEach(
removeRegionGroupProcedure ->
env.getConfigManager()
.getProcedureManager()
.getExecutor()
.submitProcedure(removeRegionGroupProcedure));
setNextState(CreateRegionGroupsState.REBALANCE_DATA_PARTITION_POLICY);
break;
case REBALANCE_DATA_PARTITION_POLICY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,21 @@
* already-absent peer, so it works for a group of any size — including a sub-quorum group that
* never finished forming. The DataNode runs the deletion asynchronously and this procedure polls
* for the result, so a slow deletion is never wrongly reported as finished.
*
* <p>This procedure is submitted as an independent root procedure (not a child) by its callers,
* which only enqueue the deletion and return immediately. It therefore owns the deletion end to
* end: on any failure it retries the current replica forever (backing off between attempts) instead
* of giving up, because there is no parent left to fall back to and the region's peer/data must not
* be left on disk. Each genuine re-attempt uses a FRESH DataNode-side taskId (the DataNode dedups
* by taskId and caches a terminal result forever, so reusing one taskId would make every retry a
* no-op that never re-runs the delete); the in-flight taskId is persisted so a leader change
* re-polls the same task rather than double-submitting. It carries its own {@link
* TRegionReplicaSet} copy, so it can finish even after the caller has dropped the partition table,
* and it survives ConfigNode leader change / restart.
*/
public class RemoveRegionGroupProcedure extends RegionOperationProcedure<RemoveRegionGroupState> {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoveRegionGroupProcedure.class);

private static final int MAX_DELETE_REPLICA_RETRY = 3;
private static final long DELETE_REPLICA_RETRY_INTERVAL_MS = 5_000;

private TRegionReplicaSet regionReplicaSet;
Expand All @@ -75,10 +85,32 @@
// not finished deleting.
private int currentReplicaIndex;

// Number of failed attempts on the replica at currentReplicaIndex. Transient: a leader change
// restarts the retry budget for the current replica.
// Number of failed attempts on the replica at currentReplicaIndex, used only for logging. Retries
// are unbounded, so this is not a budget. Transient: a leader change restarts the counter for the
// current replica.
private transient int attemptedForCurrentReplica;

// Monotonic count of delete tasks this procedure has submitted, across all replicas.
// Persisted and only ever incremented. It forms the low-order SEQ_BITS bits of the
// DataNode-side taskId (see deleteTaskId): a fresh value per genuine re-attempt makes the
// DataNode re-run the delete instead of replaying a cached terminal result for a reused
// taskId (the DataNode dedups by taskId and never clears the cache), which is the bug this
// fixes. It never resets, so every taskId this procedure emits is distinct even across
// replicas and retries.
private long deleteTaskSeq;

// Whether a delete task for the replica at currentReplicaIndex has already been submitted
// (and thus deleteTaskSeq already identifies an in-flight task to re-poll) rather than
// needing a fresh one. Persisted so a leader change mid-attempt re-polls the SAME in-flight
// task instead of submitting a duplicate; cleared on success or when a terminal failure
// forces a fresh re-attempt.

// Bit budget for deleteTaskId(): sign bit (=> negative) + PROC_ID_BITS + SEQ_BITS must be <= 64.
private static final int SEQ_BITS = 20;
private static final int PROC_ID_BITS = 43;

public RemoveRegionGroupProcedure() {
super();
}
Expand All @@ -93,15 +125,27 @@
this.currentReplicaIndex = currentReplicaIndex;
}

/**
 * Test hook: overwrites the persisted delete-task cursor so a test can start from a chosen
 * state.
 *
 * @param deleteTaskSeq value assigned to the {@code deleteTaskSeq} field
 * @param deleteTaskSubmitted value assigned to the {@code deleteTaskSubmitted} field
 */
@TestOnly
void setDeleteTaskState(long deleteTaskSeq, boolean deleteTaskSubmitted) {
this.deleteTaskSeq = deleteTaskSeq;
this.deleteTaskSubmitted = deleteTaskSubmitted;
}

/** Test hook: returns whatever {@code deleteTaskId()} yields for the current field values. */
@TestOnly
long deleteTaskIdForTest() {
return deleteTaskId();
}

@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveRegionGroupState state)
throws InterruptedException {
final List<TDataNodeLocation> dataNodeLocations =
regionReplicaSet == null ? null : regionReplicaSet.getDataNodeLocations();
if (dataNodeLocations == null) {
// A null replica set means deserialization failed; fail loudly instead of silently reporting
// the group as deleted, otherwise the parent would drop the partition table while the region
// data is still on disk.
// A null replica set means deserialization failed. Retrying cannot recover the lost
// locations, so fail loudly instead of silently reporting the group as deleted (which
// would leave the region's peer/data on disk with no record of where it lives).
setFailure(
new ProcedureException(ProcedureMessages.UNSUPPORTED_STATE + "missing regionReplicaSet"));
return Flow.NO_MORE_STATE;
Expand Down Expand Up @@ -137,50 +181,62 @@
regionId,
simplifiedLocation(targetDataNode));

// deleteLocalPeer is idempotent (it tolerates an already-absent peer) and the DataNode
// dedups by taskId, so re-submitting after a leader change or a retry is safe.
// Start a fresh attempt (fresh taskId) unless we are resuming an already-submitted one
// after a leader change, in which case we re-poll the SAME task rather than submitting a
// duplicate.
if (!deleteTaskSubmitted) {
deleteTaskSeq++;
deleteTaskSubmitted = true;
}
final long deleteTaskId = deleteTaskId();

// deleteLocalPeer is idempotent (it tolerates an already-absent peer), and re-submitting
// the same taskId re-polls the same DataNode task, so resuming after a leader change is
// safe.
final TSStatus submitStatus;
final TRegionMigrateResult result;
try {
submitStatus =
handler.submitDeleteOldRegionPeerTask(getProcId(), targetDataNode, regionId);
handler.submitDeleteOldRegionPeerTask(deleteTaskId, targetDataNode, regionId);
setKillPoint(state);
if (submitStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
return retryCurrentReplicaOrFail(
return retryCurrentReplica(
String.format(
"submit delete task for region %s to DataNode %s failed: %s",
regionId, simplifiedLocation(targetDataNode), submitStatus));
}
result = handler.waitTaskFinish(getProcId(), targetDataNode);
result = handler.waitTaskFinish(deleteTaskId, targetDataNode);
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
LOGGER.error(ProcedureMessages.PID_REMOVEREGIONGROUP_STATE_FAILED, getProcId(), state, e);
return retryCurrentReplicaOrFail(
return retryCurrentReplica(
String.format(
"delete region %s from DataNode %s threw %s",
regionId, simplifiedLocation(targetDataNode), e));
}

switch (result.getTaskStatus()) {
case SUCCESS:
// Advance to the next replica with a fresh retry budget.
// Advance to the next replica with a fresh retry counter and a fresh delete task.
currentReplicaIndex++;
attemptedForCurrentReplica = 0;
deleteTaskSubmitted = false;
setNextState(RemoveRegionGroupState.DELETE_REGION_REPLICAS);
return Flow.HAS_MORE_STATE;
case PROCESSING:
// waitTaskFinish() only returns PROCESSING when its polling loop was interrupted, i.e.
// this ConfigNode is shutting down / losing leadership. The delete task is still
// running
// on the DataNode, so persist and re-poll after recovery: stay on this replica without
// advancing it and without consuming a retry attempt.
// running on the DataNode, so persist and re-poll after recovery: stay on this replica
// without advancing it, without consuming a retry attempt, and keeping deleteTaskSeq /
// deleteTaskSubmitted so the re-poll targets the same in-flight task.
setNextState(RemoveRegionGroupState.DELETE_REGION_REPLICAS);
return Flow.HAS_MORE_STATE;
case TASK_NOT_EXIST:
case FAIL:
default:
return retryCurrentReplicaOrFail(
return retryCurrentReplica(
String.format(
"delete region %s from DataNode %s, task status is %s",
regionId, simplifiedLocation(targetDataNode), result.getTaskStatus()));
Expand All @@ -192,30 +248,57 @@
}

/**
* Retry the replica at {@link #currentReplicaIndex} after a backoff, or fail the whole procedure
* once the per-replica retry budget is exhausted. Failing (rather than skipping the replica)
* keeps the parent from dropping the partition table while a region's peer/data is still on disk.
* Retry the replica at {@link #currentReplicaIndex} after a backoff. This procedure never gives
* up on a replica: because it is submitted as an independent root procedure, there is no parent
* to fall back to, and skipping or failing would leave the region's peer/data on disk. So it
* backs off and re-runs the same state until the replica is deleted, which eventually succeeds
* once the target DataNode is reachable: the delete is idempotent, and clearing {@link
* #deleteTaskSubmitted} here makes the next attempt use a FRESH DataNode-side taskId (a new
* {@link #deleteTaskSeq}), so the DataNode actually re-executes the delete instead of returning a
* cached terminal result for the previous taskId.
*
* @param reason human-readable description of the failure, included in the warning log
* @return {@code Flow.HAS_MORE_STATE}, with {@code DELETE_REGION_REPLICAS} set as the next state
* @throws InterruptedException if the thread is interrupted during the backoff sleep
*/
private Flow retryCurrentReplicaOrFail(String reason) throws InterruptedException {
private Flow retryCurrentReplica(String reason) throws InterruptedException {

Check warning on line 260 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionGroupProcedure.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Expected @throws tag for 'InterruptedException'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8i05sSpZOfj-La8maa&open=AZ8i05sSpZOfj-La8maa&pullRequest=18097
attemptedForCurrentReplica++;
if (attemptedForCurrentReplica <= MAX_DELETE_REPLICA_RETRY) {
LOGGER.warn(
ProcedureMessages.PID_REMOVEREGIONGROUP_DELETE_REPLICA_FAILED,
getProcId(),
regionId,
attemptedForCurrentReplica,
MAX_DELETE_REPLICA_RETRY + 1,
reason);
Thread.sleep(DELETE_REPLICA_RETRY_INTERVAL_MS);
setNextState(RemoveRegionGroupState.DELETE_REGION_REPLICAS);
return Flow.HAS_MORE_STATE;
LOGGER.warn(
ProcedureMessages.PID_REMOVEREGIONGROUP_DELETE_REPLICA_FAILED,
getProcId(),
regionId,
attemptedForCurrentReplica,
reason);
// Force a fresh delete task on the next attempt so the DataNode re-runs the delete rather than
// replaying a cached FAIL/SUCCESS for this taskId.
deleteTaskSubmitted = false;
Thread.sleep(DELETE_REPLICA_RETRY_INTERVAL_MS);
setNextState(RemoveRegionGroupState.DELETE_REGION_REPLICAS);
return Flow.HAS_MORE_STATE;
}

/**
* The DataNode-side taskId for the current attempt, derived from this procedure's (globally
* unique, consensus-replicated) procId and its monotonic {@link #deleteTaskSeq}. It is packed
* into the NEGATIVE i64 space, which is disjoint from every real procId (all {@code >= 0}); other
* region-maintain procedures (add/remove peer) use {@code getProcId()} directly as the taskId
* against the same DataNode task map, so a negative id can never collide with theirs. Unlike
* minting from the procedure-store id allocator, this needs nothing extra replicated: procId is
* already replicated and deleteTaskSeq is persisted with this procedure, so the taskId is stable
* across a leader change and never regresses.
*
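* <p>Layout: sign bit set (=> negative) | {@value PROC_ID_BITS} bits of procId | {@value
* SEQ_BITS} bits of deleteTaskSeq. The bounds are astronomically beyond any real cluster (a
* procId needs 2^43 procedures; a single group delete needs 2^20 retries), and are asserted
* rather than silently wrapped so a violation fails the procedure loudly instead of emitting a
* colliding id.
*
* @return the negative taskId identifying the current delete attempt
* @throws IllegalStateException if procId is negative or does not fit in PROC_ID_BITS bits, or
*     deleteTaskSeq does not fit in SEQ_BITS bits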
*/
private long deleteTaskId() {
final long procId = getProcId();
if (procId < 0 || procId >= (1L << PROC_ID_BITS) || deleteTaskSeq >= (1L << SEQ_BITS)) {
throw new IllegalStateException(

Check warning on line 295 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionGroupProcedure.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Expected @throws tag for 'IllegalStateException'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8i05sSpZOfj-La8mab&open=AZ8i05sSpZOfj-La8mab&pullRequest=18097
String.format(
"cannot derive a collision-free delete taskId: procId=%d, deleteTaskSeq=%d exceed the "

Check warning on line 297 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionGroupProcedure.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 101).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8i05sSpZOfj-La8mac&open=AZ8i05sSpZOfj-La8mac&pullRequest=18097
+ "%d/%d-bit budget",
procId, deleteTaskSeq, PROC_ID_BITS, SEQ_BITS));
}
setFailure(
new ProcedureException(
String.format(
"[pid%d][RemoveRegionGroup] gave up after %d attempts: %s",
getProcId(), attemptedForCurrentReplica, reason)));
return Flow.NO_MORE_STATE;
return Long.MIN_VALUE | (procId << SEQ_BITS) | deleteTaskSeq;
}

@Override
Expand Down Expand Up @@ -243,6 +326,11 @@
super.serialize(stream);
ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, stream);
ReadWriteIOUtils.write(currentReplicaIndex, stream);
// Persist the delete-task cursor so a leader change re-derives the SAME in-flight taskId and
// re-polls it (deleteTaskSubmitted == true) instead of submitting a duplicate, and so the
// monotonic deleteTaskSeq never regresses.
ReadWriteIOUtils.write(deleteTaskSeq, stream);
ReadWriteIOUtils.write(deleteTaskSubmitted, stream);
}

@Override
Expand All @@ -252,6 +340,14 @@
regionReplicaSet = ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
regionId = regionReplicaSet.getRegionId();
currentReplicaIndex = ReadWriteIOUtils.readInt(byteBuffer);
// deleteTaskSeq/deleteTaskSubmitted were appended after the first version of this procedure.
// That first version only ever existed on the unreleased branch that added this procedure
// (never in a release), but a dev/CI cluster could persist a blob without these trailing
// fields; tolerate it by defaulting to "no in-flight task" instead of reading past the end.
if (byteBuffer.hasRemaining()) {
deleteTaskSeq = ReadWriteIOUtils.readLong(byteBuffer);
deleteTaskSubmitted = ReadWriteIOUtils.readBool(byteBuffer);
}
} catch (ThriftSerDeException e) {
LOGGER.error(ProcedureMessages.ERROR_IN_DESERIALIZE, this.getClass(), e);
}
Expand All @@ -264,12 +360,14 @@
}
RemoveRegionGroupProcedure procedure = (RemoveRegionGroupProcedure) obj;
return this.currentReplicaIndex == procedure.currentReplicaIndex
&& this.deleteTaskSeq == procedure.deleteTaskSeq
&& this.deleteTaskSubmitted == procedure.deleteTaskSubmitted
&& Objects.equals(this.regionReplicaSet, procedure.regionReplicaSet);
}

@Override
public int hashCode() {
return Objects.hash(regionReplicaSet, currentReplicaIndex);
return Objects.hash(regionReplicaSet, currentReplicaIndex, deleteTaskSeq, deleteTaskSubmitted);
}

@Override
Expand All @@ -279,6 +377,10 @@
+ regionReplicaSet
+ ", currentReplicaIndex="
+ currentReplicaIndex
+ ", deleteTaskSeq="
+ deleteTaskSeq
+ ", deleteTaskSubmitted="
+ deleteTaskSubmitted
+ '}';
}
}
Loading
Loading