// ringkernel_core/hlc.rs

//! Hybrid Logical Clock (HLC) implementation for causal ordering.
//!
//! An HLC combines physical (wall-clock) time with a logical counter to give a
//! total order over events across distributed GPU kernels, while keeping every
//! timestamp close to real time.
//!
//! ## Properties
//!
//! - **Total Ordering**: Any two timestamps can be compared; ties on physical
//!   time and logical counter are broken by node ID.
//! - **Causality**: If event A causes event B, then HLC(A) < HLC(B).
//! - **Bounded Drift**: Received timestamps whose physical component is too far
//!   ahead of the local wall clock are rejected, so the physical component stays
//!   within a bounded drift of real time.
//!
//! ## Usage
//!
//! ```
//! use ringkernel_core::hlc::{HlcTimestamp, HlcClock};
//!
//! let clock = HlcClock::new(1); // Node ID = 1
//! let ts1 = clock.tick();
//! let ts2 = clock.tick();
//! assert!(ts1 < ts2); // tick() guarantees strictly increasing timestamps
//! ```
23
24use bytemuck::{Pod, Zeroable};
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{SystemTime, UNIX_EPOCH};
27use zerocopy::{AsBytes, FromBytes, FromZeroes};
28
29use crate::error::{Result, RingKernelError};
30
/// Default upper bound, in milliseconds, on how far a received timestamp's
/// physical component may run ahead of the local wall clock before it is
/// rejected as clock skew.
///
/// Equal to one minute.
pub const MAX_CLOCK_SKEW_MS: u64 = 60 * 1_000;
33
34/// Hybrid Logical Clock timestamp.
35///
36/// Composed of:
37/// - Physical time (wall clock in microseconds since epoch)
38/// - Logical counter (for events at same physical time)
39/// - Node ID (for tie-breaking across nodes)
40///
41/// This struct is 24 bytes and cache-line friendly.
42#[derive(
43    Debug, Clone, Copy, PartialEq, Eq, Hash, AsBytes, FromBytes, FromZeroes, Pod, Zeroable,
44)]
45#[repr(C, align(8))]
46pub struct HlcTimestamp {
47    /// Physical time component (microseconds since UNIX epoch).
48    pub physical: u64,
49    /// Logical counter for events at the same physical time.
50    pub logical: u64,
51    /// Node identifier for tie-breaking.
52    pub node_id: u64,
53}
54
55impl HlcTimestamp {
56    /// Create a new HLC timestamp.
57    pub const fn new(physical: u64, logical: u64, node_id: u64) -> Self {
58        Self {
59            physical,
60            logical,
61            node_id,
62        }
63    }
64
65    /// Create a zero timestamp (minimum value).
66    pub const fn zero() -> Self {
67        Self {
68            physical: 0,
69            logical: 0,
70            node_id: 0,
71        }
72    }
73
74    /// Create a timestamp from the current wall clock.
75    pub fn now(node_id: u64) -> Self {
76        let physical = SystemTime::now()
77            .duration_since(UNIX_EPOCH)
78            .expect("Time went backwards")
79            .as_micros() as u64;
80
81        Self {
82            physical,
83            logical: 0,
84            node_id,
85        }
86    }
87
88    /// Check if this timestamp is zero/uninitialized.
89    pub const fn is_zero(&self) -> bool {
90        self.physical == 0 && self.logical == 0
91    }
92
93    /// Get physical time as microseconds since epoch.
94    pub const fn as_micros(&self) -> u64 {
95        self.physical
96    }
97
98    /// Get physical time as milliseconds since epoch.
99    pub const fn as_millis(&self) -> u64 {
100        self.physical / 1000
101    }
102
103    /// Pack timestamp into a single u128 for atomic comparison.
104    /// Format: [physical:64][logical:48][node_id:16]
105    pub const fn pack(&self) -> u128 {
106        ((self.physical as u128) << 64)
107            | ((self.logical as u128) << 16)
108            | (self.node_id as u128 & 0xFFFF)
109    }
110
111    /// Unpack timestamp from u128.
112    pub const fn unpack(packed: u128) -> Self {
113        Self {
114            physical: (packed >> 64) as u64,
115            logical: ((packed >> 16) & 0xFFFF_FFFF_FFFF) as u64,
116            node_id: (packed & 0xFFFF) as u64,
117        }
118    }
119}
120
121impl Default for HlcTimestamp {
122    fn default() -> Self {
123        Self::zero()
124    }
125}
126
127impl Ord for HlcTimestamp {
128    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
129        // Compare physical time first
130        match self.physical.cmp(&other.physical) {
131            std::cmp::Ordering::Equal => {}
132            ord => return ord,
133        }
134        // Then logical counter
135        match self.logical.cmp(&other.logical) {
136            std::cmp::Ordering::Equal => {}
137            ord => return ord,
138        }
139        // Finally node_id for total ordering
140        self.node_id.cmp(&other.node_id)
141    }
142}
143
144impl PartialOrd for HlcTimestamp {
145    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
146        Some(self.cmp(other))
147    }
148}
149
150impl std::fmt::Display for HlcTimestamp {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        write!(
153            f,
154            "HLC({}.{}.{})",
155            self.physical, self.logical, self.node_id
156        )
157    }
158}
159
160/// Hybrid Logical Clock for generating causally-ordered timestamps.
161///
162/// Thread-safe implementation using atomics for the state.
163pub struct HlcClock {
164    /// Current physical time (atomically updated).
165    physical: AtomicU64,
166    /// Current logical counter (atomically updated).
167    logical: AtomicU64,
168    /// Node identifier.
169    node_id: u64,
170    /// Maximum allowed clock drift in microseconds.
171    max_drift_us: u64,
172}
173
174impl HlcClock {
175    /// Create a new HLC clock with the given node ID.
176    pub fn new(node_id: u64) -> Self {
177        let now = SystemTime::now()
178            .duration_since(UNIX_EPOCH)
179            .expect("Time went backwards")
180            .as_micros() as u64;
181
182        Self {
183            physical: AtomicU64::new(now),
184            logical: AtomicU64::new(0),
185            node_id,
186            max_drift_us: MAX_CLOCK_SKEW_MS * 1000,
187        }
188    }
189
190    /// Create a new HLC clock with custom max drift.
191    pub fn with_max_drift(node_id: u64, max_drift_ms: u64) -> Self {
192        let now = SystemTime::now()
193            .duration_since(UNIX_EPOCH)
194            .expect("Time went backwards")
195            .as_micros() as u64;
196
197        Self {
198            physical: AtomicU64::new(now),
199            logical: AtomicU64::new(0),
200            node_id,
201            max_drift_us: max_drift_ms * 1000,
202        }
203    }
204
205    /// Get the node ID.
206    pub fn node_id(&self) -> u64 {
207        self.node_id
208    }
209
210    /// Get current timestamp without advancing the clock.
211    pub fn now(&self) -> HlcTimestamp {
212        let wall = Self::wall_time();
213        let physical = self.physical.load(Ordering::Acquire);
214        let logical = self.logical.load(Ordering::Acquire);
215
216        // Use max of wall clock and stored physical
217        let new_physical = physical.max(wall);
218
219        HlcTimestamp {
220            physical: new_physical,
221            logical,
222            node_id: self.node_id,
223        }
224    }
225
226    /// Generate a new timestamp, advancing the clock.
227    pub fn tick(&self) -> HlcTimestamp {
228        let wall = Self::wall_time();
229
230        loop {
231            let old_physical = self.physical.load(Ordering::Acquire);
232            let old_logical = self.logical.load(Ordering::Acquire);
233
234            let (new_physical, new_logical) = if wall > old_physical {
235                // Wall clock advanced: use wall time, reset logical
236                (wall, 0)
237            } else {
238                // Same or past: increment logical counter
239                (old_physical, old_logical + 1)
240            };
241
242            // Try to update atomically
243            if self
244                .physical
245                .compare_exchange(
246                    old_physical,
247                    new_physical,
248                    Ordering::Release,
249                    Ordering::Relaxed,
250                )
251                .is_ok()
252            {
253                self.logical.store(new_logical, Ordering::Release);
254                return HlcTimestamp {
255                    physical: new_physical,
256                    logical: new_logical,
257                    node_id: self.node_id,
258                };
259            }
260            // CAS failed, retry
261        }
262    }
263
264    /// Update clock on receiving a message with the given timestamp.
265    ///
266    /// Returns the new local timestamp that causally follows the received timestamp.
267    pub fn update(&self, received: &HlcTimestamp) -> Result<HlcTimestamp> {
268        let wall = Self::wall_time();
269
270        // Check for clock skew
271        if received.physical > wall + self.max_drift_us {
272            return Err(RingKernelError::ClockSkew {
273                skew_ms: (received.physical - wall) / 1000,
274                max_ms: self.max_drift_us / 1000,
275            });
276        }
277
278        loop {
279            let old_physical = self.physical.load(Ordering::Acquire);
280            let old_logical = self.logical.load(Ordering::Acquire);
281
282            // Take max of wall, local, and received physical
283            let max_physical = wall.max(old_physical).max(received.physical);
284
285            let new_logical = if max_physical == old_physical && max_physical == received.physical {
286                // All three equal: take max logical + 1
287                old_logical.max(received.logical) + 1
288            } else if max_physical == old_physical {
289                // Local physical wins: increment local logical
290                old_logical + 1
291            } else if max_physical == received.physical {
292                // Received physical wins: use received logical + 1
293                received.logical + 1
294            } else {
295                // Wall clock wins: reset logical
296                0
297            };
298
299            // Try to update atomically
300            if self
301                .physical
302                .compare_exchange(
303                    old_physical,
304                    max_physical,
305                    Ordering::Release,
306                    Ordering::Relaxed,
307                )
308                .is_ok()
309            {
310                self.logical.store(new_logical, Ordering::Release);
311                return Ok(HlcTimestamp {
312                    physical: max_physical,
313                    logical: new_logical,
314                    node_id: self.node_id,
315                });
316            }
317            // CAS failed, retry
318        }
319    }
320
321    /// Get current wall clock time in microseconds.
322    fn wall_time() -> u64 {
323        SystemTime::now()
324            .duration_since(UNIX_EPOCH)
325            .expect("Time went backwards")
326            .as_micros() as u64
327    }
328}
329
330impl std::fmt::Debug for HlcClock {
331    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332        f.debug_struct("HlcClock")
333            .field("physical", &self.physical.load(Ordering::Relaxed))
334            .field("logical", &self.logical.load(Ordering::Relaxed))
335            .field("node_id", &self.node_id)
336            .finish()
337    }
338}
339
340/// Compact HLC state for GPU-side storage (16 bytes).
341#[derive(Debug, Clone, Copy, Default, AsBytes, FromBytes, FromZeroes, Pod, Zeroable)]
342#[repr(C, align(16))]
343pub struct HlcState {
344    /// Physical time in microseconds.
345    pub physical: u64,
346    /// Logical counter.
347    pub logical: u64,
348}
349
350impl HlcState {
351    /// Create new HLC state.
352    pub const fn new(physical: u64, logical: u64) -> Self {
353        Self { physical, logical }
354    }
355
356    /// Convert to full timestamp with node ID.
357    pub const fn to_timestamp(&self, node_id: u64) -> HlcTimestamp {
358        HlcTimestamp {
359            physical: self.physical,
360            logical: self.logical,
361            node_id,
362        }
363    }
364
365    /// Create from full timestamp (drops node_id).
366    pub const fn from_timestamp(ts: &HlcTimestamp) -> Self {
367        Self {
368            physical: ts.physical,
369            logical: ts.logical,
370        }
371    }
372}
373
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_timestamp_ordering() {
        let ts1 = HlcTimestamp::new(100, 0, 1);
        let ts2 = HlcTimestamp::new(100, 1, 1);
        let ts3 = HlcTimestamp::new(101, 0, 1);

        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
        assert!(ts1 < ts3);
    }

    #[test]
    fn test_timestamp_node_id_tiebreak() {
        let ts1 = HlcTimestamp::new(100, 5, 1);
        let ts2 = HlcTimestamp::new(100, 5, 2);

        assert!(ts1 < ts2);
    }

    #[test]
    fn test_clock_tick() {
        let clock = HlcClock::new(1);

        let ts1 = clock.tick();
        let ts2 = clock.tick();
        let ts3 = clock.tick();

        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
    }

    #[test]
    fn test_clock_update() {
        let clock1 = HlcClock::new(1);
        let clock2 = HlcClock::new(2);

        let ts1 = clock1.tick();
        let ts2 = clock2.update(&ts1).unwrap();

        // ts2 should causally follow ts1
        assert!(ts1 < ts2);
    }

    #[test]
    fn test_pack_unpack() {
        let original = HlcTimestamp::new(12345678901234, 42, 7);
        let packed = original.pack();
        let unpacked = HlcTimestamp::unpack(packed);

        // node_id 7 fits in the 16-bit node field, so the round trip is exact.
        assert_eq!(original, unpacked);
    }

    #[test]
    fn test_pack_truncates_node_id() {
        // Only the low 16 bits of node_id survive packing.
        let wide = HlcTimestamp::new(1, 2, 0x1_0005);
        let unpacked = HlcTimestamp::unpack(wide.pack());

        assert_eq!(unpacked, HlcTimestamp::new(1, 2, 0x0005));
    }

    #[test]
    fn test_default_is_zero() {
        assert_eq!(HlcTimestamp::default(), HlcTimestamp::zero());
        assert!(HlcTimestamp::default().is_zero());
    }

    #[test]
    fn test_clock_skew_detection() {
        let clock = HlcClock::with_max_drift(1, 100); // 100ms max drift

        // Create a timestamp far in the future
        let future = HlcTimestamp::new(
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros() as u64
                + 200_000_000, // 200 seconds in future
            0,
            2,
        );

        let result = clock.update(&future);
        assert!(matches!(result, Err(RingKernelError::ClockSkew { .. })));
    }

    #[test]
    fn test_timestamp_display() {
        let ts = HlcTimestamp::new(1234567890, 42, 7);
        // Exact match: substring checks such as `contains("7")` would pass
        // vacuously because "1234567890" already contains a 7.
        assert_eq!(format!("{}", ts), "HLC(1234567890.42.7)");
    }
}