LeetGPU习题01:Matrix Addition

3180 字
16 分钟
LeetGPU习题01:Matrix Addition
2026-04-20
更新中

在 LeetGPU 的习题列表中,Element-wise 算子指的是对输入张量/数组的每个元素独立执行相同操作、不依赖于其他元素或全局统计量的算子。

以下是明确的 Element-wise 算子:

算子说明#

名称说明
Vector Addition两个向量逐元素相加
Matrix Addition两个矩阵逐元素相加
Matrix Copy逐元素复制矩阵
Color Inversion对每个像素独立取反
Reverse Array反转数组,每个元素独立移动位置
ReLU逐元素应用 ReLU 函数
Leaky ReLU逐元素应用 Leaky ReLU
Sigmoid Activation逐元素应用 Sigmoid 函数
Value Clipping逐元素裁剪到指定范围
Sigmoid Linear Unit (SiLU)逐元素 SiLU 激活
Swish-Gated Linear Unit (SWiGLU)逐元素 SWiGLU(门控部分也为逐元素)
Gaussian Error Gated Linear Unit (GEGLU)逐元素 GEGLU 激活
RGB to Grayscale每个像素独立转换,不依赖邻域
Interleave Arrays交替合并两数组,每个输出元素仅依赖对应位置输入
Rotary Positional Embedding对每个位置独立应用旋转矩阵
Weight Dequantization每个权重独立反量化
INT8 Quantized MatMul(仅反量化部分)反量化部分为逐元素,整体不是
Simple Inference线性层前向包含矩阵乘,非 element-wise,但其中的激活部分可能是逐元素

Vector Addition#

1. Matrix Addition题目#

实现一个在 GPU 上对两个包含 32 位浮点数的矩阵进行逐元素相加的程序。程序接收两个相同维度的输入矩阵,输出一个矩阵,其中的每个元素为对应位置元素之和。

示例 1:

输入:
A = [[1.0, 2.0],
[3.0, 4.0]]
B = [[5.0, 6.0],
[7.0, 8.0]]
输出:
C = [[6.0, 8.0],
[10.0, 12.0]]

示例 2:

输入:
A = [[1.5, 2.5, 3.5],
[4.5, 5.5, 6.5],
[7.5, 8.5, 9.5]]
B = [[0.5, 0.5, 0.5],
[0.5, 0.5, 0.5],
[0.5, 0.5, 0.5]]
输出:
C = [[2.0, 3.0, 4.0],
[5.0, 6.0, 7.0],
[8.0, 9.0, 10.0]]

约束条件

  • 输入矩阵 AB 维度相同
  • 1 ≤ N ≤ 4096
  • 所有元素均为 32 位浮点数
  • 性能评测基于 N = 4,096

2. Pytorch题解#

import torch
# A, B, C are tensors on the GPU
def solve(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int):
C.copy_(A + B)

3. Triton题解#

这里应该如何优化呢?

(1) 向量化 (2)二维直接计算

Naive、Vec、2D性能对比
Naive、Vec、2D性能对比

3.1. 向量化如何计算offset?#

在编写Triton Kernel的时候,我们希望一个现成一次处理多个连续元素,例如,每个线程一次读取4个float32, 就等价于一条128位宽的向量加载指令,能够显著提升内存带宽利用率.

那么如何来生成每个线程要访问的全局内存偏移量呢?

一个直观的例子#

假设每个线程块有4个线程,每个线程一次处理2个元素,则BLOCK_SIZE=4,VEC_WIDTH=2,pid=0(起始 0).

我们希望这4个线程共同覆盖一块连续的内存区域,总元素为BLOCK_SIZE * VEC_WIDTH = 8 . 一种自然的分配方式是:

线程0: [0, 1]
线程1: [2, 3]
线程2: [4, 5]
线程3: [6, 7]

因此我们希望Triton能够生成一个形状为(4, 2)的偏移量矩阵,其中第i行第j列的值刚好是全局内存的线性索引.

