LeetGPU习题01:Matrix Addition
在 LeetGPU 的习题列表中,Element-wise 算子指的是对输入张量/数组的每个元素独立执行相同操作、不依赖于其他元素或全局统计量的算子。
以下是明确的 Element-wise 算子:
算子说明
| 名称 | 说明 |
|---|---|
| Vector Addition | 两个向量逐元素相加 |
| Matrix Addition | 两个矩阵逐元素相加 |
| Matrix Copy | 逐元素复制矩阵 |
| Color Inversion | 对每个像素独立取反 |
| Reverse Array | 反转数组,每个元素独立移动位置 |
| ReLU | 逐元素应用 ReLU 函数 |
| Leaky ReLU | 逐元素应用 Leaky ReLU |
| Sigmoid Activation | 逐元素应用 Sigmoid 函数 |
| Value Clipping | 逐元素裁剪到指定范围 |
| Sigmoid Linear Unit (SiLU) | 逐元素 SiLU 激活 |
| Swish-Gated Linear Unit (SWiGLU) | 逐元素 SWiGLU(门控部分也为逐元素) |
| Gaussian Error Gated Linear Unit (GEGLU) | 逐元素 GEGLU 激活 |
| RGB to Grayscale | 每个像素独立转换,不依赖邻域 |
| Interleave Arrays | 交替合并两数组,每个输出元素仅依赖对应位置输入 |
| Rotary Positional Embedding | 对每个位置独立应用旋转矩阵 |
| Weight Dequantization | 每个权重独立反量化 |
| INT8 Quantized MatMul(仅反量化部分) | 反量化部分为逐元素,整体不是 |
| Simple Inference | 线性层前向包含矩阵乘,非 element-wise,但其中的激活部分可能是逐元素 |
Vector Addition
1. Matrix Addition题目
实现一个在 GPU 上对两个包含 32 位浮点数的矩阵进行逐元素相加的程序。程序接收两个相同维度的输入矩阵,输出一个矩阵,其中的每个元素为对应位置元素之和。
示例 1:
输入:A = [[1.0, 2.0], [3.0, 4.0]]B = [[5.0, 6.0], [7.0, 8.0]]
输出:C = [[6.0, 8.0], [10.0, 12.0]]示例 2:
输入:A = [[1.5, 2.5, 3.5], [4.5, 5.5, 6.5], [7.5, 8.5, 9.5]]B = [[0.5, 0.5, 0.5], [0.5, 0.5, 0.5], [0.5, 0.5, 0.5]]
输出:C = [[2.0, 3.0, 4.0], [5.0, 6.0, 7.0], [8.0, 9.0, 10.0]]约束条件
- 输入矩阵
A和B维度相同 - 1 ≤ N ≤ 4096
- 所有元素均为 32 位浮点数
- 性能评测基于 N = 4,096
2. Pytorch题解
import torch# A, B, C are tensors on the GPUdef solve(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int): C.copy_(A + B)3. Triton题解
(1) 向量化 (2)二维直接计算

