// ringkernel_core/telemetry.rs
1//! Telemetry and metrics collection for kernel monitoring.
2//!
3//! This module provides structures for collecting performance metrics
4//! from GPU kernels, including throughput, latency, and error tracking.
5
/// Telemetry buffer (64 bytes, cache-line aligned).
///
/// This structure is updated by the GPU kernel and read by the host
/// for monitoring and debugging purposes. `#[repr(C, align(64))]`
/// keeps the layout binary-compatible with the device-side view and
/// confines the buffer to a single cache line.
#[derive(Debug, Clone, Copy)]
#[repr(C, align(64))]
pub struct TelemetryBuffer {
    /// Total messages processed successfully.
    pub messages_processed: u64,
    /// Total messages dropped (queue full, timeout, etc.).
    pub messages_dropped: u64,
    /// Sum of processing latencies in microseconds.
    pub total_latency_us: u64,
    /// Minimum processing latency in microseconds (`u64::MAX` until first sample).
    pub min_latency_us: u64,
    /// Maximum processing latency in microseconds.
    pub max_latency_us: u64,
    /// Current input queue depth.
    pub input_queue_depth: u32,
    /// Current output queue depth.
    pub output_queue_depth: u32,
    /// Last error code (0 = no error).
    pub last_error: u32,
    /// Reserved for alignment (pad to 64 bytes).
    pub _reserved: [u32; 3],
}

// Compile-time guarantee: the layout must stay exactly one 64-byte cache line.
const _: () = assert!(std::mem::size_of::<TelemetryBuffer>() == 64);

impl TelemetryBuffer {
    /// Create a new telemetry buffer with all counters zeroed.
    ///
    /// `min_latency_us` starts at `u64::MAX` so the first recorded
    /// sample always becomes the minimum.
    pub const fn new() -> Self {
        Self {
            messages_processed: 0,
            messages_dropped: 0,
            total_latency_us: 0,
            min_latency_us: u64::MAX,
            max_latency_us: 0,
            input_queue_depth: 0,
            output_queue_depth: 0,
            last_error: 0,
            _reserved: [0; 3],
        }
    }

    /// Calculate average latency in microseconds.
    ///
    /// Returns 0.0 when no messages have been processed, avoiding a
    /// division by zero.
    pub fn avg_latency_us(&self) -> f64 {
        if self.messages_processed == 0 {
            0.0
        } else {
            self.total_latency_us as f64 / self.messages_processed as f64
        }
    }

    /// Get throughput (messages per second) given elapsed time.
    ///
    /// Returns 0.0 for a non-positive `elapsed_secs`.
    pub fn throughput(&self, elapsed_secs: f64) -> f64 {
        if elapsed_secs <= 0.0 {
            0.0
        } else {
            self.messages_processed as f64 / elapsed_secs
        }
    }

    /// Get drop rate (0.0 to 1.0).
    ///
    /// Uses saturating addition: the counters come from the device and
    /// are not bounded by the host, so an unchecked `+` could panic in
    /// debug builds (or silently wrap in release).
    pub fn drop_rate(&self) -> f64 {
        let total = self.messages_processed.saturating_add(self.messages_dropped);
        if total == 0 {
            0.0
        } else {
            self.messages_dropped as f64 / total as f64
        }
    }

    /// Reset all counters to initial state.
    pub fn reset(&mut self) {
        *self = Self::new();
    }

    /// Merge another telemetry buffer into this one.
    ///
    /// Counter sums saturate at `u64::MAX` instead of overflowing, so
    /// merging extreme values never panics or wraps.
    pub fn merge(&mut self, other: &TelemetryBuffer) {
        self.messages_processed = self.messages_processed.saturating_add(other.messages_processed);
        self.messages_dropped = self.messages_dropped.saturating_add(other.messages_dropped);
        self.total_latency_us = self.total_latency_us.saturating_add(other.total_latency_us);
        self.min_latency_us = self.min_latency_us.min(other.min_latency_us);
        self.max_latency_us = self.max_latency_us.max(other.max_latency_us);
        // Queue depths are point-in-time, take latest
        self.input_queue_depth = other.input_queue_depth;
        self.output_queue_depth = other.output_queue_depth;
        // Last error takes the most recent non-zero
        if other.last_error != 0 {
            self.last_error = other.last_error;
        }
    }
}

