Class MessageQueueBridge<T>

Namespace
DotCompute.Core.Messaging
Assembly
DotCompute.Core.dll

Bridges host-side named message queues to GPU-resident Span buffers with bidirectional support.

public sealed class MessageQueueBridge<T> : IAsyncDisposable where T : IRingKernelMessage, new()

Type Parameters

T

The message type implementing IRingKernelMessage.

Inheritance
MessageQueueBridge<T>
Implements
IAsyncDisposable
Inherited Members

Remarks

Architecture (Bidirectional):

Host → Device (HostToDevice or Bidirectional):
User → SendToNamedQueueAsync() → NamedQueue (CPU)
                                       ↓
                                  PumpThread (background)
                                       ↓
                                  Serialize → PinnedStagingBuffer
                                       ↓
                                  Batch DMA Transfer (cuMemcpy/clWrite/MTLCopy)
                                       ↓
                                  GpuResidentQueue (Span<byte>) ← Kernel polls

Device → Host (DeviceToHost or Bidirectional):
Kernel writes → GpuResidentQueue (Span<byte>)
                                       ↓
                                  PumpThread (background)
                                       ↓
                                  Batch DMA Transfer (cuMemcpyDtoH/clRead/MTLCopy)
                                       ↓
                                  Deserialize → NamedQueue (CPU)
                                       ↓
                                  User ← ReceiveFromNamedQueueAsync()

Performance Optimizations:
- Lock-free staging buffer for multi-producer scalability
- Adaptive polling: Busy-wait (10ns) → Yield (100ns) → Sleep (1ms) backoff
- Batch DMA transfers: 1-512 messages per transfer (configurable)
- Zero-copy via pinned memory
- Sub-microsecond serialization with DefaultMessageSerializer
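The adaptive polling backoff listed above can be illustrated with a small helper. This is a minimal sketch, not the bridge's actual implementation: the class name AdaptiveBackoff, its Stage enum, and the iteration thresholds are all hypothetical, and the real pump thread may switch stages on elapsed time rather than on iteration counts.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of DotCompute: escalates the wait strategy
// the longer a poll loop goes without finding work.
public sealed class AdaptiveBackoff
{
    public enum Stage { Spin, Yield, Sleep }

    private readonly int _spinLimit;   // idle iterations spent busy-waiting
    private readonly int _yieldLimit;  // idle iterations before sleeping starts
    private int _idleIterations;

    public AdaptiveBackoff(int spinLimit = 10, int yieldLimit = 20)
    {
        _spinLimit = spinLimit;
        _yieldLimit = yieldLimit;
    }

    // Stage that the next call to Wait() will use.
    public Stage CurrentStage =>
        _idleIterations < _spinLimit ? Stage.Spin
        : _idleIterations < _yieldLimit ? Stage.Yield
        : Stage.Sleep;

    // Call after a poll that found nothing.
    public void Wait()
    {
        switch (CurrentStage)
        {
            case Stage.Spin: Thread.SpinWait(1); break;   // cheapest, burns CPU
            case Stage.Yield: Thread.Yield(); break;      // give up the time slice
            default: Thread.Sleep(1); break;              // coarsest, frees the core
        }
        _idleIterations++;
    }

    // Call after a poll that found work, so the next idle period starts cheap again.
    public void Reset() => _idleIterations = 0;
}
```

A poll loop would call Wait() after each empty poll and Reset() after each successful one, which keeps latency low under load while freeing the core when the queue stays idle.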

Backpressure Handling:
- Block: Pump thread blocks until the GPU queue has space
- Reject: Named queue rejects new messages when the staging buffer is full
- DropOldest: Named queue drops the oldest message to make space
- DropNew: Named queue silently drops the new message when the staging buffer is full
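To make the four strategies concrete, the following sketch applies them to a small bounded in-memory queue. The names OverflowPolicy and BoundedQueue<T> are hypothetical and do not come from DotCompute. As a simplification, Block here blocks the enqueuing caller, whereas the bridge describes its pump thread as the one that blocks.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical names; the real option type in DotCompute may differ.
public enum OverflowPolicy { Block, Reject, DropOldest, DropNew }

public sealed class BoundedQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _gate = new object();
    private readonly int _capacity;
    private readonly OverflowPolicy _policy;

    // Number of messages discarded by DropOldest or DropNew.
    public long Dropped { get; private set; }

    public BoundedQueue(int capacity, OverflowPolicy policy)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
        _policy = policy;
    }

    // Returns false only when the message is explicitly refused (Reject).
    public bool Enqueue(T item)
    {
        lock (_gate)
        {
            if (_items.Count >= _capacity)
            {
                switch (_policy)
                {
                    case OverflowPolicy.Block:
                        // Wait until a consumer frees a slot.
                        while (_items.Count >= _capacity) Monitor.Wait(_gate);
                        break;
                    case OverflowPolicy.Reject:
                        return false;
                    case OverflowPolicy.DropOldest:
                        _items.Dequeue();
                        Dropped++;
                        break;
                    case OverflowPolicy.DropNew:
                        Dropped++;
                        return true; // silently discarded, caller is not told
                }
            }
            _items.Enqueue(item);
            return true;
        }
    }

    public bool TryDequeue(out T item)
    {
        lock (_gate)
        {
            if (_items.Count == 0) { item = default!; return false; }
            item = _items.Dequeue();
            Monitor.PulseAll(_gate); // wake any producer blocked in Enqueue
            return true;
        }
    }
}
```

The practical difference is who learns about the overflow: Reject reports it to the producer, the two drop policies hide it and only surface it through a counter, and Block converts it into latency.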

Constructors

MessageQueueBridge(IMessageQueue<T>, BridgeDirection, Func<ReadOnlyMemory<byte>, Task<bool>>?, Func<Memory<byte>, Task<int>>?, MessageQueueOptions, IMessageSerializer<T>?, ILogger?)

Initializes a new instance of the MessageQueueBridge<T> class with specified direction.

public MessageQueueBridge(IMessageQueue<T> namedQueue, BridgeDirection direction, Func<ReadOnlyMemory<byte>, Task<bool>>? hostToDeviceFunc, Func<Memory<byte>, Task<int>>? deviceToHostFunc, MessageQueueOptions options, IMessageSerializer<T>? serializer = null, ILogger? logger = null)

Parameters

namedQueue IMessageQueue<T>

The host-side named message queue.

direction BridgeDirection

The direction of message flow.

hostToDeviceFunc Func<ReadOnlyMemory<byte>, Task<bool>>

Function to transfer messages from host to device (required for HostToDevice and Bidirectional). Takes a ReadOnlyMemory<byte> of serialized messages and returns true if successful.

deviceToHostFunc Func<Memory<byte>, Task<int>>

Function to transfer messages from device to host (required for DeviceToHost and Bidirectional). Takes a Memory<byte> buffer and returns the number of bytes read.

options MessageQueueOptions

Queue configuration options.

serializer IMessageSerializer<T>

Message serializer (optional, uses default JSON serializer if null).

logger ILogger

Logger instance (optional).
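The two transfer delegates are the only backend-specific pieces the constructor needs, so it can help to see their shapes in isolation. The sketch below is an assumption-laden stand-in: FakeDeviceBuffer is a hypothetical class that uses an ordinary host byte array in place of device memory, and a real backend would issue its own copy calls instead.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a device-side queue: a plain byte array on the host.
public sealed class FakeDeviceBuffer
{
    private readonly byte[] _storage;
    private int _length; // bytes currently held

    public FakeDeviceBuffer(int capacity) => _storage = new byte[capacity];

    // Matches the hostToDeviceFunc shape: accept a batch, report success.
    public Task<bool> HostToDeviceAsync(ReadOnlyMemory<byte> batch)
    {
        if (batch.Length > _storage.Length - _length)
            return Task.FromResult(false); // no room for the whole batch

        batch.Span.CopyTo(_storage.AsSpan(_length));
        _length += batch.Length;
        return Task.FromResult(true);
    }

    // Matches the deviceToHostFunc shape: fill the destination, report bytes read.
    public Task<int> DeviceToHostAsync(Memory<byte> destination)
    {
        int count = Math.Min(_length, destination.Length);
        _storage.AsSpan(0, count).CopyTo(destination.Span);

        // Shift any unread bytes to the front of the buffer.
        _storage.AsSpan(count, _length - count).CopyTo(_storage);
        _length -= count;
        return Task.FromResult(count);
    }
}
```

Both methods convert directly to the delegate types in the signature, for example `Func<ReadOnlyMemory<byte>, Task<bool>> h2d = device.HostToDeviceAsync;`, and could then be passed as hostToDeviceFunc and deviceToHostFunc.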

MessageQueueBridge(IMessageQueue<T>, Func<ReadOnlyMemory<byte>, Task<bool>>, MessageQueueOptions, IMessageSerializer<T>?, ILogger?)

Initializes a new instance of the MessageQueueBridge<T> class for Host → Device transfers.

public MessageQueueBridge(IMessageQueue<T> namedQueue, Func<ReadOnlyMemory<byte>, Task<bool>> gpuTransferFunc, MessageQueueOptions options, IMessageSerializer<T>? serializer = null, ILogger? logger = null)

Parameters

namedQueue IMessageQueue<T>

The host-side named message queue.

gpuTransferFunc Func<ReadOnlyMemory<byte>, Task<bool>>

Function to transfer a batch of serialized messages to GPU. Takes a ReadOnlyMemory<byte> of serialized messages and returns true if successful.

options MessageQueueOptions

Queue configuration options.

serializer IMessageSerializer<T>

Message serializer (optional, uses default JSON serializer if null).

logger ILogger

Logger instance (optional).

Properties

BytesTransferred

Gets the total number of bytes transferred to GPU.

public long BytesTransferred { get; }

Property Value

long

GarbageMessages

Gets the total number of garbage messages detected (uninitialized memory).

public long GarbageMessages { get; }

Property Value

long

MegabytesPerSecond

Gets the average throughput in megabytes per second.

public double MegabytesPerSecond { get; }

Property Value

double

MessagesDropped

Gets the total number of messages dropped due to backpressure.

public long MessagesDropped { get; }

Property Value

long

MessagesPerSecond

Gets the average transfer rate in messages per second.

public double MessagesPerSecond { get; }

Property Value

double

MessagesSerialized

Gets the total number of messages serialized to the staging buffer.

public long MessagesSerialized { get; }

Property Value

long

MessagesTransferred

Gets the total number of messages successfully transferred to GPU.

public long MessagesTransferred { get; }

Property Value

long

NamedQueue

Gets the host-side named message queue for enqueue operations.

public IMessageQueue<T> NamedQueue { get; }

Property Value

IMessageQueue<T>

PollAttempts

Gets the total number of device-to-host poll attempts.

public long PollAttempts { get; }

Property Value

long

SuccessfulReads

Gets the total number of successful reads from device buffer.

public long SuccessfulReads { get; }

Property Value

long

Uptime

Gets the bridge uptime.

public TimeSpan Uptime { get; }

Property Value

TimeSpan

ValidationFailures

Gets the total number of message validation failures.

public long ValidationFailures { get; }

Property Value

long
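The counters above can be combined into ratios when monitoring a bridge. The helper below is a sketch of two such derived figures; BridgeMetrics and its method names are hypothetical and not part of the library, and the inputs are simply the values read from the corresponding properties.

```csharp
using System;

// Hypothetical helper, not part of DotCompute.
public static class BridgeMetrics
{
    // Fraction of device-to-host polls that returned data
    // (SuccessfulReads divided by PollAttempts).
    public static double PollHitRatio(long pollAttempts, long successfulReads) =>
        pollAttempts == 0 ? 0.0 : (double)successfulReads / pollAttempts;

    // Mean serialized size of a transferred message
    // (BytesTransferred divided by MessagesTransferred).
    public static double AverageBytesPerMessage(long bytesTransferred, long messagesTransferred) =>
        messagesTransferred == 0 ? 0.0 : (double)bytesTransferred / messagesTransferred;
}
```

A persistently low poll hit ratio suggests the pump is polling far more often than the kernel produces messages, which is the situation the adaptive backoff is meant to soften.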

Methods

DisposeAsync()

Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources asynchronously.

public ValueTask DisposeAsync()

Returns

ValueTask

A task that represents the asynchronous dispose operation.