Skip to content

fix distributed barrier imbalance in async_save_dcp#1852

Open
VincentCheungKokomo wants to merge 2 commits into
InternLM:mainfrom
VincentCheungKokomo:fix_distributed_barrier_imbalance
Open

fix distributed barrier imbalance in async_save_dcp#1852
VincentCheungKokomo wants to merge 2 commits into
InternLM:mainfrom
VincentCheungKokomo:fix_distributed_barrier_imbalance

Conversation

@VincentCheungKokomo

@VincentCheungKokomo VincentCheungKokomo commented May 28, 2026

Copy link
Copy Markdown
Contributor

commit d66ff05
fix(engine): fix distributed barrier imbalance in async_save_dcp

Three correctness fixes for async_save_dcp on multi-rank training:

  1. Add dist.barrier() after rank-0 rmtree+mkdir to prevent non-rank-0
    processes from writing into an incomplete directory while rank-0 is
    still cleaning up a stale .incomplete dir.

  2. Use dist.all_reduce(MAX) to broadcast the retry/fatal decision so
    all ranks agree before raising or retrying. Without this, ranks that
    classify the exception differently would deadlock at the barrier.

  3. Only query weights_dir.exists() on rank-0 and broadcast the result
    via dist.broadcast, ensuring all ranks raise FileExistsError (or
    proceed) together even under NFS cache inconsistency.

commit 7ee1557
[Fix] Fix async DCP checkpoint "received 0 items of ancdata" and add early failure detection

  1. Coalesce per-tensor shared memory into per-dtype buffers to reduce fd count
    from ~3000 to ~2 during daemon subprocess handoff, fixing the ancdata bug.

  2. Add warmup_async_save_dcp() to trigger daemon init before training starts,
    surfacing port conflicts (EADDRINUSE) immediately instead of mid-training.

  3. Add _check_async_save_health() to detect async save failures within one
    step rather than waiting until the next checkpoint interval.

  4. Allow snapshot saves to use async path.

@VincentCheungKokomo VincentCheungKokomo force-pushed the fix_distributed_barrier_imbalance branch from 00f88f4 to d66ff05 Compare May 28, 2026 13:34
# raise (or continue) together. Without this, NFS cache inconsistencies
# could cause some ranks to raise while others proceed to the barrier,
# resulting in a deadlock.
dir_exists = torch.tensor(int(weights_dir.exists() if dist.get_rank() == 0 else 0), dtype=torch.int32)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we only check if the directory exists on rank0?

@VincentCheungKokomo VincentCheungKokomo May 30, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This already only calls exists() on rank 0 (via weights_dir.exists() if dist.get_rank() == 0 else 0) — other ranks simply use 0.
The next line dist.broadcast(dir_exists, src=0, group=async_checkpoint_pg) then syncs rank 0's result to all ranks. This way all
ranks get a consistent decision and either all raise or all proceed together, avoiding a deadlock where NFS cache inconsistencies
cause some ranks to raise while others continue to the barrier.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants