Class P2PMessageQueueFactory

Namespace
DotCompute.Core.Messaging
Assembly
DotCompute.Core.dll

Factory for creating P2P message queues between GPU devices.

public sealed class P2PMessageQueueFactory : IAsyncDisposable
Inheritance
object
P2PMessageQueueFactory
Implements
IAsyncDisposable

Remarks

Integrates with the P2P capability detection infrastructure to automatically determine the optimal transfer mode between devices. Caches P2P capability information to avoid repeated detection overhead.
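
The caching described above can be pictured with a small standalone sketch. This is not the factory's actual implementation: the class PairCapabilityCacheSketch, the method CanAccessPeer, the string device IDs, and the detection delegate are all hypothetical names chosen for illustration. The sketch assumes capability is cached per ordered (source, destination) pair, so the potentially expensive detection normally runs once per pair.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical illustration of per-pair capability caching.
// None of these names come from DotCompute.
public sealed class PairCapabilityCacheSketch
{
    private readonly ConcurrentDictionary<(string, string), bool> _cache =
        new ConcurrentDictionary<(string, string), bool>();
    private readonly Func<string, string, bool> _detect;
    private int _detectionCalls;

    public PairCapabilityCacheSketch(Func<string, string, bool> detect)
    {
        _detect = detect ?? throw new ArgumentNullException(nameof(detect));
    }

    // Number of times the detection delegate has actually been invoked.
    public int DetectionCalls => Volatile.Read(ref _detectionCalls);

    // Returns the cached result for the ordered pair, running detection on a miss.
    // Under contention GetOrAdd may invoke the factory more than once for a key,
    // but only one result is stored.
    public bool CanAccessPeer(string sourceId, string destId)
    {
        return _cache.GetOrAdd((sourceId, destId), key =>
        {
            Interlocked.Increment(ref _detectionCalls);
            return _detect(key.Item1, key.Item2);
        });
    }
}
```

In this sketch the key is directional, so ("gpu0", "gpu1") and ("gpu1", "gpu0") are cached separately. Whether the real detector treats capability as symmetric is not stated here.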

Usage:

// The factory implements IAsyncDisposable, so dispose it asynchronously when done
await using var factory = new P2PMessageQueueFactory(logger, capabilityDetector);

// Initialize topology for all devices
await factory.InitializeTopologyAsync(devices);

// Create queue between two devices
var queue = await factory.CreateQueueAsync<MyMessage>(device0, device1);

Constructors

P2PMessageQueueFactory(ILogger, P2PCapabilityDetector, P2PMessageQueueOptions?)

Initializes a new P2P message queue factory.

public P2PMessageQueueFactory(ILogger logger, P2PCapabilityDetector capabilityDetector, P2PMessageQueueOptions? defaultOptions = null)

Parameters

logger ILogger

Logger instance.

capabilityDetector P2PCapabilityDetector

P2P capability detector.

defaultOptions P2PMessageQueueOptions

Default queue options. Optional; defaults to null.

Properties

ActiveQueueCount

Gets the number of active P2P queues.

public int ActiveQueueCount { get; }

Property Value

int

IsTopologyInitialized

Gets whether topology has been initialized.

public bool IsTopologyInitialized { get; }

Property Value

bool

Methods

CreateBidirectionalQueuesAsync<T>(IAccelerator, IAccelerator, P2PMessageQueueOptions?, CancellationToken)

Creates a bidirectional P2P queue pair between two devices.

public ValueTask<(IP2PMessageQueue<T> Forward, IP2PMessageQueue<T> Reverse)> CreateBidirectionalQueuesAsync<T>(IAccelerator device1, IAccelerator device2, P2PMessageQueueOptions? options = null, CancellationToken cancellationToken = default) where T : unmanaged

Parameters

device1 IAccelerator

First device.

device2 IAccelerator

Second device.

options P2PMessageQueueOptions

Optional queue configuration.

cancellationToken CancellationToken

Cancellation token.

Returns

ValueTask<(IP2PMessageQueue<T> Forward, IP2PMessageQueue<T> Reverse)>

A tuple (Forward, Reverse): Forward is the queue from device1 to device2, and Reverse is the queue from device2 to device1.

Type Parameters

T

The message type.

CreateMeshAsync<T>(IAccelerator[], P2PMessageQueueOptions?, CancellationToken)

Creates an all-to-all mesh of P2P queues.

public ValueTask<Dictionary<(string, string), IP2PMessageQueue<T>>> CreateMeshAsync<T>(IAccelerator[] devices, P2PMessageQueueOptions? options = null, CancellationToken cancellationToken = default) where T : unmanaged

Parameters

devices IAccelerator[]

The devices to connect in a mesh.

options P2PMessageQueueOptions

Optional queue configuration.

cancellationToken CancellationToken

Cancellation token.

Returns

ValueTask<Dictionary<(string, string), IP2PMessageQueue<T>>>

A dictionary that maps each (sourceId, destId) pair to the queue from that source device to that destination device.

Type Parameters

T

The message type.
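
To make the shape of the returned dictionary concrete, the following standalone sketch enumerates the keys an all-to-all mesh might contain. It rests on two assumptions that this page does not confirm: that the mesh is directed (one queue per ordered pair) and that a device is not paired with itself. MeshTopologySketch and MeshKeys are hypothetical names, and plain strings stand in for device IDs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of DotCompute.
public static class MeshTopologySketch
{
    // Enumerates every ordered (source, destination) pair of distinct device IDs.
    // For N devices with unique IDs this yields N * (N - 1) keys.
    public static List<(string SourceId, string DestId)> MeshKeys(IReadOnlyList<string> deviceIds)
    {
        if (deviceIds == null)
        {
            throw new ArgumentNullException(nameof(deviceIds));
        }

        var keys = new List<(string SourceId, string DestId)>();
        foreach (var source in deviceIds)
        {
            foreach (var dest in deviceIds)
            {
                // Assumption of this sketch: no self-loop queues.
                if (source != dest)
                {
                    keys.Add((source, dest));
                }
            }
        }
        return keys;
    }
}
```

Under these assumptions, three devices produce six keys, and a lookup such as mesh[("gpu0", "gpu1")] would select the queue that sends from gpu0 to gpu1.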

CreateQueueAsync<T>(IAccelerator, IAccelerator, P2PMessageQueueOptions?, CancellationToken)

Creates a P2P message queue between two devices.

public ValueTask<IP2PMessageQueue<T>> CreateQueueAsync<T>(IAccelerator sourceDevice, IAccelerator destinationDevice, P2PMessageQueueOptions? options = null, CancellationToken cancellationToken = default) where T : unmanaged

Parameters

sourceDevice IAccelerator

The source (sending) device.

destinationDevice IAccelerator

The destination (receiving) device.

options P2PMessageQueueOptions

Optional queue configuration.

cancellationToken CancellationToken

Cancellation token.

Returns

ValueTask<IP2PMessageQueue<T>>

The created P2P message queue.

Type Parameters

T

The message type.

CreateRingAsync<T>(IAccelerator[], P2PMessageQueueOptions?, CancellationToken)

Creates a ring of P2P queues connecting all devices.

public ValueTask<IP2PMessageQueue<T>[]> CreateRingAsync<T>(IAccelerator[] devices, P2PMessageQueueOptions? options = null, CancellationToken cancellationToken = default) where T : unmanaged

Parameters

devices IAccelerator[]

The devices to connect in a ring.

options P2PMessageQueueOptions

Optional queue configuration.

cancellationToken CancellationToken

Cancellation token.

Returns

ValueTask<IP2PMessageQueue<T>[]>

An array of queues forming a ring, where element i connects device i to device (i + 1) mod N: [0->1, 1->2, ..., N-1->0].

Type Parameters

T

The message type.
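
The ring order [0->1, 1->2, ..., N-1->0] can be reproduced with a short standalone sketch that computes the (source, destination) index of each queue. RingTopologySketch and RingEdges are hypothetical names used only for illustration, and the minimum of two devices is an assumption of the sketch rather than a documented requirement.

```csharp
using System;

// Hypothetical helper, not part of DotCompute.
public static class RingTopologySketch
{
    // Returns the (source, destination) device indices for each ring position.
    public static (int Source, int Destination)[] RingEdges(int deviceCount)
    {
        if (deviceCount < 2)
        {
            // Assumption of this sketch: a ring needs at least two devices.
            throw new ArgumentOutOfRangeException(nameof(deviceCount));
        }

        var edges = new (int Source, int Destination)[deviceCount];
        for (var i = 0; i < deviceCount; i++)
        {
            // The modulo wraps the last device back to device 0.
            edges[i] = (i, (i + 1) % deviceCount);
        }
        return edges;
    }
}
```

For four devices the sketch yields (0, 1), (1, 2), (2, 3), (3, 0), matching the order in which the queues are listed in the returned array.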

DisposeAsync()

Asynchronously frees, releases, or resets the unmanaged resources held by this factory.

public ValueTask DisposeAsync()

Returns

ValueTask

A task that represents the asynchronous dispose operation.

GetConnectionInfoAsync(IAccelerator, IAccelerator, CancellationToken)

Gets P2P connection information between two devices.

public ValueTask<P2PConnectionInfo> GetConnectionInfoAsync(IAccelerator sourceDevice, IAccelerator destinationDevice, CancellationToken cancellationToken = default)

Parameters

sourceDevice IAccelerator

The source device.

destinationDevice IAccelerator

The destination device.

cancellationToken CancellationToken

Cancellation token.

Returns

ValueTask<P2PConnectionInfo>

The connection information.

InitializeTopologyAsync(IAccelerator[], CancellationToken)

Initializes P2P topology and capability detection for a set of devices.

public Task<P2PTopologyResult> InitializeTopologyAsync(IAccelerator[] devices, CancellationToken cancellationToken = default)

Parameters

devices IAccelerator[]

The devices for which to initialize P2P topology and capability detection.

cancellationToken CancellationToken

Cancellation token.

Returns

Task<P2PTopologyResult>

Topology initialization result.