快捷键

torch.distributed.elastic.rendezvous.etcd_store 的源代码

# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。

导入 datetime
导入 随机
导入 时间
来自 base64 导入 base64 解码, base64 编码
来自 打字 导入 可选

# pyre-ignore[21]: 在`torch.distributed`中找不到名称`Store`。
来自 torch.distributed 导入 店铺


try:
    导入 etcd  # type: ignore[import]
除了 模块未找到错误:
    来自 . 导入 _etcd_stub 作为 etcd


# 延迟(休眠)一小段时间以减少 CAS 失败。
这不会影响正确性,但会减少对 etcd 服务器的请求。
def cas 延迟():
    时间.睡眠(随机.均匀(0, 0.1))


# pyre-fixme[11]: 注解 `Store` 未定义为类型。
[文档] Etcd 存储(存储): "" 通过在 rendezvous etcd 实例上附加来实现 c10 Store 接口。 这是 ``EtcdRendezvous`` 返回的存储对象。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def 初始化( 自身, etcd 客户端, etcd 存储前缀, # 默认超时与 c10d/Store.hpp 中相同 超时: 可选[日期时间.时间差] = , ): 超级().初始化() # 用于 pybind trampoline 的必需项 自身.客户端 = etcd 客户端 自身.前缀 = etcd 存储前缀 如果 超时 not : 自身.设置超时(超时) 如果 not 自身.前缀.以...结尾(根目录): 自身.前缀 += 根目录
[文档] def 设置(self, 键, 值): """ 将键值对写入 `EtcdStore`。 键和值可以是 Python 的 `str` 或 `bytes` 类型。 """ self.client.set(key=self.prefix + self._encode(key), value=self._encode(value))
[文档] def get(self, key) -> bytes: """ 通过键获取值,可能需要进行阻塞等待。 如果键不是立即存在的,将进行阻塞等待 对于最多 ``timeout`` 持续时间或直到键被发布。 返回: 值(字节) 抛出: 查找错误 - 如果在超时后键仍未发布 """ b64_key = self.prefix + self._encode(key) kvs = self._try_wait_get([b64_key]) 如果 kvs 为 None: 抛出 LookupError 异常(f"在 EtcdStore 中未找到键 {key} ") 返回 self._decode(kvs[b64_key])
[文档] def add(self, key, num: int) -> int: "空字符串" 以整数形式原子地增加一个值。 整数以 10 进制字符串表示。如果键不存在, 将假定默认值为 ``0``。 返回值: 新的(递增的)值 “” b64_key = self._encode(key) # c10d Store 假设值是一个以十进制字符串表示的整数 try: # 假设默认值为"0",如果这个键尚未设置: node = self.client.write( key=self.prefix + b64_key, value=self._encode(str(num)), # 即 0 + num prevExist=False, ) return int(self._decode(node.value)) except etcd.EtcdAlreadyExist: pass while True: 因此,c10d Store 没有删除键的方法,所以我们可以确信它仍然存在。 因此,我们可以确信它仍然存在。 node = self.client.get(key=self.prefix + b64_key) new_value = self._encode(str(int(self._decode(node.value)) + num)) try: node = self.client.test_and_set( key=node.key, value=new_value, prev_value=node.value ) return int(self._decode(node.value)) except etcd.EtcdCompareFailed: cas_delay()
[文档] def wait(self, keys, override_timeout: Optional[datetime.timedelta] = None): """ 等待所有密钥发布完成,或者直到超时。 Raises: LookupError - 如果发生超时 """ b64_keys = [self.prefix + self._encode(key) for key in keys] kvs = self._try_wait_get(b64_keys, override_timeout) if kvs is None: 引发 LookupError("在等待 EtcdStore 中的键时超时")
# 成功时无返回值
[文档] def check(self, keys) -> bool: 检查所有键是否立即存在(无需等待)。 b64_keys = [self.prefix + self._encode(key) for key in keys] kvs = self._try_wait_get( b64_keys, override_timeout=datetime.timedelta(microseconds=1), # 看起来就像没有等待 ) 返回 kvs 不为 None
# # 将键值数据编码为 base64,以便我们可以存储任意二进制数据 # 在 EtcdStore 中。输入可以是 `str` 或 `bytes`。 # In case of `str`, utf-8 encoding is assumed. # def 编码
(自身, ) -> 字符串: 如果 类型() == 字节: 返回 base64 编码().解码() elif 类型() == 字符串: 返回 base64 编码(.编码()).解码() 提升 ValueError("值必须是 str 或 bytes 类型") # 解码一个 base64 字符串(类型为`str`或`bytes`) 返回类型为`bytes`,与 Store 接口一起使用更方便。 # def _decode(自身, ) -> 字节: 如果 类型() == 字节: 返回 base64 解码() elif 类型() == 字符串: 返回 base64 解码(.编码()) 提升 ValueError("值必须是类型为 str 或 bytes") # 一次性获取所有(base64 编码的)etcd 键,或等待直到所有键 # 已发布或超时发生。 这是一个公共接口方法的辅助方法。 # 成功时,返回一个字典,其中包含 {etcd 键 -> etcd 值}。 超时时返回 None。 # def _try_wait_get(自身, b64_keys, 覆盖超时=): 超时 = 自身.超时 如果 覆盖超时 None 否则 覆盖超时 # 类型: 忽略[attr-defined] 截止日期 = 时间.时间() + 超时.total_seconds() while True: # 读取整个目录(键),仅过滤等待的键 所有节点 = None try: 所有节点 = 自身.客户端.获取(=自身.前缀) 请求节点 = { 节点.: 节点.value 节点 所有节点.子代 如果 节点.key b64 密钥 } 如果 长度(请求节点) == 长度(b64 密钥): 所有密钥均可用 返回 节点请求 除了 etcd.Etcd 键未找到: 通过 监视超时 = 截止日期 - 时间.时间() 如果 监视超时 0: 返回 None try: 索引 = 所有节点.etcd 索引 + 1 如果 所有节点 否则 0 自身.客户端.查看( =自身.前缀, 递归=True, 超时=监视超时, 索引=索引, ) 除了 etcd.Etcd 监视超时: 如果 时间.时间() >= 截止日期: 返回 None 否则: 继续 除了 etcd.Etcd 事件索引已清除: 继续

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源