block_start = pid * BLOCK_SIZE * VEC_WIDTH
offsets = (
block_start # 当前线程块的起始偏移
+ tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH # 每个线程的基地址偏移
+ tl.arange(0, VEC_WIDTH)[None, :] # 线程内的向量偏移
)

按步骤拆解#

首先是计算线程块的起始偏移: block_start = pid * BLOCK_SIZE * VEC_WIDTH. 每个线程块整体负责的数量为BLOCK_SIZE * VEC_WIDTH, 乘以pid后,就得到当前线程块在整个数据中的起始索引.

其次计算每个线程负责的第一个元素偏移: tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH. tl.arange(0, BLOCK_SIZE) 生成一个一维向量 [0, 1, 2, 3],代表线程的局部 ID。

[:, None] 将其转换为列向量 (BLOCK_SIZE, 1),即形状变为 4×1.

[[0],
[1],
[2],
[3]]

乘以VEC_WIDTH后得到:

[[0], # 线程0负责的第一个元素距离块起始的偏移
[2], # 线程1负责的第一个元素偏移
[4],
[6]]

最后再加上线程内的连续向量偏移: tl.arange(0, VEC_WIDTH)[None, :] 同样先生成一个[0, 1]的范围,然后[None,:]将其转换为行向量,即形状为(1, 2).

现在,将三者相加,根据Triton的广播机制,列会沿着列方向复制,行会沿着行方向复制,最终的到一个4×24 \times 2的矩阵:

[[0 + 0, 0 + 1], → [[0, 1],
[2 + 0, 2 + 1], [2, 3],
[4 + 0, 4 + 1], [4, 5],
[6 + 0, 6 + 1]] [6, 7]]

计算完成偏移量之后,我们将其展平为一维数组:

offsets = tl.reshape(offsets, (BLOCK_SIZE * VEC_WIDTH,))

这样我们就可以直接用这个偏移量数组调用tl.load或者tl.store,一次性进行向量化访问.

3.2. 二维块指针构造#

实际上,在处理矩阵、图像这类天然具有二维结构的数据时,将并行任务也按二维方式划分,往往能让代码思路更加清晰。此时,每个线程块直接对应数据中的一个子矩阵(或子图像区域),省去了将二维坐标手动转换为一维线性 ID 的麻烦。更重要的是,这种划分方式让我们可以灵活调整行、列两个方向上的并行度,更好地适配不同硬件架构与问题规模。

Triton 提供了 tl.make_block_ptr 来优雅地描述这种二维块访问模式。我们先通过一个直观的例子感受它的便利,再逐一拆解每个参数的含义。

一个直观的例子#

假设我们有一个4*4的矩阵A,在GPU以行优先存储,内存布局如下:

矩阵和内存映射
矩阵和内存映射

我们有一个 4×4 的矩阵 A,在 GPU 内存中按行优先(row-major)方式存储,其内存布局如下图所示(数字表示元素在内存中的线性偏移):

列 0 列 1 列 2 列 3
┌────┬────┬────┬────┐
│ 0 │ 1 │ 2 │ 3 │ 行 0
├────┼────┼────┼────┤
│ 4 │ 5 │ 6 │ 7 │ 行 1
├────┼────┼────┼────┤
│ 8 │ 9 │10 │11 │ 行 2
├────┼────┼────┼────┤
│12 │13 │14 │15 │ 行 3
└────┴────┴────┴────┘

现在我们希望在 kernel 中每次处理一个 2×2 的子块。 例如,假设当前线程块负责处理图中右下角的子块(包含元素 10, 11, 14, 15),它的左上角位于矩阵的第 2 行、第 2 列。

我们使用tl.make_block_ptr来描述这个子块:

a_block_ptr = tl.make_block_ptr(
base=a_ptr, # ① 内存基地址
shape=(4, 4), # ② 完整矩阵形状
strides=(4, 1), # ③ 行步长、列步长
offsets=(2, 2), # ④ 子块左上角坐标
block_shape=(2, 2), # ⑤ 子块大小
order=(1, 0) # ⑥ 线程映射顺序
)

