torch.distributed.elastic.rendezvous.c10d_rendezvous_backend 源代码
# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入 binascii
导入
记录日志
导入
操作系统
导入 tempfile
来自 base64
导入
base64 解码,
base64 编码
来自 datetime
导入
时间差
来自
打字
导入
任意,
角色,
可选
来自 torch.distributed
导入
文件存储,
存储,
TCP 存储
来自 torch.distributed.elastic.events
导入
构建并记录会话事件,
节点状态
来自 .api
导入 (
约会连接错误,
预约错误,
会合参数,
预约状态错误,
)
来自
.动态 rendezvous
导入
rendezvous 后端,
令牌
来自
.工具
导入 _matches_machine_hostname,
解析预约端点
日志记录器 =
记录日志.
获取日志记录器(__name__)
TCP 存储的默认端口
默认端口 = 29400
[文档]
类
C10d rendezvous 后端(
rendezvous 后端):
表示基于 C10d 的会合后端。
参数:
存储:
使用 :py:class:`torch.distributed.Store` 实例。
与 C10d 存储进行通信。
run_id:
约会运行的 ID。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
# 在__init__方法中查看解释。
_NULL_SENTINEL = "Y2FuaW1hZGFt"
_store: 店铺
_键:
字符串
def 初始化(
自身,
店铺:
存储, run_id:
字符串) ->
无:
如果 not run_id:
提升 ValueError(
"运行 ID 必须是非空字符串。")
自身._store =
存储
自身.
_键 = "torch.rendezvous." +
运行 ID
# 存储的读取操作会阻塞调用者,直到指定的
# 可用性成为关键。这种行为使得它难以作为一个常规的键值字典使用。
# 作为一种解决方案,我们最初设置了一个哨兵值作为汇合状态。
#
# 当这个值被返回时,我们将其视为 None。
# 作为一种解决方案,我们最初设置了一个哨兵值作为汇合状态。
自身._call_store("compare_set",
自身.
_键, "",
自身._NULL_SENTINEL)
@property
def 名称(
自身) ->
字符串:
"查看基类。"
返回 "c10d"
[文档] 获取状态(self) -> Optional[tuple[bytes, Token]]:
"""查看基类。"""
base64_state: bytes = self._call_store("get", self._key)
return self._decode_state(base64_state)
[文档] def 设置状态(
self, 状态: bytes, 令牌: Optional[Token] = None
) -> Optional[tuple[bytes, Token, bool]]:
"""查看基类。"""
base64_state_str: str = b64encode(state).decode()
if token:
# 确定 token 无效时的快捷方式。
if not isinstance(token, bytes):
result = self.get_state()
if result is not None:
tmp = *result, False
# Python 3.6 不支持返回时的元组解包
#语句。
返回 tmp
返回 None
token = token.decode()
else:
token = self._NULL_SENTINEL
base64_state: bytes = self._call_store(
"compare_set", self._key, token, base64_state_str
)
state_token_pair = self._decode_state(base64_state)
if state_token_pair is None:
return None
新状态,新令牌 = 状态令牌对
# C10d Store 的 compare_set 方法没有提供一种简单的方式来找出
# 我们的写入尝试是否成功。作为一个暴力解决方案,我们
# 对我们的本地状态和远程状态进行位运算比较。
返回新状态、新令牌、新状态等于状态
def _call_store(自身,
存储操作:
字符串, *
参数, **kwargs) ->
任意:
try:
返回 getattr(
自身._store,
存储操作)(*
参数, **kwargs)
除了 (ValueError,
运行时错误,
超时错误)
作为
异常:
提升
约会连接错误(
"连接到 C10d 存储失败。请查看内部异常以获取详细信息。"
) 来自 exc
def _解码状态(
自身,
base64 状态:
字节) ->
可选[
元组[
字节,
令牌
]]
如果
base64 状态 ==
自身._NULL_SENTINEL.
编码():
返回 None
try:
状态 =
base64 解码(
base64 状态)
除了 binascii.
错误
作为
异常:
提升
预约状态错误(
状态对象已损坏。请查看内部异常以获取详细信息。
) 来自 exc
返回
状态,
base64 状态
def 创建 TCP 存储(
参数:
会合参数) ->
TCP 存储:
主机,
端口 =
解析 rendezvous 端点(
参数.
端点,
默认端口=DEFAULT_PORT)
cfg_is_host = 参数.get_as_bool("is_host")
如果用户明确指定了我们的进程是否应该托管存储,请尊重它。
# the store, respect it.
如果 cfg_is_host
是 not
无:
is_host = cfg_is_host
否则尝试根据我们的主机名确定我们是否是主机
IP 地址。
否则:
是主机。 = _matches_machine_hostname(host)
超时
读取超时 =
角色(int,
参数.
获取整数值(
"读取超时", 60))
如果 read_timeout
≤ 0:
提升 ValueError(
读取超时必须是一个正整数。)
在特定情况下,我们尝试两次实例化存储。详情
请参阅下面的异常子句中的解释。
为
是否为服务器
在 [is_host,
错误
]:
try:
存储 =
TCP 存储(
主机,
端口,
是否为主机=
是否为服务器,
多租户=True,
超时=
时间差(
秒数=read_timeout),
)
如果
是否为服务器:
msg = f"进程"{
操作系统.
获取进程 ID()}
承载 C10d rendezvous 后端的 TCP 存储。
构建并记录会话事件(
run_id=参数.run_id,
消息=
信息,
节点状态=
节点状态.
初始化
)
记录器.
信息(
信息)
断开
除了 (ValueError,
运行时错误,
超时错误)
作为
异常:
如果我们启发式地推断 is_host 的值为 True,并且我们第一次尝试实例化 TCP 存储失败,再试一次。
如果第一次尝试实例化 TCP 存储失败,再试一次。
# more time with is_host 设置为 False。作为一个边缘情况,可以有
同一会合中的多个进程
# 机器中只有一个最终会托管商店。
如果 not
是服务器
或 cfg_is_host
是 not
无:
提升
约会连接错误(
"C10d 存储连接失败。请查看内部异常以获取详细信息。"
) 来自 exc
返回
存储 # type: ignore[possibly-undefined]
def _create_file_store(参数:
会合参数) ->
文件存储:
如果用户指定了端点,我们将其视为文件的路径。
如果
参数.
端点:
路径 =
参数.
端点
否则:
try:
# 临时文件只对用户可读可写
# 此进程。
_, 路径 =
临时文件.mkstemp()
除了 OSError
作为
异常:
提升
预约错误(
"C10d 存储的文件创建失败。请查看内部异常详情。"
) 来自 exc
try:
存储 =
文件存储(
路径)
除了 (ValueError,
运行时错误)
作为
异常:
提升
约会连接错误(
"连接到 C10d 存储失败。请查看内部异常详情。"
) 来自 exc
返回
存储
[文档]def create_backend(
参数:
会合参数) ->
元组[
C10d rendezvous 后端,
存储
]:
从指定的参数创建一个新的 :py:class:`C10dRendezvousBackend`。
+--------------+-----------------------------------------------------------+
| Parameter | 描述 |
+==============+===========================================================+
| store_type | C10d 存储的类型。目前支持的类型 |
| | "tcp" 和 "file" 对应于 |
| | :py:class:`torch.distributed.TCPStore` 和 |
| | :py:class:`torch.distributed.FileStore`,分别。|
默认为 "tcp"。
+--------------+-----------------------------------------------------------+
读取超时 | 读取超时,以秒为单位,用于存储操作。
默认为 60 秒。
| | |
请注意,这仅适用于
`torch.distributed.TCPStore` 不相关。
`torch.distributed.FileStore` 不接受超时参数。
`is_host` 是一个布尔值,表示是否为该后端实例。
+--------------+-----------------------------------------------------------+
`is_host` 是一个布尔值,表示该后端实例是否为主机。
| | 将托管 C10d 存储。如未指定,则默认使用 |
通过匹配主机名或 IP 推断的启发式方法
| | 本机地址与指定的会合点地址进行比对 |
| | 端点。默认为 `None`。 |
| | |
请注意,此配置选项仅适用于
| | :py:class:`torch.distributed.TCPStore`. 在正常情况下 |
| | 在您可安全跳过的环境下;唯一需要的时候 |
| | 如果其值无法正确显示则需要 |
| | 确定地(例如,会合端点有一个 CNAME 作为 |
| | 主机名或与机器的 FQDN 不匹配)。 |
+--------------+-----------------------------------------------------------+
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
截至今天,我们仅支持 TCPStore 和 FileStore。其他存储类型暂不支持。
还没有实现所需的功能(例如 compare_set)。
店铺类型 =
参数.
获取(
"店铺类型", "tcp").strip().
小写()
店铺:
店铺
try:
如果
店铺类型 ==
文件:
存储 =
创建文件存储(
参数)
elif 店铺类型 == "tcp":
存储 =
_创建_tcp 存储(
参数)
否则:
提升 ValueError(
"提供的存储类型无效。目前仅支持文件和 tcp。"
)
后端 =
C10d rendezvous 后端(
店铺,
参数.run_id)
除了
异常
作为 e:
构建并记录会话事件(
消息=f"{
类型(e).__name__}: {
字符串(e)}",
run_id=参数.run_id,
节点状态=
节点状态.
失败,
)
提升
返回
后端,
店铺