LeetGPU习题01:Matrix Addition

2007 字
10 分钟
LeetGPU习题01:Matrix Addition
2026-04-20
更新中

在 LeetGPU 的习题列表中,Element-wise 算子指的是对输入张量/数组的每个元素独立执行相同操作、不依赖于其他元素或全局统计量的算子。

以下是明确的 Element-wise 算子:

算子说明#

名称说明
Vector Addition两个向量逐元素相加
Matrix Addition两个矩阵逐元素相加
Matrix Copy逐元素复制矩阵
Color Inversion对每个像素独立取反
Reverse Array反转数组,每个元素独立移动位置
ReLU逐元素应用 ReLU 函数
Leaky ReLU逐元素应用 Leaky ReLU
Sigmoid Activation逐元素应用 Sigmoid 函数
Value Clipping逐元素裁剪到指定范围
Sigmoid Linear Unit (SiLU)逐元素 SiLU 激活
Swish-Gated Linear Unit (SWiGLU)逐元素 SWiGLU(门控部分也为逐元素)
Gaussian Error Gated Linear Unit (GEGLU)逐元素 GEGLU 激活
RGB to Grayscale每个像素独立转换,不依赖邻域
Interleave Arrays交替合并两数组,每个输出元素仅依赖对应位置输入
Rotary Positional Embedding对每个位置独立应用旋转矩阵
Weight Dequantization每个权重独立反量化
INT8 Quantized MatMul(仅反量化部分)反量化部分为逐元素,整体不是
Simple Inference线性层前向包含矩阵乘,非 element-wise,但其中的激活部分可能是逐元素

Vector Addition#

1. Matrix Addition题目#

实现一个在 GPU 上对两个包含 32 位浮点数的矩阵进行逐元素相加的程序。程序接收两个相同维度的输入矩阵,输出一个矩阵,其中的每个元素为对应位置元素之和。

示例 1:

输入:
A = [[1.0, 2.0],
[3.0, 4.0]]
B = [[5.0, 6.0],
[7.0, 8.0]]
输出:
C = [[6.0, 8.0],
[10.0, 12.0]]

示例 2:

输入:
A = [[1.5, 2.5, 3.5],
[4.5, 5.5, 6.5],
[7.5, 8.5, 9.5]]
B = [[0.5, 0.5, 0.5],
[0.5, 0.5, 0.5],
[0.5, 0.5, 0.5]]
输出:
C = [[2.0, 3.0, 4.0],
[5.0, 6.0, 7.0],
[8.0, 9.0, 10.0]]

约束条件

  • 输入矩阵 AB 维度相同
  • 1 ≤ N ≤ 4096
  • 所有元素均为 32 位浮点数
  • 性能评测基于 N = 4,096

2. Pytorch题解#

import torch
# A, B, C are tensors on the GPU
def solve(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int):
C.copy_(A + B)

3. Triton题解#

这里应该如何优化呢? 参考向量化、二维化

Naive、Vec、2D性能对比
Naive、Vec、2D性能对比

3.1. 向量化如何计算offset?#

block_start = pid * BLOCK_SIZE * VEC_WIDTH
offsets = block_start + tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH + tl.arange(0, VEC_WIDTH)[None, :]
offsets = tl.reshape(offsets, (BLOCK_SIZE * VEC_WIDTH,))
  1. 先来看一个例子, 假设 BLOCK_SIZE=4,VEC_WIDTH=2,pid=0(起始 0):
  • tl.arange(0,4)[:,None]*2 → 列向量 [[0], [2], [4], [6]]

  • tl.arange(0,2)[None,:] → 行向量 [[0, 1]]

广播相加得到:

[[0, 1],
[2, 3],
[4, 5],
[6, 7]]

展平后得到 [0, 1, 2, 3, 4, 5, 6, 7]。这正好是线程块 0 要处理的前 8 个元素的连续索引。

  1. 计算详解

block_start就是一次性要处理BLOCK_SIZE * VEC_WIDTH个元素,再乘以当前的pid。

