1use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11use async_trait::async_trait;
12
13use crate::error::Result;
14use crate::message::{MessageEnvelope, RingMessage};
15use crate::telemetry::KernelMetrics;
16use crate::types::KernelMode;
17
/// String-backed identifier for a kernel.
///
/// A newtype over [`String`]: equality, hashing and cloning are those of the
/// wrapped string.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KernelId(pub String);

impl KernelId {
    /// Builds an identifier from anything convertible into a [`String`].
    pub fn new(id: impl Into<String>) -> Self {
        Self(id.into())
    }

    /// Borrows the identifier as a string slice.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl std::fmt::Display for KernelId {
    /// Writes the raw identifier text, honoring the caller's format spec.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` applies width/fill/alignment/precision from `f`; going through
        // `write!(f, "{}", ..)` would start a fresh format spec and silently
        // drop them (e.g. `{:>10}` would not pad).
        f.pad(&self.0)
    }
}

impl From<&str> for KernelId {
    /// Copies the borrowed text into a new identifier.
    fn from(s: &str) -> Self {
        Self(s.to_string())
    }
}

impl From<String> for KernelId {
    /// Wraps an owned string without reallocating.
    fn from(s: String) -> Self {
        Self(s)
    }
}
51
/// Lifecycle state of a kernel.
///
/// The per-variant notes below describe only what the predicates on this type
/// report for each state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum KernelState {
    /// Rejected by every transition predicate on this type.
    Created,
    /// May be activated or terminated.
    Launched,
    /// The only state reported as running; may be deactivated or terminated.
    Active,
    /// May be activated again or terminated.
    Deactivated,
    /// Rejected by every transition predicate on this type.
    Terminating,
    /// The only state reported as finished.
    Terminated,
}

impl KernelState {
    /// `true` for [`KernelState::Launched`] and [`KernelState::Deactivated`].
    pub fn can_activate(&self) -> bool {
        match *self {
            Self::Launched | Self::Deactivated => true,
            Self::Created | Self::Active | Self::Terminating | Self::Terminated => false,
        }
    }

    /// `true` only for [`KernelState::Active`].
    pub fn can_deactivate(&self) -> bool {
        *self == Self::Active
    }

    /// `true` for [`KernelState::Active`], [`KernelState::Deactivated`] and
    /// [`KernelState::Launched`].
    pub fn can_terminate(&self) -> bool {
        match *self {
            Self::Active | Self::Deactivated | Self::Launched => true,
            Self::Created | Self::Terminating | Self::Terminated => false,
        }
    }

    /// `true` only for [`KernelState::Active`].
    pub fn is_running(&self) -> bool {
        *self == Self::Active
    }

    /// `true` only for [`KernelState::Terminated`].
    pub fn is_finished(&self) -> bool {
        *self == Self::Terminated
    }
}
95
96#[derive(Debug, Clone)]
98pub struct KernelStatus {
99 pub id: KernelId,
101 pub state: KernelState,
103 pub mode: KernelMode,
105 pub input_queue_depth: usize,
107 pub output_queue_depth: usize,
109 pub messages_processed: u64,
111 pub uptime: Duration,
113}
114
/// Backend selector for a runtime.
///
/// [`Backend::Auto`] is the [`Default`] value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum Backend {
    /// Displayed as `"CPU"`.
    Cpu,
    /// Displayed as `"CUDA"`.
    Cuda,
    /// Displayed as `"Metal"`.
    Metal,
    /// Displayed as `"WebGPU"`.
    Wgpu,
    /// Displayed as `"Auto"`; presumably lets the runtime pick a concrete
    /// backend — confirm against the runtime implementations.
    #[default]
    Auto,
}

impl Backend {
    /// Returns the human-readable name of the backend.
    pub fn name(&self) -> &'static str {
        match self {
            Backend::Cpu => "CPU",
            Backend::Cuda => "CUDA",
            Backend::Metal => "Metal",
            Backend::Wgpu => "WebGPU",
            Backend::Auto => "Auto",
        }
    }
}

