LeetGPU习题01:Matrix Addition
在 LeetGPU 的习题列表中,Element-wise 算子指的是对输入张量/数组的每个元素独立执行相同操作、不依赖于其他元素或全局统计量的算子。
以下是明确的 Element-wise 算子:
算子说明
| 名称 | 说明 |
|---|---|
| Vector Addition | 两个向量逐元素相加 |
| Matrix Addition | 两个矩阵逐元素相加 |
| Matrix Copy | 逐元素复制矩阵 |
| Color Inversion | 对每个像素独立取反 |
| Reverse Array | 反转数组,每个元素独立移动位置 |
| ReLU | 逐元素应用 ReLU 函数 |
| Leaky ReLU | 逐元素应用 Leaky ReLU |
| Sigmoid Activation | 逐元素应用 Sigmoid 函数 |
| Value Clipping | 逐元素裁剪到指定范围 |
| Sigmoid Linear Unit (SiLU) | 逐元素 SiLU 激活 |
| Swish-Gated Linear Unit (SWiGLU) | 逐元素 SWiGLU(门控部分也为逐元素) |
| Gaussian Error Gated Linear Unit (GEGLU) | 逐元素 GEGLU 激活 |
| RGB to Grayscale | 每个像素独立转换,不依赖邻域 |
| Interleave Arrays | 交替合并两数组,每个输出元素仅依赖对应位置输入 |
| Rotary Positional Embedding | 对每个位置独立应用旋转矩阵 |
| Weight Dequantization | 每个权重独立反量化 |
| INT8 Quantized MatMul(仅反量化部分) | 反量化部分为逐元素,整体不是 |
| Simple Inference | 线性层前向包含矩阵乘,非 element-wise,但其中的激活部分可能是逐元素 |
Vector Addition
1. Matrix Addition题目
实现一个在 GPU 上对两个包含 32 位浮点数的矩阵进行逐元素相加的程序。程序接收两个相同维度的输入矩阵,输出一个矩阵,其中的每个元素为对应位置元素之和。
示例 1:
输入:A = [[1.0, 2.0], [3.0, 4.0]]B = [[5.0, 6.0], [7.0, 8.0]]
输出:C = [[6.0, 8.0], [10.0, 12.0]]示例 2:
输入:A = [[1.5, 2.5, 3.5], [4.5, 5.5, 6.5], [7.5, 8.5, 9.5]]B = [[0.5, 0.5, 0.5], [0.5, 0.5, 0.5], [0.5, 0.5, 0.5]]
输出:C = [[2.0, 3.0, 4.0], [5.0, 6.0, 7.0], [8.0, 9.0, 10.0]]约束条件
- 输入矩阵
A和B维度相同 - 1 ≤ N ≤ 4096
- 所有元素均为 32 位浮点数
- 性能评测基于 N = 4,096
2. Pytorch题解
import torch# A, B, C are tensors on the GPUdef solve(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int): C.copy_(A + B)3. Triton题解

