Building Actors¶
Best practices for creating robust ring kernel actors.
Actor Structure¶
Basic Template¶
Every actor should follow this structure:
import asyncio

from pydotcompute import ring_kernel, message
from pydotcompute.ring_kernels.lifecycle import KernelContext
@ring_kernel(kernel_id="my_actor")
async def my_actor(ctx: KernelContext) -> None:
    """
    Actor description.

    Explain what this actor does and any important behavior.
    """
    # === INITIALIZATION ===
    # One-time setup (load models, initialize state)
    state = initialize_state()

    try:
        # === MAIN LOOP ===
        while not ctx.should_terminate:
            # Handle deactivation
            if not ctx.is_active:
                await ctx.wait_active()
                continue

            try:
                # Receive with timeout
                request = await ctx.receive(timeout=0.1)
                # Process request
                response = process(request, state)
                # Send response
                await ctx.send(response)
            except asyncio.TimeoutError:
                # Nothing arrived within the timeout; loop around so the
                # termination and activation flags are checked again.
                continue
            except Exception as e:
                # Log but don't crash
                log_error(e)
                continue
    finally:
        # === CLEANUP ===
        # In a `finally` so it runs on normal termination and also when the
        # loop is left by something `except Exception` does not catch
        # (for example asyncio task cancellation).
        cleanup(state)
Message Design¶
Request Messages¶
@message
@dataclass
class CalculateRequest:
    """Request to perform calculation."""

    # Business fields: the two operands and the operation to apply to them.
    a: float
    b: float
    operation: str = "add"

    # Standard fields (added by @message but shown for clarity)
    # Each instance gets its own freshly generated message_id via uuid4.
    message_id: UUID = field(default_factory=uuid4)
    priority: int = 128
    # None unless this message is tied to another message.
    correlation_id: UUID | None = None
Response Messages¶
@message
@dataclass
class CalculateResponse:
    """Response with calculation result."""

    # Result field (required: it has no default)
    result: float

    # Success/error fields: by default a response reports success with no
    # error text and no error code.
    success: bool = True
    error: str | None = None
    error_code: str | None = None

    # Standard fields
    # Each instance gets its own freshly generated message_id via uuid4.
    message_id: UUID = field(default_factory=uuid4)
    priority: int = 128
    correlation_id: UUID | None = None
Design Guidelines¶
- Include Error Information: Always have `success` and `error` fields
- Use Error Codes: Machine-readable error classification
- Minimal Fields: Only include necessary data
- Default Values: Provide sensible defaults
- Type Hints: Enable IDE support and validation
Error Handling¶
Structured Error Responses¶
@ring_kernel(kernel_id="calculator")
async def calculator(ctx: KernelContext) -> None:
    """Answer each received request with a CalculateResponse.

    Business errors raised by ``calculate`` (``ZeroDivisionError``,
    ``ValueError``) are turned into error responses carrying a
    machine-readable ``error_code``. Any other exception is logged and the
    loop keeps running.
    """
    while not ctx.should_terminate:
        if not ctx.is_active:
            await ctx.wait_active()
            continue

        try:
            request = await ctx.receive(timeout=0.1)

            # Business logic with error handling. Only the calculation is
            # guarded, so an exception raised while sending a response can
            # never be misreported to the caller as a business error.
            try:
                result = calculate(request.a, request.b, request.operation)
            except ZeroDivisionError:
                response = CalculateResponse(
                    result=0.0,
                    success=False,
                    error="Division by zero",
                    error_code="DIVIDE_BY_ZERO",
                    correlation_id=request.message_id,
                )
            except ValueError as e:
                response = CalculateResponse(
                    result=0.0,
                    success=False,
                    error=str(e),
                    error_code="INVALID_INPUT",
                    correlation_id=request.message_id,
                )
            else:
                response = CalculateResponse(
                    result=result,
                    success=True,
                    correlation_id=request.message_id,
                )

            # Single send point: exactly one response per request.
            await ctx.send(response)

        except asyncio.TimeoutError:
            continue
        except Exception:
            # Unexpected error - log but continue. logging.exception records
            # the active exception and its traceback.
            logging.exception("Unexpected error in calculator")
            continue
Error Categories¶
| Type | Handling | Example |
|---|---|---|
| Business errors | Return error response | Invalid input |
| Infrastructure errors | Log and continue | Network timeout |
| Fatal errors | Let crash (supervisor will restart) | Out of memory |
Stateful Actors¶
Maintaining State¶
@ring_kernel(kernel_id="counter")
async def counter(ctx: KernelContext) -> None:
"""Counter that maintains state across messages."""
# Private state
count = 0
history: list[int] = []
while not ctx.should_terminate:
if not ctx.is_active:
await ctx.wait_active()
continue
try:
request = await ctx.receive(timeout=0.1)
if request.action == "increment":
count += 1
elif request.action == "decrement":
count -= 1
elif request.action == "reset":
count = 0
history.append(count)
await ctx.send(CounterResponse(
count=count,
correlation_id=request.message_id,
))
except asyncio.TimeoutError:
continue
State Persistence¶
For durable state, persist periodically:
@ring_kernel(kernel_id="persistent_actor")
async def persistent_actor(ctx: KernelContext) -> None:
    """Actor whose state is loaded from disk, saved periodically, and saved on exit.

    State is written to ``actor_state.json`` after every 100 processed
    messages and once more when the actor stops.
    """
    # Load persisted state
    state = load_state_from_disk("actor_state.json")
    messages_since_save = 0

    try:
        while not ctx.should_terminate:
            if not ctx.is_active:
                await ctx.wait_active()
                continue

            try:
                request = await ctx.receive(timeout=0.1)

                # Update state
                update_state(state, request)
                messages_since_save += 1

                # Periodic persistence
                if messages_since_save >= 100:
                    save_state_to_disk("actor_state.json", state)
                    messages_since_save = 0

                await ctx.send(Response(...))
            except asyncio.TimeoutError:
                continue
    finally:
        # Save on shutdown. In a `finally` so updates made since the last
        # periodic save are also written when the loop is left by an
        # exception or by task cancellation, not only on clean termination.
        save_state_to_disk("actor_state.json", state)
