Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick, NoAtomic};
30use crate::location::{Location, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35    AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
40/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
41#[sealed::sealed]
42pub trait Ordering:
43    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
44{
45    /// The [`StreamOrder`] corresponding to this type.
46    const ORDERING_KIND: StreamOrder;
47}
48
49/// Marks the stream as being totally ordered, which means that there are
50/// no sources of non-determinism (other than intentional ones) that will
51/// affect the order of elements.
52pub enum TotalOrder {}
53
54#[sealed::sealed]
55impl Ordering for TotalOrder {
56    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
57}
58
59/// Marks the stream as having no order, which means that the order of
60/// elements may be affected by non-determinism.
61///
62/// This restricts certain operators, such as `fold` and `reduce`, to only
63/// be used with commutative aggregation functions.
64pub enum NoOrder {}
65
66#[sealed::sealed]
67impl Ordering for NoOrder {
68    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
69}
70
71/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
72/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
73/// have `Self` guarantees instead.
74#[sealed::sealed]
75pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
76#[sealed::sealed]
77impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
79/// Helper trait for determining the weakest of two orderings.
80#[sealed::sealed]
81pub trait MinOrder<Other: ?Sized> {
82    /// The weaker of the two orderings.
83    type Min: Ordering;
84}
85
86#[sealed::sealed]
87impl<O: Ordering> MinOrder<O> for TotalOrder {
88    type Min = O;
89}
90
91#[sealed::sealed]
92impl<O: Ordering> MinOrder<O> for NoOrder {
93    type Min = NoOrder;
94}
95
96/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
97#[sealed::sealed]
98pub trait Retries:
99    MinRetries<Self, Min = Self>
100    + MinRetries<ExactlyOnce, Min = Self>
101    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
102{
103    /// The [`StreamRetry`] corresponding to this type.
104    const RETRIES_KIND: StreamRetry;
105}
106
107/// Marks the stream as having deterministic message cardinality, with no
108/// possibility of duplicates.
109pub enum ExactlyOnce {}
110
111#[sealed::sealed]
112impl Retries for ExactlyOnce {
113    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
114}
115
116/// Marks the stream as having non-deterministic message cardinality, which
117/// means that duplicates may occur, but messages will not be dropped.
118pub enum AtLeastOnce {}
119
120#[sealed::sealed]
121impl Retries for AtLeastOnce {
122    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
123}
124
125/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
126/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
127/// have `Self` guarantees instead.
128#[sealed::sealed]
129pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
130#[sealed::sealed]
131impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
133/// Helper trait for determining the weakest of two retry guarantees.
134#[sealed::sealed]
135pub trait MinRetries<Other: ?Sized> {
136    /// The weaker of the two retry guarantees.
137    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
138}
139
140#[sealed::sealed]
141impl<R: Retries> MinRetries<R> for ExactlyOnce {
142    type Min = R;
143}
144
145#[sealed::sealed]
146impl<R: Retries> MinRetries<R> for AtLeastOnce {
147    type Min = AtLeastOnce;
148}
149
150#[sealed::sealed]
151#[diagnostic::on_unimplemented(
152    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
153    label = "required here",
154    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
155)]
156/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
157pub trait IsOrdered: Ordering {}
158
159#[sealed::sealed]
160#[diagnostic::do_not_recommend]
161impl IsOrdered for TotalOrder {}
162
163#[sealed::sealed]
164#[diagnostic::on_unimplemented(
165    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
166    label = "required here",
167    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
168)]
169/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
170pub trait IsExactlyOnce: Retries {}
171
172#[sealed::sealed]
173#[diagnostic::do_not_recommend]
174impl IsExactlyOnce for ExactlyOnce {}
175
176/// Streaming sequence of elements with type `Type`.
177///
178/// This live collection represents a growing sequence of elements, with new elements being
179/// asynchronously appended to the end of the sequence. This can be used to model the arrival
180/// of network input, such as API requests, or streaming ingestion.
181///
182/// By default, all streams have deterministic ordering and each element is materialized exactly
183/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
184/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
185/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
186///
187/// Type Parameters:
188/// - `Type`: the type of elements in the stream
189/// - `Loc`: the location where the stream is being materialized
190/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
191/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
192///   (default is [`TotalOrder`])
193/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
194///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
195pub struct Stream<
196    Type,
197    Loc,
198    Bound: Boundedness = Unbounded,
199    Order: Ordering = TotalOrder,
200    Retry: Retries = ExactlyOnce,
201> {
202    pub(crate) location: Loc,
203    pub(crate) ir_node: RefCell<HydroNode>,
204    pub(crate) flow_state: FlowState,
205
206    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
207}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210    fn drop(&mut self) {
211        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214                input: Box::new(ir_node),
215                op_metadata: HydroIrOpMetadata::new(),
216            });
217        }
218    }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222    for Stream<T, L, Unbounded, O, R>
223where
224    L: Location<'a>,
225{
226    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227        let new_meta = stream
228            .location
229            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231        Stream {
232            location: stream.location.clone(),
233            flow_state: stream.flow_state.clone(),
234            ir_node: RefCell::new(HydroNode::Cast {
235                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236                metadata: new_meta,
237            }),
238            _phantom: PhantomData,
239        }
240    }
241}
242
243impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
244    for Stream<T, L, B, NoOrder, R>
245where
246    L: Location<'a>,
247{
248    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
249        stream.weaken_ordering()
250    }
251}
252
253impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
254    for Stream<T, L, B, O, AtLeastOnce>
255where
256    L: Location<'a>,
257{
258    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
259        stream.weaken_retries()
260    }
261}
262
263impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
264where
265    L: Location<'a>,
266{
267    fn defer_tick(self) -> Self {
268        Stream::defer_tick(self)
269    }
270}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
273    for Stream<T, Tick<L>, Bounded, O, R>
274where
275    L: Location<'a>,
276{
277    type Location = Tick<L>;
278
279    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
280        Stream::new(
281            location.clone(),
282            HydroNode::CycleSource {
283                cycle_id,
284                metadata: location.new_node_metadata(Self::collection_kind()),
285            },
286        )
287    }
288}
289
290impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
291    for Stream<T, Tick<L>, Bounded, O, R>
292where
293    L: Location<'a>,
294{
295    type Location = Tick<L>;
296
297    fn location(&self) -> &Self::Location {
298        self.location()
299    }
300
301    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
302        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
303            location.clone(),
304            HydroNode::DeferTick {
305                input: Box::new(HydroNode::CycleSource {
306                    cycle_id,
307                    metadata: location.new_node_metadata(Self::collection_kind()),
308                }),
309                metadata: location.new_node_metadata(Self::collection_kind()),
310            },
311        );
312
313        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
314    }
315}
316
317impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
318    for Stream<T, Tick<L>, Bounded, O, R>
319where
320    L: Location<'a>,
321{
322    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
323        assert_eq!(
324            Location::id(&self.location),
325            expected_location,
326            "locations do not match"
327        );
328        self.location
329            .flow_state()
330            .borrow_mut()
331            .push_root(HydroRoot::CycleSink {
332                cycle_id,
333                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
334                op_metadata: HydroIrOpMetadata::new(),
335            });
336    }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
340    for Stream<T, L, B, O, R>
341where
342    L: Location<'a>,
343{
344    type Location = L;
345
346    fn create_source(cycle_id: CycleId, location: L) -> Self {
347        Stream::new(
348            location.clone(),
349            HydroNode::CycleSource {
350                cycle_id,
351                metadata: location.new_node_metadata(Self::collection_kind()),
352            },
353        )
354    }
355}
356
357impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
358    for Stream<T, L, B, O, R>
359where
360    L: Location<'a>,
361{
362    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
363        assert_eq!(
364            Location::id(&self.location),
365            expected_location,
366            "locations do not match"
367        );
368        self.location
369            .flow_state()
370            .borrow_mut()
371            .push_root(HydroRoot::CycleSink {
372                cycle_id,
373                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
374                op_metadata: HydroIrOpMetadata::new(),
375            });
376    }
377}
378
379impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
380where
381    T: Clone,
382    L: Location<'a>,
383{
384    fn clone(&self) -> Self {
385        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
386            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
387            *self.ir_node.borrow_mut() = HydroNode::Tee {
388                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
389                metadata: self.location.new_node_metadata(Self::collection_kind()),
390            };
391        }
392
393        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
394            Stream {
395                location: self.location.clone(),
396                flow_state: self.flow_state.clone(),
397                ir_node: HydroNode::Tee {
398                    inner: SharedNode(inner.0.clone()),
399                    metadata: metadata.clone(),
400                }
401                .into(),
402                _phantom: PhantomData,
403            }
404        } else {
405            unreachable!()
406        }
407    }
408}
409
410impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
411where
412    L: Location<'a>,
413{
414    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
415        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
416        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
417
418        let flow_state = location.flow_state().clone();
419        Stream {
420            location,
421            flow_state,
422            ir_node: RefCell::new(ir_node),
423            _phantom: PhantomData,
424        }
425    }
426
427    /// Returns the [`Location`] where this stream is being materialized.
428    pub fn location(&self) -> &L {
429        &self.location
430    }
431
432    /// Weakens the consistency of this live collection to not guarantee any consistency across
433    /// cluster members (if this collection is on a cluster).
434    pub fn weaken_consistency(self) -> Stream<T, L::NoConsistency, B, O, R>
435    where
436        L: Location<'a>,
437    {
438        if L::consistency()
439            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
440        {
441            // already no consistency
442            Stream::new(
443                self.location.drop_consistency(),
444                self.ir_node.replace(HydroNode::Placeholder),
445            )
446        } else {
447            Stream::new(
448                self.location.drop_consistency(),
449                HydroNode::Cast {
450                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
451                    metadata: self.location.drop_consistency().new_node_metadata(Stream::<
452                        T,
453                        L::NoConsistency,
454                        B,
455                        O,
456                        R,
457                    >::collection_kind(
458                    )),
459                },
460            )
461        }
462    }
463
464    /// Casts this live collection to have the consistency guarantees specified in the given
465    /// location type parameter. The developer must ensure that the strengthened consistency
466    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
467    pub fn assert_has_consistency_of<L2: Location<'a, NoConsistency = L::NoConsistency>>(
468        self,
469        _proof: impl crate::properties::ConsistencyProof,
470    ) -> Stream<T, L2, B, O, R>
471    where
472        L: Location<'a>,
473    {
474        if L::consistency() == L2::consistency() {
475            Stream::new(
476                self.location.with_consistency_of(),
477                self.ir_node.replace(HydroNode::Placeholder),
478            )
479        } else {
480            Stream::new(
481                self.location.with_consistency_of(),
482                HydroNode::AssertIsConsistent {
483                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
484                    metadata: self
485                        .location
486                        .clone()
487                        .with_consistency_of::<L2>()
488                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
489                },
490            )
491        }
492    }
493
494    pub(crate) fn collection_kind() -> CollectionKind {
495        CollectionKind::Stream {
496            bound: B::BOUND_KIND,
497            order: O::ORDERING_KIND,
498            retry: R::RETRIES_KIND,
499            element_type: quote_type::<T>().into(),
500        }
501    }
502
503    /// Produces a stream based on invoking `f` on each element.
504    /// If you do not want to modify the stream and instead only want to view
505    /// each item use [`Stream::inspect`] instead.
506    ///
507    /// # Example
508    /// ```rust
509    /// # #[cfg(feature = "deploy")] {
510    /// # use hydro_lang::prelude::*;
511    /// # use futures::StreamExt;
512    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
513    /// let words = process.source_iter(q!(vec!["hello", "world"]));
514    /// words.map(q!(|x| x.to_uppercase()))
515    /// # }, |mut stream| async move {
516    /// # for w in vec!["HELLO", "WORLD"] {
517    /// #     assert_eq!(stream.next().await.unwrap(), w);
518    /// # }
519    /// # }));
520    /// # }
521    /// ```
522    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
523    where
524        F: Fn(T) -> U + 'a,
525    {
526        let f = f.splice_fn1_ctx(&self.location).into();
527        Stream::new(
528            self.location.clone(),
529            HydroNode::Map {
530                f,
531                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
532                metadata: self
533                    .location
534                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
535            },
536        )
537    }
538
539    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
540    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
541    /// for the output type `U` must produce items in a **deterministic** order.
542    ///
543    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
544    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
545    ///
546    /// # Example
547    /// ```rust
548    /// # #[cfg(feature = "deploy")] {
549    /// # use hydro_lang::prelude::*;
550    /// # use futures::StreamExt;
551    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
552    /// process
553    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
554    ///     .flat_map_ordered(q!(|x| x))
555    /// # }, |mut stream| async move {
556    /// // 1, 2, 3, 4
557    /// # for w in (1..5) {
558    /// #     assert_eq!(stream.next().await.unwrap(), w);
559    /// # }
560    /// # }));
561    /// # }
562    /// ```
563    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
564    where
565        I: IntoIterator<Item = U>,
566        F: Fn(T) -> I + 'a,
567    {
568        let f = f.splice_fn1_ctx(&self.location).into();
569        Stream::new(
570            self.location.clone(),
571            HydroNode::FlatMap {
572                f,
573                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
574                metadata: self
575                    .location
576                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
577            },
578        )
579    }
580
581    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
582    /// for the output type `U` to produce items in any order.
583    ///
584    /// # Example
585    /// ```rust
586    /// # #[cfg(feature = "deploy")] {
587    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
588    /// # use futures::StreamExt;
589    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
590    /// process
591    ///     .source_iter(q!(vec![
592    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
593    ///         std::collections::HashSet::from_iter(vec![3, 4]),
594    ///     ]))
595    ///     .flat_map_unordered(q!(|x| x))
596    /// # }, |mut stream| async move {
597    /// // 1, 2, 3, 4, but in no particular order
598    /// # let mut results = Vec::new();
599    /// # for w in (1..5) {
600    /// #     results.push(stream.next().await.unwrap());
601    /// # }
602    /// # results.sort();
603    /// # assert_eq!(results, vec![1, 2, 3, 4]);
604    /// # }));
605    /// # }
606    /// ```
607    pub fn flat_map_unordered<U, I, F>(
608        self,
609        f: impl IntoQuotedMut<'a, F, L>,
610    ) -> Stream<U, L, B, NoOrder, R>
611    where
612        I: IntoIterator<Item = U>,
613        F: Fn(T) -> I + 'a,
614    {
615        let f = f.splice_fn1_ctx(&self.location).into();
616        Stream::new(
617            self.location.clone(),
618            HydroNode::FlatMap {
619                f,
620                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
621                metadata: self
622                    .location
623                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
624            },
625        )
626    }
627
628    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
629    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
630    ///
631    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
632    /// not deterministic, use [`Stream::flatten_unordered`] instead.
633    ///
634    /// ```rust
635    /// # #[cfg(feature = "deploy")] {
636    /// # use hydro_lang::prelude::*;
637    /// # use futures::StreamExt;
638    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
639    /// process
640    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
641    ///     .flatten_ordered()
642    /// # }, |mut stream| async move {
643    /// // 1, 2, 3, 4
644    /// # for w in (1..5) {
645    /// #     assert_eq!(stream.next().await.unwrap(), w);
646    /// # }
647    /// # }));
648    /// # }
649    /// ```
650    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
651    where
652        T: IntoIterator<Item = U>,
653    {
654        self.flat_map_ordered(q!(|d| d))
655    }
656
657    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
658    /// for the element type `T` to produce items in any order.
659    ///
660    /// # Example
661    /// ```rust
662    /// # #[cfg(feature = "deploy")] {
663    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
664    /// # use futures::StreamExt;
665    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
666    /// process
667    ///     .source_iter(q!(vec![
668    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
669    ///         std::collections::HashSet::from_iter(vec![3, 4]),
670    ///     ]))
671    ///     .flatten_unordered()
672    /// # }, |mut stream| async move {
673    /// // 1, 2, 3, 4, but in no particular order
674    /// # let mut results = Vec::new();
675    /// # for w in (1..5) {
676    /// #     results.push(stream.next().await.unwrap());
677    /// # }
678    /// # results.sort();
679    /// # assert_eq!(results, vec![1, 2, 3, 4]);
680    /// # }));
681    /// # }
682    /// ```
683    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
684    where
685        T: IntoIterator<Item = U>,
686    {
687        self.flat_map_unordered(q!(|d| d))
688    }
689
690    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
691    /// then emit the elements of that stream one by one. When the inner stream yields
692    /// `Pending`, this operator yields as well.
693    pub fn flat_map_stream_blocking<U, S, F>(
694        self,
695        f: impl IntoQuotedMut<'a, F, L>,
696    ) -> Stream<U, L, B, O, R>
697    where
698        S: futures::Stream<Item = U>,
699        F: Fn(T) -> S + 'a,
700    {
701        let f = f.splice_fn1_ctx(&self.location).into();
702        Stream::new(
703            self.location.clone(),
704            HydroNode::FlatMapStreamBlocking {
705                f,
706                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
707                metadata: self
708                    .location
709                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
710            },
711        )
712    }
713
714    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
715    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
716    /// yields as well.
717    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
718    where
719        T: futures::Stream<Item = U>,
720    {
721        self.flat_map_stream_blocking(q!(|d| d))
722    }
723
724    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
725    /// `f`, preserving the order of the elements.
726    ///
727    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
728    /// not modify or take ownership of the values. If you need to modify the values while filtering
729    /// use [`Stream::filter_map`] instead.
730    ///
731    /// # Example
732    /// ```rust
733    /// # #[cfg(feature = "deploy")] {
734    /// # use hydro_lang::prelude::*;
735    /// # use futures::StreamExt;
736    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
737    /// process
738    ///     .source_iter(q!(vec![1, 2, 3, 4]))
739    ///     .filter(q!(|&x| x > 2))
740    /// # }, |mut stream| async move {
741    /// // 3, 4
742    /// # for w in (3..5) {
743    /// #     assert_eq!(stream.next().await.unwrap(), w);
744    /// # }
745    /// # }));
746    /// # }
747    /// ```
748    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
749    where
750        F: Fn(&T) -> bool + 'a,
751    {
752        let f = f.splice_fn1_borrow_ctx(&self.location).into();
753        Stream::new(
754            self.location.clone(),
755            HydroNode::Filter {
756                f,
757                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
758                metadata: self.location.new_node_metadata(Self::collection_kind()),
759            },
760        )
761    }
762
763    /// Splits the stream into two streams based on a predicate, without cloning elements.
764    ///
765    /// Elements for which `f` returns `true` are sent to the first output stream,
766    /// and elements for which `f` returns `false` are sent to the second output stream.
767    ///
768    /// Unlike using `filter` twice, this only evaluates the predicate once per element
769    /// and does not require `T: Clone`.
770    ///
771    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
772    /// the predicate is only used for routing; the element itself is moved to the
773    /// appropriate output stream.
774    ///
775    /// # Example
776    /// ```rust
777    /// # #[cfg(feature = "deploy")] {
778    /// # use hydro_lang::prelude::*;
779    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
780    /// # use futures::StreamExt;
781    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
782    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
783    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
784    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
785    /// evens.map(q!(|x| (x, true)))
786    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
787    /// # }, |mut stream| async move {
788    /// # let mut results = Vec::new();
789    /// # for _ in 0..6 {
790    /// #     results.push(stream.next().await.unwrap());
791    /// # }
792    /// # results.sort();
793    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
794    /// # }));
795    /// # }
796    /// ```
797    #[expect(
798        clippy::type_complexity,
799        reason = "return type mirrors the input stream type"
800    )]
801    pub fn partition<F>(
802        self,
803        f: impl IntoQuotedMut<'a, F, L>,
804    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
805    where
806        F: Fn(&T) -> bool + 'a,
807    {
808        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
809        let shared = SharedNode(Rc::new(RefCell::new(
810            self.ir_node.replace(HydroNode::Placeholder),
811        )));
812
813        let true_stream = Stream::new(
814            self.location.clone(),
815            HydroNode::Partition {
816                inner: SharedNode(shared.0.clone()),
817                f: f.clone(),
818                is_true: true,
819                metadata: self.location.new_node_metadata(Self::collection_kind()),
820            },
821        );
822
823        let false_stream = Stream::new(
824            self.location.clone(),
825            HydroNode::Partition {
826                inner: SharedNode(shared.0),
827                f,
828                is_true: false,
829                metadata: self.location.new_node_metadata(Self::collection_kind()),
830            },
831        );
832
833        (true_stream, false_stream)
834    }
835
836    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
837    ///
838    /// # Example
839    /// ```rust
840    /// # #[cfg(feature = "deploy")] {
841    /// # use hydro_lang::prelude::*;
842    /// # use futures::StreamExt;
843    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
844    /// process
845    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
846    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
847    /// # }, |mut stream| async move {
848    /// // 1, 2
849    /// # for w in (1..3) {
850    /// #     assert_eq!(stream.next().await.unwrap(), w);
851    /// # }
852    /// # }));
853    /// # }
854    /// ```
855    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
856    where
857        F: Fn(T) -> Option<U> + 'a,
858    {
859        let f = f.splice_fn1_ctx(&self.location).into();
860        Stream::new(
861            self.location.clone(),
862            HydroNode::FilterMap {
863                f,
864                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
865                metadata: self
866                    .location
867                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
868            },
869        )
870    }
871
872    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
873    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
874    /// If `other` is an empty [`Optional`], no values will be produced.
875    ///
876    /// # Example
877    /// ```rust
878    /// # #[cfg(feature = "deploy")] {
879    /// # use hydro_lang::prelude::*;
880    /// # use futures::StreamExt;
881    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
882    /// let tick = process.tick();
883    /// let batch = process
884    ///   .source_iter(q!(vec![1, 2, 3, 4]))
885    ///   .batch(&tick, nondet!(/** test */));
886    /// let count = batch.clone().count(); // `count()` returns a singleton
887    /// batch.cross_singleton(count).all_ticks()
888    /// # }, |mut stream| async move {
889    /// // (1, 4), (2, 4), (3, 4), (4, 4)
890    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
891    /// #     assert_eq!(stream.next().await.unwrap(), w);
892    /// # }
893    /// # }));
894    /// # }
895    /// ```
896    pub fn cross_singleton<O2>(
897        self,
898        other: impl Into<Optional<O2, L, Bounded>>,
899    ) -> Stream<(T, O2), L, B, O, R>
900    where
901        O2: Clone,
902    {
903        let other: Optional<O2, L, Bounded> = other.into();
904        check_matching_location(&self.location, &other.location);
905
906        Stream::new(
907            self.location.clone(),
908            HydroNode::CrossSingleton {
909                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
910                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
911                metadata: self
912                    .location
913                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
914            },
915        )
916    }
917
918    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
919    ///
920    /// # Example
921    /// ```rust
922    /// # #[cfg(feature = "deploy")] {
923    /// # use hydro_lang::prelude::*;
924    /// # use futures::StreamExt;
925    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
926    /// let tick = process.tick();
927    /// // ticks are lazy by default, forces the second tick to run
928    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
929    ///
930    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
931    /// let batch_first_tick = process
932    ///   .source_iter(q!(vec![1, 2, 3, 4]))
933    ///   .batch(&tick, nondet!(/** test */));
934    /// let batch_second_tick = process
935    ///   .source_iter(q!(vec![5, 6, 7, 8]))
936    ///   .batch(&tick, nondet!(/** test */))
937    ///   .defer_tick();
938    /// batch_first_tick.chain(batch_second_tick)
939    ///   .filter_if(signal)
940    ///   .all_ticks()
941    /// # }, |mut stream| async move {
942    /// // [1, 2, 3, 4]
943    /// # for w in vec![1, 2, 3, 4] {
944    /// #     assert_eq!(stream.next().await.unwrap(), w);
945    /// # }
946    /// # }));
947    /// # }
948    /// ```
949    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
950        self.cross_singleton(signal.filter(q!(|b| *b)))
951            .map(q!(|(d, _)| d))
952    }
953
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
955    ///
956    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
957    /// leader of a cluster.
958    ///
959    /// # Example
960    /// ```rust
961    /// # #[cfg(feature = "deploy")] {
962    /// # use hydro_lang::prelude::*;
963    /// # use futures::StreamExt;
964    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
965    /// let tick = process.tick();
966    /// // ticks are lazy by default, forces the second tick to run
967    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
968    ///
969    /// let batch_first_tick = process
970    ///   .source_iter(q!(vec![1, 2, 3, 4]))
971    ///   .batch(&tick, nondet!(/** test */));
972    /// let batch_second_tick = process
973    ///   .source_iter(q!(vec![5, 6, 7, 8]))
974    ///   .batch(&tick, nondet!(/** test */))
975    ///   .defer_tick(); // appears on the second tick
976    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
977    /// batch_first_tick.chain(batch_second_tick)
978    ///   .filter_if_some(some_on_first_tick)
979    ///   .all_ticks()
980    /// # }, |mut stream| async move {
981    /// // [1, 2, 3, 4]
982    /// # for w in vec![1, 2, 3, 4] {
983    /// #     assert_eq!(stream.next().await.unwrap(), w);
984    /// # }
985    /// # }));
986    /// # }
987    /// ```
988    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
989    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
990        self.filter_if(signal.is_some())
991    }
992
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is empty.
994    ///
995    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
996    /// some local state.
997    ///
998    /// # Example
999    /// ```rust
1000    /// # #[cfg(feature = "deploy")] {
1001    /// # use hydro_lang::prelude::*;
1002    /// # use futures::StreamExt;
1003    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1004    /// let tick = process.tick();
1005    /// // ticks are lazy by default, forces the second tick to run
1006    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1007    ///
1008    /// let batch_first_tick = process
1009    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1010    ///   .batch(&tick, nondet!(/** test */));
1011    /// let batch_second_tick = process
1012    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1013    ///   .batch(&tick, nondet!(/** test */))
1014    ///   .defer_tick(); // appears on the second tick
1015    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1016    /// batch_first_tick.chain(batch_second_tick)
1017    ///   .filter_if_none(some_on_first_tick)
1018    ///   .all_ticks()
1019    /// # }, |mut stream| async move {
1020    /// // [5, 6, 7, 8]
1021    /// # for w in vec![5, 6, 7, 8] {
1022    /// #     assert_eq!(stream.next().await.unwrap(), w);
1023    /// # }
1024    /// # }));
1025    /// # }
1026    /// ```
1027    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1028    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1029        self.filter_if(other.is_none())
1030    }
1031
1032    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1033    /// returning all tupled pairs.
1034    ///
1035    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1036    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1037    /// symmetric hash join is used and ordering is [`NoOrder`].
1038    ///
1039    /// # Example
1040    /// ```rust
1041    /// # #[cfg(feature = "deploy")] {
1042    /// # use hydro_lang::prelude::*;
1043    /// # use std::collections::HashSet;
1044    /// # use futures::StreamExt;
1045    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1046    /// let tick = process.tick();
1047    /// let stream1 = process.source_iter(q!(vec![1, 2]));
1048    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1049    /// stream1.cross_product(stream2)
1050    /// # }, |mut stream| async move {
1051    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1052    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1053    /// # stream.map(|i| assert!(expected.contains(&i)));
1054    /// # }));
1055    /// # }
1056    pub fn cross_product<T2, B2: Boundedness, O2: Ordering>(
1057        self,
1058        other: Stream<T2, L, B2, O2, R>,
1059    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, R>
1060    where
1061        T: Clone,
1062        T2: Clone,
1063    {
1064        self.map(q!(|v| ((), v)))
1065            .join(other.map(q!(|v| ((), v))))
1066            .map(q!(|((), (v1, v2))| (v1, v2)))
1067    }
1068
1069    /// Takes one stream as input and filters out any duplicate occurrences. The output
1070    /// contains all unique values from the input.
1071    ///
1072    /// # Example
1073    /// ```rust
1074    /// # #[cfg(feature = "deploy")] {
1075    /// # use hydro_lang::prelude::*;
1076    /// # use futures::StreamExt;
1077    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1078    /// let tick = process.tick();
1079    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1080    /// # }, |mut stream| async move {
1081    /// # for w in vec![1, 2, 3, 4] {
1082    /// #     assert_eq!(stream.next().await.unwrap(), w);
1083    /// # }
1084    /// # }));
1085    /// # }
1086    /// ```
1087    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1088    where
1089        T: Eq + Hash,
1090    {
1091        Stream::new(
1092            self.location.clone(),
1093            HydroNode::Unique {
1094                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1095                metadata: self
1096                    .location
1097                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1098            },
1099        )
1100    }
1101
1102    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1103    ///
1104    /// The `other` stream must be [`Bounded`], since this function will wait until
1105    /// all its elements are available before producing any output.
1106    /// # Example
1107    /// ```rust
1108    /// # #[cfg(feature = "deploy")] {
1109    /// # use hydro_lang::prelude::*;
1110    /// # use futures::StreamExt;
1111    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1112    /// let tick = process.tick();
1113    /// let stream = process
1114    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1115    ///   .batch(&tick, nondet!(/** test */));
1116    /// let batch = process
1117    ///   .source_iter(q!(vec![1, 2]))
1118    ///   .batch(&tick, nondet!(/** test */));
1119    /// stream.filter_not_in(batch).all_ticks()
1120    /// # }, |mut stream| async move {
1121    /// # for w in vec![3, 4] {
1122    /// #     assert_eq!(stream.next().await.unwrap(), w);
1123    /// # }
1124    /// # }));
1125    /// # }
1126    /// ```
1127    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1128    where
1129        T: Eq + Hash,
1130        B2: IsBounded,
1131    {
1132        check_matching_location(&self.location, &other.location);
1133
1134        Stream::new(
1135            self.location.clone(),
1136            HydroNode::Difference {
1137                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1138                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1139                metadata: self
1140                    .location
1141                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1142            },
1143        )
1144    }
1145
1146    /// An operator which allows you to "inspect" each element of a stream without
1147    /// modifying it. The closure `f` is called on a reference to each item. This is
1148    /// mainly useful for debugging, and should not be used to generate side-effects.
1149    ///
1150    /// # Example
1151    /// ```rust
1152    /// # #[cfg(feature = "deploy")] {
1153    /// # use hydro_lang::prelude::*;
1154    /// # use futures::StreamExt;
1155    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1156    /// let nums = process.source_iter(q!(vec![1, 2]));
1157    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1158    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1159    /// # }, |mut stream| async move {
1160    /// # for w in vec![1, 2] {
1161    /// #     assert_eq!(stream.next().await.unwrap(), w);
1162    /// # }
1163    /// # }));
1164    /// # }
1165    /// ```
1166    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1167    where
1168        F: Fn(&T) + 'a,
1169    {
1170        let f = f.splice_fn1_borrow_ctx(&self.location).into();
1171
1172        Stream::new(
1173            self.location.clone(),
1174            HydroNode::Inspect {
1175                f,
1176                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1177                metadata: self.location.new_node_metadata(Self::collection_kind()),
1178            },
1179        )
1180    }
1181
1182    /// Executes the provided closure for every element in this stream.
1183    ///
1184    /// Because the closure may have side effects, the stream must have deterministic order
1185    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1186    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1187    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1188    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1189    where
1190        O: IsOrdered,
1191        R: IsExactlyOnce,
1192    {
1193        let f = f.splice_fn1_ctx(&self.location).into();
1194        self.location
1195            .flow_state()
1196            .borrow_mut()
1197            .push_root(HydroRoot::ForEach {
1198                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1199                f,
1200                op_metadata: HydroIrOpMetadata::new(),
1201            });
1202    }
1203
1204    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1205    /// TCP socket to some other server. You should _not_ use this API for interacting with
1206    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1207    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1208    /// interaction with asynchronous sinks.
1209    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1210    where
1211        O: IsOrdered,
1212        R: IsExactlyOnce,
1213        S: 'a + futures::Sink<T> + Unpin,
1214    {
1215        self.location
1216            .flow_state()
1217            .borrow_mut()
1218            .push_root(HydroRoot::DestSink {
1219                sink: sink.splice_typed_ctx(&self.location).into(),
1220                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1221                op_metadata: HydroIrOpMetadata::new(),
1222            });
1223    }
1224
1225    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1226    ///
1227    /// # Example
1228    /// ```rust
1229    /// # #[cfg(feature = "deploy")] {
1230    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1231    /// # use futures::StreamExt;
1232    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1233    /// let tick = process.tick();
1234    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1235    /// numbers.enumerate()
1236    /// # }, |mut stream| async move {
1237    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1238    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1239    /// #     assert_eq!(stream.next().await.unwrap(), w);
1240    /// # }
1241    /// # }));
1242    /// # }
1243    /// ```
1244    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1245    where
1246        O: IsOrdered,
1247        R: IsExactlyOnce,
1248    {
1249        Stream::new(
1250            self.location.clone(),
1251            HydroNode::Enumerate {
1252                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1253                metadata: self.location.new_node_metadata(Stream::<
1254                    (usize, T),
1255                    L,
1256                    B,
1257                    TotalOrder,
1258                    ExactlyOnce,
1259                >::collection_kind()),
1260            },
1261        )
1262    }
1263
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
1265    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1266    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1267    ///
1268    /// Depending on the input stream guarantees, the closure may need to be commutative
1269    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1270    ///
1271    /// # Example
1272    /// ```rust
1273    /// # #[cfg(feature = "deploy")] {
1274    /// # use hydro_lang::prelude::*;
1275    /// # use futures::StreamExt;
1276    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1277    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1278    /// words
1279    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1280    ///     .into_stream()
1281    /// # }, |mut stream| async move {
1282    /// // "HELLOWORLD"
1283    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1284    /// # }));
1285    /// # }
1286    /// ```
1287    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1288        self,
1289        init: impl IntoQuotedMut<'a, I, L>,
1290        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1291    ) -> Singleton<A, L, B2>
1292    where
1293        I: Fn() -> A + 'a,
1294        F: Fn(&mut A, T),
1295        C: ValidCommutativityFor<O>,
1296        Idemp: ValidIdempotenceFor<R>,
1297        B: ApplyMonotoneStream<M, B2>,
1298    {
1299        let init = init.splice_fn0_ctx(&self.location).into();
1300        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1301        proof.register_proof(&comb);
1302
1303        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1304        let ordered_etc: Stream<T, L::NoConsistency, B> =
1305            self.assume_retries(nondet).assume_ordering(nondet);
1306
1307        let core = HydroNode::Fold {
1308            init,
1309            acc: comb.into(),
1310            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1311            metadata: ordered_etc
1312                .location
1313                .new_node_metadata(Singleton::<A, L::NoConsistency, B2>::collection_kind()),
1314        };
1315
1316        Singleton::new(ordered_etc.location.clone(), core)
1317            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1318    }
1319
1320    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1321    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1322    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1323    /// reference, so that it can be modified in place.
1324    ///
1325    /// Depending on the input stream guarantees, the closure may need to be commutative
1326    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1327    ///
1328    /// # Example
1329    /// ```rust
1330    /// # #[cfg(feature = "deploy")] {
1331    /// # use hydro_lang::prelude::*;
1332    /// # use futures::StreamExt;
1333    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1334    /// let bools = process.source_iter(q!(vec![false, true, false]));
1335    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1336    /// # }, |mut stream| async move {
1337    /// // true
1338    /// # assert_eq!(stream.next().await.unwrap(), true);
1339    /// # }));
1340    /// # }
1341    /// ```
1342    pub fn reduce<F, C, Idemp>(
1343        self,
1344        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1345    ) -> Optional<T, L, B>
1346    where
1347        F: Fn(&mut T, T) + 'a,
1348        C: ValidCommutativityFor<O>,
1349        Idemp: ValidIdempotenceFor<R>,
1350    {
1351        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1352        proof.register_proof(&f);
1353
1354        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1355        let ordered_etc: Stream<T, L::NoConsistency, B> =
1356            self.assume_retries(nondet).assume_ordering(nondet);
1357
1358        let core = HydroNode::Reduce {
1359            f: f.into(),
1360            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1361            metadata: ordered_etc
1362                .location
1363                .new_node_metadata(Optional::<T, L::NoConsistency, B>::collection_kind()),
1364        };
1365
1366        Optional::new(ordered_etc.location.clone(), core)
1367            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1368    }
1369
1370    /// Computes the maximum element in the stream as an [`Optional`], which
1371    /// will be empty until the first element in the input arrives.
1372    ///
1373    /// # Example
1374    /// ```rust
1375    /// # #[cfg(feature = "deploy")] {
1376    /// # use hydro_lang::prelude::*;
1377    /// # use futures::StreamExt;
1378    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1379    /// let tick = process.tick();
1380    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1381    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1382    /// batch.max().all_ticks()
1383    /// # }, |mut stream| async move {
1384    /// // 4
1385    /// # assert_eq!(stream.next().await.unwrap(), 4);
1386    /// # }));
1387    /// # }
1388    /// ```
1389    pub fn max(self) -> Optional<T, L, B>
1390    where
1391        T: Ord,
1392    {
1393        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1394            .assume_ordering_trusted_bounded::<TotalOrder>(
1395                nondet!(/** max is commutative, but order affects intermediates */),
1396            )
1397            .reduce(q!(|curr, new| {
1398                if new > *curr {
1399                    *curr = new;
1400                }
1401            }))
1402    }
1403
1404    /// Computes the minimum element in the stream as an [`Optional`], which
1405    /// will be empty until the first element in the input arrives.
1406    ///
1407    /// # Example
1408    /// ```rust
1409    /// # #[cfg(feature = "deploy")] {
1410    /// # use hydro_lang::prelude::*;
1411    /// # use futures::StreamExt;
1412    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1413    /// let tick = process.tick();
1414    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1415    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1416    /// batch.min().all_ticks()
1417    /// # }, |mut stream| async move {
1418    /// // 1
1419    /// # assert_eq!(stream.next().await.unwrap(), 1);
1420    /// # }));
1421    /// # }
1422    /// ```
1423    pub fn min(self) -> Optional<T, L, B>
1424    where
1425        T: Ord,
1426    {
1427        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1428            .assume_ordering_trusted_bounded::<TotalOrder>(
1429                nondet!(/** max is commutative, but order affects intermediates */),
1430            )
1431            .reduce(q!(|curr, new| {
1432                if new < *curr {
1433                    *curr = new;
1434                }
1435            }))
1436    }
1437
1438    /// Computes the first element in the stream as an [`Optional`], which
1439    /// will be empty until the first element in the input arrives.
1440    ///
1441    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1442    /// re-ordering of elements may cause the first element to change.
1443    ///
1444    /// # Example
1445    /// ```rust
1446    /// # #[cfg(feature = "deploy")] {
1447    /// # use hydro_lang::prelude::*;
1448    /// # use futures::StreamExt;
1449    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1450    /// let tick = process.tick();
1451    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1452    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1453    /// batch.first().all_ticks()
1454    /// # }, |mut stream| async move {
1455    /// // 1
1456    /// # assert_eq!(stream.next().await.unwrap(), 1);
1457    /// # }));
1458    /// # }
1459    /// ```
1460    pub fn first(self) -> Optional<T, L, B>
1461    where
1462        O: IsOrdered,
1463    {
1464        self.make_totally_ordered()
1465            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1466            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1467            .reduce(q!(|_, _| {}))
1468    }
1469
1470    /// Computes the last element in the stream as an [`Optional`], which
1471    /// will be empty until an element in the input arrives.
1472    ///
1473    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1474    /// re-ordering of elements may cause the last element to change.
1475    ///
1476    /// # Example
1477    /// ```rust
1478    /// # #[cfg(feature = "deploy")] {
1479    /// # use hydro_lang::prelude::*;
1480    /// # use futures::StreamExt;
1481    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1482    /// let tick = process.tick();
1483    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1484    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1485    /// batch.last().all_ticks()
1486    /// # }, |mut stream| async move {
1487    /// // 4
1488    /// # assert_eq!(stream.next().await.unwrap(), 4);
1489    /// # }));
1490    /// # }
1491    /// ```
1492    pub fn last(self) -> Optional<T, L, B>
1493    where
1494        O: IsOrdered,
1495    {
1496        self.make_totally_ordered()
1497            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1498            .reduce(q!(|curr, new| *curr = new))
1499    }
1500
1501    /// Returns a stream containing at most the first `n` elements of the input stream,
1502    /// preserving the original order. Similar to `LIMIT` in SQL.
1503    ///
1504    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1505    /// retries, since the result depends on the order and cardinality of elements.
1506    ///
1507    /// # Example
1508    /// ```rust
1509    /// # #[cfg(feature = "deploy")] {
1510    /// # use hydro_lang::prelude::*;
1511    /// # use futures::StreamExt;
1512    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1513    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1514    /// numbers.limit(q!(3))
1515    /// # }, |mut stream| async move {
1516    /// // 10, 20, 30
1517    /// # for w in vec![10, 20, 30] {
1518    /// #     assert_eq!(stream.next().await.unwrap(), w);
1519    /// # }
1520    /// # }));
1521    /// # }
1522    /// ```
1523    pub fn limit(
1524        self,
1525        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1526    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1527    where
1528        O: IsOrdered,
1529        R: IsExactlyOnce,
1530    {
1531        self.generator(
1532            q!(|| 0usize),
1533            q!(move |count, item| {
1534                if *count == n {
1535                    Generate::Break
1536                } else {
1537                    *count += 1;
1538                    if *count == n {
1539                        Generate::Return(item)
1540                    } else {
1541                        Generate::Yield(item)
1542                    }
1543                }
1544            }),
1545        )
1546    }
1547
1548    /// Collects all the elements of this stream into a single [`Vec`] element.
1549    ///
1550    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1551    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1552    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1553    /// the vector at an arbitrary point in time.
1554    ///
1555    /// # Example
1556    /// ```rust
1557    /// # #[cfg(feature = "deploy")] {
1558    /// # use hydro_lang::prelude::*;
1559    /// # use futures::StreamExt;
1560    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1561    /// let tick = process.tick();
1562    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1563    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1564    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1565    /// # }, |mut stream| async move {
1566    /// // [ vec![1, 2, 3, 4] ]
1567    /// # for w in vec![vec![1, 2, 3, 4]] {
1568    /// #     assert_eq!(stream.next().await.unwrap(), w);
1569    /// # }
1570    /// # }));
1571    /// # }
1572    /// ```
1573    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1574    where
1575        O: IsOrdered,
1576        R: IsExactlyOnce,
1577    {
1578        self.make_totally_ordered().make_exactly_once().fold(
1579            q!(|| vec![]),
1580            q!(|acc, v| {
1581                acc.push(v);
1582            }),
1583        )
1584    }
1585
1586    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1587    /// and emitting each intermediate result.
1588    ///
1589    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1590    /// containing all intermediate accumulated values. The scan operation can also terminate early
1591    /// by returning `None`.
1592    ///
1593    /// The function takes a mutable reference to the accumulator and the current element, and returns
1594    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1595    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1596    ///
1597    /// # Examples
1598    ///
1599    /// Basic usage - running sum:
1600    /// ```rust
1601    /// # #[cfg(feature = "deploy")] {
1602    /// # use hydro_lang::prelude::*;
1603    /// # use futures::StreamExt;
1604    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1605    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1606    ///     q!(|| 0),
1607    ///     q!(|acc, x| {
1608    ///         *acc += x;
1609    ///         Some(*acc)
1610    ///     }),
1611    /// )
1612    /// # }, |mut stream| async move {
1613    /// // Output: 1, 3, 6, 10
1614    /// # for w in vec![1, 3, 6, 10] {
1615    /// #     assert_eq!(stream.next().await.unwrap(), w);
1616    /// # }
1617    /// # }));
1618    /// # }
1619    /// ```
1620    ///
1621    /// Early termination example:
1622    /// ```rust
1623    /// # #[cfg(feature = "deploy")] {
1624    /// # use hydro_lang::prelude::*;
1625    /// # use futures::StreamExt;
1626    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1627    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1628    ///     q!(|| 1),
1629    ///     q!(|state, x| {
1630    ///         *state = *state * x;
1631    ///         if *state > 6 {
1632    ///             None // Terminate the stream
1633    ///         } else {
1634    ///             Some(-*state)
1635    ///         }
1636    ///     }),
1637    /// )
1638    /// # }, |mut stream| async move {
1639    /// // Output: -1, -2, -6
1640    /// # for w in vec![-1, -2, -6] {
1641    /// #     assert_eq!(stream.next().await.unwrap(), w);
1642    /// # }
1643    /// # }));
1644    /// # }
1645    /// ```
1646    pub fn scan<A, U, I, F>(
1647        self,
1648        init: impl IntoQuotedMut<'a, I, L>,
1649        f: impl IntoQuotedMut<'a, F, L>,
1650    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1651    where
1652        O: IsOrdered,
1653        R: IsExactlyOnce,
1654        I: Fn() -> A + 'a,
1655        F: Fn(&mut A, T) -> Option<U> + 'a,
1656    {
1657        let init = init.splice_fn0_ctx(&self.location).into();
1658        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1659
1660        Stream::new(
1661            self.location.clone(),
1662            HydroNode::Scan {
1663                init,
1664                acc: f,
1665                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1666                metadata: self.location.new_node_metadata(
1667                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1668                ),
1669            },
1670        )
1671    }
1672
1673    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1674    /// stream, maintaining an internal state (accumulator) and emitting the values returned
1675    /// by the function.
1676    ///
1677    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1678    /// future. The future is polled to completion. If it resolves to `Some`, the value is
1679    /// emitted. If it resolves to `None`, the item is filtered out.
1680    ///
1681    /// # Examples
1682    ///
1683    /// ```rust
1684    /// # #[cfg(feature = "deploy")] {
1685    /// # use hydro_lang::prelude::*;
1686    /// # use futures::StreamExt;
1687    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1688    /// process
1689    ///     .source_iter(q!(vec![1, 2, 3, 4]))
1690    ///     .scan_async_blocking(
1691    ///         q!(|| 0),
1692    ///         q!(|acc, x| {
1693    ///             *acc += x;
1694    ///             let val = *acc;
1695    ///             async move { Some(val) }
1696    ///         }),
1697    ///     )
1698    /// # }, |mut stream| async move {
1699    /// // Output: 1, 3, 6, 10
1700    /// # for w in vec![1, 3, 6, 10] {
1701    /// #     assert_eq!(stream.next().await.unwrap(), w);
1702    /// # }
1703    /// # }));
1704    /// # }
1705    /// ```
1706    pub fn scan_async_blocking<A, U, I, F, Fut>(
1707        self,
1708        init: impl IntoQuotedMut<'a, I, L>,
1709        f: impl IntoQuotedMut<'a, F, L>,
1710    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1711    where
1712        O: IsOrdered,
1713        R: IsExactlyOnce,
1714        I: Fn() -> A + 'a,
1715        F: Fn(&mut A, T) -> Fut + 'a,
1716        Fut: Future<Output = Option<U>> + 'a,
1717    {
1718        let init = init.splice_fn0_ctx(&self.location).into();
1719        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1720
1721        Stream::new(
1722            self.location.clone(),
1723            HydroNode::ScanAsyncBlocking {
1724                init,
1725                acc: f,
1726                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1727                metadata: self.location.new_node_metadata(
1728                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1729                ),
1730            },
1731        )
1732    }
1733
1734    /// Iteratively processes the elements of the stream using a state machine that can yield
1735    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1736    /// syntax in Rust, without requiring special syntax.
1737    ///
1738    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1739    /// state. The second argument defines the processing logic, taking in a mutable reference
1740    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1741    /// variants define what is emitted and whether further inputs should be processed.
1742    ///
1743    /// # Example
1744    /// ```rust
1745    /// # #[cfg(feature = "deploy")] {
1746    /// # use hydro_lang::prelude::*;
1747    /// # use futures::StreamExt;
1748    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1749    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1750    ///     q!(|| 0),
1751    ///     q!(|acc, x| {
1752    ///         *acc += x;
1753    ///         if *acc > 100 {
1754    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1755    ///         } else if *acc % 2 == 0 {
1756    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1757    ///         } else {
1758    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1759    ///         }
1760    ///     }),
1761    /// )
1762    /// # }, |mut stream| async move {
1763    /// // Output: "even", "done!"
1764    /// # let mut results = Vec::new();
1765    /// # for _ in 0..2 {
1766    /// #     results.push(stream.next().await.unwrap());
1767    /// # }
1768    /// # results.sort();
1769    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1770    /// # }));
1771    /// # }
1772    /// ```
1773    pub fn generator<A, U, I, F>(
1774        self,
1775        init: impl IntoQuotedMut<'a, I, L> + Copy,
1776        f: impl IntoQuotedMut<'a, F, L> + Copy,
1777    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1778    where
1779        O: IsOrdered,
1780        R: IsExactlyOnce,
1781        I: Fn() -> A + 'a,
1782        F: Fn(&mut A, T) -> Generate<U> + 'a,
1783    {
1784        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1785        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1786
1787        let this = self.make_totally_ordered().make_exactly_once();
1788
1789        // State is Option<Option<A>>:
1790        //   None = not yet initialized
1791        //   Some(Some(a)) = active with state a
1792        //   Some(None) = terminated
1793        let scan_init = q!(|| None)
1794            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1795            .into();
1796        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1797            if state.is_none() {
1798                *state = Some(Some(init()));
1799            }
1800            match state {
1801                Some(Some(state_value)) => match f(state_value, v) {
1802                    Generate::Yield(out) => Some(Some(out)),
1803                    Generate::Return(out) => {
1804                        *state = Some(None);
1805                        Some(Some(out))
1806                    }
1807                    // Unlike KeyedStream, we can terminate the scan directly on
1808                    // Break/Return because there is only one state (no other keys
1809                    // that still need processing).
1810                    Generate::Break => None,
1811                    Generate::Continue => Some(None),
1812                },
1813                // State is Some(None) after Return; terminate the scan.
1814                _ => None,
1815            }
1816        })
1817        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1818        .into();
1819
1820        let scan_node = HydroNode::Scan {
1821            init: scan_init,
1822            acc: scan_f,
1823            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1824            metadata: this.location.new_node_metadata(Stream::<
1825                Option<U>,
1826                L,
1827                B,
1828                TotalOrder,
1829                ExactlyOnce,
1830            >::collection_kind()),
1831        };
1832
1833        let flatten_f = q!(|d| d)
1834            .splice_fn1_ctx::<Option<U>, _>(&this.location)
1835            .into();
1836        let flatten_node = HydroNode::FlatMap {
1837            f: flatten_f,
1838            input: Box::new(scan_node),
1839            metadata: this
1840                .location
1841                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1842        };
1843
1844        Stream::new(this.location.clone(), flatten_node)
1845    }
1846
1847    /// Given a time interval, returns a stream corresponding to samples taken from the
1848    /// stream roughly at that interval. The output will have elements in the same order
1849    /// as the input, but with arbitrary elements skipped between samples. There is also
1850    /// no guarantee on the exact timing of the samples.
1851    ///
1852    /// # Non-Determinism
1853    /// The output stream is non-deterministic in which elements are sampled, since this
1854    /// is controlled by a clock.
1855    pub fn sample_every(
1856        self,
1857        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1858        nondet: NonDet,
1859    ) -> Stream<T, L::NoConsistency, Unbounded, O, AtLeastOnce>
1860    where
1861        L: NoAtomic,
1862    {
1863        let samples = self.location.source_interval(interval, nondet);
1864
1865        let tick = self.location.tick();
1866        self.batch(&tick, nondet)
1867            .filter_if(samples.batch(&tick, nondet).first().is_some())
1868            .all_ticks()
1869            .weaken_retries()
1870    }
1871
1872    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1873    /// stream has not emitted a value since that duration.
1874    ///
1875    /// # Non-Determinism
1876    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
1877    /// samples take place, timeouts may be non-deterministically generated or missed,
1878    /// and the notification of the timeout may be delayed as well. There is also no
1879    /// guarantee on how long the [`Optional`] will have a value after the timeout is
1880    /// detected based on when the next sample is taken.
1881    pub fn timeout(
1882        self,
1883        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::NoConsistency>> + Copy + 'a,
1884        nondet: NonDet,
1885    ) -> Optional<(), L::NoConsistency, Unbounded>
1886    where
1887        L: NoAtomic,
1888    {
1889        let tick = self.location.tick();
1890
1891        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
1892            q!(|| None),
1893            q!(
1894                |latest, _| {
1895                    *latest = Some(Instant::now());
1896                },
1897                commutative = manual_proof!(/** TODO */)
1898            ),
1899        );
1900
1901        latest_received
1902            .snapshot(&tick, nondet)
1903            .filter_map(q!(move |latest_received| {
1904                if let Some(latest_received) = latest_received {
1905                    if Instant::now().duration_since(latest_received) > duration {
1906                        Some(())
1907                    } else {
1908                        None
1909                    }
1910                } else {
1911                    Some(())
1912                }
1913            }))
1914            .latest()
1915    }
1916
1917    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1918    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1919    ///
1920    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1921    /// processed before an acknowledgement is emitted.
1922    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1923        let id = self.location.flow_state().borrow_mut().next_clock_id();
1924        let out_location = Atomic {
1925            tick: Tick {
1926                id,
1927                l: self.location.clone(),
1928            },
1929        };
1930        Stream::new(
1931            out_location.clone(),
1932            HydroNode::BeginAtomic {
1933                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1934                metadata: out_location
1935                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1936            },
1937        )
1938    }
1939
1940    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1941    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1942    /// the order of the input. The output stream will execute in the [`Tick`] that was
1943    /// used to create the atomic section.
1944    ///
1945    /// # Non-Determinism
1946    /// The batch boundaries are non-deterministic and may change across executions.
1947    pub fn batch<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1948        self,
1949        tick: &Tick<L2>,
1950        _nondet: NonDet,
1951    ) -> Stream<T, Tick<L::NoConsistency>, Bounded, O, R> {
1952        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1953        Stream::new(
1954            tick.drop_consistency(),
1955            HydroNode::Batch {
1956                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1957                metadata: tick
1958                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1959            },
1960        )
1961    }
1962
1963    /// An operator which allows you to "name" a `HydroNode`.
1964    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1965    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1966        {
1967            let mut node = self.ir_node.borrow_mut();
1968            let metadata = node.metadata_mut();
1969            metadata.tag = Some(name.to_owned());
1970        }
1971        self
1972    }
1973
1974    /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
1975    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
1976    /// so uses must be carefully vetted.
1977    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
1978    where
1979        B: IsBounded,
1980    {
1981        Optional::new(
1982            self.location.clone(),
1983            HydroNode::Cast {
1984                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1985                metadata: self
1986                    .location
1987                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1988            },
1989        )
1990    }
1991
1992    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
1993        if O::ORDERING_KIND == O2::ORDERING_KIND {
1994            Stream::new(
1995                self.location.clone(),
1996                self.ir_node.replace(HydroNode::Placeholder),
1997            )
1998        } else {
1999            panic!(
2000                "Runtime ordering {:?} did not match requested cast {:?}.",
2001                O::ORDERING_KIND,
2002                O2::ORDERING_KIND
2003            )
2004        }
2005    }
2006
2007    /// Explicitly "casts" the stream to a type with a different ordering
2008    /// guarantee. Useful in unsafe code where the ordering cannot be proven
2009    /// by the type-system.
2010    ///
2011    /// # Non-Determinism
2012    /// This function is used as an escape hatch, and any mistakes in the
2013    /// provided ordering guarantee will propagate into the guarantees
2014    /// for the rest of the program.
2015    pub fn assume_ordering<O2: Ordering>(
2016        self,
2017        _nondet: NonDet,
2018    ) -> Stream<T, L::NoConsistency, B, O2, R> {
2019        if O::ORDERING_KIND == O2::ORDERING_KIND {
2020            self.use_ordering_type().weaken_consistency()
2021        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2022            // We can always weaken the ordering guarantee
2023            let target_location = self.location().drop_consistency();
2024            Stream::new(
2025                target_location.clone(),
2026                HydroNode::Cast {
2027                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2028                    metadata: target_location
2029                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2030                },
2031            )
2032        } else {
2033            let target_location = self.location().drop_consistency();
2034            Stream::new(
2035                target_location.clone(),
2036                HydroNode::ObserveNonDet {
2037                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2038                    trusted: false,
2039                    metadata: target_location
2040                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2041                },
2042            )
2043        }
2044    }
2045
2046    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2047    // intermediate states will not be revealed
2048    fn assume_ordering_trusted_bounded<O2: Ordering>(
2049        self,
2050        nondet: NonDet,
2051    ) -> Stream<T, L, B, O2, R> {
2052        if B::BOUNDED {
2053            self.assume_ordering_trusted(nondet)
2054        } else {
2055            let self_location = self.location.clone();
2056            let inner: Stream<T, L::NoConsistency, B, O2, R> = self.assume_ordering(nondet);
2057            Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2058        }
2059    }
2060
2061    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2062    // is not observable
2063    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2064        self,
2065        _nondet: NonDet,
2066    ) -> Stream<T, L, B, O2, R> {
2067        if O::ORDERING_KIND == O2::ORDERING_KIND {
2068            self.use_ordering_type()
2069        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2070            // We can always weaken the ordering guarantee
2071            Stream::new(
2072                self.location.clone(),
2073                HydroNode::Cast {
2074                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2075                    metadata: self
2076                        .location
2077                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2078                },
2079            )
2080        } else {
2081            Stream::new(
2082                self.location.clone(),
2083                HydroNode::ObserveNonDet {
2084                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2085                    trusted: true,
2086                    metadata: self
2087                        .location
2088                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2089                },
2090            )
2091        }
2092    }
2093
2094    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2095    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2096    /// which is always safe because that is the weakest possible guarantee.
2097    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2098        self.weaken_ordering::<NoOrder>()
2099    }
2100
2101    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2102    /// enforcing that `O2` is weaker than the input ordering guarantee.
2103    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2104        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2105        self.assume_ordering_trusted::<O2>(nondet)
2106    }
2107
2108    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2109    /// implies that `O == TotalOrder`.
2110    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2111    where
2112        O: IsOrdered,
2113    {
2114        self.assume_ordering_trusted(nondet!(/** no-op */))
2115    }
2116
2117    /// Explicitly "casts" the stream to a type with a different retries
2118    /// guarantee. Useful in unsafe code where the lack of retries cannot
2119    /// be proven by the type-system.
2120    ///
2121    /// # Non-Determinism
2122    /// This function is used as an escape hatch, and any mistakes in the
2123    /// provided retries guarantee will propagate into the guarantees
2124    /// for the rest of the program.
2125    pub fn assume_retries<R2: Retries>(
2126        self,
2127        _nondet: NonDet,
2128    ) -> Stream<T, L::NoConsistency, B, O, R2> {
2129        if R::RETRIES_KIND == R2::RETRIES_KIND {
2130            Stream::new(
2131                self.location.drop_consistency(),
2132                self.ir_node.replace(HydroNode::Placeholder),
2133            )
2134        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2135            // We can always weaken the retries guarantee
2136            let target_location = self.location.drop_consistency();
2137            Stream::new(
2138                target_location.clone(),
2139                HydroNode::Cast {
2140                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2141                    metadata: target_location
2142                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2143                },
2144            )
2145        } else {
2146            let target_location = self.location.drop_consistency();
2147            Stream::new(
2148                target_location.clone(),
2149                HydroNode::ObserveNonDet {
2150                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2151                    trusted: false,
2152                    metadata: target_location
2153                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2154                },
2155            )
2156        }
2157    }
2158
2159    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2160    // is not observable
2161    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2162        if R::RETRIES_KIND == R2::RETRIES_KIND {
2163            Stream::new(
2164                self.location.clone(),
2165                self.ir_node.replace(HydroNode::Placeholder),
2166            )
2167        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2168            // We can always weaken the retries guarantee
2169            Stream::new(
2170                self.location.clone(),
2171                HydroNode::Cast {
2172                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2173                    metadata: self
2174                        .location
2175                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2176                },
2177            )
2178        } else {
2179            Stream::new(
2180                self.location.clone(),
2181                HydroNode::ObserveNonDet {
2182                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2183                    trusted: true,
2184                    metadata: self
2185                        .location
2186                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2187                },
2188            )
2189        }
2190    }
2191
2192    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2193    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2194    /// which is always safe because that is the weakest possible guarantee.
2195    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2196        self.weaken_retries::<AtLeastOnce>()
2197    }
2198
2199    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2200    /// enforcing that `R2` is weaker than the input retries guarantee.
2201    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2202        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2203        self.assume_retries_trusted::<R2>(nondet)
2204    }
2205
2206    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2207    /// implies that `R == ExactlyOnce`.
2208    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2209    where
2210        R: IsExactlyOnce,
2211    {
2212        self.assume_retries_trusted(nondet!(/** no-op */))
2213    }
2214
2215    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2216    /// implies that `B == Bounded`.
2217    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2218    where
2219        B: IsBounded,
2220    {
2221        self.weaken_boundedness()
2222    }
2223
2224    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2225    /// which implies that `B == Bounded`.
2226    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2227        if B::BOUNDED == B2::BOUNDED {
2228            Stream::new(
2229                self.location.clone(),
2230                self.ir_node.replace(HydroNode::Placeholder),
2231            )
2232        } else {
2233            // We can always weaken the boundedness
2234            Stream::new(
2235                self.location.clone(),
2236                HydroNode::Cast {
2237                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2238                    metadata: self
2239                        .location
2240                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2241                },
2242            )
2243        }
2244    }
2245}
2246
2247impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2248where
2249    L: Location<'a>,
2250{
2251    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2252    ///
2253    /// # Example
2254    /// ```rust
2255    /// # #[cfg(feature = "deploy")] {
2256    /// # use hydro_lang::prelude::*;
2257    /// # use futures::StreamExt;
2258    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2259    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2260    /// # }, |mut stream| async move {
2261    /// // 1, 2, 3
2262    /// # for w in vec![1, 2, 3] {
2263    /// #     assert_eq!(stream.next().await.unwrap(), w);
2264    /// # }
2265    /// # }));
2266    /// # }
2267    /// ```
2268    pub fn cloned(self) -> Stream<T, L, B, O, R>
2269    where
2270        T: Clone,
2271    {
2272        self.map(q!(|d| d.clone()))
2273    }
2274}
2275
2276impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2277where
2278    L: Location<'a>,
2279{
2280    /// Computes the number of elements in the stream as a [`Singleton`].
2281    ///
2282    /// # Example
2283    /// ```rust
2284    /// # #[cfg(feature = "deploy")] {
2285    /// # use hydro_lang::prelude::*;
2286    /// # use futures::StreamExt;
2287    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2288    /// let tick = process.tick();
2289    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2290    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2291    /// batch.count().all_ticks()
2292    /// # }, |mut stream| async move {
2293    /// // 4
2294    /// # assert_eq!(stream.next().await.unwrap(), 4);
2295    /// # }));
2296    /// # }
2297    /// ```
2298    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2299        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2300            /// Order does not affect eventual count, and also does not affect intermediate states.
2301        ))
2302        .fold(
2303            q!(|| 0usize),
2304            q!(
2305                |count, _| *count += 1,
2306                monotone = manual_proof!(/** += 1 is monotone */)
2307            ),
2308        )
2309    }
2310}
2311
2312impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2313    /// Produces a new stream that merges the elements of the two input streams.
2314    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2315    ///
2316    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2317    /// [`Bounded`], you can use [`Stream::chain`] instead.
2318    ///
2319    /// # Example
2320    /// ```rust
2321    /// # #[cfg(feature = "deploy")] {
2322    /// # use hydro_lang::prelude::*;
2323    /// # use futures::StreamExt;
2324    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2325    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2326    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2327    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2328    /// # }, |mut stream| async move {
2329    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2330    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2331    /// #     assert_eq!(stream.next().await.unwrap(), w);
2332    /// # }
2333    /// # }));
2334    /// # }
2335    /// ```
2336    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2337        self,
2338        other: Stream<T, L, Unbounded, O2, R2>,
2339    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2340    where
2341        R: MinRetries<R2>,
2342    {
2343        Stream::new(
2344            self.location.clone(),
2345            HydroNode::Chain {
2346                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2347                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2348                metadata: self.location.new_node_metadata(Stream::<
2349                    T,
2350                    L,
2351                    Unbounded,
2352                    NoOrder,
2353                    <R as MinRetries<R2>>::Min,
2354                >::collection_kind()),
2355            },
2356        )
2357    }
2358
2359    /// Deprecated: use [`Stream::merge_unordered`] instead.
2360    #[deprecated(note = "use `merge_unordered` instead")]
    pub fn interleave<O2: Ordering, R2: Retries>(
        self,
        other: Stream<T, L, Unbounded, O2, R2>,
    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        // Deprecated alias: forwards both streams to `merge_unordered` and returns its
        // result unchanged, so the two methods share one signature and one behavior.
        self.merge_unordered(other)
    }