impl Default for TelemetryBuffer {
    fn default() -> Self {
        Self::new()
    }
}
107
/// Extended metrics for detailed monitoring.
///
/// Combines the raw GPU-side [`TelemetryBuffer`] with host-side
/// bookkeeping (identity, timing, transfer volumes, memory usage).
#[derive(Debug, Clone)]
pub struct KernelMetrics {
    /// Basic telemetry from GPU.
    pub telemetry: TelemetryBuffer,

    /// Kernel identifier.
    pub kernel_id: String,

    /// Timestamp when metrics were collected.
    pub collected_at: std::time::Instant,

    /// Time since kernel was launched.
    pub uptime: std::time::Duration,

    /// Number of kernel invocations (for event-driven mode).
    pub invocations: u64,

    /// Total bytes transferred to device.
    pub bytes_to_device: u64,

    /// Total bytes transferred from device.
    pub bytes_from_device: u64,

    /// GPU memory usage in bytes.
    pub gpu_memory_used: u64,

    /// Host memory usage in bytes.
    pub host_memory_used: u64,
}
138
139impl Default for KernelMetrics {
140    fn default() -> Self {
141        Self {
142            telemetry: TelemetryBuffer::default(),
143            kernel_id: String::new(),
144            collected_at: std::time::Instant::now(),
145            uptime: std::time::Duration::ZERO,
146            invocations: 0,
147            bytes_to_device: 0,
148            bytes_from_device: 0,
149            gpu_memory_used: 0,
150            host_memory_used: 0,
151        }
152    }
153}
154
155impl KernelMetrics {
156    /// Create new metrics for a kernel.
157    pub fn new(kernel_id: impl Into<String>) -> Self {
158        Self {
159            kernel_id: kernel_id.into(),
160            ..Default::default()
161        }
162    }
163
164    /// Calculate transfer bandwidth (bytes/sec).
165    pub fn transfer_bandwidth(&self) -> f64 {
166        let total_bytes = self.bytes_to_device + self.bytes_from_device;
167        let secs = self.uptime.as_secs_f64();
168        if secs > 0.0 {
169            total_bytes as f64 / secs
170        } else {
171            0.0
172        }
173    }
174
175    /// Get summary as a formatted string.
176    pub fn summary(&self) -> String {
177        format!(
178            "Kernel {} - Processed: {}, Dropped: {}, Avg Latency: {:.2}µs, Throughput: {:.2}/s",
179            self.kernel_id,
180            self.telemetry.messages_processed,
181            self.telemetry.messages_dropped,
182            self.telemetry.avg_latency_us(),
183            self.telemetry.throughput(self.uptime.as_secs_f64())
184        )
185    }
186}
187
/// Histogram for latency distribution.
#[derive(Debug, Clone)]
pub struct LatencyHistogram {
    /// Bucket boundaries in microseconds.
    pub buckets: Vec<u64>,
    /// Counts for each bucket.
    pub counts: Vec<u64>,
    /// Count of values above last bucket.
    pub overflow: u64,
}

impl LatencyHistogram {
    /// Create a new histogram with default buckets.
    ///
    /// Default boundaries: 1µs, 10µs, 100µs, 1ms, 10ms, 100ms, 1s.
    pub fn new() -> Self {
        Self::with_buckets(vec![1, 10, 100, 1_000, 10_000, 100_000, 1_000_000])
    }

    /// Create with custom bucket boundaries.
    pub fn with_buckets(buckets: Vec<u64>) -> Self {
        Self {
            counts: vec![0; buckets.len()],
            buckets,
            overflow: 0,
        }
    }

