ringkernel_core/
pubsub.rs

1//! Topic-based publish/subscribe messaging.
2//!
3//! This module provides a topic-based pub/sub system for kernels to
4//! communicate through named topics without direct knowledge of each other.
5
6use parking_lot::RwLock;
7use std::collections::{HashMap, HashSet};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use tokio::sync::broadcast;
11
12use crate::error::Result;
13use crate::hlc::HlcTimestamp;
14use crate::message::MessageEnvelope;
15use crate::runtime::KernelId;
16
/// A topic name for pub/sub messaging.
///
/// Topic names are split into levels on `/`. A topic used as a subscription
/// pattern may contain the wildcard levels `*` (exactly one level) and `#`
/// (everything from that level onwards, including nothing at all).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Topic(pub String);

impl Topic {
    /// Create a new topic.
    pub fn new(name: impl Into<String>) -> Self {
        Self(name.into())
    }

    /// Get the topic name.
    pub fn name(&self) -> &str {
        &self.0
    }

    /// Check if this is a wildcard pattern.
    ///
    /// Returns `true` when the name contains a `*` or `#` character anywhere.
    pub fn is_pattern(&self) -> bool {
        self.0.contains('*') || self.0.contains('#')
    }

    /// Check if a topic matches this pattern.
    ///
    /// `self` is the pattern and `other` is the concrete topic being tested.
    ///
    /// * Without wildcards the two names must be equal.
    /// * A `*` level matches exactly one level of `other`.
    /// * A `#` level matches every remaining level of `other`, including the
    ///   case where none remain, so `sensors/#` matches both `sensors` and
    ///   `sensors/kitchen/temperature`. Anything after `#` in the pattern is
    ///   ignored.
    /// * Any other level must equal the corresponding level of `other`.
    pub fn matches(&self, other: &Topic) -> bool {
        if !self.is_pattern() {
            return self.0 == other.0;
        }

        let mut topic_levels = other.0.split('/');
        for pattern_level in self.0.split('/') {
            // Checked before consuming a topic level so that `#` also matches
            // when the topic has no levels left.
            if pattern_level == "#" {
                return true;
            }
            match topic_levels.next() {
                Some(level) if pattern_level == "*" || pattern_level == level => {}
                // Literal mismatch, or the topic ran out of levels first.
                _ => return false,
            }
        }

        // The pattern is exhausted; the topic must be exhausted as well.
        topic_levels.next().is_none()
    }
}

impl std::fmt::Display for Topic {
    /// Writes the raw topic name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl From<&str> for Topic {
    /// Copies the string slice into a new topic.
    fn from(s: &str) -> Self {
        Self::new(s)
    }
}

