Lifecycle¶
Kernel state management and lifecycle transitions.
Overview¶
Ring kernels follow a well-defined lifecycle with explicit state transitions. This enables proper resource management, graceful shutdown, and pause/resume functionality.
Enums¶
KernelState¶
class KernelState(Enum):
"""Possible states for a ring kernel."""
CREATED = "created" # Defined but not launched
LAUNCHED = "launched" # Resources allocated, not processing
ACTIVE = "active" # Processing messages
DEACTIVATED = "deactivated" # Paused, can reactivate
TERMINATING = "terminating" # Shutdown in progress
TERMINATED = "terminated" # Fully stopped
Classes¶
RingKernel¶
@dataclass
class RingKernel:
"""Represents a ring kernel instance."""
kernel_id: str
func: Callable
input_queue: MessageQueue
output_queue: MessageQueue
state: KernelState = KernelState.CREATED
task: asyncio.Task | None = None
KernelContext¶
class KernelContext(Generic[TIn, TOut]):
"""
Context provided to ring kernel functions.
This is the primary interface actors use to:
- Receive input messages
- Send output messages
- Check lifecycle state
- Wait for state changes
"""
KernelContext Properties¶
kernel_id¶
should_terminate¶
is_active¶
state¶
KernelContext Methods¶
receive¶
async def receive(self, *, timeout: float | None = None) -> TIn:
"""
Receive a message from the input queue.
Args:
timeout: Maximum wait time
Returns:
The received message
Raises:
asyncio.TimeoutError: If timeout exceeded
"""
send¶
async def send(self, message: TOut) -> None:
"""
Send a message to the output queue.
Args:
message: Message to send
"""
wait_active¶
request_termination¶
State Transitions¶
Valid Transitions¶
| From | To | Method |
|---|---|---|
| CREATED | LAUNCHED | runtime.launch() |
| LAUNCHED | ACTIVE | runtime.activate() |
| ACTIVE | DEACTIVATED | runtime.deactivate() |
| DEACTIVATED | ACTIVE | runtime.reactivate() |
| ACTIVE | TERMINATING | runtime.terminate() |
| DEACTIVATED | TERMINATING | runtime.terminate() |
| LAUNCHED | TERMINATING | runtime.terminate() |
| TERMINATING | TERMINATED | (automatic) |
State Diagram¶
┌──────────────────┐
│ CREATED │
└────────┬─────────┘
│ launch()
┌────────▼─────────┐
│ LAUNCHED │
└────────┬─────────┘
│ activate()
┌────────▼─────────┐
┌─────────►│ ACTIVE │◄─────────┐
│ └────────┬─────────┘ │
│ │ deactivate() │
│ ┌────────▼─────────┐ │
│ │ DEACTIVATED │──────────┘
│ └────────┬─────────┘ reactivate()
│ │
│ │ terminate()
│ ┌────────▼─────────┐
└──────────│ TERMINATING │
terminate()└────────┬─────────┘
│ (automatic)
┌────────▼─────────┐
│ TERMINATED │
└──────────────────┘
Usage Examples¶
Standard Actor Loop¶
@ring_kernel(kernel_id="worker")
async def worker(ctx: KernelContext):
print(f"[{ctx.kernel_id}] Started")
while not ctx.should_terminate:
# Check if we should pause
if not ctx.is_active:
print(f"[{ctx.kernel_id}] Waiting for activation...")
await ctx.wait_active()
continue
try:
# Receive with timeout for responsive shutdown
msg = await ctx.receive(timeout=0.1)
# Process message
result = process(msg)
# Send response
await ctx.send(result)
except asyncio.TimeoutError:
# No message - check termination and continue
continue
print(f"[{ctx.kernel_id}] Terminated")
Checking State in Application¶
async with RingKernelRuntime() as runtime:
await runtime.launch("worker")
state = runtime.get_state("worker")
assert state == KernelState.LAUNCHED
await runtime.activate("worker")
state = runtime.get_state("worker")
assert state == KernelState.ACTIVE
Graceful Shutdown¶
async with RingKernelRuntime() as runtime:
await runtime.launch("worker")
await runtime.activate("worker")
# ... do work ...
# Graceful termination with timeout
await runtime.terminate("worker", timeout=5.0)
state = runtime.get_state("worker")
assert state == KernelState.TERMINATED
Pause and Resume¶
async with RingKernelRuntime() as runtime:
await runtime.launch("worker")
await runtime.activate("worker")
# Process some messages
for msg in batch1:
await runtime.send("worker", msg)
# Pause processing
await runtime.deactivate("worker")
assert runtime.get_state("worker") == KernelState.DEACTIVATED
# Messages queue but don't process
for msg in batch2:
await runtime.send("worker", msg)
# Resume
await runtime.reactivate("worker")
assert runtime.get_state("worker") == KernelState.ACTIVE
# Now batch2 messages are processed
Context Manager¶
managed_kernel¶
@asynccontextmanager
async def managed_kernel(
runtime: RingKernelRuntime,
kernel_id: str,
kernel_func: Callable | None = None,
) -> AsyncIterator[str]:
"""
Context manager for automatic kernel lifecycle.
Launches, activates, and terminates a kernel automatically.
"""
Usage:
async with RingKernelRuntime() as runtime:
async with managed_kernel(runtime, "worker", worker_func) as kernel_id:
# Kernel is launched and active
await runtime.send(kernel_id, message)
response = await runtime.receive(kernel_id)
# Kernel automatically terminated
Best Practices¶
-
Always Check Termination: Use
while not ctx.should_terminate -
Use Timeout in Receive: Allows responsive shutdown
-
Handle Deactivation: Check
is_activeand usewait_active() -
Graceful Shutdown: Allow time for in-flight messages
-
Use Context Manager: For automatic cleanup
Notes¶
- Invalid state transitions raise
KernelStateError - Terminated kernels cannot be restarted
- The runtime context manager terminates all kernels on exit
- State is tracked per-kernel, not globally