impl std::fmt::Display for Backend {
    /// Writes [`Backend::name`], honoring the caller's format spec.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` keeps width/fill/alignment/precision working (e.g. `{:<8}` in
        // tabular output); `write!(f, "{}", ..)` would discard them.
        f.pad(self.name())
    }
}
149
150#[derive(Debug, Clone)]
152pub struct LaunchOptions {
153 pub mode: KernelMode,
155 pub grid_size: u32,
157 pub block_size: u32,
159 pub input_queue_capacity: usize,
161 pub output_queue_capacity: usize,
163 pub shared_memory_size: usize,
165 pub auto_activate: bool,
167 pub cooperative: bool,
170 pub enable_k2k: bool,
173}
174
175impl Default for LaunchOptions {
176 fn default() -> Self {
177 Self {
178 mode: KernelMode::Persistent,
179 grid_size: 1,
180 block_size: 256,
181 input_queue_capacity: 1024,
182 output_queue_capacity: 1024,
183 shared_memory_size: 0,
184 auto_activate: true,
185 cooperative: false,
186 enable_k2k: false,
187 }
188 }
189}
190
191impl LaunchOptions {
192 pub fn single_block(block_size: u32) -> Self {
194 Self {
195 block_size,
196 ..Default::default()
197 }
198 }
199
200 pub fn multi_block(grid_size: u32, block_size: u32) -> Self {
202 Self {
203 grid_size,
204 block_size,
205 ..Default::default()
206 }
207 }
208
209 pub fn with_mode(mut self, mode: KernelMode) -> Self {
211 self.mode = mode;
212 self
213 }
214
215 pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
217 self.input_queue_capacity = capacity;
218 self.output_queue_capacity = capacity;
219 self
220 }
221
222 pub fn with_shared_memory(mut self, size: usize) -> Self {
224 self.shared_memory_size = size;
225 self
226 }
227
228 pub fn without_auto_activate(mut self) -> Self {
230 self.auto_activate = false;
231 self
232 }
233
234 pub fn with_grid_size(mut self, grid_size: u32) -> Self {
236 self.grid_size = grid_size;
237 self
238 }
239
240 pub fn with_block_size(mut self, block_size: u32) -> Self {
242 self.block_size = block_size;
243 self
244 }
245
246 pub fn with_cooperative(mut self, cooperative: bool) -> Self {
252 self.cooperative = cooperative;
253 self
254 }
255
256 pub fn with_k2k(mut self, enable: bool) -> Self {
261 self.enable_k2k = enable;
262 self
263 }
264
265 pub fn with_priority(self, _priority: u8) -> Self {
269 self
271 }
272
273 pub fn with_input_queue_capacity(mut self, capacity: usize) -> Self {
275 self.input_queue_capacity = capacity;
276 self
277 }
278
279 pub fn with_output_queue_capacity(mut self, capacity: usize) -> Self {
281 self.output_queue_capacity = capacity;
282 self
283 }
284}
285
/// A pinned, boxed, `Send` future that resolves to `T` and may borrow data
/// for the lifetime `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
288
289#[async_trait]
294pub trait RingKernelRuntime: Send + Sync {
295 fn backend(&self) -> Backend;
297
298 fn is_backend_available(&self, backend: Backend) -> bool;
300
301 async fn launch(&self, kernel_id: &str, options: LaunchOptions) -> Result<KernelHandle>;
303
304 fn get_kernel(&self, kernel_id: &KernelId) -> Option<KernelHandle>;
306
307 fn list_kernels(&self) -> Vec<KernelId>;
309
310 fn metrics(&self) -> RuntimeMetrics;
312
313 async fn shutdown(&self) -> Result<()>;
315}
316
317#[derive(Clone)]
321pub struct KernelHandle {
322 id: KernelId,
324 inner: Arc<dyn KernelHandleInner>,
326}
327
328impl KernelHandle {
329 pub fn new(id: KernelId, inner: Arc<dyn KernelHandleInner>) -> Self {
331 Self { id, inner }
332 }
333
334 pub fn id(&self) -> &KernelId {
336 &self.id
337 }
338
339 pub async fn activate(&self) -> Result<()> {
341 self.inner.activate().await
342 }
343
344 pub async fn deactivate(&self) -> Result<()> {
346 self.inner.deactivate().await
347 }
348
349 pub async fn terminate(&self) -> Result<()> {
351 self.inner.terminate().await
352 }
353
354 pub async fn send<M: RingMessage>(&self, message: M) -> Result<()> {
356 let envelope = MessageEnvelope::new(
357 &message,
358 0, self.inner.kernel_id_num(),
360 self.inner.current_timestamp(),
361 );
362 self.inner.send_envelope(envelope).await
363 }
364
365 pub async fn send_envelope(&self, envelope: MessageEnvelope) -> Result<()> {
367 self.inner.send_envelope(envelope).await
368 }
369
370 pub async fn receive(&self) -> Result<MessageEnvelope> {
372 self.inner.receive().await
373 }
374
375 pub async fn receive_timeout(&self, timeout: Duration) -> Result<MessageEnvelope> {
377 self.inner.receive_timeout(timeout).await
378 }
379
380 pub fn try_receive(&self) -> Result<MessageEnvelope> {
382 self.inner.try_receive()
383 }
384
385 pub async fn call<M: RingMessage>(
387 &self,
388 message: M,
389 timeout: Duration,
390 ) -> Result<MessageEnvelope> {
391 let correlation = crate::message::CorrelationId::generate();
393
394 let mut envelope = MessageEnvelope::new(
396 &message,
397 0,
398 self.inner.kernel_id_num(),
399 self.inner.current_timestamp(),
400 );
401 envelope.header.correlation_id = correlation;
402
403 self.inner.send_envelope(envelope).await?;
405 self.inner.receive_correlated(correlation, timeout).await
406 }
407
408 pub fn status(&self) -> KernelStatus {
410 self.inner.status()
411 }
412
413 pub fn metrics(&self) -> KernelMetrics {
415 self.inner.metrics()
416 }
417
418 pub async fn wait(&self) -> Result<()> {
420 self.inner.wait().await
421 }
422
423 pub fn state(&self) -> KernelState {
427 self.status().state
428 }
429
430 pub async fn suspend(&self) -> Result<()> {
434 self.deactivate().await
435 }
436
437 pub async fn resume(&self) -> Result<()> {
441 self.activate().await
442 }
443
444 pub fn is_active(&self) -> bool {
446 self.state() == KernelState::Active
447 }
448
449 pub fn is_terminated(&self) -> bool {
451 self.state() == KernelState::Terminated
452 }
453}
454
455impl std::fmt::Debug for KernelHandle {
456 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457 f.debug_struct("KernelHandle")
458 .field("id", &self.id)
459 .finish()
460 }
461}
462
463#[async_trait]
467pub trait KernelHandleInner: Send + Sync {
468 fn kernel_id_num(&self) -> u64;
470
471 fn current_timestamp(&self) -> crate::hlc::HlcTimestamp;
473
474 async fn activate(&self) -> Result<()>;
476
477 async fn deactivate(&self) -> Result<()>;
479
480 async fn terminate(&self) -> Result<()>;
482
483 async fn send_envelope(&self, envelope: MessageEnvelope) -> Result<()>;
485
486 async fn receive(&self) -> Result<MessageEnvelope>;
488
489 async fn receive_timeout(&self, timeout: Duration) -> Result<MessageEnvelope>;
491
492 fn try_receive(&self) -> Result<MessageEnvelope>;
494
495 async fn receive_correlated(
497 &self,
498 correlation: crate::message::CorrelationId,
499 timeout: Duration,
500 ) -> Result<MessageEnvelope>;
501
502 fn status(&self) -> KernelStatus;
504
505 fn metrics(&self) -> KernelMetrics;
507
508 async fn wait(&self) -> Result<()>;
510}
511
/// Runtime-wide counters, as returned by `RingKernelRuntime::metrics`.
///
/// Every field is zero in the [`Default`] value.
#[derive(Clone, Debug, Default)]
pub struct RuntimeMetrics {
    /// Count of active kernels.
    pub active_kernels: usize,
    /// Count of kernels launched in total.
    pub total_launched: u64,
    /// Count of messages sent.
    pub messages_sent: u64,
    /// Count of messages received.
    pub messages_received: u64,
    /// GPU memory in use (presumably bytes — confirm with the producer).
    pub gpu_memory_used: u64,
    /// Host memory in use (presumably bytes — confirm with the producer).
    pub host_memory_used: u64,
}
528
529#[derive(Debug, Clone)]
531pub struct RuntimeBuilder {
532 pub backend: Backend,
534 pub device_index: usize,
536 pub debug: bool,
538 pub profiling: bool,
540}
541
542impl Default for RuntimeBuilder {
543 fn default() -> Self {
544 Self {
545 backend: Backend::Auto,
546 device_index: 0,
547 debug: false,
548 profiling: false,
549 }
550 }
551}
552
553impl RuntimeBuilder {
554 pub fn new() -> Self {
556 Self::default()
557 }
558
559 pub fn backend(mut self, backend: Backend) -> Self {
561 self.backend = backend;
562 self
563 }
564
565 pub fn device(mut self, index: usize) -> Self {
567 self.device_index = index;
568 self
569 }
570
571 pub fn debug(mut self, enable: bool) -> Self {
573 self.debug = enable;
574 self
575 }
576
577 pub fn profiling(mut self, enable: bool) -> Self {
579 self.profiling = enable;
580 self
581 }
582}
583
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks every transition predicate, including the states each one rejects.
    #[test]
    fn test_kernel_state_transitions() {
        assert!(KernelState::Launched.can_activate());
        assert!(KernelState::Deactivated.can_activate());
        assert!(!KernelState::Created.can_activate());
        assert!(!KernelState::Active.can_activate());
        assert!(!KernelState::Terminating.can_activate());
        assert!(!KernelState::Terminated.can_activate());

        assert!(KernelState::Active.can_deactivate());
        assert!(!KernelState::Launched.can_deactivate());
        assert!(!KernelState::Deactivated.can_deactivate());
        assert!(!KernelState::Terminated.can_deactivate());

        assert!(KernelState::Active.can_terminate());
        assert!(KernelState::Deactivated.can_terminate());
        assert!(KernelState::Launched.can_terminate());
        assert!(!KernelState::Created.can_terminate());
        assert!(!KernelState::Terminating.can_terminate());
        assert!(!KernelState::Terminated.can_terminate());
    }

    /// `is_running` / `is_finished` each accept exactly one state.
    #[test]
    fn test_kernel_state_queries() {
        assert!(KernelState::Active.is_running());
        assert!(!KernelState::Launched.is_running());
        assert!(!KernelState::Deactivated.is_running());

        assert!(KernelState::Terminated.is_finished());
        assert!(!KernelState::Terminating.is_finished());
        assert!(!KernelState::Active.is_finished());
    }

    /// Chained builders set the fields they name; `with_queue_capacity` sets
    /// both queues.
    #[test]
    fn test_launch_options_builder() {
        let opts = LaunchOptions::multi_block(4, 128)
            .with_mode(KernelMode::EventDriven)
            .with_queue_capacity(2048)
            .with_shared_memory(4096)
            .without_auto_activate();

        assert_eq!(opts.grid_size, 4);
        assert_eq!(opts.block_size, 128);
        assert_eq!(opts.mode, KernelMode::EventDriven);
        assert_eq!(opts.input_queue_capacity, 2048);
        assert_eq!(opts.output_queue_capacity, 2048);
        assert_eq!(opts.shared_memory_size, 4096);
        assert!(!opts.auto_activate);
    }

    /// The per-queue setters are independent, and `single_block` keeps the
    /// default grid size.
    #[test]
    fn test_launch_options_individual_setters() {
        let opts = LaunchOptions::single_block(64)
            .with_input_queue_capacity(8)
            .with_output_queue_capacity(16)
            .with_cooperative(true)
            .with_k2k(true);

        assert_eq!(opts.grid_size, 1);
        assert_eq!(opts.block_size, 64);
        assert_eq!(opts.input_queue_capacity, 8);
        assert_eq!(opts.output_queue_capacity, 16);
        assert!(opts.cooperative);
        assert!(opts.enable_k2k);
        assert!(opts.auto_activate);
    }

    /// All construction paths agree, and `Display` prints the raw id.
    #[test]
    fn test_kernel_id() {
        let id1 = KernelId::new("test_kernel");
        let id2: KernelId = "test_kernel".into();
        let id3 = KernelId::from(String::from("test_kernel"));
        assert_eq!(id1, id2);
        assert_eq!(id1, id3);
        assert_eq!(id1.as_str(), "test_kernel");
        assert_eq!(id1.to_string(), "test_kernel");
    }

    /// Every variant has a name, `Display` matches it, and `Auto` is the default.
    #[test]
    fn test_backend_name() {
        assert_eq!(Backend::Cpu.name(), "CPU");
        assert_eq!(Backend::Cuda.name(), "CUDA");
        assert_eq!(Backend::Metal.name(), "Metal");
        assert_eq!(Backend::Wgpu.name(), "WebGPU");
        assert_eq!(Backend::Auto.name(), "Auto");

        assert_eq!(Backend::Wgpu.to_string(), "WebGPU");
        assert_eq!(Backend::default(), Backend::Auto);
    }

    /// Defaults and setters of `RuntimeBuilder`.
    #[test]
    fn test_runtime_builder() {
        let defaults = RuntimeBuilder::new();
        assert_eq!(defaults.backend, Backend::Auto);
        assert_eq!(defaults.device_index, 0);
        assert!(!defaults.debug);
        assert!(!defaults.profiling);

        let built = RuntimeBuilder::new()
            .backend(Backend::Cuda)
            .device(1)
            .debug(true)
            .profiling(true);
        assert_eq!(built.backend, Backend::Cuda);
        assert_eq!(built.device_index, 1);
        assert!(built.debug);
        assert!(built.profiling);
    }
}