# Multi-Kernel Pipelines

This module covers building efficient multi-stage GPU processing pipelines with proper data flow management.
## Why Multiple Kernels?
Complex computations often decompose into stages:
| Stage | Purpose | Example |
|---|---|---|
| Preprocessing | Data transformation | Normalization |
| Processing | Main computation | Convolution |
| Reduction | Aggregate results | Sum/Average |
| Postprocessing | Final transformation | Thresholding |
Breaking a computation into multiple kernels provides:
- Better resource utilization
- Simpler debugging
- Reusable components
## Basic Pipeline Pattern

### Sequential Execution

```csharp
public class ImagePipeline
{
    private readonly IComputeOrchestrator _orchestrator;

    public ImagePipeline(IComputeOrchestrator orchestrator)
    {
        _orchestrator = orchestrator;
    }

    public async Task<byte[]> ProcessAsync(byte[] input, int width, int height)
    {
        // Grayscale image: one element per pixel. An RGBA image would need
        // pixelCount * 4 elements and channel-aware kernels.
        int pixelCount = width * height;

        // Allocate pipeline buffers
        using var inputBuffer = _orchestrator.CreateBuffer<byte>(pixelCount);
        using var normalizedBuffer = _orchestrator.CreateBuffer<float>(pixelCount);
        using var processedBuffer = _orchestrator.CreateBuffer<float>(pixelCount);
        using var outputBuffer = _orchestrator.CreateBuffer<byte>(pixelCount);

        // Transfer input
        await inputBuffer.CopyFromAsync(input);

        var config = CalculateConfig(pixelCount);

        // Stage 1: Normalize (byte -> float, 0-255 -> 0-1)
        await _orchestrator.ExecuteKernelAsync(
            ImageKernels.Normalize,
            config,
            inputBuffer, normalizedBuffer);

        // Stage 2: Process (e.g., sharpen)
        await _orchestrator.ExecuteKernelAsync(
            ImageKernels.Sharpen,
            config,
            normalizedBuffer, processedBuffer, width, height);

        // Stage 3: Denormalize (float -> byte)
        await _orchestrator.ExecuteKernelAsync(
            ImageKernels.Denormalize,
            config,
            processedBuffer, outputBuffer);

        // Transfer output
        var output = new byte[pixelCount];
        await outputBuffer.CopyToAsync(output);
        return output;
    }
}
```
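The `CalculateConfig` helper is not defined above; a minimal sketch, assuming the `KernelConfig` shape used in the reduction example later in this module:

```csharp
// Hypothetical helper: one thread per element, rounded up to whole blocks.
// The KernelConfig initializer matches the reduction example below.
private static KernelConfig CalculateConfig(int elementCount)
{
    const int blockSize = 256;
    int gridSize = (elementCount + blockSize - 1) / blockSize; // ceiling division
    return new KernelConfig { BlockSize = blockSize, GridSize = gridSize };
}
```

With that in place, usage is `var pipeline = new ImagePipeline(orchestrator); var sharpened = await pipeline.ProcessAsync(pixels, width, height);`.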
### Pipeline Kernels

```csharp
public static partial class ImageKernels
{
    [Kernel]
    public static void Normalize(
        ReadOnlySpan<byte> input,
        Span<float> output)
    {
        int idx = Kernel.ThreadId.X;
        if (idx < output.Length)
        {
            output[idx] = input[idx] / 255.0f;
        }
    }

    [Kernel]
    public static void Sharpen(
        ReadOnlySpan<float> input,
        Span<float> output,
        int width, int height)
    {
        int idx = Kernel.ThreadId.X;
        if (idx >= output.Length) return;

        int x = idx % width;
        int y = idx / width;

        // Pass border pixels through unchanged (no out-of-bounds neighbors)
        if (x == 0 || x == width - 1 || y == 0 || y == height - 1)
        {
            output[idx] = input[idx];
            return;
        }

        // Sharpen kernel: center * 5 minus the four edge neighbors
        float center = input[idx] * 5.0f;
        float neighbors = input[idx - 1] + input[idx + 1] +
                          input[idx - width] + input[idx + width];
        output[idx] = MathF.Max(0, MathF.Min(1, center - neighbors));
    }

    [Kernel]
    public static void Denormalize(
        ReadOnlySpan<float> input,
        Span<byte> output)
    {
        int idx = Kernel.ThreadId.X;
        if (idx < output.Length)
        {
            // Round and clamp so values slightly outside [0, 1] don't wrap
            float scaled = input[idx] * 255.0f + 0.5f;
            output[idx] = (byte)MathF.Min(255.0f, MathF.Max(0.0f, scaled));
        }
    }
}
```
## Concurrent Execution

### Using CUDA Streams

Execute independent operations concurrently:

```csharp
public async Task ProcessMultipleAsync(byte[][] inputs)
{
    // Create a small pool of streams for concurrent execution
    var streams = Enumerable.Range(0, 4)
        .Select(_ => _orchestrator.CreateStream())
        .ToArray();

    try
    {
        var tasks = new List<Task>();
        for (int i = 0; i < inputs.Length; i++)
        {
            var stream = streams[i % streams.Length];
            var input = inputs[i];
            tasks.Add(ProcessOnStreamAsync(input, stream));
        }

        await Task.WhenAll(tasks);
    }
    finally
    {
        // Cleanup streams even if a task faults
        foreach (var stream in streams)
            stream.Dispose();
    }
}

private async Task ProcessOnStreamAsync(byte[] input, IComputeStream stream)
{
    using var buffer = _orchestrator.CreateBuffer<byte>(input.Length);

    // All operations on the same stream execute in order;
    // operations on different streams can execute concurrently.
    // ('kernel' and 'config' are assumed to be defined elsewhere.)
    await buffer.CopyFromAsync(input, stream);
    await _orchestrator.ExecuteKernelAsync(kernel, config, stream, buffer);
    await buffer.CopyToAsync(input, stream); // result written back into 'input'
}
```
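When inputs far outnumber streams, every queued operation still lands on one of the four streams and serializes there. A hedged sketch that also bounds the number of in-flight host tasks with a `SemaphoreSlim` (the throttling is our addition, not a framework feature):

```csharp
private async Task ProcessAllThrottledAsync(byte[][] inputs, IComputeStream[] streams)
{
    // Allow at most streams.Length tasks in flight at once
    using var gate = new SemaphoreSlim(streams.Length);

    var tasks = inputs.Select(async (input, i) =>
    {
        await gate.WaitAsync();
        try
        {
            await ProcessOnStreamAsync(input, streams[i % streams.Length]);
        }
        finally
        {
            gate.Release();
        }
    });

    await Task.WhenAll(tasks);
}
```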
### Graph-Based Execution

Define dependencies explicitly:

```csharp
public class ComputeGraph
{
    public async Task ExecutePipelineAsync()
    {
        // Define nodes (the buffers are fields allocated elsewhere)
        var normalize = new KernelNode(ImageKernels.Normalize, inputBuffer, normalizedBuffer);
        var blur = new KernelNode(ImageKernels.Blur, normalizedBuffer, blurredBuffer);
        var sharpen = new KernelNode(ImageKernels.Sharpen, normalizedBuffer, sharpenedBuffer);
        var combine = new KernelNode(ImageKernels.Combine, blurredBuffer, sharpenedBuffer, outputBuffer);

        // Define dependencies:
        //
        //      normalize
        //       /     \
        //     blur   sharpen
        //       \     /
        //       combine
        //
        blur.DependsOn(normalize);
        sharpen.DependsOn(normalize);
        combine.DependsOn(blur, sharpen);

        // Execute (blur and sharpen run concurrently)
        await _orchestrator.ExecuteGraphAsync(new[] { normalize, blur, sharpen, combine });
    }
}
```
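If a framework did not provide `ExecuteGraphAsync`, a simple wave-by-wave scheduler could be built on top of the dependency information. A minimal sketch, assuming each node exposes its `Dependencies` and an `ExecuteAsync` method (both hypothetical here):

```csharp
public static async Task ExecuteGraphAsync(IReadOnlyList<KernelNode> nodes)
{
    var done = new HashSet<KernelNode>();
    var remaining = new List<KernelNode>(nodes);

    while (remaining.Count > 0)
    {
        // A "wave" is every node whose dependencies have all completed
        var ready = remaining.Where(n => n.Dependencies.All(done.Contains)).ToList();
        if (ready.Count == 0)
            throw new InvalidOperationException("Cycle detected in compute graph.");

        // Nodes within a wave are independent, so they can run concurrently
        await Task.WhenAll(ready.Select(n => n.ExecuteAsync()));

        foreach (var node in ready)
        {
            done.Add(node);
            remaining.Remove(node);
        }
    }
}
```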
## Buffer Management Strategies

### Strategy 1: Static Allocation

Pre-allocate all buffers once and reuse them across calls:

```csharp
public class StaticPipeline : IDisposable
{
    private readonly IComputeOrchestrator _orchestrator;
    private readonly IBuffer<float>[] _stageBuffers;

    public StaticPipeline(IComputeOrchestrator orchestrator, int maxSize, int stages)
    {
        _orchestrator = orchestrator;
        _stageBuffers = new IBuffer<float>[stages];
        for (int i = 0; i < stages; i++)
        {
            _stageBuffers[i] = orchestrator.CreateBuffer<float>(maxSize);
        }
    }

    public async Task ProcessAsync(int actualSize)
    {
        // Use the pre-allocated buffers; _kernels holds one kernel per stage
        for (int stage = 0; stage < _stageBuffers.Length - 1; stage++)
        {
            await _orchestrator.ExecuteKernelAsync(
                _kernels[stage],
                GetConfig(actualSize),
                _stageBuffers[stage],
                _stageBuffers[stage + 1]);
        }
    }

    public void Dispose()
    {
        foreach (var buffer in _stageBuffers)
            buffer.Dispose();
    }
}
```
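The payoff comes from reuse: construct the pipeline once, then call it per frame without reallocating. A hypothetical usage sketch:

```csharp
// Allocate stage buffers once, sized for the largest expected frame
using var pipeline = new StaticPipeline(orchestrator, maxSize: 1920 * 1080, stages: 4);

foreach (var frame in frames)
{
    await pipeline.ProcessAsync(frame.PixelCount); // no per-frame allocation
}
```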
### Strategy 2: Ping-Pong Buffers

Alternate between two buffers, reading from one while writing to the other:

```csharp
public async Task PingPongProcessAsync(float[] data, int iterations)
{
    using var bufferA = _orchestrator.CreateBuffer<float>(data.Length);
    using var bufferB = _orchestrator.CreateBuffer<float>(data.Length);

    await bufferA.CopyFromAsync(data);

    for (int i = 0; i < iterations; i++)
    {
        var source = (i % 2 == 0) ? bufferA : bufferB;
        var dest = (i % 2 == 0) ? bufferB : bufferA;
        await _orchestrator.ExecuteKernelAsync(kernel, config, source, dest);
    }

    // After an even number of iterations the result is back in bufferA
    var result = (iterations % 2 == 0) ? bufferA : bufferB;
    await result.CopyToAsync(data);
}
```
### Strategy 3: In-Place with Synchronization

A single buffer can be processed in place if every thread reads its neighbors before any thread writes. Note that `Kernel.SyncThreads()` is a block-level barrier, so this pattern is only safe when all participating threads belong to one block:

```csharp
[Kernel(RequiresBarrier = true)]
public static void InPlaceProcess(Span<float> data)
{
    int idx = Kernel.ThreadId.X;
    bool active = idx < data.Length;

    // Phase 1: Read neighbors (zero padding at the edges)
    float left = (active && idx > 0) ? data[idx - 1] : 0;
    float right = (active && idx < data.Length - 1) ? data[idx + 1] : 0;
    float center = active ? data[idx] : 0;

    // Synchronize - ensure all reads complete before any thread writes.
    // Every thread must reach the barrier, so no early returns above it.
    Kernel.SyncThreads();

    // Phase 2: Write result (3-point moving average)
    if (active)
    {
        data[idx] = (left + center + right) / 3.0f;
    }
}
```
## Reduction Patterns

### Parallel Reduction

```csharp
public static partial class ReductionKernels
{
    [Kernel(UseSharedMemory = true)]
    public static void SumReduce(
        ReadOnlySpan<float> input,
        Span<float> partialSums,
        int n)
    {
        // Shared memory for the block-level reduction (one slot per thread)
        var shared = Kernel.AllocateShared<float>(256);

        int tid = Kernel.ThreadId.X;
        int idx = Kernel.BlockId.X * Kernel.BlockDim.X + tid;

        // Load to shared memory; out-of-range threads contribute zero
        shared[tid] = (idx < n) ? input[idx] : 0;
        Kernel.SyncThreads();

        // Tree reduction within the block, halving the stride each step
        for (int stride = Kernel.BlockDim.X / 2; stride > 0; stride >>= 1)
        {
            if (tid < stride)
            {
                shared[tid] += shared[tid + stride];
            }
            Kernel.SyncThreads();
        }

        // Thread 0 writes the block's partial sum
        if (tid == 0)
        {
            partialSums[Kernel.BlockId.X] = shared[0];
        }
    }
}
```
```csharp
public async Task<float> SumAsync(float[] data)
{
    int n = data.Length;
    int blockSize = 256;
    int gridSize = (n + blockSize - 1) / blockSize;

    using var inputBuffer = _orchestrator.CreateBuffer<float>(n);
    var partialBuffer = _orchestrator.CreateBuffer<float>(gridSize);

    try
    {
        await inputBuffer.CopyFromAsync(data);

        // First reduction pass: one partial sum per block
        await _orchestrator.ExecuteKernelAsync(
            ReductionKernels.SumReduce,
            new KernelConfig { BlockSize = blockSize, GridSize = gridSize },
            inputBuffer, partialBuffer, n);

        // Keep reducing the partial sums until a single value remains
        while (gridSize > 1)
        {
            int newGridSize = (gridSize + blockSize - 1) / blockSize;
            var newPartialBuffer = _orchestrator.CreateBuffer<float>(newGridSize);

            await _orchestrator.ExecuteKernelAsync(
                ReductionKernels.SumReduce,
                new KernelConfig { BlockSize = blockSize, GridSize = newGridSize },
                partialBuffer, newPartialBuffer, gridSize);

            // Swap buffers: the new partial sums feed the next pass
            partialBuffer.Dispose();
            partialBuffer = newPartialBuffer;
            gridSize = newGridSize;
        }

        var result = new float[1];
        await partialBuffer.CopyToAsync(result);
        return result[0];
    }
    finally
    {
        partialBuffer.Dispose();
    }
}
```
## Pipeline Performance Tips

### 1. Minimize Kernel Launches

```csharp
// BAD: three launches, each paying launch overhead and a full memory pass
await ExecuteAsync(Normalize, data);
await ExecuteAsync(Square, data);
await ExecuteAsync(Sum, data);

// BETTER: fuse the element-wise stages into one kernel
[Kernel]
public static void NormalizeSquare(Span<float> data)
{
    int idx = Kernel.ThreadId.X;
    if (idx >= data.Length) return;

    float val = data[idx] / 255.0f; // Normalize
    data[idx] = val * val;          // Square
    // Sum still needs a separate kernel (requires a reduction)
}
```
### 2. Overlap Transfers and Compute

```csharp
// Process in batches, double-buffered so the next batch's transfer
// overlaps the current batch's compute (a true overlap requires
// asynchronous copies, e.g. on a separate stream)
await buffers[0].CopyFromAsync(GetBatch(0)); // prime the first batch

for (int batch = 0; batch < batchCount; batch++)
{
    // Start transferring the next batch while processing the current one
    var transferTask = (batch < batchCount - 1)
        ? buffers[(batch + 1) % 2].CopyFromAsync(GetBatch(batch + 1))
        : Task.CompletedTask;

    // Process the current batch
    await _orchestrator.ExecuteKernelAsync(kernel, config, buffers[batch % 2]);

    // Ensure the transfer completed before the next iteration uses it
    await transferTask;
}
```
### 3. Use Appropriate Buffer Sizes

Balance parallelism against launch and transfer overhead:

| Data Size (elements) | Recommendation |
|---|---|
| < 10K | Single kernel; consider staying on the CPU |
| 10K - 1M | Standard sequential pipeline |
| > 1M | Stream with transfer/compute overlap |
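These thresholds can be encoded as a simple dispatch; the cutoffs and helper names below are illustrative, not measured:

```csharp
// Hypothetical size-based dispatch following the table above
public Task ProcessAdaptiveAsync(float[] data) => data.Length switch
{
    < 10_000 => ProcessOnCpuAsync(data),       // transfer overhead dominates
    < 1_000_000 => ProcessPipelineAsync(data), // standard sequential pipeline
    _ => ProcessWithStreamOverlapAsync(data)   // overlap transfers and compute
};
```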
## Exercises

### Exercise 1: Image Filter Pipeline

Build a pipeline: Grayscale → Blur → Edge Detection → Threshold.

### Exercise 2: Parallel Reduction

Implement parallel min/max reduction.

### Exercise 3: Stream Overlap

Measure the speedup from overlapping transfer and compute.
## Key Takeaways

- Decompose complex operations into pipeline stages
- Reuse buffers to minimize allocation
- Use streams for concurrent execution
- Fuse kernels when possible to reduce launch overhead
- Overlap transfers and compute for maximum throughput

## Next Module

Learn to debug GPU code and handle failures gracefully.