• 文档 >
  • 模块代码 >
  • torch >
  • torch.nn 注意力 >
  • torch.nn.attention.flex_attention
快捷键

torch.nn.attention.flex_attention 的源代码

# mypy: 允许未类型化定义
# flake8: 无 qa C101
本模块实现了 PyTorch 中 flex_attention 的用户接口。
导入 functools
导入 检查
导入 itertools
导入 数学
导入 操作符
导入 警告
来自 枚举 导入 枚举
来自 打字 导入 任意, 可调用, 可选, 联合

导入 火炬
来自 火炬 导入 张量
来自 torch._dynamo._trace_wrapped_higher_order_op 导入 TransformGetItemToIndex
来自 torch._higher_order_ops.flex_attention 导入 flex_attention 作为 flex_attention_hop
来自 torch._higher_order_ops.utils 导入 设置编译环境
来自 torch.fx.experimental.proxy_tensor 导入 (
    临时移除 torch 函数模式中的元数据,
    临时移除 torch 函数模式中的预分发,
)
来自 torch.nn.attention._utils 导入 支持的头维度, 验证 SDPA 输入
来自 torch.utils._pytree 导入 树图模式


__all__ = [
    块掩码,
    弹性注意力,
    创建块掩码,
    "创建掩码",
    "创建嵌套块掩码",
    "或掩码",
    "与掩码",
    "noop_mask",
]

_score_mod_signature = 可调用[[张量, 张量, 张量, 张量, 张量] 张量]
_mask_mod_signature = 可调用[[张量, 张量, 张量, 张量] 张量]


 _修改类型(枚举):
    修改函数类型的枚举。
- SCORE_MOD:接受分数作为第一个参数的 score_mod 函数
- mask_mod:不接受分数的 mask 函数,仅用于生成
块掩码
"源代码"

    分数修改 = 1
    遮罩修改 = 2
    未知 = 3


def 获取模块类型(函数: 可调用) -> _修改类型:
    获取修改函数的类型。
此函数检查函数的位置参数数量以确定
修改函数的类型。如果函数有 5 个位置参数,则被视为 score_mod 函数。如果函数有 4 个位置参数,则被视为 mask 函数。
被视为 score_mod 函数。如果函数有 4 个位置参数,则被视为 mask 函数。
被视为 mask 函数。
"源代码"
    位置参数数量 = 总和(
        1
         参数  检查.签名(函数).参数.()
        如果 参数.默认 == 检查.参数.空的
    )
    断言 位置参数数量 == 5  位置参数数量 == 4
    如果 位置参数数量 == 5:
        返回 _修改类型.分数修改
    elif 位置参数数量 == 4:
        返回 _修改类型.遮罩修改
    否则:
        返回 _修改类型.未知


需要在此处定义,以免 Dynamo 跳过
def _vmap_for_bhqkv(
    函数: 可调用,
    前缀: 元组[可选[int] ...]
    后缀: 元组[可选[int] ...] = (),
    输出维度: 联盟[int, 列表[可选[int]]] = 0,
    群组维度: 布尔类型 = 错误,
):
    用于在 4 维/5 维输入上 vmap score_mods 和 mask_mods。
在[b, hq, q_idx, kv_idx]或[b, hkv, g, q_idx, kv_idx]维度上进行映射。

    Args:
fn (可调用对象): 用于 vmap 的函数。
prefix (元组): vmap 的前缀。对于得分模函数,
这应该设置为(0,)。对于 mask_mods = ()
suffix (元组): 如果 gradOut 正在映射,我们需要添加(0,),
                        and (None,) * len(other_buffers).
out_dims (元组):对于正向情况,请保持默认值 0,因为我们只返回 1 个输出。对于反向,联合图返回 B、H、Q_idx、KV_idx 和 other_buffers 的梯度,
我们只返回 1 个输出。对于反向,联合图返回 B、H、Q_idx、KV_idx 和 other_buffers 的梯度,
对于反向,联合图返回 B、H、Q_idx、KV_idx 和 other_buffers 的梯度,
因此我们将此设置为(0, None, None, None, None) + (None,) * len(other_buffers)。

返回:
可调用:vmapped 函数。
"源代码"
    我们将一个函数 vamp 4 次,广播[b, h, q_idx, kv_idx]维度。
    维度: 列表[元组[ | int,  | int,  | int,  | int]] = []
    尺寸 = [
        (, , , 0),
        (, , 0, ),
        (, 0, , ),
    ]

    如果 群组维度:
        尺寸 += [
            (, 0, , ),
        ]

    尺寸 += [
        (0, , , ),
    ]

     dims  维度:
        fn = 火炬.vmap(函数, in_dims=前缀 + dims + 后缀, 输出维度=输出维度)  # type: ignore[arg-type]
    返回 fn


def _身份(
    分数: 张量,
    批量: 张量,
    : 张量,
    令牌_q: 张量,
    令牌_kv: 张量,
) -> 张量:
    返回 分数