创建好这个块指针后,我们只需一行 tl.load(a_block_ptr),Triton 便会自动计算出所有需要访问的全局内存地址,并安全地处理边界情况。接下来,我们详细看看每个参数的具体作用。

参数详解#

  1. base —— 内存基地址,即矩阵在 GPU 内存中的起始地址,通常通过 PyTorch Tensor 的 .data_ptr() 方法获得。
  2. shape —— 完整数据的逻辑形状. 此处的 (4, 4) 告诉 Triton 矩阵共有4行4列。
  3. strides —— 各维度的内存跨步. 这是连接“逻辑坐标”与“物理地址”的关键。对于行优先存储的矩阵:
strides详解

沿着第 0 维(行) 移动 1,意味着从当前行跳到下一行,需要跨越一整行的元素,因此步长为 4(即一行有 4 个元素)。

沿着第 1 维(列) 移动 1,意味着在同一行内移动到下一列,相邻元素在内存中紧挨着,因此步长为 1。 所以 strides = (4, 1)。反之,如果矩阵是列优先存储,则 strides 应为 (1, 4)。

  1. offsets —— 子块左上角的起始坐标. 它是一个元组 (row_start, col_start)。
offsets计算详解

在实际 kernel 中,offsets 会根据线程块的 ID 动态计算,例如:

pid_m = tl.program_id(0) # 行方向的块 ID
pid_n = tl.program_id(1) # 列方向的块 ID
offsets = (pid_m * BLOCK_M, pid_n * BLOCK_N)
  1. block_shape —— 要加载的块大小.

这是我们在 kernel 中设定的超参数,例如 BLOCK_M = 2、BLOCK_N = 2。它决定了每次 tl.load 会从内存中读取多少个元素。合适的块大小需要根据共享内存容量、计算资源以及数据复用程度进行权衡,常见取值如 64×64、128×128 等。

  1. order —— 线程数据分布映射顺序

这个参数稍微有些进阶。简单来说,它告诉编译器数据在块内是如何分配给各个线程的,会影响生成的 PTX 代码结构。在大多数常规场景下(例如非 Tensor Core 的简单加载/存储),order=(0,1) 与 order=(1,0) 的性能差异微乎其微,因此你不必过分纠结。

不过,当你使用 NVIDIA Hopper 架构(H100 等)并希望启用 TMA(Tensor Memory Accelerator) 硬件加速时,正确设置 order 就显得至关重要了,它可以带来数倍的访存性能提升。一般情况下,对于矩阵乘法中的 A 矩阵加载,常用 order=(1,0);对于 B 矩阵,常用 order=(0,1)。若想深入探究,可以借我一张H100测试一下!

3.3. Naive, Vec, 2D 完整实现#

