Skip to main content

hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::{CycleId, FlowState};
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
33/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
34///
35/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
36/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
37/// indicates that entries may be added over time, but once an entry is added it will never be
38/// removed and its value will never change.
39pub trait KeyedSingletonBound {
40    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
41    type UnderlyingBound: Boundedness;
42    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
43    type ValueBound: Boundedness;
44
45    /// The type of the keyed singleton if the value for each key is immutable.
46    type WithBoundedValue: KeyedSingletonBound<
47            UnderlyingBound = Self::UnderlyingBound,
48            ValueBound = Bounded,
49            EraseMonotonic = Self::WithBoundedValue,
50        >;
51
52    /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`KeyedStream`] with [`Self`] boundedness.
53    type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
54
55    /// The type of the keyed singleton if the value for each key is no longer monotonic.
56    type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
57
58    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
59    fn bound_kind() -> KeyedSingletonBoundKind;
60}
61
62impl KeyedSingletonBound for Unbounded {
63    type UnderlyingBound = Unbounded;
64    type ValueBound = Unbounded;
65    type WithBoundedValue = BoundedValue;
66    type KeyedStreamToMonotone = MonotonicValue;
67    type EraseMonotonic = Unbounded;
68
69    fn bound_kind() -> KeyedSingletonBoundKind {
70        KeyedSingletonBoundKind::Unbounded
71    }
72}
73
74impl KeyedSingletonBound for Bounded {
75    type UnderlyingBound = Bounded;
76    type ValueBound = Bounded;
77    type WithBoundedValue = Bounded;
78    type KeyedStreamToMonotone = Bounded;
79    type EraseMonotonic = Bounded;
80
81    fn bound_kind() -> KeyedSingletonBoundKind {
82        KeyedSingletonBoundKind::Bounded
83    }
84}
85
86/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
87/// its value is bounded and will never change, but new entries may appear asynchronously
88pub struct BoundedValue;
89
90impl KeyedSingletonBound for BoundedValue {
91    type UnderlyingBound = Unbounded;
92    type ValueBound = Bounded;
93    type WithBoundedValue = BoundedValue;
94    type KeyedStreamToMonotone = BoundedValue;
95    type EraseMonotonic = BoundedValue;
96
97    fn bound_kind() -> KeyedSingletonBoundKind {
98        KeyedSingletonBoundKind::BoundedValue
99    }
100}
101
102/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
103/// it will never be removed, and the corresponding value will only increase monotonically.
104pub struct MonotonicValue;
105
106impl KeyedSingletonBound for MonotonicValue {
107    type UnderlyingBound = Unbounded;
108    type ValueBound = Unbounded;
109    type WithBoundedValue = BoundedValue;
110    type KeyedStreamToMonotone = MonotonicValue;
111    type EraseMonotonic = Unbounded;
112
113    fn bound_kind() -> KeyedSingletonBoundKind {
114        KeyedSingletonBoundKind::MonotonicValue
115    }
116}
117
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable, or [`MonotonicValue`] to
/// declare that keys are never removed and each value only increases monotonically.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
///     - [`MonotonicValue`] (asynchronous with no removals and monotonically increasing values)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    /// The location where this keyed singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node producing this collection. Kept in a `RefCell` so operators can take it
    /// (leaving a `HydroNode::Placeholder` behind) through a shared reference.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// The flow state that receives a root for the node when the collection is dropped unused.
    pub(crate) flow_state: FlowState,

    /// Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