2370}
2371
2372impl<'a, T, L: Location<'a>, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
2373    /// Produces a new stream that combines the elements of the two input streams,
2374    /// preserving the relative order of elements within each input.
2375    ///
2376    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2377    /// [`Bounded`], you can use [`Stream::chain`] instead.
2378    ///
2379    /// # Non-Determinism
2380    /// The order in which elements *across* the two streams will be interleaved is
2381    /// non-deterministic, so the order of elements will vary across runs. If the output order
2382    /// is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic but emits an
2383    /// unordered stream.
2384    ///
2385    /// # Example
2386    /// ```rust
2387    /// # #[cfg(feature = "deploy")] {
2388    /// # use hydro_lang::prelude::*;
2389    /// # use futures::StreamExt;
2390    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2391    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2392    /// # process.source_iter(q!(vec![1, 3])).into();
2393    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2394    /// # }, |mut stream| async move {
2395    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2396    /// # for w in vec![1, 3, 2, 4] {
2397    /// #     assert_eq!(stream.next().await.unwrap(), w);
2398    /// # }
2399    /// # }));
2400    /// # }
2401    /// ```
    pub fn merge_ordered<R2: Retries>(
        self,
        other: Stream<T, L, Unbounded, TotalOrder, R2>,
        nondet: NonDet,
    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        // Capture the location up front: `self` is moved into the `sliced!` block below.
        let self_location = self.location.clone();
        // Inside the sliced region, the slice of `self` is chained in front of the slice of
        // `other`, so each input keeps its own relative order.
        // NOTE(review): the slice boundaries are presumably the source of the
        // non-determinism acknowledged by `nondet` -- confirm against the `sliced!` docs.
        let inner = super::sliced::sliced! {
            let self_batch = use(self, nondet);
            let other_batch = use(other, nondet);
            self_batch.chain(other_batch)
        };

        // Re-wrap the IR node produced by the sliced region as a stream at the original
        // location, leaving a placeholder in the intermediate value.
        Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
    }
