Core Abstractions
Mapping DotCompute Types to Rust
This document details the core traits and types that form the Ring Kernel abstraction layer.
1. RingMessage Trait
DotCompute: IRingKernelMessage
// crates/ringkernel-core/src/message.rs
use std::any::TypeId;
/// Marker trait for messages that can be sent between ring kernels.
///
/// Implementations are typically derived using `#[derive(RingMessage)]`.
pub trait RingMessage: Send + Sync + Sized + 'static {
/// Unique message type identifier for routing/deserialization.
const MESSAGE_TYPE: &'static str;
/// Returns the message ID (for tracking/deduplication).
fn message_id(&self) -> Uuid;
/// Sets the message ID.
fn set_message_id(&mut self, id: Uuid);
/// Priority level (0-255, higher = more urgent).
fn priority(&self) -> u8;
/// Optional correlation ID for request/response patterns.
fn correlation_id(&self) -> Option<Uuid>;
/// Serialized size in bytes.
fn payload_size(&self) -> usize;
}
/// Priority levels following DotCompute conventions.
pub mod priority {
pub const LOW: u8 = 32; // 0-63: Batch processing
pub const NORMAL: u8 = 128; // 64-127: Default
pub const HIGH: u8 = 160; // 128-191: Interactive
pub const CRITICAL: u8 = 224; // 192-255: System messages
}
Derive Macro Usage
use ringkernel::prelude::*;
use rkyv::{Archive, Serialize, Deserialize};
#[derive(RingMessage, Archive, Serialize, Deserialize)]
#[rkyv(compare(PartialEq))]
pub struct VectorAddRequest {
#[message(id)]
pub id: Uuid,
#[message(priority = "priority::NORMAL")]
pub priority: u8,
#[message(correlation)]
pub correlation_id: Option<Uuid>,
// Payload fields
pub a: Vec<f32>,
pub b: Vec<f32>,
}
2. MessageQueue Trait
DotCompute: IMessageQueue<T>
// crates/ringkernel-core/src/queue.rs
use std::future::Future;
/// Lock-free ring buffer for message passing.
pub trait MessageQueue<T: RingMessage>: Send + Sync {
/// Maximum capacity (must be power of 2).
fn capacity(&self) -> usize;
/// Current message count.
fn len(&self) -> usize;
/// Check if queue is empty.
fn is_empty(&self) -> bool { self.len() == 0 }
/// Check if queue is full.
fn is_full(&self) -> bool { self.len() >= self.capacity() - 1 }
/// Non-blocking enqueue attempt.
fn try_enqueue(&self, message: T) -> Result<(), QueueError<T>>;
/// Non-blocking dequeue attempt.
fn try_dequeue(&self) -> Option<T>;
/// Blocking enqueue with timeout.
fn enqueue(&self, message: T, timeout: Duration) -> impl Future<Output = Result<(), QueueError<T>>> + Send;
/// Blocking dequeue with timeout.
fn dequeue(&self, timeout: Duration) -> impl Future<Output = Result<T, QueueError<T>>> + Send;
/// Get queue statistics.
fn statistics(&self) -> QueueStatistics;
}
/// Queue operation errors.
#[derive(Debug, thiserror::Error)]
pub enum QueueError<T> {
#[error("Queue is full")]
Full(T),
#[error("Queue is empty")]
Empty,
#[error("Operation timed out")]
Timeout,
#[error("Queue has been closed")]
Closed,
}
/// Queue performance statistics.
#[derive(Debug, Clone, Default)]
pub struct QueueStatistics {
pub total_enqueued: u64,
pub total_dequeued: u64,
pub total_dropped: u64,
pub average_latency_ns: u64,
pub peak_depth: usize,
}
3. RingKernelRuntime Trait
DotCompute: IRingKernelRuntime
// crates/ringkernel-core/src/runtime.rs
use async_trait::async_trait;
/// Runtime for managing ring kernel lifecycles.
#[async_trait]
pub trait RingKernelRuntime: Send + Sync {
/// Backend identifier (e.g., "cuda", "metal", "wgpu").
fn backend_name(&self) -> &'static str;
/// Launch a kernel (initially inactive).
async fn launch(
&self,
kernel_id: &str,
grid_size: Dim3,
block_size: Dim3,
options: LaunchOptions,
) -> Result<KernelHandle>;
/// Activate a launched kernel (begin processing).
async fn activate(&self, kernel_id: &str) -> Result<()>;
/// Deactivate a kernel (pause, preserve state).
async fn deactivate(&self, kernel_id: &str) -> Result<()>;
/// Terminate a kernel (cleanup resources).
async fn terminate(&self, kernel_id: &str) -> Result<()>;
/// Send a message to a kernel.
async fn send<T: RingMessage>(&self, kernel_id: &str, message: T) -> Result<()>;
/// Receive a message from a kernel.
async fn receive<T: RingMessage>(&self, kernel_id: &str, timeout: Duration) -> Result<T>;
/// Get kernel status.
async fn status(&self, kernel_id: &str) -> Result<KernelStatus>;
/// Get kernel metrics.
async fn metrics(&self, kernel_id: &str) -> Result<KernelMetrics>;
/// Get real-time telemetry (<1μs latency).
async fn telemetry(&self, kernel_id: &str) -> Result<TelemetrySnapshot>;
/// List all managed kernels.
async fn list_kernels(&self) -> Result<Vec<String>>;
}
/// Grid/block dimensions.
#[derive(Debug, Clone, Copy, Default)]
pub struct Dim3 {
pub x: u32,
pub y: u32,
pub z: u32,
}
impl Dim3 {
pub fn new(x: u32, y: u32, z: u32) -> Self {
Self { x, y, z }
}
pub fn linear(size: u32) -> Self {
Self { x: size, y: 1, z: 1 }
}
}
4. RingContext Struct
DotCompute: RingKernelContext (ref struct)
// crates/ringkernel-core/src/context.rs
/// Runtime context passed to ring kernel handlers.
///
/// Methods translate to GPU intrinsics during compilation.
pub struct RingContext<'a> {
// Internal pointers (populated by runtime)
control_block: &'a ControlBlock,
input_queue: *const u8,
output_queue: *mut u8,
hlc: &'a mut HlcState,
}
impl<'a> RingContext<'a> {
// ═══════════════════════════════════════════════════════════════
// Thread Identity
// ═══════════════════════════════════════════════════════════════
/// Thread index within block (→ threadIdx.x)
#[inline(always)]
pub fn thread_id(&self) -> u32 { /* intrinsic */ 0 }
/// Block index within grid (→ blockIdx.x)
#[inline(always)]
pub fn block_id(&self) -> u32 { /* intrinsic */ 0 }
/// Warp index (thread_id / 32)
#[inline(always)]
pub fn warp_id(&self) -> u32 { self.thread_id() / 32 }
/// Lane within warp (thread_id % 32)
#[inline(always)]
pub fn lane_id(&self) -> u32 { self.thread_id() % 32 }
/// Global thread index (→ blockIdx.x * blockDim.x + threadIdx.x)
#[inline(always)]
pub fn global_thread_id(&self) -> u32 { /* intrinsic */ 0 }
// ═══════════════════════════════════════════════════════════════
// Synchronization Barriers
// ═══════════════════════════════════════════════════════════════
/// Block-level barrier (→ __syncthreads())
#[inline(always)]
pub fn sync_threads(&self) { /* intrinsic */ }
/// Grid-level barrier (→ cooperative_groups::grid::sync())
#[inline(always)]
pub fn sync_grid(&self) { /* intrinsic */ }
/// Warp-level barrier (→ __syncwarp(mask))
#[inline(always)]
pub fn sync_warp(&self, mask: u32) { /* intrinsic */ }
/// Named barrier for cross-kernel sync
pub fn named_barrier(&self, name: &str) { /* runtime */ }
// ═══════════════════════════════════════════════════════════════
// Temporal Operations (HLC)
// ═══════════════════════════════════════════════════════════════
/// Get current HLC timestamp (→ clock64() + logical)
#[inline(always)]
pub fn now(&self) -> HlcTimestamp { /* intrinsic */ HlcTimestamp::default() }
/// Advance local clock (local event)
#[inline(always)]
pub fn tick(&mut self) { /* intrinsic */ }
/// Merge received timestamp (causal ordering)
#[inline(always)]
pub fn update_clock(&mut self, received: HlcTimestamp) { /* intrinsic */ }
// ═══════════════════════════════════════════════════════════════
// Memory Ordering
// ═══════════════════════════════════════════════════════════════
/// Device-scope fence (→ __threadfence())
#[inline(always)]
pub fn thread_fence(&self) { /* intrinsic */ }
/// Block-scope fence (→ __threadfence_block())
#[inline(always)]
pub fn thread_fence_block(&self) { /* intrinsic */ }
/// System-scope fence (→ __threadfence_system())
#[inline(always)]
pub fn thread_fence_system(&self) { /* intrinsic */ }
// ═══════════════════════════════════════════════════════════════
// Atomic Operations
// ═══════════════════════════════════════════════════════════════
/// Atomic add (→ atomicAdd)
#[inline(always)]
pub fn atomic_add(&self, target: &mut i32, value: i32) -> i32 { /* intrinsic */ 0 }
/// Atomic CAS (→ atomicCAS)
#[inline(always)]
pub fn atomic_cas(&self, target: &mut i32, compare: i32, value: i32) -> i32 { /* intrinsic */ 0 }
/// Atomic exchange (→ atomicExch)
#[inline(always)]
pub fn atomic_exch(&self, target: &mut i32, value: i32) -> i32 { /* intrinsic */ 0 }
// ═══════════════════════════════════════════════════════════════
// Warp Primitives
// ═══════════════════════════════════════════════════════════════
/// Warp shuffle (→ __shfl_sync)
#[inline(always)]
pub fn warp_shuffle(&self, value: i32, src_lane: u32, mask: u32) -> i32 { /* intrinsic */ 0 }
/// Warp ballot (→ __ballot_sync)
#[inline(always)]
pub fn warp_ballot(&self, predicate: bool, mask: u32) -> u32 { /* intrinsic */ 0 }
// ═══════════════════════════════════════════════════════════════
// Queue Operations
// ═══════════════════════════════════════════════════════════════
/// Enqueue output message
pub fn enqueue_output<T: RingMessage>(&mut self, message: T) -> bool { false }
/// Check output queue capacity
pub fn output_queue_free_slots(&self) -> usize { 0 }
// ═══════════════════════════════════════════════════════════════
// K2K Messaging
// ═══════════════════════════════════════════════════════════════
/// Send to another kernel
pub fn send_to_kernel<T: RingMessage>(&mut self, target: &str, message: T) -> bool { false }
/// Receive from another kernel
pub fn try_receive_from_kernel<T: RingMessage>(&mut self, source: &str) -> Option<T> { None }
// ═══════════════════════════════════════════════════════════════
// Control
// ═══════════════════════════════════════════════════════════════
/// Request graceful termination
pub fn request_termination(&self) { /* write control block */ }
/// Check if termination requested
pub fn is_termination_requested(&self) -> bool { false }
/// Report an error
pub fn report_error(&self) { /* atomic increment */ }
}
5. HlcTimestamp
DotCompute: HlcTimestamp (Hybrid Logical Clock)
// crates/ringkernel-core/src/hlc.rs
use std::cmp::Ordering;
/// Hybrid Logical Clock timestamp for causal ordering.
///
/// Combines physical time (wall clock) with logical counter
/// to provide total ordering even when physical clocks disagree.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct HlcTimestamp {
/// Physical time component (nanoseconds since epoch or GPU ticks)
physical: u64,
/// Logical counter for events at same physical time
logical: u32,
/// Node/kernel identifier (for tie-breaking)
node_id: u32,
}
impl HlcTimestamp {
/// Create a new timestamp.
pub fn new(physical: u64, logical: u32, node_id: u32) -> Self {
Self { physical, logical, node_id }
}
/// Get physical time component.
pub fn physical(&self) -> u64 { self.physical }
/// Get logical counter.
pub fn logical(&self) -> u32 { self.logical }
/// Advance for local event (tick).
pub fn tick(&mut self) {
self.logical = self.logical.wrapping_add(1);
}
/// Merge with received timestamp (max + 1).
pub fn merge(&mut self, other: &HlcTimestamp) {
if other.physical > self.physical {
self.physical = other.physical;
self.logical = other.logical + 1;
} else if other.physical == self.physical {
self.logical = self.logical.max(other.logical) + 1;
} else {
self.logical += 1;
}
}
/// Update physical time if wall clock advanced.
pub fn update_physical(&mut self, wall_clock: u64) {
if wall_clock > self.physical {
self.physical = wall_clock;
self.logical = 0;
}
}
}
impl Ord for HlcTimestamp {
fn cmp(&self, other: &Self) -> Ordering {
match self.physical.cmp(&other.physical) {
Ordering::Equal => match self.logical.cmp(&other.logical) {
Ordering::Equal => self.node_id.cmp(&other.node_id),
other => other,
},
other => other,
}
}
}
impl PartialOrd for HlcTimestamp {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
6. ControlBlock and TelemetryBuffer
See Architecture Overview for the full struct definitions.