use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;

use crate::runtime::KernelId;
use crate::telemetry::{KernelMetrics, LatencyHistogram, TelemetryBuffer};

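/// Configuration for the telemetry pipeline.
///
/// A sketch of overriding a single field via struct-update syntax:
///
/// ```ignore
/// let config = TelemetryConfig {
///     collection_interval_ms: 50,
///     ..TelemetryConfig::default()
/// };
/// ```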
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// Interval between metric collections, in milliseconds.
    pub collection_interval_ms: u64,
    /// Maximum number of snapshots retained in the history buffer.
    pub max_history_samples: usize,
    /// Capacity of the broadcast channel used for telemetry events.
    pub channel_buffer_size: usize,
    /// Whether per-kernel latency histograms are recorded.
    pub enable_histograms: bool,
    /// Drop rate (fraction, 0.0..=1.0) above which an alert is raised.
    pub drop_rate_alert_threshold: f64,
    /// Average latency, in microseconds, above which an alert is raised.
    pub latency_alert_threshold_us: u64,
}

impl Default for TelemetryConfig {
    fn default() -> Self {
        Self {
            collection_interval_ms: 100,
            max_history_samples: 1000,
            channel_buffer_size: 256,
            enable_histograms: true,
            drop_rate_alert_threshold: 0.01,
            latency_alert_threshold_us: 10_000,
        }
    }
}

/// Events broadcast to subscribers of the telemetry pipeline.
#[derive(Debug, Clone)]
pub enum TelemetryEvent {
    /// A periodic snapshot of all registered kernels' metrics.
    MetricsSnapshot(MetricsSnapshot),
    /// An alert raised by a threshold check.
    Alert(TelemetryAlert),
    /// A kernel transitioned between lifecycle states.
    KernelStateChange {
        /// The kernel that changed state.
        kernel_id: KernelId,
        /// The state before the transition.
        previous: String,
        /// The state after the transition.
        new: String,
    },
}

/// A point-in-time view of per-kernel and aggregate metrics.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// When this snapshot was collected.
    pub timestamp: Instant,
    /// Metrics for each registered kernel.
    pub kernel_metrics: HashMap<KernelId, KernelMetrics>,
    /// Metrics aggregated across all kernels.
    pub aggregate: AggregateMetrics,
}

/// Metrics aggregated across all registered kernels.
#[derive(Debug, Clone, Default)]
pub struct AggregateMetrics {
    /// Total messages processed across all kernels.
    pub total_messages_processed: u64,
    /// Total messages dropped across all kernels.
    pub total_messages_dropped: u64,
    /// Mean of the per-kernel average latencies, in microseconds.
    pub avg_latency_us: f64,
    /// Minimum observed latency, in microseconds (0 when no data).
    pub min_latency_us: u64,
    /// Maximum observed latency, in microseconds.
    pub max_latency_us: u64,
    /// Messages processed per second since the pipeline started.
    pub throughput: f64,
    /// Number of kernels currently reporting as active.
    pub active_kernels: usize,
    /// Total GPU memory in use across all kernels.
    pub total_gpu_memory: u64,
}

/// An alert raised when a metric crosses a configured threshold.
#[derive(Debug, Clone)]
pub struct TelemetryAlert {
    /// How serious the alert is.
    pub severity: AlertSeverity,
    /// The condition that triggered the alert.
    pub alert_type: AlertType,
    /// Human-readable description of the alert.
    pub message: String,
    /// The kernel the alert concerns, if any.
    pub kernel_id: Option<KernelId>,
    /// When the alert was raised.
    pub timestamp: Instant,
}

/// Severity of a [`TelemetryAlert`], from least to most serious.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlertSeverity {
    Info,
    Warning,
    Error,
    Critical,
}

/// The condition that triggered a [`TelemetryAlert`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlertType {
    /// Message drop rate exceeded the configured threshold.
    HighDropRate,
    /// Average latency exceeded the configured threshold.
    HighLatency,
    /// A kernel's input queue is near capacity.
    QueueNearFull,
    /// A kernel reported a nonzero error code.
    KernelError,
    /// GPU memory usage is approaching its limit.
    MemoryPressure,
}

/// Periodically polls registered [`MetricsSource`]s, keeps a bounded
/// history of [`MetricsSnapshot`]s, and broadcasts [`TelemetryEvent`]s
/// to subscribers.
pub struct TelemetryPipeline {
    /// Pipeline configuration.
    config: TelemetryConfig,
    /// Set to `false` by [`stop`](Self::stop) to end the collection loop.
    running: Arc<AtomicBool>,
    /// Broadcast channel for snapshots and alerts.
    sender: broadcast::Sender<TelemetryEvent>,
    /// Metrics sources keyed by kernel.
    sources: Arc<RwLock<HashMap<KernelId, Arc<dyn MetricsSource>>>>,
    /// Bounded history of recent snapshots, oldest first.
    history: Arc<RwLock<Vec<MetricsSnapshot>>>,
    /// When the pipeline was created; used for throughput calculations.
    start_time: Instant,
    /// Reserved for ordering events; currently unused.
    #[allow(dead_code)]
    sequence: AtomicU64,
}
pub trait MetricsSource: Send + Sync {
    /// Returns the current metrics for the kernel.
    fn get_metrics(&self) -> KernelMetrics;

    /// Returns the identifier of the kernel this source reports on.
    fn kernel_id(&self) -> &KernelId;

    /// Reports whether the kernel is currently active.
    fn is_active(&self) -> bool;
}

impl TelemetryPipeline {
    /// Creates a pipeline with the given configuration. No metrics are
    /// collected until [`start`](Self::start) is called.
    pub fn new(config: TelemetryConfig) -> Self {
        let (sender, _) = broadcast::channel(config.channel_buffer_size);

        Self {
            config,
            running: Arc::new(AtomicBool::new(false)),
            sender,
            sources: Arc::new(RwLock::new(HashMap::new())),
            history: Arc::new(RwLock::new(Vec::new())),
            start_time: Instant::now(),
            sequence: AtomicU64::new(0),
        }
    }

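    /// Returns a receiver for telemetry events. Each subscriber sees every
    /// event sent after it subscribes; a subscriber that falls more than
    /// `channel_buffer_size` events behind observes a lagged error.
    ///
    /// A sketch of a consumer task (assumes a running Tokio runtime):
    ///
    /// ```ignore
    /// let mut rx = pipeline.subscribe();
    /// tokio::spawn(async move {
    ///     while let Ok(event) = rx.recv().await {
    ///         if let TelemetryEvent::Alert(alert) = event {
    ///             eprintln!("telemetry alert: {}", alert.message);
    ///         }
    ///     }
    /// });
    /// ```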
    pub fn subscribe(&self) -> broadcast::Receiver<TelemetryEvent> {
        self.sender.subscribe()
    }

    /// Registers a metrics source, replacing any existing source for the
    /// same kernel.
    pub fn register_source(&self, source: Arc<dyn MetricsSource>) {
        let kernel_id = source.kernel_id().clone();
        self.sources.write().insert(kernel_id, source);
    }

    /// Removes the metrics source for the given kernel, if registered.
    pub fn unregister_source(&self, kernel_id: &KernelId) {
        self.sources.write().remove(kernel_id);
    }

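    /// Spawns the periodic collection loop on the current Tokio runtime and
    /// returns its `JoinHandle`. After [`stop`](Self::stop), the task exits
    /// within one collection interval. Calling `start` while a loop is
    /// already running spawns a second, duplicate loop, so pair each
    /// `start` with a `stop`.
    ///
    /// A sketch of the lifecycle:
    ///
    /// ```ignore
    /// let pipeline = TelemetryPipeline::new(TelemetryConfig::default());
    /// let handle = pipeline.start();
    /// // ... run workload ...
    /// pipeline.stop();
    /// handle.await.unwrap();
    /// ```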
    pub fn start(&self) -> tokio::task::JoinHandle<()> {
        self.running.store(true, Ordering::Release);

        let running = Arc::clone(&self.running);
        let sources = Arc::clone(&self.sources);
        let history = Arc::clone(&self.history);
        let sender = self.sender.clone();
        let config = self.config.clone();
        let start_time = self.start_time;

        tokio::spawn(async move {
            let interval = Duration::from_millis(config.collection_interval_ms);

            while running.load(Ordering::Acquire) {
                let snapshot = Self::collect_snapshot(&sources, start_time, &config);

                let alerts = Self::check_alerts(&snapshot, &config);

                // Append to history, evicting the oldest sample once the
                // configured capacity is exceeded.
                {
                    let mut hist = history.write();
                    hist.push(snapshot.clone());
                    if hist.len() > config.max_history_samples {
                        hist.remove(0);
                    }
                }

                // A send error only means there are no subscribers; ignore it.
                let _ = sender.send(TelemetryEvent::MetricsSnapshot(snapshot));

                for alert in alerts {
                    let _ = sender.send(TelemetryEvent::Alert(alert));
                }

                tokio::time::sleep(interval).await;
            }
        })
    }

    /// Signals the collection loop to exit after its current iteration.
    pub fn stop(&self) {
        self.running.store(false, Ordering::Release);
    }

    /// Returns the most recent snapshot, if any have been collected.
    pub fn latest_snapshot(&self) -> Option<MetricsSnapshot> {
        self.history.read().last().cloned()
    }

    /// Returns a copy of the full snapshot history, oldest first.
    pub fn history(&self) -> Vec<MetricsSnapshot> {
        self.history.read().clone()
    }

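    /// Aggregates the snapshots collected within the trailing `duration`,
    /// returning `None` when no snapshots fall inside the window. Counter
    /// totals are the delta between the newest and oldest samples in the
    /// window; latency and throughput are averaged across the samples.
    ///
    /// ```ignore
    /// if let Some(agg) = pipeline.aggregate_over(Duration::from_secs(60)) {
    ///     println!("last minute: {:.1} msg/s", agg.throughput);
    /// }
    /// ```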
    pub fn aggregate_over(&self, duration: Duration) -> Option<AggregateMetrics> {
        let history = self.history.read();
        // `Instant` subtraction panics on underflow, so clamp the cutoff:
        // a window longer than the process has existed covers everything.
        let cutoff = Instant::now().checked_sub(duration);

        let relevant: Vec<_> = history
            .iter()
            .filter(|s| cutoff.map_or(true, |c| s.timestamp >= c))
            .collect();

        if relevant.is_empty() {
            return None;
        }

        let mut aggregate = AggregateMetrics::default();

        // Snapshot counters are cumulative, so the windowed totals are the
        // delta between the newest and oldest samples, not a sum over them.
        let first = relevant.first().expect("relevant is non-empty");
        let last = relevant.last().expect("relevant is non-empty");
        aggregate.total_messages_processed = last
            .aggregate
            .total_messages_processed
            .saturating_sub(first.aggregate.total_messages_processed);
        aggregate.total_messages_dropped = last
            .aggregate
            .total_messages_dropped
            .saturating_sub(first.aggregate.total_messages_dropped);

        // Seed the minimum with u64::MAX so the first comparison wins;
        // starting from the default of 0 would pin the minimum at 0.
        aggregate.min_latency_us = u64::MAX;
        for snapshot in &relevant {
            aggregate.min_latency_us = aggregate
                .min_latency_us
                .min(snapshot.aggregate.min_latency_us);
            aggregate.max_latency_us = aggregate
                .max_latency_us
                .max(snapshot.aggregate.max_latency_us);
        }

        let count = relevant.len() as f64;
        aggregate.avg_latency_us = relevant
            .iter()
            .map(|s| s.aggregate.avg_latency_us)
            .sum::<f64>()
            / count;
        aggregate.throughput = relevant.iter().map(|s| s.aggregate.throughput).sum::<f64>() / count;
        aggregate.active_kernels = last.aggregate.active_kernels;
        aggregate.total_gpu_memory = last.aggregate.total_gpu_memory;

        Some(aggregate)
    }

    /// Polls every registered source and builds a snapshot together with
    /// cross-kernel aggregates.
    fn collect_snapshot(
        sources: &RwLock<HashMap<KernelId, Arc<dyn MetricsSource>>>,
        start_time: Instant,
        _config: &TelemetryConfig,
    ) -> MetricsSnapshot {
        let sources = sources.read();
        let mut kernel_metrics = HashMap::new();
        let mut aggregate = AggregateMetrics::default();

        let elapsed = start_time.elapsed().as_secs_f64();

        for (kernel_id, source) in sources.iter() {
            if source.is_active() {
                aggregate.active_kernels += 1;
            }

            let metrics = source.get_metrics();

            aggregate.total_messages_processed += metrics.telemetry.messages_processed;
            aggregate.total_messages_dropped += metrics.telemetry.messages_dropped;
            aggregate.total_gpu_memory += metrics.gpu_memory_used;

            // A minimum of 0 means "no data yet", so the first real value
            // always replaces it.
            if metrics.telemetry.min_latency_us < aggregate.min_latency_us
                || aggregate.min_latency_us == 0
            {
                aggregate.min_latency_us = metrics.telemetry.min_latency_us;
            }
            if metrics.telemetry.max_latency_us > aggregate.max_latency_us {
                aggregate.max_latency_us = metrics.telemetry.max_latency_us;
            }

            kernel_metrics.insert(kernel_id.clone(), metrics);
        }

        if !kernel_metrics.is_empty() {
            // Unweighted mean of the per-kernel averages.
            aggregate.avg_latency_us = kernel_metrics
                .values()
                .map(|m| m.telemetry.avg_latency_us())
                .sum::<f64>()
                / kernel_metrics.len() as f64;

            if elapsed > 0.0 {
                aggregate.throughput = aggregate.total_messages_processed as f64 / elapsed;
            }
        }

        MetricsSnapshot {
            timestamp: Instant::now(),
            kernel_metrics,
            aggregate,
        }
    }

    /// Evaluates each kernel's metrics against the configured thresholds
    /// and returns any resulting alerts. Severity escalates with how far
    /// a metric exceeds its threshold.
    fn check_alerts(snapshot: &MetricsSnapshot, config: &TelemetryConfig) -> Vec<TelemetryAlert> {
        let mut alerts = Vec::new();

        for (kernel_id, metrics) in &snapshot.kernel_metrics {
            let drop_rate = metrics.telemetry.drop_rate();
            if drop_rate > config.drop_rate_alert_threshold {
                alerts.push(TelemetryAlert {
                    severity: if drop_rate > 0.1 {
                        AlertSeverity::Critical
                    } else if drop_rate > 0.05 {
                        AlertSeverity::Error
                    } else {
                        AlertSeverity::Warning
                    },
                    alert_type: AlertType::HighDropRate,
                    message: format!(
                        "Kernel {} drop rate is {:.2}%",
                        kernel_id,
                        drop_rate * 100.0
                    ),
                    kernel_id: Some(kernel_id.clone()),
                    timestamp: Instant::now(),
                });
            }

            let avg_latency = metrics.telemetry.avg_latency_us() as u64;
            if avg_latency > config.latency_alert_threshold_us {
                alerts.push(TelemetryAlert {
                    severity: if avg_latency > config.latency_alert_threshold_us * 10 {
                        AlertSeverity::Critical
                    } else if avg_latency > config.latency_alert_threshold_us * 5 {
                        AlertSeverity::Error
                    } else {
                        AlertSeverity::Warning
                    },
                    alert_type: AlertType::HighLatency,
                    message: format!("Kernel {} average latency is {}µs", kernel_id, avg_latency),
                    kernel_id: Some(kernel_id.clone()),
                    timestamp: Instant::now(),
                });
            }

            if metrics.telemetry.last_error != 0 {
                alerts.push(TelemetryAlert {
                    severity: AlertSeverity::Error,
                    alert_type: AlertType::KernelError,
                    message: format!(
                        "Kernel {} reported error code {}",
                        kernel_id, metrics.telemetry.last_error
                    ),
                    kernel_id: Some(kernel_id.clone()),
                    timestamp: Instant::now(),
                });
            }
        }

        alerts
    }
}

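/// Thread-safe accumulator for per-kernel telemetry counters and latency
/// histograms, typically shared behind an `Arc` by the message-processing
/// path.
///
/// A sketch of recording and reading back (all calls shown exist below):
///
/// ```ignore
/// let collector = Arc::new(MetricsCollector::new());
/// collector.record_message_processed(&kernel_id, 150);
/// collector.record_message_dropped(&kernel_id);
/// let totals = collector.get_aggregate();
/// ```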
#[derive(Default)]
pub struct MetricsCollector {
    /// Per-kernel counters and latency extrema.
    kernel_telemetry: RwLock<HashMap<KernelId, TelemetryBuffer>>,
    /// Per-kernel latency histograms.
    kernel_histograms: RwLock<HashMap<KernelId, LatencyHistogram>>,
    /// When collection began; `None` until [`new`](Self::new) or
    /// [`reset`](Self::reset) sets it.
    start_time: RwLock<Option<Instant>>,
}

impl MetricsCollector {
    /// Creates an empty collector with the start time set to now.
    pub fn new() -> Self {
        Self {
            kernel_telemetry: RwLock::new(HashMap::new()),
            kernel_histograms: RwLock::new(HashMap::new()),
            start_time: RwLock::new(Some(Instant::now())),
        }
    }

    /// Records a successfully processed message and its latency.
    pub fn record_message_processed(&self, kernel_id: &KernelId, latency_us: u64) {
        let mut telemetry = self.kernel_telemetry.write();
        let entry = telemetry.entry(kernel_id.clone()).or_default();

        entry.messages_processed += 1;
        entry.total_latency_us += latency_us;
        entry.min_latency_us = entry.min_latency_us.min(latency_us);
        entry.max_latency_us = entry.max_latency_us.max(latency_us);

        let mut histograms = self.kernel_histograms.write();
        let histogram = histograms.entry(kernel_id.clone()).or_default();
        histogram.record(latency_us);
    }

    /// Records a dropped message.
    pub fn record_message_dropped(&self, kernel_id: &KernelId) {
        let mut telemetry = self.kernel_telemetry.write();
        let entry = telemetry.entry(kernel_id.clone()).or_default();
        entry.messages_dropped += 1;
    }

    /// Records the most recent error code reported by a kernel.
    pub fn record_error(&self, kernel_id: &KernelId, error_code: u32) {
        let mut telemetry = self.kernel_telemetry.write();
        let entry = telemetry.entry(kernel_id.clone()).or_default();
        entry.last_error = error_code;
    }

    /// Returns a copy of the telemetry counters for a kernel, if any.
    pub fn get_telemetry(&self, kernel_id: &KernelId) -> Option<TelemetryBuffer> {
        self.kernel_telemetry.read().get(kernel_id).copied()
    }

    /// Returns a copy of the latency histogram for a kernel, if any.
    pub fn get_histogram(&self, kernel_id: &KernelId) -> Option<LatencyHistogram> {
        self.kernel_histograms.read().get(kernel_id).cloned()
    }

    /// Merges all per-kernel buffers into a single aggregate buffer.
    pub fn get_aggregate(&self) -> TelemetryBuffer {
        let telemetry = self.kernel_telemetry.read();
        let mut aggregate = TelemetryBuffer::new();

        for buffer in telemetry.values() {
            aggregate.merge(buffer);
        }

        aggregate
    }

    /// Clears all recorded telemetry and restarts the elapsed-time clock.
    pub fn reset(&self) {
        self.kernel_telemetry.write().clear();
        self.kernel_histograms.write().clear();
        *self.start_time.write() = Some(Instant::now());
    }

    /// Time since the collector was created or last reset.
    pub fn elapsed(&self) -> Duration {
        self.start_time
            .read()
            .map(|t| t.elapsed())
            .unwrap_or_default()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_telemetry_config_default() {
        let config = TelemetryConfig::default();
        assert_eq!(config.collection_interval_ms, 100);
        assert_eq!(config.max_history_samples, 1000);
    }

    #[test]
    fn test_metrics_collector() {
        let collector = MetricsCollector::new();
        let kernel_id = KernelId::new("test");

        collector.record_message_processed(&kernel_id, 100);
        collector.record_message_processed(&kernel_id, 200);
        collector.record_message_dropped(&kernel_id);

        let telemetry = collector.get_telemetry(&kernel_id).unwrap();
        assert_eq!(telemetry.messages_processed, 2);
        assert_eq!(telemetry.messages_dropped, 1);
        assert_eq!(telemetry.min_latency_us, 100);
        assert_eq!(telemetry.max_latency_us, 200);
    }
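
    #[test]
    fn test_record_error_and_reset() {
        // Error codes surface through get_telemetry; reset drops all
        // per-kernel state, so lookups return None afterwards.
        let collector = MetricsCollector::new();
        let kernel_id = KernelId::new("test");

        collector.record_error(&kernel_id, 42);
        let telemetry = collector.get_telemetry(&kernel_id).unwrap();
        assert_eq!(telemetry.last_error, 42);

        collector.reset();
        assert!(collector.get_telemetry(&kernel_id).is_none());
    }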

    #[test]
    fn test_aggregate_metrics() {
        let collector = MetricsCollector::new();

        let kernel1 = KernelId::new("kernel1");
        let kernel2 = KernelId::new("kernel2");

        collector.record_message_processed(&kernel1, 100);
        collector.record_message_processed(&kernel2, 200);

        let aggregate = collector.get_aggregate();
        assert_eq!(aggregate.messages_processed, 2);
    }
553 }
554}