3.1. 向量化如何计算offset?
在编写Triton Kernel的时候,我们希望一个现成一次处理多个连续元素,例如,每个线程一次读取4个float32, 就等价于一条128位宽的向量加载指令,能够显著提升内存带宽利用率.
那么如何来生成每个线程要访问的全局内存偏移量呢?
一个直观的例子
假设每个线程块有4个线程,每个线程一次处理2个元素,则BLOCK_SIZE=4,VEC_WIDTH=2,pid=0(起始 0).
我们希望这4个线程共同覆盖一块连续的内存区域,总元素为BLOCK_SIZE * VEC_WIDTH = 8
. 一种自然的分配方式是:
线程0: [0, 1]线程1: [2, 3]线程2: [4, 5]线程3: [6, 7]因此我们希望Triton能够生成一个形状为(4, 2)的偏移量矩阵,其中第i行第j列的值刚好是全局内存的线性索引.
block_start = pid * BLOCK_SIZE * VEC_WIDTH
offsets = ( block_start # 当前线程块的起始偏移 + tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH # 每个线程的基地址偏移 + tl.arange(0, VEC_WIDTH)[None, :] # 线程内的向量偏移)按步骤拆解
首先是计算线程块的起始偏移:
block_start = pid * BLOCK_SIZE * VEC_WIDTH.
每个线程块整体负责的数量为BLOCK_SIZE * VEC_WIDTH, 乘以pid后,就得到当前线程块在整个数据中的起始索引.
其次计算每个线程负责的第一个元素偏移:
tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH.
tl.arange(0, BLOCK_SIZE) 生成一个一维向量 [0, 1, 2, 3],代表线程的局部 ID。
[:, None] 将其转换为列向量 (BLOCK_SIZE, 1),即形状变为 4×1.
[[0], [1], [2], [3]]乘以VEC_WIDTH后得到:
[[0], # 线程0负责的第一个元素距离块起始的偏移 [2], # 线程1负责的第一个元素偏移 [4], [6]]最后再加上线程内的连续向量偏移:
tl.arange(0, VEC_WIDTH)[None, :]
同样先生成一个[0, 1]的范围,然后[None,:]将其转换为行向量,即形状为(1, 2).
现在,将三者相加,根据Triton的广播机制,列会沿着列方向复制,行会沿着行方向复制,最终的到一个的矩阵:
[[0 + 0, 0 + 1], → [[0, 1], [2 + 0, 2 + 1], [2, 3], [4 + 0, 4 + 1], [4, 5], [6 + 0, 6 + 1]] [6, 7]]计算完成偏移量之后,我们将其展平为一维数组:
offsets = tl.reshape(offsets, (BLOCK_SIZE * VEC_WIDTH,))这样我们就可以直接用这个偏移量数组调用tl.load或者tl.store,一次性进行向量化访问.
3.2. 二维块指针构造
实际上,在处理矩阵、图像这类天然具有二维结构的数据时,将并行任务也按二维方式划分,往往能让代码思路更加清晰。此时,每个线程块直接对应数据中的一个子矩阵(或子图像区域),省去了将二维坐标手动转换为一维线性 ID 的麻烦。更重要的是,这种划分方式让我们可以灵活调整行、列两个方向上的并行度,更好地适配不同硬件架构与问题规模。
Triton 提供了 tl.make_block_ptr 来优雅地描述这种二维块访问模式。我们先通过一个直观的例子感受它的便利,再逐一拆解每个参数的含义。
一个直观的例子
假设我们有一个4*4的矩阵A,在GPU以行优先存储,内存布局如下:

我们有一个 4×4 的矩阵 A,在 GPU 内存中按行优先(row-major)方式存储,其内存布局如下图所示(数字表示元素在内存中的线性偏移):
列 0 列 1 列 2 列 3┌────┬────┬────┬────┐│ 0 │ 1 │ 2 │ 3 │ 行 0├────┼────┼────┼────┤│ 4 │ 5 │ 6 │ 7 │ 行 1├────┼────┼────┼────┤│ 8 │ 9 │10 │11 │ 行 2├────┼────┼────┼────┤│12 │13 │14 │15 │ 行 3└────┴────┴────┴────┘现在我们希望在 kernel 中每次处理一个 2×2 的子块。 例如,假设当前线程块负责处理图中右下角的子块(包含元素 10, 11, 14, 15),它的左上角位于矩阵的第 2 行、第 2 列。
我们使用tl.make_block_ptr来描述这个子块:
a_block_ptr = tl.make_block_ptr( base=a_ptr, # ① 内存基地址 shape=(4, 4), # ② 完整矩阵形状 strides=(4, 1), # ③ 行步长、列步长 offsets=(2, 2), # ④ 子块左上角坐标 block_shape=(2, 2), # ⑤ 子块大小 order=(1, 0) # ⑥ 线程映射顺序)创建好这个块指针后,我们只需一行 tl.load(a_block_ptr),Triton 便会自动计算出所有需要访问的全局内存地址,并安全地处理边界情况。接下来,我们详细看看每个参数的具体作用。
参数详解
- base —— 内存基地址,即矩阵在 GPU 内存中的起始地址,通常通过 PyTorch Tensor 的 .data_ptr() 方法获得。
- shape —— 完整数据的逻辑形状. 此处的 (4, 4) 告诉 Triton 矩阵共有4行4列。
- strides —— 各维度的内存跨步. 这是连接“逻辑坐标”与“物理地址”的关键。对于行优先存储的矩阵:
沿着第 0 维(行) 移动 1,意味着从当前行跳到下一行,需要跨越一整行的元素,因此步长为 4(即一行有 4 个元素)。
沿着第 1 维(列) 移动 1,意味着在同一行内移动到下一列,相邻元素在内存中紧挨着,因此步长为 1。 所以 strides = (4, 1)。反之,如果矩阵是列优先存储,则 strides 应为 (1, 4)。
- offsets —— 子块左上角的起始坐标. 它是一个元组 (row_start, col_start)。
在实际 kernel 中,offsets 会根据线程块的 ID 动态计算,例如:
pid_m = tl.program_id(0) # 行方向的块 IDpid_n = tl.program_id(1) # 列方向的块 IDoffsets = (pid_m * BLOCK_M, pid_n * BLOCK_N)- block_shape —— 要加载的块大小.
这是我们在 kernel 中设定的超参数,例如 BLOCK_M = 2、BLOCK_N = 2。它决定了每次 tl.load 会从内存中读取多少个元素。合适的块大小需要根据共享内存容量、计算资源以及数据复用程度进行权衡,常见取值如 64×64、128×128 等。
- order —— 线程数据分布映射顺序
这个参数稍微有些进阶。简单来说,它告诉编译器数据在块内是如何分配给各个线程的,会影响生成的 PTX 代码结构。在大多数常规场景下(例如非 Tensor Core 的简单加载/存储),order=(0,1) 与 order=(1,0) 的性能差异微乎其微,因此你不必过分纠结。
不过,当你使用 NVIDIA Hopper 架构(H100 等)并希望启用 TMA(Tensor Memory Accelerator) 硬件加速时,正确设置 order 就显得至关重要了,它可以带来数倍的访存性能提升。一般情况下,对于矩阵乘法中的 A 矩阵加载,常用 order=(1,0);对于 B 矩阵,常用 order=(0,1)。若想深入探究,可以借我一张H100测试一下!
3.3. Naive, Vec, 2D 完整实现
import torchimport tritonimport triton.language as tlimport timeimport numpy as np
# 方案 1:直接加法@triton.jitdef matrix_add_kernel(a, b, c, n_elements, BLOCK_SIZE: tl.constexpr): pid = tl.program_id(axis=0)
block_start = pid * BLOCK_SIZE offsets = block_start + tl.arange(0, BLOCK_SIZE) mask = offsets < n_elements
ga = tl.load(a + offsets, mask=mask) gb = tl.load(b + offsets, mask=mask) gc = ga + gb tl.store(c + offsets, gc, mask=mask)
def solve_triton_naive(a: torch.Tensor, b: torch.Tensor, c: torch.Tensor, N: int): BLOCK_SIZE = 1024 n_elements = N * N grid = (triton.cdiv(n_elements, BLOCK_SIZE),) matrix_add_kernel[grid](a, b, c, n_elements, BLOCK_SIZE)
# 方案 2:Triton 一维向量化 + Autotune@triton.autotune( configs=[ triton.Config({'BLOCK_SIZE': 1024, 'VEC_WIDTH': 1}, num_warps=4), triton.Config({'BLOCK_SIZE': 1024, 'VEC_WIDTH': 2}, num_warps=4), triton.Config({'BLOCK_SIZE': 2048, 'VEC_WIDTH': 2}, num_warps=8), triton.Config({'BLOCK_SIZE': 4096, 'VEC_WIDTH': 4}, num_warps=8), triton.Config({'BLOCK_SIZE': 4096, 'VEC_WIDTH': 8}, num_warps=16), ], key=['n_elements'],)@triton.jitdef matrix_add_kernel_1d( a_ptr, b_ptr, c_ptr, n_elements: tl.constexpr, BLOCK_SIZE: tl.constexpr, VEC_WIDTH: tl.constexpr,): pid = tl.program_id(axis=0) block_start = pid * BLOCK_SIZE * VEC_WIDTH
offsets = block_start + tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH + tl.arange(0, VEC_WIDTH)[None, :] offsets = tl.reshape(offsets, (BLOCK_SIZE * VEC_WIDTH,))
mask = offsets < n_elements
a_vals = tl.load(a_ptr + offsets, mask=mask, other=0.0) b_vals = tl.load(b_ptr + offsets, mask=mask, other=0.0) c_vals = a_vals + b_vals
tl.store(c_ptr + offsets, c_vals, mask=mask)
def solve_triton_1d(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int): n_elements = N * N grid = lambda meta: (triton.cdiv(n_elements, meta['BLOCK_SIZE'] * meta['VEC_WIDTH']),) matrix_add_kernel_1d[grid](A, B, C, n_elements)
# 方案 3:Triton 二维块指针 + Autotune@triton.autotune( configs=[ triton.Config({'BLOCK_M': 128, 'BLOCK_N': 128}, num_warps=4), triton.Config({'BLOCK_M': 128, 'BLOCK_N': 256}, num_warps=4), triton.Config({'BLOCK_M': 256, 'BLOCK_N': 128}, num_warps=8), triton.Config({'BLOCK_M': 256, 'BLOCK_N': 256}, num_warps=8), triton.Config({'BLOCK_M': 512, 'BLOCK_N': 128}, num_warps=8), triton.Config({'BLOCK_M': 512, 'BLOCK_N': 256}, num_warps=8), triton.Config({'BLOCK_M': 512, 'BLOCK_N': 512}, num_warps=8), ], key=['N'],)@triton.jitdef matrix_add_kernel_2d( a_ptr, b_ptr, c_ptr, N, BLOCK_M: tl.constexpr, BLOCK_N: tl.constexpr,): pid_m = tl.program_id(axis=0) pid_n = tl.program_id(axis=1)
a_block_ptr = tl.make_block_ptr( base=a_ptr, shape=(N, N), strides=(N, 1), offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N), block_shape=(BLOCK_M, BLOCK_N), order=(1, 0), ) b_block_ptr = tl.make_block_ptr( base=b_ptr, shape=(N, N), strides=(N, 1), offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N), block_shape=(BLOCK_M, BLOCK_N), order=(1, 0), ) c_block_ptr = tl.make_block_ptr( base=c_ptr, shape=(N, N), strides=(N, 1), offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N), block_shape=(BLOCK_M, BLOCK_N), order=(1, 0), )
a = tl.load(a_block_ptr, boundary_check=(0, 1)) b = tl.load(b_block_ptr, boundary_check=(0, 1)) c = a + b tl.store(c_block_ptr, c, boundary_check=(0, 1))
def solve_triton_2d(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int): grid = lambda meta: ( triton.cdiv(N, meta['BLOCK_M']), triton.cdiv(N, meta['BLOCK_N']), ) matrix_add_kernel_2d[grid](A, B, C, N)
# ------------------------------------------------------------# 性能测试工具# ------------------------------------------------------------def benchmark(func, A, B, C, N, warmup=10, repeat=100): """ 运行指定函数多次,返回平均耗时(毫秒)。 """ # 预热 for _ in range(warmup): func(A, B, C, N) torch.cuda.synchronize()
# 计时 start = time.perf_counter() for _ in range(repeat): func(A, B, C, N) torch.cuda.synchronize() end = time.perf_counter()
avg_time_ms = (end - start) / repeat * 1000 return avg_time_ms
def verify_results(C_triton_naive, C_triton_1d, C_triton_2d): """ 验证三种方案结果是否一致。 """ if torch.allclose(C_triton_naive, C_triton_1d, atol=1e-5): print("✅ Naive Triton 与 Triton 1D 结果一致") else: print("❌ Naive Triton 与 Triton 1D 结果不一致")
if torch.allclose(C_triton_naive, C_triton_2d, atol=1e-5): print("✅ Naive Triton 与 Triton 2D 结果一致") else: print("❌ Naive Triton 与 Triton 2D 结果不一致")
def main(): # 检查 CUDA 可用性 if not torch.cuda.is_available(): raise RuntimeError("CUDA 不可用,请在有 GPU 的环境下运行") device = torch.device("cuda") print(f"运行设备: {torch.cuda.get_device_name(device)}")
# 问题规模 N = 4096
# 分配 GPU 张量 A = torch.randn(N, N, device=device, dtype=torch.float32) B = torch.randn(N, N, device=device, dtype=torch.float32) C_triton_naive = torch.empty_like(A) C_triton_1d = torch.empty_like(A) C_triton_2d = torch.empty_like(A)
print(f"\n矩阵大小: {N} x {N} ({N*N} 个元素)")
# 验证正确性 solve_triton_naive(A, B, C_triton_naive, N) solve_triton_1d(A, B, C_triton_1d, N) solve_triton_2d(A, B, C_triton_2d, N) verify_results(C_triton_naive, C_triton_1d, C_triton_2d)
# 性能测试 print("\n开始性能测试 (预热 10 次,计时 100 次取平均)...\n")
time_pytorch = benchmark(solve_triton_naive, A, B, C_triton_naive, N) time_triton_1d = benchmark(solve_triton_1d, A, B, C_triton_1d, N) time_triton_2d = benchmark(solve_triton_2d, A, B, C_triton_2d, N)
# 输出结果 print(f"Triton 直接加法: {time_pytorch:.4f} ms") print(f"Triton 1D 向量化: {time_triton_1d:.4f} ms") print(f"Triton 2D 块指针: {time_triton_2d:.4f} ms")
# 计算加速比 baseline = time_pytorch print(f"\n相对于 Naive 的加速比:") print(f" Triton 1D: {baseline / time_triton_1d:.2f}x") print(f" Triton 2D: {baseline / time_triton_2d:.2f}x")
# 计算内存带宽 bytes_per_element = 4 # float32 total_bytes = 3 * N * N * bytes_per_element # A读 + B读 + C写 bw_pytorch = total_bytes / (time_pytorch / 1000) / 1e9 bw_triton_1d = total_bytes / (time_triton_1d / 1000) / 1e9 bw_triton_2d = total_bytes / (time_triton_2d / 1000) / 1e9
print(f"\n估算内存带宽 (GB/s):") print(f" Triton Naive: {bw_pytorch:.2f} GB/s") print(f" Triton 1D: {bw_triton_1d:.2f} GB/s") print(f" Triton 2D: {bw_triton_2d:.2f} GB/s")
if __name__ == "__main__": main()4. CUDA 题解
支持与分享
如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!