Phase 7 Implementation Guide
Queue-Depth Aware Placement for GPU-Native Actors
Overview
Phase 7 adds grain placement driven by GPU ring kernel queue depths. When a new GPU-resident activation is requested, the placement director prefers the silo whose ring kernel queues have the most headroom, which spreads load across GPU-enabled silos without manual tuning.
Goals
- Monitor Queue Depths: Real-time tracking of ring kernel message queue utilization
- Intelligent Placement: Place new activations on silos with lowest queue pressure
- Dynamic Rebalancing: Migrate actors when queues become unbalanced
- Overflow Prevention: Detect and handle queue saturation before message loss
Architecture
Component Overview
```text
┌─────────────────────────────────────────────────────────────┐
│  Orleans Cluster                                            │
│                                                             │
│     ┌──────────────────┐           ┌──────────────────┐     │
│     │  Silo A          │           │  Silo B          │     │
│     │                  │           │                  │     │
│     │  ┌────────────┐  │           │  ┌────────────┐  │     │
│     │  │   Queue    │  │           │  │   Queue    │  │     │
│     │  │  Monitor   │  │           │  │  Monitor   │  │     │
│     │  └─────┬──────┘  │           │  └─────┬──────┘  │     │
│     │        │         │           │        │         │     │
│     │  ┌─────▼──────┐  │           │  ┌─────▼──────┐  │     │
│     │  │    Ring    │  │           │  │    Ring    │  │     │
│     │  │  Kernels   │  │           │  │  Kernels   │  │     │
│     │  │            │  │           │  │            │  │     │
│     │  │  Q1: 45%   │  │           │  │  Q1: 82%   │  │     │
│     │  │  Q2: 23%   │  │           │  │  Q2: 15%   │  │     │
│     │  │  Q3: 67%   │  │           │  │  Q3: 91%   │  │     │
│     │  └────────────┘  │           │  └────────────┘  │     │
│     │                  │           │                  │     │
│     │  Avg: 45%        │           │  Avg: 63%        │     │
│     └──────────────────┘           └──────────────────┘     │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Placement Director                                  │   │
│  │                                                      │   │
│  │  New GpuResident activation request                  │   │
│  │    → Query all silos for queue depths                │   │
│  │    → Select silo with lowest average (Silo A: 45%)   │   │
│  │    → Activate on selected silo                       │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
```
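The selection rule in the diagram can be expressed without any Orleans types. The following is a minimal sketch. It assumes each silo reports one utilization value in [0, 1] per ring kernel queue. `SiloSelector` and `SelectLeastLoaded` are hypothetical names, not part of the project's API.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper illustrating the placement rule from the diagram:
// average each silo's queue utilizations, skip silos at or above the
// threshold, and return the silo with the lowest average (or null).
public static class SiloSelector
{
    public static string? SelectLeastLoaded(
        IReadOnlyDictionary<string, float[]> queueUtilizationBySilo,
        float threshold = 0.8f)
    {
        return queueUtilizationBySilo
            .Where(kv => kv.Value.Length > 0)
            .Select(kv => (Silo: kv.Key, Average: kv.Value.Average()))
            .Where(s => s.Average < threshold)
            .OrderBy(s => s.Average)
            .Select(s => s.Silo)
            .FirstOrDefault();
    }
}
```

With the diagram's values, Silo A averages 45% and Silo B about 63%, so Silo A is chosen. When every silo is at or above the threshold, the method returns null and the caller must fall back, as the placement director does with random placement.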
New Components
1. IQueueDepthMonitor
```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Monitors ring kernel queue depths for placement decisions.
/// </summary>
public interface IQueueDepthMonitor
{
    /// <summary>
    /// Get current queue utilization for all kernels on this silo.
    /// </summary>
    Task<QueueDepthSnapshot> GetSnapshotAsync();

    /// <summary>
    /// Get queue utilization (0.0 to 1.0) for a specific kernel.
    /// </summary>
    Task<float> GetKernelUtilizationAsync(string kernelId);

    /// <summary>
    /// Subscribe to queue depth changes.
    /// </summary>
    IAsyncEnumerable<QueueDepthChange> WatchAsync(
        float threshold = 0.8f,
        CancellationToken ct = default);
}

public record QueueDepthSnapshot(
    string SiloId,
    DateTimeOffset Timestamp,
    IReadOnlyDictionary<string, float> KernelUtilizations,
    float AverageUtilization,
    float MaxUtilization);

public record QueueDepthChange(
    string KernelId,
    float OldUtilization,
    float NewUtilization,
    bool IsOverThreshold);
```
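The two aggregate fields of `QueueDepthSnapshot` can be derived from the per-kernel map. This is a minimal sketch under two assumptions: utilizations are fractions in [0, 1], and a silo with no kernels reports 0 for both aggregates. `QueueDepthSnapshotFactory` is a hypothetical helper. The record is repeated so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public record QueueDepthSnapshot(
    string SiloId,
    DateTimeOffset Timestamp,
    IReadOnlyDictionary<string, float> KernelUtilizations,
    float AverageUtilization,
    float MaxUtilization);

// Hypothetical factory: derives AverageUtilization and MaxUtilization
// from the per-kernel utilization map.
public static class QueueDepthSnapshotFactory
{
    public static QueueDepthSnapshot Create(
        string siloId,
        IReadOnlyDictionary<string, float> kernelUtilizations,
        DateTimeOffset timestamp)
    {
        // Assumption: a silo with no running kernels counts as idle (0%).
        bool empty = kernelUtilizations.Count == 0;
        float average = empty ? 0f : kernelUtilizations.Values.Average();
        float max = empty ? 0f : kernelUtilizations.Values.Max();
        return new QueueDepthSnapshot(siloId, timestamp, kernelUtilizations, average, max);
    }
}
```

Keeping `MaxUtilization` alongside the average matters. Silo B's 63% average in the diagram hides a Q3 queue at 91%, which is already above the 0.8 default threshold.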
2. QueueDepthAwarePlacementDirector
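```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Runtime.Placement;

/// <summary>
/// Placement director that considers ring kernel queue depths.
/// </summary>
public class QueueDepthAwarePlacementDirector : IPlacementDirector
{
    private readonly IQueueDepthMonitor _monitor;
    private readonly QueueDepthPlacementOptions _options;
    private readonly ILogger<QueueDepthAwarePlacementDirector> _logger;

    public QueueDepthAwarePlacementDirector(
        IQueueDepthMonitor monitor,
        QueueDepthPlacementOptions options,
        ILogger<QueueDepthAwarePlacementDirector> logger)
    {
        _monitor = monitor;
        _options = options;
        _logger = logger;
    }

    public async Task<SiloAddress> OnAddActivation(
        PlacementStrategy strategy,
        PlacementTarget target,
        IPlacementContext context)
    {
        // Get queue depths from all GPU-enabled silos
        // (GetGpuEnabledSilosAsync and GetQueueSnapshotAsync are helpers not shown here)
        SiloAddress[] candidates = await GetGpuEnabledSilosAsync(context);
        if (candidates.Length == 0)
            throw new InvalidOperationException("No GPU-enabled silos are available for placement.");

        QueueDepthSnapshot[] snapshots = await Task.WhenAll(
            candidates.Select(s => GetQueueSnapshotAsync(s)));

        // Pair each silo with its snapshot, filter out overloaded silos,
        // and select the one with the lowest average queue depth
        SiloAddress? selected = candidates
            .Zip(snapshots, (silo, snapshot) => (silo, snapshot))
            .Where(c => c.snapshot.AverageUtilization < _options.HighUtilizationThreshold)
            .OrderBy(c => c.snapshot.AverageUtilization)
            .Select(c => c.silo)
            .FirstOrDefault();

        if (selected is null)
        {
            _logger.LogWarning(
                "All GPU silos at or above {Threshold:P0} queue utilization, falling back to random placement",
                _options.HighUtilizationThreshold);
            return candidates[Random.Shared.Next(candidates.Length)];
        }
        return selected;
    }
}
```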
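`Task.WhenAll` waits for the slowest silo, so a single unresponsive silo can stall every placement decision. One way to bound that latency is to race the queries against a timeout. The sketch below assumes a silo that misses the deadline is simply left out of consideration. `SnapshotGatherer` and its parameters are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: queries every candidate concurrently and returns only the
// snapshots that arrive before the timeout. Late or failed queries are left out.
public static class SnapshotGatherer
{
    public static async Task<IReadOnlyList<(TSilo Silo, TSnapshot Snapshot)>> GatherAsync<TSilo, TSnapshot>(
        IEnumerable<TSilo> candidates,
        Func<TSilo, Task<TSnapshot>> query,
        TimeSpan timeout)
    {
        var pending = candidates.Select(silo => (Silo: silo, Query: query(silo))).ToList();

        using var cts = new CancellationTokenSource();
        Task deadline = Task.Delay(timeout, cts.Token);

        // Wait for every query or the deadline, whichever finishes first.
        await Task.WhenAny(Task.WhenAll(pending.Select(p => p.Query)), deadline);
        cts.Cancel(); // stop the timer if all queries finished early

        return pending
            .Where(p => p.Query.IsCompletedSuccessfully)
            .Select(p => (Silo: p.Silo, Snapshot: p.Query.Result))
            .ToList();
    }
}
```

The director would then apply its threshold filter to the surviving results and fall back when none remain. The timeout is a tuning choice. It would need to stay well inside the <10ms P99 placement-latency target listed under Success Criteria.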
3. QueueOverflowHandler
```csharp
/// <summary>
/// Handles full-queue conditions according to an OverflowStrategy.
/// </summary>
public interface IQueueOverflowHandler
{
    /// <summary>
    /// Handle a queue-full condition for the given kernel.
    /// </summary>
    Task<OverflowResult> HandleOverflowAsync(
        string kernelId,
        GpuMessage message,
        OverflowStrategy strategy);
}

public enum OverflowStrategy
{
    /// <summary>
    /// Block until space is available.
    /// </summary>
    Block,

    /// <summary>
    /// Drop the oldest queued messages to make room.
    /// </summary>
    DropOldest,

    /// <summary>
    /// Drop the incoming message.
    /// </summary>
    DropNew,

    /// <summary>
    /// Redirect the message to the CPU fallback path.
    /// </summary>
    CpuFallback,

    /// <summary>
    /// Migrate the actor to a less loaded silo.
    /// </summary>
    Migrate
}
```
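Three of the strategies map directly onto bounded channels in .NET's `System.Threading.Channels`, which can model a host-side staging queue in front of a ring kernel. This is a sketch under that assumption, not the project's overflow handler. `OverflowChannels` is a hypothetical name.

One mapping is easy to get wrong. `DropNew` (drop the incoming message) corresponds to `BoundedChannelFullMode.DropWrite`, whereas `DropNewest` evicts the newest item already in the queue. `CpuFallback` and `Migrate` need application logic and have no channel equivalent. The enum is repeated so the block compiles on its own.

```csharp
using System;
using System.Threading.Channels;

public enum OverflowStrategy { Block, DropOldest, DropNew, CpuFallback, Migrate }

public static class OverflowChannels
{
    // Maps an overflow strategy to the equivalent bounded-channel full mode,
    // or null when the strategy needs custom handling.
    public static BoundedChannelFullMode? ToFullMode(OverflowStrategy strategy) => strategy switch
    {
        OverflowStrategy.Block => BoundedChannelFullMode.Wait,
        OverflowStrategy.DropOldest => BoundedChannelFullMode.DropOldest,
        OverflowStrategy.DropNew => BoundedChannelFullMode.DropWrite,
        _ => null,
    };

    // Creates a staging queue that applies the strategy once it holds `capacity` items.
    public static Channel<T> CreateStagingQueue<T>(int capacity, OverflowStrategy strategy)
    {
        var mode = ToFullMode(strategy)
            ?? throw new ArgumentException($"{strategy} requires custom handling", nameof(strategy));
        return Channel.CreateBounded<T>(new BoundedChannelOptions(capacity) { FullMode = mode });
    }
}
```

Suppose the capacity is 2 and messages 1, 2 and 3 are written. A `DropNew` queue ends up holding 1 and 2, while a `DropOldest` queue holds 2 and 3. With `Block`, `TryWrite` returns false when the queue is full, and `WriteAsync` waits for space.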
Implementation Plan
Step 1: Queue Depth Metrics
Add metrics collection to DotComputeRingKernelRuntime:
```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class DotComputeRingKernelRuntime : IRingKernelRuntime
{
    private readonly ConcurrentDictionary<string, QueueMetrics> _queueMetrics = new();
    private readonly int _queueCapacity; // slots per ring queue; assigned in the constructor (not shown)

    public async Task<RingKernelMetrics> GetMetricsAsync(string kernelId)
    {
        // Read the kernel's control block (queue head/tail indices and counters)
        var controlBlock = await ReadControlBlockAsync(kernelId);

        var metrics = new RingKernelMetrics
        {
            KernelId = kernelId,
            InputQueueUtilization = CalculateUtilization(
                controlBlock.InputQueueHead,
                controlBlock.InputQueueTail,
                _queueCapacity),
            OutputQueueUtilization = CalculateUtilization(
                controlBlock.OutputQueueHead,
                controlBlock.OutputQueueTail,
                _queueCapacity),
            MessagesProcessed = controlBlock.MessagesProcessed,
            ErrorsEncountered = controlBlock.ErrorsEncountered,
            LastActivityTime = controlBlock.LastActivityTicks
        };
        return metrics;
    }

    // Fraction of queue slots in use. Assumes head is the write index and tail
    // the read index, both wrapped to [0, capacity).
    private static float CalculateUtilization(long head, long tail, int capacity)
    {
        long count = head - tail;
        if (count < 0) count += capacity; // the write index has wrapped past the end of the buffer
        return (float)count / capacity;
    }
}
```
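The wrapped-index formula has a caveat. When head and tail are both reduced modulo the capacity, a completely full queue looks the same as an empty one (head == tail) and reports 0%, unless the producer always leaves one slot unused.

The control block may be able to expose monotonically increasing enqueue and dequeue counters instead. That is an assumption, since the DotCompute control-block layout is not shown here. With such counters, the count is a plain subtraction and a full queue reads as 100%. Both variants are sketched below under hypothetical names.

```csharp
using System;

public static class QueueMath
{
    // Wrapped indices in [0, capacity): head is the write position, tail the read
    // position. A full queue aliases to empty unless one slot is always left free.
    public static float WrappedUtilization(long head, long tail, int capacity)
    {
        long count = head - tail;
        if (count < 0) count += capacity;
        return (float)count / capacity;
    }

    // Monotonic counters: total messages ever enqueued and dequeued.
    // Their difference is the exact number of messages waiting.
    public static float CounterUtilization(long enqueued, long dequeued, int capacity)
    {
        long count = enqueued - dequeued;
        if (count < 0 || count > capacity)
            throw new InvalidOperationException($"Inconsistent counters: {count} of {capacity}");
        return (float)count / capacity;
    }
}
```

Take a capacity of 1000. `WrappedUtilization(50, 950, 1000)` returns 0.1 because the write index has wrapped. `CounterUtilization(2000, 1000, 1000)` returns 1.0 for a full queue that the wrapped form would report as 0.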
Step 2: Cross-Silo Metrics Collection
Implement distributed metrics aggregation:
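```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Orleans;
using Orleans.Runtime;

public class DistributedQueueMonitor : IQueueDepthMonitor
{
    private readonly IClusterClient _clusterClient;
    private readonly ILocalSiloDetails _siloDetails;

    // IQueueDepthMonitor members (GetSnapshotAsync, GetKernelUtilizationAsync,
    // WatchAsync) are omitted here for brevity.

    public async Task<IReadOnlyList<QueueDepthSnapshot>> GetClusterSnapshotsAsync()
    {
        // GetGpuEnabledSilosAsync (not shown) is assumed to return SiloAddress values
        var silos = await GetGpuEnabledSilosAsync();
        var tasks = silos.Select(async silo =>
        {
            // The grain is keyed by the silo's address string. The key alone does not
            // place the activation on that silo; a placement strategy that pins it is needed.
            var grain = _clusterClient.GetGrain<IQueueMonitorGrain>(silo.ToParsableString());
            return await grain.GetSnapshotAsync();
        });
        return await Task.WhenAll(tasks);
    }
}

/// <summary>
/// Per-silo grain for queue monitoring, keyed by silo address.
/// </summary>
public interface IQueueMonitorGrain : IGrainWithStringKey
{
    Task<QueueDepthSnapshot> GetSnapshotAsync();
    Task<float> GetKernelUtilizationAsync(string kernelId);
}
```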
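Fanning out to every silo on each placement call multiplies cluster traffic by the activation rate. A common alternative is to cache the cluster-wide result and refresh it at most once per interval, such as the `MetricsRefreshInterval` option in Step 3. This is sketched here as an assumption rather than the project's design. `CachedSnapshotSource` is hypothetical, and its clock is injectable so the refresh logic can be tested.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical cache in front of a cluster-wide fetch (for example, a method like
// GetClusterSnapshotsAsync). Returns the cached value until it is older than the
// refresh interval. A semaphore serializes refreshes so concurrent callers do not
// each trigger a fetch.
public sealed class CachedSnapshotSource<T>
{
    private readonly Func<Task<T>> _fetch;
    private readonly TimeSpan _refreshInterval;
    private readonly Func<DateTimeOffset> _clock;
    private readonly SemaphoreSlim _gate = new(1, 1);
    private (T Value, DateTimeOffset FetchedAt)? _cached;

    public CachedSnapshotSource(
        Func<Task<T>> fetch,
        TimeSpan refreshInterval,
        Func<DateTimeOffset>? clock = null)
    {
        _fetch = fetch;
        _refreshInterval = refreshInterval;
        _clock = clock ?? (() => DateTimeOffset.UtcNow);
    }

    public async Task<T> GetAsync()
    {
        await _gate.WaitAsync();
        try
        {
            DateTimeOffset now = _clock();
            if (_cached is { } cached && now - cached.FetchedAt < _refreshInterval)
                return cached.Value;

            T value = await _fetch();
            _cached = (value, now);
            return value;
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

The trade-off is staleness. Placements made between refreshes all see the same snapshot, so bursts of activations can land on the same silo until the next refresh.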
Step 3: Placement Integration
Register the placement director:
```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Hosting;
using Orleans.Runtime.Placement;

public static class GpuPlacementExtensions
{
    public static ISiloBuilder UseQueueDepthAwarePlacement(
        this ISiloBuilder builder,
        Action<QueueDepthPlacementOptions>? configure = null)
    {
        var options = new QueueDepthPlacementOptions();
        configure?.Invoke(options);

        builder.ConfigureServices(services =>
        {
            services.AddSingleton(options);
            services.AddSingleton<IQueueDepthMonitor, DistributedQueueMonitor>();
            // Orleans looks up a director by strategy type, so register them as a pair
            services.AddPlacementDirector<QueueDepthAwarePlacementStrategy, QueueDepthAwarePlacementDirector>();
        });
        return builder;
    }
}

public class QueueDepthPlacementOptions
{
    // Average utilization at or above which a silo is skipped for new placements
    public float HighUtilizationThreshold { get; set; } = 0.8f;
    public float CriticalUtilizationThreshold { get; set; } = 0.95f;
    // How often queue depth snapshots are refreshed
    public TimeSpan MetricsRefreshInterval { get; set; } = TimeSpan.FromSeconds(1);
    public OverflowStrategy DefaultOverflowStrategy { get; set; } = OverflowStrategy.Block;
}
```
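`HighUtilizationThreshold` and `CriticalUtilizationThreshold` split utilization into three bands. The guide does not say how the critical threshold is used. The following is therefore a minimal sketch of how a monitor might label a reading, assuming 0 < high < critical <= 1. `UtilizationLevel` and `UtilizationBands` are hypothetical names, and the defaults mirror the option defaults.

```csharp
using System;

public enum UtilizationLevel { Normal, High, Critical }

// Hypothetical classifier using the two thresholds from QueueDepthPlacementOptions.
public static class UtilizationBands
{
    public static UtilizationLevel Classify(float utilization, float high = 0.8f, float critical = 0.95f)
    {
        if (!(0f < high && high < critical && critical <= 1f))
            throw new ArgumentOutOfRangeException(nameof(high), "Expected 0 < high < critical <= 1.");

        return utilization >= critical ? UtilizationLevel.Critical
            : utilization >= high ? UtilizationLevel.High
            : UtilizationLevel.Normal;
    }
}
```

With the defaults, the diagram's 91% queue on Silo B is High but not Critical. The High boundary matches the placement director's filter, which keeps only silos strictly below the high threshold.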
Step 4: Grain Attribute
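```csharp
using System;
using Orleans.Placement;

/// <summary>
/// Marks a grain for queue-depth-aware placement.
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public class QueueDepthAwarePlacementAttribute : PlacementAttribute
{
    public QueueDepthAwarePlacementAttribute()
        : base(new QueueDepthAwarePlacementStrategy())
    {
    }

    // As written, nothing forwards this value to QueueDepthAwarePlacementStrategy
    // or the director; it has to be plumbed through before it takes effect.
    public float PreferredMaxUtilization { get; set; } = 0.7f;
}
```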
Usage Example
```csharp
[GpuResident]
[QueueDepthAwarePlacement(PreferredMaxUtilization = 0.6f)]
public class HighThroughputVertexGrain : Grain, ITemporalGraphVertex
{
    private readonly IGpuResidentManager _residentManager;
    // Declarations of the _gpuHandle and _vertexIndex fields are omitted

    public HighThroughputVertexGrain(IGpuResidentManager residentManager)
    {
        _residentManager = residentManager;
    }

    public override async Task OnActivateAsync(CancellationToken ct)
    {
        // Placement has already run: this activation is on the silo with the lowest queue depth
        _gpuHandle = await _residentManager.AllocateVertexAsync(
            (ulong)this.GetPrimaryKeyLong());
        await base.OnActivateAsync(ct);
    }

    public async Task AddEdgeAsync(ulong targetId, long timestamp)
    {
        // If the kernel's input queue is full, the overflow handler takes over
        await _residentManager.SendMessageAsync(_vertexIndex, new GpuMessage
        {
            Type = MessageType.AddEdge,
            Data0 = (long)targetId,
            Data1 = timestamp
        });
    }
}
```
Testing Strategy
Unit Tests
```csharp
[Fact]
public async Task QueueMonitor_ReturnsCorrectUtilization()
{
    // Arrange
    var runtime = CreateMockRuntime(queueCapacity: 1000, messagesInQueue: 450);
    var monitor = new LocalQueueMonitor(runtime);

    // Act
    var snapshot = await monitor.GetSnapshotAsync();

    // Assert
    Assert.Equal(0.45f, snapshot.AverageUtilization, precision: 2);
}

[Fact]
public async Task PlacementDirector_SelectsLowestUtilizationSilo()
{
    // Arrange
    var siloA = CreateSiloWithUtilization(0.45f);
    var siloB = CreateSiloWithUtilization(0.63f);
    var siloC = CreateSiloWithUtilization(0.82f);
    var director = new QueueDepthAwarePlacementDirector([siloA, siloB, siloC]);

    // Act
    var selected = await director.OnAddActivation(...);

    // Assert
    Assert.Equal(siloA.Address, selected);
}
```
Integration Tests
```csharp
[Fact]
public async Task HighLoad_DistributesAcrossSilos()
{
    // Arrange
    var cluster = CreateTestCluster(silos: 3, gpuPerSilo: 1);

    // Act: create 100 GPU-resident actors
    var tasks = Enumerable.Range(0, 100)
        .Select(i => cluster.GetGrain<IVertexGrain>(i).ActivateAsync());
    await Task.WhenAll(tasks);

    // Assert: distribution should be roughly balanced
    var distribution = await GetActorDistributionAsync(cluster);
    Assert.All(distribution.Values, count =>
        Assert.InRange(count, 25, 45)); // ideal is ~33 per silo; allow 25 to 45
}
```
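A small model makes the 25 to 45 tolerance in the integration test easier to reason about. The sketch rests on two simplifying assumptions: every activation adds the same load, and the director sees utilization that is refreshed only every few placements. Under those assumptions it shows how metric staleness skews the split. With fresh metrics, 100 activations divide 34/33/33. Refreshing every 10 placements sends whole batches to one silo and yields 40/30/30. `PlacementSimulation` is hypothetical, not part of the test suite.

```csharp
using System;
using System.Linq;

// Hypothetical model, not the real test harness: every activation adds the same
// load to the chosen silo, and the director sees utilization that is refreshed
// only once every `refreshEvery` placements.
public static class PlacementSimulation
{
    public static int[] Run(int actors, int silos, float loadPerActor, int refreshEvery)
    {
        var actual = new float[silos];   // true utilization per silo
        var observed = new float[silos]; // last snapshot seen by the director
        var counts = new int[silos];

        for (int i = 0; i < actors; i++)
        {
            if (i % refreshEvery == 0)
                Array.Copy(actual, observed, silos); // metrics refresh

            // Lowest observed utilization wins; ties go to the lowest index.
            int target = Array.IndexOf(observed, observed.Min());
            actual[target] += loadPerActor;
            counts[target]++;
        }
        return counts;
    }
}
```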
Migration Path
From v0.1.0 to v0.2.0
- No breaking changes: existing `[GpuResident]` grains continue to work.
- Opt-in: add the `[QueueDepthAwarePlacement]` attribute for the new behavior.
- Configuration: enable it cluster-wide via `UseQueueDepthAwarePlacement()`.
Backward Compatibility
```csharp
// v0.1.0 style (continues to work)
[GpuResident]
public class LegacyGrain : Grain { }

// v0.2.0 style (opt-in to queue awareness)
[GpuResident]
[QueueDepthAwarePlacement]
public class SmartGrain : Grain { }
```
Success Criteria
| Metric | Target | Measurement |
|---|---|---|
| Queue utilization variance | <15% | Across silos |
| Placement decision latency | <10 ms | P99 |
| Messages delivered without drops | 100% | Under normal load |
| Overflow recovery time | <100 ms | From queue saturation |
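The table does not define how "queue utilization variance" is computed. Two plausible readings are sketched here as assumptions:

- the population standard deviation of the per-silo average utilization;
- the spread between the busiest and the idlest silo.

The two readings can disagree. For the diagram's averages (45% and about 63%), the standard deviation is about 8.8 points, inside the 15% target. The spread is about 17.7 points, outside it. `BalanceMetrics` is a hypothetical name.

```csharp
using System;
using System.Linq;

// Two candidate definitions of "queue utilization variance across silos".
// Inputs are per-silo average utilizations in [0, 1]; results use the same units.
public static class BalanceMetrics
{
    // Population standard deviation of the silo averages.
    public static double StdDev(double[] siloAverages)
    {
        double mean = siloAverages.Average();
        return Math.Sqrt(siloAverages.Average(a => (a - mean) * (a - mean)));
    }

    // Gap between the most and the least loaded silo.
    public static double Spread(double[] siloAverages) =>
        siloAverages.Max() - siloAverages.Min();
}
```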
See Also
- Implementation Roadmap - Phase overview
- Ring Kernel Integration - Queue architecture
- GPU Memory Operations - Memory management
Phase 7: Intelligent placement for optimal GPU utilization.
Target Version: 0.2.0

Last Updated: 2025-11-28