Skip to main content

hydro_lang/live_collections/sliced/
mod.rs

1//! Utilities for transforming live collections via slicing.
2
3pub mod style;
4
5use super::boundedness::{Bounded, Unbounded};
6use super::stream::{Ordering, Retries};
7use crate::location::{Location, Tick};
8
9#[doc(hidden)]
10#[macro_export]
11macro_rules! __sliced_parse_uses__ {
12    // Parse immutable use statements with style: let name = use::style(args...);
13    (
14        @uses [$($uses:tt)*]
15        @states [$($states:tt)*]
16        let $name:ident = use:: $invocation:expr; $($rest:tt)*
17    ) => {
18        $crate::__sliced_parse_uses__!(
19            @uses [$($uses)* { $name, $invocation, $invocation }]
20            @states [$($states)*]
21            $($rest)*
22        )
23    };
24
25    // Parse immutable use statements without style: let name = use(args...);
26    (
27        @uses [$($uses:tt)*]
28        @states [$($states:tt)*]
29        let $name:ident = use($($args:expr),* $(,)?); $($rest:tt)*
30    ) => {
31        $crate::__sliced_parse_uses__!(
32            @uses [$($uses)* { $name, $crate::macro_support::copy_span::copy_span!($($args,)* default)($($args),*), $($args),* }]
33            @states [$($states)*]
34            $($rest)*
35        )
36    };
37
38    // Parse mutable state statements: let mut name = use::style::<Type>(args);
39    (
40        @uses [$($uses:tt)*]
41        @states [$($states:tt)*]
42        let mut $name:ident = use:: $style:ident $(::<$ty:ty>)? ($($args:expr)?); $($rest:tt)*
43    ) => {
44        $crate::__sliced_parse_uses__!(
45            @uses [$($uses)*]
46            @states [$($states)* { $name, $style, (($($ty)?), ($($args)?)) }]
47            $($rest)*
48        )
49    };
50
51    // Terminal case: no uses, only states
52    (
53        @uses []
54        @states [$({ $state_name:ident, $state_style:ident, $state_arg:tt })+]
55        $($body:tt)*
56    ) => {
57        {
58            // We need at least one use to get a tick, so panic if there are none
59            compile_error!("sliced! requires at least one `let name = use(...)` statement to determine the tick")
60        }
61    };
62
63    // Terminal case: uses with optional states
64    (
65        @uses [$({ $use_name:ident, $invocation:expr, $($invocation_spans:expr),* })+]
66        @states [$({ $state_name:ident, $state_style:ident, (($($state_ty:ty)?), ($($state_arg:expr)?)) })*]
67        $($body:tt)*
68    ) => {
69        {
70            use $crate::live_collections::sliced::style::*;
71            let __styled = (
72                $($invocation,)+
73            );
74
75            let __tick = $crate::live_collections::sliced::Slicable::create_tick(&__styled.0);
76            let __backtraces = {
77                use $crate::compile::ir::backtrace::__macro_get_backtrace;
78                (
79                    $($crate::macro_support::copy_span::copy_span!($($invocation_spans,)* {
80                        __macro_get_backtrace(1)
81                    }),)+
82                )
83            };
84            let __sliced = $crate::live_collections::sliced::Slicable::slice(__styled, &__tick, __backtraces);
85            let (
86                $($use_name,)+
87            ) = __sliced;
88
89            // Create all cycles and pack handles/values into tuples
90            let (__handles, __states) = $crate::live_collections::sliced::unzip_cycles((
91                $($crate::live_collections::sliced::style::$state_style$(::<$state_ty, _>)?(& __tick, $($state_arg)?),)*
92            ));
93
94            // Unpack mutable state values
95            let (
96                $(mut $state_name,)*
97            ) = __states;
98
99            // Execute the body
100            let __body_result = {
101                $($body)*
102            };
103
104            // Re-pack the final state values and complete cycles
105            let __final_states = (
106                $($state_name,)*
107            );
108            $crate::live_collections::sliced::complete_cycles(__handles, __final_states);
109
110            // Unslice the result
111            $crate::live_collections::sliced::Unslicable::unslice(__body_result)
112        }
113    };
114}
115
116#[macro_export]
117/// Transforms a live collection with a computation relying on a slice of another live collection.
118/// This is useful for reading a snapshot of an asynchronously updated collection while processing another
119/// collection, such as joining a stream with the latest values from a singleton.
120///
121/// # Syntax
122/// The `sliced!` macro takes in a closure-like syntax specifying the live collections to be sliced
123/// and the body of the transformation. Each `use` statement indicates a live collection to be sliced,
124/// along with a non-determinism explanation. Optionally, a style can be specified to control how the
125/// live collection is sliced (e.g., atomically). All `use` statements must appear before the body.
126///
127/// ```rust,ignore
128/// let stream = sliced! {
129///     let name1 = use(collection1, nondet!(/** explanation */));
130///     let name2 = use::atomic(collection2, nondet!(/** explanation */));
131///
132///     // arbitrary statements can follow
133///     let intermediate = name1.map(...);
134///     intermediate.cross_singleton(name2)
135/// };
136/// ```
137///
138/// # Stateful Computations
139/// The `sliced!` macro also supports stateful computations across iterations using `let mut` bindings
140/// with `use::state` or `use::state_null`. These create cycles that persist values between iterations.
141///
142/// - `use::state(|l| initial)`: Creates a cycle with an initial value. The closure receives
143///   the slice location and returns the initial state for the first iteration.
144/// - `use::state_null::<Type>()`: Creates a cycle that starts as null/empty on the first iteration.
145///
146/// The mutable binding can be reassigned in the body, and the final value will be passed to the
147/// next iteration.
148///
149/// ```rust,ignore
150/// let counter_stream = sliced! {
151///     let batch = use(input_stream, nondet!(/** explanation */));
152///     let mut counter = use::state(|l| l.singleton(q!(0)));
153///
154///     // Increment counter by the number of items in this batch
155///     let new_count = counter.clone().zip(batch.count())
156///         .map(q!(|(old, add)| old + add));
157///     counter = new_count.clone();
158///     new_count.into_stream()
159/// };
160/// ```
161macro_rules! __sliced__ {
162    ($($tt:tt)*) => {
163        $crate::__sliced_parse_uses__!(
164            @uses []
165            @states []
166            $($tt)*
167        )
168    };
169}
170
171pub use crate::__sliced__ as sliced;
172
173/// Marks this live collection as atomically-yielded, which means that the output outside
174/// `sliced` will be at an atomic location that is synchronous with respect to the body
175/// of the slice.
176pub fn yield_atomic<T>(t: T) -> style::Atomic<T> {
177    style::Atomic {
178        collection: t,
179        // yield_atomic doesn't need a nondet since it's for output, not input
180        nondet: crate::nondet::NonDet,
181    }
182}
183
184/// A trait for live collections which can be sliced into bounded versions at a tick.
185pub trait Slicable<'a, L: Location<'a>> {
186    /// The sliced version of this live collection.
187    type Slice;
188
189    /// The type of backtrace associated with this slice.
190    type Backtrace;
191
192    /// Gets the location associated with this live collection.
193    fn get_location(&self) -> L;
194
195    /// Creates a tick that is appropriate for the collection's location.
196    fn create_tick(&self) -> Tick<L> {
197        self.get_location().try_tick().unwrap()
198    }
199
200    /// Slices this live collection at the given tick.
201    ///
202    /// # Non-Determinism
203    /// Slicing a live collection may involve non-determinism, such as choosing which messages
204    /// to include in a batch.
205    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace) -> Self::Slice;
206}
207
208/// A trait for live collections which can be yielded out of a slice back into their original form.
209pub trait Unslicable {
210    /// The unsliced version of this live collection.
211    type Unsliced;
212
213    /// Unslices a sliced live collection back into its original form.
214    fn unslice(self) -> Self::Unsliced;
215}
216
217/// A trait for unzipping a tuple of (handle, state) pairs into separate tuples.
218#[doc(hidden)]
219pub trait UnzipCycles {
220    /// The tuple of cycle handles.
221    type Handles;
222    /// The tuple of state values.
223    type States;
224
225    /// Unzips the cycles into handles and states.
226    fn unzip(self) -> (Self::Handles, Self::States);
227}
228
229/// Unzips a tuple of cycles into handles and states.
230#[doc(hidden)]
231pub fn unzip_cycles<T: UnzipCycles>(cycles: T) -> (T::Handles, T::States) {
232    cycles.unzip()
233}
234
235/// A trait for completing a tuple of cycle handles with their final state values.
236#[doc(hidden)]
237pub trait CompleteCycles<States> {
238    /// Completes all cycles with the provided state values.
239    fn complete(self, states: States);
240}
241
242/// Completes a tuple of cycle handles with their final state values.
243#[doc(hidden)]
244pub fn complete_cycles<H: CompleteCycles<S>, S>(handles: H, states: S) {
245    handles.complete(states);
246}
247
248impl<'a, L: Location<'a>> Slicable<'a, L> for () {
249    type Slice = ();
250    type Backtrace = ();
251
252    fn get_location(&self) -> L {
253        unreachable!()
254    }
255
256    fn slice(self, _tick: &Tick<L>, _backtrace: Self::Backtrace) -> Self::Slice {}
257}
258
259impl Unslicable for () {
260    type Unsliced = ();
261
262    fn unslice(self) -> Self::Unsliced {}
263}
264
265macro_rules! impl_slicable_for_tuple {
266    ($($T:ident, $T_bt:ident, $idx:tt),+) => {
267        impl<'a, L: Location<'a>, $($T: Slicable<'a, L>),+> Slicable<'a, L> for ($($T,)+) {
268            type Slice = ($($T::Slice,)+);
269            type Backtrace = ($($T::Backtrace,)+);
270
271            fn get_location(&self) -> L {
272                self.0.get_location()
273            }
274
275            #[expect(non_snake_case, reason = "macro codegen")]
276            fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace) -> Self::Slice {
277                let ($($T,)+) = self;
278                let ($($T_bt,)+) = backtrace;
279                ($($T.slice(tick, $T_bt),)+)
280            }
281        }
282
283        impl<$($T: Unslicable),+> Unslicable for ($($T,)+) {
284            type Unsliced = ($($T::Unsliced,)+);
285
286            #[expect(non_snake_case, reason = "macro codegen")]
287            fn unslice(self) -> Self::Unsliced {
288                let ($($T,)+) = self;
289                ($($T.unslice(),)+)
290            }
291        }
292    };
293}
294
295#[cfg(stageleft_runtime)]
296impl_slicable_for_tuple!(S1, S1_bt, 0);
297#[cfg(stageleft_runtime)]
298impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1);
299#[cfg(stageleft_runtime)]
300impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2);
301#[cfg(stageleft_runtime)]
302impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3);
303#[cfg(stageleft_runtime)]
304impl_slicable_for_tuple!(
305    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4
306);
307#[cfg(stageleft_runtime)]
308impl_slicable_for_tuple!(
309    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5
310);
311#[cfg(stageleft_runtime)]
312impl_slicable_for_tuple!(
313    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
314    6
315);
316#[cfg(stageleft_runtime)]
317impl_slicable_for_tuple!(
318    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
319    6, S8, S8_bt, 7
320);
321#[cfg(stageleft_runtime)]
322impl_slicable_for_tuple!(
323    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
324    6, S8, S8_bt, 7, S9, S9_bt, 8
325);
326#[cfg(stageleft_runtime)]
327impl_slicable_for_tuple!(
328    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
329    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9
330);
331#[cfg(stageleft_runtime)]
332impl_slicable_for_tuple!(
333    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
334    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9, S11, S11_bt, 10
335);
336#[cfg(stageleft_runtime)]
337impl_slicable_for_tuple!(
338    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
339    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9, S11, S11_bt, 10, S12, S12_bt, 11
340);
341
342macro_rules! impl_cycles_for_tuple {
343    ($($H:ident, $S:ident, $idx:tt),*) => {
344        impl<$($H, $S),*> UnzipCycles for ($(($H, $S),)*) {
345            type Handles = ($($H,)*);
346            type States = ($($S,)*);
347
348            #[expect(clippy::allow_attributes, reason = "macro codegen")]
349            #[allow(non_snake_case, reason = "macro codegen")]
350            fn unzip(self) -> (Self::Handles, Self::States) {
351                let ($($H,)*) = self;
352                (
353                    ($($H.0,)*),
354                    ($($H.1,)*),
355                )
356            }
357        }
358
359        impl<$($H: crate::forward_handle::CompleteCycle<$S>, $S),*> CompleteCycles<($($S,)*)> for ($($H,)*) {
360            #[expect(clippy::allow_attributes, reason = "macro codegen")]
361            #[allow(non_snake_case, reason = "macro codegen")]
362            fn complete(self, states: ($($S,)*)) {
363                let ($($H,)*) = self;
364                let ($($S,)*) = states;
365                $($H.complete_next_tick($S);)*
366            }
367        }
368    };
369}
370
371#[cfg(stageleft_runtime)]
372impl_cycles_for_tuple!();
373#[cfg(stageleft_runtime)]
374impl_cycles_for_tuple!(H1, S1, 0);
375#[cfg(stageleft_runtime)]
376impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1);
377#[cfg(stageleft_runtime)]
378impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2);
379#[cfg(stageleft_runtime)]
380impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3);
381#[cfg(stageleft_runtime)]
382impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4);
383#[cfg(stageleft_runtime)]
384impl_cycles_for_tuple!(
385    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5
386);
387#[cfg(stageleft_runtime)]
388impl_cycles_for_tuple!(
389    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6
390);
391#[cfg(stageleft_runtime)]
392impl_cycles_for_tuple!(
393    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7
394);
395#[cfg(stageleft_runtime)]
396impl_cycles_for_tuple!(
397    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
398    8
399);
400#[cfg(stageleft_runtime)]
401impl_cycles_for_tuple!(
402    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
403    8, H10, S10, 9
404);
405#[cfg(stageleft_runtime)]
406impl_cycles_for_tuple!(
407    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
408    8, H10, S10, 9, H11, S11, 10
409);
410#[cfg(stageleft_runtime)]
411impl_cycles_for_tuple!(
412    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
413    8, H10, S10, 9, H11, S11, 10, H12, S12, 11
414);
415
416// Unslicable implementations for plain collections (used when returning from sliced! body)
417impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Unslicable
418    for super::Stream<T, Tick<L>, Bounded, O, R>
419{
420    type Unsliced = super::Stream<T, L, Unbounded, O, R>;
421
422    fn unslice(self) -> Self::Unsliced {
423        self.all_ticks()
424    }
425}
426
427impl<'a, T, L: Location<'a>> Unslicable for super::Singleton<T, Tick<L>, Bounded> {
428    type Unsliced = super::Singleton<T, L, Unbounded>;
429
430    fn unslice(self) -> Self::Unsliced {
431        self.latest()
432    }
433}
434
435impl<'a, T, L: Location<'a>> Unslicable for super::Optional<T, Tick<L>, Bounded> {
436    type Unsliced = super::Optional<T, L, Unbounded>;
437
438    fn unslice(self) -> Self::Unsliced {
439        self.latest()
440    }
441}
442
443impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Unslicable
444    for super::KeyedStream<K, V, Tick<L>, Bounded, O, R>
445{
446    type Unsliced = super::KeyedStream<K, V, L, Unbounded, O, R>;
447
448    fn unslice(self) -> Self::Unsliced {
449        self.all_ticks()
450    }
451}
452
453// Unslicable implementations for Atomic-wrapped bounded collections
454impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Unslicable
455    for style::Atomic<super::Stream<T, Tick<L>, Bounded, O, R>>
456{
457    type Unsliced = super::Stream<T, crate::location::Atomic<L>, Unbounded, O, R>;
458
459    fn unslice(self) -> Self::Unsliced {
460        self.collection.all_ticks_atomic()
461    }
462}
463
464impl<'a, T, L: Location<'a>> Unslicable for style::Atomic<super::Singleton<T, Tick<L>, Bounded>> {
465    type Unsliced = super::Singleton<T, crate::location::Atomic<L>, Unbounded>;
466
467    fn unslice(self) -> Self::Unsliced {
468        self.collection.latest_atomic()
469    }
470}
471
472impl<'a, T, L: Location<'a>> Unslicable for style::Atomic<super::Optional<T, Tick<L>, Bounded>> {
473    type Unsliced = super::Optional<T, crate::location::Atomic<L>, Unbounded>;
474
475    fn unslice(self) -> Self::Unsliced {
476        self.collection.latest_atomic()
477    }
478}
479
480impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Unslicable
481    for style::Atomic<super::KeyedStream<K, V, Tick<L>, Bounded, O, R>>
482{
483    type Unsliced = super::KeyedStream<K, V, crate::location::Atomic<L>, Unbounded, O, R>;
484
485    fn unslice(self) -> Self::Unsliced {
486        self.collection.all_ticks_atomic()
487    }
488}
489
490#[cfg(feature = "sim")]
491#[cfg(test)]
492mod tests {
493    use stageleft::q;
494
495    use super::sliced;
496    use crate::location::Location;
497    use crate::nondet::nondet;
498    use crate::prelude::FlowBuilder;
499
500    /// Test a counter using `use::state` with an initial singleton value.
501    /// Each input increments the counter, and we verify the output after each tick.
502    #[test]
503    fn sim_state_counter() {
504        let mut flow = FlowBuilder::new();
505        let node = flow.process::<()>();
506
507        let (input_send, input) = node.sim_input::<i32, _, _>();
508
509        let out_recv = sliced! {
510            let batch = use(input, nondet!(/** test */));
511            let mut counter = use::state(|l| l.singleton(q!(0)));
512
513            let new_count = counter.clone().zip(batch.count())
514                .map(q!(|(old, add)| old + add));
515            counter = new_count.clone();
516            new_count.into_stream()
517        }
518        .sim_output();
519
520        flow.sim().exhaustive(async || {
521            input_send.send(1);
522            assert_eq!(out_recv.next().await.unwrap(), 1);
523
524            input_send.send(1);
525            assert_eq!(out_recv.next().await.unwrap(), 2);
526
527            input_send.send(1);
528            assert_eq!(out_recv.next().await.unwrap(), 3);
529        });
530    }
531
532    /// Test `use::state_null` with an Optional that starts as None.
533    #[cfg(feature = "sim")]
534    #[test]
535    fn sim_state_null_optional() {
536        use crate::live_collections::Optional;
537        use crate::live_collections::boundedness::Bounded;
538        use crate::location::{Location, Tick};
539
540        let mut flow = FlowBuilder::new();
541        let node = flow.process::<()>();
542
543        let (input_send, input) = node.sim_input::<i32, _, _>();
544
545        let out_recv = sliced! {
546            let batch = use(input, nondet!(/** test */));
547            let mut prev = use::state_null::<Optional<i32, Tick<_>, Bounded>>();
548
549            // Output the previous value (or -1 if none)
550            let output = prev.clone().unwrap_or(prev.location().singleton(q!(-1)));
551            // Store the current batch's first value for next tick
552            prev = batch.first();
553            output.into_stream()
554        }
555        .sim_output();
556
557        flow.sim().exhaustive(async || {
558            input_send.send(10);
559            // First tick: prev is None, so output is -1
560            assert_eq!(out_recv.next().await.unwrap(), -1);
561
562            input_send.send(20);
563            // Second tick: prev is Some(10), so output is 10
564            assert_eq!(out_recv.next().await.unwrap(), 10);
565
566            input_send.send(30);
567            // Third tick: prev is Some(20), so output is 20
568            assert_eq!(out_recv.next().await.unwrap(), 20);
569        });
570    }
571
572    /// Test `use::state` with `source_iter` to initialize a stream state.
573    /// On the first tick, the state is the initial `[10, 20]` from `source_iter`.
574    /// On subsequent ticks, the state is the batch from the previous tick.
575    #[test]
576    fn sim_state_source_iter() {
577        let mut flow = FlowBuilder::new();
578        let node = flow.process::<()>();
579
580        let (input_send, input) = node.sim_input::<i32, _, _>();
581
582        let out_recv = sliced! {
583            let batch = use(input, nondet!(/** test */));
584            let mut items = use::state(|l| l.source_iter(q!([10, 20])));
585
586            // Output the current state, then replace it with the batch
587            let output = items.clone();
588            items = batch;
589            output
590        }
591        .sim_output();
592
593        flow.sim().exhaustive(async || {
594            input_send.send(3);
595            // First tick: items = initial [10, 20], output = [10, 20]
596            let mut results = vec![];
597            results.push(out_recv.next().await.unwrap());
598            results.push(out_recv.next().await.unwrap());
599            results.sort();
600            assert_eq!(results, vec![10, 20]);
601
602            input_send.send(4);
603            // Second tick: items = [3] (from previous batch), output = [3]
604            assert_eq!(out_recv.next().await.unwrap(), 3);
605
606            input_send.send(5);
607            // Third tick: items = [4] (from previous batch), output = [4]
608            assert_eq!(out_recv.next().await.unwrap(), 4);
609        });
610    }
611
612    /// Test atomic slicing with keyed streams.
613    #[test]
614    fn sim_sliced_atomic_keyed_stream() {
615        let mut flow = FlowBuilder::new();
616        let node = flow.process::<()>();
617
618        let (input_send, input) = node.sim_input::<(i32, i32), _, _>();
619        let atomic_keyed_input = input.into_keyed().atomic();
620        let accumulated_inputs = atomic_keyed_input
621            .clone()
622            .assume_ordering(nondet!(/** Test */))
623            .fold(
624                q!(|| 0),
625                q!(|curr, new| {
626                    *curr += new;
627                }),
628            );
629
630        let out_recv = sliced! {
631            let atomic_keyed_input = use::atomic(atomic_keyed_input, nondet!(/** test */));
632            let accumulated_inputs = use::atomic(accumulated_inputs, nondet!(/** test */));
633            accumulated_inputs.join_keyed_stream(atomic_keyed_input)
634                .map(q!(|(sum, _input)| sum))
635                .entries()
636        }
637        .assume_ordering_trusted(nondet!(/** test */))
638        .sim_output();
639
640        flow.sim().exhaustive(async || {
641            input_send.send((1, 1));
642            assert_eq!(out_recv.next().await.unwrap(), (1, 1));
643
644            input_send.send((1, 2));
645            assert_eq!(out_recv.next().await.unwrap(), (1, 3));
646
647            input_send.send((2, 1));
648            assert_eq!(out_recv.next().await.unwrap(), (2, 1));
649
650            input_send.send((1, 3));
651            assert_eq!(out_recv.next().await.unwrap(), (1, 6));
652        });
653    }
654}