Class P2PMessageQueue<T>
- Namespace
- DotCompute.Core.Messaging
- Assembly
- DotCompute.Core.dll
High-performance P2P message queue for multi-GPU communication.
public sealed class P2PMessageQueue<T> : IP2PMessageQueue<T>, IAsyncDisposable where T : unmanaged
Type Parameters
TThe message type (must be unmanaged).
- Inheritance
-
P2PMessageQueue<T>
- Implements
- Inherited Members
Remarks
Uses a lock-free ring buffer with GPU-resident memory for minimal latency message passing between devices. Supports both direct P2P transfers (NVLink/PCIe) and host-staged transfers for cross-topology communication.
Performance Characteristics:
- Direct P2P: ~1-5 μs latency with NVLink
- Host-staged: ~10-50 μs latency via PCIe
- Zero-copy batch transfers when possible
Constructors
P2PMessageQueue(IAccelerator, IAccelerator, P2PMessageQueueOptions, ILogger, bool)
Initializes a new P2P message queue between two devices.
public P2PMessageQueue(IAccelerator sourceDevice, IAccelerator destinationDevice, P2PMessageQueueOptions options, ILogger logger, bool isDirectP2PAvailable = false)
Parameters
sourceDeviceIAcceleratorThe source (sending) device.
destinationDeviceIAcceleratorThe destination (receiving) device.
optionsP2PMessageQueueOptionsQueue configuration options.
loggerILoggerLogger instance.
isDirectP2PAvailableboolWhether direct P2P is available between devices.
Properties
Capacity
Gets the maximum capacity of the queue.
public int Capacity { get; }
Property Value
Count
Gets the current number of messages in the queue.
public int Count { get; }
Property Value
CurrentTransferMode
Gets the current transfer mode being used.
public P2PTransferMode CurrentTransferMode { get; }
Property Value
DestinationDeviceId
Gets the destination device identifier.
public string DestinationDeviceId { get; }
Property Value
IsDirectP2PAvailable
Gets a value indicating whether direct P2P is available for this queue.
public bool IsDirectP2PAvailable { get; }
Property Value
IsEmpty
Gets a value indicating whether the queue is empty.
public bool IsEmpty { get; }
Property Value
IsFull
Gets a value indicating whether the queue is full.
public bool IsFull { get; }
Property Value
SourceDeviceId
Gets the source device identifier.
public string SourceDeviceId { get; }
Property Value
Methods
ClearAsync(CancellationToken)
Clears all messages from the queue.
public ValueTask ClearAsync(CancellationToken cancellationToken = default)
Parameters
cancellationTokenCancellationTokenCancellation token.
Returns
DisposeAsync()
Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources asynchronously.
public ValueTask DisposeAsync()
Returns
- ValueTask
A task that represents the asynchronous dispose operation.
GetStatistics()
Gets queue statistics.
public P2PQueueStatistics GetStatistics()
Returns
ReceiveAsync(CancellationToken)
Receives a message from the source device.
public ValueTask<P2PReceiveResult<T>> ReceiveAsync(CancellationToken cancellationToken = default)
Parameters
cancellationTokenCancellationTokenCancellation token.
Returns
- ValueTask<P2PReceiveResult<T>>
The received message, or null if the queue is empty.
ReceiveBatchAsync(int, CancellationToken)
Receives multiple messages from the source device.
public ValueTask<P2PBatchReceiveResult<T>> ReceiveBatchAsync(int maxMessages, CancellationToken cancellationToken = default)
Parameters
maxMessagesintMaximum number of messages to receive.
cancellationTokenCancellationTokenCancellation token.
Returns
- ValueTask<P2PBatchReceiveResult<T>>
The received messages.
SendAsync(T, CancellationToken)
Sends a message to the destination device.
public ValueTask<P2PSendResult> SendAsync(T message, CancellationToken cancellationToken = default)
Parameters
messageTThe message to send.
cancellationTokenCancellationTokenCancellation token.
Returns
- ValueTask<P2PSendResult>
The result of the send operation.
SendBatchAsync(ReadOnlyMemory<T>, CancellationToken)
Sends multiple messages to the destination device in a batch.
public ValueTask<P2PBatchSendResult> SendBatchAsync(ReadOnlyMemory<T> messages, CancellationToken cancellationToken = default)
Parameters
messagesReadOnlyMemory<T>The messages to send.
cancellationTokenCancellationTokenCancellation token.
Returns
- ValueTask<P2PBatchSendResult>
The result of the batch send operation.
TryPeek(out T)
Attempts to peek at the next message without removing it.
public bool TryPeek(out T message)
Parameters
messageTThe peeked message if available.
Returns
- bool
True if a message is available; otherwise, false.