141
142impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
143    fn drop(&mut self) {
144        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
145        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
146            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
147                input: Box::new(ir_node),
148                op_metadata: HydroIrOpMetadata::new(),
149            });
150        }
151    }
152}
153
154impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
155    for KeyedSingleton<K, V, Loc, Bound>
156{
157    fn clone(&self) -> Self {
158        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
159            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
160            *self.ir_node.borrow_mut() = HydroNode::Tee {
161                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
162                metadata: self.location.new_node_metadata(Self::collection_kind()),
163            };
164        }
165
166        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
167            KeyedSingleton {
168                location: self.location.clone(),
169                flow_state: self.flow_state.clone(),
170                ir_node: HydroNode::Tee {
171                    inner: SharedNode(inner.0.clone()),
172                    metadata: metadata.clone(),
173                }
174                .into(),
175                _phantom: PhantomData,
176            }
177        } else {
178            unreachable!()
179        }
180    }
181}
182
183impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
184    for KeyedSingleton<K, V, L, B>
185where
186    L: Location<'a>,
187{
188    type Location = L;
189
190    fn create_source(cycle_id: CycleId, location: L) -> Self {
191        KeyedSingleton {
192            flow_state: location.flow_state().clone(),
193            location: location.clone(),
194            ir_node: RefCell::new(HydroNode::CycleSource {
195                cycle_id,
196                metadata: location.new_node_metadata(Self::collection_kind()),
197            }),
198            _phantom: PhantomData,
199        }
200    }
201}
202
203impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
204where
205    L: Location<'a>,
206{
207    type Location = Tick<L>;
208
209    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
210        KeyedSingleton::new(
211            location.clone(),
212            HydroNode::CycleSource {
213                cycle_id,
214                metadata: location.new_node_metadata(Self::collection_kind()),
215            },
216        )
217    }
218}
219
220impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
221where
222    L: Location<'a>,
223{
224    fn defer_tick(self) -> Self {
225        KeyedSingleton::defer_tick(self)
226    }
227}
228
229impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
230    for KeyedSingleton<K, V, L, B>
231where
232    L: Location<'a>,
233{
234    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
235        assert_eq!(
236            Location::id(&self.location),
237            expected_location,
238            "locations do not match"
239        );
240        self.location
241            .flow_state()
242            .borrow_mut()
243            .push_root(HydroRoot::CycleSink {
244                cycle_id,
245                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
246                op_metadata: HydroIrOpMetadata::new(),
247            });
248    }
249}
250
251impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
252where
253    L: Location<'a>,
254{
255    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
256        assert_eq!(
257            Location::id(&self.location),
258            expected_location,
259            "locations do not match"
260        );
261        self.location
262            .flow_state()
263            .borrow_mut()
264            .push_root(HydroRoot::CycleSink {
265                cycle_id,
266                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
267                op_metadata: HydroIrOpMetadata::new(),
268            });
269    }
270}
271
272impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
273    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276
277        let flow_state = location.flow_state().clone();
278        KeyedSingleton {
279            location,
280            flow_state,
281            ir_node: RefCell::new(ir_node),
282            _phantom: PhantomData,
283        }
284    }
285
286    /// Returns the [`Location`] where this keyed singleton is being materialized.
287    pub fn location(&self) -> &L {
288        &self.location
289    }
290
291    /// Weakens the consistency of this live collection to not guarantee any consistency across
292    /// cluster members (if this collection is on a cluster).
293    pub fn weaken_consistency(self) -> KeyedSingleton<K, V, L::NoConsistency, B>
294    where
295        L: Location<'a>,
296    {
297        if L::consistency()
298            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
299        {
300            // already no consistency
301            KeyedSingleton::new(
302                self.location.drop_consistency(),
303                self.ir_node.replace(HydroNode::Placeholder),
304            )
305        } else {
306            KeyedSingleton::new(
307                self.location.drop_consistency(),
308                HydroNode::Cast {
309                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
310                    metadata: self
311                        .location
312                        .drop_consistency()
313                        .new_node_metadata(
314                            KeyedSingleton::<K, V, L::NoConsistency, B>::collection_kind(),
315                        ),
316                },
317            )
318        }
319    }
320
321    /// Casts this live collection to have the consistency guarantees specified in the given
322    /// location type parameter. The developer must ensure that the strengthened consistency
323    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
324    pub fn assert_has_consistency_of<L2: Location<'a, NoConsistency = L::NoConsistency>>(
325        self,
326        _proof: impl crate::properties::ConsistencyProof,
327    ) -> KeyedSingleton<K, V, L2, B>
328    where
329        L: Location<'a>,
330    {
331        if L::consistency() == L2::consistency() {
332            // already consistent
333            KeyedSingleton::new(
334                self.location.with_consistency_of(),
335                self.ir_node.replace(HydroNode::Placeholder),
336            )
337        } else {
338            KeyedSingleton::new(
339                self.location.with_consistency_of(),
340                HydroNode::AssertIsConsistent {
341                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
342                    metadata: self
343                        .location
344                        .clone()
345                        .with_consistency_of::<L2>()
346                        .new_node_metadata(KeyedSingleton::<K, V, L2, B>::collection_kind()),
347                },
348            )
349        }
350    }
351}
352
/// Counts the entries of a [`Bounded`] keyed singleton, yielding the number of keys.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
359
/// Collects the entries of a [`Bounded`] keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let entries = me.entries();
    entries
        .assume_ordering_trusted(nondet!(
            /// There is only one element associated with each key. The closure technically
            /// isn't commutative in the case where both passed entries have the same key
            /// but different values.
            ///
            /// In the future, we may want to have an `assume!(...)` statement in the UDF that
            /// the key is never already present in the map.
        ))
        .fold(
            q!(|| HashMap::new()),
            q!(|map, (k, v)| {
                map.insert(k, v);
            }),
        )
}
383
384impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
385    pub(crate) fn collection_kind() -> CollectionKind {
386        CollectionKind::KeyedSingleton {
387            bound: B::bound_kind(),
388            key_type: stageleft::quote_type::<K>().into(),
389            value_type: stageleft::quote_type::<V>().into(),
390        }
391    }
392
393    /// Transforms each value by invoking `f` on each element, with keys staying the same
394    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
395    ///
396    /// If you do not want to modify the stream and instead only want to view
397    /// each item use [`KeyedSingleton::inspect`] instead.
398    ///
399    /// # Example
400    /// ```rust
401    /// # #[cfg(feature = "deploy")] {
402    /// # use hydro_lang::prelude::*;
403    /// # use futures::StreamExt;
404    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
405    /// let keyed_singleton = // { 1: 2, 2: 4 }
406    /// # process
407    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
408    /// #     .into_keyed()
409    /// #     .first();
410    /// keyed_singleton.map(q!(|v| v + 1))
411    /// #   .entries()
412    /// # }, |mut stream| async move {
413    /// // { 1: 3, 2: 5 }
414    /// # let mut results = Vec::new();
415    /// # for _ in 0..2 {
416    /// #     results.push(stream.next().await.unwrap());
417    /// # }
418    /// # results.sort();
419    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
420    /// # }));
421    /// # }
422    /// ```
423    pub fn map<U, F>(
424        self,
425        f: impl IntoQuotedMut<'a, F, L> + Copy,
426    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
427    where
428        F: Fn(V) -> U + 'a,
429    {
430        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
431        let map_f = q!({
432            let orig = f;
433            move |(k, v)| (k, orig(v))
434        })
435        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
436        .into();
437
438        KeyedSingleton::new(
439            self.location.clone(),
440            HydroNode::Map {
441                f: map_f,
442                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
443                metadata: self.location.new_node_metadata(KeyedSingleton::<
444                    K,
445                    U,
446                    L,
447                    B::EraseMonotonic,
448                >::collection_kind()),
449            },
450        )
451    }
452
453    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
454    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
455    ///
456    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
457    /// the new value `U`. The key remains unchanged in the output.
458    ///
459    /// # Example
460    /// ```rust
461    /// # #[cfg(feature = "deploy")] {
462    /// # use hydro_lang::prelude::*;
463    /// # use futures::StreamExt;
464    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465    /// let keyed_singleton = // { 1: 2, 2: 4 }
466    /// # process
467    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
468    /// #     .into_keyed()
469    /// #     .first();
470    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
471    /// #   .entries()
472    /// # }, |mut stream| async move {
473    /// // { 1: 3, 2: 6 }
474    /// # let mut results = Vec::new();
475    /// # for _ in 0..2 {
476    /// #     results.push(stream.next().await.unwrap());
477    /// # }
478    /// # results.sort();
479    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
480    /// # }));
481    /// # }
482    /// ```
483    pub fn map_with_key<U, F>(
484        self,
485        f: impl IntoQuotedMut<'a, F, L> + Copy,
486    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
487    where
488        F: Fn((K, V)) -> U + 'a,
489        K: Clone,
490    {
491        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
492        let map_f = q!({
493            let orig = f;
494            move |(k, v)| {
495                let out = orig((Clone::clone(&k), v));
496                (k, out)
497            }
498        })
499        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
500        .into();
501
502        KeyedSingleton::new(
503            self.location.clone(),
504            HydroNode::Map {
505                f: map_f,
506                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
507                metadata: self.location.new_node_metadata(KeyedSingleton::<
508                    K,
509                    U,
510                    L,
511                    B::EraseMonotonic,
512                >::collection_kind()),
513            },
514        )
515    }
516
517    /// Gets the number of keys in the keyed singleton.
518    ///
519    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
520    /// since keys may be added / removed over time. When the set of keys changes, the count will
521    /// be asynchronously updated.
522    ///
523    /// # Example
524    /// ```rust
525    /// # #[cfg(feature = "deploy")] {
526    /// # use hydro_lang::prelude::*;
527    /// # use futures::StreamExt;
528    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
529    /// # let tick = process.tick();
530    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
531    /// # process
532    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
533    /// #     .into_keyed()
534    /// #     .batch(&tick, nondet!(/** test */))
535    /// #     .first();
536    /// keyed_singleton.key_count()
537    /// # .all_ticks()
538    /// # }, |mut stream| async move {
539    /// // 3
540    /// # assert_eq!(stream.next().await.unwrap(), 3);
541    /// # }));
542    /// # }
543    /// ```
544    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
545        if B::ValueBound::BOUNDED {
546            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
547                location: self.location.clone(),
548                flow_state: self.flow_state.clone(),
549                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
550                _phantom: PhantomData,
551            };
552
553            me.entries().count().ignore_monotonic()
554        } else if L::is_top_level()
555            && let Some(tick) = self.location.try_tick()
556            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
557        {
558            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
559                self.location.clone(),
560                self.ir_node.replace(HydroNode::Placeholder),
561            );
562
563            let out =
564                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
565                    .latest();
566            Singleton::new(
567                self.location.clone(),
568                out.ir_node.replace(HydroNode::Placeholder),
569            )
570        } else {
571            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
572        }
573    }
574
575    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
576    ///
577    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
578    /// asynchronously as well.
579    ///
580    /// # Example
581    /// ```rust
582    /// # #[cfg(feature = "deploy")] {
583    /// # use hydro_lang::prelude::*;
584    /// # use futures::StreamExt;
585    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
586    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
587    /// # process
588    /// #     .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
589    /// #     .into_keyed()
590    /// #     .batch(&process.tick(), nondet!(/** test */))
591    /// #     .first();
592    /// keyed_singleton.into_singleton()
593    /// # .all_ticks()
594    /// # }, |mut stream| async move {
595    /// // { 1: "a", 2: "b", 3: "c" }
596    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
597    /// # }));
598    /// # }
599    /// ```
600    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
601    where
602        K: Eq + Hash,
603    {
604        if B::ValueBound::BOUNDED {
605            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
606                location: self.location.clone(),
607                flow_state: self.flow_state.clone(),
608                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
609                _phantom: PhantomData,
610            };
611
612            me.entries()
613                .assume_ordering_trusted(nondet!(
614                    /// There is only one element associated with each key. The closure technically
615                    /// isn't commutative in the case where both passed entries have the same key
616                    /// but different values.
617                    ///
618                    /// In the future, we may want to have an `assume!(...)` statement in the UDF that
619                    /// the key is never already present in the map.
620                ))
621                .fold(
622                    q!(|| HashMap::new()),
623                    q!(|map, (k, v)| {
624                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
625                        map.insert(k, v);
626                    }),
627                )
628        } else if L::is_top_level()
629            && let Some(tick) = self.location.try_tick()
630            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
631        {
632            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
633                self.location.clone(),
634                self.ir_node.replace(HydroNode::Placeholder),
635            );
636
637            let out = into_singleton_inside_tick(
638                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
639            )
640            .latest();
641            Singleton::new(
642                self.location.clone(),
643                out.ir_node.replace(HydroNode::Placeholder),
644            )
645        } else {
646            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
647        }
648    }
649
650    /// An operator which allows you to "name" a `HydroNode`.
651    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
652    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
653        {
654            let mut node = self.ir_node.borrow_mut();
655            let metadata = node.metadata_mut();
656            metadata.tag = Some(name.to_owned());
657        }
658        self
659    }
660
661    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
662    /// implies that `B == Bounded`.
663    pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
664    where
665        B: IsBounded,
666    {
667        KeyedSingleton::new(
668            self.location.clone(),
669            self.ir_node.replace(HydroNode::Placeholder),
670        )
671    }
672
673    /// Gets the value associated with a specific key from the keyed singleton.
674    /// Returns `None` if the key is `None` or there is no associated value.
675    ///
676    /// # Example
677    /// ```rust
678    /// # #[cfg(feature = "deploy")] {
679    /// # use hydro_lang::prelude::*;
680    /// # use futures::StreamExt;
681    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
682    /// let tick = process.tick();
683    /// let keyed_data = process
684    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
685    ///     .into_keyed()
686    ///     .batch(&tick, nondet!(/** test */))
687    ///     .first();
688    /// let key = tick.singleton(q!(1));
689    /// keyed_data.get(key).all_ticks()
690    /// # }, |mut stream| async move {
691    /// // 2
692    /// # assert_eq!(stream.next().await.unwrap(), 2);
693    /// # }));
694    /// # }
695    /// ```
696    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
697    where
698        B: IsBounded,
699        K: Hash + Eq + Clone,
700        V: Clone,
701    {
702        self.make_bounded()
703            .into_keyed_stream()
704            .get(key)
705            .cast_at_most_one_element()
706    }
707
708    /// Emit a keyed stream containing keys shared between the keyed singleton and the
709    /// keyed stream, where each value in the output keyed stream is a tuple of
710    /// (the keyed singleton's value, the keyed stream's value).
711    ///
712    /// # Example
713    /// ```rust
714    /// # #[cfg(feature = "deploy")] {
715    /// # use hydro_lang::prelude::*;
716    /// # use futures::StreamExt;
717    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
718    /// let tick = process.tick();
719    /// let keyed_data = process
720    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
721    ///     .into_keyed()
722    ///     .batch(&tick, nondet!(/** test */))
723    ///     .first();
724    /// let other_data = process
725    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
726    ///     .into_keyed()
727    ///     .batch(&tick, nondet!(/** test */));
728    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
729    /// # }, |mut stream| async move {
730    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
731    /// # let mut results = vec![];
732    /// # for _ in 0..3 {
733    /// #     results.push(stream.next().await.unwrap());
734    /// # }
735    /// # results.sort();
736    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
737    /// # }));
738    /// # }
739    /// ```
740    pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
741        self,
742        other: KeyedStream<K, V2, L, B2, O2, R2>,
743    ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
744    where
745        B: IsBounded,
746        K: Eq + Hash + Clone,
747        V: Clone,
748        V2: Clone,
749    {
750        // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
751        // always produces deterministic order per key (nested loop join), this could just use
752        // `join_keyed_stream` without constructing IRs manually
753        KeyedStream::new(
754            self.location.clone(),
755            HydroNode::Join {
756                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
757                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
758                metadata: self
759                    .location
760                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
761            },
762        )
763    }
764
765    /// Emit a keyed singleton containing all keys shared between two keyed singletons,
766    /// where each value in the output keyed singleton is a tuple of
767    /// (self.value, other.value).
768    ///
769    /// # Example
770    /// ```rust
771    /// # #[cfg(feature = "deploy")] {
772    /// # use hydro_lang::prelude::*;
773    /// # use futures::StreamExt;
774    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
775    /// # let tick = process.tick();
776    /// let requests = // { 1: 10, 2: 20, 3: 30 }
777    /// # process
778    /// #     .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
779    /// #     .into_keyed()
780    /// #     .batch(&tick, nondet!(/** test */))
781    /// #     .first();
782    /// let other = // { 1: 100, 2: 200, 4: 400 }
783    /// # process
784    /// #     .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
785    /// #     .into_keyed()
786    /// #     .batch(&tick, nondet!(/** test */))
787    /// #     .first();
788    /// requests.join_keyed_singleton(other)
789    /// # .entries().all_ticks()
790    /// # }, |mut stream| async move {
791    /// // { 1: (10, 100), 2: (20, 200) }
792    /// # let mut results = vec![];
793    /// # for _ in 0..2 {
794    /// #     results.push(stream.next().await.unwrap());
795    /// # }
796    /// # results.sort();
797    /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
798    /// # }));
799    /// # }
800    /// ```
801    pub fn join_keyed_singleton<V2: Clone>(
802        self,
803        other: KeyedSingleton<K, V2, L, Bounded>,
804    ) -> KeyedSingleton<K, (V, V2), L, Bounded>
805    where
806        B: IsBounded,
807        K: Eq + Hash + Clone,
808        V: Clone,
809    {
810        let result_stream = self
811            .make_bounded()
812            .entries()
813            .join(other.entries())
814            .into_keyed();
815
816        // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
817        result_stream.cast_at_most_one_entry_per_key()
818    }
819
820    /// For each value in `self`, find the matching key in `lookup`.
821    /// The output is a keyed singleton with the key from `self`, and a value
822    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
823    /// If the key is not present in `lookup`, the option will be [`None`].
824    ///
825    /// # Example
826    /// ```rust
827    /// # #[cfg(feature = "deploy")] {
828    /// # use hydro_lang::prelude::*;
829    /// # use futures::StreamExt;
830    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
831    /// # let tick = process.tick();
832    /// let requests = // { 1: 10, 2: 20 }
833    /// # process
834    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
835    /// #     .into_keyed()
836    /// #     .batch(&tick, nondet!(/** test */))
837    /// #     .first();
838    /// let other_data = // { 10: 100, 11: 110 }
839    /// # process
840    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
841    /// #     .into_keyed()
842    /// #     .batch(&tick, nondet!(/** test */))
843    /// #     .first();
844    /// requests.lookup_keyed_singleton(other_data)
845    /// # .entries().all_ticks()
846    /// # }, |mut stream| async move {
847    /// // { 1: (10, Some(100)), 2: (20, None) }
848    /// # let mut results = vec![];
849    /// # for _ in 0..2 {
850    /// #     results.push(stream.next().await.unwrap());
851    /// # }
852    /// # results.sort();
853    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
854    /// # }));
855    /// # }
856    /// ```
857    pub fn lookup_keyed_singleton<V2>(
858        self,
859        lookup: KeyedSingleton<V, V2, L, Bounded>,
860    ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
861    where
862        B: IsBounded,
863        K: Eq + Hash + Clone,
864        V: Eq + Hash + Clone,
865        V2: Clone,
866    {
867        let result_stream = self
868            .make_bounded()
869            .into_keyed_stream()
870            .lookup_keyed_stream(lookup.into_keyed_stream());
871
872        // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
873        result_stream.cast_at_most_one_entry_per_key()
874    }
875
876    /// For each value in `self`, find the matching key in `lookup`.
877    /// The output is a keyed stream with the key from `self`, and a value
878    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
879    /// If the key is not present in `lookup`, the option will be [`None`].
880    ///
881    /// # Example
882    /// ```rust
883    /// # #[cfg(feature = "deploy")] {
884    /// # use hydro_lang::prelude::*;
885    /// # use futures::StreamExt;
886    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
887    /// # let tick = process.tick();
888    /// let requests = // { 1: 10, 2: 20 }
889    /// # process
890    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
891    /// #     .into_keyed()
892    /// #     .batch(&tick, nondet!(/** test */))
893    /// #     .first();
    /// let other_data = // { 10: [100, 110] }
895    /// # process
896    /// #     .source_iter(q!(vec![(10, 100), (10, 110)]))
897    /// #     .into_keyed()
898    /// #     .batch(&tick, nondet!(/** test */));
899    /// requests.lookup_keyed_stream(other_data)
900    /// # .entries().all_ticks()
901    /// # }, |mut stream| async move {
    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: [(20, None)] }
