1use std::{
37 any::{Any, TypeId},
38 collections::{HashMap, HashSet, VecDeque},
39 hash::Hash,
40 sync::{Arc, Mutex},
41};
42
43use tokio::sync::broadcast;
44
45use super::{Identifier, IdentifierFilter};
46use crate::runtime::process::Id;
47
/// Broadcast-channel capacity used by [`DataspaceRegistry::new`], which
/// forwards this value to [`DataspaceRegistry::with_channel_capacity`].
const DEFAULT_CHANNEL_CAPACITY: usize = 16;
49
50tokio::task_local! {
51 pub(crate) static CURRENT_DATASPACE: DataspaceRegistry;
52}
53
54#[derive(Clone, Debug, PartialEq, Eq)]
56pub enum AssertionUpdate<T> {
57 Asserted(Identifier, T),
59
60 Retracted(Identifier),
62}
63
64#[derive(Clone, PartialEq, Eq, Hash)]
66struct StorageKey {
67 type_id: TypeId,
68 identifier: Identifier,
69}
70
71impl StorageKey {
72 fn new<T: 'static>(identifier: Identifier) -> Self {
73 Self {
74 type_id: TypeId::of::<T>(),
75 identifier,
76 }
77 }
78}
79
80trait AnyChannel: Send + Sync {
82 fn send_retraction(&self, id: &Identifier);
84
85 fn as_any(&self) -> &dyn Any;
87}
88
89struct TypedChannel<T: Clone + Send + Sync + 'static> {
91 tx: broadcast::Sender<AssertionUpdate<T>>,
92}
93
94impl<T: Clone + Send + Sync + 'static> AnyChannel for TypedChannel<T> {
95 fn send_retraction(&self, id: &Identifier) {
96 let _ = self.tx.send(AssertionUpdate::Retracted(id.clone()));
97 }
98
99 fn as_any(&self) -> &dyn Any {
100 self
101 }
102}
103
104struct FilteredChannel {
106 type_id: TypeId,
107 filter: IdentifierFilter,
108 sender: Box<dyn AnyChannel>,
109}
110
111struct StoredValue {
113 value: Box<dyn Any + Send + Sync>,
114 owner: Id,
115}
116
117struct RegistryState {
119 channels: HashMap<StorageKey, Box<dyn AnyChannel>>,
121
122 filtered_channels: Vec<FilteredChannel>,
124
125 current_values: HashMap<StorageKey, StoredValue>,
129
130 process_assertions: HashMap<Id, HashSet<StorageKey>>,
132
133 channel_capacity: usize,
135}
136
137impl RegistryState {
138 fn new(channel_capacity: usize) -> Self {
139 Self {
140 channels: HashMap::new(),
141 filtered_channels: Vec::new(),
142 current_values: HashMap::new(),
143 process_assertions: HashMap::new(),
144 channel_capacity,
145 }
146 }
147
148 fn get_or_create_exact_sender<T>(&mut self, key: StorageKey) -> broadcast::Receiver<AssertionUpdate<T>>
150 where
151 T: Clone + Send + Sync + 'static,
152 {
153 let channel = self.channels.entry(key).or_insert_with(|| {
154 let (tx, _) = broadcast::channel::<AssertionUpdate<T>>(self.channel_capacity);
155 Box::new(TypedChannel { tx })
156 });
157
158 let typed = channel
159 .as_any()
160 .downcast_ref::<TypedChannel<T>>()
161 .unwrap_or_else(|| unreachable!("type mismatch in dataspace registry"));
164
165 typed.tx.subscribe()
166 }
167
168 fn create_filtered_sender<T>(&mut self, filter: IdentifierFilter) -> broadcast::Receiver<AssertionUpdate<T>>
170 where
171 T: Clone + Send + Sync + 'static,
172 {
173 let (tx, rx) = broadcast::channel::<AssertionUpdate<T>>(self.channel_capacity);
174
175 self.filtered_channels.push(FilteredChannel {
176 type_id: TypeId::of::<T>(),
177 filter,
178 sender: Box::new(TypedChannel { tx }),
179 });
180
181 rx
182 }
183
184 fn notify_retraction(&self, key: &StorageKey) {
186 if let Some(ch) = self.channels.get(key) {
187 ch.send_retraction(&key.identifier);
188 }
189
190 for filtered in &self.filtered_channels {
191 if filtered.type_id == key.type_id && filtered.filter.matches(&key.identifier) {
192 filtered.sender.send_retraction(&key.identifier);
193 }
194 }
195 }
196}
197
198struct DataspaceRegistryInner {
200 state: Mutex<RegistryState>,
201}
202
203#[derive(Clone)]
213pub struct DataspaceRegistry {
214 inner: Arc<DataspaceRegistryInner>,
215}
216
217impl Default for DataspaceRegistry {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
223impl DataspaceRegistry {
224 pub fn new() -> Self {
226 Self::with_channel_capacity(DEFAULT_CHANNEL_CAPACITY)
227 }
228
229 pub fn try_current() -> Option<Self> {
231 CURRENT_DATASPACE.try_with(|ds| ds.clone()).ok()
232 }
233
234 pub fn with_channel_capacity(capacity: usize) -> Self {
236 Self {
237 inner: Arc::new(DataspaceRegistryInner {
238 state: Mutex::new(RegistryState::new(capacity)),
239 }),
240 }
241 }
242
243 pub fn assert<T>(&self, value: T, id: impl Into<Identifier>)
249 where
250 T: Clone + Send + Sync + 'static,
251 {
252 let id = id.into();
253 let key = StorageKey::new::<T>(id.clone());
254 let caller = Id::current();
255 let mut state = self.inner.state.lock().unwrap();
256
257 if let Some(existing) = state.current_values.get(&key) {
259 debug_assert_eq!(
260 existing.owner, caller,
261 "process {caller:?} attempted to update assertion owned by {:?}",
262 existing.owner
263 );
264 if existing.owner != caller {
265 return;
266 }
267 }
268
269 state.current_values.insert(
271 key.clone(),
272 StoredValue {
273 value: Box::new(value.clone()),
274 owner: caller,
275 },
276 );
277
278 state.process_assertions.entry(caller).or_default().insert(key.clone());
280
281 if let Some(ch) = state.channels.get(&key) {
283 if let Some(typed) = ch.as_any().downcast_ref::<TypedChannel<T>>() {
284 let _ = typed.tx.send(AssertionUpdate::Asserted(id.clone(), value.clone()));
285 }
286 }
287
288 let type_id = TypeId::of::<T>();
290 for channel in &state.filtered_channels {
291 if channel.type_id == type_id && channel.filter.matches(&id) {
292 if let Some(typed) = channel.sender.as_any().downcast_ref::<TypedChannel<T>>() {
293 let _ = typed.tx.send(AssertionUpdate::Asserted(id.clone(), value.clone()));
294 }
295 }
296 }
297 }
298
299 pub fn retract<T>(&self, id: impl Into<Identifier>)
303 where
304 T: Clone + Send + Sync + 'static,
305 {
306 let id = id.into();
307 let key = StorageKey::new::<T>(id.clone());
308 let caller = Id::current();
309 let mut state = self.inner.state.lock().unwrap();
310
311 let Some(stored) = state.current_values.get(&key) else {
313 return;
314 };
315
316 debug_assert_eq!(
317 stored.owner, caller,
318 "process {caller:?} attempted to retract assertion owned by {:?}",
319 stored.owner
320 );
321 if stored.owner != caller {
322 return;
323 }
324
325 state.current_values.remove(&key);
327
328 if let Some(keys) = state.process_assertions.get_mut(&caller) {
329 keys.remove(&key);
330 if keys.is_empty() {
331 state.process_assertions.remove(&caller);
332 }
333 }
334
335 state.notify_retraction(&key);
336 }
337
338 pub(crate) fn retract_all_for_process(&self, process_id: Id) {
343 let mut state = self.inner.state.lock().unwrap();
344
345 let Some(keys) = state.process_assertions.remove(&process_id) else {
346 return;
347 };
348
349 for key in keys {
350 state.current_values.remove(&key);
351 state.notify_retraction(&key);
352 }
353 }
354
355 pub fn subscribe<T>(&self, filter: IdentifierFilter) -> Subscription<T>
360 where
361 T: Clone + Send + Sync + 'static,
362 {
363 let mut state = self.inner.state.lock().unwrap();
364
365 match filter {
366 IdentifierFilter::Exact(ref id) => {
367 let key = StorageKey::new::<T>(id.clone());
368 let rx = state.get_or_create_exact_sender::<T>(key.clone());
369
370 let pending: VecDeque<_> = state
372 .current_values
373 .get(&key)
374 .and_then(|stored| stored.value.downcast_ref::<T>())
375 .map(|value| AssertionUpdate::Asserted(id.clone(), value.clone()))
376 .into_iter()
377 .collect();
378
379 Subscription { pending, rx }
380 }
381 filter @ (IdentifierFilter::All | IdentifierFilter::Prefix(_)) => {
382 let rx = state.create_filtered_sender::<T>(filter.clone());
383
384 let type_id = TypeId::of::<T>();
386 let pending: VecDeque<_> = state
387 .current_values
388 .iter()
389 .filter(|(key, _)| key.type_id == type_id && filter.matches(&key.identifier))
390 .filter_map(|(key, stored)| {
391 stored
392 .value
393 .downcast_ref::<T>()
394 .map(|value| AssertionUpdate::Asserted(key.identifier.clone(), value.clone()))
395 })
396 .collect();
397
398 Subscription { pending, rx }
399 }
400 }
401 }
402}
403
404pub struct Subscription<T> {
406 pending: VecDeque<AssertionUpdate<T>>,
407 rx: broadcast::Receiver<AssertionUpdate<T>>,
408}
409
410impl<T> Subscription<T>
411where
412 T: Clone + Send + Sync + 'static,
413{
414 pub async fn recv(&mut self) -> Option<AssertionUpdate<T>> {
420 if let Some(value) = self.pending.pop_front() {
423 return Some(value);
424 }
425
426 loop {
427 match self.rx.recv().await {
428 Ok(value) => return Some(value),
429 Err(broadcast::error::RecvError::Lagged(_)) => continue,
430 Err(broadcast::error::RecvError::Closed) => return None,
431 }
432 }
433 }
434}
435
#[cfg(test)]
mod tests {
    use std::future::pending;

    use tokio_test::{assert_pending, assert_ready, assert_ready_eq, task::spawn as test_spawn};

    use super::*;
    use crate::runtime::process::{Process, CURRENT_PROCESS_ID};

    /// Polls `sub.recv()` once and returns its output, failing the test if the
    /// future is still pending.
    fn next_update<T>(sub: &mut Subscription<T>) -> Option<AssertionUpdate<T>>
    where
        T: Clone + Send + Sync + 'static,
    {
        let mut recv = test_spawn(sub.recv());
        let update = assert_ready!(recv.poll());
        drop(recv);
        update
    }

    /// Receives two updates and returns them ordered by asserted value
    /// (retractions sort as `0`), so tests do not depend on replay order.
    fn next_two_sorted(sub: &mut Subscription<u32>) -> [AssertionUpdate<u32>; 2] {
        let first = next_update(sub).expect("first update should be available");
        let second = next_update(sub).expect("second update should be available");

        let mut received = [first, second];
        received.sort_by_key(|update| match update {
            AssertionUpdate::Asserted(_, value) => *value,
            AssertionUpdate::Retracted(_) => 0,
        });
        received
    }

    /// Runs `f` with `pid` installed as the current process id.
    fn as_process<R>(pid: Id, f: impl FnOnce() -> R) -> R {
        CURRENT_PROCESS_ID.sync_scope(pid, f)
    }

    #[test]
    fn subscribe_then_assert() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
        registry.assert(42u32, id.clone());

        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id, 42)));
    }

    #[test]
    fn multiple_subscribers_receive_same_assertion() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        let mut sub1 = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
        let mut sub2 = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));

        registry.assert(42u32, id.clone());

        assert_eq!(next_update(&mut sub1), Some(AssertionUpdate::Asserted(id.clone(), 42)));
        assert_eq!(next_update(&mut sub2), Some(AssertionUpdate::Asserted(id, 42)));
    }

    #[test]
    fn assert_without_subscribers_stores_value() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        registry.assert(42u32, id.clone());

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id, 42)));
    }

    #[test]
    fn different_types_same_identifier() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        let mut sub_u32 = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
        let mut sub_string = registry.subscribe::<String>(IdentifierFilter::exact(id.clone()));

        registry.assert(42u32, id.clone());
        registry.assert("hello".to_string(), id.clone());

        assert_eq!(next_update(&mut sub_u32), Some(AssertionUpdate::Asserted(id.clone(), 42)));
        assert_eq!(
            next_update(&mut sub_string),
            Some(AssertionUpdate::Asserted(id, "hello".to_string()))
        );
    }

    #[test]
    fn process_identifier() {
        let registry = DataspaceRegistry::new();
        let pid = Id::new();
        let id = Identifier::from(pid);

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
        registry.assert(42u32, id.clone());

        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id, 42)));
    }

    #[test]
    fn channel_closed_returns_none() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id));

        // Dropping the last registry handle drops every sender.
        drop(registry);

        assert_eq!(next_update(&mut sub), None);
    }

    #[test]
    fn lagged_subscriber_recovers() {
        let registry = DataspaceRegistry::with_channel_capacity(2);
        let id = Identifier::numeric(1);

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));

        // Overflow the channel so the subscriber lags behind.
        for i in 0..10u32 {
            registry.assert(i, id.clone());
        }

        assert!(next_update(&mut sub).is_some());
    }

    #[test]
    fn multiple_values_received_in_order() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));

        registry.assert(1u32, id.clone());
        registry.assert(2u32, id.clone());
        registry.assert(3u32, id.clone());

        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id.clone(), 1)));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id.clone(), 2)));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id, 3)));
    }

    #[test]
    fn assert_then_retract() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));

        registry.assert(42u32, id.clone());
        registry.retract::<u32>(id.clone());

        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id.clone(), 42)));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Retracted(id)));
    }

    #[test]
    fn retract_without_subscribers_succeeds() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        registry.retract::<u32>(id);
    }

    #[test]
    fn multiple_subscribers_receive_retraction() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        let mut sub1 = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
        let mut sub2 = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));

        registry.assert(42u32, id.clone());
        registry.retract::<u32>(id.clone());

        assert_eq!(next_update(&mut sub1), Some(AssertionUpdate::Asserted(id.clone(), 42)));
        assert_eq!(next_update(&mut sub2), Some(AssertionUpdate::Asserted(id.clone(), 42)));

        assert_eq!(next_update(&mut sub1), Some(AssertionUpdate::Retracted(id.clone())));
        assert_eq!(next_update(&mut sub2), Some(AssertionUpdate::Retracted(id)));
    }

    #[test]
    fn assert_retract_assert_sequence() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));

        registry.assert(1u32, id.clone());
        registry.retract::<u32>(id.clone());
        registry.assert(2u32, id.clone());

        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id.clone(), 1)));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Retracted(id.clone())));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id, 2)));
    }

    #[test]
    fn all_filter_receives_from_multiple_identifiers() {
        let registry = DataspaceRegistry::new();
        let id1 = Identifier::numeric(1);
        let id2 = Identifier::numeric(2);

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::all());

        registry.assert(1u32, id1.clone());
        registry.assert(2u32, id2.clone());

        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id1, 1)));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id2, 2)));
    }

    #[test]
    fn all_filter_receives_retraction() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::all());

        registry.assert(42u32, id.clone());
        registry.retract::<u32>(id.clone());

        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id.clone(), 42)));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Retracted(id)));
    }

    #[test]
    fn all_filter_and_exact_both_receive() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        let mut specific = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
        let mut all = registry.subscribe::<u32>(IdentifierFilter::all());

        registry.assert(42u32, id.clone());

        assert_eq!(next_update(&mut specific), Some(AssertionUpdate::Asserted(id.clone(), 42)));
        assert_eq!(next_update(&mut all), Some(AssertionUpdate::Asserted(id, 42)));
    }

    #[test]
    fn all_filter_created_before_exact_channels() {
        let registry = DataspaceRegistry::new();

        let mut all = registry.subscribe::<u32>(IdentifierFilter::all());

        let id = Identifier::numeric(1);
        registry.assert(42u32, id.clone());

        assert_eq!(next_update(&mut all), Some(AssertionUpdate::Asserted(id, 42)));
    }

    #[test]
    fn subscribe_receives_current_value() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        registry.assert(42u32, id.clone());

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id, 42)));
    }

    #[test]
    fn subscribe_after_retract_gets_nothing_pending() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        registry.assert(42u32, id.clone());
        registry.retract::<u32>(id.clone());

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id));

        // With nothing replayed, closing the channel is the only event left.
        drop(registry);

        assert_eq!(next_update(&mut sub), None);
    }

    #[test]
    fn all_filter_receives_current_values() {
        let registry = DataspaceRegistry::new();
        let id1 = Identifier::numeric(1);
        let id2 = Identifier::numeric(2);

        registry.assert(1u32, id1.clone());
        registry.assert(2u32, id2.clone());

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::all());

        let received = next_two_sorted(&mut sub);
        assert_eq!(received[0], AssertionUpdate::Asserted(id1, 1));
        assert_eq!(received[1], AssertionUpdate::Asserted(id2, 2));
    }

    #[test]
    fn subscribe_receives_current_then_future() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(1);

        registry.assert(1u32, id.clone());

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));

        registry.assert(2u32, id.clone());

        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id.clone(), 1)));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id, 2)));
    }

    #[test]
    fn prefix_filter_matches_named_identifiers() {
        let registry = DataspaceRegistry::new();
        let id1 = Identifier::named("metrics.cpu");
        let id2 = Identifier::named("metrics.mem");
        let id3 = Identifier::named("logs.error");

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::prefix("metrics."));

        registry.assert(1u32, id1.clone());
        registry.assert(2u32, id2.clone());
        registry.assert(3u32, id3);

        let received = next_two_sorted(&mut sub);
        assert_eq!(received[0], AssertionUpdate::Asserted(id1, 1));
        assert_eq!(received[1], AssertionUpdate::Asserted(id2, 2));
    }

    #[test]
    fn prefix_filter_does_not_match_numeric() {
        let registry = DataspaceRegistry::new();
        let id = Identifier::numeric(42);

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::prefix("any"));

        registry.assert(1u32, id);

        drop(registry);

        assert_eq!(next_update(&mut sub), None);
    }

    #[test]
    fn prefix_filter_replays_matching_current_values() {
        let registry = DataspaceRegistry::new();
        let id1 = Identifier::named("svc.alpha");
        let id2 = Identifier::named("svc.beta");
        let id3 = Identifier::named("other.gamma");

        registry.assert(1u32, id1.clone());
        registry.assert(2u32, id2.clone());
        registry.assert(3u32, id3);

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::prefix("svc."));

        let received = next_two_sorted(&mut sub);
        assert_eq!(received[0], AssertionUpdate::Asserted(id1, 1));
        assert_eq!(received[1], AssertionUpdate::Asserted(id2, 2));
    }

    #[test]
    fn try_current_returns_none_outside_context() {
        assert!(DataspaceRegistry::try_current().is_none());
    }

    #[test]
    fn current_and_try_current_work_within_scope() {
        let registry = DataspaceRegistry::new();
        let registry_clone = registry.clone();

        let mut scope_fut = test_spawn(CURRENT_DATASPACE.scope(registry, async {
            let current = DataspaceRegistry::try_current();
            assert!(current.is_some());

            let current = current.unwrap();
            let id = Identifier::named("test");
            current.assert(42u32, id.clone());

            // The clone shares state with the scoped registry.
            let mut sub = registry_clone.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
            let value = sub.recv().await;
            assert_eq!(value, Some(AssertionUpdate::Asserted(id, 42)));
        }));
        assert_ready!(scope_fut.poll());
    }

    #[test]
    fn retract_all_for_process_retracts_all_owned_assertions() {
        let registry = DataspaceRegistry::new();
        let process_id = Id::new();
        let id1 = Identifier::named("val1");
        let id2 = Identifier::named("val2");

        as_process(process_id, || {
            registry.assert(1u32, id1.clone());
            registry.assert(2u32, id2.clone());
        });

        let mut sub1 = registry.subscribe::<u32>(IdentifierFilter::exact(id1.clone()));
        let mut sub2 = registry.subscribe::<u32>(IdentifierFilter::exact(id2.clone()));

        // Drain the replayed current values first.
        assert_eq!(next_update(&mut sub1), Some(AssertionUpdate::Asserted(id1.clone(), 1)));
        assert_eq!(next_update(&mut sub2), Some(AssertionUpdate::Asserted(id2.clone(), 2)));

        registry.retract_all_for_process(process_id);

        assert_eq!(next_update(&mut sub1), Some(AssertionUpdate::Retracted(id1)));
        assert_eq!(next_update(&mut sub2), Some(AssertionUpdate::Retracted(id2)));
    }

    #[test]
    fn retract_all_for_process_does_not_affect_other_processes() {
        let registry = DataspaceRegistry::new();
        let pid_a = Id::new();
        let pid_b = Id::new();
        let id_a = Identifier::named("a");
        let id_b = Identifier::named("b");

        as_process(pid_a, || registry.assert(1u32, id_a.clone()));
        as_process(pid_b, || registry.assert(2u32, id_b.clone()));

        registry.retract_all_for_process(pid_a);

        let mut sub_b = registry.subscribe::<u32>(IdentifierFilter::exact(id_b.clone()));
        assert_eq!(next_update(&mut sub_b), Some(AssertionUpdate::Asserted(id_b, 2)));

        let mut sub_a = registry.subscribe::<u32>(IdentifierFilter::exact(id_a));
        drop(registry);
        assert_eq!(next_update(&mut sub_a), None);
    }

    #[test]
    fn retract_all_for_process_notifies_filtered_subscribers() {
        let registry = DataspaceRegistry::new();
        let process_id = Id::new();
        let id = Identifier::named("val");

        let mut all_sub = registry.subscribe::<u32>(IdentifierFilter::all());

        as_process(process_id, || registry.assert(42u32, id.clone()));

        assert_eq!(next_update(&mut all_sub), Some(AssertionUpdate::Asserted(id.clone(), 42)));

        registry.retract_all_for_process(process_id);

        assert_eq!(next_update(&mut all_sub), Some(AssertionUpdate::Retracted(id)));
    }

    #[test]
    fn manual_retract_cleans_up_process_tracking() {
        let registry = DataspaceRegistry::new();
        let process_id = Id::new();
        let id = Identifier::named("val");

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));

        as_process(process_id, || registry.assert(42u32, id.clone()));
        as_process(process_id, || registry.retract::<u32>(id.clone()));

        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id.clone(), 42)));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Retracted(id)));

        // Nothing is left to retract, so no further update may be sent.
        registry.retract_all_for_process(process_id);

        drop(registry);
        assert_eq!(next_update(&mut sub), None);
    }

    #[test]
    fn retract_all_for_unknown_process_is_noop() {
        let registry = DataspaceRegistry::new();
        registry.retract_all_for_process(Id::new());
    }

    #[test]
    fn instrumented_process_retracts_on_normal_completion() {
        let registry = DataspaceRegistry::new();
        let process = Process::supervisor_with_dataspace("test_worker", None, Some(registry.clone())).unwrap();
        let id = "from_worker";

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id));

        let mut recv_fut = test_spawn(sub.recv());
        assert!(!recv_fut.is_woken());
        assert_pending!(recv_fut.poll());

        let mut worker = test_spawn(process.into_process_future(async {
            DataspaceRegistry::try_current()
                .expect("dataspace registry should be available")
                .assert(42u32, "from_worker");
        }));

        assert_ready!(worker.poll());

        assert!(recv_fut.is_woken());
        assert_ready_eq!(recv_fut.poll(), Some(AssertionUpdate::Asserted(id.into(), 42)));

        drop(recv_fut);

        let mut recv_fut = test_spawn(sub.recv());
        assert_pending!(recv_fut.poll());
        assert!(!recv_fut.is_woken());

        // The retraction is only observed once the process future is dropped.
        drop(worker);

        assert!(recv_fut.is_woken());
        assert_ready_eq!(recv_fut.poll(), Some(AssertionUpdate::Retracted(id.into())));
    }

    #[test]
    fn instrumented_process_retracts_on_drop_before_completion() {
        let registry = DataspaceRegistry::new();
        let process = Process::supervisor_with_dataspace("test_worker", None, Some(registry.clone())).unwrap();
        let id = "from_worker";

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id));

        let mut recv_fut = test_spawn(sub.recv());
        assert!(!recv_fut.is_woken());
        assert_pending!(recv_fut.poll());

        let mut worker = test_spawn(process.into_process_future(async {
            DataspaceRegistry::try_current()
                .expect("dataspace registry should be available")
                .assert(42u32, "from_worker");
            pending::<()>().await;
        }));

        assert_pending!(worker.poll());

        assert!(recv_fut.is_woken());
        assert_ready_eq!(recv_fut.poll(), Some(AssertionUpdate::Asserted(id.into(), 42)));

        drop(recv_fut);

        let mut recv_fut = test_spawn(sub.recv());
        assert_pending!(recv_fut.poll());
        assert!(!recv_fut.is_woken());

        drop(worker);

        assert!(recv_fut.is_woken());
        assert_ready_eq!(recv_fut.poll(), Some(AssertionUpdate::Retracted(id.into())));
    }

    #[test]
    fn instrumented_process_retracts_multiple_types_on_drop() {
        let registry = DataspaceRegistry::new();
        let process = Process::supervisor_with_dataspace("test_worker", None, Some(registry.clone())).unwrap();
        let id_num = "number";
        let id_str = "text";

        let mut sub_u32 = registry.subscribe::<u32>(IdentifierFilter::exact(id_num));
        let mut sub_str = registry.subscribe::<String>(IdentifierFilter::exact(id_str));

        let mut recv_u32_fut = test_spawn(sub_u32.recv());
        let mut recv_str_fut = test_spawn(sub_str.recv());
        assert!(!recv_u32_fut.is_woken());
        assert!(!recv_str_fut.is_woken());
        assert_pending!(recv_u32_fut.poll());
        assert_pending!(recv_str_fut.poll());

        let mut worker = test_spawn(process.into_process_future(async {
            let ds = DataspaceRegistry::try_current().expect("dataspace registry should be available");
            ds.assert(42u32, id_num);
            ds.assert("hello".to_string(), id_str);
            pending::<()>().await;
        }));

        assert_pending!(worker.poll());

        assert!(recv_u32_fut.is_woken());
        assert!(recv_str_fut.is_woken());
        assert_ready_eq!(recv_u32_fut.poll(), Some(AssertionUpdate::Asserted(id_num.into(), 42)));
        assert_ready_eq!(
            recv_str_fut.poll(),
            Some(AssertionUpdate::Asserted(id_str.into(), "hello".to_string()))
        );

        drop(recv_u32_fut);
        drop(recv_str_fut);

        let mut recv_u32 = test_spawn(sub_u32.recv());
        let mut recv_str = test_spawn(sub_str.recv());
        assert_pending!(recv_u32.poll());
        assert_pending!(recv_str.poll());
        assert!(!recv_u32.is_woken());
        assert!(!recv_str.is_woken());

        drop(worker);

        assert!(recv_u32.is_woken());
        assert!(recv_str.is_woken());
        assert_ready_eq!(recv_u32.poll(), Some(AssertionUpdate::Retracted(id_num.into())));
        assert_ready_eq!(recv_str.poll(), Some(AssertionUpdate::Retracted(id_str.into())));
    }

    #[test]
    #[cfg(not(debug_assertions))]
    fn retract_from_non_owner_is_ignored() {
        let registry = DataspaceRegistry::new();
        let pid_a = Id::new();
        let pid_b = Id::new();
        let id = Identifier::named("owned_by_a");

        as_process(pid_a, || registry.assert(42u32, id.clone()));

        // A different process must not be able to retract it.
        as_process(pid_b, || registry.retract::<u32>(id.clone()));

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id.clone(), 42)));

        as_process(pid_a, || registry.retract::<u32>(id.clone()));

        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Retracted(id)));
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "attempted to retract assertion owned by")]
    fn retract_from_non_owner_panics_in_debug() {
        let registry = DataspaceRegistry::new();
        let pid_a = Id::new();
        let pid_b = Id::new();
        let id = Identifier::named("owned_by_a");

        as_process(pid_a, || registry.assert(42u32, id.clone()));
        as_process(pid_b, || registry.retract::<u32>(id.clone()));
    }

    #[test]
    #[cfg(not(debug_assertions))]
    fn reassert_from_non_owner_is_ignored() {
        let registry = DataspaceRegistry::new();
        let pid_a = Id::new();
        let pid_b = Id::new();
        let id = Identifier::named("owned_by_a");

        as_process(pid_a, || registry.assert(42u32, id.clone()));

        // A different process must not be able to overwrite it.
        as_process(pid_b, || registry.assert(99u32, id.clone()));

        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id.clone(), 42)));

        as_process(pid_a, || registry.assert(100u32, id.clone()));

        assert_eq!(next_update(&mut sub), Some(AssertionUpdate::Asserted(id, 100)));
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "attempted to update assertion owned by")]
    fn reassert_from_non_owner_panics_in_debug() {
        let registry = DataspaceRegistry::new();
        let pid_a = Id::new();
        let pid_b = Id::new();
        let id = Identifier::named("owned_by_a");

        as_process(pid_a, || registry.assert(42u32, id.clone()));
        as_process(pid_b, || registry.assert(99u32, id.clone()));
    }
}