Resource Management¶
Loading Heavy Resources¶
@ring_kernel(kernel_id="ml_inference")
async def ml_inference(ctx: KernelContext) -> None:
    """Inference actor: load the model once, then answer prediction requests."""
    # The model is loaded a single time, before the message loop starts.
    print(f"[{ctx.kernel_id}] Loading model...")
    model = load_large_model("model.pt")
    print(f"[{ctx.kernel_id}] Model loaded!")

    while not ctx.should_terminate:
        if not ctx.is_active:
            await ctx.wait_active()
            continue

        try:
            msg = await ctx.receive(timeout=0.1)

            # Every request reuses the model loaded above.
            reply = InferenceResponse(
                prediction=model.predict(msg.data),
                correlation_id=msg.message_id,
            )
            await ctx.send(reply)
        except asyncio.TimeoutError:
            continue

    # Drop the local reference to the model once the loop has ended.
    print(f"[{ctx.kernel_id}] Unloading model...")
    del model
Buffer Management¶
from pydotcompute.core.memory_pool import get_memory_pool
@ring_kernel(kernel_id="buffer_processor")
async def buffer_processor(ctx: KernelContext) -> None:
    """Process each request through a buffer borrowed from the memory pool."""
    memory_pool = get_memory_pool()

    while not ctx.should_terminate:
        if not ctx.is_active:
            await ctx.wait_active()
            continue

        try:
            msg = await ctx.receive(timeout=0.1)

            # Borrow a float32 buffer of the requested shape from the pool.
            buf = memory_pool.acquire(msg.shape, dtype=np.float32)
            try:
                # Fill the buffer with the request data and process it.
                buf.copy_from(msg.data)
                output = process_on_gpu(buf.device)

                reply = ProcessResponse(
                    result=output.tolist(),
                    correlation_id=msg.message_id,
                )
                await ctx.send(reply)
            finally:
                # The buffer goes back to the pool whether or not
                # processing succeeded.
                memory_pool.release(buf)
        except asyncio.TimeoutError:
            continue
Testing Actors¶
Unit Testing¶
import pytest
from pydotcompute import RingKernelRuntime
@pytest.mark.asyncio
async def test_calculator_add():
    """An "add" request yields a successful response correlated to the request."""
    async with RingKernelRuntime() as rt:
        await rt.launch("calculator", calculator)
        await rt.activate("calculator")
        await asyncio.sleep(0.1)

        # Submit 10 + 5 to the calculator actor.
        req = CalculateRequest(a=10, b=5, operation="add")
        await rt.send("calculator", req)

        # The reply must succeed, carry the sum, and point back at the request.
        reply = await rt.receive("calculator", timeout=1.0)
        assert reply.success
        assert reply.result == 15.0
        assert reply.correlation_id == req.message_id
@pytest.mark.asyncio
async def test_calculator_divide_by_zero():
    """Dividing by zero yields a failed response with the DIVIDE_BY_ZERO code."""
    async with RingKernelRuntime() as rt:
        await rt.launch("calculator", calculator)
        await rt.activate("calculator")
        await asyncio.sleep(0.1)

        # Submit 10 / 0 to the calculator actor.
        req = CalculateRequest(a=10, b=0, operation="div")
        await rt.send("calculator", req)

        # The failure is reported in the response instead of crashing the actor.
        reply = await rt.receive("calculator", timeout=1.0)
        assert not reply.success
        assert reply.error_code == "DIVIDE_BY_ZERO"
Performance Tips¶
- Use Appropriate Timeouts: Short for responsive, longer for efficiency
- Batch Processing: Process multiple messages per iteration
- Async Operations: Don't block the event loop
- Memory Pooling: Reuse buffers for reduced allocation
- Profile: Measure before optimizing
Anti-Patterns¶
Don't Block the Event Loop¶
# BAD: Blocking operation
# time.sleep() is a synchronous call: nothing else on the event loop can run
# until it returns.
while not ctx.should_terminate:
    request = await ctx.receive(timeout=0.1)
    time.sleep(1) # BLOCKS EVENT LOOP!
    await ctx.send(response)

# GOOD: Use async sleep
# Awaiting asyncio.sleep() hands control back to the event loop while waiting.
while not ctx.should_terminate:
    request = await ctx.receive(timeout=0.1)
    await asyncio.sleep(1) # Non-blocking
    await ctx.send(response)
Don't Share State¶
# BAD: Shared mutable state
# The dict lives at module level, so every actor that touches it mutates
# the same object.
shared_counter = {"count": 0}

@ring_kernel(kernel_id="worker")
async def worker(ctx):
    while not ctx.should_terminate:
        shared_counter["count"] += 1 # RACE CONDITION!

# GOOD: Private state
# The counter is a local variable of the actor function.
@ring_kernel(kernel_id="worker")
async def worker(ctx):
    count = 0 # Private to this actor
    while not ctx.should_terminate:
        count += 1
Don't Ignore Errors¶
# BAD: Swallowing errors
except Exception:
pass
# GOOD: Log and continue
except Exception as e:
logging.exception("Error processing request")
continue
Next Steps¶
- Pipelines: Multi-stage processing
- GPU Optimization: Performance tuning
- Testing: Comprehensive testing