之后构造每一个线程的偏移量数组:

tl.arange(0, BLOCK_SIZE) # shape:(1, BLOCK_SIZE)
tl.arange(0, BLOCK_SIZE)[:, None] # 转置: (BLOCK_SIZE, 1)
tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH # 每个线程本身负责的第一个元素的偏移量

例如:VEC_WIDTH=4时,tid=0得到下标0,tid=1得到下标4,tid=2得到下标8等等。

在上述处理之后,我们还需要处理内部的元素:

+ tl.arange(0, VEC_WITDH) # shape(1, VEC_WIDTH)
+ tl.arange(0, VEC_WIDTH)[None, :] # 行向量 (1, VEC_WIDTH)

最终得到一个形状为(BLOCK_SIZE, VEC_WIDTH)的二维数组,每一行是一个线程,每一列是单个线程要处理的连续偏移。

例如tid=1 & VEC_WIDTH=4 该行的内容为[4,5,6,7]

offsets = block_start + tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH + tl.arange(0, VEC_WIDTH)[None, :]

在计算完向量化的坐标之后,我们再将其变为一维数组:

offsets = tl.reshape(offsets, (BLOCK_SIZE * VEC_WIDTH,))

3.2. 二维块指针构造#

实际上,在处理天然具有二维结构的数据时,并且并行算法也是二维划分时,选择二维线程块是一个自然的选择。

典型场景

矩阵运算、图像处理、卷积池化等。

在这些场景下,代码会更加直观,可以直接对应数据块的行列位置,并且无需进行重新转换ID,并且可以调整并行度以适应不同的硬件架构和数据模型。

当访存模式非常复杂时,就不太适合转换为1维操作来实现。

a_block_ptr = tl.make_block_ptr(
base=a_ptr, # 内存基地址
shape=(N, N), # 完整数据的形状
strides=(N, 1), # 行步长、列步长
offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N), # 当前块的起始坐标
block_shape=(BLOCK_M, BLOCK_N), # 要加载的块大小
order=(1, 0) # 线程映射顺序
)

假设我们有一个4*4的矩阵A,在GPU以行优先存储,内存布局如下:

矩阵和内存映射
矩阵和内存映射

3.3. 完整的三种方法对比#