2419}
2420
2421impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2422where
2423    L: Location<'a>,
2424{
2425    /// Produces a new stream that emits the input elements in sorted order.
2426    ///
2427    /// The input stream can have any ordering guarantee, but the output stream
2428    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2429    /// elements in the input stream are available, so it requires the input stream
2430    /// to be [`Bounded`].
2431    ///
2432    /// # Example
2433    /// ```rust
2434    /// # #[cfg(feature = "deploy")] {
2435    /// # use hydro_lang::prelude::*;
2436    /// # use futures::StreamExt;
2437    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2438    /// let tick = process.tick();
2439    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2440    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2441    /// batch.sort().all_ticks()
2442    /// # }, |mut stream| async move {
2443    /// // 1, 2, 3, 4
2444    /// # for w in (1..5) {
2445    /// #     assert_eq!(stream.next().await.unwrap(), w);
2446    /// # }
2447    /// # }));
2448    /// # }
2449    /// ```
2450    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2451    where
2452        B: IsBounded,
2453        T: Ord,
2454    {
2455        let this = self.make_bounded();
2456        Stream::new(
2457            this.location.clone(),
2458            HydroNode::Sort {
2459                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2460                metadata: this
2461                    .location
2462                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2463            },
2464        )
2465    }
2466
2467    /// Produces a new stream that first emits the elements of the `self` stream,
2468    /// and then emits the elements of the `other` stream. The output stream has
2469    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2470    /// [`TotalOrder`] guarantee.
2471    ///
    /// The `self` stream must be [`Bounded`], because this operator will block on it
    /// until all its elements are available. The `other` stream may have any
    /// boundedness, which is carried over to the output stream.
2475    ///
2476    /// # Example
2477    /// ```rust
2478    /// # #[cfg(feature = "deploy")] {
2479    /// # use hydro_lang::prelude::*;
2480    /// # use futures::StreamExt;
2481    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2482    /// let tick = process.tick();
2483    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2484    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2485    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2486    /// # }, |mut stream| async move {
2487    /// // 2, 3, 4, 5, 1, 2, 3, 4
2488    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2489    /// #     assert_eq!(stream.next().await.unwrap(), w);
2490    /// # }
2491    /// # }));
2492    /// # }
2493    /// ```
2494    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2495        self,
2496        other: Stream<T, L, B2, O2, R2>,
2497    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2498    where
2499        B: IsBounded,
2500        O: MinOrder<O2>,
2501        R: MinRetries<R2>,
2502    {
2503        check_matching_location(&self.location, &other.location);
2504
2505        Stream::new(
2506            self.location.clone(),
2507            HydroNode::Chain {
2508                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2509                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2510                metadata: self.location.new_node_metadata(Stream::<
2511                    T,
2512                    L,
2513                    B2,
2514                    <O as MinOrder<O2>>::Min,
2515                    <R as MinRetries<R2>>::Min,
2516                >::collection_kind()),
2517            },
2518        )
2519    }
2520
    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
    /// Unlike [`Stream::cross_product`], the output is totally ordered when both inputs are,
    /// because this is compiled into a nested loop.
2524    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
2525        self,
2526        other: Stream<T2, L, Bounded, O2, R>,
2527    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
2528    where
2529        B: IsBounded,
2530        T: Clone,
2531        T2: Clone,
2532    {
2533        let this = self.make_bounded();
2534        check_matching_location(&this.location, &other.location);
2535
2536        Stream::new(
2537            this.location.clone(),
2538            HydroNode::CrossProduct {
2539                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2540                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2541                metadata: this.location.new_node_metadata(Stream::<
2542                    (T, T2),
2543                    L,
2544                    Bounded,
2545                    <O2 as MinOrder<O>>::Min,
2546                    R,
2547                >::collection_kind()),
2548            },
2549        )
2550    }
2551
2552    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2553    /// `self` used as the values for *each* key.
2554    ///
2555    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2556    /// values. For example, it can be used to send the same set of elements to several cluster
2557    /// members, if the membership information is available as a [`KeyedSingleton`].
2558    ///
2559    /// # Example
2560    /// ```rust
2561    /// # #[cfg(feature = "deploy")] {
2562    /// # use hydro_lang::prelude::*;
2563    /// # use futures::StreamExt;
2564    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2565    /// # let tick = process.tick();
2566    /// let keyed_singleton = // { 1: (), 2: () }
2567    /// # process
2568    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2569    /// #     .into_keyed()
2570    /// #     .batch(&tick, nondet!(/** test */))
2571    /// #     .first();
2572    /// let stream = // [ "a", "b" ]
2573    /// # process
2574    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2575    /// #     .batch(&tick, nondet!(/** test */));
2576    /// stream.repeat_with_keys(keyed_singleton)
2577    /// # .entries().all_ticks()
2578    /// # }, |mut stream| async move {
2579    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2580    /// # let mut results = Vec::new();
2581    /// # for _ in 0..4 {
2582    /// #     results.push(stream.next().await.unwrap());
2583    /// # }
2584    /// # results.sort();
2585    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2586    /// # }));
2587    /// # }
2588    /// ```
2589    pub fn repeat_with_keys<K, V2>(
2590        self,
2591        keys: KeyedSingleton<K, V2, L, Bounded>,
2592    ) -> KeyedStream<K, T, L, Bounded, O, R>
2593    where
2594        B: IsBounded,
2595        K: Clone,
2596        T: Clone,
2597    {
2598        keys.keys()
2599            .weaken_retries()
2600            .assume_ordering_trusted::<TotalOrder>(
2601                nondet!(/** keyed stream does not depend on ordering of keys */),
2602            )
2603            .cross_product_nested_loop(self.make_bounded())
2604            .into_keyed()
2605    }
2606
2607    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2608    /// execution until all results are available. The output order is based on when futures
2609    /// complete, and may be different than the input order.
2610    ///
2611    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2612    /// while futures are pending, this variant blocks until the futures resolve.
2613    ///
2614    /// # Example
2615    /// ```rust
2616    /// # #[cfg(feature = "deploy")] {
2617    /// # use std::collections::HashSet;
2618    /// # use futures::StreamExt;
2619    /// # use hydro_lang::prelude::*;
2620    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2621    /// process
2622    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2623    ///     .map(q!(|x| async move {
2624    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2625    ///         x
2626    ///     }))
2627    ///     .resolve_futures_blocking()
2628    /// #   },
2629    /// #   |mut stream| async move {
2630    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2631    /// #       let mut output = HashSet::new();
2632    /// #       for _ in 1..10 {
2633    /// #           output.insert(stream.next().await.unwrap());
2634    /// #       }
2635    /// #       assert_eq!(
2636    /// #           output,
2637    /// #           HashSet::<i32>::from_iter(1..10)
2638    /// #       );
2639    /// #   },
2640    /// # ));
2641    /// # }
2642    /// ```
2643    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2644    where
2645        T: Future,
2646    {
2647        Stream::new(
2648            self.location.clone(),
2649            HydroNode::ResolveFuturesBlocking {
2650                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2651                metadata: self
2652                    .location
2653                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2654            },
2655        )
2656    }
2657
2658    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2659    ///
2660    /// # Example
2661    /// ```rust
2662    /// # #[cfg(feature = "deploy")] {
2663    /// # use hydro_lang::prelude::*;
2664    /// # use futures::StreamExt;
2665    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2666    /// let tick = process.tick();
2667    /// let empty: Stream<i32, _, Bounded> = process
2668    ///   .source_iter(q!(Vec::<i32>::new()))
2669    ///   .batch(&tick, nondet!(/** test */));
2670    /// empty.is_empty().all_ticks()
2671    /// # }, |mut stream| async move {
2672    /// // true
2673    /// # assert_eq!(stream.next().await.unwrap(), true);
2674    /// # }));
2675    /// # }
2676    /// ```
2677    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2678    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2679    where
2680        B: IsBounded,
2681    {
2682        self.make_bounded()
2683            .assume_ordering_trusted::<TotalOrder>(
2684                nondet!(/** is_empty intermediates unaffected by order */),
2685            )
2686            .first()
2687            .is_none()
2688    }
2689}
2690
2691impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2692where
2693    L: Location<'a>,
2694{
2695    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2696    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2697    /// by equi-joining the two streams on the key attribute `K`.
2698    ///
2699    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2700    /// and streams the left side through, preserving the left side's ordering. When both
2701    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2702    ///
2703    /// # Example
2704    /// ```rust
2705    /// # #[cfg(feature = "deploy")] {
2706    /// # use hydro_lang::prelude::*;
2707    /// # use std::collections::HashSet;
2708    /// # use futures::StreamExt;
2709    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2710    /// let tick = process.tick();
2711    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2712    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2713    /// stream1.join(stream2)
2714    /// # }, |mut stream| async move {
2715    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2716    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # for _ in 0..2 {
    /// #     assert!(expected.contains(&stream.next().await.unwrap()));
    /// # }
    /// # }));
    /// # }
    /// ```
2720    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2721        self,
2722        n: Stream<(K, V2), L, B2, O2, R2>,
2723    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2724    where
2725        K: Eq + Hash + Clone,
2726        R: MinRetries<R2>,
2727        V1: Clone,
2728        V2: Clone,
2729    {
2730        check_matching_location(&self.location, &n.location);
2731
2732        let ir_node = if B2::BOUNDED {
2733            HydroNode::JoinHalf {
2734                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2735                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2736                metadata: self.location.new_node_metadata(Stream::<
2737                    (K, (V1, V2)),
2738                    L,
2739                    B,
2740                    B2::PreserveOrderIfBounded<O>,
2741                    <R as MinRetries<R2>>::Min,
2742                >::collection_kind()),
2743            }
2744        } else {
2745            HydroNode::Join {
2746                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2747                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2748                metadata: self.location.new_node_metadata(Stream::<
2749                    (K, (V1, V2)),
2750                    L,
2751                    B,
2752                    B2::PreserveOrderIfBounded<O>,
2753                    <R as MinRetries<R2>>::Min,
2754                >::collection_kind()),
2755            }
2756        };
2757
2758        Stream::new(self.location.clone(), ir_node)
2759    }
2760
2761    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2762    /// computes the anti-join of the items in the input -- i.e. returns
2763    /// unique items in the first input that do not have a matching key
2764    /// in the second input.
2765    ///
2766    /// # Example
2767    /// ```rust
2768    /// # #[cfg(feature = "deploy")] {
2769    /// # use hydro_lang::prelude::*;
2770    /// # use futures::StreamExt;
2771    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2772    /// let tick = process.tick();
2773    /// let stream = process
2774    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2775    ///   .batch(&tick, nondet!(/** test */));
2776    /// let batch = process
2777    ///   .source_iter(q!(vec![1, 2]))
2778    ///   .batch(&tick, nondet!(/** test */));
2779    /// stream.anti_join(batch).all_ticks()
2780    /// # }, |mut stream| async move {
2781    /// # for w in vec![(3, 'c'), (4, 'd')] {
2782    /// #     assert_eq!(stream.next().await.unwrap(), w);
2783    /// # }
2784    /// # }));
    /// # }
    /// ```
2786    pub fn anti_join<O2: Ordering, R2: Retries>(
2787        self,
2788        n: Stream<K, L, Bounded, O2, R2>,
2789    ) -> Stream<(K, V1), L, B, O, R>
2790    where
2791        K: Eq + Hash,
2792    {
2793        check_matching_location(&self.location, &n.location);
2794
2795        Stream::new(
2796            self.location.clone(),
2797            HydroNode::AntiJoin {
2798                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2799                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2800                metadata: self
2801                    .location
2802                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2803            },
2804        )
2805    }
2806}
2807
2808impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2809    Stream<(K, V), L, B, O, R>
2810{
2811    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2812    /// is used as the key and the second element is added to the entries associated with that key.
2813    ///
2814    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2815    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2816    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2817    /// total ordering _within_ each group but no ordering _across_ groups.
2818    ///
2819    /// # Example
2820    /// ```rust
2821    /// # #[cfg(feature = "deploy")] {
2822    /// # use hydro_lang::prelude::*;
2823    /// # use futures::StreamExt;
2824    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2825    /// process
2826    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2827    ///     .into_keyed()
2828    /// #   .entries()
2829    /// # }, |mut stream| async move {
2830    /// // { 1: [2, 3], 2: [4] }
2831    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2832    /// #     assert_eq!(stream.next().await.unwrap(), w);
2833    /// # }
2834    /// # }));
2835    /// # }
2836    /// ```
2837    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2838        KeyedStream::new(
2839            self.location.clone(),
2840            HydroNode::Cast {
2841                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2842                metadata: self
2843                    .location
2844                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2845            },
2846        )
2847    }
2848}
2849
2850impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2851where
2852    K: Eq + Hash,
2853    L: Location<'a>,
2854{
    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
    ///
    /// # Example
2857    /// ```rust
2858    /// # #[cfg(feature = "deploy")] {
2859    /// # use hydro_lang::prelude::*;
2860    /// # use futures::StreamExt;
2861    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2862    /// let tick = process.tick();
2863    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2864    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2865    /// batch.keys().all_ticks()
2866    /// # }, |mut stream| async move {
2867    /// // 1, 2
2868    /// # assert_eq!(stream.next().await.unwrap(), 1);
2869    /// # assert_eq!(stream.next().await.unwrap(), 2);
2870    /// # }));
2871    /// # }
2872    /// ```
2873    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
2874        self.into_keyed()
2875            .fold(
2876                q!(|| ()),
2877                q!(
2878                    |_, _| {},
2879                    commutative = manual_proof!(/** values are ignored */),
2880                    idempotent = manual_proof!(/** values are ignored */)
2881                ),
2882            )
2883            .keys()
2884    }
2885}
2886
2887impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2888where
2889    L: Location<'a>,
2890{
2891    /// Returns a stream corresponding to the latest batch of elements being atomically
2892    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2893    /// the order of the input.
2894    ///
2895    /// # Non-Determinism
2896    /// The batch boundaries are non-deterministic and may change across executions.
2897    pub fn batch_atomic<L2: Location<'a, NoConsistency = L::NoConsistency>>(
2898        self,
2899        tick: &Tick<L2>,
2900        _nondet: NonDet,
2901    ) -> Stream<T, Tick<L::NoConsistency>, Bounded, O, R> {
2902        Stream::new(
2903            tick.drop_consistency(),
2904            HydroNode::Batch {
2905                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2906                metadata: tick
2907                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2908            },
2909        )
2910    }
2911
2912    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2913    /// See [`Stream::atomic`] for more details.
2914    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2915        Stream::new(
2916            self.location.tick.l.clone(),
2917            HydroNode::EndAtomic {
2918                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2919                metadata: self
2920                    .location
2921                    .tick
2922                    .l
2923                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2924            },
2925        )
2926    }
2927}
2928
2929impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
2930where
2931    L: Location<'a> + NoAtomic,
2932    F: Future<Output = T>,
2933{
2934    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2935    /// Future outputs are produced as available, regardless of input arrival order.
2936    ///
2937    /// # Example
2938    /// ```rust
2939    /// # #[cfg(feature = "deploy")] {
2940    /// # use std::collections::HashSet;
2941    /// # use futures::StreamExt;
2942    /// # use hydro_lang::prelude::*;
2943    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2944    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2945    ///     .map(q!(|x| async move {
2946    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2947    ///         x
2948    ///     }))
2949    ///     .resolve_futures()
2950    /// #   },
2951    /// #   |mut stream| async move {
2952    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2953    /// #       let mut output = HashSet::new();
2954    /// #       for _ in 1..10 {
2955    /// #           output.insert(stream.next().await.unwrap());
2956    /// #       }
2957    /// #       assert_eq!(
2958    /// #           output,
2959    /// #           HashSet::<i32>::from_iter(1..10)
2960    /// #       );
2961    /// #   },
2962    /// # ));
    /// # }
    /// ```
2964    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
2965        Stream::new(
2966            self.location.clone(),
2967            HydroNode::ResolveFutures {
2968                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2969                metadata: self
2970                    .location
2971                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
2972            },
2973        )
2974    }
2975
2976    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
2977    /// Future outputs are produced in the same order as the input stream.
2978    ///
2979    /// # Example
2980    /// ```rust
2981    /// # #[cfg(feature = "deploy")] {
2982    /// # use std::collections::HashSet;
2983    /// # use futures::StreamExt;
2984    /// # use hydro_lang::prelude::*;
2985    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2986    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2987    ///     .map(q!(|x| async move {
2988    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2989    ///         x
2990    ///     }))
2991    ///     .resolve_futures_ordered()
2992    /// #   },
2993    /// #   |mut stream| async move {
2994    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
2995    /// #       let mut output = Vec::new();
2996    /// #       for _ in 1..10 {
2997    /// #           output.push(stream.next().await.unwrap());
2998    /// #       }
2999    /// #       assert_eq!(
3000    /// #           output,
3001    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3002    /// #       );
3003    /// #   },
3004    /// # ));
    /// # }
    /// ```
