torch.distributed.elastic.rendezvous.etcd_store 的源代码
# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入 datetime
导入
随机
导入
时间
来自 base64
导入
base64 解码,
base64 编码
来自
打字
导入
可选
# pyre-ignore[21]: 在`torch.distributed`中找不到名称`Store`。
来自 torch.distributed
导入
店铺
try:
导入 etcd # type: ignore[import]
除了
模块未找到错误:
来自 .
导入 _etcd_stub
作为 etcd
# 延迟(休眠)一小段时间以减少 CAS 失败。
这不会影响正确性,但会减少对 etcd 服务器的请求。
def cas 延迟():
时间.
睡眠(
随机.
均匀(0, 0.1))
# pyre-fixme[11]: 注解 `Store` 未定义为类型。
[文档]
类
Etcd 存储(
存储):
""
通过在 rendezvous etcd 实例上附加来实现 c10 Store 接口。
这是 ``EtcdRendezvous`` 返回的存储对象。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
etcd 客户端,
etcd 存储前缀,
# 默认超时与 c10d/Store.hpp 中相同
超时:
可选[
日期时间.
时间差] =
无,
):
超级().
初始化()
# 用于 pybind trampoline 的必需项
自身.
客户端 =
etcd 客户端
自身.
前缀 =
etcd 存储前缀
如果
超时
是 not
无:
自身.
设置超时(
超时)
如果 not
自身.
前缀.
以...结尾(
根目录):
自身.
前缀 +=
根目录
[文档] def 设置(self, 键, 值):
"""
将键值对写入 `EtcdStore`。
键和值可以是 Python 的 `str` 或 `bytes` 类型。
"""
self.client.set(key=self.prefix + self._encode(key), value=self._encode(value))
[文档] def get(self, key) -> bytes:
"""
通过键获取值,可能需要进行阻塞等待。
如果键不是立即存在的,将进行阻塞等待
对于最多 ``timeout`` 持续时间或直到键被发布。
返回:
值(字节)
抛出:
查找错误 - 如果在超时后键仍未发布
"""
b64_key = self.prefix + self._encode(key)
kvs = self._try_wait_get([b64_key])
如果 kvs 为 None:
抛出 LookupError 异常(f"在 EtcdStore 中未找到键 {key} ")
返回 self._decode(kvs[b64_key])
[文档] def add(self, key, num: int) -> int:
"空字符串"
以整数形式原子地增加一个值。
整数以 10 进制字符串表示。如果键不存在,
将假定默认值为 ``0``。
返回值:
新的(递增的)值
“”
b64_key = self._encode(key)
# c10d Store 假设值是一个以十进制字符串表示的整数
try:
# 假设默认值为"0",如果这个键尚未设置:
node = self.client.write(
key=self.prefix + b64_key,
value=self._encode(str(num)), # 即 0 + num
prevExist=False,
)
return int(self._decode(node.value))
except etcd.EtcdAlreadyExist:
pass
while True:
因此,c10d Store 没有删除键的方法,所以我们可以确信它仍然存在。
因此,我们可以确信它仍然存在。
node = self.client.get(key=self.prefix + b64_key)
new_value = self._encode(str(int(self._decode(node.value)) + num))
try:
node = self.client.test_and_set(
key=node.key, value=new_value, prev_value=node.value
)
return int(self._decode(node.value))
except etcd.EtcdCompareFailed:
cas_delay()
[文档] def wait(self, keys, override_timeout: Optional[datetime.timedelta] = None):
"""
等待所有密钥发布完成,或者直到超时。
Raises:
LookupError - 如果发生超时
"""
b64_keys = [self.prefix + self._encode(key) for key in keys]
kvs = self._try_wait_get(b64_keys, override_timeout)
if kvs is None:
引发 LookupError("在等待 EtcdStore 中的键时超时")
# 成功时无返回值
[文档] def check(self, keys) -> bool:
检查所有键是否立即存在(无需等待)。
b64_keys = [self.prefix + self._encode(key) for key in keys]
kvs = self._try_wait_get(
b64_keys,
override_timeout=datetime.timedelta(microseconds=1), # 看起来就像没有等待
)
返回 kvs 不为 None
#
# 将键值数据编码为 base64,以便我们可以存储任意二进制数据
# 在 EtcdStore 中。输入可以是 `str` 或 `bytes`。
# In case of `str`, utf-8 encoding is assumed.
#
def 编码(
自身,
值) ->
字符串:
如果
类型(
值) ==
字节:
返回
base64 编码(
值).
解码()
elif 类型(
值) ==
字符串:
返回
base64 编码(
值.
编码()).
解码()
提升 ValueError(
"值必须是 str 或 bytes 类型")
#
解码一个 base64 字符串(类型为`str`或`bytes`)
返回类型为`bytes`,与 Store 接口一起使用更方便。
#
def _decode(自身,
值) ->
字节:
如果
类型(
值) ==
字节:
返回
base64 解码(
值)
elif 类型(
值) ==
字符串:
返回
base64 解码(
值.
编码())
提升 ValueError(
"值必须是类型为 str 或 bytes")
#
一次性获取所有(base64 编码的)etcd 键,或等待直到所有键
# 已发布或超时发生。
这是一个公共接口方法的辅助方法。
#
成功时,返回一个字典,其中包含 {etcd 键 -> etcd 值}。
超时时返回 None。
#
def _try_wait_get(自身, b64_keys,
覆盖超时=
无):
超时 =
自身.
超时
如果
覆盖超时
是 None
否则
覆盖超时
# 类型: 忽略[attr-defined]
截止日期 =
时间.
时间() +
超时.total_seconds()
while True:
# 读取整个目录(键),仅过滤等待的键
所有节点 = None
try:
所有节点 =
自身.
客户端.
获取(
键=
自身.
前缀)
请求节点 = {
节点.
键:
节点.value
为
节点
在
所有节点.
子代
如果
节点.key
在
b64 密钥
}
如果
长度(
请求节点) ==
长度(
b64 密钥):
所有密钥均可用
返回
节点请求
除了 etcd.
Etcd 键未找到:
通过
监视超时 =
截止日期 -
时间.
时间()
如果
监视超时
≤ 0:
返回 None
try:
索引 =
所有节点.
etcd 索引 + 1
如果
所有节点
否则 0
自身.
客户端.
查看(
键=
自身.
前缀,
递归=True,
超时=
监视超时,
索引=
索引,
)
除了 etcd.
Etcd 监视超时:
如果
时间.
时间() >=
截止日期:
返回 None
否则:
继续
除了 etcd.
Etcd 事件索引已清除:
继续