impl From<String> for Topic {
    /// Wraps the owned string without copying it.
    fn from(s: String) -> Self {
        Self(s)
    }
}
88
89/// Configuration for the pub/sub broker.
90#[derive(Debug, Clone)]
91pub struct PubSubConfig {
92    /// Maximum subscribers per topic.
93    pub max_subscribers_per_topic: usize,
94    /// Channel buffer size for each subscription.
95    pub channel_buffer_size: usize,
96    /// Maximum retained messages per topic.
97    pub max_retained_messages: usize,
98    /// Enable message persistence.
99    pub enable_persistence: bool,
100    /// Default QoS level.
101    pub default_qos: QoS,
102}
103
104impl Default for PubSubConfig {
105    fn default() -> Self {
106        Self {
107            max_subscribers_per_topic: 1000,
108            channel_buffer_size: 256,
109            max_retained_messages: 100,
110            enable_persistence: false,
111            default_qos: QoS::AtMostOnce,
112        }
113    }
114}
115
/// Delivery guarantee requested for a published message.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum QoS {
    /// No delivery guarantee: the message is sent once and then forgotten.
    /// This is the default level.
    #[default]
    AtMostOnce,
    /// The message is delivered one or more times, so duplicates are possible.
    AtLeastOnce,
    /// The message is delivered exactly one time.
    ExactlyOnce,
}
127
128/// A published message on a topic.
129#[derive(Debug, Clone)]
130pub struct Publication {
131    /// Topic the message was published to.
132    pub topic: Topic,
133    /// Publisher kernel ID.
134    pub publisher: KernelId,
135    /// The message envelope.
136    pub envelope: MessageEnvelope,
137    /// Publication timestamp.
138    pub timestamp: HlcTimestamp,
139    /// QoS level.
140    pub qos: QoS,
141    /// Sequence number (for ordering).
142    pub sequence: u64,
143    /// Whether this is a retained message.
144    pub retained: bool,
145}
146
147impl Publication {
148    /// Create a new publication.
149    pub fn new(
150        topic: Topic,
151        publisher: KernelId,
152        envelope: MessageEnvelope,
153        timestamp: HlcTimestamp,
154    ) -> Self {
155        Self {
156            topic,
157            publisher,
158            envelope,
159            timestamp,
160            qos: QoS::default(),
161            sequence: 0,
162            retained: false,
163        }
164    }
165
166    /// Set QoS level.
167    pub fn with_qos(mut self, qos: QoS) -> Self {
168        self.qos = qos;
169        self
170    }
171
172    /// Mark as retained.
173    pub fn with_retained(mut self, retained: bool) -> Self {
174        self.retained = retained;
175        self
176    }
177}
178
179/// A subscription to a topic.
180pub struct Subscription {
181    /// Subscription ID.
182    pub id: u64,
183    /// Topic pattern (may include wildcards).
184    pub pattern: Topic,
185    /// Subscriber kernel ID.
186    pub subscriber: KernelId,
187    /// Message receiver.
188    receiver: broadcast::Receiver<Publication>,
189    /// Reference to broker for unsubscribe.
190    broker: Arc<PubSubBroker>,
191}
192
193impl Subscription {
194    /// Receive the next publication.
195    pub async fn receive(&mut self) -> Option<Publication> {
196        loop {
197            match self.receiver.recv().await {
198                Ok(pub_msg) => {
199                    if self.pattern.matches(&pub_msg.topic) {
200                        return Some(pub_msg);
201                    }
202                }
203                Err(broadcast::error::RecvError::Closed) => return None,
204                Err(broadcast::error::RecvError::Lagged(_)) => continue,
205            }
206        }
207    }
208
209    /// Try to receive a publication (non-blocking).
210    pub fn try_receive(&mut self) -> Option<Publication> {
211        loop {
212            match self.receiver.try_recv() {
213                Ok(pub_msg) => {
214                    if self.pattern.matches(&pub_msg.topic) {
215                        return Some(pub_msg);
216                    }
217                }
218                Err(_) => return None,
219            }
220        }
221    }
222
223    /// Unsubscribe from the topic.
224    pub fn unsubscribe(self) {
225        self.broker.unsubscribe(self.id);
226    }
227}
228
229/// Topic info and statistics.
230#[derive(Debug, Clone)]
231pub struct TopicInfo {
232    /// Topic name.
233    pub topic: Topic,
234    /// Number of subscribers.
235    pub subscriber_count: usize,
236    /// Total messages published.
237    pub messages_published: u64,
238    /// Retained message count.
239    pub retained_count: usize,
240}
241
242/// Pub/sub message broker.
243pub struct PubSubBroker {
244    /// Configuration.
245    config: PubSubConfig,
246    /// Broadcast sender for all publications.
247    sender: broadcast::Sender<Publication>,
248    /// Subscriptions by ID.
249    subscriptions: RwLock<HashMap<u64, SubscriptionInfo>>,
250    /// Subscription counter.
251    subscription_counter: AtomicU64,
252    /// Topic statistics.
253    topic_stats: RwLock<HashMap<Topic, TopicStats>>,
254    /// Retained messages per topic.
255    retained: RwLock<HashMap<Topic, Vec<Publication>>>,
256    /// Global message sequence.
257    sequence: AtomicU64,
258}
259
260/// Internal subscription info.
261struct SubscriptionInfo {
262    pattern: Topic,
263    #[allow(dead_code)]
264    subscriber: KernelId,
265}
266
/// Per-topic counters kept by the broker.
#[derive(Clone, Debug, Default)]
struct TopicStats {
    /// IDs of the subscriptions registered under this topic.
    subscribers: HashSet<u64>,
    /// Number of messages published to this topic so far.
    messages_published: u64,
}
273
274impl PubSubBroker {
275    /// Create a new pub/sub broker.
276    pub fn new(config: PubSubConfig) -> Arc<Self> {
277        let (sender, _) = broadcast::channel(config.channel_buffer_size);
278
279        Arc::new(Self {
280            config,
281            sender,
282            subscriptions: RwLock::new(HashMap::new()),
283            subscription_counter: AtomicU64::new(0),
284            topic_stats: RwLock::new(HashMap::new()),
285            retained: RwLock::new(HashMap::new()),
286            sequence: AtomicU64::new(0),
287        })
288    }
289
290    /// Subscribe to a topic.
291    pub fn subscribe(self: &Arc<Self>, subscriber: KernelId, pattern: Topic) -> Subscription {
292        let id = self.subscription_counter.fetch_add(1, Ordering::Relaxed);
293
294        // Store subscription info
295        self.subscriptions.write().insert(
296            id,
297            SubscriptionInfo {
298                pattern: pattern.clone(),
299                subscriber: subscriber.clone(),
300            },
301        );
302
303        // Update topic stats
304        let mut stats = self.topic_stats.write();
305        stats
306            .entry(pattern.clone())
307            .or_default()
308            .subscribers
309            .insert(id);
310
311        Subscription {
312            id,
313            pattern,
314            subscriber,
315            receiver: self.sender.subscribe(),
316            broker: Arc::clone(self),
317        }
318    }
319
320    /// Unsubscribe by subscription ID.
321    pub fn unsubscribe(&self, subscription_id: u64) {
322        let info = self.subscriptions.write().remove(&subscription_id);
323
324        if let Some(info) = info {
325            let mut stats = self.topic_stats.write();
326            if let Some(topic_stats) = stats.get_mut(&info.pattern) {
327                topic_stats.subscribers.remove(&subscription_id);
328            }
329        }
330    }
331
332    /// Publish a message to a topic.
333    pub fn publish(
334        &self,
335        topic: Topic,
336        publisher: KernelId,
337        envelope: MessageEnvelope,
338        timestamp: HlcTimestamp,
339    ) -> Result<u64> {
340        let sequence = self.sequence.fetch_add(1, Ordering::Relaxed);
341
342        let mut publication = Publication::new(topic.clone(), publisher, envelope, timestamp);
343        publication.sequence = sequence;
344
345        // Update stats
346        {
347            let mut stats = self.topic_stats.write();
348            let topic_stats = stats.entry(topic.clone()).or_default();
349            topic_stats.messages_published += 1;
350        }
351
352        // Handle retained message
353        if publication.retained {
354            let mut retained = self.retained.write();
355            let retained_list = retained.entry(topic).or_default();
356            retained_list.push(publication.clone());
357
358            // Trim to max retained
359            if retained_list.len() > self.config.max_retained_messages {
360                retained_list.remove(0);
361            }
362        }
363
364        // Broadcast to all subscribers
365        // Note: subscribers filter by pattern in their receive
366        let _ = self.sender.send(publication);
367
368        Ok(sequence)
369    }
370
371    /// Publish with QoS setting.
372    pub fn publish_qos(
373        &self,
374        topic: Topic,
375        publisher: KernelId,
376        envelope: MessageEnvelope,
377        timestamp: HlcTimestamp,
378        qos: QoS,
379    ) -> Result<u64> {
380        let sequence = self.sequence.fetch_add(1, Ordering::Relaxed);
381
382        let mut publication = Publication::new(topic.clone(), publisher, envelope, timestamp);
383        publication.sequence = sequence;
384        publication.qos = qos;
385
386        // Update stats
387        {
388            let mut stats = self.topic_stats.write();
389            let topic_stats = stats.entry(topic).or_default();
390            topic_stats.messages_published += 1;
391        }
392
393        let _ = self.sender.send(publication);
394        Ok(sequence)
395    }
396
397    /// Publish a retained message.
398    pub fn publish_retained(
399        &self,
400        topic: Topic,
401        publisher: KernelId,
402        envelope: MessageEnvelope,
403        timestamp: HlcTimestamp,
404    ) -> Result<u64> {
405        let sequence = self.sequence.fetch_add(1, Ordering::Relaxed);
406
407        let mut publication = Publication::new(topic.clone(), publisher, envelope, timestamp);
408        publication.sequence = sequence;
409        publication.retained = true;
410
411        // Store retained message
412        {
413            let mut retained = self.retained.write();
414            let retained_list = retained.entry(topic.clone()).or_default();
415            retained_list.push(publication.clone());
416
417            if retained_list.len() > self.config.max_retained_messages {
418                retained_list.remove(0);
419            }
420        }
421
422        // Update stats
423        {
424            let mut stats = self.topic_stats.write();
425            let topic_stats = stats.entry(topic).or_default();
426            topic_stats.messages_published += 1;
427        }
428
429        let _ = self.sender.send(publication);
430        Ok(sequence)
431    }
432
433    /// Get retained messages for a topic.
434    pub fn get_retained(&self, topic: &Topic) -> Vec<Publication> {
435        self.retained.read().get(topic).cloned().unwrap_or_default()
436    }
437
438    /// Clear retained messages for a topic.
439    pub fn clear_retained(&self, topic: &Topic) {
440        self.retained.write().remove(topic);
441    }
442
443    /// Get topic information.
444    pub fn topic_info(&self, topic: &Topic) -> Option<TopicInfo> {
445        let stats = self.topic_stats.read();
446        let topic_stats = stats.get(topic)?;
447
448        let retained_count = self
449            .retained
450            .read()
451            .get(topic)
452            .map(|v| v.len())
453            .unwrap_or(0);
454
455        Some(TopicInfo {
456            topic: topic.clone(),
457            subscriber_count: topic_stats.subscribers.len(),
458            messages_published: topic_stats.messages_published,
459            retained_count,
460        })
461    }
462
463    /// List all topics with subscribers.
464    pub fn list_topics(&self) -> Vec<Topic> {
465        self.topic_stats
466            .read()
467            .iter()
468            .filter(|(_, stats)| !stats.subscribers.is_empty())
469            .map(|(topic, _)| topic.clone())
470            .collect()
471    }
472
473    /// Get broker statistics.
474    pub fn stats(&self) -> PubSubStats {
475        let stats = self.topic_stats.read();
476        let total_subscribers: usize = stats.values().map(|s| s.subscribers.len()).sum();
477        let total_messages: u64 = stats.values().map(|s| s.messages_published).sum();
478        let retained_count: usize = self.retained.read().values().map(|v| v.len()).sum();
479
480        PubSubStats {
481            topic_count: stats.len(),
482            total_subscribers,
483            total_messages_published: total_messages,
484            retained_message_count: retained_count,
485        }
486    }
487}
488
/// Aggregate counters describing a broker as a whole.
#[derive(Clone, Debug, Default)]
pub struct PubSubStats {
    /// How many topics have seen activity.
    pub topic_count: usize,
    /// How many subscriptions exist in total.
    pub total_subscribers: usize,
    /// How many messages have been published in total.
    pub total_messages_published: u64,
    /// How many retained messages are stored in total.
    pub retained_message_count: usize,
}
501
502/// Builder for creating pub/sub infrastructure.
503pub struct PubSubBuilder {
504    config: PubSubConfig,
505}
506
507impl PubSubBuilder {
508    /// Create a new builder.
509    pub fn new() -> Self {
510        Self {
511            config: PubSubConfig::default(),
512        }
513    }
514
515    /// Set maximum subscribers per topic.
516    pub fn max_subscribers_per_topic(mut self, count: usize) -> Self {
517        self.config.max_subscribers_per_topic = count;
518        self
519    }
520
521    /// Set channel buffer size.
522    pub fn channel_buffer_size(mut self, size: usize) -> Self {
523        self.config.channel_buffer_size = size;
524        self
525    }
526
527    /// Set maximum retained messages.
528    pub fn max_retained_messages(mut self, count: usize) -> Self {
529        self.config.max_retained_messages = count;
530        self
531    }
532
533    /// Enable message persistence.
534    pub fn enable_persistence(mut self, enable: bool) -> Self {
535        self.config.enable_persistence = enable;
536        self
537    }
538
539    /// Set default QoS.
540    pub fn default_qos(mut self, qos: QoS) -> Self {
541        self.config.default_qos = qos;
542        self
543    }
544
545    /// Build the pub/sub broker.
546    pub fn build(self) -> Arc<PubSubBroker> {
547        PubSubBroker::new(self.config)
548    }
549}
550
551impl Default for PubSubBuilder {
552    fn default() -> Self {
553        Self::new()
554    }
555}
556
#[cfg(test)]
mod tests {
    use super::*;