3006    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3007        Stream::new(
3008            self.location.clone(),
3009            HydroNode::ResolveFuturesOrdered {
3010                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3011                metadata: self
3012                    .location
3013                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3014            },
3015        )
3016    }
3017}
3018
3019impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3020where
3021    L: Location<'a>,
3022{
3023    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3024    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3025    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3026        Stream::new(
3027            self.location.outer().clone(),
3028            HydroNode::YieldConcat {
3029                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3030                metadata: self
3031                    .location
3032                    .outer()
3033                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3034            },
3035        )
3036    }
3037
3038    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3039    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3040    ///
3041    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3042    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3043    /// stream's [`Tick`] context.
3044    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3045        let out_location = Atomic {
3046            tick: self.location.clone(),
3047        };
3048
3049        Stream::new(
3050            out_location.clone(),
3051            HydroNode::YieldConcat {
3052                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3053                metadata: out_location
3054                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3055            },
3056        )
3057    }
3058
    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory across ticks rather than resetting across batches of
    /// input.
3062    ///
3063    /// This API is particularly useful for stateful computation on batches of data, such as
3064    /// maintaining an accumulated state that is up to date with the current batch.
3065    ///
3066    /// # Example
3067    /// ```rust
3068    /// # #[cfg(feature = "deploy")] {
3069    /// # use hydro_lang::prelude::*;
3070    /// # use futures::StreamExt;
3071    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3072    /// let tick = process.tick();
3073    /// # // ticks are lazy by default, forces the second tick to run
3074    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3075    /// # let batch_first_tick = process
3076    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
3077    /// #  .batch(&tick, nondet!(/** test */));
3078    /// # let batch_second_tick = process
3079    /// #   .source_iter(q!(vec![5, 6, 7]))
3080    /// #   .batch(&tick, nondet!(/** test */))
3081    /// #   .defer_tick(); // appears on the second tick
3082    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3083    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3084    ///
3085    /// input.batch(&tick, nondet!(/** test */))
3086    ///     .across_ticks(|s| s.count()).all_ticks()
3087    /// # }, |mut stream| async move {
3088    /// // [4, 7]
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # assert_eq!(stream.next().await.unwrap(), 7);
3091    /// # }));
3092    /// # }
3093    /// ```
3094    pub fn across_ticks<Out: BatchAtomic<'a>>(
3095        self,
3096        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3097    ) -> Out::Batched {
3098        thunk(self.all_ticks_atomic()).batched_atomic()
3099    }
3100
3101    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3102    /// always has the elements of `self` at tick `T - 1`.
3103    ///
3104    /// At tick `0`, the output stream is empty, since there is no previous tick.
3105    ///
3106    /// This operator enables stateful iterative processing with ticks, by sending data from one
3107    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3108    ///
3109    /// # Example
3110    /// ```rust
3111    /// # #[cfg(feature = "deploy")] {
3112    /// # use hydro_lang::prelude::*;
3113    /// # use futures::StreamExt;
3114    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3115    /// let tick = process.tick();
3116    /// // ticks are lazy by default, forces the second tick to run
3117    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3118    ///
3119    /// let batch_first_tick = process
3120    ///   .source_iter(q!(vec![1, 2, 3, 4]))
3121    ///   .batch(&tick, nondet!(/** test */));
3122    /// let batch_second_tick = process
3123    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
3124    ///   .batch(&tick, nondet!(/** test */))
3125    ///   .defer_tick(); // appears on the second tick
3126    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3127    ///
3128    /// changes_across_ticks.clone().filter_not_in(
3129    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
3130    /// ).all_ticks()
3131    /// # }, |mut stream| async move {
3132    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3133    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3134    /// #     assert_eq!(stream.next().await.unwrap(), w);
3135    /// # }
3136    /// # }));
3137    /// # }
3138    /// ```
3139    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3140        Stream::new(
3141            self.location.clone(),
3142            HydroNode::DeferTick {
3143                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3144                metadata: self
3145                    .location
3146                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3147            },
3148        )
3149    }
3150}
3151
3152#[cfg(test)]
3153mod tests {
3154    #[cfg(feature = "deploy")]
3155    use futures::{SinkExt, StreamExt};
3156    #[cfg(feature = "deploy")]
3157    use hydro_deploy::Deployment;
3158    #[cfg(feature = "deploy")]
3159    use serde::{Deserialize, Serialize};
3160    #[cfg(any(feature = "deploy", feature = "sim"))]
3161    use stageleft::q;
3162
3163    #[cfg(any(feature = "deploy", feature = "sim"))]
3164    use crate::compile::builder::FlowBuilder;
3165    #[cfg(feature = "deploy")]
3166    use crate::live_collections::sliced::sliced;
3167    #[cfg(feature = "deploy")]
3168    use crate::live_collections::stream::ExactlyOnce;
3169    #[cfg(feature = "sim")]
3170    use crate::live_collections::stream::NoOrder;
3171    #[cfg(any(feature = "deploy", feature = "sim"))]
3172    use crate::live_collections::stream::TotalOrder;
3173    #[cfg(any(feature = "deploy", feature = "sim"))]
3174    use crate::location::Location;
3175    #[cfg(feature = "sim")]
3176    use crate::networking::TCP;
3177    #[cfg(any(feature = "deploy", feature = "sim"))]
3178    use crate::nondet::nondet;
3179
3180    mod backtrace_chained_ops;
3181
    /// Marker type used as the tag of the first process in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    struct P1 {}
    /// Marker type used as the tag of the second process (and of the external) in
    /// `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    struct P2 {}

    /// Serializable payload that `first_ten_distributed` sends across the network; `n` carries
    /// the number being transmitted.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }
3192
3193    #[cfg(feature = "deploy")]
3194    #[tokio::test]
3195    async fn first_ten_distributed() {
3196        use crate::networking::TCP;
3197
3198        let mut deployment = Deployment::new();
3199
3200        let mut flow = FlowBuilder::new();
3201        let first_node = flow.process::<P1>();
3202        let second_node = flow.process::<P2>();
3203        let external = flow.external::<P2>();
3204
3205        let numbers = first_node.source_iter(q!(0..10));
3206        let out_port = numbers
3207            .map(q!(|n| SendOverNetwork { n }))
3208            .send(&second_node, TCP.fail_stop().bincode())
3209            .send_bincode_external(&external);
3210
3211        let nodes = flow
3212            .with_process(&first_node, deployment.Localhost())
3213            .with_process(&second_node, deployment.Localhost())
3214            .with_external(&external, deployment.Localhost())
3215            .deploy(&mut deployment);
3216
3217        deployment.deploy().await.unwrap();
3218
3219        let mut external_out = nodes.connect(out_port).await;
3220
3221        deployment.start().await.unwrap();
3222
3223        for i in 0..10 {
3224            assert_eq!(external_out.next().await.unwrap().n, i);
3225        }
3226    }
3227
3228    #[cfg(feature = "deploy")]
3229    #[tokio::test]
3230    async fn first_cardinality() {
3231        let mut deployment = Deployment::new();
3232
3233        let mut flow = FlowBuilder::new();
3234        let node = flow.process::<()>();
3235        let external = flow.external::<()>();
3236
3237        let node_tick = node.tick();
3238        let count = node_tick
3239            .singleton(q!([1, 2, 3]))
3240            .into_stream()
3241            .flatten_ordered()
3242            .first()
3243            .into_stream()
3244            .count()
3245            .all_ticks()
3246            .send_bincode_external(&external);
3247
3248        let nodes = flow
3249            .with_process(&node, deployment.Localhost())
3250            .with_external(&external, deployment.Localhost())
3251            .deploy(&mut deployment);
3252
3253        deployment.deploy().await.unwrap();
3254
3255        let mut external_out = nodes.connect(count).await;
3256
3257        deployment.start().await.unwrap();
3258
3259        assert_eq!(external_out.next().await.unwrap(), 1);
3260    }
3261
3262    #[cfg(feature = "deploy")]
3263    #[tokio::test]
3264    async fn unbounded_reduce_remembers_state() {
3265        let mut deployment = Deployment::new();
3266
3267        let mut flow = FlowBuilder::new();
3268        let node = flow.process::<()>();
3269        let external = flow.external::<()>();
3270
3271        let (input_port, input) = node.source_external_bincode(&external);
3272        let out = input
3273            .reduce(q!(|acc, v| *acc += v))
3274            .sample_eager(nondet!(/** test */))
3275            .send_bincode_external(&external);
3276
3277        let nodes = flow
3278            .with_process(&node, deployment.Localhost())
3279            .with_external(&external, deployment.Localhost())
3280            .deploy(&mut deployment);
3281
3282        deployment.deploy().await.unwrap();
3283
3284        let mut external_in = nodes.connect(input_port).await;
3285        let mut external_out = nodes.connect(out).await;
3286
3287        deployment.start().await.unwrap();
3288
3289        external_in.send(1).await.unwrap();
3290        assert_eq!(external_out.next().await.unwrap(), 1);
3291
3292        external_in.send(2).await.unwrap();
3293        assert_eq!(external_out.next().await.unwrap(), 3);
3294    }
3295
3296    #[cfg(feature = "deploy")]
3297    #[tokio::test]
3298    async fn top_level_bounded_cross_singleton() {
3299        let mut deployment = Deployment::new();
3300
3301        let mut flow = FlowBuilder::new();
3302        let node = flow.process::<()>();
3303        let external = flow.external::<()>();
3304
3305        let (input_port, input) =
3306            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3307
3308        let out = input
3309            .cross_singleton(
3310                node.source_iter(q!(vec![1, 2, 3]))
3311                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3312            )
3313            .send_bincode_external(&external);
3314
3315        let nodes = flow
3316            .with_process(&node, deployment.Localhost())
3317            .with_external(&external, deployment.Localhost())
3318            .deploy(&mut deployment);
3319
3320        deployment.deploy().await.unwrap();
3321
3322        let mut external_in = nodes.connect(input_port).await;
3323        let mut external_out = nodes.connect(out).await;
3324
3325        deployment.start().await.unwrap();
3326
3327        external_in.send(1).await.unwrap();
3328        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3329
3330        external_in.send(2).await.unwrap();
3331        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3332    }
3333
3334    #[cfg(feature = "deploy")]
3335    #[tokio::test]
3336    async fn top_level_bounded_reduce_cardinality() {
3337        let mut deployment = Deployment::new();
3338
3339        let mut flow = FlowBuilder::new();
3340        let node = flow.process::<()>();
3341        let external = flow.external::<()>();
3342
3343        let (input_port, input) =
3344            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3345
3346        let out = sliced! {
3347            let input = use(input, nondet!(/** test */));
3348            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3349            input.cross_singleton(v.into_stream().count())
3350        }
3351        .send_bincode_external(&external);
3352
3353        let nodes = flow
3354            .with_process(&node, deployment.Localhost())
3355            .with_external(&external, deployment.Localhost())
3356            .deploy(&mut deployment);
3357
3358        deployment.deploy().await.unwrap();
3359
3360        let mut external_in = nodes.connect(input_port).await;
3361        let mut external_out = nodes.connect(out).await;
3362
3363        deployment.start().await.unwrap();
3364
3365        external_in.send(1).await.unwrap();
3366        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3367
3368        external_in.send(2).await.unwrap();
3369        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3370    }
3371
3372    #[cfg(feature = "deploy")]
3373    #[tokio::test]
3374    async fn top_level_bounded_into_singleton_cardinality() {
3375        let mut deployment = Deployment::new();
3376
3377        let mut flow = FlowBuilder::new();
3378        let node = flow.process::<()>();
3379        let external = flow.external::<()>();
3380
3381        let (input_port, input) =
3382            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3383
3384        let out = sliced! {
3385            let input = use(input, nondet!(/** test */));
3386            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3387            input.cross_singleton(v.into_stream().count())
3388        }
3389        .send_bincode_external(&external);
3390
3391        let nodes = flow
3392            .with_process(&node, deployment.Localhost())
3393            .with_external(&external, deployment.Localhost())
3394            .deploy(&mut deployment);
3395
3396        deployment.deploy().await.unwrap();
3397
3398        let mut external_in = nodes.connect(input_port).await;
3399        let mut external_out = nodes.connect(out).await;
3400
3401        deployment.start().await.unwrap();
3402
3403        external_in.send(1).await.unwrap();
3404        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3405
3406        external_in.send(2).await.unwrap();
3407        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3408    }
3409
3410    #[cfg(feature = "deploy")]
3411    #[tokio::test]
3412    async fn atomic_fold_replays_each_tick() {
3413        let mut deployment = Deployment::new();
3414
3415        let mut flow = FlowBuilder::new();
3416        let node = flow.process::<()>();
3417        let external = flow.external::<()>();
3418
3419        let (input_port, input) =
3420            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3421        let tick = node.tick();
3422
3423        let out = input
3424            .batch(&tick, nondet!(/** test */))
3425            .cross_singleton(
3426                node.source_iter(q!(vec![1, 2, 3]))
3427                    .atomic()
3428                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3429                    .snapshot_atomic(&tick, nondet!(/** test */)),
3430            )
3431            .all_ticks()
3432            .send_bincode_external(&external);
3433
3434        let nodes = flow
3435            .with_process(&node, deployment.Localhost())
3436            .with_external(&external, deployment.Localhost())
3437            .deploy(&mut deployment);
3438
3439        deployment.deploy().await.unwrap();
3440
3441        let mut external_in = nodes.connect(input_port).await;
3442        let mut external_out = nodes.connect(out).await;
3443
3444        deployment.start().await.unwrap();
3445
3446        external_in.send(1).await.unwrap();
3447        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3448
3449        external_in.send(2).await.unwrap();
3450        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3451    }
3452
3453    #[cfg(feature = "deploy")]
3454    #[tokio::test]
3455    async fn unbounded_scan_remembers_state() {
3456        let mut deployment = Deployment::new();
3457
3458        let mut flow = FlowBuilder::new();
3459        let node = flow.process::<()>();
3460        let external = flow.external::<()>();
3461
3462        let (input_port, input) = node.source_external_bincode(&external);
3463        let out = input
3464            .scan(
3465                q!(|| 0),
3466                q!(|acc, v| {
3467                    *acc += v;
3468                    Some(*acc)
3469                }),
3470            )
3471            .send_bincode_external(&external);
3472
3473        let nodes = flow
3474            .with_process(&node, deployment.Localhost())
3475            .with_external(&external, deployment.Localhost())
3476            .deploy(&mut deployment);
3477
3478        deployment.deploy().await.unwrap();
3479
3480        let mut external_in = nodes.connect(input_port).await;
3481        let mut external_out = nodes.connect(out).await;
3482
3483        deployment.start().await.unwrap();
3484
3485        external_in.send(1).await.unwrap();
3486        assert_eq!(external_out.next().await.unwrap(), 1);
3487
3488        external_in.send(2).await.unwrap();
3489        assert_eq!(external_out.next().await.unwrap(), 3);
3490    }
3491
3492    #[cfg(feature = "deploy")]
3493    #[tokio::test]
3494    async fn unbounded_enumerate_remembers_state() {
3495        let mut deployment = Deployment::new();
3496
3497        let mut flow = FlowBuilder::new();
3498        let node = flow.process::<()>();
3499        let external = flow.external::<()>();
3500
3501        let (input_port, input) = node.source_external_bincode(&external);
3502        let out = input.enumerate().send_bincode_external(&external);
3503
3504        let nodes = flow
3505            .with_process(&node, deployment.Localhost())
3506            .with_external(&external, deployment.Localhost())
3507            .deploy(&mut deployment);
3508
3509        deployment.deploy().await.unwrap();
3510
3511        let mut external_in = nodes.connect(input_port).await;
3512        let mut external_out = nodes.connect(out).await;
3513
3514        deployment.start().await.unwrap();
3515
3516        external_in.send(1).await.unwrap();
3517        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3518
3519        external_in.send(2).await.unwrap();
3520        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3521    }
3522
3523    #[cfg(feature = "deploy")]
3524    #[tokio::test]
3525    async fn unbounded_unique_remembers_state() {
3526        let mut deployment = Deployment::new();
3527
3528        let mut flow = FlowBuilder::new();
3529        let node = flow.process::<()>();
3530        let external = flow.external::<()>();
3531
3532        let (input_port, input) =
3533            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3534        let out = input.unique().send_bincode_external(&external);
3535
3536        let nodes = flow
3537            .with_process(&node, deployment.Localhost())
3538            .with_external(&external, deployment.Localhost())
3539            .deploy(&mut deployment);
3540
3541        deployment.deploy().await.unwrap();
3542
3543        let mut external_in = nodes.connect(input_port).await;
3544        let mut external_out = nodes.connect(out).await;
3545
3546        deployment.start().await.unwrap();
3547
3548        external_in.send(1).await.unwrap();
3549        assert_eq!(external_out.next().await.unwrap(), 1);
3550
3551        external_in.send(2).await.unwrap();
3552        assert_eq!(external_out.next().await.unwrap(), 2);
3553
3554        external_in.send(1).await.unwrap();
3555        external_in.send(3).await.unwrap();
3556        assert_eq!(external_out.next().await.unwrap(), 3);
3557    }
3558
3559    #[cfg(feature = "sim")]
3560    #[test]
3561    #[should_panic]
3562    fn sim_batch_nondet_size() {
3563        let mut flow = FlowBuilder::new();
3564        let node = flow.process::<()>();
3565
3566        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3567
3568        let tick = node.tick();
3569        let out_recv = input
3570            .batch(&tick, nondet!(/** test */))
3571            .count()
3572            .all_ticks()
3573            .sim_output();
3574
3575        flow.sim().exhaustive(async || {
3576            in_send.send(());
3577            in_send.send(());
3578            in_send.send(());
3579
3580            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3581        });
3582    }
3583
3584    #[cfg(feature = "sim")]
3585    #[test]
3586    fn sim_batch_preserves_order() {
3587        let mut flow = FlowBuilder::new();
3588        let node = flow.process::<()>();
3589
3590        let (in_send, input) = node.sim_input();
3591
3592        let tick = node.tick();
3593        let out_recv = input
3594            .batch(&tick, nondet!(/** test */))
3595            .all_ticks()
3596            .sim_output();
3597
3598        flow.sim().exhaustive(async || {
3599            in_send.send(1);
3600            in_send.send(2);
3601            in_send.send(3);
3602
3603            out_recv.assert_yields_only([1, 2, 3]).await;
3604        });
3605    }
3606
3607    #[cfg(feature = "sim")]
3608    #[test]
3609    #[should_panic]
3610    fn sim_batch_unordered_shuffles() {
3611        let mut flow = FlowBuilder::new();
3612        let node = flow.process::<()>();
3613
3614        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3615
3616        let tick = node.tick();
3617        let batch = input.batch(&tick, nondet!(/** test */));
3618        let out_recv = batch
3619            .clone()
3620            .min()
3621            .zip(batch.max())
3622            .all_ticks()
3623            .sim_output();
3624
3625        flow.sim().exhaustive(async || {
3626            in_send.send_many_unordered([1, 2, 3]);
3627
3628            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3629                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3630            }
3631        });
3632    }
3633
3634    #[cfg(feature = "sim")]
3635    #[test]
3636    fn sim_batch_unordered_shuffles_count() {
3637        let mut flow = FlowBuilder::new();
3638        let node = flow.process::<()>();
3639
3640        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3641
3642        let tick = node.tick();
3643        let batch = input.batch(&tick, nondet!(/** test */));
3644        let out_recv = batch.all_ticks().sim_output();
3645
3646        let instance_count = flow.sim().exhaustive(async || {
3647            in_send.send_many_unordered([1, 2, 3, 4]);
3648            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3649        });
3650
3651        assert_eq!(
3652            instance_count,
3653            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3654        )
3655    }
3656
3657    #[cfg(feature = "sim")]
3658    #[test]
3659    #[should_panic]
3660    fn sim_observe_order_batched() {
3661        let mut flow = FlowBuilder::new();
3662        let node = flow.process::<()>();
3663
3664        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3665
3666        let tick = node.tick();
3667        let batch = input.batch(&tick, nondet!(/** test */));
3668        let out_recv = batch
3669            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3670            .all_ticks()
3671            .sim_output();
3672
3673        flow.sim().exhaustive(async || {
3674            in_send.send_many_unordered([1, 2, 3, 4]);
3675            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3676        });
3677    }
3678
3679    #[cfg(feature = "sim")]
3680    #[test]
3681    fn sim_observe_order_batched_count() {
3682        let mut flow = FlowBuilder::new();
3683        let node = flow.process::<()>();
3684
3685        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3686
3687        let tick = node.tick();
3688        let batch = input.batch(&tick, nondet!(/** test */));
3689        let out_recv = batch
3690            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3691            .all_ticks()
3692            .sim_output();
3693
3694        let instance_count = flow.sim().exhaustive(async || {
3695            in_send.send_many_unordered([1, 2, 3, 4]);
3696            let _ = out_recv.collect::<Vec<_>>().await;
3697        });
3698
3699        assert_eq!(
3700            instance_count,
3701            192 // 4! * 2^{4 - 1}
3702        )
3703    }
3704
3705    #[cfg(feature = "sim")]
3706    #[test]
3707    fn sim_unordered_count_instance_count() {
3708        let mut flow = FlowBuilder::new();
3709        let node = flow.process::<()>();
3710
3711        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3712
3713        let tick = node.tick();
3714        let out_recv = input
3715            .count()
3716            .snapshot(&tick, nondet!(/** test */))
3717            .all_ticks()
3718            .sim_output();
3719
3720        let instance_count = flow.sim().exhaustive(async || {
3721            in_send.send_many_unordered([1, 2, 3, 4]);
3722            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3723        });
3724
3725        assert_eq!(
3726            instance_count,
3727            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3728        )
3729    }
3730
3731    #[cfg(feature = "sim")]
3732    #[test]
3733    fn sim_top_level_assume_ordering() {
3734        let mut flow = FlowBuilder::new();
3735        let node = flow.process::<()>();
3736
3737        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3738
3739        let out_recv = input
3740            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3741            .sim_output();
3742
3743        let instance_count = flow.sim().exhaustive(async || {
3744            in_send.send_many_unordered([1, 2, 3]);
3745            let mut out = out_recv.collect::<Vec<_>>().await;
3746            out.sort();
3747            assert_eq!(out, vec![1, 2, 3]);
3748        });
3749
3750        assert_eq!(instance_count, 6)
3751    }
3752
3753    #[cfg(feature = "sim")]
3754    #[test]
3755    fn sim_top_level_assume_ordering_cycle_back() {
3756        let mut flow = FlowBuilder::new();
3757        let node = flow.process::<()>();
3758        let node2 = flow.process::<()>();
3759
3760        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3761
3762        let (complete_cycle_back, cycle_back) =
3763            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3764        let ordered = input
3765            .merge_unordered(cycle_back)
3766            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3767        complete_cycle_back.complete(
3768            ordered
3769                .clone()
3770                .map(q!(|v| v + 1))
3771                .filter(q!(|v| v % 2 == 1))
3772                .send(&node2, TCP.fail_stop().bincode())
3773                .send(&node, TCP.fail_stop().bincode()),
3774        );
3775
3776        let out_recv = ordered.sim_output();
3777
3778        let mut saw = false;
3779        let instance_count = flow.sim().exhaustive(async || {
3780            in_send.send_many_unordered([0, 2]);
3781            let out = out_recv.collect::<Vec<_>>().await;
3782
3783            if out.starts_with(&[0, 1, 2]) {
3784                saw = true;
3785            }
3786        });
3787
3788        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3789        assert_eq!(instance_count, 6);
3790    }
3791
3792    #[cfg(feature = "sim")]
3793    #[test]
3794    fn sim_top_level_assume_ordering_cycle_back_tick() {
3795        let mut flow = FlowBuilder::new();
3796        let node = flow.process::<()>();
3797        let node2 = flow.process::<()>();
3798
3799        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3800
3801        let (complete_cycle_back, cycle_back) =
3802            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3803        let ordered = input
3804            .merge_unordered(cycle_back)
3805            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3806        complete_cycle_back.complete(
3807            ordered
3808                .clone()
3809                .batch(&node.tick(), nondet!(/** test */))
3810                .all_ticks()
3811                .map(q!(|v| v + 1))
3812                .filter(q!(|v| v % 2 == 1))
3813                .send(&node2, TCP.fail_stop().bincode())
3814                .send(&node, TCP.fail_stop().bincode()),
3815        );
3816
3817        let out_recv = ordered.sim_output();
3818
3819        let mut saw = false;
3820        let instance_count = flow.sim().exhaustive(async || {
3821            in_send.send_many_unordered([0, 2]);
3822            let out = out_recv.collect::<Vec<_>>().await;
3823
3824            if out.starts_with(&[0, 1, 2]) {
3825                saw = true;
3826            }
3827        });
3828
3829        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3830        assert_eq!(instance_count, 58);
3831    }
3832
3833    #[cfg(feature = "sim")]
3834    #[test]
3835    fn sim_top_level_assume_ordering_multiple() {
3836        let mut flow = FlowBuilder::new();
3837        let node = flow.process::<()>();
3838        let node2 = flow.process::<()>();
3839
3840        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3841        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3842
3843        let (complete_cycle_back, cycle_back) =
3844            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3845        let input1_ordered = input
3846            .clone()
3847            .merge_unordered(cycle_back)
3848            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3849        let foo = input1_ordered
3850            .clone()
3851            .map(q!(|v| v + 3))
3852            .weaken_ordering::<NoOrder>()
3853            .merge_unordered(input2)
3854            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3855
3856        complete_cycle_back.complete(
3857            foo.filter(q!(|v| *v == 3))
3858                .send(&node2, TCP.fail_stop().bincode())
3859                .send(&node, TCP.fail_stop().bincode()),
3860        );
3861
3862        let out_recv = input1_ordered.sim_output();
3863
3864        let mut saw = false;
3865        let instance_count = flow.sim().exhaustive(async || {
3866            in_send.send_many_unordered([0, 1]);
3867            let out = out_recv.collect::<Vec<_>>().await;
3868
3869            if out.starts_with(&[0, 3, 1]) {
3870                saw = true;
3871            }
3872        });
3873
3874        assert!(saw, "did not see an instance with 0, 3, 1 in order");
3875        assert_eq!(instance_count, 24);
3876    }
3877
3878    #[cfg(feature = "sim")]
3879    #[test]
3880    fn sim_atomic_assume_ordering_cycle_back() {
3881        let mut flow = FlowBuilder::new();
3882        let node = flow.process::<()>();
3883        let node2 = flow.process::<()>();
3884
3885        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3886
3887        let (complete_cycle_back, cycle_back) =
3888            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3889        let ordered = input
3890            .merge_unordered(cycle_back)
3891            .atomic()
3892            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3893            .end_atomic();
3894        complete_cycle_back.complete(
3895            ordered
3896                .clone()
3897                .map(q!(|v| v + 1))
3898                .filter(q!(|v| v % 2 == 1))
3899                .send(&node2, TCP.fail_stop().bincode())
3900                .send(&node, TCP.fail_stop().bincode()),
3901        );
3902
3903        let out_recv = ordered.sim_output();
3904
3905        let instance_count = flow.sim().exhaustive(async || {
3906            in_send.send_many_unordered([0, 2]);
3907            let out = out_recv.collect::<Vec<_>>().await;
3908            assert_eq!(out.len(), 4);
3909        });
3910        assert_eq!(instance_count, 22);
3911    }
3912
3913    #[cfg(feature = "deploy")]
3914    #[tokio::test]
3915    async fn partition_evens_odds() {
3916        let mut deployment = Deployment::new();
3917
3918        let mut flow = FlowBuilder::new();
3919        let node = flow.process::<()>();
3920        let external = flow.external::<()>();
3921
3922        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3923        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3924        let evens_port = evens.send_bincode_external(&external);
3925        let odds_port = odds.send_bincode_external(&external);
3926
3927        let nodes = flow
3928            .with_process(&node, deployment.Localhost())
3929            .with_external(&external, deployment.Localhost())
3930            .deploy(&mut deployment);
3931
3932        deployment.deploy().await.unwrap();
3933
3934        let mut evens_out = nodes.connect(evens_port).await;
3935        let mut odds_out = nodes.connect(odds_port).await;
3936
3937        deployment.start().await.unwrap();
3938
3939        let mut even_results = Vec::new();
3940        for _ in 0..3 {
3941            even_results.push(evens_out.next().await.unwrap());
3942        }
3943        even_results.sort();
3944        assert_eq!(even_results, vec![2, 4, 6]);
3945
3946        let mut odd_results = Vec::new();
3947        for _ in 0..3 {
3948            odd_results.push(odds_out.next().await.unwrap());
3949        }
3950        odd_results.sort();
3951        assert_eq!(odd_results, vec![1, 3, 5]);
3952    }
3953
/// Deploys a single process whose `inspect` result is thrown away and checks
/// that all five `inspect: N` lines are still printed on the process's stdout.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn unconsumed_inspect_still_runs() {
    use crate::deploy::DeployCrateWrapper;

    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    // The stream returned by `.inspect()` is dropped on purpose. Prior to the
    // Null-root fix, a pipeline like this would silently do nothing.
    process
        .source_iter(q!(0..5))
        .inspect(q!(|x| println!("inspect: {}", x)));

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // The stdout receiver is obtained before the deployment is started.
    let mut stdout_rx = deployed.get_process(&process).stdout();

    deployment.start().await.unwrap();

    // Read exactly five lines; their arrival order is normalised by sorting.
    let mut printed = Vec::new();
    while printed.len() < 5 {
        printed.push(stdout_rx.recv().await.unwrap());
    }
    printed.sort();

    let expected = vec![
        "inspect: 0",
        "inspect: 1",
        "inspect: 2",
        "inspect: 3",
        "inspect: 4",
    ];
    assert_eq!(printed, expected);
}
3995
/// Simulation test: five values are sent into a stream capped with
/// `limit(3)`, and the output must yield only `1, 2, 3`.
#[cfg(feature = "sim")]
#[test]
fn sim_limit() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, source) = process.sim_input();

    let limited = source.limit(q!(3));
    let receiver = limited.sim_output();

    builder.sim().exhaustive(async || {
        // Send 1 through 5 in ascending order.
        for value in 1..=5 {
            sender.send(value);
        }

        receiver.assert_yields_only([1, 2, 3]).await;
    });
}
4016
/// Simulation test: with `limit(0)` the output must yield nothing, even
/// though two values are sent into the input.
#[cfg(feature = "sim")]
#[test]
fn sim_limit_zero() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (sender, source) = process.sim_input();

    let limited = source.limit(q!(0));
    let receiver = limited.sim_output();

    builder.sim().exhaustive(async || {
        sender.send(1);
        sender.send(2);

        // The element type is spelled out because the expected list is empty.
        receiver.assert_yields_only::<i32, _>([]).await;
    });
}
4034
/// Simulation test for `merge_ordered` over two inputs: every explored
/// execution must deliver all four values, at least one execution must
/// produce the interleaving `[1, 3, 2, 4]`, and exactly 26 instances are
/// expected from the exhaustive run.
#[cfg(feature = "sim")]
#[test]
fn sim_merge_ordered() {
    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();

    let (first_send, first_input) = process.sim_input();
    let (second_send, second_input) = process.sim_input();

    let merged = first_input.merge_ordered(second_input, nondet!(/** test */));
    let merged_recv = merged.sim_output();

    let mut interleaving_seen = false;
    let explored = builder.sim().exhaustive(async || {
        first_send.send(1);
        first_send.send(2);
        second_send.send(3);
        second_send.send(4);

        let mut received = merged_recv.collect::<Vec<_>>().await;

        // Latch the flag once any execution yields this specific interleaving.
        interleaving_seen |= received == [1, 3, 2, 4];

        // Regardless of order, the full set of values must be present.
        received.sort();
        assert_eq!(received, vec![1, 2, 3, 4]);
    });

    assert!(interleaving_seen);
    assert_eq!(explored, 26);
}
4068
/// Deploy test: folds `1..=6` into a sum declared monotone via
/// `manual_proof!`, applies `threshold_greater_or_equal` with a singleton of
/// 7, and expects the first value received externally to be 7.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_fold_threshold() {
    use crate::properties::manual_proof;

    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let numbers: super::Stream<_, _> =
        process.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
    let running_sum = numbers.fold(
        q!(|| 0),
        q!(
            |sum, v| {
                *sum += v;
            },
            monotone = manual_proof!(/** test */)
        ),
    );

    let thresholded = running_sum.threshold_greater_or_equal(process.singleton(q!(7)));
    let threshold_port = thresholded.send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // Connect to the external port before starting the deployment.
    let mut threshold_rx = deployed.connect(threshold_port).await;

    deployment.start().await.unwrap();

    let first = threshold_rx.next().await.unwrap();
    assert_eq!(first, 7);
}
4109
/// Deploy test: counts the elements of a six-item stream, applies
/// `threshold_greater_or_equal` with a singleton of 3, and expects the first
/// value received externally to be 3.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_count_threshold() {
    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let numbers: super::Stream<_, _> =
        process.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
    let count = numbers.count();

    let thresholded = count.threshold_greater_or_equal(process.singleton(q!(3)));
    let threshold_port = thresholded.send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // Connect to the external port before starting the deployment.
    let mut threshold_rx = deployed.connect(threshold_port).await;

    deployment.start().await.unwrap();

    let first = threshold_rx.next().await.unwrap();
    assert_eq!(first, 3);
}
4140
/// Deploy test: a monotone fold is passed through a `map` annotated as
/// `order_preserving`, then thresholded at 14; the first value received
/// externally must be 14.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn monotone_map_order_preserving_threshold() {
    use crate::properties::manual_proof;

    let mut deployment = Deployment::new();

    let mut builder = FlowBuilder::new();
    let process = builder.process::<()>();
    let client = builder.external::<()>();

    let numbers: super::Stream<_, _> =
        process.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
    let running_sum = numbers.fold(
        q!(|| 0),
        q!(
            |sum, v| {
                *sum += v;
            },
            monotone = manual_proof!(/** test */)
        ),
    );

    // A map tagged `order_preserving` should keep the result monotone, which
    // is what allows the threshold below to be applied to it.
    let doubled = running_sum.map(q!(
        |v| v * 2,
        order_preserving = manual_proof!(/** doubling preserves order */)
    ));

    let thresholded = doubled.threshold_greater_or_equal(process.singleton(q!(14)));
    let threshold_port = thresholded.send_bincode_external(&client);

    let deployed = builder
        .with_process(&process, deployment.Localhost())
        .with_external(&client, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    // Connect to the external port before starting the deployment.
    let mut threshold_rx = deployed.connect(threshold_port).await;

    deployment.start().await.unwrap();

    let first = threshold_rx.next().await.unwrap();
    assert_eq!(first, 14);
}
4187
4188    // === Compile-time type tests for join/cross_product ordering ===
4189
/// Type-level checks only: each function compiles if and only if `join` /
/// `cross_product` return the stream type written in its signature. None of
/// these functions is ever called.
#[cfg(any(feature = "deploy", feature = "sim"))]
mod join_ordering_type_tests {
    use crate::{
        live_collections::{
            boundedness::{Bounded, Unbounded},
            stream::{ExactlyOnce, NoOrder, Stream, TotalOrder},
        },
        location::{Location, Process},
    };