import torch
import triton
import triton.language as tl
import time
import numpy as np
# 方案 1:直接加法
@triton.jit
def matrix_add_kernel(a, b, c, n_elements, BLOCK_SIZE: tl.constexpr):
pid = tl.program_id(axis=0)
block_start = pid * BLOCK_SIZE
offsets = block_start + tl.arange(0, BLOCK_SIZE)
mask = offsets < n_elements
ga = tl.load(a + offsets, mask=mask)
gb = tl.load(b + offsets, mask=mask)
gc = ga + gb
tl.store(c + offsets, gc, mask=mask)
def solve_triton_naive(a: torch.Tensor, b: torch.Tensor, c: torch.Tensor, N: int):
BLOCK_SIZE = 1024
n_elements = N * N
grid = (triton.cdiv(n_elements, BLOCK_SIZE),)
matrix_add_kernel[grid](a, b, c, n_elements, BLOCK_SIZE)
# 方案 2:Triton 一维向量化 + Autotune
@triton.autotune(
configs=[
triton.Config({'BLOCK_SIZE': 1024, 'VEC_WIDTH': 1}, num_warps=4),
triton.Config({'BLOCK_SIZE': 1024, 'VEC_WIDTH': 2}, num_warps=4),
triton.Config({'BLOCK_SIZE': 2048, 'VEC_WIDTH': 2}, num_warps=8),
triton.Config({'BLOCK_SIZE': 4096, 'VEC_WIDTH': 4}, num_warps=8),
triton.Config({'BLOCK_SIZE': 4096, 'VEC_WIDTH': 8}, num_warps=16),
],
key=['n_elements'],
)
@triton.jit
def matrix_add_kernel_1d(
a_ptr, b_ptr, c_ptr,
n_elements: tl.constexpr,
BLOCK_SIZE: tl.constexpr,
VEC_WIDTH: tl.constexpr,
):
pid = tl.program_id(axis=0)
block_start = pid * BLOCK_SIZE * VEC_WIDTH
offsets = block_start + tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH + tl.arange(0, VEC_WIDTH)[None, :]
offsets = tl.reshape(offsets, (BLOCK_SIZE * VEC_WIDTH,))
mask = offsets < n_elements
a_vals = tl.load(a_ptr + offsets, mask=mask, other=0.0)
b_vals = tl.load(b_ptr + offsets, mask=mask, other=0.0)
c_vals = a_vals + b_vals
tl.store(c_ptr + offsets, c_vals, mask=mask)
def solve_triton_1d(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int):
n_elements = N * N
grid = lambda meta: (triton.cdiv(n_elements, meta['BLOCK_SIZE'] * meta['VEC_WIDTH']),)
matrix_add_kernel_1d[grid](A, B, C, n_elements)
# 方案 3:Triton 二维块指针 + Autotune
@triton.autotune(
configs=[
triton.Config({'BLOCK_M': 128, 'BLOCK_N': 128}, num_warps=4),
triton.Config({'BLOCK_M': 128, 'BLOCK_N': 256}, num_warps=4),
triton.Config({'BLOCK_M': 256, 'BLOCK_N': 128}, num_warps=8),
triton.Config({'BLOCK_M': 256, 'BLOCK_N': 256}, num_warps=8),
triton.Config({'BLOCK_M': 512, 'BLOCK_N': 128}, num_warps=8),
triton.Config({'BLOCK_M': 512, 'BLOCK_N': 256}, num_warps=8),
triton.Config({'BLOCK_M': 512, 'BLOCK_N': 512}, num_warps=8),
],
key=['N'],
)
@triton.jit
def matrix_add_kernel_2d(
a_ptr, b_ptr, c_ptr,
N,
BLOCK_M: tl.constexpr,
BLOCK_N: tl.constexpr,
):
pid_m = tl.program_id(axis=0)
pid_n = tl.program_id(axis=1)
a_block_ptr = tl.make_block_ptr(
base=a_ptr,
shape=(N, N),
strides=(N, 1),
offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N),
block_shape=(BLOCK_M, BLOCK_N),
order=(1, 0),
)
b_block_ptr = tl.make_block_ptr(
base=b_ptr,
shape=(N, N),
strides=(N, 1),
offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N),
block_shape=(BLOCK_M, BLOCK_N),
order=(1, 0),
)
c_block_ptr = tl.make_block_ptr(
base=c_ptr,
shape=(N, N),
strides=(N, 1),
offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N),
block_shape=(BLOCK_M, BLOCK_N),
order=(1, 0),
)
a = tl.load(a_block_ptr, boundary_check=(0, 1))
b = tl.load(b_block_ptr, boundary_check=(0, 1))
c = a + b
tl.store(c_block_ptr, c, boundary_check=(0, 1))
def solve_triton_2d(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int):
grid = lambda meta: (
triton.cdiv(N, meta['BLOCK_M']),
triton.cdiv(N, meta['BLOCK_N']),
)
matrix_add_kernel_2d[grid](A, B, C, N)
# ------------------------------------------------------------
# 性能测试工具
# ------------------------------------------------------------
def benchmark(func, A, B, C, N, warmup=10, repeat=100):
"""
运行指定函数多次,返回平均耗时(毫秒)。
"""
# 预热
for _ in range(warmup):
func(A, B, C, N)
torch.cuda.synchronize()
# 计时
start = time.perf_counter()
for _ in range(repeat):
func(A, B, C, N)
torch.cuda.synchronize()
end = time.perf_counter()
avg_time_ms = (end - start) / repeat * 1000
return avg_time_ms
def verify_results(C_triton_naive, C_triton_1d, C_triton_2d):
"""
验证三种方案结果是否一致。
"""
if torch.allclose(C_triton_naive, C_triton_1d, atol=1e-5):
print("✅ Naive Triton 与 Triton 1D 结果一致")
else:
print("❌ Naive Triton 与 Triton 1D 结果不一致")
if torch.allclose(C_triton_naive, C_triton_2d, atol=1e-5):
print("✅ Naive Triton 与 Triton 2D 结果一致")
else:
print("❌ Naive Triton 与 Triton 2D 结果不一致")
def main():
# 检查 CUDA 可用性
if not torch.cuda.is_available():
raise RuntimeError("CUDA 不可用,请在有 GPU 的环境下运行")
device = torch.device("cuda")
print(f"运行设备: {torch.cuda.get_device_name(device)}")
# 问题规模
N = 4096
# 分配 GPU 张量
A = torch.randn(N, N, device=device, dtype=torch.float32)
B = torch.randn(N, N, device=device, dtype=torch.float32)
C_triton_naive = torch.empty_like(A)
C_triton_1d = torch.empty_like(A)
C_triton_2d = torch.empty_like(A)
print(f"\n矩阵大小: {N} x {N} ({N*N} 个元素)")
# 验证正确性
solve_triton_naive(A, B, C_triton_naive, N)
solve_triton_1d(A, B, C_triton_1d, N)
solve_triton_2d(A, B, C_triton_2d, N)
verify_results(C_triton_naive, C_triton_1d, C_triton_2d)
# 性能测试
print("\n开始性能测试 (预热 10 次,计时 100 次取平均)...\n")
time_pytorch = benchmark(solve_triton_naive, A, B, C_triton_naive, N)
time_triton_1d = benchmark(solve_triton_1d, A, B, C_triton_1d, N)
time_triton_2d = benchmark(solve_triton_2d, A, B, C_triton_2d, N)
# 输出结果
print(f"Triton 直接加法: {time_pytorch:.4f} ms")
print(f"Triton 1D 向量化: {time_triton_1d:.4f} ms")
print(f"Triton 2D 块指针: {time_triton_2d:.4f} ms")
# 计算加速比
baseline = time_pytorch
print(f"\n相对于 Naive 的加速比:")
print(f" Triton 1D: {baseline / time_triton_1d:.2f}x")
print(f" Triton 2D: {baseline / time_triton_2d:.2f}x")
# 计算内存带宽
bytes_per_element = 4 # float32
total_bytes = 3 * N * N * bytes_per_element # A读 + B读 + C写
bw_pytorch = total_bytes / (time_pytorch / 1000) / 1e9
bw_triton_1d = total_bytes / (time_triton_1d / 1000) / 1e9
bw_triton_2d = total_bytes / (time_triton_2d / 1000) / 1e9
print(f"\n估算内存带宽 (GB/s):")
print(f" Triton Naive: {bw_pytorch:.2f} GB/s")
print(f" Triton 1D: {bw_triton_1d:.2f} GB/s")
print(f" Triton 2D: {bw_triton_2d:.2f} GB/s")
if __name__ == "__main__":
main()

4. CUDA 题解#

支持与分享

如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!

赞助
LeetGPU习题01:Matrix Addition
https://dlog.com.cn/posts/leetgpu01/matrix_addition/
作者
杜子源
发布于
2026-04-20
许可协议
CC BY-NC-SA 4.0
Profile Image of the Author
杜子源
都是风景,幸会
公告
请狠狠地打赏我,打赏一次,爆更一篇!!
音乐
封面

音乐

暂未播放

0:00 0:00
暂无歌词
分类
标签
站点统计
文章
14
分类
6
标签
8
总字数
23,256
运行时长
0
最后活动
0 天前

目录