    /// `*` stands in for a single level of the topic path.
    #[test]
    fn test_topic_matching() {
        let pattern = Topic::new("sensors/*/temperature");

        for accepted in [
            "sensors/kitchen/temperature",
            "sensors/living_room/temperature",
        ] {
            assert!(pattern.matches(&Topic::new(accepted)));
        }

        assert!(!pattern.matches(&Topic::new("sensors/kitchen/humidity")));
    }

    /// `#` accepts any number of trailing levels.
    #[test]
    fn test_topic_wildcard_hash() {
        let pattern = Topic::new("sensors/#");

        for accepted in ["sensors/kitchen/temperature", "sensors/a/b/c/d"] {
            assert!(pattern.matches(&Topic::new(accepted)));
        }
    }

    /// Without wildcards only the identical name matches.
    #[test]
    fn test_topic_exact_match() {
        let pattern = Topic::new("sensors/kitchen/temperature");
        let same = Topic::new("sensors/kitchen/temperature");
        let different = Topic::new("sensors/kitchen/humidity");

        assert!(pattern.matches(&same));
        assert!(!pattern.matches(&different));
    }

    /// A published message reaches a subscription on the same topic.
    #[tokio::test]
    async fn test_pubsub_broker() {
        let broker = PubSubBuilder::new().build();

        let topic = Topic::new("test/topic");
        let publisher = KernelId::new("publisher");
        let subscriber = KernelId::new("subscriber");

        let mut subscription = broker.subscribe(subscriber, topic.clone());

        // Publish a message
        let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
        let timestamp = HlcTimestamp::now(1);
        broker
            .publish(topic.clone(), publisher.clone(), envelope, timestamp)
            .unwrap();

        // Small delay for broadcast
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        // Receive the message
        let received = subscription.try_receive();
        assert!(received.is_some());
        assert_eq!(received.unwrap().publisher, publisher);
    }

    /// One subscription shows up as one topic with one subscriber.
    #[test]
    fn test_pubsub_stats() {
        let broker = PubSubBuilder::new().build();

        let kernel = KernelId::new("kernel");
        let topic = Topic::new("test");
        let _sub = broker.subscribe(kernel.clone(), topic.clone());

        let stats = broker.stats();
        assert_eq!(stats.topic_count, 1);
        assert_eq!(stats.total_subscribers, 1);
    }
}
633}