    // --- join ---

    /// Unbounded `TotalOrder` joined with bounded `TotalOrder` is typed as
    /// unbounded `TotalOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_with_bounded_preserves_order<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Two unbounded `TotalOrder` inputs join into an unbounded `NoOrder`
    /// stream.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_with_unbounded_is_no_order<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Two bounded `TotalOrder` inputs join into a bounded `TotalOrder`
    /// stream, for any location `L`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
        lhs: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    /// Unbounded `NoOrder` joined with bounded `NoOrder` is typed as
    /// unbounded `NoOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn join_unbounded_noorder_with_bounded<'a>(
        lhs: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
        rhs: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
    ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.join(rhs)
    }

    // --- cross_product ---

    /// Unbounded `TotalOrder` crossed with bounded `TotalOrder` is typed as
    /// unbounded `TotalOrder`.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_unbounded_with_bounded_preserves_order<'a>(
        lhs: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }

    /// Two bounded `TotalOrder` inputs cross into a bounded `TotalOrder`
    /// stream.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_bounded_with_bounded_preserves_order<'a>(
        lhs: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }

    /// Two unbounded `TotalOrder` inputs cross into an unbounded `NoOrder`
    /// stream.
    #[expect(dead_code, reason = "compile-time type test")]
    fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
        lhs: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        rhs: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
    ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
        lhs.cross_product(rhs)
    }
} // mod join_ordering_type_tests
4254
4255    // === Runtime correctness tests for bounded join/cross_product ===
4256
/// Simulation test: crosses a plain `source_iter` stream with one that was
/// batched into a tick and re-emitted through `all_ticks`, and expects all
/// four `(number, letter)` pairs in any order.
#[cfg(feature = "sim")]
#[test]
fn cross_product_mixed_boundedness_correctness() {
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::nondet::nondet;

    let mut builder = FlowBuilder::new();
    let proc = builder.process::<()>();
    let tick = proc.tick();

    let numbers = proc.source_iter(q!(vec![1, 2]));

    // The second input goes through a tick batch before being crossed.
    let batched_letters = proc
        .source_iter(q!(vec!['a', 'b']))
        .batch(&tick, nondet!(/** test */));
    let letters = batched_letters.all_ticks();

    let pairs = numbers.cross_product(letters);
    let out = pairs.sim_output();

    builder.sim().exhaustive(async || {
        out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
            .await;
    });
}
4282
/// Simulation test: joins a plain `source_iter` stream with one that was
/// batched into a tick and re-emitted through `all_ticks`, and expects one
/// joined tuple per shared key, in any order.
#[cfg(feature = "sim")]
#[test]
fn join_mixed_boundedness_correctness() {
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::nondet::nondet;

    let mut builder = FlowBuilder::new();
    let proc = builder.process::<()>();
    let tick = proc.tick();

    let keyed_left = proc.source_iter(q!(vec![(1, 'a'), (2, 'b')]));

    // The second input goes through a tick batch before being joined.
    let batched_right = proc
        .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
        .batch(&tick, nondet!(/** test */));
    let keyed_right = batched_right.all_ticks();

    let joined = keyed_left.join(keyed_right);
    let out = joined.sim_output();

    builder.sim().exhaustive(async || {
        out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
            .await;
    });
}
4308}