1use bytemuck::{Pod, Zeroable};
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{SystemTime, UNIX_EPOCH};
27use zerocopy::{AsBytes, FromBytes, FromZeroes};
28
29use crate::error::{Result, RingKernelError};
30
31pub const MAX_CLOCK_SKEW_MS: u64 = 60_000; #[derive(
43 Debug, Clone, Copy, PartialEq, Eq, Hash, AsBytes, FromBytes, FromZeroes, Pod, Zeroable,
44)]
45#[repr(C, align(8))]
46pub struct HlcTimestamp {
47 pub physical: u64,
49 pub logical: u64,
51 pub node_id: u64,
53}
54
55impl HlcTimestamp {
56 pub const fn new(physical: u64, logical: u64, node_id: u64) -> Self {
58 Self {
59 physical,
60 logical,
61 node_id,
62 }
63 }
64
65 pub const fn zero() -> Self {
67 Self {
68 physical: 0,
69 logical: 0,
70 node_id: 0,
71 }
72 }
73
74 pub fn now(node_id: u64) -> Self {
76 let physical = SystemTime::now()
77 .duration_since(UNIX_EPOCH)
78 .expect("Time went backwards")
79 .as_micros() as u64;
80
81 Self {
82 physical,
83 logical: 0,
84 node_id,
85 }
86 }
87
88 pub const fn is_zero(&self) -> bool {
90 self.physical == 0 && self.logical == 0
91 }
92
93 pub const fn as_micros(&self) -> u64 {
95 self.physical
96 }
97
98 pub const fn as_millis(&self) -> u64 {
100 self.physical / 1000
101 }
102
103 pub const fn pack(&self) -> u128 {
106 ((self.physical as u128) << 64)
107 | ((self.logical as u128) << 16)
108 | (self.node_id as u128 & 0xFFFF)
109 }
110
111 pub const fn unpack(packed: u128) -> Self {
113 Self {
114 physical: (packed >> 64) as u64,
115 logical: ((packed >> 16) & 0xFFFF_FFFF_FFFF) as u64,
116 node_id: (packed & 0xFFFF) as u64,
117 }
118 }
119}
120
121impl Default for HlcTimestamp {
122 fn default() -> Self {
123 Self::zero()
124 }
125}
126
127impl Ord for HlcTimestamp {
128 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
129 match self.physical.cmp(&other.physical) {
131 std::cmp::Ordering::Equal => {}
132 ord => return ord,
133 }
134 match self.logical.cmp(&other.logical) {
136 std::cmp::Ordering::Equal => {}
137 ord => return ord,
138 }
139 self.node_id.cmp(&other.node_id)
141 }
142}
143
144impl PartialOrd for HlcTimestamp {
145 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
146 Some(self.cmp(other))
147 }
148}
149
150impl std::fmt::Display for HlcTimestamp {
151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 write!(
153 f,
154 "HLC({}.{}.{})",
155 self.physical, self.logical, self.node_id
156 )
157 }
158}
159
160pub struct HlcClock {
164 physical: AtomicU64,
166 logical: AtomicU64,
168 node_id: u64,
170 max_drift_us: u64,
172}
173
174impl HlcClock {
175 pub fn new(node_id: u64) -> Self {
177 let now = SystemTime::now()
178 .duration_since(UNIX_EPOCH)
179 .expect("Time went backwards")
180 .as_micros() as u64;
181
182 Self {
183 physical: AtomicU64::new(now),
184 logical: AtomicU64::new(0),
185 node_id,
186 max_drift_us: MAX_CLOCK_SKEW_MS * 1000,
187 }
188 }
189
190 pub fn with_max_drift(node_id: u64, max_drift_ms: u64) -> Self {
192 let now = SystemTime::now()
193 .duration_since(UNIX_EPOCH)
194 .expect("Time went backwards")
195 .as_micros() as u64;
196
197 Self {
198 physical: AtomicU64::new(now),
199 logical: AtomicU64::new(0),
200 node_id,
201 max_drift_us: max_drift_ms * 1000,
202 }
203 }
204
205 pub fn node_id(&self) -> u64 {
207 self.node_id
208 }
209
210 pub fn now(&self) -> HlcTimestamp {
212 let wall = Self::wall_time();
213 let physical = self.physical.load(Ordering::Acquire);
214 let logical = self.logical.load(Ordering::Acquire);
215
216 let new_physical = physical.max(wall);
218
219 HlcTimestamp {
220 physical: new_physical,
221 logical,
222 node_id: self.node_id,
223 }
224 }
225
226 pub fn tick(&self) -> HlcTimestamp {
228 let wall = Self::wall_time();
229
230 loop {
231 let old_physical = self.physical.load(Ordering::Acquire);
232 let old_logical = self.logical.load(Ordering::Acquire);
233
234 let (new_physical, new_logical) = if wall > old_physical {
235 (wall, 0)
237 } else {
238 (old_physical, old_logical + 1)
240 };
241
242 if self
244 .physical
245 .compare_exchange(
246 old_physical,
247 new_physical,
248 Ordering::Release,
249 Ordering::Relaxed,
250 )
251 .is_ok()
252 {
253 self.logical.store(new_logical, Ordering::Release);
254 return HlcTimestamp {
255 physical: new_physical,
256 logical: new_logical,
257 node_id: self.node_id,
258 };
259 }
260 }
262 }
263
264 pub fn update(&self, received: &HlcTimestamp) -> Result<HlcTimestamp> {
268 let wall = Self::wall_time();
269
270 if received.physical > wall + self.max_drift_us {
272 return Err(RingKernelError::ClockSkew {
273 skew_ms: (received.physical - wall) / 1000,
274 max_ms: self.max_drift_us / 1000,
275 });
276 }
277
278 loop {
279 let old_physical = self.physical.load(Ordering::Acquire);
280 let old_logical = self.logical.load(Ordering::Acquire);
281
282 let max_physical = wall.max(old_physical).max(received.physical);
284
285 let new_logical = if max_physical == old_physical && max_physical == received.physical {
286 old_logical.max(received.logical) + 1
288 } else if max_physical == old_physical {
289 old_logical + 1
291 } else if max_physical == received.physical {
292 received.logical + 1
294 } else {
295 0
297 };
298
299 if self
301 .physical
302 .compare_exchange(
303 old_physical,
304 max_physical,
305 Ordering::Release,
306 Ordering::Relaxed,
307 )
308 .is_ok()
309 {
310 self.logical.store(new_logical, Ordering::Release);
311 return Ok(HlcTimestamp {
312 physical: max_physical,
313 logical: new_logical,
314 node_id: self.node_id,
315 });
316 }
317 }
319 }
320
321 fn wall_time() -> u64 {
323 SystemTime::now()
324 .duration_since(UNIX_EPOCH)
325 .expect("Time went backwards")
326 .as_micros() as u64
327 }
328}
329
330impl std::fmt::Debug for HlcClock {
331 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332 f.debug_struct("HlcClock")
333 .field("physical", &self.physical.load(Ordering::Relaxed))
334 .field("logical", &self.logical.load(Ordering::Relaxed))
335 .field("node_id", &self.node_id)
336 .finish()
337 }
338}
339
340#[derive(Debug, Clone, Copy, Default, AsBytes, FromBytes, FromZeroes, Pod, Zeroable)]
342#[repr(C, align(16))]
343pub struct HlcState {
344 pub physical: u64,
346 pub logical: u64,
348}
349
350impl HlcState {
351 pub const fn new(physical: u64, logical: u64) -> Self {
353 Self { physical, logical }
354 }
355
356 pub const fn to_timestamp(&self, node_id: u64) -> HlcTimestamp {
358 HlcTimestamp {
359 physical: self.physical,
360 logical: self.logical,
361 node_id,
362 }
363 }
364
365 pub const fn from_timestamp(ts: &HlcTimestamp) -> Self {
367 Self {
368 physical: ts.physical,
369 logical: ts.logical,
370 }
371 }
372}
373
374#[cfg(test)]
375mod tests {
376 use super::*;
377
378 #[test]
379 fn test_timestamp_ordering() {
380 let ts1 = HlcTimestamp::new(100, 0, 1);
381 let ts2 = HlcTimestamp::new(100, 1, 1);
382 let ts3 = HlcTimestamp::new(101, 0, 1);
383
384 assert!(ts1 < ts2);
385 assert!(ts2 < ts3);
386 assert!(ts1 < ts3);
387 }
388
389 #[test]
390 fn test_timestamp_node_id_tiebreak() {
391 let ts1 = HlcTimestamp::new(100, 5, 1);
392 let ts2 = HlcTimestamp::new(100, 5, 2);
393
394 assert!(ts1 < ts2);
395 }
396
397 #[test]
398 fn test_clock_tick() {
399 let clock = HlcClock::new(1);
400
401 let ts1 = clock.tick();
402 let ts2 = clock.tick();
403 let ts3 = clock.tick();
404
405 assert!(ts1 < ts2);
406 assert!(ts2 < ts3);
407 }
408
409 #[test]
410 fn test_clock_update() {
411 let clock1 = HlcClock::new(1);
412 let clock2 = HlcClock::new(2);
413
414 let ts1 = clock1.tick();
415 let ts2 = clock2.update(&ts1).unwrap();
416
417 assert!(ts1 < ts2);
419 }
420
421 #[test]
422 fn test_pack_unpack() {
423 let original = HlcTimestamp::new(12345678901234, 42, 7);
424 let packed = original.pack();
425 let unpacked = HlcTimestamp::unpack(packed);
426
427 assert_eq!(original.physical, unpacked.physical);
428 assert_eq!(original.logical, unpacked.logical);
430 }
431
432 #[test]
433 fn test_clock_skew_detection() {
434 let clock = HlcClock::with_max_drift(1, 100); let future = HlcTimestamp::new(
438 SystemTime::now()
439 .duration_since(UNIX_EPOCH)
440 .unwrap()
441 .as_micros() as u64
442 + 200_000_000, 0,
444 2,
445 );
446
447 let result = clock.update(&future);
448 assert!(matches!(result, Err(RingKernelError::ClockSkew { .. })));
449 }
450
451 #[test]
452 fn test_timestamp_display() {
453 let ts = HlcTimestamp::new(1234567890, 42, 7);
454 let s = format!("{}", ts);
455 assert!(s.contains("1234567890"));
456 assert!(s.contains("42"));
457 assert!(s.contains("7"));
458 }
459}