1use parking_lot::RwLock;
7use std::collections::{HashMap, HashSet};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use tokio::sync::broadcast;
11
12use crate::error::Result;
13use crate::hlc::HlcTimestamp;
14use crate::message::MessageEnvelope;
15use crate::runtime::KernelId;
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19pub struct Topic(pub String);
20
21impl Topic {
22 pub fn new(name: impl Into<String>) -> Self {
24 Self(name.into())
25 }
26
27 pub fn name(&self) -> &str {
29 &self.0
30 }
31
32 pub fn is_pattern(&self) -> bool {
34 self.0.contains('*') || self.0.contains('#')
35 }
36
37 pub fn matches(&self, other: &Topic) -> bool {
39 if !self.is_pattern() {
40 return self.0 == other.0;
41 }
42
43 let pattern_parts: Vec<&str> = self.0.split('/').collect();
46 let topic_parts: Vec<&str> = other.0.split('/').collect();
47
48 let mut p_idx = 0;
49 let mut t_idx = 0;
50
51 while p_idx < pattern_parts.len() && t_idx < topic_parts.len() {
52 match pattern_parts[p_idx] {
53 "#" => return true, "*" => {
55 p_idx += 1;
57 t_idx += 1;
58 }
59 part if part == topic_parts[t_idx] => {
60 p_idx += 1;
61 t_idx += 1;
62 }
63 _ => return false,
64 }
65 }
66
67 p_idx == pattern_parts.len() && t_idx == topic_parts.len()
68 }
69}
70
71impl std::fmt::Display for Topic {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 write!(f, "{}", self.0)
74 }
75}
76
77impl From<&str> for Topic {
78 fn from(s: &str) -> Self {
79 Self::new(s)
80 }
81}
82
83impl From<String> for Topic {
84 fn from(s: String) -> Self {
85 Self(s)
86 }
87}
88
89#[derive(Debug, Clone)]
91pub struct PubSubConfig {
92 pub max_subscribers_per_topic: usize,
94 pub channel_buffer_size: usize,
96 pub max_retained_messages: usize,
98 pub enable_persistence: bool,
100 pub default_qos: QoS,
102}
103
104impl Default for PubSubConfig {
105 fn default() -> Self {
106 Self {
107 max_subscribers_per_topic: 1000,
108 channel_buffer_size: 256,
109 max_retained_messages: 100,
110 enable_persistence: false,
111 default_qos: QoS::AtMostOnce,
112 }
113 }
114}
115
116#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
118pub enum QoS {
119 #[default]
121 AtMostOnce,
122 AtLeastOnce,
124 ExactlyOnce,
126}
127
128#[derive(Debug, Clone)]
130pub struct Publication {
131 pub topic: Topic,
133 pub publisher: KernelId,
135 pub envelope: MessageEnvelope,
137 pub timestamp: HlcTimestamp,
139 pub qos: QoS,
141 pub sequence: u64,
143 pub retained: bool,
145}
146
147impl Publication {
148 pub fn new(
150 topic: Topic,
151 publisher: KernelId,
152 envelope: MessageEnvelope,
153 timestamp: HlcTimestamp,
154 ) -> Self {
155 Self {
156 topic,
157 publisher,
158 envelope,
159 timestamp,
160 qos: QoS::default(),
161 sequence: 0,
162 retained: false,
163 }
164 }
165
166 pub fn with_qos(mut self, qos: QoS) -> Self {
168 self.qos = qos;
169 self
170 }
171
172 pub fn with_retained(mut self, retained: bool) -> Self {
174 self.retained = retained;
175 self
176 }
177}
178
179pub struct Subscription {
181 pub id: u64,
183 pub pattern: Topic,
185 pub subscriber: KernelId,
187 receiver: broadcast::Receiver<Publication>,
189 broker: Arc<PubSubBroker>,
191}
192
193impl Subscription {
194 pub async fn receive(&mut self) -> Option<Publication> {
196 loop {
197 match self.receiver.recv().await {
198 Ok(pub_msg) => {
199 if self.pattern.matches(&pub_msg.topic) {
200 return Some(pub_msg);
201 }
202 }
203 Err(broadcast::error::RecvError::Closed) => return None,
204 Err(broadcast::error::RecvError::Lagged(_)) => continue,
205 }
206 }
207 }
208
209 pub fn try_receive(&mut self) -> Option<Publication> {
211 loop {
212 match self.receiver.try_recv() {
213 Ok(pub_msg) => {
214 if self.pattern.matches(&pub_msg.topic) {
215 return Some(pub_msg);
216 }
217 }
218 Err(_) => return None,
219 }
220 }
221 }
222
223 pub fn unsubscribe(self) {
225 self.broker.unsubscribe(self.id);
226 }
227}
228
229#[derive(Debug, Clone)]
231pub struct TopicInfo {
232 pub topic: Topic,
234 pub subscriber_count: usize,
236 pub messages_published: u64,
238 pub retained_count: usize,
240}
241
242pub struct PubSubBroker {
244 config: PubSubConfig,
246 sender: broadcast::Sender<Publication>,
248 subscriptions: RwLock<HashMap<u64, SubscriptionInfo>>,
250 subscription_counter: AtomicU64,
252 topic_stats: RwLock<HashMap<Topic, TopicStats>>,
254 retained: RwLock<HashMap<Topic, Vec<Publication>>>,
256 sequence: AtomicU64,
258}
259
260struct SubscriptionInfo {
262 pattern: Topic,
263 #[allow(dead_code)]
264 subscriber: KernelId,
265}
266
267#[derive(Debug, Clone, Default)]
269struct TopicStats {
270 subscribers: HashSet<u64>,
271 messages_published: u64,
272}
273
274impl PubSubBroker {
275 pub fn new(config: PubSubConfig) -> Arc<Self> {
277 let (sender, _) = broadcast::channel(config.channel_buffer_size);
278
279 Arc::new(Self {
280 config,
281 sender,
282 subscriptions: RwLock::new(HashMap::new()),
283 subscription_counter: AtomicU64::new(0),
284 topic_stats: RwLock::new(HashMap::new()),
285 retained: RwLock::new(HashMap::new()),
286 sequence: AtomicU64::new(0),
287 })
288 }
289
290 pub fn subscribe(self: &Arc<Self>, subscriber: KernelId, pattern: Topic) -> Subscription {
292 let id = self.subscription_counter.fetch_add(1, Ordering::Relaxed);
293
294 self.subscriptions.write().insert(
296 id,
297 SubscriptionInfo {
298 pattern: pattern.clone(),
299 subscriber: subscriber.clone(),
300 },
301 );
302
303 let mut stats = self.topic_stats.write();
305 stats
306 .entry(pattern.clone())
307 .or_default()
308 .subscribers
309 .insert(id);
310
311 Subscription {
312 id,
313 pattern,
314 subscriber,
315 receiver: self.sender.subscribe(),
316 broker: Arc::clone(self),
317 }
318 }
319
320 pub fn unsubscribe(&self, subscription_id: u64) {
322 let info = self.subscriptions.write().remove(&subscription_id);
323
324 if let Some(info) = info {
325 let mut stats = self.topic_stats.write();
326 if let Some(topic_stats) = stats.get_mut(&info.pattern) {
327 topic_stats.subscribers.remove(&subscription_id);
328 }
329 }
330 }
331
332 pub fn publish(
334 &self,
335 topic: Topic,
336 publisher: KernelId,
337 envelope: MessageEnvelope,
338 timestamp: HlcTimestamp,
339 ) -> Result<u64> {
340 let sequence = self.sequence.fetch_add(1, Ordering::Relaxed);
341
342 let mut publication = Publication::new(topic.clone(), publisher, envelope, timestamp);
343 publication.sequence = sequence;
344
345 {
347 let mut stats = self.topic_stats.write();
348 let topic_stats = stats.entry(topic.clone()).or_default();
349 topic_stats.messages_published += 1;
350 }
351
352 if publication.retained {
354 let mut retained = self.retained.write();
355 let retained_list = retained.entry(topic).or_default();
356 retained_list.push(publication.clone());
357
358 if retained_list.len() > self.config.max_retained_messages {
360 retained_list.remove(0);
361 }
362 }
363
364 let _ = self.sender.send(publication);
367
368 Ok(sequence)
369 }
370
371 pub fn publish_qos(
373 &self,
374 topic: Topic,
375 publisher: KernelId,
376 envelope: MessageEnvelope,
377 timestamp: HlcTimestamp,
378 qos: QoS,
379 ) -> Result<u64> {
380 let sequence = self.sequence.fetch_add(1, Ordering::Relaxed);
381
382 let mut publication = Publication::new(topic.clone(), publisher, envelope, timestamp);
383 publication.sequence = sequence;
384 publication.qos = qos;
385
386 {
388 let mut stats = self.topic_stats.write();
389 let topic_stats = stats.entry(topic).or_default();
390 topic_stats.messages_published += 1;
391 }
392
393 let _ = self.sender.send(publication);
394 Ok(sequence)
395 }
396
397 pub fn publish_retained(
399 &self,
400 topic: Topic,
401 publisher: KernelId,
402 envelope: MessageEnvelope,
403 timestamp: HlcTimestamp,
404 ) -> Result<u64> {
405 let sequence = self.sequence.fetch_add(1, Ordering::Relaxed);
406
407 let mut publication = Publication::new(topic.clone(), publisher, envelope, timestamp);
408 publication.sequence = sequence;
409 publication.retained = true;
410
411 {
413 let mut retained = self.retained.write();
414 let retained_list = retained.entry(topic.clone()).or_default();
415 retained_list.push(publication.clone());
416
417 if retained_list.len() > self.config.max_retained_messages {
418 retained_list.remove(0);
419 }
420 }
421
422 {
424 let mut stats = self.topic_stats.write();
425 let topic_stats = stats.entry(topic).or_default();
426 topic_stats.messages_published += 1;
427 }
428
429 let _ = self.sender.send(publication);
430 Ok(sequence)
431 }
432
433 pub fn get_retained(&self, topic: &Topic) -> Vec<Publication> {
435 self.retained.read().get(topic).cloned().unwrap_or_default()
436 }
437
438 pub fn clear_retained(&self, topic: &Topic) {
440 self.retained.write().remove(topic);
441 }
442
443 pub fn topic_info(&self, topic: &Topic) -> Option<TopicInfo> {
445 let stats = self.topic_stats.read();
446 let topic_stats = stats.get(topic)?;
447
448 let retained_count = self
449 .retained
450 .read()
451 .get(topic)
452 .map(|v| v.len())
453 .unwrap_or(0);
454
455 Some(TopicInfo {
456 topic: topic.clone(),
457 subscriber_count: topic_stats.subscribers.len(),
458 messages_published: topic_stats.messages_published,
459 retained_count,
460 })
461 }
462
463 pub fn list_topics(&self) -> Vec<Topic> {
465 self.topic_stats
466 .read()
467 .iter()
468 .filter(|(_, stats)| !stats.subscribers.is_empty())
469 .map(|(topic, _)| topic.clone())
470 .collect()
471 }
472
473 pub fn stats(&self) -> PubSubStats {
475 let stats = self.topic_stats.read();
476 let total_subscribers: usize = stats.values().map(|s| s.subscribers.len()).sum();
477 let total_messages: u64 = stats.values().map(|s| s.messages_published).sum();
478 let retained_count: usize = self.retained.read().values().map(|v| v.len()).sum();
479
480 PubSubStats {
481 topic_count: stats.len(),
482 total_subscribers,
483 total_messages_published: total_messages,
484 retained_message_count: retained_count,
485 }
486 }
487}
488
489#[derive(Debug, Clone, Default)]
491pub struct PubSubStats {
492 pub topic_count: usize,
494 pub total_subscribers: usize,
496 pub total_messages_published: u64,
498 pub retained_message_count: usize,
500}
501
502pub struct PubSubBuilder {
504 config: PubSubConfig,
505}
506
507impl PubSubBuilder {
508 pub fn new() -> Self {
510 Self {
511 config: PubSubConfig::default(),
512 }
513 }
514
515 pub fn max_subscribers_per_topic(mut self, count: usize) -> Self {
517 self.config.max_subscribers_per_topic = count;
518 self
519 }
520
521 pub fn channel_buffer_size(mut self, size: usize) -> Self {
523 self.config.channel_buffer_size = size;
524 self
525 }
526
527 pub fn max_retained_messages(mut self, count: usize) -> Self {
529 self.config.max_retained_messages = count;
530 self
531 }
532
533 pub fn enable_persistence(mut self, enable: bool) -> Self {
535 self.config.enable_persistence = enable;
536 self
537 }
538
539 pub fn default_qos(mut self, qos: QoS) -> Self {
541 self.config.default_qos = qos;
542 self
543 }
544
545 pub fn build(self) -> Arc<PubSubBroker> {
547 PubSubBroker::new(self.config)
548 }
549}
550
551impl Default for PubSubBuilder {
552 fn default() -> Self {
553 Self::new()
554 }
555}
556
557#[cfg(test)]
558mod tests {
559 use super::*;
560
561 #[test]
562 fn test_topic_matching() {
563 let pattern = Topic::new("sensors/*/temperature");
564 let topic1 = Topic::new("sensors/kitchen/temperature");
565 let topic2 = Topic::new("sensors/living_room/temperature");
566 let topic3 = Topic::new("sensors/kitchen/humidity");
567
568 assert!(pattern.matches(&topic1));
569 assert!(pattern.matches(&topic2));
570 assert!(!pattern.matches(&topic3));
571 }
572
573 #[test]
574 fn test_topic_wildcard_hash() {
575 let pattern = Topic::new("sensors/#");
576 let topic1 = Topic::new("sensors/kitchen/temperature");
577 let topic2 = Topic::new("sensors/a/b/c/d");
578
579 assert!(pattern.matches(&topic1));
580 assert!(pattern.matches(&topic2));
581 }
582
583 #[test]
584 fn test_topic_exact_match() {
585 let pattern = Topic::new("sensors/kitchen/temperature");
586 let topic1 = Topic::new("sensors/kitchen/temperature");
587 let topic2 = Topic::new("sensors/kitchen/humidity");
588
589 assert!(pattern.matches(&topic1));
590 assert!(!pattern.matches(&topic2));
591 }
592
593 #[tokio::test]
594 async fn test_pubsub_broker() {
595 let broker = PubSubBuilder::new().build();
596
597 let publisher = KernelId::new("publisher");
598 let subscriber = KernelId::new("subscriber");
599 let topic = Topic::new("test/topic");
600
601 let mut subscription = broker.subscribe(subscriber, topic.clone());
602
603 let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
605 let timestamp = HlcTimestamp::now(1);
606
607 broker
608 .publish(topic.clone(), publisher.clone(), envelope, timestamp)
609 .unwrap();
610
611 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
613
614 let received = subscription.try_receive();
616 assert!(received.is_some());
617 assert_eq!(received.unwrap().publisher, publisher);
618 }
619
620 #[test]
621 fn test_pubsub_stats() {
622 let broker = PubSubBuilder::new().build();
623
624 let topic = Topic::new("test");
625 let kernel = KernelId::new("kernel");
626
627 let _sub = broker.subscribe(kernel.clone(), topic.clone());
628
629 let stats = broker.stats();
630 assert_eq!(stats.topic_count, 1);
631 assert_eq!(stats.total_subscribers, 1);
632 }
633}