903    /// # let mut results = vec![];
904    /// # for _ in 0..3 {
905    /// #     results.push(stream.next().await.unwrap());
906    /// # }
907    /// # results.sort();
908    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
909    /// # }));
910    /// # }
911    /// ```
912    pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
913        self,
914        lookup: KeyedStream<V, V2, L, Bounded, O, R>,
915    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
916    where
917        B: IsBounded,
918        K: Eq + Hash + Clone,
919        V: Eq + Hash + Clone,
920        V2: Clone,
921    {
922        self.make_bounded()
923            .entries()
924            .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
925            .into_keyed()
926            .lookup_keyed_stream(lookup)
927    }
928}
929
930impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
931    KeyedSingleton<K, V, L, B>
932{
933    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
934    ///
935    /// The value for each key must be bounded, otherwise the resulting stream elements would be
936    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
937    /// into the output.
938    ///
939    /// # Example
940    /// ```rust
941    /// # #[cfg(feature = "deploy")] {
942    /// # use hydro_lang::prelude::*;
943    /// # use futures::StreamExt;
944    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
945    /// let keyed_singleton = // { 1: 2, 2: 4 }
946    /// # process
947    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
948    /// #     .into_keyed()
949    /// #     .first();
950    /// keyed_singleton.entries()
951    /// # }, |mut stream| async move {
952    /// // (1, 2), (2, 4) in any order
953    /// # let mut results = Vec::new();
954    /// # for _ in 0..2 {
955    /// #     results.push(stream.next().await.unwrap());
956    /// # }
957    /// # results.sort();
958    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
959    /// # }));
960    /// # }
961    /// ```
962    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
963        self.into_keyed_stream().entries()
964    }
965
966    /// Flattens the keyed singleton into an unordered stream of just the values.
967    ///
968    /// The value for each key must be bounded, otherwise the resulting stream elements would be
969    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
970    /// into the output.
971    ///
972    /// # Example
973    /// ```rust
974    /// # #[cfg(feature = "deploy")] {
975    /// # use hydro_lang::prelude::*;
976    /// # use futures::StreamExt;
977    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
978    /// let keyed_singleton = // { 1: 2, 2: 4 }
979    /// # process
980    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
981    /// #     .into_keyed()
982    /// #     .first();
983    /// keyed_singleton.values()
984    /// # }, |mut stream| async move {
985    /// // 2, 4 in any order
986    /// # let mut results = Vec::new();
987    /// # for _ in 0..2 {
988    /// #     results.push(stream.next().await.unwrap());
989    /// # }
990    /// # results.sort();
991    /// # assert_eq!(results, vec![2, 4]);
992    /// # }));
993    /// # }
994    /// ```
995    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
996        let map_f = q!(|(_, v)| v)
997            .splice_fn1_ctx::<(K, V), V>(&self.location)
998            .into();
999
1000        Stream::new(
1001            self.location.clone(),
1002            HydroNode::Map {
1003                f: map_f,
1004                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1005                metadata: self.location.new_node_metadata(Stream::<
1006                    V,
1007                    L,
1008                    B::UnderlyingBound,
1009                    NoOrder,
1010                    ExactlyOnce,
1011                >::collection_kind()),
1012            },
1013        )
1014    }
1015
1016    /// Flattens the keyed singleton into an unordered stream of just the keys.
1017    ///
1018    /// The value for each key must be bounded, otherwise the removal of keys would result in
1019    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
1020    /// into the output.
1021    ///
1022    /// # Example
1023    /// ```rust
1024    /// # #[cfg(feature = "deploy")] {
1025    /// # use hydro_lang::prelude::*;
1026    /// # use futures::StreamExt;
1027    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1028    /// let keyed_singleton = // { 1: 2, 2: 4 }
1029    /// # process
1030    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1031    /// #     .into_keyed()
1032    /// #     .first();
1033    /// keyed_singleton.keys()
1034    /// # }, |mut stream| async move {
1035    /// // 1, 2 in any order
1036    /// # let mut results = Vec::new();
1037    /// # for _ in 0..2 {
1038    /// #     results.push(stream.next().await.unwrap());
1039    /// # }
1040    /// # results.sort();
1041    /// # assert_eq!(results, vec![1, 2]);
1042    /// # }));
1043    /// # }
1044    /// ```
1045    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1046        self.entries().map(q!(|(k, _)| k))
1047    }
1048
1049    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
1050    /// entries whose keys are not in the provided stream.
1051    ///
1052    /// # Example
1053    /// ```rust
1054    /// # #[cfg(feature = "deploy")] {
1055    /// # use hydro_lang::prelude::*;
1056    /// # use futures::StreamExt;
1057    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1058    /// let tick = process.tick();
1059    /// let keyed_singleton = // { 1: 2, 2: 4 }
1060    /// # process
1061    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1062    /// #     .into_keyed()
1063    /// #     .first()
1064    /// #     .batch(&tick, nondet!(/** test */));
1065    /// let keys_to_remove = process
1066    ///     .source_iter(q!(vec![1]))
1067    ///     .batch(&tick, nondet!(/** test */));
1068    /// keyed_singleton.filter_key_not_in(keys_to_remove)
1069    /// #   .entries().all_ticks()
1070    /// # }, |mut stream| async move {
1071    /// // { 2: 4 }
1072    /// # for w in vec![(2, 4)] {
1073    /// #     assert_eq!(stream.next().await.unwrap(), w);
1074    /// # }
1075    /// # }));
1076    /// # }
1077    /// ```
1078    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1079        self,
1080        other: Stream<K, L, Bounded, O2, R2>,
1081    ) -> Self
1082    where
1083        K: Hash + Eq,
1084    {
1085        check_matching_location(&self.location, &other.location);
1086
1087        KeyedSingleton::new(
1088            self.location.clone(),
1089            HydroNode::AntiJoin {
1090                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1091                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1092                metadata: self.location.new_node_metadata(Self::collection_kind()),
1093            },
1094        )
1095    }
1096
1097    /// An operator which allows you to "inspect" each value of a keyed singleton without
1098    /// modifying it. The closure `f` is called on a reference to each value. This is
1099    /// mainly useful for debugging, and should not be used to generate side-effects.
1100    ///
1101    /// # Example
1102    /// ```rust
1103    /// # #[cfg(feature = "deploy")] {
1104    /// # use hydro_lang::prelude::*;
1105    /// # use futures::StreamExt;
1106    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1107    /// let keyed_singleton = // { 1: 2, 2: 4 }
1108    /// # process
1109    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1110    /// #     .into_keyed()
1111    /// #     .first();
1112    /// keyed_singleton
1113    ///     .inspect(q!(|v| println!("{}", v)))
1114    /// #   .entries()
1115    /// # }, |mut stream| async move {
1116    /// // { 1: 2, 2: 4 }
1117    /// # for w in vec![(1, 2), (2, 4)] {
1118    /// #     assert_eq!(stream.next().await.unwrap(), w);
1119    /// # }
1120    /// # }));
1121    /// # }
1122    /// ```
1123    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1124    where
1125        F: Fn(&V) + 'a,
1126    {
1127        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1128        let inspect_f = q!({
1129            let orig = f;
1130            move |t: &(_, _)| orig(&t.1)
1131        })
1132        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1133        .into();
1134
1135        KeyedSingleton::new(
1136            self.location.clone(),
1137            HydroNode::Inspect {
1138                f: inspect_f,
1139                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1140                metadata: self.location.new_node_metadata(Self::collection_kind()),
1141            },
1142        )
1143    }
1144
1145    /// An operator which allows you to "inspect" each entry of a keyed singleton without
1146    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1147    /// mainly useful for debugging, and should not be used to generate side-effects.
1148    ///
1149    /// # Example
1150    /// ```rust
1151    /// # #[cfg(feature = "deploy")] {
1152    /// # use hydro_lang::prelude::*;
1153    /// # use futures::StreamExt;
1154    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1155    /// let keyed_singleton = // { 1: 2, 2: 4 }
1156    /// # process
1157    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1158    /// #     .into_keyed()
1159    /// #     .first();
1160    /// keyed_singleton
1161    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1162    /// #   .entries()
1163    /// # }, |mut stream| async move {
1164    /// // { 1: 2, 2: 4 }
1165    /// # for w in vec![(1, 2), (2, 4)] {
1166    /// #     assert_eq!(stream.next().await.unwrap(), w);
1167    /// # }
1168    /// # }));
1169    /// # }
1170    /// ```
1171    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1172    where
1173        F: Fn(&(K, V)) + 'a,
1174    {
1175        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1176
1177        KeyedSingleton::new(
1178            self.location.clone(),
1179            HydroNode::Inspect {
1180                f: inspect_f,
1181                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1182                metadata: self.location.new_node_metadata(Self::collection_kind()),
1183            },
1184        )
1185    }
1186
1187    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1188    ///
1189    /// Because this method requires values to be bounded, the output [`Optional`] will only be
1190    /// asynchronously updated if a new key is added that is higher than the previous max key.
1191    ///
1192    /// # Example
1193    /// ```rust
1194    /// # #[cfg(feature = "deploy")] {
1195    /// # use hydro_lang::prelude::*;
1196    /// # use futures::StreamExt;
1197    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1198    /// let tick = process.tick();
1199    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1200    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1201    /// #     .into_keyed()
1202    /// #     .first();
1203    /// keyed_singleton.get_max_key()
1204    /// # .sample_eager(nondet!(/** test */))
1205    /// # }, |mut stream| async move {
1206    /// // (2, 456)
1207    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1208    /// # }));
1209    /// # }
1210    /// ```
1211    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1212    where
1213        K: Ord,
1214    {
1215        self.entries()
1216            .assume_ordering_trusted(nondet!(
1217                /// There is only one element associated with each key, and the keys are totallly
1218                /// ordered so we will produce a deterministic value. The closure technically
1219                /// isn't commutative in the case where both passed entries have the same key
1220                /// but different values.
1221                ///
1222                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1223                /// the two inputs do not have the same key.
1224            ))
1225            .reduce(q!(
1226                move |curr, new| {
1227                    if new.0 > curr.0 {
1228                        *curr = new;
1229                    }
1230                },
1231                idempotent = manual_proof!(/** repeated elements are ignored */)
1232            ))
1233    }
1234
1235    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1236    /// element, the value.
1237    ///
1238    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1239    ///
1240    /// # Example
1241    /// ```rust
1242    /// # #[cfg(feature = "deploy")] {
1243    /// # use hydro_lang::prelude::*;
1244    /// # use futures::StreamExt;
1245    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1246    /// let keyed_singleton = // { 1: 2, 2: 4 }
1247    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1248    /// #     .into_keyed()
1249    /// #     .first();
1250    /// keyed_singleton
1251    ///     .clone()
1252    ///     .into_keyed_stream()
1253    ///     .merge_unordered(
1254    ///         keyed_singleton.into_keyed_stream()
1255    ///     )
1256    /// #   .entries()
1257    /// # }, |mut stream| async move {
    /// // { 1: [2, 2], 2: [4, 4] }
