
[RL][Feature] Add GDR streaming weight update path #7951

Open
jackyYang6 wants to merge 2 commits into PaddlePaddle:develop from jackyYang6:rl/update-weights-gdr

@jackyYang6
Contributor

Motivation

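In RL scenarios, model weight updates currently support only the RDMA path
(copying through shared memory), which requires writing the weights to disk
and then loading them onto the GPU. This PR adds a GPU Direct RDMA (GDR)
streaming weight update path. It uses the GPU Direct capability of the
CheckpointTransfer library to stream weights from the training side directly
into GPU memory on the inference side, which avoids the intermediate disk
write and the CPU-GPU copy overhead and significantly reduces weight update
latency.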

Modifications

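  1. fastdeploy/rl/dynamic_weight_manager.py:

    • Add update_weights_by_gdr(), which receives streamed weights through
      CheckpointTransfer's GPU_DIRECT backend
    • Implement _receive_gdr_weights_as_sync_iterator(): wraps the asynchronous
      receive_weights as a synchronous iterator and supports prefetching
      through a prefetch queue
    • Implement _load_models_from_weight_iterator(): supports streaming loads
      for the main model plus the MTP auxiliary model, with MTP weights loaded
      in batches by chunk
    • Add GDR debug digest helper methods so that transfer correctness can be
      verified with MD5 checksums
    • Refactor _resolve_weight_update_version() and _resolve_transfer_mode()
      into shared methods
    • Fix the clear_parameters exception caused by the ProcessGroupGloo
      shutdown() method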
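  2. fastdeploy/worker/gpu_model_runner.py:

    • update_weights() gains a transfer_mode parameter to support GDR mode
    • In GDR mode, the CUDA Graph / KV Cache / MTP Cache can optionally be
      released to free GPU memory (gdr_release_cache setting)
    • After the update completes, the KV Cache, CUDA Graph, and routing replay
      are rebuilt automatically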
  3. fastdeploy/worker/gpu_worker.py / metax_worker.py /
    metax_model_runner.py:

    • Pass the transfer_mode parameter through to the model runner layer
  4. fastdeploy/entrypoints/openai/api_server.py:

    • The /v1/update_weights endpoint gains a transfer_mode parameter
      (allowed values: rdma, gdr)
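  5. fastdeploy/model_executor/utils.py:

    • Add the helper functions _is_gdr_dynamic_load_config() and
      _copy_gdr_transposed_weight_attrs()
    • In GDR mode, process_weight_transpose automatically copies the weight
      loading attributes onto the transposed parameter, so that weight_loader
      can shard correctly during streaming loads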
  6. Tests:

    • Add tests/rl/test_dynamic_weight_gdr.py: unit tests covering the GDR
      iterator, MTP chunked loading, debug digest, exception handling, and more
    • Add tests/engine/test_common_engine.py: version mismatch validation
      test in GDR mode
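
The async-to-sync conversion with a prefetch queue mentioned under item 1 can be sketched roughly as below. This is only an illustration of the general technique, not the code in this PR: `fake_receive_weights` is a made-up stand-in for CheckpointTransfer's asynchronous `receive_weights` (its real signature is not shown here), and `sync_iter_from_async` and `prefetch_queue_size` are hypothetical names.

```python
import asyncio
import queue
import threading
from typing import Any, AsyncIterator, Callable, Iterator, Tuple

_DONE = object()  # sentinel that marks the end of the stream


async def fake_receive_weights() -> AsyncIterator[Tuple[str, bytes]]:
    """Hypothetical stand-in for an asynchronous weight stream."""
    for i in range(4):
        await asyncio.sleep(0)  # pretend to wait on the network
        yield f"layer.{i}.weight", bytes([i])


def sync_iter_from_async(
    make_stream: Callable[[], AsyncIterator[Any]],
    prefetch_queue_size: int = 2,
) -> Iterator[Any]:
    """Drain an async stream on a background thread and expose a sync iterator.

    The bounded queue lets the producer run at most `prefetch_queue_size`
    items ahead of the consumer.
    """
    buf: "queue.Queue[Any]" = queue.Queue(maxsize=prefetch_queue_size)

    async def pump() -> None:
        async for item in make_stream():
            # Blocking put is acceptable here because this event loop is
            # private to the background thread and runs nothing else.
            buf.put(item)

    def worker() -> None:
        try:
            asyncio.run(pump())
            buf.put(_DONE)
        except Exception as exc:  # forward producer errors to the consumer
            buf.put(exc)

    threading.Thread(target=worker, daemon=True).start()

    while True:
        item = buf.get()
        if item is _DONE:
            return
        if isinstance(item, Exception):
            raise item
        yield item
```

A consumer can then write `for name, tensor in sync_iter_from_async(fake_receive_weights): ...` in ordinary synchronous loader code. One limitation of this sketch: if the consumer stops iterating early, the daemon thread stays blocked on the full queue until the process exits.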

Usage or Command

# Trigger a GDR weight update through the API
curl -X POST http://localhost:8000/v1/update_weights \
  -H "Content-Type: application/json" \
  -d '{"version": "step_100", "transfer_mode": "gdr"}'

# Or make GDR the default mode through rsync_config
# Set this in the launch arguments:
--rsync_config '{"transfer_mode": "gdr", "gpu_direct": true,
"gdr_mtp_chunk_size": 1, "gdr_prefetch_queue_size": 2}'

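For callers that drive the update from a Python training loop rather than from the shell, the same request as the curl command above could be built as in the sketch below. The endpoint path, the JSON fields, and the allowed `transfer_mode` values come from this PR description; the helper names `build_update_weights_request` and `send`, the timeout, and the assumption that the response body is text are illustrative only.

```python
import json
import urllib.request


def build_update_weights_request(
    base_url: str, version: str, transfer_mode: str = "gdr"
) -> urllib.request.Request:
    """Build the POST request for /v1/update_weights (hypothetical helper)."""
    if transfer_mode not in ("rdma", "gdr"):
        raise ValueError(f"unsupported transfer_mode: {transfer_mode!r}")
    body = json.dumps({"version": version, "transfer_mode": transfer_mode})
    return urllib.request.Request(
        url=f"{base_url}/v1/update_weights",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request, timeout_s: float = 600.0) -> str:
    """Send the request and return the raw response body as text."""
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        return response.read().decode("utf-8")
```

For example, `send(build_update_weights_request("http://localhost:8000", "step_100"))` would issue the same call as the curl command, assuming a server is listening on that address.
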
Accuracy Tests

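This PR does not change the model's forward computation logic, so model output
after a weight update matches the RDMA path. GDR transfer correctness is
verified in the following ways:

  • The built-in debug digest mechanism (gdr_debug_tensor_digest=true), which compares the MD5 of the weights before and after transfer
  • Unit tests covering streaming loads, MTP chunking, interruption on exceptions, and similar scenarios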
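
The digest comparison idea can be illustrated with a small standalone sketch. It is not the PR's debug digest implementation: the function names are hypothetical, and weights are modelled as raw `bytes` per parameter name so the example needs nothing beyond the standard library.

```python
import hashlib
from typing import Dict, List, Mapping


def weight_digests(weights: Mapping[str, bytes]) -> Dict[str, str]:
    """Return an MD5 hex digest for each named weight buffer."""
    return {name: hashlib.md5(raw).hexdigest() for name, raw in weights.items()}


def mismatched_weights(sent: Mapping[str, str], received: Mapping[str, str]) -> List[str]:
    """List the names whose digests differ or that exist on only one side."""
    names = sorted(set(sent) | set(received))
    return [name for name in names if sent.get(name) != received.get(name)]
```

The training side would compute `weight_digests` before sending and the inference side after receiving; an empty list from `mismatched_weights` means every transferred buffer arrived byte-for-byte identical. MD5 serves here only as a corruption check, not as a security measure.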

Checklist

  • Add at least a tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code, run pre-commit before commit.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

@paddle-bot

paddle-bot Bot commented May 28, 2026

Thanks for your contribution!

Replace the custom async-to-sync GDR iterator with checkpoint_transfer's
built-in receive_weights_sync API. Introduce FD_USE_CHECKPOINT_TRANSFER
env flag to enable the unified path, which uses load_strategy to
distinguish GDR (GPU_DIRECT) vs IPC backends internally.

Key changes:
- Add update_weights_by_ct() and _build_ct_transfer_config() methods
- Remove update_weights_by_gdr, _receive_gdr_weights_as_sync_iterator,
  _resolve_transfer_mode (replaced by CT unified path)
- Remove transfer_mode param from update_weights_by_rdma
- Revert metax GDR additions (metax doesn't use CT path)
- IPC mode: always clear cache + rebuild CUDA graph (shared GPU)
- GDR mode: skip cache clear by default (training-inference separation)
- Rewrite tests to cover both GDR and IPC CT paths

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
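
The cache handling rules listed in the key changes can be summarized as a small decision function. This is a sketch under stated assumptions, not the PR's code: `UpdatePlan`, `plan_weight_update`, and the string labels `"gpu_direct"` and `"ipc"` are hypothetical, and treating `FD_USE_CHECKPOINT_TRANSFER=1` as the enabling value follows the CI analysis further down this page. Cache behaviour on the legacy RDMA path is not described here, so the sketch leaves it unmodelled.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class UpdatePlan:
    use_checkpoint_transfer: bool
    clear_cache_before_update: bool
    rebuild_after_update: bool


def plan_weight_update(
    load_strategy: str,
    gdr_release_cache: bool = False,
    env: Optional[Mapping[str, str]] = None,
) -> UpdatePlan:
    """Decide how a weight update should treat caches (illustrative only)."""
    env = os.environ if env is None else env
    if env.get("FD_USE_CHECKPOINT_TRANSFER") != "1":
        # Legacy RDMA path: cache handling is not modelled in this sketch.
        return UpdatePlan(False, False, False)
    if load_strategy == "ipc":
        # Training and inference share the GPU, so memory is always freed first.
        return UpdatePlan(True, True, True)
    if load_strategy == "gpu_direct":
        # Separate GPUs: keep caches unless the operator opts in to releasing them.
        return UpdatePlan(True, gdr_release_cache, gdr_release_cache)
    raise ValueError(f"unknown load_strategy: {load_strategy!r}")
```

Passing `env` explicitly keeps the function easy to test without touching the real process environment.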

@PaddlePaddle-bot PaddlePaddle-bot left a comment

🤖 Paddle-CI-Agent | pr_review | 2026-05-29 19:25:47

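📋 Review Summary

PR overview: Adds a GPU Direct RDMA (GDR) streaming weight update path that uses the CheckpointTransfer library to stream weights from the training side directly into GPU memory on the inference side
Scope of changes: fastdeploy/rl/, fastdeploy/worker/, fastdeploy/model_executor/
Impact tags: [RL] [Loader]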

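Issues

| Level | File | Summary |
|---|---|---|
| 🟡 Suggestion | gpu_model_runner.py:3144 | The finalize_update() call added to the RDMA path has not been mirrored in metax_model_runner.py |
| 🟡 Suggestion | dynamic_weight_manager.py:42 | _tensor_to_numpy_for_digest is defined identically in two files |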

Status of previous findings

| Finding | Issue | Status |
|---|---|---|
| PR conventions | PR description was missing required sections | ✅ Fixed |

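📝 PR convention check

The PR title [RL][Feature] Add GDR streaming weight update path is correctly formatted, and the description contains all required sections (Motivation, Modifications, Usage or Command, Accuracy Tests, Checklist), so it meets the conventions.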

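Overall assessment

The implementation approach is clear, the design of the GDR streaming weight update path is sound, and unit test coverage is thorough. Two improvements deserve attention: keeping the hardware backends in sync and reusing shared code.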

    return result
else:
    result = self.dynamic_weight_manager.update_weights_by_rdma(version, verify_checksum)
    self.dynamic_weight_manager.finalize_update()

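🟡 Suggestion: The RDMA branch now calls finalize_update() (which validates parameters and updates the shared-memory state), but update_weights() at metax_model_runner.py:2554 still uses the old logic (it returns directly, with no finalize).

This is a behavior change: previously the GPU RDMA path did not call finalize_update() either, and the call has now been added. Consider mirroring it in metax: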

def update_weights(self, version: str = None, verify_checksum: bool = False):
    result = self.dynamic_weight_manager.update_weights_by_rdma(version, verify_checksum)
    self.dynamic_weight_manager.finalize_update()
    return result

)


def _tensor_to_numpy_for_digest(tensor):

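🟡 Suggestion: The _tensor_to_numpy_for_digest function is identical to the implementation in default_loader_v1.py (including the cannot pickle exception handling logic).

Consider extracting it into a shared module (for example fastdeploy/model_executor/utils.py) to avoid maintaining two copies, so that future bug fixes only need to touch one place.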

@PaddlePaddle-bot

PaddlePaddle-bot commented May 29, 2026

🤖 Paddle-CI-Agent | ci_status_monitor | 2026-05-29 21:16:43

This CI report was generated from the following code (refreshed every 30 minutes):


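1 Task overview

CI currently has 3 failing required tasks, which must be resolved before the PR can be merged.

| Total runs (reruns) | Total tasks | ✅ Passed | ❌ Failed | ⏳ Running | ⏸️ Waiting | Skipped |
|---|---|---|---|---|---|---|
| 40(0) | 40 | 35 | 5 | 0 | 0 | 0 |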

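2 Task status summary

2.1 Required tasks: 7/10 passed

Required tasks block merging; failures must be handled first.

| Status | Task | Duration | Root cause | Suggested fix | Log | Rerun |
|---|---|---|---|---|---|---|
| ❌ | Run FastDeploy Unit Tests and Coverage / run_tests_with_coverage | 1h32m | Flaky: the engine_worker_queue IPC manager process crashed with ConnectionResetError | Please rerun | Job | - |
| ❌ | Pre Commit | 35s | PR issue: the PR code did not pass the pre-commit code style check | Run pre-commit locally, fix the formatting, and resubmit | Job | - |
| ❌ | Approval | 20s | Approval required | Please obtain manual approval | Job | - |
| ✅ | The remaining 7 required tasks passed | - | - | - | - | - |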

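2.2 Optional tasks: 28/30 passed

Optional tasks do not block merging; failures are for reference only.

| Status | Task | Duration | Log | Rerun |
|---|---|---|---|---|
| ❌ | Run iluvatar Tests / run_iluvatar_cases | 1m57s | Job | - |
| ❌ | Trigger Jenkins for PR | 21s | Job | - |
| ✅ | The remaining 28 optional tasks passed | - | - | - |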

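3 Failure details (required only)

Run FastDeploy Unit Tests and Coverage / run_tests_with_coverage: infrastructure (confidence: medium)

  • Status: ❌ Failed
  • Error type: infrastructure
  • Confidence: medium
  • Root cause summary: the engine_worker_queue IPC manager process crashed with ConnectionResetError
  • Analyzer: ci_analyze_unittest_fastdeploy

Failed cases:

| Test | Error | Root cause |
|---|---|---|
| tests/e2e/test_ernie_03b_pd_router_v1_ipc.py::test_non_chat_usage_non_stream | AttributeError: 'NoneType' object has no attribute 'json' | The server-side IPC crash caused the request to time out, so send_request returned None |

Root cause details:
The e2e inference service (prefill worker) suffered an IPC communication crash about 4 minutes into the test run (20:00:04). The multiprocessing Manager behind engine_worker_queue (the shared-state server) exited unexpectedly, so the worker process (worker_process.py:484 event_loop_normal) and the engine process both received ConnectionResetError: [Errno 104] Connection reset by peer, and the inference service became unavailable. The GDR code path in this PR is only activated when FD_USE_CHECKPOINT_TRANSFER=1 is set (it is not set in CI), and the shutdown_comm_group_if_worker_idle parameter is not part of this PR's diff, so the failure is judged to have no direct connection to the PR code.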

Key logs:

ERROR 2026-05-29 20:00:04,650 common_engine.py Error happened while insert task to engine:
  [Errno 104] Connection reset by peer
  File "engine_worker_queue.py", line 501, in exist_tasks
    return self.exist_tasks_inter_signal.get() == 1
ConnectionResetError: [Errno 104] Connection reset by peer

# workerlog.0:
File "worker_process.py", line 484, in event_loop_normal
    if self.task_queue.exist_tasks():
ConnectionResetError: [Errno 104] Connection reset by peer
resource_tracker: There appear to be 10 leaked shared_memory objects to clean up at shutdown

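Suggested fix:

  1. Rerun this task (the IPC crash is intermittent and unrelated to the PR changes)
  2. If it keeps failing, investigate the stability of the engine_worker_queue Manager process and the shutdown_comm_group_if_worker_idle parameter configuration

Fix summary: flaky, please rerun

Link: View logs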

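Pre Commit: code style (confidence: high)

  • Status: ❌ Failed
  • Error type: code style
  • Confidence: high
  • Root cause summary: the PR code did not pass the pre-commit code style check
  • Analyzer: generic analysis (fallback)

Root cause details:
pre-commit detected formatting problems in several files in the PR, mainly multi-line Python expressions that need to be written with parenthesized multi-line syntax. The affected files include fastdeploy/rl/dynamic_weight_manager.py and tests/rl/test_dynamic_weight_gdr.py, 6 files in total.

Suggested fix:

  1. Install and run pre-commit locally: pip install pre-commit==4.2.0 clang-format==13.0.0 && pre-commit install
  2. Run the checks and apply the fixes: pre-commit run --files fastdeploy/rl/dynamic_weight_manager.py tests/rl/test_dynamic_weight_gdr.py ...
  3. Commit the formatted code

Fix summary: run pre-commit locally, fix the formatting, and resubmit

Link: View logs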

Approval: manual approval required (confidence: high)

This job requires manual approval; CI continues only after the approval is given.

Fix summary: please obtain manual approval

Link: View logs

@codecov-commenter

codecov-commenter commented May 29, 2026

Codecov Report

❌ Patch coverage is 47.89644% with 161 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (develop@c1b7c08). Learn more about missing BASE report.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| fastdeploy/rl/dynamic_weight_manager.py | 57.50% | 77 Missing and 8 partials ⚠️ |
| fastdeploy/worker/gpu_model_runner.py | 6.66% | 42 Missing ⚠️ |
| ...y/model_executor/model_loader/default_loader_v1.py | 21.42% | 32 Missing and 1 partial ⚠️ |
| fastdeploy/model_executor/utils.py | 95.45% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             develop    #7951   +/-   ##
==========================================
  Coverage           ?   67.50%           
==========================================
  Files              ?      467           
  Lines              ?    65491           
  Branches           ?    10068           
==========================================
  Hits               ?    44207           
  Misses             ?    18453           
  Partials           ?     2831           
| Flag | Coverage Δ |
|---|---|
| GPU | 77.68% <47.89%> (?) |
| XPU | 7.05% <0.00%> (?) |
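
The figures in this report can be cross-checked with a few lines of arithmetic. The sketch below uses only numbers printed above; the total number of changed lines in the patch is not stated in the report, so `patch_lines` is inferred from the patch percentage and the 161 uncovered lines and should be read as an estimate. The file keys are shortened labels.

```python
# Project-level numbers from the coverage diff.
hits, misses, partials = 44207, 18453, 2831
total_lines = hits + misses + partials           # 65491, matches "Lines"
project_pct = 100 * hits / total_lines           # partial lines do not count as hits

# Per-file (missing, partial) counts from the "Files with missing lines" table.
uncovered_by_file = {
    "dynamic_weight_manager.py": (77, 8),
    "gpu_model_runner.py": (42, 0),
    "default_loader_v1.py": (32, 1),
    "utils.py": (0, 1),
}
uncovered = sum(m + p for m, p in uncovered_by_file.values())  # 161

# Inferred, not reported: patch size implied by 47.89644% coverage.
reported_patch_pct = 47.89644
patch_lines = round(uncovered / (1 - reported_patch_pct / 100))
patch_pct = 100 * (patch_lines - uncovered) / patch_lines

print(f"project coverage: {project_pct:.2f}%")
print(f"uncovered patch lines: {uncovered}")
print(f"inferred patch size: {patch_lines} lines, coverage {patch_pct:.5f}%")
```

The per-file missing and partial counts sum to the 161 uncovered lines in the headline, and the project percentage reproduces the 67.50% shown in the diff.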
