Class MessageQueueBridge<T>
- Namespace: DotCompute.Core.Messaging
- Assembly: DotCompute.Core.dll
Bridges host-side named message queues to GPU-resident Span buffers with bidirectional support.
public sealed class MessageQueueBridge<T> : IAsyncDisposable where T : IRingKernelMessage, new()
Type Parameters
- T
The message type implementing IRingKernelMessage.
- Inheritance: object → MessageQueueBridge<T>
- Implements: IAsyncDisposable
Remarks
Architecture (Bidirectional):
Host → Device (HostToDevice or Bidirectional):
User → SendToNamedQueueAsync() → NamedQueue (CPU)
↓
PumpThread (background)
↓
Serialize → PinnedStagingBuffer
↓
Batch DMA Transfer (cuMemcpy/clWrite/MTLCopy)
↓
GpuResidentQueue (Span<byte>) ← Kernel polls
Device → Host (DeviceToHost or Bidirectional):
Kernel writes → GpuResidentQueue (Span<byte>)
↓
PumpThread (background)
↓
Batch DMA Transfer (cuMemcpyDtoH/clRead/MTLCopy)
↓
Deserialize → NamedQueue (CPU)
↓
User ← ReceiveFromNamedQueueAsync()
Performance Optimizations:
- Lock-free staging buffer for multi-producer scalability
- Adaptive polling: busy-wait (10 ns) → yield (100 ns) → sleep (1 ms) backoff
- Batch DMA transfers: 1-512 messages per transfer (configurable)
- Zero-copy via pinned memory
- Sub-microsecond serialization with DefaultMessageSerializer
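The adaptive polling backoff listed above can be pictured as a three-phase wait. The following is a minimal sketch, not the bridge's actual pump loop: the `BackOff` helper, its thresholds, and the simulated `TryPoll` are assumptions made for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Escalates from spinning to yielding to sleeping as idle polls accumulate.
// The thresholds (10 and 20) are illustrative, not DotCompute's values.
static string BackOff(int idlePolls)
{
    const int spinLimit = 10;
    const int yieldLimit = 20;

    if (idlePolls < spinLimit)
    {
        Thread.SpinWait(20);   // busy-wait: lowest latency, keeps the core busy
        return "spin";
    }
    if (idlePolls < yieldLimit)
    {
        Thread.Yield();        // let another ready thread run
        return "yield";
    }
    Thread.Sleep(1);           // sleep about 1 ms when the queue stays empty
    return "sleep";
}

var phases = new List<string>();
int attempts = 0;
int idlePolls = 0;

// Simulated poll (hypothetical): reports data on the 25th attempt.
bool TryPoll() => ++attempts == 25;

while (!TryPoll())
{
    phases.Add(BackOff(idlePolls));
    idlePolls++;
}

int spins = phases.Count(p => p == "spin");
int yields = phases.Count(p => p == "yield");
int sleeps = phases.Count(p => p == "sleep");
Console.WriteLine($"spin={spins}, yield={yields}, sleep={sleeps}");
// prints "spin=10, yield=10, sleep=4"
```

A real pump loop would typically reset the idle counter to zero after every successful poll, so that latency stays low while traffic is flowing.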
Backpressure Handling:
- Block: Pump thread blocks until the GPU queue has space
- Reject: Named queue rejects new messages when the staging buffer is full
- DropOldest: Named queue drops the oldest message to make space
- DropNew: Named queue silently drops the new message when the staging buffer is full
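To make the difference between these strategies concrete, the sketch below uses the standard `System.Threading.Channels` bounded channel as an analogy. It is not DotCompute code: the mapping of `BoundedChannelFullMode` values onto Reject, DropOldest, and DropNew is an assumption for illustration, and the `Fill` helper is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Writes 1, 2, 3 into a capacity-2 channel and returns what is still readable.
static List<int> Fill(BoundedChannelFullMode mode, out bool thirdAccepted)
{
    var channel = Channel.CreateBounded<int>(
        new BoundedChannelOptions(2) { FullMode = mode });

    channel.Writer.TryWrite(1);
    channel.Writer.TryWrite(2);
    thirdAccepted = channel.Writer.TryWrite(3);   // the channel is full here

    var remaining = new List<int>();
    while (channel.Reader.TryRead(out int item))
    {
        remaining.Add(item);
    }
    return remaining;
}

// Wait mode + TryWrite behaves like "Reject": the write fails immediately.
var rejectLike = Fill(BoundedChannelFullMode.Wait, out bool rejectAccepted);
// DropOldest evicts the item at the head of the queue.
var dropOldest = Fill(BoundedChannelFullMode.DropOldest, out bool oldestAccepted);
// DropWrite behaves like "DropNew": the write reports success but the item is discarded.
var dropNew = Fill(BoundedChannelFullMode.DropWrite, out bool newAccepted);

Console.WriteLine($"Reject-like: [{string.Join(", ", rejectLike)}] accepted={rejectAccepted}");
Console.WriteLine($"DropOldest:  [{string.Join(", ", dropOldest)}] accepted={oldestAccepted}");
Console.WriteLine($"DropNew:     [{string.Join(", ", dropNew)}] accepted={newAccepted}");
// Reject-like keeps 1, 2 and reports accepted=False
// DropOldest  keeps 2, 3 and reports accepted=True
// DropNew     keeps 1, 2 and reports accepted=True
```

The Block strategy corresponds to awaiting `WriteAsync` in `Wait` mode, which completes only once a reader has freed a slot.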
Constructors
MessageQueueBridge(IMessageQueue<T>, BridgeDirection, Func<ReadOnlyMemory<byte>, Task<bool>>?, Func<Memory<byte>, Task<int>>?, MessageQueueOptions, IMessageSerializer<T>?, ILogger?)
Initializes a new instance of the MessageQueueBridge<T> class with specified direction.
public MessageQueueBridge(IMessageQueue<T> namedQueue, BridgeDirection direction, Func<ReadOnlyMemory<byte>, Task<bool>>? hostToDeviceFunc, Func<Memory<byte>, Task<int>>? deviceToHostFunc, MessageQueueOptions options, IMessageSerializer<T>? serializer = null, ILogger? logger = null)
Parameters
- namedQueue IMessageQueue<T>
The host-side named message queue.
- direction BridgeDirection
The direction of message flow.
- hostToDeviceFunc Func<ReadOnlyMemory<byte>, Task<bool>>
Function to transfer messages from host to device (required for HostToDevice and Bidirectional). Takes a ReadOnlyMemory<byte> of serialized messages and returns true if successful.
- deviceToHostFunc Func<Memory<byte>, Task<int>>
Function to transfer messages from device to host (required for DeviceToHost and Bidirectional). Takes a Memory<byte> buffer and returns the number of bytes read.
- options MessageQueueOptions
Queue configuration options.
- serializer IMessageSerializer<T>
Message serializer (optional, uses default JSON serializer if null).
- logger ILogger
Logger instance (optional).
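The two transfer delegates are the only backend-specific pieces this constructor needs. The sketch below shows delegates with the documented shapes, using a plain byte array as a stand-in for device memory so that it runs without a GPU or a DotCompute reference. The names `fakeDevice` and `bytesOnDevice` are hypothetical, and a real implementation would issue the backend's copy calls instead.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in for a GPU-resident buffer (assumption: real code targets device memory).
byte[] fakeDevice = new byte[64];
int bytesOnDevice = 0;

// Host -> device: copy the serialized batch; return false if it does not fit.
Func<ReadOnlyMemory<byte>, Task<bool>> hostToDevice = batch =>
{
    if (batch.Length > fakeDevice.Length)
    {
        return Task.FromResult(false);
    }
    batch.CopyTo(fakeDevice);
    bytesOnDevice = batch.Length;
    return Task.FromResult(true);
};

// Device -> host: fill the caller's buffer and return the number of bytes read.
Func<Memory<byte>, Task<int>> deviceToHost = buffer =>
{
    int count = Math.Min(bytesOnDevice, buffer.Length);
    fakeDevice.AsMemory(0, count).CopyTo(buffer);
    bytesOnDevice = 0;   // sketch only: treat the device buffer as drained
    return Task.FromResult(count);
};

// Round trip through the fake device.
byte[] payload = { 1, 2, 3, 4 };
bool sent = await hostToDevice(payload);
byte[] readBuffer = new byte[16];
int read = await deviceToHost(readBuffer);

Console.WriteLine($"sent={sent}, read={read}, data={BitConverter.ToString(readBuffer, 0, read)}");
// prints "sent=True, read=4, data=01-02-03-04"

// With DotCompute referenced, the delegates would be passed to the constructor
// documented above (MyMessage, namedQueue, and options are placeholders, and the
// enum member name follows the direction names used in the parameter text):
// var bridge = new MessageQueueBridge<MyMessage>(
//     namedQueue, BridgeDirection.Bidirectional, hostToDevice, deviceToHost, options);
```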
MessageQueueBridge(IMessageQueue<T>, Func<ReadOnlyMemory<byte>, Task<bool>>, MessageQueueOptions, IMessageSerializer<T>?, ILogger?)
Initializes a new instance of the MessageQueueBridge<T> class for Host → Device transfers.
public MessageQueueBridge(IMessageQueue<T> namedQueue, Func<ReadOnlyMemory<byte>, Task<bool>> gpuTransferFunc, MessageQueueOptions options, IMessageSerializer<T>? serializer = null, ILogger? logger = null)
Parameters
- namedQueue IMessageQueue<T>
The host-side named message queue.
- gpuTransferFunc Func<ReadOnlyMemory<byte>, Task<bool>>
Function to transfer a batch of serialized messages to the GPU. Takes a ReadOnlyMemory<byte> of serialized messages and returns true if successful.
- options MessageQueueOptions
Queue configuration options.
- serializer IMessageSerializer<T>
Message serializer (optional, uses default JSON serializer if null).
- logger ILogger
Logger instance (optional).
Properties
BytesTransferred
Gets the total number of bytes transferred to the GPU.
public long BytesTransferred { get; }
Property Value
- long
GarbageMessages
Gets the total number of garbage messages detected (uninitialized memory).
public long GarbageMessages { get; }
Property Value
- long
MegabytesPerSecond
Gets the average throughput in megabytes per second.
public double MegabytesPerSecond { get; }
Property Value
- double
MessagesDropped
Gets the total number of messages dropped due to backpressure.
public long MessagesDropped { get; }
Property Value
- long
MessagesPerSecond
Gets the average transfer rate in messages per second.
public double MessagesPerSecond { get; }
Property Value
- double
MessagesSerialized
Gets the total number of messages serialized to the staging buffer.
public long MessagesSerialized { get; }
Property Value
- long
MessagesTransferred
Gets the total number of messages successfully transferred to the GPU.
public long MessagesTransferred { get; }
Property Value
- long
NamedQueue
Gets the host-side named message queue for enqueue operations.
public IMessageQueue<T> NamedQueue { get; }
Property Value
- IMessageQueue<T>
PollAttempts
Gets the total number of device-to-host poll attempts.
public long PollAttempts { get; }
Property Value
- long
SuccessfulReads
Gets the total number of successful reads from the device buffer.
public long SuccessfulReads { get; }
Property Value
- long
Uptime
Gets the bridge uptime.
public TimeSpan Uptime { get; }
Property Value
- TimeSpan
ValidationFailures
Gets the total number of message validation failures.
public long ValidationFailures { get; }
Property Value
- long
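The rate properties above are described as averages. When a rate over a recent window is more useful, a caller can derive one from two snapshots of the counters. The sketch below shows that arithmetic with hard-coded sample values. The `RatePerSecond` helper is hypothetical, the numbers are made up, and the choice of 1,000,000 bytes per megabyte is an assumption rather than the library's definition.

```csharp
using System;

// Change in a counter divided by the elapsed time between two snapshots.
static double RatePerSecond(long before, long after, TimeSpan elapsed) =>
    elapsed > TimeSpan.Zero ? (after - before) / elapsed.TotalSeconds : 0.0;

// Sample snapshot values; in practice these would be read from
// MessagesTransferred, BytesTransferred, and Uptime at two points in time.
long messagesBefore = 10_000, messagesAfter = 25_000;
long bytesBefore = 640_000, bytesAfter = 1_600_000;
TimeSpan window = TimeSpan.FromSeconds(2);

double messagesPerSecond = RatePerSecond(messagesBefore, messagesAfter, window);        // 7500
double megabytesPerSecond = RatePerSecond(bytesBefore, bytesAfter, window) / 1_000_000; // about 0.48

Console.WriteLine($"{messagesPerSecond} msg/s, {megabytesPerSecond} MB/s");
```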
Methods
DisposeAsync()
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources asynchronously.
public ValueTask DisposeAsync()
Returns
- ValueTask
A task that represents the asynchronous dispose operation.
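Because the class implements IAsyncDisposable, the usual way to guarantee cleanup is `await using`. The sketch below demonstrates the pattern with `System.Threading.Timer`, which stands in for the bridge only because it also runs background work and implements IAsyncDisposable. What the bridge's own `DisposeAsync` waits for is not specified here, so treat the parallel as an assumption.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int ticks = 0;

// The timer callback plays the role of background pump activity.
await using (var timer = new Timer(
    _ => Interlocked.Increment(ref ticks), null, dueTime: 0, period: 5))
{
    await Task.Delay(50);   // let the background callback run a few times
}   // DisposeAsync is awaited here; it completes once timer work has ceased

int ticksAtDispose = Volatile.Read(ref ticks);
await Task.Delay(30);
int ticksLater = Volatile.Read(ref ticks);

// No callbacks run after the awaited dispose, so both readings match.
Console.WriteLine($"callbacks={ticksAtDispose}, stoppedCleanly={ticksAtDispose == ticksLater}");
```

For a bridge instance the same shape would apply: declare it with `await using` so that disposal is awaited even when the enclosing scope exits through an exception.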