Class CudaRingKernelRuntime

Namespace
DotCompute.Backends.CUDA.RingKernels
Assembly
DotCompute.Backends.CUDA.dll

CUDA runtime for managing persistent ring kernels.

public sealed class CudaRingKernelRuntime : IRingKernelRuntime, IAsyncDisposable
Inheritance
CudaRingKernelRuntime
Implements
IRingKernelRuntime
IAsyncDisposable

Remarks

Provides lifecycle management for CUDA-based ring kernels including:

  • Kernel launch with cooperative groups
  • Activation/deactivation control
  • Message routing and queue management
  • Status monitoring and metrics collection

Constructors

CudaRingKernelRuntime(ILogger<CudaRingKernelRuntime>, CudaRingKernelCompiler, MessageQueueRegistry, RingKernelFaultRecoveryOptions?)

Initializes a new instance of the CudaRingKernelRuntime class.

public CudaRingKernelRuntime(ILogger<CudaRingKernelRuntime> logger, CudaRingKernelCompiler compiler, MessageQueueRegistry registry, RingKernelFaultRecoveryOptions? faultRecoveryOptions = null)

Parameters

logger ILogger<CudaRingKernelRuntime>

Logger instance.

compiler CudaRingKernelCompiler

Ring kernel compiler.

registry MessageQueueRegistry

Message queue registry for named queues.

faultRecoveryOptions RingKernelFaultRecoveryOptions

Optional fault recovery configuration.

Properties

Watchdog

Gets the watchdog instance for this runtime.

public RingKernelWatchdog? Watchdog { get; }

Property Value

RingKernelWatchdog

Methods

ActivateAsync(string, CancellationToken)

Activates a launched ring kernel to begin processing.

public Task ActivateAsync(string kernelId, CancellationToken cancellationToken = default)

Parameters

kernelId string

Kernel identifier.

cancellationToken CancellationToken

Cancellation token.

Returns

Task

A task representing the activation operation.

Remarks

For persistent kernels, activation causes the kernel to begin its main processing loop. For event-driven kernels, it enables event handling. The kernel continues running until deactivated or terminated.

Exceptions

InvalidOperationException

Thrown if the kernel has not been launched or is already active.

CreateMessageQueueAsync<T>(int, CancellationToken)

Creates a message queue for inter-kernel communication.

public Task<IMessageQueue<T>> CreateMessageQueueAsync<T>(int capacity, CancellationToken cancellationToken = default) where T : unmanaged

Parameters

capacity int

Queue capacity (must be a power of 2).

cancellationToken CancellationToken

Cancellation token.

Returns

Task<IMessageQueue<T>>

An initialized message queue.

Type Parameters

T

Message payload type.
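
Example: Validating Queue Capacity

The capacity parameter must be a power of 2. The following is a minimal sketch of how a caller might check or round up a requested capacity before calling CreateMessageQueueAsync. The QueueCapacity helper class and its method names are hypothetical and are not part of DotCompute. A common reason ring buffers require this constraint is that index wrap-around then reduces to a bit mask; whether this runtime relies on that is an assumption.

```csharp
using System;

// Hypothetical helper, not part of the DotCompute API.
public static class QueueCapacity
{
    // True when n is a positive power of two (exactly one bit set).
    public static bool IsPowerOfTwo(int n) => n > 0 && (n & (n - 1)) == 0;

    // Smallest power of two that is >= n, for 1 <= n <= 2^30.
    public static int RoundUpToPowerOfTwo(int n)
    {
        if (n < 1 || n > (1 << 30))
        {
            throw new ArgumentOutOfRangeException(nameof(n));
        }

        int p = 1;
        while (p < n)
        {
            p <<= 1; // double until p reaches or passes n
        }
        return p;
    }
}
```

With this helper, a requested capacity of 1000 would be rounded up to 1024 before being passed as the capacity argument.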

CreateNamedMessageQueueAsync<T>(string, MessageQueueOptions, CancellationToken)

Creates a named message queue with advanced configuration options.

public Task<IMessageQueue<T>> CreateNamedMessageQueueAsync<T>(string queueName, MessageQueueOptions options, CancellationToken cancellationToken = default) where T : IRingKernelMessage

Parameters

queueName string

Unique queue identifier.

options MessageQueueOptions

Queue configuration options.

cancellationToken CancellationToken

Cancellation token.

Returns

Task<IMessageQueue<T>>

An initialized message queue.

Type Parameters

T

Message type implementing IRingKernelMessage.

Remarks

Named queues support:

  • Priority-based ordering
  • Message deduplication
  • Multiple backpressure strategies
  • Timeout-based message expiration
  • Lock-free concurrent access

Queue names must be unique within the runtime. Attempting to create a queue with an existing name throws InvalidOperationException.

Exceptions

ArgumentException

Thrown if queueName is null or empty.

InvalidOperationException

Thrown if a queue with the specified name already exists.

DeactivateAsync(string, CancellationToken)

Deactivates a ring kernel, pausing it without terminating it.

public Task DeactivateAsync(string kernelId, CancellationToken cancellationToken = default)

Parameters

kernelId string

Kernel identifier.

cancellationToken CancellationToken

Cancellation token.

Returns

Task

A task representing the deactivation operation.

Remarks

Deactivation pauses the kernel's processing loop but keeps it resident on the GPU. State is preserved, and the kernel can be reactivated without relaunching. This is useful for dynamic load balancing or power management.

Exceptions

InvalidOperationException

Thrown if the kernel is not currently active.

DestroyNamedMessageQueueAsync(string, CancellationToken)

Destroys a named message queue and releases resources.

public Task<bool> DestroyNamedMessageQueueAsync(string queueName, CancellationToken cancellationToken = default)

Parameters

queueName string

Queue identifier to destroy.

cancellationToken CancellationToken

Cancellation token.

Returns

Task<bool>

true if the queue was found and destroyed; otherwise, false.

Remarks

Destroying a queue:

  • Discards all pending messages
  • Releases memory resources
  • Removes the queue from the registry

Any subsequent operations on the queue will fail. Ensure all producers and consumers have finished before destroying.

DisposeAsync()

Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources asynchronously.

public ValueTask DisposeAsync()

Returns

ValueTask

A task that represents the asynchronous dispose operation.

GetCircuitBreakerState(string)

Gets the circuit breaker state for a specific kernel.

public CircuitBreakerState? GetCircuitBreakerState(string kernelId)

Parameters

kernelId string

The kernel identifier.

Returns

CircuitBreakerState?

The circuit breaker state, or null if the watchdog is disabled or the kernel is not found.

GetMetricsAsync(string, CancellationToken)

Gets performance metrics for a ring kernel.

[UnconditionalSuppressMessage("Trimming", "IL2075:UnrecognizedReflectionPattern", Justification = "Queue type reflection is required for generic runtime operation")]
public Task<RingKernelMetrics> GetMetricsAsync(string kernelId, CancellationToken cancellationToken = default)

Parameters

kernelId string

Kernel identifier.

cancellationToken CancellationToken

Cancellation token.

Returns

Task<RingKernelMetrics>

Performance metrics including throughput and latency.

GetNamedMessageQueueAsync<T>(string, CancellationToken)

Gets an existing named message queue.

public Task<IMessageQueue<T>?> GetNamedMessageQueueAsync<T>(string queueName, CancellationToken cancellationToken = default) where T : IRingKernelMessage

Parameters

queueName string

Queue identifier.

cancellationToken CancellationToken

Cancellation token.

Returns

Task<IMessageQueue<T>?>

The message queue if found; otherwise, null.

Type Parameters

T

Message type implementing IRingKernelMessage.

Remarks

Use this method to retrieve queues created by other components or to check queue existence before creation.

GetRegisteredAssemblies()

Gets all assemblies registered for ring kernel discovery.

public Assembly[] GetRegisteredAssemblies()

Returns

Assembly[]

Array of registered assemblies.

GetStatusAsync(string, CancellationToken)

Gets the current status of a ring kernel.

[UnconditionalSuppressMessage("Trimming", "IL2075:UnrecognizedReflectionPattern", Justification = "Queue type reflection is required for generic runtime operation")]
public Task<RingKernelStatus> GetStatusAsync(string kernelId, CancellationToken cancellationToken = default)

Parameters

kernelId string

Kernel identifier.

cancellationToken CancellationToken

Cancellation token.

Returns

Task<RingKernelStatus>

The kernel's current status.

GetTelemetryAsync(string, CancellationToken)

Gets the current real-time telemetry snapshot for a running ring kernel.

public Task<RingKernelTelemetry> GetTelemetryAsync(string kernelId, CancellationToken cancellationToken = default)

Parameters

kernelId string

Kernel identifier.

cancellationToken CancellationToken

Cancellation token.

Returns

Task<RingKernelTelemetry>

A snapshot of the kernel's current telemetry data.

Remarks

This is a zero-copy operation that reads telemetry directly from GPU-accessible memory (<1μs latency). The telemetry struct is updated atomically by the GPU kernel and polled by the CPU.

Unlike GetMetricsAsync which provides aggregated historical metrics, GetTelemetryAsync provides real-time counters for:

  • Messages processed/dropped
  • Current queue depth
  • Processing latency (min/max/avg)
  • Last error code
  • GPU timestamp of last processed message

Use this for:

  • Real-time health monitoring
  • Stuck kernel detection (via LastProcessedTimestamp)
  • Backpressure monitoring (via QueueDepth)
  • Performance profiling (via latency metrics)

Exceptions

ArgumentException

Thrown if the kernel ID is not found.

InvalidOperationException

Thrown if telemetry is not enabled for the kernel.
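
Example: Interpreting Telemetry Snapshots

The uses listed in the remarks above (stuck kernel detection, backpressure monitoring, latency profiling) can be sketched as simple checks over two successive snapshots. TelemetrySnapshot below is a hypothetical stand-in that carries only field names mentioned on this page; the field types, the TelemetryChecks class, and the 0.8 threshold are assumptions, not the library's RingKernelTelemetry definition.

```csharp
using System;

// Hypothetical stand-in for a telemetry snapshot; field types are assumed.
public sealed class TelemetrySnapshot
{
    public ulong MessagesProcessed { get; set; }
    public ulong TotalLatencyNanos { get; set; }
    public long LastProcessedTimestamp { get; set; }
    public int QueueDepth { get; set; }
}

// Hypothetical helper, not part of the DotCompute API.
public static class TelemetryChecks
{
    // Mean per-message latency; 0 when nothing has been processed yet.
    public static double AverageLatencyNanos(TelemetrySnapshot s) =>
        s.MessagesProcessed == 0
            ? 0.0
            : (double)s.TotalLatencyNanos / s.MessagesProcessed;

    // A kernel looks stalled when work is queued but neither the timestamp
    // nor the processed count moved between two polls.
    public static bool LooksStalled(TelemetrySnapshot previous, TelemetrySnapshot current) =>
        current.QueueDepth > 0
        && current.LastProcessedTimestamp == previous.LastProcessedTimestamp
        && current.MessagesProcessed == previous.MessagesProcessed;

    // Flags a queue filled to at least the given fraction of its capacity.
    public static bool UnderBackpressure(TelemetrySnapshot s, int capacity, double threshold = 0.8) =>
        s.QueueDepth >= capacity * threshold;
}
```

In practice the two snapshots would come from successive GetTelemetryAsync calls separated by a polling interval chosen by the caller.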

GetWatchdogStatistics()

Gets the current watchdog statistics.

public WatchdogStatistics? GetWatchdogStatistics()

Returns

WatchdogStatistics?

The watchdog statistics, or null if the watchdog is disabled.

LaunchAsync(string, int, int, RingKernelLaunchOptions?, CancellationToken)

Launches a ring kernel with specified grid and block dimensions and configuration options.

[RequiresDynamicCode("Ring kernel launch uses reflection for queue creation")]
[RequiresUnreferencedCode("Ring kernel runtime requires reflection to detect message types")]
public Task LaunchAsync(string kernelId, int gridSize, int blockSize, RingKernelLaunchOptions? options = null, CancellationToken cancellationToken = default)

Parameters

kernelId string

Unique kernel identifier.

gridSize int

Number of thread blocks in the grid.

blockSize int

Number of threads per block.

options RingKernelLaunchOptions

Launch options for queue configuration, or null to use production defaults.

cancellationToken CancellationToken

Cancellation token.

Returns

Task

A task representing the launch operation.

Remarks

Launch starts the kernel on the GPU but does not activate it. The kernel remains idle until ActivateAsync is called. This two-phase launch allows message queues and state to be initialized before the kernel begins processing.

Launch Options

Example: Custom Queue Configuration

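// Override the default queue settings for this launch.
var options = new RingKernelLaunchOptions
{
    QueueCapacity = 16384,                              // 2^14
    DeduplicationWindowSize = 1024,
    BackpressureStrategy = BackpressureStrategy.Block   // wait for space when the queue is full
};
// ct is a CancellationToken supplied by the caller.
await runtime.LaunchAsync("MyKernel", gridSize: 1, blockSize: 256, options, ct);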

Exceptions

ArgumentException

Thrown if the kernel ID is not found or the grid/block sizes are invalid.

ArgumentOutOfRangeException

Thrown if options validation fails (e.g., invalid queue capacity or deduplication window).

ListKernelsAsync()

Lists all ring kernels managed by this runtime.

public Task<IReadOnlyCollection<string>> ListKernelsAsync()

Returns

Task<IReadOnlyCollection<string>>

Collection of kernel IDs.

ListNamedMessageQueuesAsync(CancellationToken)

Lists all named message queues in the runtime.

public Task<IReadOnlyCollection<string>> ListNamedMessageQueuesAsync(CancellationToken cancellationToken = default)

Parameters

cancellationToken CancellationToken

Cancellation token.

Returns

Task<IReadOnlyCollection<string>>

A read-only collection of queue names.

ReadControlBlockAsync(string, CancellationToken)

Reads the control block from GPU memory for testing and debugging purposes.

public Task<RingKernelControlBlock> ReadControlBlockAsync(string kernelId, CancellationToken cancellationToken = default)

Parameters

kernelId string

The kernel identifier.

cancellationToken CancellationToken

Cancellation token.

Returns

Task<RingKernelControlBlock>

The control block read from GPU memory.

Exceptions

ArgumentException

Thrown if the kernel is not found.

ReceiveFromNamedQueueAsync<T>(string, CancellationToken)

Receives a message from a named queue.

public Task<T?> ReceiveFromNamedQueueAsync<T>(string queueName, CancellationToken cancellationToken = default) where T : IRingKernelMessage

Parameters

queueName string

Source queue identifier.

cancellationToken CancellationToken

Cancellation token.

Returns

Task<T?>

The next message in the queue, or null if the queue is empty.

Type Parameters

T

Message type implementing IRingKernelMessage.

Remarks

This is a non-blocking operation. For blocking waits, use GetNamedMessageQueueAsync and poll the queue directly.

Exceptions

ArgumentException

Thrown if queueName is not found.

ReceiveMessageAsync<T>(string, TimeSpan, CancellationToken)

Receives a message from a ring kernel's output queue.

public Task<KernelMessage<T>?> ReceiveMessageAsync<T>(string kernelId, TimeSpan timeout = default, CancellationToken cancellationToken = default) where T : unmanaged

Parameters

kernelId string

Source kernel identifier.

timeout TimeSpan

Maximum wait time for a message.

cancellationToken CancellationToken

Cancellation token.

Returns

Task<KernelMessage<T>?>

The received message, or null if the timeout expires.

Type Parameters

T

Message payload type (must be unmanaged).

Remarks

This method blocks until a message is available in the kernel's output queue or the timeout expires. Use for host-side monitoring or result collection.

RegisterAssembly(Assembly)

Registers an assembly containing ring kernel definitions for discovery during kernel launch.

public bool RegisterAssembly(Assembly assembly)

Parameters

assembly Assembly

The assembly containing ring kernel classes with [RingKernel] attributes.

Returns

bool

True if the assembly was newly registered; false if it was already registered.

ReportKernelActivity(string, long)

Reports activity from a kernel to the watchdog (resets stall detection). Call this when messages are processed to indicate the kernel is healthy.

public void ReportKernelActivity(string kernelId, long messagesProcessed = 0)

Parameters

kernelId string

The kernel identifier.

messagesProcessed long

Number of messages processed since last report.

ResetTelemetryAsync(string, CancellationToken)

Resets all telemetry counters for a ring kernel to initial values.

public Task ResetTelemetryAsync(string kernelId, CancellationToken cancellationToken = default)

Parameters

kernelId string

Kernel identifier.

cancellationToken CancellationToken

Cancellation token.

Returns

Task

A task that completes when telemetry is reset.

Remarks

This operation resets:

  • MessagesProcessed to 0
  • MessagesDropped to 0
  • LastProcessedTimestamp to 0
  • TotalLatencyNanos to 0
  • MaxLatencyNanos to 0
  • MinLatencyNanos to ulong.MaxValue
  • ErrorCode to 0

QueueDepth is not reset as it reflects current state, not cumulative metrics.

Use this for:

  • Restarting measurements after configuration changes
  • Clearing telemetry between test runs
  • Benchmarking specific workload phases

Exceptions

ArgumentException

Thrown if the kernel ID is not found.

InvalidOperationException

Thrown if telemetry is not enabled for the kernel.
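
Example: Why MinLatencyNanos Resets to ulong.MaxValue

The reset values in the remarks above follow the usual pattern for running min/max counters: the minimum starts at the largest representable value so that the first recorded sample always replaces it. The sketch below models this on the host with a hypothetical LatencyCounters class. It mirrors the documented field names but is not the library's telemetry struct, and the real counters are updated by the GPU kernel.

```csharp
using System;

// Hypothetical host-side model of the latency counters; not the library's type.
public sealed class LatencyCounters
{
    public ulong MessagesProcessed { get; private set; }
    public ulong TotalLatencyNanos { get; private set; }
    public ulong MaxLatencyNanos { get; private set; }
    public ulong MinLatencyNanos { get; private set; } = ulong.MaxValue;

    // Folds one latency sample into the running counters.
    public void Record(ulong latencyNanos)
    {
        MessagesProcessed++;
        TotalLatencyNanos += latencyNanos;
        MaxLatencyNanos = Math.Max(MaxLatencyNanos, latencyNanos);
        MinLatencyNanos = Math.Min(MinLatencyNanos, latencyNanos);
    }

    // Restores the initial values described in the remarks.
    public void Reset()
    {
        MessagesProcessed = 0;
        TotalLatencyNanos = 0;
        MaxLatencyNanos = 0;
        MinLatencyNanos = ulong.MaxValue;
    }
}
```

A consequence for readers of the counters is that a MinLatencyNanos value of ulong.MaxValue indicates that no sample has been recorded since the last reset, not an actual latency.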

SendMessageAsync<T>(string, KernelMessage<T>, CancellationToken)

Sends a message to a ring kernel's input queue.

public Task SendMessageAsync<T>(string kernelId, KernelMessage<T> message, CancellationToken cancellationToken = default) where T : unmanaged

Parameters

kernelId string

Target kernel identifier.

message KernelMessage<T>

Message to send.

cancellationToken CancellationToken

Cancellation token.

Returns

Task

A task that completes when the message is enqueued.

Type Parameters

T

Message payload type (must be unmanaged).

Remarks

Messages are enqueued in the kernel's input queue and processed asynchronously. If the queue is full, this method blocks until space is available or the operation is canceled.

Exceptions

ArgumentException

Thrown if the kernel ID is not found.

SendToNamedQueueAsync<T>(string, T, CancellationToken)

Sends a message to a named queue.

public Task<bool> SendToNamedQueueAsync<T>(string queueName, T message, CancellationToken cancellationToken = default) where T : IRingKernelMessage

Parameters

queueName string

Target queue identifier.

message T

Message to send.

cancellationToken CancellationToken

Cancellation token.

Returns

Task<bool>

true if the message was successfully enqueued; otherwise, false.

Type Parameters

T

Message type implementing IRingKernelMessage.

Remarks

Behavior depends on the queue's backpressure strategy:

  • Block: Waits for space to become available
  • Reject: Returns false immediately if full
  • DropOldest: Removes oldest message to make space
  • DropNew: Silently discards the new message

This method is a convenience wrapper around GetNamedMessageQueueAsync followed by TryEnqueue. For high-throughput scenarios, cache the queue reference and enqueue directly.

Exceptions

ArgumentException

Thrown if queueName is not found.
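
Example: Modeling Overflow Behavior

The backpressure strategies listed in the remarks above differ only in what happens when the queue is already full. The following single-threaded sketch models the three non-blocking strategies on an ordinary in-memory queue. BoundedQueue, OverflowPolicy, and EnqueueOutcome are hypothetical names introduced for illustration; this is not the library's queue implementation, and the Block strategy is omitted because it requires asynchronous waiting.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration; not part of the DotCompute API.
public enum OverflowPolicy { Reject, DropOldest, DropNew }

public enum EnqueueOutcome { Enqueued, Rejected, EnqueuedAfterDroppingOldest, DroppedNew }

public sealed class BoundedQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;
    private readonly OverflowPolicy _policy;

    public BoundedQueue(int capacity, OverflowPolicy policy)
    {
        if (capacity < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(capacity));
        }
        _capacity = capacity;
        _policy = policy;
    }

    public int Count => _items.Count;

    public EnqueueOutcome Enqueue(T item)
    {
        // Space available: every policy behaves the same.
        if (_items.Count < _capacity)
        {
            _items.Enqueue(item);
            return EnqueueOutcome.Enqueued;
        }

        // Queue is full: the policy decides which message loses.
        switch (_policy)
        {
            case OverflowPolicy.DropOldest:
                _items.Dequeue();          // evict the head to make room
                _items.Enqueue(item);
                return EnqueueOutcome.EnqueuedAfterDroppingOldest;
            case OverflowPolicy.DropNew:
                return EnqueueOutcome.DroppedNew; // new message is discarded
            default:
                return EnqueueOutcome.Rejected;   // caller is told the queue is full
        }
    }

    public T Dequeue() => _items.Dequeue();
}
```

The sketch returns a distinct outcome per case so the difference between strategies is visible; how the real SendToNamedQueueAsync maps DropOldest and DropNew onto its bool result is not stated on this page.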

SetTelemetryEnabledAsync(string, bool, CancellationToken)

Enables or disables real-time telemetry collection for a ring kernel.

public Task SetTelemetryEnabledAsync(string kernelId, bool enabled, CancellationToken cancellationToken = default)

Parameters

kernelId string

Kernel identifier.

enabled bool

True to enable telemetry; false to disable.

cancellationToken CancellationToken

Cancellation token.

Returns

Task

A task that completes when telemetry state is updated.

Remarks

Telemetry has minimal performance overhead (<50ns per message) but can be disabled for maximum performance in production scenarios where monitoring is not required.

When enabling telemetry on a running kernel:

  • Telemetry buffer is allocated if not already present
  • Counters are initialized to zero
  • Kernel begins updating telemetry on next message

When disabling telemetry:

  • Kernel stops updating telemetry counters
  • Telemetry buffer remains allocated (can be re-enabled)
  • Last telemetry snapshot remains readable

Exceptions

ArgumentException

Thrown if the kernel ID is not found.

TerminateAsync(string, CancellationToken)

Terminates a ring kernel and cleans up resources.

public Task TerminateAsync(string kernelId, CancellationToken cancellationToken = default)

Parameters

kernelId string

Kernel identifier.

cancellationToken CancellationToken

Cancellation token.

Returns

Task

A task representing the termination operation.

Remarks

Termination sends a terminate message to the kernel, waits for graceful shutdown, and frees all associated resources (message queues, state buffers, etc.). The kernel must be relaunched to execute again.
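
Example: Kernel Lifecycle as a State Machine

The launch, activate, deactivate, and terminate rules documented on this page can be summarized as a small state machine. The sketch below is a host-side model only: KernelLifecycleModel and KernelState are hypothetical names, not library types. The exceptions thrown by Activate and Deactivate follow the documented behavior, while the checks in Launch and Terminate are assumptions.

```csharp
using System;

// Hypothetical model of the documented lifecycle; not part of the DotCompute API.
public enum KernelState { NotLaunched, Idle, Active }

public sealed class KernelLifecycleModel
{
    public KernelState State { get; private set; } = KernelState.NotLaunched;

    // Launch makes the kernel resident but leaves it idle.
    public void Launch()
    {
        if (State != KernelState.NotLaunched)
        {
            throw new InvalidOperationException("Already launched (assumed rule).");
        }
        State = KernelState.Idle;
    }

    // Documented: throws if the kernel is not launched or is already active.
    public void Activate()
    {
        if (State != KernelState.Idle)
        {
            throw new InvalidOperationException("Kernel is not launched or is already active.");
        }
        State = KernelState.Active;
    }

    // Documented: throws if the kernel is not currently active.
    public void Deactivate()
    {
        if (State != KernelState.Active)
        {
            throw new InvalidOperationException("Kernel is not currently active.");
        }
        State = KernelState.Idle; // still resident, can be reactivated
    }

    // After termination the kernel must be launched again.
    public void Terminate()
    {
        if (State == KernelState.NotLaunched)
        {
            throw new InvalidOperationException("Nothing to terminate (assumed rule).");
        }
        State = KernelState.NotLaunched;
    }
}
```

A typical sequence in this model is Launch, Activate, any number of Deactivate and Activate pairs, then Terminate, which corresponds to calling LaunchAsync, ActivateAsync, DeactivateAsync, and TerminateAsync with the same kernelId.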