Class CudaRingKernelRuntime
- Namespace
- DotCompute.Backends.CUDA.RingKernels
- Assembly
- DotCompute.Backends.CUDA.dll
CUDA runtime for managing persistent ring kernels.
public sealed class CudaRingKernelRuntime : IRingKernelRuntime, IAsyncDisposable
- Inheritance
- object
- CudaRingKernelRuntime
- Implements
- IRingKernelRuntime
- IAsyncDisposable
Remarks
Provides lifecycle management for CUDA-based ring kernels including:
- Kernel launch with cooperative groups
- Activation/deactivation control
- Message routing and queue management
- Status monitoring and metrics collection
Constructors
CudaRingKernelRuntime(ILogger<CudaRingKernelRuntime>, CudaRingKernelCompiler, MessageQueueRegistry, RingKernelFaultRecoveryOptions?)
Initializes a new instance of the CudaRingKernelRuntime class.
public CudaRingKernelRuntime(ILogger<CudaRingKernelRuntime> logger, CudaRingKernelCompiler compiler, MessageQueueRegistry registry, RingKernelFaultRecoveryOptions? faultRecoveryOptions = null)
Parameters
- logger ILogger<CudaRingKernelRuntime>
Logger instance.
- compiler CudaRingKernelCompiler
Ring kernel compiler.
- registry MessageQueueRegistry
Message queue registry for named queues.
- faultRecoveryOptions RingKernelFaultRecoveryOptions
Optional fault recovery configuration.
Properties
Watchdog
Gets the watchdog instance for this runtime.
public RingKernelWatchdog? Watchdog { get; }
Property Value
- RingKernelWatchdog
Methods
ActivateAsync(string, CancellationToken)
Activates a launched ring kernel to begin processing.
public Task ActivateAsync(string kernelId, CancellationToken cancellationToken = default)
Parameters
- kernelId string
Kernel identifier.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
A task representing the activation operation.
Remarks
For persistent kernels, activation causes the kernel to begin its main processing loop. For event-driven kernels, it enables event handling. The kernel continues running until deactivated or terminated.
Exceptions
- InvalidOperationException
Thrown if the kernel has not been launched or is already active.
CreateMessageQueueAsync<T>(int, CancellationToken)
Creates a message queue for inter-kernel communication.
public Task<IMessageQueue<T>> CreateMessageQueueAsync<T>(int capacity, CancellationToken cancellationToken = default) where T : unmanaged
Parameters
- capacity int
Queue capacity (must be a power of 2).
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<IMessageQueue<T>>
An initialized message queue.
Type Parameters
- T
Message payload type.
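Because capacity must be a power of 2, callers that derive a queue size from configuration may want to normalize it first. The sketch below shows one way to do that. QueueSizing and RoundUpToPowerOfTwo are hypothetical names, not part of DotCompute, and the upper bound of 2^30 is an assumption made only so the result fits in an int.

```csharp
using System;
using System.Numerics;

// Hypothetical helper, not part of DotCompute.
public static class QueueSizing
{
    // Largest power of two that fits in a positive int (assumed upper bound).
    private const int MaxCapacity = 1 << 30;

    // Rounds a requested capacity up to the nearest power of two.
    public static int RoundUpToPowerOfTwo(int requested)
    {
        if (requested <= 0 || requested > MaxCapacity)
        {
            throw new ArgumentOutOfRangeException(nameof(requested));
        }

        // BitOperations.RoundUpToPowerOf2 is available in .NET 6 and later.
        return (int)BitOperations.RoundUpToPowerOf2((uint)requested);
    }
}
```

A call might then look like `await runtime.CreateMessageQueueAsync<float>(QueueSizing.RoundUpToPowerOfTwo(1000))`, which requests a capacity of 1024.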
CreateNamedMessageQueueAsync<T>(string, MessageQueueOptions, CancellationToken)
Creates a named message queue with advanced configuration options.
public Task<IMessageQueue<T>> CreateNamedMessageQueueAsync<T>(string queueName, MessageQueueOptions options, CancellationToken cancellationToken = default) where T : IRingKernelMessage
Parameters
- queueName string
Unique queue identifier.
- options MessageQueueOptions
Queue configuration options.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<IMessageQueue<T>>
An initialized message queue.
Type Parameters
- T
Message type implementing IRingKernelMessage.
Remarks
Named queues support:
- Priority-based ordering
- Message deduplication
- Multiple backpressure strategies
- Timeout-based message expiration
- Lock-free concurrent access
Queue names must be unique within the runtime. Attempting to create a queue with an existing name throws InvalidOperationException.
Exceptions
- ArgumentException
Thrown if queueName is null or empty.
- InvalidOperationException
Thrown if a queue with the specified name already exists.
DeactivateAsync(string, CancellationToken)
Deactivates a ring kernel (pause without termination).
public Task DeactivateAsync(string kernelId, CancellationToken cancellationToken = default)
Parameters
- kernelId string
Kernel identifier.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
A task representing the deactivation operation.
Remarks
Deactivation pauses the kernel's processing loop but keeps it resident on the GPU. State is preserved and the kernel can be reactivated without relaunching. Useful for dynamic load balancing or power management.
Exceptions
- InvalidOperationException
Thrown if the kernel is not currently active.
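The pause-and-resume behavior described above can be sketched as follows. KernelPause and RunWhilePausedAsync are hypothetical names; only DeactivateAsync and ActivateAsync come from this page. The sketch assumes the kernel is active when the method is called.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using DotCompute.Backends.CUDA.RingKernels;

// Hypothetical helper, not part of DotCompute.
public static class KernelPause
{
    // Pauses an active kernel, runs the supplied work, then reactivates the
    // kernel even if the work throws.
    public static async Task RunWhilePausedAsync(
        CudaRingKernelRuntime runtime,
        string kernelId,
        Func<CancellationToken, Task> work,
        CancellationToken cancellationToken = default)
    {
        // Throws InvalidOperationException if the kernel is not currently active.
        await runtime.DeactivateAsync(kernelId, cancellationToken);
        try
        {
            await work(cancellationToken);
        }
        finally
        {
            // The kernel stayed resident on the GPU, so no relaunch is needed.
            // CancellationToken.None is used so that reactivation is still
            // attempted when the caller's token has been cancelled.
            await runtime.ActivateAsync(kernelId, CancellationToken.None);
        }
    }
}
```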
DestroyNamedMessageQueueAsync(string, CancellationToken)
Destroys a named message queue and releases resources.
public Task<bool> DestroyNamedMessageQueueAsync(string queueName, CancellationToken cancellationToken = default)
Parameters
- queueName string
Queue identifier to destroy.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<bool>
Remarks
Destroying a queue:
- Discards all pending messages
- Releases memory resources
- Removes the queue from the registry
Any subsequent operations on the queue will fail. Ensure all producers and consumers have finished before destroying.
DisposeAsync()
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources asynchronously.
public ValueTask DisposeAsync()
Returns
- ValueTask
A task that represents the asynchronous dispose operation.
GetCircuitBreakerState(string)
Gets the circuit breaker state for a specific kernel.
public CircuitBreakerState? GetCircuitBreakerState(string kernelId)
Parameters
- kernelId string
The kernel identifier.
Returns
- CircuitBreakerState?
The circuit breaker state, or null if the watchdog is disabled or the kernel is not found.
GetMetricsAsync(string, CancellationToken)
Gets performance metrics for a ring kernel.
[UnconditionalSuppressMessage("Trimming", "IL2075:UnrecognizedReflectionPattern", Justification = "Queue type reflection is required for generic runtime operation")]
public Task<RingKernelMetrics> GetMetricsAsync(string kernelId, CancellationToken cancellationToken = default)
Parameters
- kernelId string
Kernel identifier.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<RingKernelMetrics>
Performance metrics including throughput and latency.
GetNamedMessageQueueAsync<T>(string, CancellationToken)
Gets an existing named message queue.
public Task<IMessageQueue<T>?> GetNamedMessageQueueAsync<T>(string queueName, CancellationToken cancellationToken = default) where T : IRingKernelMessage
Parameters
- queueName string
Queue identifier.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<IMessageQueue<T>>
The message queue if found; otherwise, null.
Type Parameters
- T
Message type implementing IRingKernelMessage.
Remarks
Use this method to retrieve queues created by other components or to check queue existence before creation.
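A get-or-create pattern built on the existence check might look like the sketch below. NamedQueues and GetOrCreateAsync are hypothetical names. The using directives for IMessageQueue<T>, MessageQueueOptions, and IRingKernelMessage are omitted because this page does not state their namespaces. The catch block assumes that the InvalidOperationException documented for CreateNamedMessageQueueAsync is what a concurrent creator would trigger.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using DotCompute.Backends.CUDA.RingKernels;

// Hypothetical helper, not part of DotCompute.
public static class NamedQueues
{
    // Returns the named queue if it exists; otherwise creates it.
    public static async Task<IMessageQueue<T>> GetOrCreateAsync<T>(
        CudaRingKernelRuntime runtime,
        string queueName,
        MessageQueueOptions options,
        CancellationToken cancellationToken = default)
        where T : IRingKernelMessage
    {
        // GetNamedMessageQueueAsync returns null when no such queue exists.
        var existing = await runtime.GetNamedMessageQueueAsync<T>(queueName, cancellationToken);
        if (existing is not null)
        {
            return existing;
        }

        try
        {
            return await runtime.CreateNamedMessageQueueAsync<T>(queueName, options, cancellationToken);
        }
        catch (InvalidOperationException)
        {
            // Assumption: another component created the queue between the
            // lookup and the create call, so look it up once more.
            var raced = await runtime.GetNamedMessageQueueAsync<T>(queueName, cancellationToken);
            if (raced is null)
            {
                throw;
            }

            return raced;
        }
    }
}
```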
GetRegisteredAssemblies()
Gets all assemblies registered for ring kernel discovery.
public Assembly[] GetRegisteredAssemblies()
Returns
- Assembly[]
Array of registered assemblies.
GetStatusAsync(string, CancellationToken)
Gets the current status of a ring kernel.
[UnconditionalSuppressMessage("Trimming", "IL2075:UnrecognizedReflectionPattern", Justification = "Queue type reflection is required for generic runtime operation")]
public Task<RingKernelStatus> GetStatusAsync(string kernelId, CancellationToken cancellationToken = default)
Parameters
- kernelId string
Kernel identifier.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<RingKernelStatus>
The kernel's current status.
GetTelemetryAsync(string, CancellationToken)
Gets the current real-time telemetry snapshot for a running ring kernel.
public Task<RingKernelTelemetry> GetTelemetryAsync(string kernelId, CancellationToken cancellationToken = default)
Parameters
- kernelId string
Kernel identifier.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<RingKernelTelemetry>
A snapshot of the kernel's current telemetry data.
Remarks
This is a zero-copy operation that reads telemetry directly from GPU-accessible memory (<1μs latency). The telemetry struct is updated atomically by the GPU kernel and polled by the CPU.
Unlike GetMetricsAsync, which provides aggregated historical metrics, GetTelemetryAsync provides real-time counters for:
- Messages processed/dropped
- Current queue depth
- Processing latency (min/max/avg)
- Last error code
- GPU timestamp of last processed message
Use this for:
- Real-time health monitoring
- Stuck kernel detection (via LastProcessedTimestamp)
- Backpressure monitoring (via QueueDepth)
- Performance profiling (via latency metrics)
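Stuck kernel detection from polled snapshots can be sketched as below. StallDetector is a hypothetical class, not part of DotCompute. It takes plain numbers so that the caller copies LastProcessedTimestamp and QueueDepth out of each snapshot; representing both as long is an assumption, since this page does not state the field types.

```csharp
using System;

// Hypothetical helper, not part of DotCompute.
public sealed class StallDetector
{
    private readonly int _threshold;
    private long _lastTimestamp;
    private int _idleSamples;
    private bool _hasSample;

    public StallDetector(int threshold)
    {
        if (threshold < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(threshold));
        }

        _threshold = threshold;
    }

    // Returns true once `threshold` consecutive samples show queued work
    // while the last-processed timestamp has not changed.
    public bool Observe(long lastProcessedTimestamp, long queueDepth)
    {
        bool progressed = !_hasSample || lastProcessedTimestamp != _lastTimestamp;
        _hasSample = true;
        _lastTimestamp = lastProcessedTimestamp;

        if (progressed || queueDepth <= 0)
        {
            // Either a message was processed or there is nothing waiting.
            _idleSamples = 0;
            return false;
        }

        _idleSamples++;
        return _idleSamples >= _threshold;
    }
}
```

A monitoring loop would call GetTelemetryAsync at a fixed interval and pass the two values to Observe; the interval and threshold are workload-specific choices.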
Exceptions
- ArgumentException
Thrown if the kernel ID is not found.
- InvalidOperationException
Thrown if telemetry is not enabled for the kernel.
GetWatchdogStatistics()
Gets the current watchdog statistics.
public WatchdogStatistics? GetWatchdogStatistics()
Returns
- WatchdogStatistics
Watchdog statistics, or null if the watchdog is disabled.
LaunchAsync(string, int, int, RingKernelLaunchOptions?, CancellationToken)
Launches a ring kernel with specified grid and block dimensions and configuration options.
[RequiresDynamicCode("Ring kernel launch uses reflection for queue creation")]
[RequiresUnreferencedCode("Ring kernel runtime requires reflection to detect message types")]
public Task LaunchAsync(string kernelId, int gridSize, int blockSize, RingKernelLaunchOptions? options = null, CancellationToken cancellationToken = default)
Parameters
- kernelId string
Unique kernel identifier.
- gridSize int
Number of thread blocks in the grid.
- blockSize int
Number of threads per block.
- options RingKernelLaunchOptions
Launch options for queue configuration, or null to use production defaults.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
A task representing the launch operation.
Remarks
Launch starts the kernel on the GPU but does not activate it. The kernel remains idle until ActivateAsync is called. This two-phase launch allows message queues and state to be initialized before the kernel begins processing.
Launch Options
- null: Uses ProductionDefaults()
- Custom: Configure queue capacity, deduplication, backpressure
Example: Custom Queue Configuration
var options = new RingKernelLaunchOptions
{
    QueueCapacity = 16384,
    DeduplicationWindowSize = 1024,
    BackpressureStrategy = BackpressureStrategy.Block
};
await runtime.LaunchAsync("MyKernel", gridSize: 1, blockSize: 256, options, ct);
Exceptions
- ArgumentException
Thrown if kernel ID is not found or grid/block sizes are invalid.
- ArgumentOutOfRangeException
Thrown if options validation fails (e.g., invalid queue capacity or deduplication window).
ListKernelsAsync()
Lists all ring kernels managed by this runtime.
public Task<IReadOnlyCollection<string>> ListKernelsAsync()
Returns
- Task<IReadOnlyCollection<string>>
Collection of kernel IDs.
ListNamedMessageQueuesAsync(CancellationToken)
Lists all named message queues in the runtime.
public Task<IReadOnlyCollection<string>> ListNamedMessageQueuesAsync(CancellationToken cancellationToken = default)
Parameters
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<IReadOnlyCollection<string>>
A read-only collection of queue names.
ReadControlBlockAsync(string, CancellationToken)
Reads the control block from GPU memory for testing and debugging purposes.
public Task<RingKernelControlBlock> ReadControlBlockAsync(string kernelId, CancellationToken cancellationToken = default)
Parameters
- kernelId string
The kernel identifier.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<RingKernelControlBlock>
The control block read from GPU memory.
Exceptions
- ArgumentException
Thrown if the kernel is not found.
ReceiveFromNamedQueueAsync<T>(string, CancellationToken)
Receives a message from a named queue.
public Task<T?> ReceiveFromNamedQueueAsync<T>(string queueName, CancellationToken cancellationToken = default) where T : IRingKernelMessage
Parameters
- queueName string
Source queue identifier.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<T>
The next message in the queue, or null if the queue is empty.
Type Parameters
- T
Message type implementing IRingKernelMessage.
Remarks
This is a non-blocking operation. For blocking waits, use GetNamedMessageQueueAsync and poll the queue directly.
Exceptions
- ArgumentException
Thrown if queueName is not found.
ReceiveMessageAsync<T>(string, TimeSpan, CancellationToken)
Receives a message from a ring kernel's output queue.
public Task<KernelMessage<T>?> ReceiveMessageAsync<T>(string kernelId, TimeSpan timeout = default, CancellationToken cancellationToken = default) where T : unmanaged
Parameters
- kernelId string
Source kernel identifier.
- timeout TimeSpan
Maximum wait time for a message.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<KernelMessage<T>?>
The received message, or null if the timeout expires.
Type Parameters
- T
Message payload type (must be unmanaged).
Remarks
This method blocks until a message is available in the kernel's output queue or the timeout expires. Use for host-side monitoring or result collection.
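Result collection on the host can be sketched as a loop that keeps receiving until the timeout expires. KernelOutput and DrainAsync are hypothetical names; only ReceiveMessageAsync comes from this page. The using directive for KernelMessage<T> is omitted because this page does not state its namespace.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using DotCompute.Backends.CUDA.RingKernels;

// Hypothetical helper, not part of DotCompute.
public static class KernelOutput
{
    // Receives messages until one receive call times out, and returns how
    // many messages were handed to the callback.
    public static async Task<int> DrainAsync<T>(
        CudaRingKernelRuntime runtime,
        string kernelId,
        TimeSpan idleTimeout,
        Action<KernelMessage<T>> onMessage,
        CancellationToken cancellationToken = default)
        where T : unmanaged
    {
        int received = 0;
        while (true)
        {
            // A null result means the timeout expired without a message.
            if (await runtime.ReceiveMessageAsync<T>(kernelId, idleTimeout, cancellationToken) is { } message)
            {
                onMessage(message);
                received++;
            }
            else
            {
                return received;
            }
        }
    }
}
```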
RegisterAssembly(Assembly)
Registers an assembly containing ring kernel definitions for discovery during kernel launch.
public bool RegisterAssembly(Assembly assembly)
Parameters
- assembly Assembly
The assembly containing ring kernel classes with [RingKernel] attributes.
Returns
- bool
True if the assembly was newly registered; false if it was already registered.
ReportKernelActivity(string, long)
Reports activity from a kernel to the watchdog (resets stall detection). Call this when messages are processed to indicate the kernel is healthy.
public void ReportKernelActivity(string kernelId, long messagesProcessed = 0)
Parameters
- kernelId string
The kernel identifier.
- messagesProcessed long
Number of messages processed since last report.
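Since messagesProcessed is a count since the last report rather than a running total, a host-side consumer needs to track what it has already reported. The sketch below shows one way to do that. ActivityCounter is a hypothetical class, not part of DotCompute, and it assumes a single thread calls TakeDelta.

```csharp
using System.Threading;

// Hypothetical helper, not part of DotCompute.
public sealed class ActivityCounter
{
    private long _total;
    private long _reported;

    // Safe to call from any thread that processes messages.
    public void RecordProcessed(long count) => Interlocked.Add(ref _total, count);

    // Returns the number of messages recorded since the previous call.
    // Intended to be called from a single reporting thread.
    public long TakeDelta()
    {
        long total = Interlocked.Read(ref _total);
        long delta = total - _reported;
        _reported = total;
        return delta;
    }
}
```

A periodic reporter could then call `runtime.ReportKernelActivity(kernelId, counter.TakeDelta())`.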
ResetTelemetryAsync(string, CancellationToken)
Resets all telemetry counters for a ring kernel to initial values.
public Task ResetTelemetryAsync(string kernelId, CancellationToken cancellationToken = default)
Parameters
- kernelId string
Kernel identifier.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
A task that completes when telemetry is reset.
Remarks
This operation resets:
- MessagesProcessed to 0
- MessagesDropped to 0
- LastProcessedTimestamp to 0
- TotalLatencyNanos to 0
- MaxLatencyNanos to 0
- MinLatencyNanos to ulong.MaxValue
- ErrorCode to 0
QueueDepth is not reset as it reflects current state, not cumulative metrics.
Use this for:
- Restarting measurements after configuration changes
- Clearing telemetry between test runs
- Benchmarking specific workload phases
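Code that reads counters shortly after a reset has to cope with the reset values listed above. The sketch below is one way to handle them. TelemetryMath is a hypothetical class, not part of DotCompute. Treating a MinLatencyNanos of ulong.MaxValue as "no sample recorded yet" is an assumption based on the reset value, and using ulong for the other counters is also assumed.

```csharp
// Hypothetical helper, not part of DotCompute.
public static class TelemetryMath
{
    // Average latency in nanoseconds, or null when nothing has been
    // processed since the last reset (avoids dividing by zero).
    public static double? AverageLatencyNanos(ulong totalLatencyNanos, ulong messagesProcessed)
    {
        return messagesProcessed == 0
            ? (double?)null
            : (double)totalLatencyNanos / messagesProcessed;
    }

    // Minimum latency, or null while the counter still holds its reset value.
    public static ulong? MinLatencyOrNull(ulong minLatencyNanos)
    {
        return minLatencyNanos == ulong.MaxValue
            ? (ulong?)null
            : minLatencyNanos;
    }
}
```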
Exceptions
- ArgumentException
Thrown if the kernel ID is not found.
- InvalidOperationException
Thrown if telemetry is not enabled for the kernel.
SendMessageAsync<T>(string, KernelMessage<T>, CancellationToken)
Sends a message to a ring kernel's input queue.
public Task SendMessageAsync<T>(string kernelId, KernelMessage<T> message, CancellationToken cancellationToken = default) where T : unmanaged
Parameters
- kernelId string
Target kernel identifier.
- message KernelMessage<T>
Message to send.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
A task that completes when the message is enqueued.
Type Parameters
- T
Message payload type (must be unmanaged).
Remarks
Messages are enqueued in the kernel's input queue and processed asynchronously. If the queue is full, this method blocks until space is available or the operation is canceled.
Exceptions
- ArgumentException
Thrown if the kernel ID is not found.
SendToNamedQueueAsync<T>(string, T, CancellationToken)
Sends a message to a named queue.
public Task<bool> SendToNamedQueueAsync<T>(string queueName, T message, CancellationToken cancellationToken = default) where T : IRingKernelMessage
Parameters
- queueName string
Target queue identifier.
- message T
Message to send.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task<bool>
Type Parameters
- T
Message type implementing IRingKernelMessage.
Remarks
Behavior depends on the queue's backpressure strategy:
- Block: Waits for space to become available
- Reject: Returns false immediately if full
- DropOldest: Removes oldest message to make space
- DropNew: Silently discards the new message
This method is a convenience wrapper around GetNamedMessageQueueAsync followed by TryEnqueue. For high-throughput scenarios, cache the queue reference and enqueue directly.
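With the Reject strategy a full queue yields false immediately, so callers may want a bounded retry. The sketch below shows one possible shape. SendRetry and TrySendAsync are hypothetical names, not part of DotCompute. The helper takes a delegate so that it does not depend on any particular queue type, and it assumes that a false result means the message was not enqueued.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of DotCompute.
public static class SendRetry
{
    // Invokes trySend up to maxAttempts times, waiting `delay` between
    // attempts, and returns true as soon as one attempt succeeds.
    public static async Task<bool> TrySendAsync(
        Func<CancellationToken, Task<bool>> trySend,
        int maxAttempts,
        TimeSpan delay,
        CancellationToken cancellationToken = default)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (await trySend(cancellationToken))
            {
                return true;
            }

            // No delay after the final failed attempt.
            if (attempt < maxAttempts)
            {
                await Task.Delay(delay, cancellationToken);
            }
        }

        return false;
    }
}
```

With a runtime instance in scope, the delegate could be `ct => runtime.SendToNamedQueueAsync(queueName, message, ct)`.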
Exceptions
- ArgumentException
Thrown if queueName is not found.
SetTelemetryEnabledAsync(string, bool, CancellationToken)
Enables or disables real-time telemetry collection for a ring kernel.
public Task SetTelemetryEnabledAsync(string kernelId, bool enabled, CancellationToken cancellationToken = default)
Parameters
- kernelId string
Kernel identifier.
- enabled bool
True to enable telemetry; false to disable.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
A task that completes when telemetry state is updated.
Remarks
Telemetry has minimal performance overhead (<50ns per message) but can be disabled for maximum performance in production scenarios where monitoring is not required.
When enabling telemetry on a running kernel:
- Telemetry buffer is allocated if not already present
- Counters are initialized to zero
- Kernel begins updating telemetry on next message
When disabling telemetry:
- Kernel stops updating telemetry counters
- Telemetry buffer remains allocated (can be re-enabled)
- Last telemetry snapshot remains readable
Exceptions
- ArgumentException
Thrown if the kernel ID is not found.
TerminateAsync(string, CancellationToken)
Terminates a ring kernel and cleans up resources.
public Task TerminateAsync(string kernelId, CancellationToken cancellationToken = default)
Parameters
- kernelId string
Kernel identifier.
- cancellationToken CancellationToken
Cancellation token.
Returns
- Task
A task representing the termination operation.
Remarks
Termination sends a terminate message to the kernel, waits for graceful shutdown, and frees all associated resources (message queues, state buffers, etc.). The kernel must be relaunched to execute again.
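Taken together, the launch, activate, and terminate calls on this page suggest a lifecycle along the lines of the sketch below. KernelLifecycle and RunAsync are hypothetical names. The sketch assumes TerminateAsync may be called on a kernel that was launched but whose activation failed, which this page does not confirm.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using DotCompute.Backends.CUDA.RingKernels;

// Hypothetical helper, not part of DotCompute.
public static class KernelLifecycle
{
    // Launches and activates a kernel, runs the supplied work, and always
    // terminates the kernel afterwards.
    public static async Task RunAsync(
        CudaRingKernelRuntime runtime,
        string kernelId,
        int gridSize,
        int blockSize,
        Func<CancellationToken, Task> work,
        CancellationToken cancellationToken = default)
    {
        // Phase 1: the kernel becomes resident on the GPU but stays idle.
        // Passing null options selects the production defaults.
        // LaunchAsync is annotated with RequiresDynamicCode and
        // RequiresUnreferencedCode, so trimmed or AOT builds will warn here.
        await runtime.LaunchAsync(kernelId, gridSize, blockSize, options: null, cancellationToken: cancellationToken);
        try
        {
            // Phase 2: the kernel begins its processing loop.
            await runtime.ActivateAsync(kernelId, cancellationToken);
            await work(cancellationToken);
        }
        finally
        {
            // Frees queues and state buffers; the kernel must be relaunched
            // before it can run again.
            await runtime.TerminateAsync(kernelId, CancellationToken.None);
        }
    }
}
```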