torch.distributed.elastic.rendezvous.etcd_server 源代码
#!/usr/bin/env python3
# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入 atexit
导入
记录日志
导入
操作系统
导入 shlex
导入 shutil
导入
套接字
导入 subprocess
导入 tempfile
导入
时间
来自
打字
导入
可选,
文本输入输出,
联合
try:
导入 etcd # type: ignore[import]
除了
模块未找到错误:
通过
日志记录器 =
记录日志.
获取日志记录器(__name__)
def 查找空闲端口():
""
查找一个空闲端口并将临时套接字绑定到该端口,以便在使用之前“保留”该端口。
.. 注意:: 返回的套接字在使用端口之前必须关闭,
否则将发生“地址已在使用”错误。
套接字应尽可能保持并关闭,
尽可能作为端口的消费者,否则,有更大的机会发生竞争条件,因为不同的进程可能会看到该端口是空闲的并占用它。
返回:绑定到保留空闲端口的套接字。
返回:绑定到保留空闲端口的套接字。
返回:绑定到保留空闲端口的套接字。
用法:
sock = find_free_port()
port = sock.getsockname()[1]
sock.close()
use_port(port)
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
地址 =
套接字.getaddrinfo(
host=本地主机,
端口=
无,
家族=
套接字.AF_UNSPEC,
类型=
套接字.SOCK_STREAM
)
为
地址
在 addrs:
family, 类型, proto, _, _ =
地址
try:
s = 套接字.
套接字(
家庭,
类型, proto)
s.绑定((
本地主机, 0))
s.监听(0)
返回 s
除了 OSError
作为 e:
s.关闭() # type: ignore[possibly-undefined]
打印(f
"套接字创建尝试失败:"{e}")
提升
运行时错误(
"创建套接字失败")
def 停止 etcd(
子进程,
数据目录:
可选[
字符串] =
无):
如果 subprocess
和
子进程.
轮询()
是
无:
记录器.
信息(
"正在停止 etcd 服务器")
子进程.
终止()
子进程.
等待()
如果
数据目录:
记录器.
信息(
删除 etcd 数据目录:%s",
数据目录)
shutil.rmtree(数据目录,
忽略错误=True)
[文档]
类
Etcd 服务器:
""
.. 注意:: 在 etcd 服务器 v3.4.3 上测试过。
启动和停止本地独立 etcd 服务器,随机占用空闲
端口。适用于单节点、多工作进程启动或测试,
在哪里使用边车 etcd 服务器比单独设置 etcd 服务器更方便。
。
此类注册了终止处理程序以在退出时关闭 etcd
子进程。此终止处理程序不是替代方案。
调用 `stop()` 方法。
以下回退机制用于查找 etcd 可执行文件:
1. 使用环境变量 TORCHELASTIC_ETCD_BINARY_PATH
2. 如果存在,则使用 `/bin/etcd`
使用来自“PATH”的`etcd`
使用
::
server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
server.start()
client = server.get_client()
使用客户端
server.stop()
参数:
etcd_binary_path: etcd 服务器二进制文件路径(见上文以获取备用路径)
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
数据目录:
可选[
字符串] =
无):
自身.
端口 = -1
自身.
主机 =
本地主机
根 =
操作系统.
路径.
目录名(__file__)
默认 etcd 二进制文件 =
操作系统.
路径.
加入(
根,
bin/etcd)
自身._etcd_binary_path =
操作系统.
环境.
获取(
TORCHELASTIC_ETCD_BINARY_PATH, default_etcd_bin
)
如果 not
操作系统.
路径.
判断是否为文件(
自身.
etcd 二进制路径):
自身.
etcd 二进制路径 =
etcd
自身.
基础数据目录 = (
数据目录
如果
数据目录
否则
临时文件.mkdtemp(
前缀=
torchelastic_etcd 数据)
)
自身.
_etcd 命令 = None
自身.
_etcd 进程:
可选[
子进程.Popen] = None
def _get_etcd_server_process(自身) ->
子进程.Popen:
如果 not
自身.
_etcd 进程:
提升
运行时错误(
"未启动 etcd 服务器进程。请先调用 etcd_server.start()"
)
否则:
返回
自身.
_etcd 进程
def 获取端口(
自身) -> int:
"返回服务器运行的端口。"
返回
自身.
端口
def get_host(自身) ->
字符串:
"返回服务器运行的宿主机。"
返回
自身.
主机
def 获取端点(
自身) ->
字符串:
返回 etcd 服务器端点(主机:端口)。
返回 f"{
自身.
主机}:{
自身.
端口}"
def 开始(
自身,
超时:
整型 = 60,
重试次数:
整型 = 3,
标准错误输出:
联盟[int,
文本输入输出,
无] =
无,
) -> 无:
""
启动服务器,并等待其准备就绪。当此函数返回时,服务器已准备好接收请求。
参数:
超时:等待服务器准备就绪的时间(以秒为单位)
在放弃之前。
num_retries:启动服务器的重试次数。每次重试
将等待最大 ``timeout`` 时间,然后将其视为失败。
标准错误文件句柄。有效值包括
`subprocess.PIPE`,`subprocess.DEVNULL`,一个现有文件
描述符(一个正整数),一个现有文件对象,以及
`None`。
抛出异常:
超时错误:如果服务器在指定的超时时间内没有准备好
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
当前重试次数 = 0
while True:
try:
数据目录 =
操作系统.
路径.
加入(
自身.
_基本数据目录,
字符串(
当前重试次数))
操作系统.
创建多级目录(
数据目录, exist_ok=True)
返回
自身._start(
数据目录,
超时,
标准错误输出)
除了
异常
作为 e:
当前重试次数 += 1
停止 etcd(
自身.
_etcd 进程)
记录器.
警告(
"启动 etcd 服务器失败,发生错误:"%s
,正在重试,
字符串(e)
)
如果
当前重试次数 >=
重试次数:
shutil.rmtree(自身.
基础数据目录,
忽略错误=True)
提升
atexit.注册(
停止 etcd,
自身.
_etcd 进程,
自身.
_基本数据目录)
def _start(
自身,
数据目录:
字符串,
超时:
整型 = 60,
标准错误输出:
联盟[int,
文本输入输出,
无] = None
) -> 无:
线 =
查找空闲端口()
线路对端 =
查找空闲端口()
自身.
_端口号 =
套接字.
获取套接字名称()[1]
对等端口 =
套接字对等方.getsockname()[1]
etcd 命令 = shlex.
分割(
输入文本为空,请提供需要翻译的文本.
加入(
[
自身.
etcd 二进制路径,
--启用 v2,
--数据目录,
data_dir,
--监听客户端地址,
fhttp://{
自身.
_主机}:{
自身.
_端口}",
"--advertise-client-urls",
f"http://"{
自身._host}:{
自身._port}",
"--监听对等节点 URL",
f"http://"{
自身.
_主机}:{
对等端口}",
]
)
)
记录器.
信息(
启动 etcd 服务器:[%s
]],
etcd 命令)
套接字.
关闭()
套接字对等方.
关闭()
自身.
_etcd 进程 =
子进程.Popen(
etcd 命令,
关闭文件描述符=True,
标准错误输出=
标准错误输出)
自身.
等待就绪(
超时)
def 获取客户端(
自身):
返回一个 etcd 客户端对象,可用于向此服务器发送请求。
返回 etcd.
客户端(
host=自身._host,
端口=
自身.
端口,
版本前缀="/v2", read_timeout=10
)
def _等待就绪(
自身,
超时:
整型 = 60) ->
无:
客户端 = etcd.
客户端(
host=f"{自身.
主机}",
端口=
自身.
端口,
版本前缀="/v2", read_timeout=5
)
最大时间 =
时间.
时间() +
超时
while 时间.
时间() <
最大时间:
如果
自身.
_获取 etcd 服务器进程().
轮询()
是 not
无:
# etcd 服务器进程结束
exitcode = 自身.
获取 etcd 服务器进程().
返回码
提升
运行时错误(
f"etcd 服务器进程已退出,退出码:"{
退出码}"
)
try:
记录器.
信息(
"etcd 服务器已就绪。版本:"%s",
客户端.
版本)
返回
除了
异常:
时间.
睡眠(1)
提升
超时错误(
等待 etcd 服务器准备超时!)
def 停止(
自身) ->
无:
停止服务器并清理自动生成的资源(例如数据目录)。
记录器.
信息(
调用 EtcdServer 停止方法)
停止 etcd(
自身.
_etcd 进程,
自身.
_基本数据目录)