1259    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1260    /// #     assert_eq!(stream.next().await.unwrap(), w);
1261    /// # }
1262    /// # }));
1263    /// # }
1264    /// ```
1265    pub fn into_keyed_stream(
1266        self,
1267    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1268        KeyedStream::new(
1269            self.location.clone(),
1270            HydroNode::Cast {
1271                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1272                metadata: self.location.new_node_metadata(KeyedStream::<
1273                    K,
1274                    V,
1275                    L,
1276                    B::UnderlyingBound,
1277                    TotalOrder,
1278                    ExactlyOnce,
1279                >::collection_kind()),
1280            },
1281        )
1282    }
1283}
1284
1285impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1286where
1287    L: Location<'a>,
1288{
1289    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1290    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1291    ///
1292    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1293    /// processed before an acknowledgement is emitted.
1294    pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1295        let id = self.location.flow_state().borrow_mut().next_clock_id();
1296        let out_location = Atomic {
1297            tick: Tick {
1298                id,
1299                l: self.location.clone(),
1300            },
1301        };
1302        KeyedSingleton::new(
1303            out_location.clone(),
1304            HydroNode::BeginAtomic {
1305                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1306                metadata: out_location
1307                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1308            },
1309        )
1310    }
1311}
1312
1313impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1314where
1315    L: Location<'a>,
1316{
1317    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1318    /// See [`KeyedSingleton::atomic`] for more details.
1319    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1320        KeyedSingleton::new(
1321            self.location.tick.l.clone(),
1322            HydroNode::EndAtomic {
1323                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1324                metadata: self
1325                    .location
1326                    .tick
1327                    .l
1328                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1329            },
1330        )
1331    }
1332}
1333
1334impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1335    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1336    /// tick `T` always has the entries of `self` at tick `T - 1`.
1337    ///
1338    /// At tick `0`, the output has no entries, since there is no previous tick.
1339    ///
1340    /// This operator enables stateful iterative processing with ticks, by sending data from one
1341    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1342    ///
1343    /// # Example
1344    /// ```rust
1345    /// # #[cfg(feature = "deploy")] {
1346    /// # use hydro_lang::prelude::*;
1347    /// # use futures::StreamExt;
1348    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1349    /// let tick = process.tick();
1350    /// # // ticks are lazy by default, forces the second tick to run
1351    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1352    /// # let batch_first_tick = process
1353    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1354    /// #   .batch(&tick, nondet!(/** test */))
1355    /// #   .into_keyed();
1356    /// # let batch_second_tick = process
1357    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1358    /// #   .batch(&tick, nondet!(/** test */))
1359    /// #   .into_keyed()
1360    /// #   .defer_tick(); // appears on the second tick
1361    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1362    /// # batch_first_tick.chain(batch_second_tick).first();
1363    /// input_batch.clone().filter_key_not_in(
1364    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1365    /// )
1366    /// # .entries().all_ticks()
1367    /// # }, |mut stream| async move {
1368    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1369    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1370    /// #     assert_eq!(stream.next().await.unwrap(), w);
1371    /// # }
1372    /// # }));
1373    /// # }
1374    /// ```
1375    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1376        KeyedSingleton::new(
1377            self.location.clone(),
1378            HydroNode::DeferTick {
1379                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1380                metadata: self
1381                    .location
1382                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1383            },
1384        )
1385    }
1386}
1387
1388impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1389where
1390    L: Location<'a>,
1391{
1392    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1393    /// point in time.
1394    ///
1395    /// # Non-Determinism
1396    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1397    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1398    pub fn snapshot<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1399        self,
1400        tick: &Tick<L2>,
1401        _nondet: NonDet,
1402    ) -> KeyedSingleton<K, V, Tick<L::NoConsistency>, Bounded> {
1403        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1404        KeyedSingleton::new(
1405            tick.drop_consistency(),
1406            HydroNode::Batch {
1407                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1408                metadata: tick
1409                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1410            },
1411        )
1412    }
1413}
1414
1415impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1416where
1417    L: Location<'a>,
1418{
1419    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1420    /// state of the keyed singleton being atomically processed.
1421    ///
1422    /// # Non-Determinism
1423    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1424    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1425    pub fn snapshot_atomic<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1426        self,
1427        tick: &Tick<L2>,
1428        _nondet: NonDet,
1429    ) -> KeyedSingleton<K, V, Tick<L::NoConsistency>, Bounded> {
1430        KeyedSingleton::new(
1431            tick.drop_consistency(),
1432            HydroNode::Batch {
1433                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1434                metadata: tick
1435                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1436            },
1437        )
1438    }
1439}
1440
1441impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1442where
1443    L: Location<'a>,
1444{
1445    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1446    ///
1447    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1448    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1449    /// is filtered out.
1450    ///
1451    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while
    /// filtering, use [`KeyedSingleton::filter_map`] instead.
1454    ///
1455    /// # Example
1456    /// ```rust
1457    /// # #[cfg(feature = "deploy")] {
1458    /// # use hydro_lang::prelude::*;
1459    /// # use futures::StreamExt;
1460    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1461    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1462    /// # process
1463    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1464    /// #     .into_keyed()
1465    /// #     .first();
1466    /// keyed_singleton.filter(q!(|&v| v > 1))
1467    /// #   .entries()
1468    /// # }, |mut stream| async move {
1469    /// // { 1: 2, 2: 4 }
1470    /// # let mut results = Vec::new();
1471    /// # for _ in 0..2 {
1472    /// #     results.push(stream.next().await.unwrap());
1473    /// # }
1474    /// # results.sort();
1475    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1476    /// # }));
1477    /// # }
1478    /// ```
1479    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1480    where
1481        F: Fn(&V) -> bool + 'a,
1482    {
1483        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1484        let filter_f = q!({
1485            let orig = f;
1486            move |t: &(_, _)| orig(&t.1)
1487        })
1488        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1489        .into();
1490
1491        KeyedSingleton::new(
1492            self.location.clone(),
1493            HydroNode::Filter {
1494                f: filter_f,
1495                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1496                metadata: self
1497                    .location
1498                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1499            },
1500        )
1501    }
1502
1503    /// An operator that both filters and maps values. It yields only the key-value pairs where
1504    /// the supplied closure `f` returns `Some(value)`.
1505    ///
1506    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1507    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1508    /// If it returns `None`, the key-value pair is filtered out.
1509    ///
1510    /// # Example
1511    /// ```rust
1512    /// # #[cfg(feature = "deploy")] {
1513    /// # use hydro_lang::prelude::*;
1514    /// # use futures::StreamExt;
1515    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1516    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1517    /// # process
1518    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1519    /// #     .into_keyed()
1520    /// #     .first();
1521    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1522    /// #   .entries()
1523    /// # }, |mut stream| async move {
1524    /// // { 1: 42, 3: 100 }
1525    /// # let mut results = Vec::new();
1526    /// # for _ in 0..2 {
1527    /// #     results.push(stream.next().await.unwrap());
1528    /// # }
1529    /// # results.sort();
1530    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1531    /// # }));
1532    /// # }
1533    /// ```
1534    pub fn filter_map<F, U>(
1535        self,
1536        f: impl IntoQuotedMut<'a, F, L> + Copy,
1537    ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1538    where
1539        F: Fn(V) -> Option<U> + 'a,
1540    {
1541        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1542        let filter_map_f = q!({
1543            let orig = f;
1544            move |(k, v)| orig(v).map(|o| (k, o))
1545        })
1546        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1547        .into();
1548
1549        KeyedSingleton::new(
1550            self.location.clone(),
1551            HydroNode::FilterMap {
1552                f: filter_map_f,
1553                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1554                metadata: self.location.new_node_metadata(KeyedSingleton::<
1555                    K,
1556                    U,
1557                    L,
1558                    B::EraseMonotonic,
1559                >::collection_kind()),
1560            },
1561        )
1562    }
1563
1564    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1565    /// arrived since the previous batch was released.
1566    ///
1567    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1568    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1569    ///
1570    /// # Non-Determinism
1571    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1572    /// has a non-deterministic set of key-value pairs.
1573    pub fn batch<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1574        self,
1575        tick: &Tick<L2>,
1576        _nondet: NonDet,
1577    ) -> KeyedSingleton<K, V, Tick<L::NoConsistency>, Bounded> {
1578        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1579        KeyedSingleton::new(
1580            tick.drop_consistency(),
1581            HydroNode::Batch {
1582                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1583                metadata: tick
1584                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1585            },
1586        )
1587    }
1588}
1589
1590impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1591where
1592    L: Location<'a>,
1593{
1594    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1595    /// atomically processed.
1596    ///
1597    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1598    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1599    ///
1600    /// # Non-Determinism
1601    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1602    /// has a non-deterministic set of key-value pairs.
1603    pub fn batch_atomic<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1604        self,
1605        tick: &Tick<L2>,
1606        nondet: NonDet,
1607    ) -> KeyedSingleton<K, V, Tick<L::NoConsistency>, Bounded> {
1608        let _ = nondet;
1609        KeyedSingleton::new(
1610            tick.drop_consistency(),
1611            HydroNode::Batch {
1612                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1613                metadata: tick
1614                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1615            },
1616        )
1617    }
1618}
1619
1620#[cfg(test)]
1621mod tests {
1622    #[cfg(feature = "deploy")]
1623    use futures::{SinkExt, StreamExt};
1624    #[cfg(feature = "deploy")]
1625    use hydro_deploy::Deployment;
1626    #[cfg(any(feature = "deploy", feature = "sim"))]
1627    use stageleft::q;
1628
1629    #[cfg(any(feature = "deploy", feature = "sim"))]
1630    use crate::compile::builder::FlowBuilder;
1631    #[cfg(any(feature = "deploy", feature = "sim"))]
1632    use crate::location::Location;
1633    #[cfg(any(feature = "deploy", feature = "sim"))]
1634    use crate::nondet::nondet;
1635
1636    #[cfg(feature = "deploy")]
1637    #[tokio::test]
1638    async fn key_count_bounded_value() {
1639        let mut deployment = Deployment::new();
1640
1641        let mut flow = FlowBuilder::new();
1642        let node = flow.process::<()>();
1643        let external = flow.external::<()>();
1644
1645        let (input_port, input) = node.source_external_bincode(&external);
1646        let out = input
1647            .into_keyed()
1648            .first()
1649            .key_count()
1650            .sample_eager(nondet!(/** test */))
1651            .send_bincode_external(&external);
1652
1653        let nodes = flow
1654            .with_process(&node, deployment.Localhost())
1655            .with_external(&external, deployment.Localhost())
1656            .deploy(&mut deployment);
1657
1658        deployment.deploy().await.unwrap();
1659
1660        let mut external_in = nodes.connect(input_port).await;
1661        let mut external_out = nodes.connect(out).await;
1662
1663        deployment.start().await.unwrap();
1664
1665        assert_eq!(external_out.next().await.unwrap(), 0);
1666
1667        external_in.send((1, 1)).await.unwrap();
1668        assert_eq!(external_out.next().await.unwrap(), 1);
1669
1670        external_in.send((2, 2)).await.unwrap();
1671        assert_eq!(external_out.next().await.unwrap(), 2);
1672    }
1673
1674    #[cfg(feature = "deploy")]
1675    #[tokio::test]
1676    async fn key_count_unbounded_value() {
1677        let mut deployment = Deployment::new();
1678
1679        let mut flow = FlowBuilder::new();
1680        let node = flow.process::<()>();
1681        let external = flow.external::<()>();
1682
1683        let (input_port, input) = node.source_external_bincode(&external);
1684        let out = input
1685            .into_keyed()
1686            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1687            .key_count()
1688            .sample_eager(nondet!(/** test */))
1689            .send_bincode_external(&external);
1690
1691        let nodes = flow
1692            .with_process(&node, deployment.Localhost())
1693            .with_external(&external, deployment.Localhost())
1694            .deploy(&mut deployment);
1695
1696        deployment.deploy().await.unwrap();
1697
1698        let mut external_in = nodes.connect(input_port).await;
1699        let mut external_out = nodes.connect(out).await;
1700
1701        deployment.start().await.unwrap();
1702
1703        assert_eq!(external_out.next().await.unwrap(), 0);
1704
1705        external_in.send((1, 1)).await.unwrap();
1706        assert_eq!(external_out.next().await.unwrap(), 1);
1707
1708        external_in.send((1, 2)).await.unwrap();
1709        assert_eq!(external_out.next().await.unwrap(), 1);
1710
1711        external_in.send((2, 2)).await.unwrap();
1712        assert_eq!(external_out.next().await.unwrap(), 2);
1713
1714        external_in.send((1, 1)).await.unwrap();
1715        assert_eq!(external_out.next().await.unwrap(), 2);
1716
1717        external_in.send((3, 1)).await.unwrap();
1718        assert_eq!(external_out.next().await.unwrap(), 3);
1719    }
1720
1721    #[cfg(feature = "deploy")]
1722    #[tokio::test]
1723    async fn into_singleton_bounded_value() {
1724        let mut deployment = Deployment::new();
1725
1726        let mut flow = FlowBuilder::new();
1727        let node = flow.process::<()>();
1728        let external = flow.external::<()>();
1729
1730        let (input_port, input) = node.source_external_bincode(&external);
1731        let out = input
1732            .into_keyed()
1733            .first()
1734            .into_singleton()
1735            .sample_eager(nondet!(/** test */))
1736            .send_bincode_external(&external);
1737
1738        let nodes = flow
1739            .with_process(&node, deployment.Localhost())
1740            .with_external(&external, deployment.Localhost())
1741            .deploy(&mut deployment);
1742
1743        deployment.deploy().await.unwrap();
1744
1745        let mut external_in = nodes.connect(input_port).await;
1746        let mut external_out = nodes.connect(out).await;
1747
1748        deployment.start().await.unwrap();
1749
1750        assert_eq!(
1751            external_out.next().await.unwrap(),
1752            std::collections::HashMap::new()
1753        );
1754
1755        external_in.send((1, 1)).await.unwrap();
1756        assert_eq!(
1757            external_out.next().await.unwrap(),
1758            vec![(1, 1)].into_iter().collect()
1759        );
1760
1761        external_in.send((2, 2)).await.unwrap();
1762        assert_eq!(
1763            external_out.next().await.unwrap(),
1764            vec![(1, 1), (2, 2)].into_iter().collect()
1765        );
1766    }
1767
1768    #[cfg(feature = "deploy")]
1769    #[tokio::test]
1770    async fn into_singleton_unbounded_value() {
1771        let mut deployment = Deployment::new();
1772
1773        let mut flow = FlowBuilder::new();
1774        let node = flow.process::<()>();
1775        let external = flow.external::<()>();
1776
1777        let (input_port, input) = node.source_external_bincode(&external);
1778        let out = input
1779            .into_keyed()
1780            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1781            .into_singleton()
1782            .sample_eager(nondet!(/** test */))
1783            .send_bincode_external(&external);
1784
1785        let nodes = flow
1786            .with_process(&node, deployment.Localhost())
1787            .with_external(&external, deployment.Localhost())
1788            .deploy(&mut deployment);
1789
1790        deployment.deploy().await.unwrap();
1791
1792        let mut external_in = nodes.connect(input_port).await;
1793        let mut external_out = nodes.connect(out).await;
1794
1795        deployment.start().await.unwrap();
1796
1797        assert_eq!(
1798            external_out.next().await.unwrap(),
1799            std::collections::HashMap::new()
1800        );
1801
1802        external_in.send((1, 1)).await.unwrap();
1803        assert_eq!(
1804            external_out.next().await.unwrap(),
1805            vec![(1, 1)].into_iter().collect()
1806        );
1807
1808        external_in.send((1, 2)).await.unwrap();
1809        assert_eq!(
1810            external_out.next().await.unwrap(),
1811            vec![(1, 2)].into_iter().collect()
1812        );
1813
1814        external_in.send((2, 2)).await.unwrap();
1815        assert_eq!(
1816            external_out.next().await.unwrap(),
1817            vec![(1, 2), (2, 1)].into_iter().collect()
1818        );
1819
1820        external_in.send((1, 1)).await.unwrap();
1821        assert_eq!(
1822            external_out.next().await.unwrap(),
1823            vec![(1, 3), (2, 1)].into_iter().collect()
1824        );
1825
1826        external_in.send((3, 1)).await.unwrap();
1827        assert_eq!(
1828            external_out.next().await.unwrap(),
1829            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1830        );
1831    }
1832
1833    #[cfg(feature = "sim")]
1834    #[test]
1835    fn sim_unbounded_singleton_snapshot() {
1836        let mut flow = FlowBuilder::new();
1837        let node = flow.process::<()>();
1838
1839        let (input_port, input) = node.sim_input();
1840        let output = input
1841            .into_keyed()
1842            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1843            .snapshot(&node.tick(), nondet!(/** test */))
1844            .entries()
1845            .all_ticks()
1846            .sim_output();
1847
1848        let count = flow.sim().exhaustive(async || {
1849            input_port.send((1, 123));
1850            input_port.send((1, 456));
1851            input_port.send((2, 123));
1852
1853            let all = output.collect_sorted::<Vec<_>>().await;
1854            assert_eq!(all.last().unwrap(), &(2, 1));
1855        });
1856
1857        assert_eq!(count, 8);
1858    }
1859
1860    #[cfg(feature = "deploy")]
1861    #[tokio::test]
1862    async fn join_keyed_stream() {
1863        let mut deployment = Deployment::new();
1864
1865        let mut flow = FlowBuilder::new();
1866        let node = flow.process::<()>();
1867        let external = flow.external::<()>();
1868
1869        let tick = node.tick();
1870        let keyed_data = node
1871            .source_iter(q!(vec![(1, 10), (2, 20)]))
1872            .into_keyed()
1873            .batch(&tick, nondet!(/** test */))
1874            .first();
1875        let requests = node
1876            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1877            .into_keyed()
1878            .batch(&tick, nondet!(/** test */));
1879
1880        let out = keyed_data
1881            .join_keyed_stream(requests)
1882            .entries()
1883            .all_ticks()
1884            .send_bincode_external(&external);
1885
1886        let nodes = flow
1887            .with_process(&node, deployment.Localhost())
1888            .with_external(&external, deployment.Localhost())
1889            .deploy(&mut deployment);
1890
1891        deployment.deploy().await.unwrap();
1892
1893        let mut external_out = nodes.connect(out).await;
1894
1895        deployment.start().await.unwrap();
1896
1897        let mut results = vec![];
1898        for _ in 0..2 {
1899            results.push(external_out.next().await.unwrap());
1900        }
1901        results.sort();
1902
1903        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1904    }
1905}