// ringkernel_core/telemetry.rs
/// Fixed-size telemetry counter block for a single kernel.
///
/// `#[repr(C, align(64))]` pins the field layout and aligns the struct to
/// exactly one 64-byte cache line (the total size is checked by the
/// compile-time assertion that follows the struct), so the buffer has a
/// stable layout suitable for sharing across an FFI/device boundary.
/// Do NOT reorder or resize fields without updating the size assertion.
#[derive(Debug, Clone, Copy)]
#[repr(C, align(64))]
pub struct TelemetryBuffer {
    /// Total messages successfully processed (monotonic counter).
    pub messages_processed: u64,
    /// Total messages dropped (monotonic counter).
    pub messages_dropped: u64,
    /// Sum of per-message latencies in microseconds; divide by
    /// `messages_processed` for the mean (see `avg_latency_us`).
    pub total_latency_us: u64,
    /// Smallest observed latency in microseconds; initialized to
    /// `u64::MAX` by `new()` so the first sample always wins.
    pub min_latency_us: u64,
    /// Largest observed latency in microseconds.
    pub max_latency_us: u64,
    /// Point-in-time gauge: current input queue depth.
    pub input_queue_depth: u32,
    /// Point-in-time gauge: current output queue depth.
    pub output_queue_depth: u32,
    /// Most recent error code; 0 means "no error" (see `merge`).
    pub last_error: u32,
    /// Padding to fill the struct out to exactly 64 bytes.
    pub _reserved: [u32; 3],
}
32
// Compile-time guarantee that TelemetryBuffer occupies exactly one
// 64-byte cache line; fails the build if the field layout drifts.
const _: () = assert!(std::mem::size_of::<TelemetryBuffer>() == 64);
35
36impl TelemetryBuffer {
37 pub const fn new() -> Self {
39 Self {
40 messages_processed: 0,
41 messages_dropped: 0,
42 total_latency_us: 0,
43 min_latency_us: u64::MAX,
44 max_latency_us: 0,
45 input_queue_depth: 0,
46 output_queue_depth: 0,
47 last_error: 0,
48 _reserved: [0; 3],
49 }
50 }
51
52 pub fn avg_latency_us(&self) -> f64 {
54 if self.messages_processed == 0 {
55 0.0
56 } else {
57 self.total_latency_us as f64 / self.messages_processed as f64
58 }
59 }
60
61 pub fn throughput(&self, elapsed_secs: f64) -> f64 {
63 if elapsed_secs <= 0.0 {
64 0.0
65 } else {
66 self.messages_processed as f64 / elapsed_secs
67 }
68 }
69
70 pub fn drop_rate(&self) -> f64 {
72 let total = self.messages_processed + self.messages_dropped;
73 if total == 0 {
74 0.0
75 } else {
76 self.messages_dropped as f64 / total as f64
77 }
78 }
79
80 pub fn reset(&mut self) {
82 *self = Self::new();
83 }
84
85 pub fn merge(&mut self, other: &TelemetryBuffer) {
87 self.messages_processed += other.messages_processed;
88 self.messages_dropped += other.messages_dropped;
89 self.total_latency_us += other.total_latency_us;
90 self.min_latency_us = self.min_latency_us.min(other.min_latency_us);
91 self.max_latency_us = self.max_latency_us.max(other.max_latency_us);
92 self.input_queue_depth = other.input_queue_depth;
94 self.output_queue_depth = other.output_queue_depth;
95 if other.last_error != 0 {
97 self.last_error = other.last_error;
98 }
99 }
100}
101
102impl Default for TelemetryBuffer {
103 fn default() -> Self {
104 Self::new()
105 }
106}
107
/// Host-side metrics snapshot for one kernel, combining the raw device
/// [`TelemetryBuffer`] with host bookkeeping.
#[derive(Debug, Clone)]
pub struct KernelMetrics {
    /// Raw counters collected from the kernel.
    pub telemetry: TelemetryBuffer,

    /// Identifier of the kernel these metrics belong to.
    pub kernel_id: String,

    /// When this snapshot was taken.
    pub collected_at: std::time::Instant,

    /// How long the kernel has been running at collection time.
    pub uptime: std::time::Duration,

    /// Number of kernel invocations.
    pub invocations: u64,

    /// Bytes transferred host -> device.
    pub bytes_to_device: u64,

    /// Bytes transferred device -> host.
    pub bytes_from_device: u64,

    /// Device memory in use, in bytes.
    pub gpu_memory_used: u64,

