LeetGPU习题04:Reduce汇总
1988 字
10 分钟
LeetGPU习题04:Reduce汇总
习题列表
| 挑战名称 | 难度 | 规约特征说明 |
|---|---|---|
| Reduction | Medium | 直接的并行规约,对数组元素执行求和、最大值等归约为标量 |
| Dot Product | Medium | 两个向量逐元素相乘后再求和,规约得到一个标量 |
| Softmax | Medium | 计算 softmax 需先求最大值再求指数和 |
| Categorical Cross Entropy Loss | Medium | 对预测概率与真实标签计算交叉熵,涉及对数、乘法后的求和规约 |
| Mean Squared Error | Medium | 计算所有样本的平方误差之和并求平均,核心是求和规约 |
| Batch Normalization | Medium | 在批次维度上计算均值与方差,均依赖求和规约 |
| RMS Normalization | Medium | 计算均方根,需要先求平方和再归一化 |
| Histogramming | Medium | 统计数值落入各 bin 的数量,可视为对每个 bin 的计数规约(常通过原子操作或局部规约实现) |
| Count Array Element | Medium | 统计等于指定值的元素个数,本质是对布尔标记的求和规约 |
| Count 2D Array Element | Medium | 在二维数组中统计指定值的元素个数,同理为求和规约 |
| Count 3D Array Element | Medium | 在三维数组中统计指定值的元素个数,同理为求和规约 |
| Monte Carlo Integration | Medium | 通过对函数值求平均来估计积分,核心是求均值规约 |
| Subarray Sum | Medium | 计算给定区间内元素的总和,是对子区域进行求和规约 |
| 2D Subarray Sum | Medium | 计算二维子区域的总和,属于多维部分规约求和 |
| 3D Subarray Sum | Medium | 计算三维子区域的总和,属于多维部分规约求和 |
| Max Subarray Sum | Medium | 找出所有固定长度子数组和的最大值,内含多次求和与一次 max reduction |
Reduction
CUDA版本
仅供参考
#include <cuda_runtime.h>
// Sum-reduce `val` across the 32 lanes of the calling warp via shuffle-down.
// After the cascade, lane 0 holds the full warp total; other lanes hold
// partial sums and should be ignored by the caller.
__device__ __forceinline__ float warpReduceSum(float val) {
    for (int offset = 16; offset > 0; offset >>= 1) {
        val += __shfl_down_sync(0xffffffff, val, offset);
    }
    return val;
}
// Sum-reduce `val` across all threads of the block in two stages:
// per-warp shuffle reduction, then a final reduction of the per-warp
// partial sums by the first warp. Thread 0 ends up with the block total.
// NOTE(review): assumes blockDim.x is a multiple of 32 — confirm at launch.
__device__ __forceinline__ float blockReduceSum(float val) {
    __shared__ float warpSums[32];        // one slot per warp (max 1024 threads)
    const int lane   = threadIdx.x % 32;  // lane index inside this thread's warp
    const int warpId = threadIdx.x / 32;  // which warp this thread belongs to

    // Stage 1: each warp reduces its own 32 values.
    val = warpReduceSum(val);
    if (lane == 0) {
        warpSums[warpId] = val;           // publish the per-warp partial sum
    }
    __syncthreads();

    // Stage 2: only the first warp folds the per-warp sums together.
    const int numWarps = blockDim.x / 32;
    val = (threadIdx.x < numWarps) ? warpSums[threadIdx.x] : 0.0f;
    if (warpId == 0) {
        val = warpReduceSum(val);
    }
    return val;
}
// Grid-stride sum reduction: accumulates all N elements of `in` into *out.
// Loads are vectorized as float4 (requires 16-byte alignment of `in`, which
// cudaMalloc guarantees); the up-to-3 leftover elements are handled as a
// scalar tail. *out must be zeroed before launch — blocks combine via atomicAdd.
__global__ void reduce(const float* __restrict__ in, float* __restrict__ out, int N) {
    const int tid    = blockIdx.x * blockDim.x + threadIdx.x;
    const int stride = blockDim.x * gridDim.x;

    float sum = 0.0f;

    // Vectorized body: walk the array four floats at a time.
    const int vecN = N / 4;
    const float4* in4 = reinterpret_cast<const float4*>(in);
    for (int i = tid; i < vecN; i += stride) {
        const float4 v = in4[i];
        sum += v.x + v.y + v.z + v.w;
    }

    // Scalar tail: elements that do not fill a complete float4.
    for (int i = vecN * 4 + tid; i < N; i += stride) {
        sum += in[i];
    }

    // Combine per-thread sums within the block, then fold into the result.
    sum = blockReduceSum(sum);
    if (threadIdx.x == 0) {
        atomicAdd(out, sum);
    }
}
// Host entry point: sums input[0..N) into *output (a single device float).
extern "C" void solve(const float* input, float* output, int N) {
    const int BLOCK = 256;
    // Each thread consumes several float4 loads, so a modest grid suffices.
    const int grid = min((N + BLOCK * 8 - 1) / (BLOCK * 8), 1024);
    // The kernel accumulates with atomicAdd, so the output must start at zero.
    cudaMemsetAsync(output, 0, sizeof(float));
    reduce<<<grid, BLOCK>>>(input, output, N);
}
Triton版本
import torch
import triton
import triton.language as tl


