Message Passing
How ring kernels communicate through typed messages.
Overview
Ring kernels communicate exclusively through message passing. Messages are immutable data objects that flow through priority queues between the application and actors.
@message
@dataclass
class WorkRequest:
    data: list[float]
    operation: str = "sum"

# Send message to actor
await runtime.send("worker", WorkRequest(data=[1, 2, 3]))

# Receive response from actor
response = await runtime.receive("worker")
Why Message Passing?
Thread Safety
Messages are copied as they move from producer to queue to actor, so no two parties ever share mutable state:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Producer │ copy │ Queue │ copy │ Actor │
│ │ ──────► │ │ ──────► │ │
│ (owns msg) │ │ (owns copy) │ │ (owns copy) │
└─────────────┘ └─────────────┘ └─────────────┘
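The copy semantics in the diagram can be illustrated with a small standard-library sketch. This is not the runtime's actual queue: `CopyingQueue` is a hypothetical stand-in that uses `copy.deepcopy` to show why a producer that mutates its own message after sending cannot affect what the actor receives.

```python
import copy
from collections import deque
from dataclasses import dataclass, field


@dataclass
class WorkRequest:
    data: list = field(default_factory=list)
    operation: str = "sum"


class CopyingQueue:
    """Hypothetical queue that copies on both enqueue and dequeue."""

    def __init__(self) -> None:
        self._items = deque()

    def put(self, msg) -> None:
        # The queue stores its own copy, not the producer's object.
        self._items.append(copy.deepcopy(msg))

    def get(self):
        # The consumer receives a copy that it owns exclusively.
        return copy.deepcopy(self._items.popleft())


queue = CopyingQueue()
original = WorkRequest(data=[1.0, 2.0, 3.0])
queue.put(original)

# Mutating the producer's object after sending has no effect downstream.
original.data.append(99.0)
received = queue.get()
```

Here `received.data` still holds the three values that were sent, while `original.data` has four.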
Decoupling
Producers and consumers depend only on message types, not on each other:
- Actors don't know who sends messages
- Producers don't know actor internals
- Either side can be replaced independently
Asynchrony
Message queues enable asynchronous processing:
- Producers don't block waiting for actors
- Actors process at their own pace
- Backpressure is handled by queue strategies
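The text does not name the available queue strategies, so the following is only a sketch of two common ones, built on `asyncio.Queue` from the standard library rather than on the runtime's own queues. The function names `send_blocking` and `send_drop_oldest` are made up for illustration.

```python
import asyncio


async def send_blocking(queue: asyncio.Queue, msg) -> None:
    """Strategy 1: wait until the consumer frees a slot."""
    await queue.put(msg)


def send_drop_oldest(queue: asyncio.Queue, msg) -> None:
    """Strategy 2: never wait; evict the oldest message when full."""
    if queue.full():
        queue.get_nowait()  # discard the oldest queued message
    queue.put_nowait(msg)


async def demo() -> list:
    queue = asyncio.Queue(maxsize=3)
    # The queue has room, so these puts return immediately.
    for i in range(3):
        await send_blocking(queue, i)
    # The queue is now full; each further send evicts the oldest entry.
    for i in range(3, 5):
        send_drop_oldest(queue, i)
    return [queue.get_nowait() for _ in range(queue.qsize())]


survivors = asyncio.run(demo())
```

With a capacity of three and five messages sent, the two oldest messages are dropped and `survivors` holds the last three. A blocking strategy would instead slow the producer down to the actor's pace.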
Message Structure
Required Fields
Every message has these fields (added by @message):

@message
@dataclass
class MyMessage:
    # Your fields
    data: str
    # Added automatically:
    message_id: UUID  # Unique identifier
    priority: int = 128  # Processing priority (0-255)
    correlation_id: UUID | None = None  # Links responses to requests
Message ID
Every message carries a unique ID, which is used for:
- Tracking and logging
- Deduplication
- Correlation
Priority
Priority determines processing order; messages with higher values are processed first:
| Range | Level | Use Case |
|---|---|---|
| 0-63 | Low | Background tasks |
| 64-127 | Below Normal | Batch processing |
| 128 | Normal | Default |
| 129-191 | Above Normal | Important requests |
| 192-255 | High | Urgent/real-time |
# High priority request
urgent = WorkRequest(data=[1, 2, 3], priority=255)
# Low priority background task
background = WorkRequest(data=[1, 2, 3], priority=32)
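To make the ordering rule concrete, here is a minimal sketch of a priority mailbox built on `heapq`. `PriorityMailbox` is a hypothetical class, not the runtime's queue, and the first-in-first-out tie-breaking among equal priorities is an assumption that the text above does not confirm.

```python
import heapq
import itertools


class PriorityMailbox:
    """Hypothetical mailbox: highest priority first, FIFO within a priority."""

    def __init__(self) -> None:
        self._heap = []
        self._counter = itertools.count()

    def put(self, priority: int, payload) -> None:
        if not 0 <= priority <= 255:
            raise ValueError("priority must be in the range 0-255")
        # heapq is a min-heap, so the priority is negated. The counter breaks
        # ties in arrival order and keeps payloads from being compared.
        heapq.heappush(self._heap, (-priority, next(self._counter), payload))

    def get(self):
        _, _, payload = heapq.heappop(self._heap)
        return payload


mailbox = PriorityMailbox()
mailbox.put(32, "background")
mailbox.put(128, "normal-1")
mailbox.put(255, "urgent")
mailbox.put(128, "normal-2")

order = [mailbox.get() for _ in range(4)]
```

The urgent message is delivered first even though it was enqueued third, and the background task is delivered last.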
Correlation ID
Set correlation_id on a response to the message_id of the request it answers:

# In actor
request = await ctx.receive()
response = WorkResponse(
    result=compute(request),
    correlation_id=request.message_id,  # Link to request
)
await ctx.send(response)

# In client
request = WorkRequest(data=[1, 2, 3])
await runtime.send("worker", request)
response = await runtime.receive("worker")
assert response.correlation_id == request.message_id
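Correlation matters most when several requests are in flight and responses may arrive out of order. The sketch below simulates that case with plain dataclasses and `asyncio` futures. `Request`, `Reply`, and the futures table are illustrative names, not part of the runtime API.

```python
import asyncio
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass
class Request:
    payload: int
    message_id: UUID = field(default_factory=uuid4)


@dataclass
class Reply:
    result: int
    correlation_id: UUID
    message_id: UUID = field(default_factory=uuid4)


async def demo() -> dict:
    requests = [Request(payload=n) for n in (1, 2, 3)]
    loop = asyncio.get_running_loop()
    # One pending future per request, keyed by the request's message_id.
    futures = {r.message_id: loop.create_future() for r in requests}

    # Simulated actor output: replies arrive in reverse order.
    replies = [
        Reply(result=r.payload * 10, correlation_id=r.message_id)
        for r in reversed(requests)
    ]
    for reply in replies:
        # correlation_id routes each reply to the request that caused it.
        futures[reply.correlation_id].set_result(reply.result)

    return {r.payload: await futures[r.message_id] for r in requests}


matched = asyncio.run(demo())
```

Each payload ends up paired with its own result regardless of arrival order, which a plain "next message is my answer" assumption would not guarantee.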
Serialization
Messages are serialized using msgpack for efficient transfer:
msg = WorkRequest(data=[1.0, 2.0, 3.0])
# Serialize (internal)
data = msg.serialize() # bytes
# Deserialize (internal)
restored = WorkRequest.deserialize(data)
Supported Types
| Type | Serialized As |
|---|---|
| int | Integer |
| float | Float |
| str | String |
| bool | Boolean |
| bytes | Binary |
| None | Nil |
| list | Array |
| tuple | Array |
| dict | Map |
| UUID | Binary (16 bytes) |
| datetime | ISO string |
| Nested @message | Map |
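The mapping in the table can be sketched as a conversion to plain values before encoding. The `to_wire` helper below is hypothetical and uses only the standard library; it does not call msgpack and is not the library's serializer. It only shows which Python value each row of the table would produce.

```python
from dataclasses import dataclass, fields, is_dataclass
from datetime import datetime, timezone
from uuid import UUID


def to_wire(value):
    """Convert a value into plain types following the table above."""
    if is_dataclass(value) and not isinstance(value, type):
        # Nested message -> map of field name to converted value
        return {f.name: to_wire(getattr(value, f.name)) for f in fields(value)}
    if isinstance(value, UUID):
        return value.bytes  # 16-byte binary
    if isinstance(value, datetime):
        return value.isoformat()  # ISO 8601 string
    if isinstance(value, (list, tuple)):
        return [to_wire(v) for v in value]  # both become arrays
    if isinstance(value, dict):
        return {k: to_wire(v) for k, v in value.items()}
    return value  # int, float, str, bool, bytes, None pass through


@dataclass
class Inner:
    tags: tuple


@dataclass
class Outer:
    id: UUID
    created: datetime
    inner: Inner


wire = to_wire(Outer(
    id=UUID(int=1),
    created=datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
    inner=Inner(tags=("a", "b")),
))
```

Because list and tuple both map to Array, the distinction between them is presumably not preserved across a round trip, so fields that depend on being a tuple may need converting back after deserialization.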
Large Data
For large data, pass a reference to a UnifiedBuffer instead of embedding the data in message fields:

# Don't do this for large arrays
@message
@dataclass
class BadRequest:
    huge_array: list[float]  # Slow to serialize!

# Do this instead
@message
@dataclass
class GoodRequest:
    buffer_id: str  # Reference to UnifiedBuffer
    size: int
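The reference-passing idea can be sketched without the real UnifiedBuffer API, which is not shown here. In the example below, a module-level dictionary stands in for the buffer store, and `register_buffer` and `handle` are hypothetical helpers. The point is only that the message stays a few bytes long no matter how large the data is.

```python
from array import array
from dataclasses import dataclass
from uuid import uuid4

# Hypothetical stand-in for a UnifiedBuffer store: bulk data stays here.
_BUFFERS = {}


def register_buffer(values) -> str:
    """Store the data out of band and return a short reference to it."""
    buffer_id = str(uuid4())
    _BUFFERS[buffer_id] = array("d", values)
    return buffer_id


@dataclass
class GoodRequest:
    buffer_id: str  # reference only, no payload
    size: int


def handle(request: GoodRequest) -> float:
    # The actor resolves the reference instead of deserializing the payload.
    data = _BUFFERS[request.buffer_id]
    if len(data) != request.size:
        raise ValueError("buffer size does not match the request")
    return sum(data)


values = [float(i) for i in range(100_000)]
request = GoodRequest(buffer_id=register_buffer(values), size=len(values))
total = handle(request)
```

The `size` field lets the receiver validate the buffer before using it, which is one plausible reason for carrying it alongside the ID.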
Request-Response Pattern
The most common pattern pairs each request with a response:

from dataclasses import dataclass, field
from uuid import UUID, uuid4

@message
@dataclass
class CalculateRequest:
    expression: str
    message_id: UUID = field(default_factory=uuid4)
    priority: int = 128
    correlation_id: UUID | None = None

@message
@dataclass
class CalculateResponse:
    result: float
    success: bool = True
    error: str | None = None
    message_id: UUID = field(default_factory=uuid4)
    priority: int = 128
    correlation_id: UUID | None = None

# Usage
request = CalculateRequest(expression="2 + 2")
await runtime.send("calculator", request)
response = await runtime.receive("calculator", timeout=5.0)
if response.success:
    print(f"Result: {response.result}")
else:
    print(f"Error: {response.error}")
Fire-and-Forget Pattern
When you don't need a response:

import time
from dataclasses import dataclass, field

@message
@dataclass
class LogEvent:
    level: str
    message: str
    timestamp: float = field(default_factory=time.time)

# Send without waiting for response
await runtime.send("logger", LogEvent(level="INFO", message="Started"))
# Continue immediately
Pub-Sub Pattern
One producer, multiple consumers:

# Publisher sends to topic
await runtime.send("topic_processor", Event(data="update"))

# Topic processor broadcasts to subscribers
@ring_kernel(kernel_id="topic_processor")
async def topic_processor(ctx):
    subscribers = ["sub1", "sub2", "sub3"]
    while not ctx.should_terminate:
        event = await ctx.receive(timeout=0.1)
        for sub in subscribers:
            await forward_to(sub, event)
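The fan-out step can be tried in isolation with `asyncio.Queue` objects standing in for actor mailboxes. This is a sketch of the pattern only: the queues, the `None` shutdown sentinel, and the `subscriber` coroutine are assumptions for the demo and do not reflect how the runtime delivers or terminates.

```python
import asyncio


async def topic_processor(inbox: asyncio.Queue, subscribers: dict) -> None:
    """Forward every event from the inbox to each subscriber queue."""
    while True:
        event = await inbox.get()
        # None is the demo's shutdown sentinel; it is forwarded as well
        # so that every subscriber stops.
        for queue in subscribers.values():
            await queue.put(event)
        if event is None:
            return


async def subscriber(queue: asyncio.Queue) -> list:
    received = []
    while (event := await queue.get()) is not None:
        received.append(event)
    return received


async def demo() -> dict:
    inbox = asyncio.Queue()
    subs = {name: asyncio.Queue() for name in ("sub1", "sub2", "sub3")}
    tasks = {name: asyncio.create_task(subscriber(q)) for name, q in subs.items()}
    processor = asyncio.create_task(topic_processor(inbox, subs))

    for event in ("update-1", "update-2", None):
        await inbox.put(event)

    await processor
    return {name: await task for name, task in tasks.items()}


delivered = asyncio.run(demo())
```

Every subscriber receives both events in the order they were published.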
Pipeline Pattern
Chain actors so that each stage forwards its output to the next:
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ Input │ ──► │ Stage 1 │ ──► │ Stage 2 │ ──► │ Stage 3 │
└─────────┘ └─────────┘ └─────────┘ └─────────┘

# Send to first stage
await runtime.send("stage1", RawData(values=[1, 2, 3]))

# Connect stages in actor
@ring_kernel(kernel_id="stage1")
async def stage1(ctx):
    while not ctx.should_terminate:
        raw = await ctx.receive(timeout=0.1)
        processed = preprocess(raw)
        # Forward to next stage
        await runtime.send("stage2", processed)
        await ctx.send(Ack())  # Acknowledge to caller
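A three-stage chain like the one in the diagram can be simulated with `asyncio.Queue` objects in place of actor mailboxes. The generic `stage` coroutine, the transforms, and the `None` shutdown sentinel are illustrative assumptions, not the runtime's mechanism.

```python
import asyncio


async def stage(inbox: asyncio.Queue, outbox: asyncio.Queue, transform) -> None:
    """Receive, transform, and forward until the None sentinel arrives."""
    while (item := await inbox.get()) is not None:
        await outbox.put(transform(item))
    await outbox.put(None)  # propagate shutdown downstream


async def demo() -> list:
    q1, q2, q3, out = (asyncio.Queue() for _ in range(4))
    stages = [
        # Stage 1: scale each value
        asyncio.create_task(stage(q1, q2, lambda xs: [x * 2 for x in xs])),
        # Stage 2: reduce to a single number
        asyncio.create_task(stage(q2, q3, sum)),
        # Stage 3: format the result
        asyncio.create_task(stage(q3, out, lambda total: f"total={total}")),
    ]
    await q1.put([1, 2, 3])
    await q1.put([10, 20])
    await q1.put(None)
    await asyncio.gather(*stages)

    results = []
    while (item := out.get_nowait()) is not None:
        results.append(item)
    return results


results = asyncio.run(demo())
```

Each stage knows only its inbox and outbox, so a stage can be swapped or a new one inserted without touching the others.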
Error Handling
Include error information in responses:

from dataclasses import dataclass
from typing import Any

@message
@dataclass
class Response:
    success: bool = True
    error: str | None = None
    error_code: int | None = None
    result: Any = None

# In actor
try:
    result = risky_operation(request)
    await ctx.send(Response(success=True, result=result))
except ValidationError as e:
    await ctx.send(Response(success=False, error=str(e), error_code=400))
except Exception:
    # Don't leak internal details to the caller
    await ctx.send(Response(success=False, error="Internal error", error_code=500))
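The try/except structure above can be factored into a reusable wrapper so that every handler reports failures the same way. The sketch below is self-contained: `ValidationError`, `safe_handle`, and `average` are defined here for the demo and are not library names, and `Response` is a plain dataclass without the `@message` decorator.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


class ValidationError(Exception):
    """Hypothetical validation failure raised by a handler."""


@dataclass
class Response:
    success: bool = True
    error: Optional[str] = None
    error_code: Optional[int] = None
    result: Any = None


def safe_handle(handler: Callable[[Any], Any], request: Any) -> Response:
    """Run a handler and convert any exception into an error Response."""
    try:
        return Response(success=True, result=handler(request))
    except ValidationError as exc:
        # The request was invalid: the message is safe to show the caller.
        return Response(success=False, error=str(exc), error_code=400)
    except Exception:
        # Unexpected failure: hide internal details from the caller.
        return Response(success=False, error="Internal error", error_code=500)


def average(values):
    if not values:
        raise ValidationError("values must not be empty")
    return sum(values) / len(values)


ok = safe_handle(average, [1, 2, 3])
invalid = safe_handle(average, [])
crashed = safe_handle(average, ["a", "b"])
```

The third call fails with a `TypeError` inside `sum`, which the wrapper reports as a generic internal error instead of exposing the exception text.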
Best Practices
- Keep Messages Small: Large data should use buffers
- Include Error Fields: Always handle failure cases
- Use Correlation IDs: Essential for request-response
- Set Appropriate Priority: Don't abuse high priority
- Design for Immutability: Messages shouldn't be modified
- Version Messages: Consider backwards compatibility
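Two of these practices, immutability and versioning, can be sketched with standard dataclass features. Whether `@message` accepts frozen dataclasses is not confirmed here, so the example uses a plain dataclass, and `schema_version` is a hypothetical field name.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class WorkRequest:
    # A tuple rather than a list, so the contents cannot be changed in place.
    data: tuple
    operation: str = "sum"
    priority: int = 128
    # Hypothetical field that lets receivers detect older message layouts.
    schema_version: int = 1


request = WorkRequest(data=(1.0, 2.0, 3.0))

try:
    request.priority = 255  # rejected: the instance is frozen
    mutated = True
except FrozenInstanceError:
    mutated = False

# Derive a changed copy instead of modifying the original.
urgent = replace(request, priority=255)
```

The original request keeps its default priority, and `urgent` is a separate object that differs only in the field passed to `replace`.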
Next Steps
- Memory Management: Handling large data
- Lifecycle: State transitions