    /// Host memory in use, in bytes.
    pub host_memory_used: u64,
}
138
139impl Default for KernelMetrics {
140 fn default() -> Self {
141 Self {
142 telemetry: TelemetryBuffer::default(),
143 kernel_id: String::new(),
144 collected_at: std::time::Instant::now(),
145 uptime: std::time::Duration::ZERO,
146 invocations: 0,
147 bytes_to_device: 0,
148 bytes_from_device: 0,
149 gpu_memory_used: 0,
150 host_memory_used: 0,
151 }
152 }
153}
154
155impl KernelMetrics {
156 pub fn new(kernel_id: impl Into<String>) -> Self {
158 Self {
159 kernel_id: kernel_id.into(),
160 ..Default::default()
161 }
162 }
163
164 pub fn transfer_bandwidth(&self) -> f64 {
166 let total_bytes = self.bytes_to_device + self.bytes_from_device;
167 let secs = self.uptime.as_secs_f64();
168 if secs > 0.0 {
169 total_bytes as f64 / secs
170 } else {
171 0.0
172 }
173 }
174
175 pub fn summary(&self) -> String {
177 format!(
178 "Kernel {} - Processed: {}, Dropped: {}, Avg Latency: {:.2}µs, Throughput: {:.2}/s",
179 self.kernel_id,
180 self.telemetry.messages_processed,
181 self.telemetry.messages_dropped,
182 self.telemetry.avg_latency_us(),
183 self.telemetry.throughput(self.uptime.as_secs_f64())
184 )
185 }
186}
187
/// Fixed-bucket latency histogram; values are in microseconds.
#[derive(Debug, Clone)]
pub struct LatencyHistogram {
    /// Inclusive upper boundary of each bucket, assumed ascending —
    /// `record` assigns a sample to the first boundary that fits.
    pub buckets: Vec<u64>,
    /// Sample count per bucket; always the same length as `buckets`.
    pub counts: Vec<u64>,
    /// Samples larger than the last bucket boundary.
    pub overflow: u64,
}
198
199impl LatencyHistogram {
200 pub fn new() -> Self {
202 Self::with_buckets(vec![1, 10, 100, 1_000, 10_000, 100_000, 1_000_000])
204 }
205
206 pub fn with_buckets(buckets: Vec<u64>) -> Self {
208 let counts = vec![0; buckets.len()];
209 Self {
210 buckets,
211 counts,
212 overflow: 0,
213 }
214 }
215
216 pub fn record(&mut self, value_us: u64) {
218 for (i, &boundary) in self.buckets.iter().enumerate() {
219 if value_us <= boundary {
220 self.counts[i] += 1;
221 return;
222 }
223 }
224 self.overflow += 1;
225 }
226
227 pub fn total(&self) -> u64 {
229 self.counts.iter().sum::<u64>() + self.overflow
230 }
231
232 pub fn percentile(&self, p: f64) -> u64 {
234 let total = self.total();
235 if total == 0 {
236 return 0;
237 }
238
239 let target = (total as f64 * p / 100.0).ceil() as u64;
240 let mut cumulative = 0u64;
241
242 for (i, &count) in self.counts.iter().enumerate() {
243 cumulative += count;
244 if cumulative >= target {
245 return self.buckets[i];
246 }
247 }
248
249 self.buckets.last().map(|b| b + 1).unwrap_or(0)
251 }
252}
253
254impl Default for LatencyHistogram {
255 fn default() -> Self {
256 Self::new()
257 }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_telemetry_buffer_size() {
        // Layout contract: exactly one 64-byte cache line.
        assert_eq!(std::mem::size_of::<TelemetryBuffer>(), 64);
    }

    #[test]
    fn test_avg_latency() {
        let empty = TelemetryBuffer::new();
        assert_eq!(empty.avg_latency_us(), 0.0);

        let tb = TelemetryBuffer {
            messages_processed: 10,
            total_latency_us: 1000,
            ..TelemetryBuffer::new()
        };
        assert_eq!(tb.avg_latency_us(), 100.0);
    }

    #[test]
    fn test_throughput() {
        let tb = TelemetryBuffer {
            messages_processed: 1000,
            ..TelemetryBuffer::new()
        };

        assert_eq!(tb.throughput(1.0), 1000.0);
        assert_eq!(tb.throughput(2.0), 500.0);
        assert_eq!(tb.throughput(0.0), 0.0);
    }

    #[test]
    fn test_drop_rate() {
        let tb = TelemetryBuffer {
            messages_processed: 90,
            messages_dropped: 10,
            ..TelemetryBuffer::new()
        };

        assert!((tb.drop_rate() - 0.1).abs() < 0.001);
    }

    #[test]
    fn test_merge() {
        let mut lhs = TelemetryBuffer {
            messages_processed: 100,
            min_latency_us: 10,
            max_latency_us: 100,
            ..TelemetryBuffer::new()
        };
        let rhs = TelemetryBuffer {
            messages_processed: 50,
            min_latency_us: 5,
            max_latency_us: 200,
            ..TelemetryBuffer::new()
        };

        lhs.merge(&rhs);

        assert_eq!(lhs.messages_processed, 150);
        assert_eq!(lhs.min_latency_us, 5);
        assert_eq!(lhs.max_latency_us, 200);
    }

    #[test]
    fn test_histogram_percentile() {
        let mut hist = LatencyHistogram::with_buckets(vec![10, 50, 100, 500]);

        // 80 samples in the first bucket, 15 in the second, 5 in the fourth.
        for _ in 0..80 {
            hist.record(5);
        }
        for _ in 0..15 {
            hist.record(30);
        }
        for _ in 0..5 {
            hist.record(200);
        }

        assert_eq!(hist.percentile(50.0), 10);
        assert_eq!(hist.percentile(90.0), 50);
        assert_eq!(hist.percentile(99.0), 500);
    }
}