import torch
import triton
import triton.language as tl
import time
import numpy as np
# ------------------------------------------------------------
# 方案 1:直接加法
# ------------------------------------------------------------
@triton.jit
def matrix_add_kernel(a, b, c, n_elements, BLOCK_SIZE: tl.constexpr):
pid = tl.program_id(axis=0)
block_start = pid * BLOCK_SIZE
offsets = block_start + tl.arange(0, BLOCK_SIZE)
mask = offsets < n_elements
ga = tl.load(a + offsets, mask=mask)
gb = tl.load(b + offsets, mask=mask)
gc = ga + gb
tl.store(c + offsets, gc, mask=mask)
def solve_triton_naive(a: torch.Tensor, b: torch.Tensor, c: torch.Tensor, N: int):
BLOCK_SIZE = 1024
n_elements = N * N
grid = (triton.cdiv(n_elements, BLOCK_SIZE),)
matrix_add_kernel[grid](a, b, c, n_elements, BLOCK_SIZE)
# ------------------------------------------------------------
# 方案 2:Triton 一维向量化 + Autotune
# ------------------------------------------------------------
@triton.autotune(
configs=[
triton.Config({'BLOCK_SIZE': 1024, 'VEC_WIDTH': 1}, num_warps=4),
triton.Config({'BLOCK_SIZE': 1024, 'VEC_WIDTH': 2}, num_warps=4),
triton.Config({'BLOCK_SIZE': 2048, 'VEC_WIDTH': 2}, num_warps=8),
triton.Config({'BLOCK_SIZE': 4096, 'VEC_WIDTH': 4}, num_warps=8),
triton.Config({'BLOCK_SIZE': 4096, 'VEC_WIDTH': 8}, num_warps=16),
],
key=['n_elements'],
)
@triton.jit
def matrix_add_kernel_1d(
a_ptr, b_ptr, c_ptr,
n_elements: tl.constexpr,
BLOCK_SIZE: tl.constexpr,
VEC_WIDTH: tl.constexpr,
):
pid = tl.program_id(axis=0)
block_start = pid * BLOCK_SIZE * VEC_WIDTH
offsets = block_start + tl.arange(0, BLOCK_SIZE)[:, None] * VEC_WIDTH + tl.arange(0, VEC_WIDTH)[None, :]
offsets = tl.reshape(offsets, (BLOCK_SIZE * VEC_WIDTH,))
mask = offsets < n_elements
a_vals = tl.load(a_ptr + offsets, mask=mask, other=0.0)
b_vals = tl.load(b_ptr + offsets, mask=mask, other=0.0)
c_vals = a_vals + b_vals
tl.store(c_ptr + offsets, c_vals, mask=mask)
def solve_triton_1d(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int):
n_elements = N * N
grid = lambda meta: (triton.cdiv(n_elements, meta['BLOCK_SIZE'] * meta['VEC_WIDTH']),)
matrix_add_kernel_1d[grid](A, B, C, n_elements)
# ------------------------------------------------------------
# 方案 3:Triton 二维块指针 + Autotune
# ------------------------------------------------------------
@triton.autotune(
configs=[
triton.Config({'BLOCK_M': 128, 'BLOCK_N': 128}, num_warps=4),
triton.Config({'BLOCK_M': 128, 'BLOCK_N': 256}, num_warps=4),
triton.Config({'BLOCK_M': 256, 'BLOCK_N': 128}, num_warps=8),
triton.Config({'BLOCK_M': 256, 'BLOCK_N': 256}, num_warps=8),
triton.Config({'BLOCK_M': 512, 'BLOCK_N': 128}, num_warps=8),
triton.Config({'BLOCK_M': 512, 'BLOCK_N': 256}, num_warps=8),
],
key=['N'],
)
@triton.jit
def matrix_add_kernel_2d(
a_ptr, b_ptr, c_ptr,
N,
BLOCK_M: tl.constexpr,
BLOCK_N: tl.constexpr,
):
pid_m = tl.program_id(axis=0)
pid_n = tl.program_id(axis=1)
a_block_ptr = tl.make_block_ptr(
base=a_ptr,
shape=(N, N),
strides=(N, 1),
offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N),
block_shape=(BLOCK_M, BLOCK_N),
order=(1, 0),
)
b_block_ptr = tl.make_block_ptr(
base=b_ptr,
shape=(N, N),
strides=(N, 1),
offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N),
block_shape=(BLOCK_M, BLOCK_N),
order=(1, 0),
)
c_block_ptr = tl.make_block_ptr(
base=c_ptr,
shape=(N, N),
strides=(N, 1),
offsets=(pid_m * BLOCK_M, pid_n * BLOCK_N),
block_shape=(BLOCK_M, BLOCK_N),
order=(1, 0),
)
a = tl.load(a_block_ptr, boundary_check=(0, 1))
b = tl.load(b_block_ptr, boundary_check=(0, 1))
c = a + b
tl.store(c_block_ptr, c, boundary_check=(0, 1))
def solve_triton_2d(A: torch.Tensor, B: torch.Tensor, C: torch.Tensor, N: int):
grid = lambda meta: (
triton.cdiv(N, meta['BLOCK_M']),
triton.cdiv(N, meta['BLOCK_N']),
)
matrix_add_kernel_2d[grid](A, B, C, N)
# ------------------------------------------------------------
# 性能测试工具
# ------------------------------------------------------------
def benchmark(func, A, B, C, N, warmup=10, repeat=100):
"""
运行指定函数多次,返回平均耗时(毫秒)。
"""
# 预热
for _ in range(warmup):
func(A, B, C, N)
torch.cuda.synchronize()
# 计时
start = time.perf_counter()
for _ in range(repeat):
func(A, B, C, N)
torch.cuda.synchronize()
end = time.perf_counter()
avg_time_ms = (end - start) / repeat * 1000
return avg_time_ms
def verify_results(C_triton_naive, C_triton_1d, C_triton_2d):
"""
验证三种方案结果是否一致。
"""
if torch.allclose(C_triton_naive, C_triton_1d, atol=1e-5):
print("✅ PyTorch 与 Triton 1D 结果一致")
else:
print("❌ PyTorch 与 Triton 1D 结果不一致")
if torch.allclose(C_triton_naive, C_triton_2d, atol=1e-5):
print("✅ PyTorch 与 Triton 2D 结果一致")
else:
print("❌ PyTorch 与 Triton 2D 结果不一致")
def main():
# 检查 CUDA 可用性
if not torch.cuda.is_available():
raise RuntimeError("CUDA 不可用,请在有 GPU 的环境下运行")
device = torch.device("cuda")
print(f"运行设备: {torch.cuda.get_device_name(device)}")
# 问题规模
N = 4096
# 分配 GPU 张量
A = torch.randn(N, N, device=device, dtype=torch.float32)
B = torch.randn(N, N, device=device, dtype=torch.float32)
C_triton_naive = torch.empty_like(A)
C_triton_1d = torch.empty_like(A)
C_triton_2d = torch.empty_like(A)
print(f"\n矩阵大小: {N} x {N} ({N*N} 个元素)")
# 验证正确性(单独运行一次)
solve_triton_naive(A, B, C_triton_naive, N)
solve_triton_1d(A, B, C_triton_1d, N)
solve_triton_2d(A, B, C_triton_2d, N)
verify_results(C_triton_naive, C_triton_1d, C_triton_2d)
# 性能测试
print("\n开始性能测试 (预热 10 次,计时 100 次取平均)...\n")
time_pytorch = benchmark(solve_triton_naive, A, B, C_triton_naive, N)
time_triton_1d = benchmark(solve_triton_1d, A, B, C_triton_1d, N)
time_triton_2d = benchmark(solve_triton_2d, A, B, C_triton_2d, N)
# 输出结果
print(f"PyTorch 直接加法: {time_pytorch:.4f} ms")
print(f"Triton 1D 向量化: {time_triton_1d:.4f} ms")
print(f"Triton 2D 块指针: {time_triton_2d:.4f} ms")
# 计算加速比
baseline = time_pytorch
print(f"\n相对于 PyTorch 的加速比:")
print(f" Triton 1D: {baseline / time_triton_1d:.2f}x")
print(f" Triton 2D: {baseline / time_triton_2d:.2f}x")
# 计算内存带宽
bytes_per_element = 4 # float32
total_bytes = 3 * N * N * bytes_per_element # A读 + B读 + C写
bw_pytorch = total_bytes / (time_pytorch / 1000) / 1e9
bw_triton_1d = total_bytes / (time_triton_1d / 1000) / 1e9
bw_triton_2d = total_bytes / (time_triton_2d / 1000) / 1e9
print(f"\n估算内存带宽 (GB/s):")
print(f" PyTorch: {bw_pytorch:.2f} GB/s")
print(f" Triton 1D: {bw_triton_1d:.2f} GB/s")
print(f" Triton 2D: {bw_triton_2d:.2f} GB/s")
if __name__ == "__main__":
main()

支持与分享

如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!

赞助
LeetGPU习题01:Matrix Addition
https://dlog.com.cn/posts/leetgpu01/all_element_wise/
作者
杜子源
发布于
2026-04-20
许可协议
CC BY-NC-SA 4.0
Profile Image of the Author
杜子源
都是风景,幸会
公告
如果需要源码,请还请打赏后B站私信我哦~您的支持是我最大的动力!
音乐
封面

音乐

暂未播放

0:00 0:00
暂无歌词
分类
标签
站点统计
文章
14
分类
6
标签
8
总字数
22,015
运行时长
0
最后活动
0 天前

目录