RingKernelRuntime¶
The main coordinator for managing ring kernel actors.
Overview¶
RingKernelRuntime is the central class for creating, managing, and communicating with ring kernel actors. It handles lifecycle management, message routing, and telemetry collection.
from pydotcompute import RingKernelRuntime
async with RingKernelRuntime() as runtime:
await runtime.launch("my_actor")
await runtime.activate("my_actor")
await runtime.send("my_actor", request)
response = await runtime.receive("my_actor")
Class Definition¶
class RingKernelRuntime:
"""Runtime coordinator for ring kernel actors."""
def __init__(
self,
*,
enable_telemetry: bool = False,
default_queue_size: int = 1000,
backend: str = "auto",
) -> None:
"""
Create a runtime instance.
Args:
enable_telemetry: Enable performance monitoring
default_queue_size: Default message queue size
backend: Backend to use ("auto", "cpu", "cuda")
"""
Lifecycle Methods¶
launch¶
async def launch(
self,
kernel_id: str,
kernel_func: Callable | None = None,
*,
queue_size: int | None = None,
) -> None:
"""
Launch a kernel (Phase 1: allocate resources).
Args:
kernel_id: Unique identifier for the kernel
kernel_func: Kernel function (uses registered if None)
queue_size: Override default queue size
Raises:
KernelNotFoundError: If kernel_id not registered and no func provided
KernelStateError: If kernel already launched
"""
activate¶
async def activate(self, kernel_id: str) -> None:
"""
Activate a kernel (Phase 2: start processing).
Args:
kernel_id: Kernel to activate
Raises:
KernelNotFoundError: If kernel doesn't exist
KernelStateError: If kernel not in LAUNCHED state
"""
deactivate¶
async def deactivate(self, kernel_id: str) -> None:
"""
Deactivate a kernel (pause processing).
Messages continue to queue but won't be processed.
Args:
kernel_id: Kernel to deactivate
Raises:
KernelNotFoundError: If kernel doesn't exist
KernelStateError: If kernel not active
"""
reactivate¶
async def reactivate(self, kernel_id: str) -> None:
"""
Reactivate a deactivated kernel.
Args:
kernel_id: Kernel to reactivate
Raises:
KernelNotFoundError: If kernel doesn't exist
KernelStateError: If kernel not deactivated
"""
terminate¶
async def terminate(self, kernel_id: str, *, timeout: float = 5.0) -> None:
"""
Terminate a kernel (graceful shutdown).
Args:
kernel_id: Kernel to terminate
timeout: Maximum wait time for graceful shutdown
Raises:
KernelNotFoundError: If kernel doesn't exist
"""
terminate_all¶
async def terminate_all(self, *, timeout: float = 10.0) -> None:
"""
Terminate all kernels.
Args:
timeout: Maximum total wait time
"""
Message Methods¶
send¶
async def send(
self,
kernel_id: str,
message: Any,
*,
timeout: float | None = None,
) -> None:
"""
Send a message to a kernel's input queue.
Args:
kernel_id: Target kernel
message: Message to send
timeout: Maximum wait time if queue is full
Raises:
KernelNotFoundError: If kernel doesn't exist
QueueFullError: If queue full and timeout exceeded
"""
receive¶
async def receive(
self,
kernel_id: str,
*,
timeout: float | None = None,
) -> Any:
"""
Receive a message from a kernel's output queue.
Args:
kernel_id: Source kernel
timeout: Maximum wait time
Returns:
The received message
Raises:
KernelNotFoundError: If kernel doesn't exist
asyncio.TimeoutError: If timeout exceeded
"""
send_batch¶
async def send_batch(
self,
kernel_id: str,
messages: list[Any],
*,
timeout: float | None = None,
) -> int:
"""
Send multiple messages to a kernel.
Args:
kernel_id: Target kernel
messages: Messages to send
timeout: Timeout per message
Returns:
Number of messages successfully sent
"""
Query Methods¶
get_state¶
def get_state(self, kernel_id: str) -> KernelState:
"""
Get current state of a kernel.
Args:
kernel_id: Kernel to query
Returns:
Current KernelState
Raises:
KernelNotFoundError: If kernel doesn't exist
"""
kernel_ids¶
active_kernels¶
Telemetry Methods¶
get_telemetry¶
def get_telemetry(self, kernel_id: str) -> KernelTelemetry | None:
"""
Get telemetry for a kernel.
Args:
kernel_id: Kernel to query
Returns:
KernelTelemetry or None if telemetry disabled
"""
get_summary¶
def get_summary(self) -> dict[str, Any]:
"""
Get summary of all kernels.
Returns:
Dictionary with runtime statistics
"""
Context Manager¶
async def __aenter__(self) -> RingKernelRuntime:
"""Enter async context."""
async def __aexit__(self, *args) -> None:
"""Exit async context, terminating all kernels."""
Usage Examples¶
Complete Lifecycle¶
from pydotcompute import RingKernelRuntime, ring_kernel, message
@message
class Request:
value: int
@message
class Response:
result: int
@ring_kernel(kernel_id="doubler")
async def doubler(ctx):
while not ctx.should_terminate:
if not ctx.is_active:
await ctx.wait_active()
continue
try:
msg = await ctx.receive(timeout=0.1)
await ctx.send(Response(result=msg.value * 2))
except:
continue
async def main():
async with RingKernelRuntime(enable_telemetry=True) as runtime:
# Phase 1: Launch (allocate resources)
await runtime.launch("doubler")
# Phase 2: Activate (start processing)
await runtime.activate("doubler")
# Wait for startup
await asyncio.sleep(0.1)
# Send request
await runtime.send("doubler", Request(value=21))
# Receive response
response = await runtime.receive("doubler", timeout=1.0)
print(f"Result: {response.result}") # 42
# Check telemetry
telemetry = runtime.get_telemetry("doubler")
print(f"Messages processed: {telemetry.messages_processed}")
# Runtime automatically terminates all kernels on exit
Multiple Kernels¶
async with RingKernelRuntime() as runtime:
# Launch multiple kernels
await runtime.launch("processor1", processor_func)
await runtime.launch("processor2", processor_func)
await runtime.launch("aggregator", aggregator_func)
# Activate all
for kernel_id in runtime.kernel_ids:
await runtime.activate(kernel_id)
# Use pipeline...
Pause and Resume¶
async with RingKernelRuntime() as runtime:
await runtime.launch("worker")
await runtime.activate("worker")
# Process some work...
for req in batch1:
await runtime.send("worker", req)
# Pause for maintenance
await runtime.deactivate("worker")
# Messages queue up but aren't processed
for req in batch2:
await runtime.send("worker", req)
# Resume processing
await runtime.reactivate("worker")
# Queued messages now processed
Notes¶
- Always use as async context manager for proper cleanup
- Kernels must be launched before activation
- The runtime is the single point of contact for actors
- Telemetry adds minimal overhead when enabled