[文档]def 无操作掩码( 批量: Tensor, 头: Tensor, 令牌_q: Tensor, token_kv: 张量, ) -> 张量: """返回一个空操作 mask_mod""" return batch.new_ones(size=(), dtype=torch.bool, device=batch.device)
_默认稀疏块大小 = 128 _大稀疏块大小 = 1 << 30 def _有序转稠密
(行内方块数: 张量, 列索引: 张量): 行数 = 列索引.shape[-2] 列数 = 列索引.shape[-1] 批维度 = 行内方块数.shape[-1] 设备 = 行内方块数.设备 def 创建密集型一(块数量, kv 索引): 稠密掩码 = kv 索引.新零(行数, 列数 + 1, 数据类型=火炬.int32) 行索引 = 火炬.arange(行数, 数据类型=火炬.int, 设备=设备).展平( -1 ) 列范围 = 火炬.arange(列数, 数据类型=火炬.int, 设备=设备) 索引掩码 = 列范围 < 块数量.展平(-1) 我们将数据写入一个“越界”的位置 有效索引 = 火炬.哪里(索引掩码, kv 索引, 列数) 设置 'a' 中的值在有效索引处为 1 稠密掩码[行索引, 有效索引] = 稠密掩码.新一(()) 返回 稠密掩码[:, :列数].连续() 创建密集批处理 = 创建密集型一 _ 范围(长度(批处理维度)): 创建密集批处理 = 火炬.vmap(create_dense_batched, in_dims=(0, 0)) out = create_dense_batched(行内方块数, 列索引) 返回 out def 稠密到有序(稠密掩码) -> 元组[张量, 张量]: 稠密掩码 = 稠密掩码.(数据类型=火炬.int32) 行中块数 = 稠密掩码.总和(暗淡=-1) 列索引 = 火炬.argsort(稠密掩码, 暗淡=-1, 下降的=True, 稳定=True) 返回 ( 行内方块数.(火炬.int32, 内存格式=火炬.连续格式), 列索引.(火炬.int32, 内存格式=火炬.连续格式), ) def _转置顺序(行内方块数: 张量, 列索引: 张量): 稠密 = _有序转稠密(行内方块数, 列索引) 返回 稠密到有序(密集.转置(-2, -1)) def 调整区块和索引数量( 块数: 张量, 索引: 张量, 新的行数: int, 新增列数: int, ): 索引 = 索引[:, :, :新的行数, :新增列数] 块数 = 块数[:, :, :新的行数] 块数 = 火炬.哪里(块数 < 新增列数, 块数, 新增列数) 块数 = 火炬.总和(索引 < 块数[:, :, :, ] 暗淡=-1).(火炬.int32) 返回 块数, 索引
[文档] 块掩码: r"" 块掩码是我们表示块稀疏注意力掩码的格式。 它在 BCSR 和非稀疏格式之间有所交叉。 基础 ------ 块稀疏掩码意味着,不是表示掩码中各个元素的稀疏性, 而是将 KV_BLOCK_SIZE x Q_BLOCK_SIZE 块视为稀疏,只有当该块中的所有元素都是稀疏时, 才认为该块是稀疏的。这与硬件相吻合,因为硬件通常期望执行, 这与硬件相吻合,因为硬件通常期望执行, 连续加载和计算。 这种格式主要优化了 1. 简单性,和 2. 内核 效率。值得注意的是,它**不是**针对大小进行优化的,因为这个掩码始终 减少到 KV_BLOCK_SIZE * Q_BLOCK_SIZE 的倍数。如果大小是 关注,通过增加块大小可以减小张量的大小。 我们格式的要点是: num_blocks_in_row: Tensor[ROWS]: 描述每行中存在的块的数量。 col_indices: Tensor[行数, 每列最大块数]: `col_indices[i]` 是第 i 行的块位置序列。这些值的 这一行在 `col_indices[i][num_blocks_in_row[i]]` 之后是未定义的。 例如,要从这种格式中重建原始张量: .. 代码块 :: python dense_mask = torch.zeros(ROWS, COLS) for row in range(ROWS): for block_idx in range(num_blocks_in_row[row]): dense_mask[row, col_indices[row, block_idx]] = 1 显然,这种格式使得沿着掩码的*行*进行缩减变得更容易。 详细信息 我们格式的基礎僅需要 kv_num_blocks 和 kv_indices。 ------- 但是,我们的格式基础需要 kv_num_blocks 和 kv_indices。 该物体最多有 8 个张量。这代表 4 对: 1. (kv_num_blocks, kv_indices):用于注意力前向传递 我们沿着 KV 维度进行降维。 2. [可选] (full_kv_num_blocks, full_kv_indices):这是可选的, 这纯粹是一种优化。结果证明,对每个块应用掩码是非常昂贵的! 如果我们具体知道哪些块是“满的”并且根本不需要掩码,那么我们可以跳过对这些块应用 mask_mod。 这需要用户从 mask_mod 中分离出一个单独的掩码。 这需要用户从 mask_mod 中分离出一个单独的掩码。 score_mod. 对于因果掩码,这大约提高了 15%的速度。 3. [生成] (q_num_blocks, q_indices): 需要用于反向传播, 因为计算 dKV 需要沿着 Q 维度迭代掩码。这些是从 1 自动生成的。 4. [生成] (full_q_num_blocks, full_q_indices): 与上面相同,但用于 反向传播。这些是从 2 自动生成的。 "源代码" 序列长度: 元组[int, int] 块数量: 张量 kv 索引: 张量 完整键值对区块数量: 可选[张量] 完整键值对索引: 可选[张量] q_num_blocks: 可选[张量] q 索引: 可选[张量] full_q_num_blocks: 可选[张量] 全量索引: 可选[张量] BLOCK_SIZE: 元组[int, int] 遮罩模块: _mask_mod_signature def 初始化( 自己, 序列长度: 元组[int, int] 块数量: 张量, kv 索引: 张量, 完整键值对区块数量: 可选[张量] 完整键值对索引: 可选[张量] q_num_blocks: 可选[张量] q 索引: 可选[张量] full_q_num_blocks: 可选[张量] 全量索引: 可选[张量] BLOCK_SIZE: 元组[int, int] 遮罩模块: _mask_mod_signature, ): 如果 kv 索引.暗淡() < 2: 提升 运行时错误("BlockMask 必须至少有 2 个维度") 断言 kv_num_blocks not , "必须提供 kv_num_blocks" 断言 kv 索引 not , "kv 索引必须提供" 断言 q 块数量 not , "q 块数量必须提供" 断言 q_indices not , "q_indices 必须提供" 断言 (full_kv_num_blocks ) == ( full_kv_indices ), "必须同时提供或省略 full_kv_num_blocks 和 full_kv_indices" 断言 (full_q_num_blocks ) == ( 全查询索引 ), "full_q_num_blocks 和 full_q_indices 必须同时提供或都不提供" 自己.序列长度 = 序列长度 自己.kv_num_blocks = kv_num_blocks 自己.kv 索引 = kv 索引 自己.full_kv_num_blocks = full_kv_num_blocks 自己.full_kv_indices = full_kv_indices 自己.q 块数量 = q 块数量 自己.q_indices = q_indices 自己.full_q_num_blocks = full_q_num_blocks 自己.全查询索引 = 全查询索引 自己.块大小 = 块大小 自己.面具模式 = 面具模式
[文档] @classmethod def 来自键值块( , 块数量: 张量, kv 索引: 张量, 完整键值对区块数量: 可选[张量] = , 完整键值对索引: 可选[张量] = , BLOCK_SIZE: 联盟[int, 元组[int, int]] = 默认稀疏块大小, 遮罩模块: 可选[_mask_mod_signature] = , 序列长度: 可选[元组[int, int]] = , ): "" 从键值块信息创建一个 BlockMask 实例。 Args: kv_num_blocks (Tensor):每行 Q_BLOCK_SIZE 瓦片中的 kv_blocks 数量。 kv_indices (Tensor):每行 Q_BLOCK_SIZE 瓦片中的键值块索引。 full_kv_num_blocks (Optional[Tensor]):每行 Q_BLOCK_SIZE 瓦片中的完整 kv_blocks 数量。 full_kv_indices (Optional[Tensor]):每行 Q_BLOCK_SIZE 瓦片中的完整键值块索引。 BLOCK_SIZE(联合整数,整数元组):KV_BLOCK_SIZE x Q_BLOCK_SIZE 瓦片的大小。 mask_mod(可选[可调用]):修改掩码的函数。 返回: BlockMask:通过 _transposed_ordered 生成的完整 Q 信息的实例。 抛出异常: RuntimeError:如果 kv_indices 维度小于 2。 断言错误:如果只提供了 full_kv_*参数中的一个。 "源代码" 如果 kv 索引.暗淡() < 2: 提升 运行时错误("BlockMask 必须至少有 2 个维度") 断言 (full_kv_num_blocks ) == ( full_kv_indices ), "必须同时提供或省略 full_kv_num_blocks 和 full_kv_indices" # 生成 q_num_blocks 和 q_indices q_num_blocks, q_indices = _转置顺序(块数量, kv 索引) 如果 full_kv_num_blocks not : 断言 full_kv_indices not full_q_num_blocks, 全查询索引 = _转置顺序( 完整键值对区块数量, full_kv_indices ) 否则: full_q_num_blocks, 全查询索引 = , 如果 isinstance(BLOCK_SIZE, int): 块大小 = (BLOCK_SIZE, BLOCK_SIZE) 面具模式 = 面具模式 如果 面具模式 not 否则 无操作面具 如果 序列长度 : q 长度 = kv 索引.shape[-2] * BLOCK_SIZE[0] kv 长度 = q 索引.shape[-2] * BLOCK_SIZE[1] 序列长度 = (q 长度, kv 长度) 返回 ( 序列长度=序列长度, 块数量=块数量, kv 索引=kv 索引, 完整键值对区块数量=完整键值对区块数量, 完整键值对索引=完整键值对索引, q_num_blocks=q_num_blocks, q 索引=q 索引, full_q_num_blocks=full_q_num_blocks, 全量索引=全量索引, BLOCK_SIZE=BLOCK_SIZE, 遮罩模块=遮罩模块, )
[文档] def as_tuple(self, flatten: bool = True): """ 返回 BlockMask 属性的元组。 参数: flatten (布尔值): 如果为 True,则将 (KV_BLOCK_SIZE, Q_BLOCK_SIZE) 的元组进行展平 """ if flatten: block_size = (self.BLOCK_SIZE[0], self.BLOCK_SIZE[1]) # type: ignore[assignment] seq_lengths = (self.seq_lengths[0], self.seq_lengths[1]) # 忽略[赋值]警告 else: block_size = (self.BLOCK_SIZE,) # 忽略[赋值]警告 seq_lengths = (self.seq_lengths,) # 忽略[赋值]警告 return ( *seq_lengths, self.kv_num_blocks, self.kv_indices, self.full_kv_num_blocks, self.full_kv_indices, self.q_num_blocks, self.q_indices, self.full_q_num_blocks, self.full_q_indices, *block_size, self.mask_mod, )
@property def shape(自己): *批处理维度
, _, _ = 自己.kv 索引.形状 返回 元组(批处理维度) + 自己.序列长度 def __str__(自己): s = f"块掩码(shape=){自己.shape},稀疏度={自己.稀疏性():.2f}%, 输入文本翻译为简体中文为: \n" mask_str = 自己.转换为字符串().strip() s += mask_str s += "输入文本翻译为简体中文为:\n)" 返回 s def __getitem__(自己, 索引) -> 块掩码: "" 通过获取给定索引位置的掩码来返回一个新的 BlockMask 实例。 Args: index: 应用到所有属性的索引。 例子用法: .. 代码块 :: python def causal_mask(b, h, q_idx, kv_idx): return q_idx >= kv_idx block_mask = create_block_mask(causal_mask, 4, 2, 512, 512, device="cuda") assert block_mask.kv_num_blocks.shape == (4,2,4) assert block_mask.kv_indices.shape == (4,2,4,4) # 在批次维度上的索引 new_block_mask = block_mask[0] 断言 new_block_mask.kv_num_blocks.shape 等于 (2,4) 断言 new_block_mask.kv_indices.shape 等于 (2,4,4) # 在批次和头部维度上的索引 new_block_mask = block_mask[0, 1] 断言 new_block_mask.kv_num_blocks.shape 等于 (4,) 断言 new_block_mask.kv_indices.shape 等于 (4,4) # 在批量和头部维度上进行切片 new_block_mask = block_mask[0:2, 1:2] assert new_block_mask.kv_num_blocks.shape == (2,1,4) assert new_block_mask.kv_indices.shape == (2,1,4,4) 在批处理、头部和查询维度上进行切片 new_block_mask = block_mask[0:2, 1:2, torch.tensor([1], dtype=torch.int32)] 断言 new_block_mask.kv_num_blocks.shape 等于 (2,1,1) 断言 new_block_mask.kv_indices.shape 等于 (2,1,1,4) "源代码" 新增 kv_num_blocks = 自己.块数量[索引] 新键值索引 = 自己.kv 索引[索引] 如果 自己.full_kv_num_blocks not : 断言 自己.full_kv_indices not 新增全键值块数量 = 自己.完整键值对区块数量[索引] 新增全键值索引 = 自己.完整键值对索引[索引] 否则: 新增全键值块数量 = 新增全键值索引 = 返回 块掩码.来自键值块( 新的 KV 块数, 新的 KV 索引, 新的完整 KV 块数, 新的完整 KV 索引, BLOCK_SIZE=自己.BLOCK_SIZE, 遮罩模块=, 序列长度=自己.序列长度, ) def __repr__(自己): def shape_or_none(x: 可选[火炬.张量)] 返回 x.形状 如果 x not 否则 返回 ( fBlockMask(输入文本翻译为简体中文为:\n" fkv_num_blocks={自己.块数量.shape},输入文本翻译为简体中文为:\n" f" kv_indices="{自己.kv 索引.shape},输入文本翻译为简体中文为:\n" f" full_kv_num_blocks="{shape_or_none(自己.full_kv_num_blocks )},输入文本翻译为简体中文为:\n" f" full_kv_indices="{shape_or_none(自己.完整键值对索引)},输入文本翻译为简体中文为:\n" f" q_num_blocks="{shape_or_none(自己.q_num_blocks)},输入文本翻译为简体中文为:\n" f" q_indices="{shape_or_none(自己.q 索引)},输入文本翻译为简体中文为:\n" ffull_q_num_blocks={shape_or_none(自己.full_q_num_blocks)},输入文本翻译为简体中文为:\n" f" full_q_indices="{shape_or_none(自己.全量索引)},输入文本翻译为简体中文为:\n" f" BLOCK_SIZE="{自己.BLOCK_SIZE},输入文本翻译为简体中文为:\n" f" shape="{自己.shape},输入文本翻译为简体中文为:\n" fsparsity={自己.稀疏性():.2f}%,输入文本翻译为简体中文为:\n" fmask_mod={自己.遮罩模块.__name__ 如果 有属性(自己.遮罩模块, '__名称__') 否则 自己.遮罩模块}输入文本翻译为简体中文为:\n" f)" ) def _调整(自己, 新的队列长度: int, 新的键值长度: int): 新的行数 = (新的队列长度 + 自己.BLOCK_SIZE[0] - 1) // 自己.BLOCK_SIZE[0] new_num_cols = (新键值长度 + 自己.BLOCK_SIZE[1] - 1) // 自己.BLOCK_SIZE[1] 新的 KV 块数, 新键值索引 = 调整区块和索引数量( 自己.块数量, 自己.kv 索引, 新的行数, new_num_cols ) 如果 自己.full_kv_num_blocks not : 断言 自己.full_kv_indices not ( 新的完整 KV 块数, 新的完整 KV 索引, ) = 调整区块和索引数量( 自己.完整键值对区块数量, 自己.完整键值对索引, 新的行数, 新增列数, ) 否则: 新增全键值块数量 = 新增全键值索引 = 返回 自己.来自键值块( 新的 KV 块数, 新的 KV 索引, 新的完整 KV 块数, 新的完整 KV 索引, 自己.BLOCK_SIZE, 自己.遮罩模块, )
[文档] def numel(self): """返回元素数量(不考虑稀疏性)。""" shape = self.shape def _prod(xs): return functools.reduce(operator.mul, xs, 1) return _prod(shape)
[文档] def sparsity(self) -> float: """计算稀疏块(即未计算的块)的百分比""" total_size = self.numel() computed_blocks = self.kv_num_blocks.sum() if self.full_kv_num_blocks is not None: computed_blocks += self.full_kv_num_blocks.sum() computed_size = computed_blocks.item() * self.BLOCK_SIZE[0] * self.BLOCK_SIZE[1] dense_ratio = computed_size / total_size return 100 * (1 - dense_ratio)
[文档] def to_dense(self) -> Tensor: “返回一个与块掩码等效的密集块。” partial_dense = _ordered_to_dense(self.kv_num_blocks, self.kv_indices) if self.full_kv_num_blocks is not None: assert self.full_kv_indices is not None return partial_dense | _ordered_to_dense( self.full_kv_num_blocks, self.full_kv_indices ) 返回部分密集
[文档] def 转换为字符串(自己, 网格大小=(20, 20), 限制=4): 返回块掩码的字符串表示。非常实用。 如果 grid_size 为 None,则输出未压缩版本。警告,这可能相当大! "源代码" 稠密掩码 = 自己.转换为稠密格式() *批处理维度, 行数, 列数 = 稠密掩码.形状 如果 isinstance(网格大小, int): 最大行数 = 网格大小 最大列数 = 网格大小 elif 网格大小 == -1: 最大行数 = 行数 最大列数 = 列数 否则: 最大行数, 最大列数 = 网格大小 def 创建块可视化(*批次索引): 描述符 = [] 描述符.追加(f"{批次索引}") 可视化 = “,”.连接(反转(描述符)) + "输入文本翻译为简体中文为:\n" def 概括段落(部分): 百分比 = 部分.float().均值().项目() 如果 百分比 == 1: 返回 "█" elif 百分比 == 0: 返回 输入文本为空,请提供需要翻译的文本 否则: 返回 "░" def cdiv(a, b): 返回 (a + (b - 1)) // b 行步 = 最大值(1, cdiv(行数, 最大行数)) col_step = 最大值(1, cdiv(列数, max_cols)) r 范围(0, 行数, 行步长): c 范围(0, 列数, 列步长): 当前掩码 = 稠密掩码 索引 批次索引: 当前掩码 = 当前掩码[索引] 字符 = 概括段落( 当前掩码[r : r + 行步长, c : c + 列步长] ) 可视化 += 字符 * 2 可视化 += "输入文本翻译为简体中文为:\n" 返回 可视化 总视觉 = [] 索引, 批次索引 列举( itertools.产品(*[范围(i) i 批处理维度]) ): 如果 索引 == 限制: 总访问量.追加(...) 总访问量.追加(要打印更多,请设置 BlockMask.to_string(limit=N)) 总访问量.追加( "您也可以通过索引(BlockMask[batch, head])来选择特定的批次或头" ) 断开 块可视化 = 创建块可视化(*批次索引) 总访问量.追加(块访问量) 返回 "输入文本翻译为简体中文为:\n".连接(总访问量)
[文档] def to(self, device: Union[torch.device, str]) -> "BlockMask": """将 BlockMask 移动到指定的设备。 参数: device (torch.device 或 str): 将 BlockMask 移动到的目标设备。 可以是一个 torch.device 对象或一个字符串(例如,'cpu', 'cuda:0')。 返回: BlockMask:一个新的 BlockMask 实例,其中所有张量组件都移动到了指定的设备。 将所有张量组件移动到指定的设备。 备注: 此方法不会原地修改原始的 BlockMask。 相反,它返回一个新的 BlockMask 实例,其中各个张量属性 可能会被移动到指定的设备,也可能不会,这取决于它们的 当前设备位置。 """ mapped_attributes = tree_map_only( torch.Tensor, lambda x: x.to(device), self.as_tuple(flatten=False), ) return BlockMask(*mapped_attributes)
def 广播到维度(x, 暗淡): while x.暗淡() < 暗淡: x = x.展平(0) 返回 x def 向上取整到倍数(x, 倍数): 返回 (x + 多重 - 1) // 多重 * 多重 def 将掩码转换为块掩码( 遮罩: 张量, 块大小=默认稀疏块大小, KV 块大小=默认稀疏块大小, 分离完整块: 布尔类型 = 错误, ) -> 元组[张量, 可选[张量]] 断言 遮罩.dtype == 火炬.布尔类型 遮罩 = 广播到维度(遮罩, 4) def 需要填充以形成倍数的空间(x, 倍数): 返回 向上取整到倍数(x, 倍数) - x 遮罩 = 火炬.nn.功能性.填充( 遮罩, ( 0, 需要填充以形成倍数的空间(遮罩.shape[-1] KV 块大小), 0, 需要填充以形成倍数的空间(遮罩.shape[-2] 块大小), ), ) B, H, Q, KV = 遮罩.形状 断言 Q % 块大小 == 0 断言 KV % 键值块大小 == 0 遮罩 = 遮罩.视图( B, H, Q // 块大小, 块大小, KV // KV 块大小, 键值块大小 ) # [B, H, Q//Q 块大小, Q 块大小, KV//KV 块大小, KV 块大小] 遮罩 = 遮罩.排列( 0, 1, 2, 4, 3, 5 ) # [B, H, Q//Q 块大小, KV//KV 块大小, Q 块大小, KV 块大小] 遮罩块求和 = 遮罩.总和( 暗淡=[-2, -1] ) # [B, H, Q//Q_BLOCK_SIZE, KV//KV_BLOCK_SIZE] 如果 分离完整块: 全块求和 = 块大小 * 键值块大小 完整块 = 遮罩块求和 == 全块求和 部分块 = (遮罩块求和 > 0) & (遮罩块求和 < 全块和) 部分块 = 部分块.(数据类型=火炬.int8) 完整块 = 完整块.(数据类型=火炬.int8) 返回 部分块, 完整块 否则: 部分块 = 遮罩块求和 > 0 部分块 = 部分块.(数据类型=火炬.int8) 返回 部分块,
[文档]def or_masks(*mask_mods: _mask_mod_signature) -> _mask_mod_signature: """返回一个由提供的 mask_mods 组成的联合 mask_mod""" 如果不是所有 mask_mods 都是可调用的: 抛出 RuntimeError 异常(f"所有输入 mask_mods 都应该为可调用类型: {mask_mods}") def or_mask(b, h, q_idx, kv_idx): result = b.new_zeros((), dtype=torch.bool) for mask in mask_mods: result = result | mask(b, h, q_idx, kv_idx) return result return or_mask
[文档]def and_masks(*mask_mods: _mask_mod_signature) -> _mask_mod_signature: 返回一个由提供的 mask_mods 交集组成的 mask_mod if not all(callable(arg) for arg in mask_mods): raise RuntimeError(f"All inputs should be callable mask_mods: {mask_mods}") def and_mask(b, h, q_idx, kv_idx): result = b.new_ones((), dtype=torch.bool) for mask in mask_mods: result = result & mask(b, h, q_idx, kv_idx) 返回结果 返回并掩码
def _将块掩码转换为掩码( 块掩码, KV 块大小
=默认稀疏块大小, 块大小=默认稀疏块大小, ) -> 张量: 断言 块掩码.暗淡() == 4 B, H, Q, KV = 块掩码.形状 块掩码 = 块掩码.展开(块大小, KV 块大小, *块掩码.shape) 块掩码 = 块掩码.排列(2, 3, 4, 0, 5, 1).重塑( B, H, Q * 块大小, KV * 键值块大小 ) 返回 块掩码 def 从块掩码创建稀疏块( 块掩码: 元组[张量, 可选[张量]], 遮罩模块: 可选[可调用] 序列长度: 元组[int, int] 块大小: 整型 = 默认稀疏块大小, KV 块大小: 整型 = 默认稀疏块大小, ) -> 块掩码: 部分块, 完整块 = 块掩码 部分 BM = 稠密到有序(部分块) 如果 完整块 not : 完整 BM: 元组[可选[张量] 可选[张量]] = 稠密到有序( 完整块 ) 否则: 全部翻译 = (, ) 返回 块掩码.来自键值块( 部分翻译[0] 部分翻译[1] 完整 BM[0] 完整 BM[1] BLOCK_SIZE=(块大小, KV 块大小), 遮罩模块=遮罩模块, 序列长度=序列长度, )
[文档]def create_mask( mod_fn: Union[分数修改签名, 遮罩修改签名], B: Optional[int], H: Optional[int], Q_LEN: int, KV_LEN: 整数, device: 字符串 = "cuda", ) -> 张量: r"""此函数从 mod_fn 函数创建一个掩码张量。 Args: mod_fn (Union[_score_mod_signature, _mask_mod_signature]): 函数用于修改注意力分数。 B (int): 批量大小。 H (int): 查询头数量。 Q_LEN (int): 查询序列长度。 KV_LEN (int): 键/值序列长度。 device (str): 在其上运行掩码创建的设备。 返回: mask (Tensor):一个形状为 (B, H, M, N) 的掩码张量。 """ 如果 B 为 None: B = 1 如果 H 为 None: H = 1 b = torch.arange(0, B, device=device) h = torch.arange(0, H, device=device) m = torch.arange(0, Q_LEN, device=device) n = torch.arange(0, KV_LEN, device=device) mod_type = _get_mod_type(mod_fn) with TransformGetItemToIndex(): 如果 mod_type == _ModificationType.SCORE_MOD: score_mod = mod_fn score_mod = _vmap_for_bhqkv(score_mod, prefix=(0,)) # 第一个输入是分数 out = score_mod(torch.zeros(B, H, Q_LEN, KV_LEN, device=device), b, h, m, n) mask = torch.where(torch.isneginf(out), False, True) return mask elif mod_type == _ModificationType.MASK_MOD: mask_mod = mod_fn mask_mod = _vmap_for_bhqkv(mask_mod, prefix=()) mask = mask_mod(b, h, m, n) return mask else: 引发断言错误
[文档]def 创建块掩码( 遮罩模块: _mask_mod_signature, B: 可选[int] H: 可选[int] Q 长度: int, KV 长度: int, 设备: 字符串 = cuda, BLOCK_SIZE: 联盟[int, 元组[int, int]] = 默认稀疏块大小, 编译=错误, ) -> 块掩码: r这个函数从 mask_mod 函数创建一个块掩码元组。 Args: mask_mod (Callable): mask_mod 函数。这是一个可调用的函数,用于定义注意力机制的掩码模式。 masking pattern for the attention mechanism. It takes four arguments: b (batch size), h (number of heads), q_idx (query index), and kv_idx (key/value index). 应返回一个布尔张量,指示哪些注意力连接被允许(True) 或被屏蔽(False)。 B (int): 批量大小。 H (int): 查询头数量。 Q_LEN(int):查询序列长度。 KV_LEN(int):键/值序列长度。 设备(str):运行面具创建的设备。 BLOCK_SIZE(int 或 tuple[int, int]):块掩码的块大小。如果提供一个单个 int,则用于查询和键/值。 返回: BlockMask:包含块掩码信息的 BlockMask 对象。 例子用法: .. 代码块 :: python def causal_mask(b, h, q_idx, kv_idx): return q_idx >= kv_idx block_mask = create_block_mask(causal_mask, 1, 1, 8192, 8192, device="cuda") query = torch.randn(1, 1, 8192, 64, device="cuda", dtype=torch.float16) key = torch.randn(1, 1, 8192, 64, device="cuda", dtype=torch.float16) value = torch.randn(1, 1, 8192, 64, device="cuda", dtype=torch.float16) output = flex_attention(query, key, value, block_mask=block_mask) "源代码" 模块类型 = 获取模块类型(遮罩模块) 断言 ( 模块类型 == _修改类型.遮罩修改 ), f"create-block_mask 需要一个 mask_mod 函数!得到{遮罩模块}" 如果 B : B = 1 如果 H : H = 1 如果 isinstance(BLOCK_SIZE, int): 块大小 = 块大小 键值块大小 = 块大小 否则: 块大小, 键值块大小 = 块大小 如果 编译: warnings.警告( "_compile 标志最初是在 create_block_mask 创建时添加的,以解决 torch.compile 的限制。该限制已被解决。因此,为了编译 create_block_mask,我们建议使用 torch.compile(create_block_mask)。目前这仍然有效,但将来将被移除。", 弃用警告, ) 返回 火炬.编译(创建块掩码)( 遮罩模块, B, H, Q 长度, KV 长度, 设备, 块大小 ) 遮罩张量 = 创建遮罩(遮罩模块, B, H, Q 长度, KV 长度, 设备) 部分块掩码, 全块掩码 = 将掩码转换为块掩码( 掩码张量, 块大小=块大小, KV 块大小=KV 块大小, 分离完整块=True, ) 块掩码 = 从块掩码创建稀疏块( (部分块掩码, 完整块掩码), 遮罩模块, (Q 长度, KV 长度), 块大小, KV 块大小, ) 返回 块掩码
def _创建空块掩码(查询: 张量, : 张量) -> 块掩码: r"""默认块掩码,用于弹性注意力。 如果用户没有指定任何块稀疏掩码信息,我们将创建这个 空的块稀疏掩码。这将创建一个包含 1 个块的 BlockMask,该块是查询和键张量的完整长度 的。 "源代码" 设备 = 查询.设备 返回 块掩码.来自键值块( 块数量=火炬.([1, 1, 1] 数据类型=火炬.int32, 设备=设备), kv 索引=火炬.([1, 1, 1, 1] 数据类型=火炬.int32, 设备=设备), BLOCK_SIZE=_大稀疏块大小, 序列长度=(1, 1), ) def _嵌套模块函数适配器( 原始模块函数: 联盟[_得分模块签名, _mask_mod_signature] q_nt: 火炬.张量, kv_nt: 火炬.张量, 是否分数模块: 布尔, ) -> 联盟[_得分模块签名, _mask_mod_signature]: r适配器,将 score_mod / mask_mod 转换为 NJT 兼容。给定的 mod 函数 应编写为在单个序列项上操作。此适配器将 处理长度为 `sum(S)` 的 "堆叠序列" 的索引转换 对于 NJT 中的序列长度"S",将其转换为范围"[0, S)"内的"序列相对"索引。 Args: orig_mod_func (Callable): 用于修改注意力分数的函数。根据是否传递 mask_mod 或 score_mod 函数,它接受四个或五个参数。 arguments, depending on whether a mask_mod or score_mod func is passed. q_nt (torch.Tensor): 定义序列长度的交错布局嵌套张量 (NJT)。 text structure for query. kv_nt (torch.Tensor):定义序列长度的 Jagged 布局嵌套张量(NJT)。 键/值结构。 is_score_mod (bool): 指示 mod 函数是否为 score_mod。 返回: nt_score_mod:orig_score_mod 的 NJT 兼容版本 "源代码" 用于将“堆叠”序列中的索引(范围[0, sum(*)]) 转换为“序列局部”索引(对于每个 S,范围[0, S])。 def 构建序列索引(偏移量, 总长度): 范围张量 = 火炬.arange( 总长度, 设备=偏移量.设备, 数据类型=火炬.int32 类型 ) 使用 searchsorted 查找每个位置的索引 # NB: 这假设 offsets[0]到 offsets[-1]覆盖了值的打包维度。 # 如果我们放宽这个限制,此逻辑将需要更新。 序列索引 = 火炬.搜索排序(偏移量, 范围张量, 正确=True) - 1 返回 序列索引 q 偏移量 = q_nt.偏移量 # 类型: 忽略[attr-defined] kv 偏移量 = kv_nt.偏移量 # 类型: 忽略[attr-defined] q 序列索引 = 构建序列索引(q_offsets, q_nt..shape[q_nt._ragged_idx - 1]) # 类型: 忽略[attr-defined] 如果 q_nt kv_nt: kv 序列索引 = q 序列索引 否则: # 交叉注意力案例 kv 序列索引 = 构建序列索引(kv 偏移量, kv_nt..shape[kv_nt._ragged_idx - 1]) # 类型: 忽略[attr-defined] 将 q_idx / kv_idx 从[0, total_length)转换为[0, S),其中 S 表示 将 NJT 中每个序列的序列长度用于给定 # score_mod. 这允许用户将 score_mod 编写成 在单个序列上操作,而“堆叠序列”被分割 自动将它们转换为单独的序列。 如果 是否分数模块: def nt_score_mod(分数, b, h, q_idx, kv_idx): b_nested = q_seq_idx[q_idx] q_nested = q_idx - q_offsets[q_seq_idx[q_idx]] kv_nested = kv_idx - kv 偏移量[kv_seq_idx[kv_idx]] 是同一序列 = q_seq_idx[q_idx] == kv_seq_idx[kv_idx] 返回 火炬.哪里( 是否相同序列, 原始模块函数(分数, B 嵌套, h, Q 嵌套, kv 嵌套), 忽略调用参数 # 不允许序列间注意力 float(-无穷大), ) 返回 nt 得分修改 否则: def nt 掩码修改(b, h, q_idx, kv_idx): b_nested = q_seq_idx[q_idx] q_nested = q_idx - q_offsets[q_seq_idx[q_idx]] kv_nested = kv_idx - kv 偏移量[kv_seq_idx[kv_idx]] # 不允许序列间注意力 相同序列 = q_seq_idx[q_idx] == kv_seq_idx[kv_idx] 返回 原始模块函数(B 嵌套, h, Q 嵌套, kv 嵌套) & 相同序列 忽略调用参数 返回 nt_mask_mod
[文档]def 创建嵌套块掩码( 遮罩模块: _mask_mod_signature, B: 可选[int] H: 可选[int] q_nt: 火炬.张量, kv_nt: 可选[火炬.张量] = , BLOCK_SIZE: 联盟[int, 元组[int, int]] = 默认稀疏块大小, 编译=错误, ) -> 块掩码: r该函数从掩码函数创建一个与嵌套张量兼容的块掩码元组。返回的 BlockMask 将在输入嵌套张量指定的设备上。 该函数创建一个与嵌套张量兼容的块掩码元组。返回的 BlockMask 将位于输入嵌套张量指定的设备上。 Args: mask_mod (Callable): mask_mod 函数。这是一个可调用的函数,用于定义注意力机制的掩码模式。 masking pattern for the attention mechanism. It takes four arguments: b (batch size), h (number of heads), q_idx (query index), and kv_idx (key/value index). It should return a boolean tensor indicating which attention connections are allowed (True) 或屏蔽 (False)。 B (int): 批量大小。 H (int): 查询头数量。 q_nt (torch.Tensor): 定义序列长度的交错布局嵌套张量 (NJT)。 查询结构。块掩码将被构建以在长度为“sum(S)”的“堆叠”序列上操作。 从 NJT 的序列长度“S”中,序列长度为“``sum(S)``”的“序列”。 kv_nt (torch.Tensor):定义序列长度的 Jagged 布局嵌套张量(NJT)。 结构用于键/值,允许进行交叉注意力。块掩码将被构建。 构造以在长度为 `sum(S)` 的 "堆叠序列" 上操作 的 "S" 长度上。如果此值为 None,则使用 `q_nt` 定义结构 的键/值。默认:None BLOCK_SIZE (int 或 tuple[int, int]):块掩码的块大小。如果是一个单独的 int 如果用于查询和键/值。 返回: BlockMask:包含块掩码信息的 BlockMask 对象。 例子用法: .. 代码块 :: python # 形状为 (B, num_heads, seq_len*, D),其中 seq_len* 在批次中变化 查询 = torch.nested.nested_tensor(..., layout=torch.jagged) 键 = torch.nested.nested_tensor(..., layout=torch.jagged) value = torch.nested.nested_tensor(..., 布局=torch.jagged) def causal_mask(b, h, q_idx, kv_idx): return q_idx >= kv_idx block_mask = create_nested_block_mask(causal_mask, 1, 1, query, _compile=True) output = flex_attention(query, key, value, block_mask=block_mask) .. 代码块 :: python # 形状 (B, num_heads, seq_len*, D) 其中 seq_len* 在批次中变化 query = torch.nested.nested_tensor(..., 布局=torch.jagged) key = torch.nested.nested_tensor(..., layout=torch.jagged) value = torch.nested.nested_tensor(..., layout=torch.jagged) def causal_mask(b, h, q_idx, kv_idx): return q_idx >= kv_idx # 交叉注意力案例:传递查询和键/值 NJTs block_mask = create_nested_block_mask(causal_mask, 1, 1, query, key, _compile=True) output = flex_attention(query, key, value, block_mask=block_mask) "源代码" # 使用与 q 相同的结构默认为 kv 如果 kv_nt : kv_nt = q_nt 如果 q_nt.设备 != kv_nt.设备: 提升 ValueError( "create_nested_block_mask():期望 q_nt 和 kv_nt 在同一个设备上" ) 返回 创建块掩码( _嵌套模块函数适配器(遮罩模块, q_nt, kv_nt, 是否分数模块=错误), # type: ignore[arg-type] B, H, q_nt..shape[q_nt._ragged_idx - 1] # 类型: 忽略[attr-defined] kv_nt..shape[kv_nt._ragged_idx - 1] # 类型: 忽略[attr-defined] 设备=q_nt.设备, # type: ignore[arg-type] 编译很重要,这样我们才不会实例化一个 mask_tensor 形状为 (1, 1, total_seqlen, total_seqlen) BLOCK_SIZE=BLOCK_SIZE, 编译=编译, )
def 应用内核选项( 查询: 张量, : 张量, : 张量, 返回 LSE: 布尔, 内核选项 ): 内核选项 = {} 如果 内核选项 否则 字典(内核选项) 内核选项.setdefault("预缩放 QK", 错误) 内核选项.setdefault("保证行安全", 错误) 内核选项.setdefault("块是连续的", 错误) # 这将强制所有偏置梯度散布都在反向传播的 DQ 迭代循环中完成 内核选项.setdefault(WRITE_DQ, True) 如果前向内核需要返回 logsumexp,则此规则内部决定。 断言 "OUTPUT_LOGSUMEXP" not 内核选项 内核选项["OUTPUT_LOGSUMEXP"] = 真实 如果 not 返回 LSE: # 我们曾经检查过 q,k,v 是否需要梯度,但捕获的缓冲区可能需要梯度 # 我们总是写入,除非在 no_grad 模式下 输出 logsumexp = 火炬.梯度是否启用() 内核选项["OUTPUT_LOGSUMEXP"] = 输出 logsumexp CPU 设备上的任何输入 = ( 查询.设备.类型 == cpu .设备.类型 == cpu .设备.类型 == cpu ) 如果 CPU 设备上的任何输入: # 现在 CPU 使用 torch.compile 支持推理,且不会返回 lse # TODO: 支持 CPU 训练并返回 lse 内核选项["OUTPUT_LOGSUMEXP"] = 返回 内核选项 def _验证嵌入维度(查询: 张量, : 张量, : 张量): 如果 查询.尺寸(-1) != .尺寸(-1): 提升 ValueError( f"期望查询和键/值具有相同的嵌入维度" f但是得到了 E={查询.尺寸(-1)}并且 E={.尺寸(-1)} ) 返回 # TODO 此配置在 Triton 中崩溃,除非: # https://github.com/triton-lang/triton/pull/4540 如果 not ( 支持的头维度(查询.尺寸(-1)) 支持的头维度(.尺寸(-1)) ): 提升 ValueError( f"NYI:目前不支持非 2 的幂次方嵌入维度。" f"获取 E="{查询.尺寸(-1)}和 Ev={.尺寸(-1)} ) def _验证设备(查询: 张量, : 张量, : 张量): "TODO:一旦添加了对非 cuda/cpu 设备的支持后删除" 我们只需要检查查询,因为我们已经知道 q,k,v 都在同一设备上 "源代码" 如果 查询.设备.类型 != "cuda" 查询.设备.类型 != "cpu": 提升 ValueError( FlexAttention 仅在 CUDA 或 CPU 设备上受支持。 f在设备上找到输入张量。{查询.设备.类型}在设备上。 ) def _验证嵌套性(查询: 张量, : 张量, : 张量): 目前,输入只能全部嵌套或全部不嵌套。 如果 查询.是嵌套的 != .是嵌套的 .是嵌套的 != .是嵌套的: 提升 ValueError( "FlexAttention 不支持混合嵌套张量/非嵌套张量输入。" "请提交一个请求此功能的工单,如果这对您很重要。" ) 如果 ( (查询.是嵌套的 查询.长度 not ) # 类型: 忽略[attr-defined] (.是嵌套的 .长度 not ) # 类型: 忽略[attr-defined] (.是嵌套的 .长度 not ) # 类型: 忽略[attr-defined] ): 提升 ValueError( "FlexAttention 不支持非连续且有空洞的嵌套张量。" "请提交一个请求此功能的工单,如果这对您很重要。" )
[文档]def flex_attention( 查询: 张量, : 张量, : 张量, 分数模块: 可选[_得分模块签名] = , 块掩码: 可选[块掩码] = , 比例: 可选[float] = , enable_gqa: 布尔类型 = 错误, 返回 LSE: 布尔类型 = 错误, 内核选项: 可选[字典[字符串, 任意]] = , ) -> 联盟[张量, 元组[张量, 张量]] r该函数实现了具有任意注意力分数修改函数的缩放点积注意力。 该函数计算查询、键和值张量之间的缩放点积注意力,使用用户定义的 注意力分数修改函数。在注意力分数修改函数应用之后,将注意力分数修改函数应用。 查询和关键张量之间的分数已计算。注意力分数计算如下: ``score_mod`` 函数应具有以下签名: .. 代码块 :: python def score_mod( 分数:张量 批量:张量 头:张量 q_idx:张量 k_idx: 索引张量 ) -> 索引张量: 哪里: - ``score``: 一个表示注意力分数的标量张量, 与查询、键和值张量具有相同的数据类型和设备。 - ``batch``、``head``、``q_idx``、``k_idx``:表示批索引、查询头索引、查询索引和键/值索引的标量张量。 这些应该具有 ``torch.int`` 数据类型,并且位于得分张量相同的设备上。 这些应该具有 ``torch.int`` 数据类型,并且位于得分张量相同的设备上。 Args: 查询(张量):查询张量;形状:\(B, Hq, L, E\)。 密钥(张量):密钥张量;形状:\(B, Hkv, S, E\)。 值(张量):值张量;形状:\(B, Hkv, S, Ev\)。 score_mod(可选[可调用]):修改注意力分数的函数。默认不应用 score_mod。 block_mask (Optional[BlockMask]): 控制注意力块稀疏模式的 BlockMask 对象。 scale (Optional[float]): 在 softmax 之前应用的缩放因子。如果没有指定,默认值设置为 :math:`\frac{1}{\sqrt{E}}`。 enable_gqa (bool): 如果设置为 True,则启用分组查询注意力(GQA)并将键/值头广播到查询头。 return_lse (bool): 是否返回注意力分数的 logsumexp。默认为 False。 kernel_options (Optional[Dict[str, Any]]): 将选项传递给 Triton 内核。 返回: output (Tensor): 注意力输出;形状 :math:`(B, Hq, L, Ev)`。 形状说明: - :math:`N: 批处理大小 ... : 其他批处理维度数量(可选)` - S:源序列长度 - L:目标序列长度 - E:查询和键的嵌入维度 - Ev:值的嵌入维度 .. 警告:: `torch.nn.attention.flex_attention` 是 PyTorch 中的一个原型功能。 请期待 PyTorch 未来版本中更稳定的实现。 在此处阅读关于特征分类的更多信息:https://maskerprc.github.io/blog/pytorch-feature-classification-changes/#prototype "源代码" # 一些基本的输入验证 验证 SDPA 输入(查询, , ) _验证嵌入维度(查询, , ) _验证设备(查询, , ) _验证嵌套性(查询, , ) 如果 查询.暗淡() != 4 .暗淡() != 4 .暗淡() != 4: 提升 不支持的操作异常(NYI:查询、键和值必须是 4D 张量) 如果 (not enable_gqa) 查询.尺寸(-3) != .尺寸(-3): 提升 ValueError( f"期望查询和键值对具有相同数量的头 " f"但得到 Hq= "{查询.尺寸(-3)} and Hkv={.尺寸(-3)}. f尝试设置 enable_gqa=True 以启用 GQA。 ) 如果 enable_gqa: Hq = 查询.尺寸(1) Hkv = .尺寸(1) 如果 Hq % Hkv != 0: 提升 ValueError( f"期望查询头数是 GQA 中 kv 头数的倍数" f"但得到 Hq= "{Hq} and Hkv={Hkv} ) 如果 查询.尺寸(0) != .尺寸(0): 如果 块掩码 : 提升 ValueError( f"期望查询和键值对具有相同的批量大小," f"或者非空块掩码," f"但得到块掩码=None,Bq={查询.尺寸(0)},并且 Bkv={.尺寸(0)} ) 如果 块掩码.块数量.尺寸(0) != 查询.尺寸(0): 提升 ValueError( f"期望查询和键值对具有相同的批量大小," f"或者块掩码和查询具有相同的批量大小," f但是得到了 Bq={查询.尺寸(0)},Bkv={.尺寸(0)}B_块掩码={块掩码.块数量.尺寸(0)} ) 如果 分数模块 : 分数模块 = 身份 elif 查询.是嵌套的: 如果 q 和 kv 之间的序列长度不匹配的 ragged 结构相同,则使用相同的 NJT kv = ( 查询 如果 查询.尺寸(查询._ragged_idx) == .尺寸(查询._ragged_idx) # 类型: 忽略[attr-defined] 否则 key ) 分数模块 = _嵌套模块函数适配器(分数模块, 查询, kv, 是否分数模块=True) # 类型:忽略[赋值] 如果 块掩码 : 块掩码 = _创建空块掩码(查询, ) 如果 ( 块掩码.BLOCK_SIZE[0] == _大稀疏块大小 块掩码.BLOCK_SIZE[1] == _大稀疏块大小 ): # 这对应于我们基本上有一个“无操作”块掩码的情况。 通过 elif 查询.是嵌套的: 如果 块掩码.shape[-2] != 查询..尺寸(查询._ragged_idx - 1): # 类型: 忽略[attr-defined] 提升 运行时错误( f"块掩码的形状"{块掩码.shape}与嵌套张量输入不兼容" f"序列总长度为"{查询..尺寸(查询._ragged_idx - 1)}" # 类型: 忽略[attr-defined] ) 否则: block_mask_q_len = 块掩码.shape[-2] 块掩码 kv 长度 = 块掩码.shape[-1] 如果 查询.尺寸(-2) > block_mask_q_len .尺寸(-2) > block_mask_kv_len: 提升 ValueError( f"block_mask 是为了 block_mask.shape="{块掩码.shape}但是获取到 q_len={查询.尺寸(-2)} and kv_len={.尺寸(-2)}. 由于块掩码是为比您现在使用的长度更小的长度创建的,您可能需要创建一个新的块掩码。 ) elif ( 查询.尺寸(-2) < block_mask_q_len .尺寸(-2) 块掩码 kv 长度 ) (查询.尺寸(-2) block_mask_q_len .尺寸(-2) < block_mask_kv_len): 提升 ValueError( f"block_mask 是为了 block_mask.shape="{块掩码.shape}但是获取到 q_len={查询.尺寸(-2)} and kv_len={.尺寸(-2)}. 由于块掩码是为比您使用的长度更大的长度创建的,因此您可以:1. 创建一个新的块掩码,长度正确;2. 通过调用 block_mask._adjust(q_len, kv_len) 来“调整”现有的块掩码到正确的长度。这实际上是将块掩码“裁剪”到左上角,但这并不适用于所有 mask_mods! ) 断言 查询.尺寸(-2) == block_mask_q_len 断言 .尺寸(-2) == 块掩码 kv 长度 如果 缩放 : 缩放 = 1.0 / 数学.平方根(查询.尺寸(-1)) 如果 查询.设备 != 块掩码.块数量.设备: # 类型:忽略[联合属性] 提升 运行时错误( f"期望 q/k/v 和 block_mask 位于同一设备上" f"但得到了"{查询.设备}{块掩码.块数量.设备} # 类型:忽略[联合属性] ) 内核选项 = 应用内核选项( 查询, , , 返回 LSE, 内核选项, ) 如果 火炬.编译器.动态编译中(): # 将头维度和头的数量设置为静态 x [查询, , ]: 火炬._dynamo.静态标记(x, -3) 火炬._dynamo.静态标记(x, -1) 输出, 伦敦证券交易所 = 弹性注意力跳转( 查询, , , 分数模块, 块掩码.as_tuple(), 比例, 内核选项 # 类型:忽略[联合属性] ) 如果 返回 LSE: 返回 输出, 伦敦证券交易所 * 数学.日志(2) 否则: 返回 out 如果 not 火炬._dynamo.支持 Dynamo(): 提升 运行时错误("flex_attention 需要 Dynamo 支持") 来自 torch._dynamo.backends.debugging 导入 ( 使用 torch_function_mode 创建急切后端, ) 动态期望一个具有 "__code__" 属性的可调用对象。 我们不能直接传递 hop 给它。所以我们将它包装在一个虚拟函数中。 def _flex_attention_hop_wrapper(*参数, **kwargs): 返回 弹性注意力跳转(*参数, **kwargs) 设置编译环境(): 火炬._dynamo.工具.禁用缓存限制(): 临时移除 torch 函数模式中的预分发(): 临时移除 torch 函数模式中的元数据() 作为 元数据模式: 如果 元数据模式: 后端 = 使用 torch_function_mode 创建急切后端( 元数据模式 ) 否则: 后端 = 贪婪 输出, 伦敦证券交易所 = 火炬.编译( _flex_attention_hop_wrapper, 后端=后端, 全图=真实 )( 查询, , , 分数模块, 块掩码.as_tuple(), # 类型:忽略[联合属性] 比例, 内核选项, ) 如果 返回 LSE: 返回 输出, 伦敦证券交易所 * 数学.日志(2) 否则: 返回 输出

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源