@ring_kernel¶
Decorator for creating ring kernel actor functions.
Overview¶
The @ring_kernel decorator transforms an async function into a ring kernel actor. Actors are persistent functions that process messages in an infinite loop, communicating through queues.
from pydotcompute import ring_kernel

@ring_kernel(kernel_id="my_actor")
async def my_actor(ctx):
    while not ctx.should_terminate:
        msg = await ctx.receive()
        result = process(msg)
        await ctx.send(result)
Decorator Signature¶
def ring_kernel(
    *,
    kernel_id: str,
    input_type: type | None = None,
    output_type: type | None = None,
    queue_size: int = 1000,
    backpressure: BackpressureStrategy = BackpressureStrategy.BLOCK,
    auto_register: bool = True,
) -> Callable[[F], F]:
    """
    Decorator for ring kernel actor functions.

    Args:
        kernel_id: Unique identifier for the kernel
        input_type: Expected input message type
        output_type: Expected output message type
        queue_size: Size of input/output queues
        backpressure: Strategy when queues are full
        auto_register: Auto-register with runtime

    Returns:
        Decorated actor function
    """
Parameters¶
kernel_id¶
Unique identifier for the kernel. Used for:
- Launching and managing the kernel
- Sending/receiving messages
- Telemetry and logging
input_type¶
Expected type of input messages. If provided, enables runtime type checking.
output_type¶
Expected type of output messages. If provided, enables runtime type checking.
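How the library performs this check is not shown on this page. As a rough illustration only, a runtime check for input_type or output_type could be as small as an isinstance guard. The helper name check_message and the Request class below are hypothetical and not part of pydotcompute.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Request:
    value: int


def check_message(msg: object, expected: Optional[type]) -> object:
    """Hypothetical helper: return msg unchanged if it matches, else raise TypeError."""
    # When no type is configured, checking is skipped entirely.
    if expected is not None and not isinstance(msg, expected):
        raise TypeError(
            f"expected {expected.__name__}, got {type(msg).__name__}"
        )
    return msg
```

With a guard like this, a mistyped message fails at the queue boundary instead of deep inside the actor's processing code.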
queue_size¶
Maximum size of input and output queues. Larger queues buffer more messages but use more memory.
backpressure¶
How to handle full queues:
- BLOCK: Wait for space (default)
- REJECT: Raise error immediately
- DROP_OLDEST: Drop oldest message
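To make the three behaviors concrete, here is a minimal sketch built on the standard library's asyncio.Queue. It does not show how pydotcompute implements its queues; the Strategy enum and put_with_strategy function are hypothetical stand-ins for illustration.

```python
import asyncio
from enum import Enum, auto


class Strategy(Enum):
    """Hypothetical stand-in for the library's BackpressureStrategy."""
    BLOCK = auto()
    REJECT = auto()
    DROP_OLDEST = auto()


async def put_with_strategy(queue: asyncio.Queue, item, strategy: Strategy) -> None:
    if strategy is Strategy.BLOCK:
        await queue.put(item)       # suspends until a slot frees up
    elif strategy is Strategy.REJECT:
        queue.put_nowait(item)      # raises asyncio.QueueFull when full
    else:  # DROP_OLDEST
        if queue.full():
            queue.get_nowait()      # discard the oldest queued item
        queue.put_nowait(item)


async def demo() -> list:
    queue = asyncio.Queue(maxsize=2)
    for i in range(4):
        await put_with_strategy(queue, i, Strategy.DROP_OLDEST)
    # Only the two newest items remain in the queue.
    return [queue.get_nowait() for _ in range(queue.qsize())]
```

Running asyncio.run(demo()) leaves only the last two items (2 and 3) in the queue, which is the trade-off DROP_OLDEST makes: producers never wait, but older messages can be lost.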
auto_register¶
Whether to automatically register with the global runtime. Set to False for testing or manual control.
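The general idea behind an auto_register flag can be sketched with a toy decorator that records functions in a module-level dictionary. This is an assumption about the pattern, not pydotcompute's actual implementation; REGISTRY and toy_kernel are hypothetical names.

```python
from typing import Awaitable, Callable, Dict

# Hypothetical module-level registry standing in for the global runtime.
REGISTRY: Dict[str, Callable[..., Awaitable[None]]] = {}


def toy_kernel(*, kernel_id: str, auto_register: bool = True):
    """Toy decorator (not the library's): optionally record the function by ID."""
    def decorate(func):
        if auto_register:
            REGISTRY[kernel_id] = func
        return func  # the function itself is returned unchanged
    return decorate


@toy_kernel(kernel_id="registered")
async def registered(ctx) -> None: ...


@toy_kernel(kernel_id="manual", auto_register=False)
async def manual(ctx) -> None: ...
```

Here "registered" can be looked up by ID, while "manual" must be passed around explicitly, which keeps tests from leaking state into a shared registry.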
Actor Function Signature¶
async def my_actor(ctx: KernelContext[TIn, TOut]) -> None:
    """
    Actor function signature.

    Args:
        ctx: Context for receiving/sending messages and checking state
    """
Usage Examples¶
Basic Actor¶
import asyncio
from dataclasses import dataclass

from pydotcompute import ring_kernel, message

@message
@dataclass
class Request:
    value: int

@message
@dataclass
class Response:
    result: int

@ring_kernel(kernel_id="doubler")
async def doubler(ctx):
    while not ctx.should_terminate:
        # Pause while the kernel is deactivated
        if not ctx.is_active:
            await ctx.wait_active()
            continue
        try:
            request = await ctx.receive(timeout=0.1)
            response = Response(result=request.value * 2)
            await ctx.send(response)
        except asyncio.TimeoutError:
            # No message arrived; loop back and re-check termination.
            # A bare "except:" here would also swallow task cancellation.
            continue
With Type Safety¶
import asyncio

# CalculationRequest, CalculationResponse, and calculate() are defined elsewhere
@ring_kernel(
    kernel_id="calculator",
    input_type=CalculationRequest,
    output_type=CalculationResponse,
)
async def calculator(ctx: KernelContext[CalculationRequest, CalculationResponse]):
    while not ctx.should_terminate:
        if not ctx.is_active:
            await ctx.wait_active()
            continue
        try:
            req = await ctx.receive(timeout=0.1)
            # req is typed as CalculationRequest
            result = calculate(req.a, req.b, req.operation)
            await ctx.send(CalculationResponse(result=result))
        except asyncio.TimeoutError:
            # No message arrived; loop back and re-check termination
            continue
Custom Queue Configuration¶
import asyncio

@ring_kernel(
    kernel_id="high_throughput",
    queue_size=10000,
    backpressure=BackpressureStrategy.DROP_OLDEST,
)
async def high_throughput_actor(ctx):
    """Actor for high-volume streaming data."""
    while not ctx.should_terminate:
        try:
            # Short timeout keeps latency low under heavy load
            msg = await ctx.receive(timeout=0.01)
            await ctx.send(process(msg))
        except asyncio.TimeoutError:
            continue
Without Auto-Registration¶
import asyncio

@ring_kernel(kernel_id="test_actor", auto_register=False)
async def test_actor(ctx):
    while not ctx.should_terminate:
        try:
            msg = await ctx.receive(timeout=0.1)
            await ctx.send(msg)  # echo the message back
        except asyncio.TimeoutError:
            continue

# Without auto-registration, the function must be passed to launch() explicitly
async def main():
    async with RingKernelRuntime() as runtime:
        await runtime.launch("test_actor", test_actor)

asyncio.run(main())
Error Handling¶
import asyncio

@ring_kernel(kernel_id="resilient")
async def resilient_actor(ctx):
    while not ctx.should_terminate:
        if not ctx.is_active:
            await ctx.wait_active()
            continue
        try:
            request = await ctx.receive(timeout=0.1)
            try:
                result = risky_operation(request)
                await ctx.send(SuccessResponse(data=result))
            except ValueError as e:
                # Expected failure: report the reason to the caller
                await ctx.send(ErrorResponse(error=str(e)))
            except Exception:
                # Unexpected failure: reply with a generic message
                await ctx.send(ErrorResponse(error="Internal error"))
        except asyncio.TimeoutError:
            continue
Stateful Actor¶
import asyncio

@ring_kernel(kernel_id="counter")
async def counter_actor(ctx):
    """Actor that maintains state across messages."""
    count = 0  # local variable, so the state is private to this actor
    while not ctx.should_terminate:
        if not ctx.is_active:
            await ctx.wait_active()
            continue
        try:
            msg = await ctx.receive(timeout=0.1)
            if msg.action == "increment":
                count += 1
            elif msg.action == "decrement":
                count -= 1
            elif msg.action == "reset":
                count = 0
            await ctx.send(CountResponse(count=count))
        except asyncio.TimeoutError:
            continue
Actor Loop Pattern¶
The recommended pattern for all actors:
import asyncio

@ring_kernel(kernel_id="template")
async def template_actor(ctx):
    # Optional: One-time initialization
    print(f"[{ctx.kernel_id}] Starting...")

    while not ctx.should_terminate:
        # Handle deactivation
        if not ctx.is_active:
            await ctx.wait_active()
            continue
        try:
            # Receive with timeout for responsive shutdown
            msg = await ctx.receive(timeout=0.1)
            # Process message
            result = process(msg)
            # Send response
            await ctx.send(result)
        except asyncio.TimeoutError:
            # No message - loop back to check termination
            continue
        except Exception as e:
            # Log error but don't crash
            print(f"Error: {e}")
            continue

    # Optional: Cleanup
    print(f"[{ctx.kernel_id}] Terminated")
Best Practices¶
- Always Check Termination: Use while not ctx.should_terminate
- Handle Deactivation: Check is_active and use wait_active()
- Use Timeout: Allows responsive shutdown
- Never Block Forever: Timeouts prevent actor from hanging
- Error Recovery: Catch exceptions to prevent actor crash
- Correlation IDs: Set correlation_id in responses
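The reason a receive timeout gives responsive shutdown can be seen in a plain-asyncio sketch, independent of pydotcompute. The worker function, the stop event, and the queue below are hypothetical stand-ins for the actor, ctx.should_terminate, and ctx.receive().

```python
import asyncio


async def worker(inbox: asyncio.Queue, stop: asyncio.Event, seen: list) -> None:
    while not stop.is_set():
        try:
            msg = await asyncio.wait_for(inbox.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue  # nothing arrived; loop back and re-check the stop flag
        seen.append(msg)


async def demo() -> list:
    inbox, stop, seen = asyncio.Queue(), asyncio.Event(), []
    task = asyncio.create_task(worker(inbox, stop, seen))
    await inbox.put("hello")
    await asyncio.sleep(0.1)  # give the worker time to process the message
    stop.set()                # request shutdown; no further messages will arrive
    # The worker notices the flag once its current receive times out.
    await asyncio.wait_for(task, timeout=1.0)
    return seen
```

Without the timeout, the worker would stay parked in inbox.get() after stop.set() and never exit unless another message arrived or the task was cancelled.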
Notes¶
- Actors run in asyncio tasks
- One actor instance per kernel_id
- State is private to the actor (no shared state)
- Use messages for all communication
- Actors are single-threaded (no internal concurrency)
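A small plain-asyncio sketch illustrates what private state and message-only communication mean in practice. It does not use pydotcompute; the counter function is hypothetical, and it uses a None sentinel instead of a receive timeout purely to keep the example short.

```python
import asyncio


async def counter(inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
    count = 0  # local variable: private to this task
    while True:
        msg = await inbox.get()
        if msg is None:  # sentinel that stops this toy actor
            break
        count += msg
        await outbox.put(count)  # results leave only as messages


async def demo() -> tuple:
    in_a, out_a, in_b, out_b = (asyncio.Queue() for _ in range(4))
    tasks = [
        asyncio.create_task(counter(in_a, out_a)),
        asyncio.create_task(counter(in_b, out_b)),
    ]
    for value in (1, 2, 3):
        await in_a.put(value)
    await in_b.put(10)
    for q in (in_a, in_b):
        await q.put(None)
    await asyncio.gather(*tasks)
    return (out_a.get_nowait(), out_a.get_nowait(),
            out_a.get_nowait(), out_b.get_nowait())
```

Each task keeps its own running total: the first reports 1, 3, 6 and the second reports 10, and neither can see the other's count.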