torch.xpu.streams 的源代码
# mypy: 允许未类型化定义
导入 ctypes
导入
火炬
from torch._utils 导入 _dummy_type
如果
不
有属性(
火把._C,
_XpuStreamBase):
定义虚拟基类
火把._C.
字典[
_XpuStreamBase] = _dummy_type(
_XpuStreamBase)
火把._C.
字典[
_XpuEventBase] = _dummy_type(
_XpuEventBase)
[文档]
类
流(
火把._C._XpuStreamBase):
rXPU 流包装器。
XPU 流是特定于某个执行流的线性执行序列。
设备,独立于其他流。它支持 with 语句作为上下文管理器,以确保 with 块内的操作在相应的流上运行。
上下文管理器,以确保 with 块内的操作在相应的流上运行。
在相应的流上。
参数:
device(torch.device 或 int,可选):在指定设备上分配。
流。如果 :attr:`device` 是 ``None``(默认)或负数
整数,这将使用当前设备。
优先级(int, 可选): 流的优先级,可以是正数、0 或负数。
数字越小,优先级越高。默认情况下,优先级设置为 0。
如果值超出允许的优先级范围,它将自动
映射到最近的合法优先级(对于大的正数或
最高值(对于大负数)。
"文档"
def __new__(类,
设备=
无,
优先级=0, **kwargs):
设备管理器设置成本高昂,因此除非必要,我们才避免使用它
如果
设备
是
无
或者 ("stream_id"
在 kwargs
并且 "device_index"
在 kwargs):
返回
超级().__new__(
类,
优先级=
优先级, **kwargs)
否则:
与
火把.xpu.
设备(
设备):
返回
超级().__new__(
类,
优先级=
优先级, **kwargs)
[文档] def wait_event(self, event) -> None:
将所有提交到流的未来工作等待事件。
参数:
event (torch.xpu.Event): 等待的事件。
"""
event.wait(self)
[文档] def wait_stream(self, stream) -> None:
r"""与另一个流同步。
所有未来提交到此流的作业将等待,直到调用时所有提交给给定流的内核完成。
提交给给定流的作业在调用时将等待,直到所有内核完成。
Args:
stream (流): 用于同步的流。
"""
self.wait_event(stream.record_event())
[文档] def record_event(self, event=None):
r"""记录事件。
参数:
event (torch.xpu.Event, 可选): 要记录的事件。如果未提供,则创建一个新的。
将被分配。
返回:
记录的事件。
"""
如果事件为空:
事件 = Event()
事件.record(self)
返回事件
[文档] def 查询(self) -> bool:
r"""检查提交的所有工作是否已完成。
返回:
一个布尔值,表示此流中所有内核是否已完成。
""
返回 super().query()
[文档] def 同步(self) -> None:
r"""等待此流中所有内核完成。"""
super().同步()
@property
def _as_parameter_(我):
返回 ctypes.c_void_p(
我.
sycl 队列)
def __等于__(
我, o):
如果 isinstance(o,
流):
返回
超级().
__等于__(o)
返回
假
def __哈希__(
我):
返回
哈希((
我.
sycl 队列,
我.
设备))
def __repr__(我):
返回 f
"torch.xpu.Stream(设备={
我.
设备}
sycl 队列={
我.
sycl 队列:#x})"
[文档]
类
活动(
火把._C._XpuEventBase):
rXPU 事件的基础包装器。
XPU 事件是同步标记,可用于监控设备的进度,以及同步 XPU 流。
XPU 事件是同步标记,可用于监控设备的进度,以及同步 XPU 流。
底层 XPU 事件在事件首次初始化时是懒加载的
录制。创建后,只有同一设备上的流才能录制
事件。然而,任何设备上的流都可以等待该事件。
参数:
启用计时(bool,可选):指示事件是否应测量时间
(默认:``False``)
"文档"
def __new__(类,
启用计时=
错误):
返回
超级().__new__(
类,
启用计时=
启用计时)
[文档] def record(self, stream=None) -> None:
记录给定流中的事件。
如果未指定流,则使用 ``torch.xpu.current_stream()``。
流的设备必须与事件的设备匹配。
"""
如果流为空:
流 = torch.xpu.current_stream()
super().record(stream)
[文档] def wait(self, stream=None) -> None:
r"""使所有提交给指定流的后续工作等待此事件。
使用 ``torch.xpu.current_stream()`` 如果未指定流。
"""
如果 stream 为 None:
stream = torch.xpu.current_stream()
super().wait(stream)
[文档] def 查询(self) -> bool:
r"""检查当前由事件捕获的所有工作是否已完成。
返回值:
一个布尔值,表示当前由事件捕获的所有工作是否已完成
已完成。
"""
返回 super().query()
[文档] def elapsed_time(self, end_event):
r"""返回经过的时间。
报告的时间为事件记录后的毫秒数,
在记录 end_event 之前。
"""
返回 super().elapsed_time(end_event)
[文档] def 同步(self) -> None:
等待事件完成。
等待此事件中捕获的所有工作完成。
这防止了 CPU 线程在事件完成之前继续执行。
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
super().synchronize()
@property
def _as_parameter_(我):
返回 ctypes.c_void_p(
我.sycl_event)
def __repr__(我):
如果
我.sycl_event:
返回 f"torch.xpu.Event(sycl_event={
我.
sycl 事件:#x})"
否则:
返回
"torch.xpu.Event(未初始化)"