3.1. 向量化如何计算offset?
block_start = pid * BLOCK_SIZE * VEC_WIDTH
offsets = block_start + tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH + tl.arange(0, VEC_WIDTH)[None, :]offsets = tl.reshape(offsets, (BLOCK_SIZE * VEC_WIDTH,))- 先来看一个例子,
假设
BLOCK_SIZE=4,VEC_WIDTH=2,pid=0(起始 0):
-
tl.arange(0,4)[:,None]*2→ 列向量[[0], [2], [4], [6]] -
tl.arange(0,2)[None,:]→ 行向量[[0, 1]]
广播相加得到:
[[0, 1], [2, 3], [4, 5], [6, 7]]展平后得到 [0, 1, 2, 3, 4, 5, 6, 7]。这正好是线程块 0 要处理的前 8 个元素的连续索引。
- 计算详解
block_start就是一次性要处理BLOCK_SIZE * VEC_WIDTH个元素,再乘以当前的pid。
之后构造每一个线程的偏移量数组:
tl.arange(0, BLOCK_SIZE) # shape:(1, BLOCK_SIZE)tl.arange(0, BLOCK_SIZE)[:, None] # 转置: (BLOCK_SIZE, 1)tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH # 每个线程本身负责的第一个元素的偏移量例如:VEC_WIDTH=4时,tid=0得到下标0,tid=1得到下标4,tid=2得到下标8等等。
在上述处理之后,我们还需要处理内部的元素:
+ tl.arange(0, VEC_WITDH) # shape(1, VEC_WIDTH)+ tl.arange(0, VEC_WIDTH)[None, :] # 行向量 (1, VEC_WIDTH)最终得到一个形状为(BLOCK_SIZE, VEC_WIDTH)的二维数组,每一行是一个线程,每一列是单个线程要处理的连续偏移。
例如tid=1 & VEC_WIDTH=4 该行的内容为[4,5,6,7]
offsets = block_start + tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH + tl.arange(0, VEC_WIDTH)[None, :]在计算完向量化的坐标之后,我们再将其变为一维数组:
offsets = tl.reshape(offsets, (BLOCK_SIZE * VEC_WIDTH,))3.2. 二维块指针构造
实际上,在处理天然具有二维结构的数据时,并且并行算法也是二维划分时,选择二维线程块是一个自然的选择。
矩阵运算、图像处理、卷积池化等。
在这些场景下,代码会更加直观,可以直接对应数据块的行列位置,并且无需进行重新转换ID,并且可以调整并行度以适应不同的硬件架构和数据模型。
当访存模式非常复杂时,就不太适合转换为1维操作来实现。
a_block_ptr = tl.make_block_ptr( base=a_ptr, # 内存基地址 shape=(N, N), # 完整数据的形状 strides=(N, 1), # 行步长、列步长 offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N), # 当前块的起始坐标 block_shape=(BLOCK_M, BLOCK_N), # 要加载的块大小 order=(1, 0) # 线程映射顺序)假设我们有一个4*4的矩阵A,在GPU以行优先存储,内存布局如下:

3.3. 完整的三种方法对比
import torchimport tritonimport triton.language as tlimport timeimport numpy as np
# ------------------------------------------------------------# 方案 1:直接加法# ------------------------------------------------------------@triton.jitdef matrix_add_kernel(a, b, c, n_elements, BLOCK_SIZE: tl.constexpr): pid = tl.program_id(axis=0)
block_start = pid * BLOCK_SIZE offsets = block_start + tl.arange(0, BLOCK_SIZE) mask = offsets < n_elements
ga = tl.load(a + offsets, mask=mask) gb = tl.load(b + offsets, mask=mask) gc = ga + gb tl.store(c + offsets, gc, mask=mask)
def solve_triton_naive(a: torch.Tensor, b: torch.Tensor, c: torch.Tensor, N: int): BLOCK_SIZE = 1024 n_elements = N * N grid = (triton.cdiv(n_elements, BLOCK_SIZE),) matrix_add_kernel[grid](a, b, c, n_elements, BLOCK_SIZE)
# ------------------------------------------------------------# 方案 2:Triton 一维向量化 + Autotune# ------------------------------------------------------------@triton.autotune( configs=[ triton.Config({'BLOCK_SIZE': 1024, 'VEC_WIDTH': 1}, num_warps=4), triton.Config({'BLOCK_SIZE': 1024, 'VEC_WIDTH': 2}, num_warps=4), triton.Config({'BLOCK_SIZE': 2048, 'VEC_WIDTH': 2}, num_warps=8), triton.Config({'BLOCK_SIZE': 4096, 'VEC_WIDTH': 4}, num_warps=8), triton.Config({'BLOCK_SIZE': 4096, 'VEC_WIDTH': 8}, num_warps=16), ], key=['n_elements'],)@triton.jitdef matrix_add_kernel_1d( a_ptr, b_ptr, c_ptr, n_elements: tl.constexpr, BLOCK_SIZE: tl.constexpr, VEC_WIDTH: tl.constexpr,): pid = tl.program_id(axis=0) block_start = pid * BLOCK_SIZE * VEC_WIDTH
offsets = block_start + tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH + tl.arange(0, VEC_WIDTH)[None, :] offsets = tl.reshape(offsets, (BLOCK_SIZE * VEC_WIDTH,))
mask = offsets < n_elements
a_vals = tl.load(a_ptr + offsets, mask=mask, other=0.0) b_vals = tl.load(b_ptr + offsets, mask=mask, other=0.0) c_vals = a_vals + b_vals
tl.store(c_ptr + offsets, c_vals, mask=mask)
def solve_triton_1d(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int): n_elements = N * N grid = lambda meta: (triton.cdiv(n_elements, meta['BLOCK_SIZE'] * meta['VEC_WIDTH']),) matrix_add_kernel_1d[grid](A, B, C, n_elements)
# ------------------------------------------------------------# 方案 3:Triton 二维块指针 + Autotune# ------------------------------------------------------------@triton.autotune( configs=[ triton.Config({'BLOCK_M': 128, 'BLOCK_N': 128}, num_warps=4), triton.Config({'BLOCK_M': 128, 'BLOCK_N': 256}, num_warps=4), triton.Config({'BLOCK_M': 256, 'BLOCK_N': 128}, num_warps=8), triton.Config({'BLOCK_M': 256, 'BLOCK_N': 256}, num_warps=8), triton.Config({'BLOCK_M': 512, 'BLOCK_N': 128}, num_warps=8), triton.Config({'BLOCK_M': 512, 'BLOCK_N': 256}, num_warps=8), ], key=['N'],)@triton.jitdef matrix_add_kernel_2d( a_ptr, b_ptr, c_ptr, N, BLOCK_M: tl.constexpr, BLOCK_N: tl.constexpr,): pid_m = tl.program_id(axis=0) pid_n = tl.program_id(axis=1)
a_block_ptr = tl.make_block_ptr( base=a_ptr, shape=(N, N), strides=(N, 1), offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N), block_shape=(BLOCK_M, BLOCK_N), order=(1, 0), ) b_block_ptr = tl.make_block_ptr( base=b_ptr, shape=(N, N), strides=(N, 1), offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N), block_shape=(BLOCK_M, BLOCK_N), order=(1, 0), ) c_block_ptr = tl.make_block_ptr( base=c_ptr, shape=(N, N), strides=(N, 1), offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N), block_shape=(BLOCK_M, BLOCK_N), order=(1, 0), )
a = tl.load(a_block_ptr, boundary_check=(0, 1)) b = tl.load(b_block_ptr, boundary_check=(0, 1)) c = a + b tl.store(c_block_ptr, c, boundary_check=(0, 1))
def solve_triton_2d(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int): grid = lambda meta: ( triton.cdiv(N, meta['BLOCK_M']), triton.cdiv(N, meta['BLOCK_N']), ) matrix_add_kernel_2d[grid](A, B, C, N)
# ------------------------------------------------------------# 性能测试工具# ------------------------------------------------------------def benchmark(func, A, B, C, N, warmup=10, repeat=100): """ 运行指定函数多次,返回平均耗时(毫秒)。 """ # 预热 for _ in range(warmup): func(A, B, C, N) torch.cuda.synchronize()
# 计时 start = time.perf_counter() for _ in range(repeat): func(A, B, C, N) torch.cuda.synchronize() end = time.perf_counter()
avg_time_ms = (end - start) / repeat * 1000 return avg_time_ms
def verify_results(C_triton_naive, C_triton_1d, C_triton_2d): """ 验证三种方案结果是否一致。 """ if torch.allclose(C_triton_naive, C_triton_1d, atol=1e-5): print("✅ PyTorch 与 Triton 1D 结果一致") else: print("❌ PyTorch 与 Triton 1D 结果不一致")
if torch.allclose(C_triton_naive, C_triton_2d, atol=1e-5): print("✅ PyTorch 与 Triton 2D 结果一致") else: print("❌ PyTorch 与 Triton 2D 结果不一致")
def main(): # 检查 CUDA 可用性 if not torch.cuda.is_available(): raise RuntimeError("CUDA 不可用,请在有 GPU 的环境下运行") device = torch.device("cuda") print(f"运行设备: {torch.cuda.get_device_name(device)}")
# 问题规模 N = 4096
# 分配 GPU 张量 A = torch.randn(N, N, device=device, dtype=torch.float32) B = torch.randn(N, N, device=device, dtype=torch.float32) C_triton_naive = torch.empty_like(A) C_triton_1d = torch.empty_like(A) C_triton_2d = torch.empty_like(A)
print(f"\n矩阵大小: {N} x {N} ({N*N} 个元素)")
# 验证正确性(单独运行一次) solve_triton_naive(A, B, C_triton_naive, N) solve_triton_1d(A, B, C_triton_1d, N) solve_triton_2d(A, B, C_triton_2d, N) verify_results(C_triton_naive, C_triton_1d, C_triton_2d)
# 性能测试 print("\n开始性能测试 (预热 10 次,计时 100 次取平均)...\n")
time_pytorch = benchmark(solve_triton_naive, A, B, C_triton_naive, N) time_triton_1d = benchmark(solve_triton_1d, A, B, C_triton_1d, N) time_triton_2d = benchmark(solve_triton_2d, A, B, C_triton_2d, N)
# 输出结果 print(f"PyTorch 直接加法: {time_pytorch:.4f} ms") print(f"Triton 1D 向量化: {time_triton_1d:.4f} ms") print(f"Triton 2D 块指针: {time_triton_2d:.4f} ms")
# 计算加速比 baseline = time_pytorch print(f"\n相对于 PyTorch 的加速比:") print(f" Triton 1D: {baseline / time_triton_1d:.2f}x") print(f" Triton 2D: {baseline / time_triton_2d:.2f}x")
# 计算内存带宽 bytes_per_element = 4 # float32 total_bytes = 3 * N * N * bytes_per_element # A读 + B读 + C写 bw_pytorch = total_bytes / (time_pytorch / 1000) / 1e9 bw_triton_1d = total_bytes / (time_triton_1d / 1000) / 1e9 bw_triton_2d = total_bytes / (time_triton_2d / 1000) / 1e9
print(f"\n估算内存带宽 (GB/s):") print(f" PyTorch: {bw_pytorch:.2f} GB/s") print(f" Triton 1D: {bw_triton_1d:.2f} GB/s") print(f" Triton 2D: {bw_triton_2d:.2f} GB/s")
if __name__ == "__main__": main()支持与分享
如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!