Skip to main content

saluki_core/runtime/state/
dataspace.rs

1//! A type-erased, async-aware dataspace registry for inter-process coordination.
2//!
3//! The [`DataspaceRegistry`] allows processes to assert and retract typed values by identifier, and subscribe to
4//! receive notifications of those assertions and retractions. Multiple subscribers can observe the same updates.
5//!
6//! - **Assertion**: a value of type `T` becomes available, associated with a given identifier.
7//! - **Retraction**: the value of type `T` associated with a given identifier is withdrawn.
8//!
9//! Subscribers can listen for updates matching a specific identifier, a prefix, or all identifiers for a given type.
10//!
11//! This enables decoupled coordination where processes don't need to know about each other, only the identifier and
12//! type of the values they're exchanging.
13//!
14//! # Example
15//!
16//! ```
17//! use saluki_core::runtime::state::{AssertionUpdate, Identifier, IdentifierFilter, DataspaceRegistry};
18//!
19//! # #[tokio::main]
20//! # async fn main() {
21//! let registry = DataspaceRegistry::new();
22//! let id = Identifier::named("my_value");
23//!
24//! // Subscribe before asserting:
25//! let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
26//!
27//! // Assert a value:
28//! registry.assert(42u32, id.clone());
29//!
30//! // Receive the assertion:
31//! let value = sub.recv().await;
32//! assert_eq!(value, Some(AssertionUpdate::Asserted(id, 42)));
33//! # }
34//! ```
35
36use std::{
37    any::{Any, TypeId},
38    collections::{HashMap, HashSet, VecDeque},
39    hash::Hash,
40    sync::{Arc, Mutex},
41};
42
43use tokio::sync::broadcast;
44
45use super::{Identifier, IdentifierFilter};
46use crate::runtime::process::Id;
47
48const DEFAULT_CHANNEL_CAPACITY: usize = 16;
49
50tokio::task_local! {
51    pub(crate) static CURRENT_DATASPACE: DataspaceRegistry;
52}
53
54/// An update received by a subscription, indicating that a value was asserted or retracted.
55#[derive(Clone, Debug, PartialEq, Eq)]
56pub enum AssertionUpdate<T> {
57    /// A value was asserted (made available), along with the identifier it is associated with.
58    Asserted(Identifier, T),
59
60    /// The value associated with the given identifier was retracted (withdrawn).
61    Retracted(Identifier),
62}
63
64/// Internal key combining type and identifier.
65#[derive(Clone, PartialEq, Eq, Hash)]
66struct StorageKey {
67    type_id: TypeId,
68    identifier: Identifier,
69}
70
71impl StorageKey {
72    fn new<T: 'static>(identifier: Identifier) -> Self {
73        Self {
74            type_id: TypeId::of::<T>(),
75            identifier,
76        }
77    }
78}
79
80/// A type-erased broadcast channel that supports sending retraction notifications without knowing `T`.
81trait AnyChannel: Send + Sync {
82    /// Sends a retraction for the given identifier.
83    fn send_retraction(&self, id: &Identifier);
84
85    /// Returns a downcasted reference to `self` that can be fallibly upcasted back to the original type.
86    fn as_any(&self) -> &dyn Any;
87}
88
89/// Concrete implementation of [`AnyChannel`] that wraps a `broadcast::Sender<AssertionUpdate<T>>`.
90struct TypedChannel<T: Clone + Send + Sync + 'static> {
91    tx: broadcast::Sender<AssertionUpdate<T>>,
92}
93
94impl<T: Clone + Send + Sync + 'static> AnyChannel for TypedChannel<T> {
95    fn send_retraction(&self, id: &Identifier) {
96        let _ = self.tx.send(AssertionUpdate::Retracted(id.clone()));
97    }
98
99    fn as_any(&self) -> &dyn Any {
100        self
101    }
102}
103
104/// A type-erased filtered subscription channel.
105struct FilteredChannel {
106    type_id: TypeId,
107    filter: IdentifierFilter,
108    sender: Box<dyn AnyChannel>,
109}
110
111/// A stored assertion value along with the process that owns it.
112struct StoredValue {
113    value: Box<dyn Any + Send + Sync>,
114    owner: Id,
115}
116
117/// Internal registry state protected by a mutex.
118struct RegistryState {
119    /// Broadcast senders for exact-match subscriptions, keyed by (type, identifier).
120    channels: HashMap<StorageKey, Box<dyn AnyChannel>>,
121
122    /// Filtered subscription channels that are evaluated on every assert/retract.
123    filtered_channels: Vec<FilteredChannel>,
124
125    /// Current assertion values for each (type, identifier) pair, stored type-erased.
126    ///
127    /// Entries are removed on retraction. Used to replay current state to new subscribers.
128    current_values: HashMap<StorageKey, StoredValue>,
129
130    /// Tracks which storage keys each process has asserted, for automatic retraction on process exit.
131    process_assertions: HashMap<Id, HashSet<StorageKey>>,
132
133    /// Default capacity for new broadcast channels.
134    channel_capacity: usize,
135}
136
137impl RegistryState {
138    fn new(channel_capacity: usize) -> Self {
139        Self {
140            channels: HashMap::new(),
141            filtered_channels: Vec::new(),
142            current_values: HashMap::new(),
143            process_assertions: HashMap::new(),
144            channel_capacity,
145        }
146    }
147
148    /// Gets or creates a broadcast sender for the given key, returning a new receiver.
149    fn get_or_create_exact_sender<T>(&mut self, key: StorageKey) -> broadcast::Receiver<AssertionUpdate<T>>
150    where
151        T: Clone + Send + Sync + 'static,
152    {
153        let channel = self.channels.entry(key).or_insert_with(|| {
154            let (tx, _) = broadcast::channel::<AssertionUpdate<T>>(self.channel_capacity);
155            Box::new(TypedChannel { tx })
156        });
157
158        let typed = channel
159            .as_any()
160            .downcast_ref::<TypedChannel<T>>()
161            // `StorageKey` includes `TypeId::of::<T>()`, so a channel stored under a
162            // given key is always a `TypedChannel<T>` for the same `T`.
163            .unwrap_or_else(|| unreachable!("type mismatch in dataspace registry"));
164
165        typed.tx.subscribe()
166    }
167
168    /// Creates a new filtered subscription channel, returning a new receiver.
169    fn create_filtered_sender<T>(&mut self, filter: IdentifierFilter) -> broadcast::Receiver<AssertionUpdate<T>>
170    where
171        T: Clone + Send + Sync + 'static,
172    {
173        let (tx, rx) = broadcast::channel::<AssertionUpdate<T>>(self.channel_capacity);
174
175        self.filtered_channels.push(FilteredChannel {
176            type_id: TypeId::of::<T>(),
177            filter,
178            sender: Box::new(TypedChannel { tx }),
179        });
180
181        rx
182    }
183
184    /// Sends a retraction notification on all channels (exact + filtered) matching the given key.
185    fn notify_retraction(&self, key: &StorageKey) {
186        if let Some(ch) = self.channels.get(key) {
187            ch.send_retraction(&key.identifier);
188        }
189
190        for filtered in &self.filtered_channels {
191            if filtered.type_id == key.type_id && filtered.filter.matches(&key.identifier) {
192                filtered.sender.send_retraction(&key.identifier);
193            }
194        }
195    }
196}
197
198/// Shared inner state of the registry.
199struct DataspaceRegistryInner {
200    state: Mutex<RegistryState>,
201}
202
203/// A dataspace registry for async coordination between processes.
204///
205/// The registry stores broadcast channels indexed by type and [`Identifier`]. Processes can subscribe to receive
206/// assertion and retraction updates for a given type and identifier filter, and other processes can assert or retract
207/// values that are delivered to all matching subscribers.
208///
209/// # Thread Safety
210///
211/// `DataspaceRegistry` is `Clone` and can be safely shared across threadVs and tasks. All operations are thread-safe.
212#[derive(Clone)]
213pub struct DataspaceRegistry {
214    inner: Arc<DataspaceRegistryInner>,
215}
216
217impl Default for DataspaceRegistry {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
223impl DataspaceRegistry {
224    /// Creates a new empty registry with the default channel capacity.
225    pub fn new() -> Self {
226        Self::with_channel_capacity(DEFAULT_CHANNEL_CAPACITY)
227    }
228
229    /// Returns the dataspace registry for the current supervision tree, if one exists.
230    pub fn try_current() -> Option<Self> {
231        CURRENT_DATASPACE.try_with(|ds| ds.clone()).ok()
232    }
233
234    /// Creates a new empty registry with the given channel capacity for broadcast channels.
235    pub fn with_channel_capacity(capacity: usize) -> Self {
236        Self {
237            inner: Arc::new(DataspaceRegistryInner {
238                state: Mutex::new(RegistryState::new(capacity)),
239            }),
240        }
241    }
242
243    /// Asserts a value with the given identifier, notifying all matching subscribers.
244    ///
245    /// The assertion is automatically associated with the current process, and will be automatically retracted when
246    /// that process exists. Only the owning process may update an existing assertion for a given type/identifier
247    /// combination.
248    pub fn assert<T>(&self, value: T, id: impl Into<Identifier>)
249    where
250        T: Clone + Send + Sync + 'static,
251    {
252        let id = id.into();
253        let key = StorageKey::new::<T>(id.clone());
254        let caller = Id::current();
255        let mut state = self.inner.state.lock().unwrap();
256
257        // If an assertion already exists for this key, only the owning process may update it.
258        if let Some(existing) = state.current_values.get(&key) {
259            debug_assert_eq!(
260                existing.owner, caller,
261                "process {caller:?} attempted to update assertion owned by {:?}",
262                existing.owner
263            );
264            if existing.owner != caller {
265                return;
266            }
267        }
268
269        // Store the current value for future subscribers, along with the owning process.
270        state.current_values.insert(
271            key.clone(),
272            StoredValue {
273                value: Box::new(value.clone()),
274                owner: caller,
275            },
276        );
277
278        // Track this assertion against the owning process.
279        state.process_assertions.entry(caller).or_default().insert(key.clone());
280
281        // Notify exact-match subscribers.
282        if let Some(ch) = state.channels.get(&key) {
283            if let Some(typed) = ch.as_any().downcast_ref::<TypedChannel<T>>() {
284                let _ = typed.tx.send(AssertionUpdate::Asserted(id.clone(), value.clone()));
285            }
286        }
287
288        // Notify filtered subscribers.
289        let type_id = TypeId::of::<T>();
290        for channel in &state.filtered_channels {
291            if channel.type_id == type_id && channel.filter.matches(&id) {
292                if let Some(typed) = channel.sender.as_any().downcast_ref::<TypedChannel<T>>() {
293                    let _ = typed.tx.send(AssertionUpdate::Asserted(id.clone(), value.clone()));
294                }
295            }
296        }
297    }
298
299    /// Retracts the value of the given type and identifier, notifying all matching subscribers.
300    ///
301    /// Only the process that originally asserted the value may retract it.
302    pub fn retract<T>(&self, id: impl Into<Identifier>)
303    where
304        T: Clone + Send + Sync + 'static,
305    {
306        let id = id.into();
307        let key = StorageKey::new::<T>(id.clone());
308        let caller = Id::current();
309        let mut state = self.inner.state.lock().unwrap();
310
311        // Check that the assertion exists and is owned by the calling process.
312        let Some(stored) = state.current_values.get(&key) else {
313            return;
314        };
315
316        debug_assert_eq!(
317            stored.owner, caller,
318            "process {caller:?} attempted to retract assertion owned by {:?}",
319            stored.owner
320        );
321        if stored.owner != caller {
322            return;
323        }
324
325        // Remove the stored value and clean up process tracking.
326        state.current_values.remove(&key);
327
328        if let Some(keys) = state.process_assertions.get_mut(&caller) {
329            keys.remove(&key);
330            if keys.is_empty() {
331                state.process_assertions.remove(&caller);
332            }
333        }
334
335        state.notify_retraction(&key);
336    }
337
338    /// Retracts all assertions made by the given process.
339    ///
340    /// This is called automatically when a process exits (via [`FutureProcess`] drop) to ensure that no stale
341    /// assertions remain in the registry after the owning process is gone.
342    pub(crate) fn retract_all_for_process(&self, process_id: Id) {
343        let mut state = self.inner.state.lock().unwrap();
344
345        let Some(keys) = state.process_assertions.remove(&process_id) else {
346            return;
347        };
348
349        for key in keys {
350            state.current_values.remove(&key);
351            state.notify_retraction(&key);
352        }
353    }
354
355    /// Subscribes to assertion and retraction updates matching the given filter.
356    ///
357    /// Returns a [`Subscription`] that can be used to asynchronously receive updates. Any assertions that match the
358    /// filter at the time of subscribing will be immediately yielded.
359    pub fn subscribe<T>(&self, filter: IdentifierFilter) -> Subscription<T>
360    where
361        T: Clone + Send + Sync + 'static,
362    {
363        let mut state = self.inner.state.lock().unwrap();
364
365        match filter {
366            IdentifierFilter::Exact(ref id) => {
367                let key = StorageKey::new::<T>(id.clone());
368                let rx = state.get_or_create_exact_sender::<T>(key.clone());
369
370                // Replay current value if one exists.
371                let pending: VecDeque<_> = state
372                    .current_values
373                    .get(&key)
374                    .and_then(|stored| stored.value.downcast_ref::<T>())
375                    .map(|value| AssertionUpdate::Asserted(id.clone(), value.clone()))
376                    .into_iter()
377                    .collect();
378
379                Subscription { pending, rx }
380            }
381            filter @ (IdentifierFilter::All | IdentifierFilter::Prefix(_)) => {
382                let rx = state.create_filtered_sender::<T>(filter.clone());
383
384                // Replay all matching current values.
385                let type_id = TypeId::of::<T>();
386                let pending: VecDeque<_> = state
387                    .current_values
388                    .iter()
389                    .filter(|(key, _)| key.type_id == type_id && filter.matches(&key.identifier))
390                    .filter_map(|(key, stored)| {
391                        stored
392                            .value
393                            .downcast_ref::<T>()
394                            .map(|value| AssertionUpdate::Asserted(key.identifier.clone(), value.clone()))
395                    })
396                    .collect();
397
398                Subscription { pending, rx }
399            }
400        }
401    }
402}
403
404/// A subscription to updates for a specific type/identifier combination.
405pub struct Subscription<T> {
406    pending: VecDeque<AssertionUpdate<T>>,
407    rx: broadcast::Receiver<AssertionUpdate<T>>,
408}
409
410impl<T> Subscription<T>
411where
412    T: Clone + Send + Sync + 'static,
413{
414    /// Receives the next assertion or retraction update.
415    ///
416    /// Returns `Some(update)` when an update is available, or `None` if the channel has been closed (all senders
417    /// dropped). If messages were missed due to the subscriber falling behind, the missed messages are skipped and the
418    /// next available update is returned.
419    pub async fn recv(&mut self) -> Option<AssertionUpdate<T>> {
420        // TODO: Switch to bounded MPSC channels for delivering assertion/retraction updates.
421
422        if let Some(value) = self.pending.pop_front() {
423            return Some(value);
424        }
425
426        loop {
427            match self.rx.recv().await {
428                Ok(value) => return Some(value),
429                Err(broadcast::error::RecvError::Lagged(_)) => continue,
430                Err(broadcast::error::RecvError::Closed) => return None,
431            }
432        }
433    }
434}
435
436#[cfg(test)]
437mod tests {
438    use std::future::pending;
439
440    use tokio_test::{assert_pending, assert_ready, assert_ready_eq, task::spawn as test_spawn};
441
442    use super::*;
443    use crate::runtime::process::Process;
444
445    #[test]
446    fn subscribe_then_assert() {
447        let registry = DataspaceRegistry::new();
448        let id = Identifier::numeric(1);
449
450        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
451        registry.assert(42u32, id.clone());
452
453        let mut recv = test_spawn(sub.recv());
454        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id, 42)));
455    }
456
457    #[test]
458    fn multiple_subscribers_receive_same_assertion() {
459        let registry = DataspaceRegistry::new();
460        let id = Identifier::numeric(1);
461
462        let mut sub1 = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
463        let mut sub2 = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
464
465        registry.assert(42u32, id.clone());
466
467        let mut recv1 = test_spawn(sub1.recv());
468        assert_ready_eq!(recv1.poll(), Some(AssertionUpdate::Asserted(id.clone(), 42)));
469
470        let mut recv2 = test_spawn(sub2.recv());
471        assert_ready_eq!(recv2.poll(), Some(AssertionUpdate::Asserted(id, 42)));
472    }
473
474    #[test]
475    fn assert_without_subscribers_stores_value() {
476        let registry = DataspaceRegistry::new();
477        let id = Identifier::numeric(1);
478
479        // Assert before any subscriber exists -- the value is stored.
480        registry.assert(42u32, id.clone());
481
482        // A later subscriber should receive the current value.
483        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
484        let mut recv = test_spawn(sub.recv());
485        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id, 42)));
486    }
487
488    #[test]
489    fn different_types_same_identifier() {
490        let registry = DataspaceRegistry::new();
491        let id = Identifier::numeric(1);
492
493        let mut sub_u32 = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
494        let mut sub_string = registry.subscribe::<String>(IdentifierFilter::exact(id.clone()));
495
496        registry.assert(42u32, id.clone());
497        registry.assert("hello".to_string(), id.clone());
498
499        let mut recv_u32 = test_spawn(sub_u32.recv());
500        assert_ready_eq!(recv_u32.poll(), Some(AssertionUpdate::Asserted(id.clone(), 42)));
501
502        let mut recv_string = test_spawn(sub_string.recv());
503        assert_ready_eq!(
504            recv_string.poll(),
505            Some(AssertionUpdate::Asserted(id, "hello".to_string()))
506        );
507    }
508
509    #[test]
510    fn process_identifier() {
511        let registry = DataspaceRegistry::new();
512        let pid = crate::runtime::process::Id::new();
513        let id = Identifier::from(pid);
514
515        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
516        registry.assert(42u32, id.clone());
517
518        let mut recv = test_spawn(sub.recv());
519        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id, 42)));
520    }
521
522    #[test]
523    fn channel_closed_returns_none() {
524        let registry = DataspaceRegistry::new();
525        let id = Identifier::numeric(1);
526
527        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id));
528
529        // Drop the registry, which drops the Arc. Since we only have one reference, the sender is dropped.
530        drop(registry);
531
532        let mut recv = test_spawn(sub.recv());
533        assert_ready_eq!(recv.poll(), None);
534    }
535
536    #[test]
537    fn lagged_subscriber_recovers() {
538        // Create a registry with a tiny buffer so we can force lag.
539        let registry = DataspaceRegistry::with_channel_capacity(2);
540        let id = Identifier::numeric(1);
541
542        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
543
544        // Assert more values than the channel can hold.
545        for i in 0..10 {
546            registry.assert(i as u32, id.clone());
547        }
548
549        // The subscriber should skip lagged messages and still receive a value.
550        let mut recv = test_spawn(sub.recv());
551        let value = assert_ready!(recv.poll());
552        assert!(value.is_some());
553    }
554
555    #[test]
556    fn multiple_values_received_in_order() {
557        let registry = DataspaceRegistry::new();
558        let id = Identifier::numeric(1);
559
560        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
561
562        registry.assert(1u32, id.clone());
563        registry.assert(2u32, id.clone());
564        registry.assert(3u32, id.clone());
565
566        let mut recv = test_spawn(sub.recv());
567        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id.clone(), 1)));
568        drop(recv);
569
570        let mut recv = test_spawn(sub.recv());
571        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id.clone(), 2)));
572        drop(recv);
573
574        let mut recv = test_spawn(sub.recv());
575        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id, 3)));
576    }
577
578    #[test]
579    fn assert_then_retract() {
580        let registry = DataspaceRegistry::new();
581        let id = Identifier::numeric(1);
582
583        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
584
585        registry.assert(42u32, id.clone());
586        registry.retract::<u32>(id.clone());
587
588        let mut recv = test_spawn(sub.recv());
589        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id.clone(), 42)));
590        drop(recv);
591
592        let mut recv = test_spawn(sub.recv());
593        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Retracted(id)));
594    }
595
596    #[test]
597    fn retract_without_subscribers_succeeds() {
598        let registry = DataspaceRegistry::new();
599        let id = Identifier::numeric(1);
600
601        // Retract without any subscribers -- should not panic.
602        registry.retract::<u32>(id);
603    }
604
605    #[test]
606    fn multiple_subscribers_receive_retraction() {
607        let registry = DataspaceRegistry::new();
608        let id = Identifier::numeric(1);
609
610        let mut sub1 = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
611        let mut sub2 = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
612
613        registry.assert(42u32, id.clone());
614        registry.retract::<u32>(id.clone());
615
616        // Drain assertion notifications.
617        let mut recv1 = test_spawn(sub1.recv());
618        assert_ready_eq!(recv1.poll(), Some(AssertionUpdate::Asserted(id.clone(), 42)));
619        drop(recv1);
620
621        let mut recv2 = test_spawn(sub2.recv());
622        assert_ready_eq!(recv2.poll(), Some(AssertionUpdate::Asserted(id.clone(), 42)));
623        drop(recv2);
624
625        // Check retraction notifications.
626        let mut recv1 = test_spawn(sub1.recv());
627        assert_ready_eq!(recv1.poll(), Some(AssertionUpdate::Retracted(id.clone())));
628
629        let mut recv2 = test_spawn(sub2.recv());
630        assert_ready_eq!(recv2.poll(), Some(AssertionUpdate::Retracted(id)));
631    }
632
633    #[test]
634    fn assert_retract_assert_sequence() {
635        let registry = DataspaceRegistry::new();
636        let id = Identifier::numeric(1);
637
638        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
639
640        registry.assert(1u32, id.clone());
641        registry.retract::<u32>(id.clone());
642        registry.assert(2u32, id.clone());
643
644        let mut recv = test_spawn(sub.recv());
645        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id.clone(), 1)));
646        drop(recv);
647
648        let mut recv = test_spawn(sub.recv());
649        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Retracted(id.clone())));
650        drop(recv);
651
652        let mut recv = test_spawn(sub.recv());
653        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id, 2)));
654    }
655
656    #[test]
657    fn all_filter_receives_from_multiple_identifiers() {
658        let registry = DataspaceRegistry::new();
659        let id1 = Identifier::numeric(1);
660        let id2 = Identifier::numeric(2);
661
662        let mut sub = registry.subscribe::<u32>(IdentifierFilter::all());
663
664        registry.assert(1u32, id1.clone());
665        registry.assert(2u32, id2.clone());
666
667        let mut recv = test_spawn(sub.recv());
668        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id1, 1)));
669        drop(recv);
670
671        let mut recv = test_spawn(sub.recv());
672        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id2, 2)));
673    }
674
675    #[test]
676    fn all_filter_receives_retraction() {
677        let registry = DataspaceRegistry::new();
678        let id = Identifier::numeric(1);
679
680        let mut sub = registry.subscribe::<u32>(IdentifierFilter::all());
681
682        registry.assert(42u32, id.clone());
683        registry.retract::<u32>(id.clone());
684
685        let mut recv = test_spawn(sub.recv());
686        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id.clone(), 42)));
687        drop(recv);
688
689        let mut recv = test_spawn(sub.recv());
690        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Retracted(id)));
691    }
692
693    #[test]
694    fn all_filter_and_exact_both_receive() {
695        let registry = DataspaceRegistry::new();
696        let id = Identifier::numeric(1);
697
698        let mut specific = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
699        let mut all = registry.subscribe::<u32>(IdentifierFilter::all());
700
701        registry.assert(42u32, id.clone());
702
703        let mut recv_specific = test_spawn(specific.recv());
704        assert_ready_eq!(recv_specific.poll(), Some(AssertionUpdate::Asserted(id.clone(), 42)));
705
706        let mut recv_all = test_spawn(all.recv());
707        assert_ready_eq!(recv_all.poll(), Some(AssertionUpdate::Asserted(id, 42)));
708    }
709
710    #[test]
711    fn all_filter_created_before_exact_channels() {
712        let registry = DataspaceRegistry::new();
713
714        // Subscribe to all identifiers before any specific-identifier activity exists.
715        let mut all = registry.subscribe::<u32>(IdentifierFilter::all());
716
717        let id = Identifier::numeric(1);
718        registry.assert(42u32, id.clone());
719
720        let mut recv = test_spawn(all.recv());
721        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id, 42)));
722    }
723
724    #[test]
725    fn subscribe_receives_current_value() {
726        let registry = DataspaceRegistry::new();
727        let id = Identifier::numeric(1);
728
729        // Assert before subscribing.
730        registry.assert(42u32, id.clone());
731
732        // Subscribe after asserting -- should immediately get the current value.
733        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
734        let mut recv = test_spawn(sub.recv());
735        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id, 42)));
736    }
737
738    #[test]
739    fn subscribe_after_retract_gets_nothing_pending() {
740        let registry = DataspaceRegistry::new();
741        let id = Identifier::numeric(1);
742
743        // Assert then retract -- no current value.
744        registry.assert(42u32, id.clone());
745        registry.retract::<u32>(id.clone());
746
747        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id));
748
749        // Drop the registry so the channel closes, proving no pending value is delivered.
750        drop(registry);
751
752        let mut recv = test_spawn(sub.recv());
753        assert_ready_eq!(recv.poll(), None);
754    }
755
756    #[test]
757    fn all_filter_receives_current_values() {
758        let registry = DataspaceRegistry::new();
759        let id1 = Identifier::numeric(1);
760        let id2 = Identifier::numeric(2);
761
762        // Assert on two identifiers before subscribing.
763        registry.assert(1u32, id1.clone());
764        registry.assert(2u32, id2.clone());
765
766        let mut sub = registry.subscribe::<u32>(IdentifierFilter::all());
767
768        // Should receive both current values (order is not guaranteed since HashMap iteration is unordered).
769        let mut recv = test_spawn(sub.recv());
770        let v1 = assert_ready!(recv.poll());
771        drop(recv);
772
773        let mut recv = test_spawn(sub.recv());
774        let v2 = assert_ready!(recv.poll());
775
776        let mut received = [v1.unwrap(), v2.unwrap()];
777        received.sort_by_key(|update| match update {
778            AssertionUpdate::Asserted(_, v) => *v,
779            AssertionUpdate::Retracted(_) => 0,
780        });
781
782        assert_eq!(received[0], AssertionUpdate::Asserted(id1, 1));
783        assert_eq!(received[1], AssertionUpdate::Asserted(id2, 2));
784    }
785
786    #[test]
787    fn subscribe_receives_current_then_future() {
788        let registry = DataspaceRegistry::new();
789        let id = Identifier::numeric(1);
790
791        // Assert before subscribing.
792        registry.assert(1u32, id.clone());
793
794        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
795
796        // Assert again after subscribing.
797        registry.assert(2u32, id.clone());
798
799        // First recv should return the initial value, second should return the broadcast value.
800        let mut recv = test_spawn(sub.recv());
801        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id.clone(), 1)));
802        drop(recv);
803
804        let mut recv = test_spawn(sub.recv());
805        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id, 2)));
806    }
807
808    #[test]
809    fn prefix_filter_matches_named_identifiers() {
810        let registry = DataspaceRegistry::new();
811        let id1 = Identifier::named("metrics.cpu");
812        let id2 = Identifier::named("metrics.mem");
813        let id3 = Identifier::named("logs.error");
814
815        let mut sub = registry.subscribe::<u32>(IdentifierFilter::prefix("metrics."));
816
817        registry.assert(1u32, id1.clone());
818        registry.assert(2u32, id2.clone());
819        registry.assert(3u32, id3);
820
821        // Should only receive the two metrics identifiers.
822        let mut recv = test_spawn(sub.recv());
823        let v1 = assert_ready!(recv.poll());
824        drop(recv);
825
826        let mut recv = test_spawn(sub.recv());
827        let v2 = assert_ready!(recv.poll());
828
829        let mut received = [v1.unwrap(), v2.unwrap()];
830        received.sort_by_key(|update| match update {
831            AssertionUpdate::Asserted(_, v) => *v,
832            AssertionUpdate::Retracted(_) => 0,
833        });
834
835        assert_eq!(received[0], AssertionUpdate::Asserted(id1, 1));
836        assert_eq!(received[1], AssertionUpdate::Asserted(id2, 2));
837    }
838
839    #[test]
840    fn prefix_filter_does_not_match_numeric() {
841        let registry = DataspaceRegistry::new();
842        let id = Identifier::numeric(42);
843
844        let mut sub = registry.subscribe::<u32>(IdentifierFilter::prefix("any"));
845
846        registry.assert(1u32, id);
847
848        // Drop registry to close channel, proving no value was delivered.
849        drop(registry);
850
851        let mut recv = test_spawn(sub.recv());
852        assert_ready_eq!(recv.poll(), None);
853    }
854
855    #[test]
856    fn prefix_filter_replays_matching_current_values() {
857        let registry = DataspaceRegistry::new();
858        let id1 = Identifier::named("svc.alpha");
859        let id2 = Identifier::named("svc.beta");
860        let id3 = Identifier::named("other.gamma");
861
862        // Assert before subscribing.
863        registry.assert(1u32, id1.clone());
864        registry.assert(2u32, id2.clone());
865        registry.assert(3u32, id3);
866
867        let mut sub = registry.subscribe::<u32>(IdentifierFilter::prefix("svc."));
868
869        // Should replay only the two matching values.
870        let mut recv = test_spawn(sub.recv());
871        let v1 = assert_ready!(recv.poll());
872        drop(recv);
873
874        let mut recv = test_spawn(sub.recv());
875        let v2 = assert_ready!(recv.poll());
876
877        let mut received = [v1.unwrap(), v2.unwrap()];
878        received.sort_by_key(|update| match update {
879            AssertionUpdate::Asserted(_, v) => *v,
880            AssertionUpdate::Retracted(_) => 0,
881        });
882
883        assert_eq!(received[0], AssertionUpdate::Asserted(id1, 1));
884        assert_eq!(received[1], AssertionUpdate::Asserted(id2, 2));
885    }
886
887    #[test]
888    fn try_current_returns_none_outside_context() {
889        assert!(DataspaceRegistry::try_current().is_none());
890    }
891
892    #[test]
893    fn current_and_try_current_work_within_scope() {
894        let registry = DataspaceRegistry::new();
895        let registry_clone = registry.clone();
896
897        let mut scope_fut = test_spawn(CURRENT_DATASPACE.scope(registry, async {
898            let current = DataspaceRegistry::try_current();
899            assert!(current.is_some());
900
901            // Verify it's the same registry by asserting in one and reading from the other.
902            let current = current.unwrap();
903            let id = Identifier::named("test");
904            current.assert(42u32, id.clone());
905
906            let mut sub = registry_clone.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
907            let value = sub.recv().await;
908            assert_eq!(value, Some(AssertionUpdate::Asserted(id, 42)));
909        }));
910        assert_ready!(scope_fut.poll());
911    }
912
913    #[test]
914    fn retract_all_for_process_retracts_all_owned_assertions() {
915        let registry = DataspaceRegistry::new();
916        let process_id = Id::new();
917        let id1 = Identifier::named("val1");
918        let id2 = Identifier::named("val2");
919
920        // Assert two values as if from the given process.
921        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(process_id, || {
922            registry.assert(1u32, id1.clone());
923            registry.assert(2u32, id2.clone());
924        });
925
926        // Subscribe to both after assertion to get current values replayed.
927        let mut sub1 = registry.subscribe::<u32>(IdentifierFilter::exact(id1.clone()));
928        let mut sub2 = registry.subscribe::<u32>(IdentifierFilter::exact(id2.clone()));
929
930        // Drain the initial replayed values.
931        let mut recv = test_spawn(sub1.recv());
932        let _ = assert_ready!(recv.poll());
933        drop(recv);
934
935        let mut recv = test_spawn(sub2.recv());
936        let _ = assert_ready!(recv.poll());
937        drop(recv);
938
939        // Retract all assertions for the process.
940        registry.retract_all_for_process(process_id);
941
942        // Both subscribers should receive retraction notifications.
943        let mut recv = test_spawn(sub1.recv());
944        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Retracted(id1)));
945        drop(recv);
946
947        let mut recv = test_spawn(sub2.recv());
948        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Retracted(id2)));
949    }
950
951    #[test]
952    fn retract_all_for_process_does_not_affect_other_processes() {
953        let registry = DataspaceRegistry::new();
954        let pid_a = Id::new();
955        let pid_b = Id::new();
956        let id_a = Identifier::named("a");
957        let id_b = Identifier::named("b");
958
959        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_a, || {
960            registry.assert(1u32, id_a.clone());
961        });
962        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_b, || {
963            registry.assert(2u32, id_b.clone());
964        });
965
966        // Retract only process A's assertions.
967        registry.retract_all_for_process(pid_a);
968
969        // Process B's value should still be available to new subscribers.
970        let mut sub_b = registry.subscribe::<u32>(IdentifierFilter::exact(id_b.clone()));
971        let mut recv = test_spawn(sub_b.recv());
972        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id_b, 2)));
973        drop(recv);
974
975        // Process A's value should not be available.
976        let mut sub_a = registry.subscribe::<u32>(IdentifierFilter::exact(id_a));
977        drop(registry);
978        let mut recv = test_spawn(sub_a.recv());
979        assert_ready_eq!(recv.poll(), None);
980    }
981
982    #[test]
983    fn retract_all_for_process_notifies_filtered_subscribers() {
984        let registry = DataspaceRegistry::new();
985        let process_id = Id::new();
986        let id = Identifier::named("val");
987
988        let mut all_sub = registry.subscribe::<u32>(IdentifierFilter::all());
989
990        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(process_id, || {
991            registry.assert(42u32, id.clone());
992        });
993
994        // Receive the assertion.
995        let mut recv = test_spawn(all_sub.recv());
996        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id.clone(), 42)));
997        drop(recv);
998
999        // Retract all for the process.
1000        registry.retract_all_for_process(process_id);
1001
1002        // Should receive the retraction on the wildcard subscription.
1003        let mut recv = test_spawn(all_sub.recv());
1004        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Retracted(id)));
1005    }
1006
1007    #[test]
1008    fn manual_retract_cleans_up_process_tracking() {
1009        let registry = DataspaceRegistry::new();
1010        let process_id = Id::new();
1011        let id = Identifier::named("val");
1012
1013        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
1014
1015        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(process_id, || {
1016            registry.assert(42u32, id.clone());
1017        });
1018
1019        // Manually retract (must be from the owning process).
1020        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(process_id, || {
1021            registry.retract::<u32>(id.clone());
1022        });
1023
1024        // Drain the assertion and retraction.
1025        let mut recv = test_spawn(sub.recv());
1026        let _ = assert_ready!(recv.poll());
1027        drop(recv);
1028
1029        let mut recv = test_spawn(sub.recv());
1030        let _ = assert_ready!(recv.poll());
1031        drop(recv);
1032
1033        // Now retract_all_for_process should be a no-op (no duplicate retraction).
1034        registry.retract_all_for_process(process_id);
1035
1036        // Drop the registry to close the channel, proving no further messages are pending.
1037        drop(registry);
1038        let mut recv = test_spawn(sub.recv());
1039        assert_ready_eq!(recv.poll(), None);
1040    }
1041
1042    #[test]
1043    fn retract_all_for_unknown_process_is_noop() {
1044        let registry = DataspaceRegistry::new();
1045        // Should not panic.
1046        registry.retract_all_for_process(Id::new());
1047    }
1048
1049    #[test]
1050    fn instrumented_process_retracts_on_normal_completion() {
1051        let registry = DataspaceRegistry::new();
1052        let process = Process::supervisor_with_dataspace("test_worker", None, Some(registry.clone())).unwrap();
1053        let id = "from_worker";
1054
1055        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id));
1056
1057        // Spawn a subscriber future, asserting that it is quiesced.
1058        let mut recv_fut = test_spawn(sub.recv());
1059        assert!(!recv_fut.is_woken());
1060        assert_pending!(recv_fut.poll());
1061
1062        // Create an instrumented future that asserts a value, then completes.
1063        let mut worker = test_spawn(process.into_process_future(async {
1064            DataspaceRegistry::try_current()
1065                .expect("dataspace registry should be available")
1066                .assert(42u32, "from_worker");
1067        }));
1068
1069        // Poll the worker to completion — the assertion happens during this poll.
1070        assert_ready!(worker.poll());
1071
1072        // The subscriber should now be woken and have the assertion update.
1073        assert!(recv_fut.is_woken());
1074        assert_ready_eq!(recv_fut.poll(), Some(AssertionUpdate::Asserted(id.into(), 42)));
1075
1076        drop(recv_fut);
1077
1078        // Set up a new subscriber call for getting the retraction.
1079        let mut recv_fut = test_spawn(sub.recv());
1080        assert_pending!(recv_fut.poll());
1081        assert!(!recv_fut.is_woken());
1082
1083        // Drop the InstrumentedProcess — this triggers automatic retraction.
1084        drop(worker);
1085
1086        // The drop should have woken the subscriber.
1087        assert!(recv_fut.is_woken());
1088        assert_ready_eq!(recv_fut.poll(), Some(AssertionUpdate::Retracted(id.into())));
1089    }
1090
1091    #[test]
1092    fn instrumented_process_retracts_on_drop_before_completion() {
1093        let registry = DataspaceRegistry::new();
1094        let process = Process::supervisor_with_dataspace("test_worker", None, Some(registry.clone())).unwrap();
1095        let id = "from_worker";
1096
1097        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id));
1098
1099        // Spawn a subscriber future, asserting that it is quiesced.
1100        let mut recv_fut = test_spawn(sub.recv());
1101        assert!(!recv_fut.is_woken());
1102        assert_pending!(recv_fut.poll());
1103
1104        // Create an instrumented future that asserts a value, then pends forever.
1105        let mut worker = test_spawn(process.into_process_future(async {
1106            DataspaceRegistry::try_current()
1107                .expect("dataspace registry should be available")
1108                .assert(42u32, "from_worker");
1109            pending::<()>().await;
1110        }));
1111
1112        // Drive the worker to assert the value.
1113        assert_pending!(worker.poll());
1114
1115        // The subscriber should now be woken and have the assertion update.
1116        assert!(recv_fut.is_woken());
1117        assert_ready_eq!(recv_fut.poll(), Some(AssertionUpdate::Asserted(id.into(), 42)));
1118
1119        drop(recv_fut);
1120
1121        // Set up a new subscriber call for getting the retraction.
1122        let mut recv_fut = test_spawn(sub.recv());
1123        assert_pending!(recv_fut.poll());
1124        assert!(!recv_fut.is_woken());
1125
1126        // Drop the worker (simulates abort/cancel) — triggers automatic retraction.
1127        drop(worker);
1128
1129        // The drop should have woken the subscriber with a retraction.
1130        assert!(recv_fut.is_woken());
1131        assert_ready_eq!(recv_fut.poll(), Some(AssertionUpdate::Retracted(id.into())));
1132    }
1133
1134    #[test]
1135    fn instrumented_process_retracts_multiple_types_on_drop() {
1136        let registry = DataspaceRegistry::new();
1137        let process = Process::supervisor_with_dataspace("test_worker", None, Some(registry.clone())).unwrap();
1138        let id_num = "number";
1139        let id_str = "text";
1140
1141        let mut sub_u32 = registry.subscribe::<u32>(IdentifierFilter::exact(id_num));
1142        let mut sub_str = registry.subscribe::<String>(IdentifierFilter::exact(id_str));
1143
1144        // Spawn futures for both subscribers, asserting that they are quiesced.
1145        let mut recv_u32_fut = test_spawn(sub_u32.recv());
1146        let mut recv_str_fut = test_spawn(sub_str.recv());
1147        assert!(!recv_u32_fut.is_woken());
1148        assert!(!recv_str_fut.is_woken());
1149        assert_pending!(recv_u32_fut.poll());
1150        assert_pending!(recv_str_fut.poll());
1151
1152        // Create an instrumented future that asserts values of two different types.
1153        let mut worker = test_spawn(process.into_process_future(async {
1154            let ds = DataspaceRegistry::try_current().expect("dataspace registry should be available");
1155            ds.assert(42u32, id_num);
1156            ds.assert("hello".to_string(), id_str);
1157            pending::<()>().await;
1158        }));
1159
1160        // Drive the worker to assert the values.
1161        assert_pending!(worker.poll());
1162
1163        // Both subscribers should now be woken and have the assertion updates.
1164        assert!(recv_u32_fut.is_woken());
1165        assert!(recv_str_fut.is_woken());
1166        assert_ready_eq!(recv_u32_fut.poll(), Some(AssertionUpdate::Asserted(id_num.into(), 42)));
1167        assert_ready_eq!(
1168            recv_str_fut.poll(),
1169            Some(AssertionUpdate::Asserted(id_str.into(), "hello".to_string()))
1170        );
1171
1172        drop(recv_u32_fut);
1173        drop(recv_str_fut);
1174
1175        // Set up new subscriber calls for getting the retractions.
1176        let mut recv_u32 = test_spawn(sub_u32.recv());
1177        let mut recv_str = test_spawn(sub_str.recv());
1178        assert_pending!(recv_u32.poll());
1179        assert_pending!(recv_str.poll());
1180        assert!(!recv_u32.is_woken());
1181        assert!(!recv_str.is_woken());
1182
1183        // Drop the worker — both types should be retracted.
1184        drop(worker);
1185
1186        assert!(recv_u32.is_woken());
1187        assert!(recv_str.is_woken());
1188        assert_ready_eq!(recv_u32.poll(), Some(AssertionUpdate::Retracted(id_num.into())));
1189        assert_ready_eq!(recv_str.poll(), Some(AssertionUpdate::Retracted(id_str.into())));
1190    }
1191
1192    #[test]
1193    #[cfg(not(debug_assertions))]
1194    fn retract_from_non_owner_is_ignored() {
1195        let registry = DataspaceRegistry::new();
1196        let pid_a = Id::new();
1197        let pid_b = Id::new();
1198        let id = Identifier::named("owned_by_a");
1199
1200        // Assert as process A.
1201        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_a, || {
1202            registry.assert(42u32, id.clone());
1203        });
1204
1205        // Attempt to retract as process B -- should be silently ignored.
1206        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_b, || {
1207            registry.retract::<u32>(id.clone());
1208        });
1209
1210        // Value should still be present for new subscribers.
1211        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
1212        let mut recv = test_spawn(sub.recv());
1213        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id.clone(), 42)));
1214        drop(recv);
1215
1216        // Retract as process A -- should succeed.
1217        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_a, || {
1218            registry.retract::<u32>(id.clone());
1219        });
1220
1221        // Subscriber should receive the retraction.
1222        let mut recv = test_spawn(sub.recv());
1223        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Retracted(id)));
1224    }
1225
1226    #[test]
1227    #[cfg(debug_assertions)]
1228    #[should_panic(expected = "attempted to retract assertion owned by")]
1229    fn retract_from_non_owner_panics_in_debug() {
1230        let registry = DataspaceRegistry::new();
1231        let pid_a = Id::new();
1232        let pid_b = Id::new();
1233        let id = Identifier::named("owned_by_a");
1234
1235        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_a, || {
1236            registry.assert(42u32, id.clone());
1237        });
1238
1239        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_b, || {
1240            registry.retract::<u32>(id.clone());
1241        });
1242    }
1243
1244    #[test]
1245    #[cfg(not(debug_assertions))]
1246    fn reassert_from_non_owner_is_ignored() {
1247        let registry = DataspaceRegistry::new();
1248        let pid_a = Id::new();
1249        let pid_b = Id::new();
1250        let id = Identifier::named("owned_by_a");
1251
1252        // Assert as process A.
1253        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_a, || {
1254            registry.assert(42u32, id.clone());
1255        });
1256
1257        // Attempt to overwrite as process B -- should be silently ignored.
1258        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_b, || {
1259            registry.assert(99u32, id.clone());
1260        });
1261
1262        // Value should still be the original.
1263        let mut sub = registry.subscribe::<u32>(IdentifierFilter::exact(id.clone()));
1264        let mut recv = test_spawn(sub.recv());
1265        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id.clone(), 42)));
1266        drop(recv);
1267
1268        // Update as process A -- should succeed.
1269        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_a, || {
1270            registry.assert(100u32, id.clone());
1271        });
1272
1273        // Subscriber should receive the update.
1274        let mut recv = test_spawn(sub.recv());
1275        assert_ready_eq!(recv.poll(), Some(AssertionUpdate::Asserted(id, 100)));
1276    }
1277
1278    #[test]
1279    #[cfg(debug_assertions)]
1280    #[should_panic(expected = "attempted to update assertion owned by")]
1281    fn reassert_from_non_owner_panics_in_debug() {
1282        let registry = DataspaceRegistry::new();
1283        let pid_a = Id::new();
1284        let pid_b = Id::new();
1285        let id = Identifier::named("owned_by_a");
1286
1287        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_a, || {
1288            registry.assert(42u32, id.clone());
1289        });
1290
1291        crate::runtime::process::CURRENT_PROCESS_ID.sync_scope(pid_b, || {
1292            registry.assert(99u32, id.clone());
1293        });
1294    }
1295}