    /// Record a latency value.
    ///
    /// The value lands in the first bucket whose boundary is >= it;
    /// values above the last boundary count as overflow.
    pub fn record(&mut self, value_us: u64) {
        match self.buckets.iter().position(|&boundary| value_us <= boundary) {
            Some(idx) => self.counts[idx] += 1,
            None => self.overflow += 1,
        }
    }

    /// Get total count.
    pub fn total(&self) -> u64 {
        self.overflow + self.counts.iter().sum::<u64>()
    }

    /// Get percentile value.
    ///
    /// `p` is expressed in percent (e.g. 99.0 for p99). Returns the
    /// boundary of the bucket the percentile falls in, 0 for an empty
    /// histogram, or last boundary + 1 when it lands in overflow.
    pub fn percentile(&self, p: f64) -> u64 {
        let total = self.total();
        if total == 0 {
            return 0;
        }

        let target = (total as f64 * p / 100.0).ceil() as u64;
        let mut seen = 0u64;

        for (&boundary, &count) in self.buckets.iter().zip(self.counts.iter()) {
            seen += count;
            if seen >= target {
                return boundary;
            }
        }

        // Percentile landed in the overflow region.
        match self.buckets.last() {
            Some(&last) => last + 1,
            None => 0,
        }
    }
}

impl Default for LatencyHistogram {
    fn default() -> Self {
        Self::new()
    }
}
259
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_telemetry_buffer_size() {
        assert_eq!(std::mem::size_of::<TelemetryBuffer>(), 64);
    }

    #[test]
    fn test_avg_latency() {
        let fresh = TelemetryBuffer::new();
        assert_eq!(fresh.avg_latency_us(), 0.0);

        let tb = TelemetryBuffer {
            messages_processed: 10,
            total_latency_us: 1000,
            ..TelemetryBuffer::new()
        };
        assert_eq!(tb.avg_latency_us(), 100.0);
    }

    #[test]
    fn test_throughput() {
        let tb = TelemetryBuffer {
            messages_processed: 1000,
            ..TelemetryBuffer::new()
        };

        assert_eq!(tb.throughput(1.0), 1000.0);
        assert_eq!(tb.throughput(2.0), 500.0);
        assert_eq!(tb.throughput(0.0), 0.0);
    }

    #[test]
    fn test_drop_rate() {
        let tb = TelemetryBuffer {
            messages_processed: 90,
            messages_dropped: 10,
            ..TelemetryBuffer::new()
        };

        assert!((tb.drop_rate() - 0.1).abs() < 0.001);
    }

    #[test]
    fn test_merge() {
        let mut left = TelemetryBuffer {
            messages_processed: 100,
            min_latency_us: 10,
            max_latency_us: 100,
            ..TelemetryBuffer::new()
        };
        let right = TelemetryBuffer {
            messages_processed: 50,
            min_latency_us: 5,
            max_latency_us: 200,
            ..TelemetryBuffer::new()
        };

        left.merge(&right);

        assert_eq!(left.messages_processed, 150);
        assert_eq!(left.min_latency_us, 5);
        assert_eq!(left.max_latency_us, 200);
    }

    #[test]
    fn test_histogram_percentile() {
        let mut hist = LatencyHistogram::with_buckets(vec![10, 50, 100, 500]);

        // (value, repetitions): 80 samples <= 10, 15 <= 50, 5 <= 500.
        for &(value, repeats) in &[(5u64, 80u32), (30, 15), (200, 5)] {
            for _ in 0..repeats {
                hist.record(value);
            }
        }

        assert_eq!(hist.percentile(50.0), 10); // p50 = 10µs bucket
        assert_eq!(hist.percentile(90.0), 50); // p90 = 50µs bucket
        assert_eq!(hist.percentile(99.0), 500); // p99 = 500µs bucket
    }
}