torch.distributed 的源代码
# mypy: 允许未类型化定义
导入
记录日志
导入
调试器
导入
系统
导入
追踪回溯
导入
打字
导入
火炬
日志 =
记录.
获取日志记录器(__name__)
[文档]def is_available() -> bool:
"""
返回 ``True`` 如果分布式包可用。
否则,
`torch.distributed` 不暴露任何其他 API。目前,
`torch.distributed` 可在 Linux、MacOS 和 Windows 上使用。在从源代码构建 PyTorch 时,设置
`USE_DISTRIBUTED=1` 以启用它。
目前,Linux 和 Windows 的默认值是 `USE_DISTRIBUTED=1`,
``USE_DISTRIBUTED=0`` 用于 MacOS。
"""
返回 torch._C 是否具有 "_c10d_init" 属性
if 是否可用()
以及 not
PyTorch._C._c10d_init():
抛出 RuntimeError(
"初始化 torch.distributed 失败")
来自分布式包的自定义运行时错误
分布式错误 =
PyTorch._C.
_分布式错误
DistBackendError = PyTorch._C._DistBackendError
DistNetworkError = PyTorch._C._DistNetworkError
DistStore 错误 =
PyTorch._C.
_DistStore 错误_
if 是否可用():
from torch._C._分布式_c10d
导入 (
_广播合并_,
根据大小计算桶分配,
控制集体,
默认第一个桶的字节数,
创建 nccl_premul_sum,
注册内置通信钩子,
注册通信钩子,
存储集体,
测试 Python 存储,
_在各个流程中验证参数,
后端
作为
_后端,
内置通信钩子类型,
调试等级,
文件存储,
获取调试级别,
梯度桶,
记录器,
前缀存储,
流程组
作为
流程组,
减法器,
设置调试级别,
从环境设置调试级别,
存储,
TCP 存储,
工作
作为
工作,
)
类
分布式 Pdb(pdb.Pdb):
""
支持在多进程子进程中使用 PDB。
使用方法:
_DistributedPdb().set_trace()
"""
定义
交互(
我, *
参数, **
关键字参数):
标准输入 = sys.
标准输入
尝试:
sys.标准输入 =
打开(
/dev/标准输入)
蛋白质数据库.
蛋白质数据库.
交互作用(
我, *
参数, **
关键字参数)
最后:
sys.标准输入 =
内置标准输入
断点缓存:
字典[int,
输入法.
任何] = {}
[文档] def breakpoint(等级: int = 0, 跳过: int = 0):
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
设置一个断点,但只在一个等级上。其他所有等级将等待你完成
完成断点前的操作,继续。
Args:
rank (int): 要中断的排名。默认值:``0``
skip (int): 跳过第一个 ``skip`` 次到这个断点的调用。默认值:``0``。
"""
if skip > 0:
key = hash(str(traceback.format_exc()))
counter = _breakpoint_cache.get(key, 0) + 1
_breakpoint_cache[key] = counter
如果计数器 <= 跳过:
记录警告("跳过断点,计数器=%d", 计数器)
返回
如果获取排名() == 排名:
pdb = _DistributedPdb()
pdb.message(
!!! 注意 !!!
输入 'up' 跳转到调用 dist.breakpoint(rank={rank}) 的帧
)
pdb.set_trace()
# 如果 TLS 中存在元/Python 密钥,我们希望确保我们忽略它们
# 并且调用默认的 CPU/CUDA 实现的重栅栏。
meta_in_tls = torch._C._meta_in_tls_dispatch_include()
guard = torch._C._DisableTorchDispatch() # type: ignore[attr-defined]
torch._C._set_meta_in_tls_dispatch_include(False)
try:
barrier()
finally:
torch._C._set_meta_in_tls_dispatch_include(meta_in_tls)
del guard
if sys.平台 !=
win32:
from torch._C._distributed_c10d 导入 HashStore
from .device_mesh 导入
设备网格,
初始化设备网格
# 以下划线开头的变量不会被自动导入
# 请参阅 `distributed_c10d.py` 中的注释,了解为什么我们公开 `_backend`
# 这
from .分布式_c10d
导入 * # noqa: F403
from .分布式_c10d
导入 (
_all_gather_base,
合并管理器,
合并管理器,
创建进程组包装器,
获取进程组名称,
_rank_not_in_group,
_reduce_scatter_base,
get_node_local_rank,
)
from .remote_device 导入
远程设备
from .会合点
导入 (
_从选项创建存储,
注册会合处理程序,
预约,
)
从环境变量设置调试级别()
else:
# 这个存根足以获取
# 运行测试:python test/test_public_bindings.py -k test_correct_module_names
即使 USE_DISTRIBUTED=0 也正常工作。请随意添加更多
如有必要添加占位符
我们不能直接定义占位符,因为它们会混淆 pyre
类 _ProcessGroupStub:
通过
sys.模块[
torch.distributed].
进程组 =
_进程组占位符
# 类型:忽略[已定义]