@triton.jit
def reduce_kernel(
    input_ptr,
    partial_ptr,
    N,
    BLOCK_SIZE: tl.constexpr,
):
    """Each program sums one BLOCK_SIZE tile of the input and folds it into *partial_ptr."""
    pid = tl.program_id(0)
    offsets = pid * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
    mask = offsets < N
    x = tl.load(input_ptr + offsets, mask=mask, other=0.0)
    tile_sum = tl.sum(x, axis=0)
    # Programs run concurrently, so the final accumulation must be atomic.
    tl.atomic_add(partial_ptr, tile_sum)


def solve(input: torch.Tensor, output: torch.Tensor, N: int):
    """Reduce input[0..N) into output[0] with a single atomic-add kernel pass."""
    BLOCK_SIZE = 1024
    grid = ((N + BLOCK_SIZE - 1) // BLOCK_SIZE,)
    # Zero the accumulator first, because the kernel accumulates via atomic_add.
    output.zero_()
    reduce_kernel[grid](input, output, N, BLOCK_SIZE=BLOCK_SIZE)
Pytorch版本
import torch
def solve(input: torch.Tensor, output: torch.Tensor, N: int): output.copy_(input[:N].sum().unsqueeze(0))Dot Product
CUDA版本
#include "reduce.cuh"
// Sum-reduce `val` across the 32 lanes of the calling warp with shuffle-down.
// Lane 0 receives the warp total.
__device__ __forceinline__
float warp_reduce_sum(float val) {
    const unsigned mask = 0xffffffff;
    for (int delta = 16; delta > 0; delta >>= 1) {
        val += __shfl_down_sync(mask, val, delta);
    }
    return val;
}
// Naive dot product: grid-stride multiply-accumulate per thread, then a
// shared-memory tree reduction per block, combined globally with atomicAdd.
// NOTE(review): sdata is sized for exactly 256 threads and the tree loop
// assumes blockDim.x is a power of two — launch with blockDim.x == 256.
__global__ void dot_product_v0(const float* a, const float* b, float* out, int n) {
    __shared__ float sdata[256];

    const int tid    = threadIdx.x;
    const int gid    = blockIdx.x * blockDim.x + tid;
    const int stride = blockDim.x * gridDim.x;

    // Grid-stride loop over element-wise products.
    float sum = 0.0f;
    for (int i = gid; i < n; i += stride) {
        sum += a[i] * b[i];
    }
    sdata[tid] = sum;
    __syncthreads();

    // Binary tree reduction in shared memory; halve the active span each step.
    for (int span = blockDim.x / 2; span > 0; span /= 2) {
        if (tid < span) {
            sdata[tid] += sdata[tid + span];
        }
        __syncthreads();
    }

    // Thread 0 folds this block's partial sum into the global result.
    if (tid == 0) {
        atomicAdd(out, sdata[0]);
    }
}
// Optimized dot product: vectorized loads (VecType = float1/float2/float4),
// warp-shuffle reduction, and one atomicAdd per block. *out must be zeroed
// before launch.
template <typename VecType, int blockSize>
__global__ void dot_product_v1(const float* a, const float* b, float* out, int n) {
    // One shared slot per warp: blockSize / 32 warps, computed at compile time.
    constexpr int numWarps = blockSize / 32;
    __shared__ float sdata[numWarps];

    // tid & 31 extracts the lane (0..31) within the warp;
    // tid >> 5 is the warp index within the block.
    const int tid     = threadIdx.x;
    const int lane    = tid & 31;
    const int warp_id = tid >> 5;

    float sum = 0.0f;

    // Reinterpret the inputs as vectors (float2 / float4) for wide loads.
    const VecType* a_vec = reinterpret_cast<const VecType*>(a);
    const VecType* b_vec = reinterpret_cast<const VecType*>(b);

    constexpr int vecSize = sizeof(VecType) / sizeof(float);
    const int numVec = n / vecSize;
    const int stride = blockSize * gridDim.x;

    for (int i = blockIdx.x * blockSize + tid; i < numVec; i += stride) {
        VecType a_val = a_vec[i];
        VecType b_val = b_vec[i];
        if constexpr (vecSize == 1) {
            sum += a_val.x * b_val.x;
        } else if constexpr (vecSize == 2) {
            sum += a_val.x * b_val.x + a_val.y * b_val.y;
        } else {  // float4
            sum += a_val.x * b_val.x + a_val.y * b_val.y
                 + a_val.z * b_val.z + a_val.w * b_val.w;
        }
    }

    // Tail: elements not covered by vectorization (n not a multiple of vecSize).
    // BUG FIX: this used to run under `tid == 0` alone, so thread 0 of EVERY
    // block added the tail, counting it gridDim.x times. Restricting to
    // block 0 guarantees the tail is summed exactly once in the whole grid.
    if (blockIdx.x == 0 && tid == 0) {
        for (int i = numVec * vecSize; i < n; i++) {
            sum += a[i] * b[i];
        }
    }

    // Warp-level shuffle reduction; lane 0 publishes each warp's partial sum.
    sum = warp_reduce_sum(sum);
    if (lane == 0) {
        sdata[warp_id] = sum;
    }
    __syncthreads();

    // The first warp reduces the per-warp sums; thread 0 commits the result.
    if (warp_id == 0) {
        sum = (tid < numWarps) ? sdata[tid] : 0.0f;
        sum = warp_reduce_sum(sum);
        if (tid == 0) {
            atomicAdd(out, sum);
        }
    }
}
// Benchmark driver: compares the naive and optimized dot-product kernels
// on 2^25 elements of constant data.
int main() {
    const int N = 1 << 25;
    const int iter = 3;

    std::vector<float> h_a(N, 1.0f), h_b(N, 2.0f);

    // Baseline: one element per thread, shared-memory tree reduction.
    benchmark_dot_product("naive", N, iter, h_a, h_b,
        [&](const float* A, const float* B, float* out, int N) {
            constexpr int blockSize = 256;
            int gridSize = (N + blockSize - 1) / blockSize;
            cudaMemset(out, 0, sizeof(float));  // atomicAdd target must start at 0
            dot_product_v0<<<gridSize, blockSize>>>(A, B, out, N);
        });

    // Optimized: float4 loads + warp shuffle reduction, fixed grid of 1024 blocks.
    benchmark_dot_product("optimized", N, iter, h_a, h_b,
        [&](const float* A, const float* B, float* out, int N) {
            constexpr int blockSize = 256;
            int gridSize = 1024;
            cudaMemset(out, 0, sizeof(float));
            dot_product_v1<float4, blockSize><<<gridSize, blockSize>>>(A, B, out, N);
        });

    return 0;
}
Triton和Pytorch版本
import triton
import triton.language as tl
import torch


@triton.jit
def dot_product_atomic_kernel(x_ptr, y_ptr, output_ptr, n_elements, BLOCK_SIZE: tl.constexpr):
    """Single-stage dot product: each program atomically adds its tile's partial sum."""
    pid = tl.program_id(0)
    offsets = pid * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
    mask = offsets < n_elements

    # Out-of-range lanes load 0.0, contributing nothing to the sum.
    x = tl.load(x_ptr + offsets, mask=mask, other=0.0)
    y = tl.load(y_ptr + offsets, mask=mask, other=0.0)
    partial = tl.sum(x * y)
    tl.atomic_add(output_ptr, partial)
def dot_product_atomic(x: torch.Tensor, y: torch.Tensor, BLOCK_SIZE: int = 1024) -> torch.Tensor:
    """Dot product of x and y via the single-stage atomic kernel.

    Returns a 1-element float32 tensor on x's device.
    """
    n = x.numel()
    # The accumulator must start at zero because the kernel uses atomic_add.
    output = torch.zeros(1, device=x.device, dtype=torch.float32)
    grid = (triton.cdiv(n, BLOCK_SIZE),)
    dot_product_atomic_kernel[grid](x, y, output, n, BLOCK_SIZE=BLOCK_SIZE)
    return output
@triton.jit
def dot_product_partial_kernel(x_ptr, y_ptr, partial_out_ptr, n_elements, BLOCK_SIZE: tl.constexpr):
    """Stage 1: compute this program's local dot product and store it in a per-block buffer."""
    pid = tl.program_id(0)
    offsets = pid * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
    mask = offsets < n_elements

    x = tl.load(x_ptr + offsets, mask=mask, other=0.0)
    y = tl.load(y_ptr + offsets, mask=mask, other=0.0)
    partial = tl.sum(x * y)
    # No atomics needed: each program owns its own output slot.
    tl.store(partial_out_ptr + pid, partial)
@triton.jit
def reduce_final_kernel(partial_ptr, output_ptr, n_partials, BLOCK_SIZE: tl.constexpr):
    """Stage 2: reduce the partial-sum buffer down to the final scalar."""
    pid = tl.program_id(0)
    offsets = pid * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
    mask = offsets < n_partials

    vals = tl.load(partial_ptr + offsets, mask=mask, other=0.0)
    partial = tl.sum(vals)
    # Multiple reduce programs may run, so combine with an atomic add.
    tl.atomic_add(output_ptr, partial)
def dot_product_two_stage(x: torch.Tensor, y: torch.Tensor, BLOCK_SIZE: int = 1024, REDUCE_BLOCK: int = 1024) -> torch.Tensor:
    """Dot product via two kernel launches: per-block partial sums, then a final reduction.

    Returns a 1-element float32 tensor on x's device.
    """
    n = x.numel()
    num_blocks = triton.cdiv(n, BLOCK_SIZE)

    # Stage 1: each block writes its local dot product to its own slot.
    partial = torch.empty(num_blocks, device=x.device, dtype=torch.float32)
    dot_product_partial_kernel[(num_blocks,)](x, y, partial, n, BLOCK_SIZE=BLOCK_SIZE)

    # Stage 2: reduce the partial buffer into the (zero-initialized) output.
    output = torch.zeros(1, device=x.device, dtype=torch.float32)
    num_reduce = triton.cdiv(num_blocks, REDUCE_BLOCK)
    reduce_final_kernel[(num_reduce,)](partial, output, num_blocks, BLOCK_SIZE=REDUCE_BLOCK)

    return output
def dot_product_pytorch_dot(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    """Reference dot product using torch.dot (1-D tensors only); returns a 0-dim tensor."""
    return x.dot(y)
def dot_product_pytorch_sum(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    """Reference dot product as an explicit element-wise multiply followed by a sum."""
    return (x * y).sum()
def benchmark(fn, x, y, ref, name, iters=100, warmup=10):
    """Time fn(x, y) with CUDA events and verify its result against ref.

    Returns (mean time in ms, effective bandwidth in GB/s, pass flag).
    """
    # Warm up to exclude JIT/launch overhead, then drain pending work.
    for _ in range(warmup):
        fn(x, y)
    torch.cuda.synchronize()

    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)

    start.record()
    for _ in range(iters):
        fn(x, y)
    end.record()
    torch.cuda.synchronize()

    elapsed_ms = start.elapsed_time(end) / iters

    # Correctness check against the (possibly tensor) reference value.
    result_val = fn(x, y).item()
    ref_val = ref.item() if isinstance(ref, torch.Tensor) else ref
    abs_err = abs(result_val - ref_val)
    rel_error = abs_err / (abs(ref_val) + 1e-10)
    passed = rel_error < 1e-4

    # Two full input reads dominate traffic; ms * 1e6 converts to GB/s.
    bytes_read = 2 * x.numel() * x.element_size()
    bw = bytes_read / (elapsed_ms * 1e6)

    tag = "PASS" if passed else "FAIL"
    print(f" [{name}]")
    print(f" Time: {elapsed_ms:.4f} ms | BW: {bw:.2f} GB/s | {tag}")
    if not passed:
        print(f" Debug: Ref={ref_val:.6f}, Result={result_val:.6f}, "
              f"RelError={rel_error:.6e}")

    return elapsed_ms, bw, passed
def main():
    """Run every dot-product variant at each configured problem size."""
    configs = [
        ("32M", 33554432),
    ]

    for name, N in configs:
        print(f"--- N = {name} ({N:,} elements) ---")

        x = torch.randn(N, device='cuda', dtype=torch.float32)
        y = torch.randn(N, device='cuda', dtype=torch.float32)
        # Reference is computed in float64 on the CPU to minimize rounding error.
        ref = torch.dot(x.cpu().double(), y.cpu().double())

        benchmark(dot_product_atomic, x, y, ref, "Atomic")
        benchmark(dot_product_two_stage, x, y, ref, "Two-Stage")
        benchmark(dot_product_pytorch_dot, x, y, ref, "PyTorch dot")
        benchmark(dot_product_pytorch_sum, x, y, ref, "PyTorch sum(a*b)")
        print()


if __name__ == "__main__":
    main()
支持与分享
如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!
LeetGPU习题04:Reduce汇总
https://dlog.com.cn/posts/leetgpu04/leet_reduce/