hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick, NoAtomic};
30use crate::location::{Location, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35 AggFuncAlgebra, ApplyMonotoneStream, ValidCommutativityFor, ValidIdempotenceFor,
36};
37
38pub mod networking;
39
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds require each marker to have a defined `Min` combination
/// with itself and with both concrete orderings, so the weakest-of-two ordering
/// is always computable at the type level.
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
48
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
///
/// This is an uninhabited type: it exists purely as a compile-time marker.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    // Mirrors the marker type into the IR-level ordering descriptor.
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
58
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
///
/// This is an uninhabited type: it exists purely as a compile-time marker.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    // Mirrors the marker type into the IR-level ordering descriptor.
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
70
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// Blanket impl: `O` is weaker than `O2` exactly when the minimum of the pair is `O` itself.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
78
/// Helper trait for determining the weakest of two orderings.
#[sealed::sealed]
pub trait MinOrder<Other: ?Sized> {
    /// The weaker of the two orderings.
    type Min: Ordering;
}

// `TotalOrder` is the strongest ordering, so the minimum with any `O` is `O`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// `NoOrder` is the weakest ordering, so the minimum with anything is `NoOrder`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
95
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds require each marker to have a defined `Min` combination
/// with itself and with both concrete markers, so the weakest-of-two retry
/// guarantee is always computable at the type level.
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
106
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
///
/// This is an uninhabited type: it exists purely as a compile-time marker.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    // Mirrors the marker type into the IR-level retry descriptor.
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
115
/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
///
/// This is an uninhabited type: it exists purely as a compile-time marker.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    // Mirrors the marker type into the IR-level retry descriptor.
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
124
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// Blanket impl: `R` is weaker than `R2` exactly when the minimum of the pair is `R` itself.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
132
/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    ///
    /// Unlike [`MinOrder::Min`], this is additionally bounded to be weaker than
    /// both inputs, so the result can stand in for either one.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// `ExactlyOnce` is the strongest guarantee, so the minimum with any `R` is `R`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// `AtLeastOnce` is the weakest guarantee, so the minimum with anything is `AtLeastOnce`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
149
#[sealed::sealed]
// Custom diagnostic shown when an API requiring `TotalOrder` is called on an
// unordered stream; points users at `.assume_ordering` as the escape hatch.
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
pub trait IsOrdered: Ordering {}

#[sealed::sealed]
// Keeps this impl out of "consider implementing..." suggestions so the custom
// `on_unimplemented` message above is shown instead.
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
162
#[sealed::sealed]
// Custom diagnostic shown when an API requiring `ExactlyOnce` is called on a
// stream with retries; points users at `.assume_retries` as the escape hatch.
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
pub trait IsExactlyOnce: Retries {}

#[sealed::sealed]
// Keeps this impl out of "consider implementing..." suggestions so the custom
// `on_unimplemented` message above is shown instead.
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
175
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location this stream is materialized at.
    pub(crate) location: Loc,
    // The dataflow IR node backing this stream; swapped for `Placeholder` when consumed.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Handle to the flow graph under construction (cloned from the location's flow state).
    pub(crate) flow_state: FlowState,

    // Carries the type-level parameters; holds no runtime data.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
208
209impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
210 fn drop(&mut self) {
211 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
212 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
213 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
214 input: Box::new(ir_node),
215 op_metadata: HydroIrOpMetadata::new(),
216 });
217 }
218 }
219}
220
221impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
222 for Stream<T, L, Unbounded, O, R>
223where
224 L: Location<'a>,
225{
226 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
227 let new_meta = stream
228 .location
229 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
230
231 Stream {
232 location: stream.location.clone(),
233 flow_state: stream.flow_state.clone(),
234 ir_node: RefCell::new(HydroNode::Cast {
235 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
236 metadata: new_meta,
237 }),
238 _phantom: PhantomData,
239 }
240 }
241}
242
// Relaxing `TotalOrder` to `NoOrder` is always safe: it only forgets a guarantee.
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}
252
// Relaxing `ExactlyOnce` to `AtLeastOnce` is always safe: it only forgets a guarantee.
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
262
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    // Delegates to the inherent `Stream::defer_tick`.
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
271
272impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
273 for Stream<T, Tick<L>, Bounded, O, R>
274where
275 L: Location<'a>,
276{
277 type Location = Tick<L>;
278
279 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
280 Stream::new(
281 location.clone(),
282 HydroNode::CycleSource {
283 cycle_id,
284 metadata: location.new_node_metadata(Self::collection_kind()),
285 },
286 )
287 }
288}
289
impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    fn location(&self) -> &Self::Location {
        // Forwards to the inherent accessor.
        self.location()
    }

    // Builds the source side of a cycle that also has an initial value: the cycle
    // contents are shifted by one tick via `DeferTick` (so reads observe the
    // previous tick's writes) and `initial` is released only on the first tick.
    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
            location.clone(),
            HydroNode::DeferTick {
                input: Box::new(HydroNode::CycleSource {
                    cycle_id,
                    metadata: location.new_node_metadata(Self::collection_kind()),
                }),
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        );

        // `optional_first_tick` is non-empty only on tick 0, so `initial` is
        // filtered out on every later tick.
        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
    }
}
316
317impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
318 for Stream<T, Tick<L>, Bounded, O, R>
319where
320 L: Location<'a>,
321{
322 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
323 assert_eq!(
324 Location::id(&self.location),
325 expected_location,
326 "locations do not match"
327 );
328 self.location
329 .flow_state()
330 .borrow_mut()
331 .push_root(HydroRoot::CycleSink {
332 cycle_id,
333 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
334 op_metadata: HydroIrOpMetadata::new(),
335 });
336 }
337}
338
339impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
340 for Stream<T, L, B, O, R>
341where
342 L: Location<'a>,
343{
344 type Location = L;
345
346 fn create_source(cycle_id: CycleId, location: L) -> Self {
347 Stream::new(
348 location.clone(),
349 HydroNode::CycleSource {
350 cycle_id,
351 metadata: location.new_node_metadata(Self::collection_kind()),
352 },
353 )
354 }
355}
356
357impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
358 for Stream<T, L, B, O, R>
359where
360 L: Location<'a>,
361{
362 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
363 assert_eq!(
364 Location::id(&self.location),
365 expected_location,
366 "locations do not match"
367 );
368 self.location
369 .flow_state()
370 .borrow_mut()
371 .push_root(HydroRoot::CycleSink {
372 cycle_id,
373 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
374 op_metadata: HydroIrOpMetadata::new(),
375 });
376 }
377}
378
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    // Cloning does not duplicate the upstream computation: on first clone the
    // node is converted in place into a shared `Tee`, and every clone (including
    // `self`) then holds a handle to the same shared inner node.
    fn clone(&self) -> Self {
        // First clone: wrap our plain node in a `Tee` so it can be shared.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // `self.ir_node` is now guaranteed to be a `Tee`; hand out another
        // handle to the same shared inner node.
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: HydroNode::Tee {
                    inner: SharedNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
409
410impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
411where
412 L: Location<'a>,
413{
414 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
415 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
416 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
417
418 let flow_state = location.flow_state().clone();
419 Stream {
420 location,
421 flow_state,
422 ir_node: RefCell::new(ir_node),
423 _phantom: PhantomData,
424 }
425 }
426
    /// Returns a reference to the [`Location`] where this stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }
431
432 /// Weakens the consistency of this live collection to not guarantee any consistency across
433 /// cluster members (if this collection is on a cluster).
434 pub fn weaken_consistency(self) -> Stream<T, L::NoConsistency, B, O, R>
435 where
436 L: Location<'a>,
437 {
438 if L::consistency()
439 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
440 {
441 // already no consistency
442 Stream::new(
443 self.location.drop_consistency(),
444 self.ir_node.replace(HydroNode::Placeholder),
445 )
446 } else {
447 Stream::new(
448 self.location.drop_consistency(),
449 HydroNode::Cast {
450 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
451 metadata: self.location.drop_consistency().new_node_metadata(Stream::<
452 T,
453 L::NoConsistency,
454 B,
455 O,
456 R,
457 >::collection_kind(
458 )),
459 },
460 )
461 }
462 }
463
    /// Casts this live collection to have the consistency guarantees specified in the given
    /// location type parameter. The developer must ensure that the strengthened consistency
    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
    pub fn assert_has_consistency_of<L2: Location<'a, NoConsistency = L::NoConsistency>>(
        self,
        _proof: impl crate::properties::ConsistencyProof,
    ) -> Stream<T, L2, B, O, R>
    where
        L: Location<'a>,
    {
        if L::consistency() == L2::consistency() {
            // Same consistency level: only the location type changes, so the IR
            // node can be reused directly without a cast.
            Stream::new(
                self.location.with_consistency_of(),
                self.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            // Different consistency level: record the (manually proven) claim as
            // an explicit assertion node in the IR.
            Stream::new(
                self.location.with_consistency_of(),
                HydroNode::AssertIsConsistent {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    metadata: self
                        .location
                        .clone()
                        .with_consistency_of::<L2>()
                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
                },
            )
        }
    }
493
    /// Computes the IR-level [`CollectionKind`] descriptor for this exact stream type,
    /// capturing its boundedness, ordering, retry guarantee, and element type.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }
502
503 /// Produces a stream based on invoking `f` on each element.
504 /// If you do not want to modify the stream and instead only want to view
505 /// each item use [`Stream::inspect`] instead.
506 ///
507 /// # Example
508 /// ```rust
509 /// # #[cfg(feature = "deploy")] {
510 /// # use hydro_lang::prelude::*;
511 /// # use futures::StreamExt;
512 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
513 /// let words = process.source_iter(q!(vec!["hello", "world"]));
514 /// words.map(q!(|x| x.to_uppercase()))
515 /// # }, |mut stream| async move {
516 /// # for w in vec!["HELLO", "WORLD"] {
517 /// # assert_eq!(stream.next().await.unwrap(), w);
518 /// # }
519 /// # }));
520 /// # }
521 /// ```
522 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
523 where
524 F: Fn(T) -> U + 'a,
525 {
526 let f = f.splice_fn1_ctx(&self.location).into();
527 Stream::new(
528 self.location.clone(),
529 HydroNode::Map {
530 f,
531 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
532 metadata: self
533 .location
534 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
535 },
536 )
537 }
538
539 /// For each item `i` in the input stream, transform `i` using `f` and then treat the
540 /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
541 /// for the output type `U` must produce items in a **deterministic** order.
542 ///
543 /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
544 /// not deterministic, use [`Stream::flat_map_unordered`] instead.
545 ///
546 /// # Example
547 /// ```rust
548 /// # #[cfg(feature = "deploy")] {
549 /// # use hydro_lang::prelude::*;
550 /// # use futures::StreamExt;
551 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
552 /// process
553 /// .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
554 /// .flat_map_ordered(q!(|x| x))
555 /// # }, |mut stream| async move {
556 /// // 1, 2, 3, 4
557 /// # for w in (1..5) {
558 /// # assert_eq!(stream.next().await.unwrap(), w);
559 /// # }
560 /// # }));
561 /// # }
562 /// ```
563 pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
564 where
565 I: IntoIterator<Item = U>,
566 F: Fn(T) -> I + 'a,
567 {
568 let f = f.splice_fn1_ctx(&self.location).into();
569 Stream::new(
570 self.location.clone(),
571 HydroNode::FlatMap {
572 f,
573 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
574 metadata: self
575 .location
576 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
577 },
578 )
579 }
580
581 /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
582 /// for the output type `U` to produce items in any order.
583 ///
584 /// # Example
585 /// ```rust
586 /// # #[cfg(feature = "deploy")] {
587 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
588 /// # use futures::StreamExt;
589 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
590 /// process
591 /// .source_iter(q!(vec![
592 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
593 /// std::collections::HashSet::from_iter(vec![3, 4]),
594 /// ]))
595 /// .flat_map_unordered(q!(|x| x))
596 /// # }, |mut stream| async move {
597 /// // 1, 2, 3, 4, but in no particular order
598 /// # let mut results = Vec::new();
599 /// # for w in (1..5) {
600 /// # results.push(stream.next().await.unwrap());
601 /// # }
602 /// # results.sort();
603 /// # assert_eq!(results, vec![1, 2, 3, 4]);
604 /// # }));
605 /// # }
606 /// ```
607 pub fn flat_map_unordered<U, I, F>(
608 self,
609 f: impl IntoQuotedMut<'a, F, L>,
610 ) -> Stream<U, L, B, NoOrder, R>
611 where
612 I: IntoIterator<Item = U>,
613 F: Fn(T) -> I + 'a,
614 {
615 let f = f.splice_fn1_ctx(&self.location).into();
616 Stream::new(
617 self.location.clone(),
618 HydroNode::FlatMap {
619 f,
620 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
621 metadata: self
622 .location
623 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
624 },
625 )
626 }
627
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Delegates to `flat_map_ordered` with the (quoted) identity function.
        self.flat_map_ordered(q!(|d| d))
    }
656
    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Delegates to `flat_map_unordered` with the (quoted) identity function.
        self.flat_map_unordered(q!(|d| d))
    }
689
690 /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
691 /// then emit the elements of that stream one by one. When the inner stream yields
692 /// `Pending`, this operator yields as well.
693 pub fn flat_map_stream_blocking<U, S, F>(
694 self,
695 f: impl IntoQuotedMut<'a, F, L>,
696 ) -> Stream<U, L, B, O, R>
697 where
698 S: futures::Stream<Item = U>,
699 F: Fn(T) -> S + 'a,
700 {
701 let f = f.splice_fn1_ctx(&self.location).into();
702 Stream::new(
703 self.location.clone(),
704 HydroNode::FlatMapStreamBlocking {
705 f,
706 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
707 metadata: self
708 .location
709 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
710 },
711 )
712 }
713
    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
    /// yields as well.
    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
    where
        T: futures::Stream<Item = U>,
    {
        // Delegates to `flat_map_stream_blocking` with the (quoted) identity function.
        self.flat_map_stream_blocking(q!(|d| d))
    }
723
724 /// Creates a stream containing only the elements of the input stream that satisfy a predicate
725 /// `f`, preserving the order of the elements.
726 ///
727 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
728 /// not modify or take ownership of the values. If you need to modify the values while filtering
729 /// use [`Stream::filter_map`] instead.
730 ///
731 /// # Example
732 /// ```rust
733 /// # #[cfg(feature = "deploy")] {
734 /// # use hydro_lang::prelude::*;
735 /// # use futures::StreamExt;
736 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
737 /// process
738 /// .source_iter(q!(vec![1, 2, 3, 4]))
739 /// .filter(q!(|&x| x > 2))
740 /// # }, |mut stream| async move {
741 /// // 3, 4
742 /// # for w in (3..5) {
743 /// # assert_eq!(stream.next().await.unwrap(), w);
744 /// # }
745 /// # }));
746 /// # }
747 /// ```
748 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
749 where
750 F: Fn(&T) -> bool + 'a,
751 {
752 let f = f.splice_fn1_borrow_ctx(&self.location).into();
753 Stream::new(
754 self.location.clone(),
755 HydroNode::Filter {
756 f,
757 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
758 metadata: self.location.new_node_metadata(Self::collection_kind()),
759 },
760 )
761 }
762
    /// Splits the stream into two streams based on a predicate, without cloning elements.
    ///
    /// Elements for which `f` returns `true` are sent to the first output stream,
    /// and elements for which `f` returns `false` are sent to the second output stream.
    ///
    /// Unlike using `filter` twice, this only evaluates the predicate once per element
    /// and does not require `T: Clone`.
    ///
    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
    /// the predicate is only used for routing; the element itself is moved to the
    /// appropriate output stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
    /// evens.map(q!(|x| (x, true)))
    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..6 {
    /// # results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
    /// # }));
    /// # }
    /// ```
    #[expect(
        clippy::type_complexity,
        reason = "return type mirrors the input stream type"
    )]
    pub fn partition<F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
    where
        F: Fn(&T) -> bool + 'a,
    {
        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
        // The input node is shared between both `Partition` outputs so the
        // upstream computation runs once and the predicate is evaluated once.
        let shared = SharedNode(Rc::new(RefCell::new(
            self.ir_node.replace(HydroNode::Placeholder),
        )));

        // `is_true` selects which side of the split each output observes.
        let true_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0.clone()),
                f: f.clone(),
                is_true: true,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        let false_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0),
                f,
                is_true: false,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        (true_stream, false_stream)
    }
835
836 /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
837 ///
838 /// # Example
839 /// ```rust
840 /// # #[cfg(feature = "deploy")] {
841 /// # use hydro_lang::prelude::*;
842 /// # use futures::StreamExt;
843 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
844 /// process
845 /// .source_iter(q!(vec!["1", "hello", "world", "2"]))
846 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
847 /// # }, |mut stream| async move {
848 /// // 1, 2
849 /// # for w in (1..3) {
850 /// # assert_eq!(stream.next().await.unwrap(), w);
851 /// # }
852 /// # }));
853 /// # }
854 /// ```
855 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
856 where
857 F: Fn(T) -> Option<U> + 'a,
858 {
859 let f = f.splice_fn1_ctx(&self.location).into();
860 Stream::new(
861 self.location.clone(),
862 HydroNode::FilterMap {
863 f,
864 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
865 metadata: self
866 .location
867 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
868 },
869 )
870 }
871
872 /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
873 /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
874 /// If `other` is an empty [`Optional`], no values will be produced.
875 ///
876 /// # Example
877 /// ```rust
878 /// # #[cfg(feature = "deploy")] {
879 /// # use hydro_lang::prelude::*;
880 /// # use futures::StreamExt;
881 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
882 /// let tick = process.tick();
883 /// let batch = process
884 /// .source_iter(q!(vec![1, 2, 3, 4]))
885 /// .batch(&tick, nondet!(/** test */));
886 /// let count = batch.clone().count(); // `count()` returns a singleton
887 /// batch.cross_singleton(count).all_ticks()
888 /// # }, |mut stream| async move {
889 /// // (1, 4), (2, 4), (3, 4), (4, 4)
890 /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
891 /// # assert_eq!(stream.next().await.unwrap(), w);
892 /// # }
893 /// # }));
894 /// # }
895 /// ```
896 pub fn cross_singleton<O2>(
897 self,
898 other: impl Into<Optional<O2, L, Bounded>>,
899 ) -> Stream<(T, O2), L, B, O, R>
900 where
901 O2: Clone,
902 {
903 let other: Optional<O2, L, Bounded> = other.into();
904 check_matching_location(&self.location, &other.location);
905
906 Stream::new(
907 self.location.clone(),
908 HydroNode::CrossSingleton {
909 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
910 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
911 metadata: self
912 .location
913 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
914 },
915 )
916 }
917
918 /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
919 ///
920 /// # Example
921 /// ```rust
922 /// # #[cfg(feature = "deploy")] {
923 /// # use hydro_lang::prelude::*;
924 /// # use futures::StreamExt;
925 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
926 /// let tick = process.tick();
927 /// // ticks are lazy by default, forces the second tick to run
928 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
929 ///
930 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
931 /// let batch_first_tick = process
932 /// .source_iter(q!(vec![1, 2, 3, 4]))
933 /// .batch(&tick, nondet!(/** test */));
934 /// let batch_second_tick = process
935 /// .source_iter(q!(vec![5, 6, 7, 8]))
936 /// .batch(&tick, nondet!(/** test */))
937 /// .defer_tick();
938 /// batch_first_tick.chain(batch_second_tick)
939 /// .filter_if(signal)
940 /// .all_ticks()
941 /// # }, |mut stream| async move {
942 /// // [1, 2, 3, 4]
943 /// # for w in vec![1, 2, 3, 4] {
944 /// # assert_eq!(stream.next().await.unwrap(), w);
945 /// # }
946 /// # }));
947 /// # }
948 /// ```
949 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
950 self.cross_singleton(signal.filter(q!(|b| *b)))
951 .map(q!(|(d, _)| d))
952 }
953
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
    /// leader of a cluster.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![5, 6, 7, 8]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///     .filter_if_some(some_on_first_tick)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        self.filter_if(signal.is_some())
    }
992
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
    /// some local state.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![5, 6, 7, 8]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///     .filter_if_none(some_on_first_tick)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [5, 6, 7, 8]
    /// # for w in vec![5, 6, 7, 8] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        self.filter_if(other.is_none())
    }
1031
    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
    /// returning all tupled pairs.
    ///
    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
    /// symmetric hash join is used and ordering is [`NoOrder`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![1, 2]));
    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
    /// stream1.cross_product(stream2)
    /// # }, |mut stream| async move {
    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn cross_product<T2, B2: Boundedness, O2: Ordering>(
        self,
        other: Stream<T2, L, B2, O2, R>,
    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, R>
    where
        T: Clone,
        T2: Clone,
    {
        // Encode the cross-product as an equi-join where every element shares
        // the unit key `()`, then strip the key back off the joined pairs.
        self.map(q!(|v| ((), v)))
            .join(other.map(q!(|v| ((), v))))
            .map(q!(|((), (v1, v2))| (v1, v2)))
    }
1068
1069 /// Takes one stream as input and filters out any duplicate occurrences. The output
1070 /// contains all unique values from the input.
1071 ///
1072 /// # Example
1073 /// ```rust
1074 /// # #[cfg(feature = "deploy")] {
1075 /// # use hydro_lang::prelude::*;
1076 /// # use futures::StreamExt;
1077 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1078 /// let tick = process.tick();
1079 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1080 /// # }, |mut stream| async move {
1081 /// # for w in vec![1, 2, 3, 4] {
1082 /// # assert_eq!(stream.next().await.unwrap(), w);
1083 /// # }
1084 /// # }));
1085 /// # }
1086 /// ```
1087 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1088 where
1089 T: Eq + Hash,
1090 {
1091 Stream::new(
1092 self.location.clone(),
1093 HydroNode::Unique {
1094 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1095 metadata: self
1096 .location
1097 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1098 },
1099 )
1100 }
1101
    /// Outputs everything in this stream that is *not* contained in the `other` stream.
    ///
    /// The `other` stream must be [`Bounded`], since this function will wait until
    /// all its elements are available before producing any output.
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = process
    ///     .source_iter(q!(vec![ 1, 2, 3, 4 ]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch = process
    ///     .source_iter(q!(vec![1, 2]))
    ///     .batch(&tick, nondet!(/** test */));
    /// stream.filter_not_in(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
    where
        T: Eq + Hash,
        B2: IsBounded,
    {
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Difference {
                // `pos` supplies the candidate elements, `neg` the exclusion set.
                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                // NOTE(review): the metadata is tagged `Bounded` while the return
                // type keeps this stream's boundedness `B` — confirm intentional.
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
            },
        )
    }
1145
1146 /// An operator which allows you to "inspect" each element of a stream without
1147 /// modifying it. The closure `f` is called on a reference to each item. This is
1148 /// mainly useful for debugging, and should not be used to generate side-effects.
1149 ///
1150 /// # Example
1151 /// ```rust
1152 /// # #[cfg(feature = "deploy")] {
1153 /// # use hydro_lang::prelude::*;
1154 /// # use futures::StreamExt;
1155 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1156 /// let nums = process.source_iter(q!(vec![1, 2]));
1157 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1158 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1159 /// # }, |mut stream| async move {
1160 /// # for w in vec![1, 2] {
1161 /// # assert_eq!(stream.next().await.unwrap(), w);
1162 /// # }
1163 /// # }));
1164 /// # }
1165 /// ```
1166 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1167 where
1168 F: Fn(&T) + 'a,
1169 {
1170 let f = f.splice_fn1_borrow_ctx(&self.location).into();
1171
1172 Stream::new(
1173 self.location.clone(),
1174 HydroNode::Inspect {
1175 f,
1176 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1177 metadata: self.location.new_node_metadata(Self::collection_kind()),
1178 },
1179 )
1180 }
1181
    /// Executes the provided closure for every element in this stream.
    ///
    /// Because the closure may have side effects, the stream must have deterministic order
    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
    /// [`Stream::assume_retries`] with an explanation for why this is the case.
    pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        let f = f.splice_fn1_ctx(&self.location).into();
        // Terminal operator: registers a root in the dataflow graph instead of
        // returning a new live collection.
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::ForEach {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                f,
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
1203
    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
    /// TCP socket to some other server. You should _not_ use this API for interacting with
    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
    /// interaction with asynchronous sinks.
    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        S: 'a + futures::Sink<T> + Unpin,
    {
        // Terminal operator: registers a sink root in the dataflow graph; nothing
        // is returned to continue the pipeline.
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::DestSink {
                sink: sink.splice_typed_ctx(&self.location).into(),
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
1224
    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// numbers.enumerate()
    /// # }, |mut stream| async move {
    /// // (0, 1), (1, 2), (2, 3), (3, 4)
    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        Stream::new(
            self.location.clone(),
            HydroNode::Enumerate {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Metadata is tagged `TotalOrder`/`ExactlyOnce`; the `IsOrdered` and
                // `IsExactlyOnce` bounds presumably pin `O`/`R` to those markers —
                // TODO(review) confirm the markers cannot diverge.
                metadata: self.location.new_node_metadata(Stream::<
                    (usize, T),
                    L,
                    B,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        )
    }
1263
    /// Combines elements of the stream into a [`Singleton`], by starting with an initial value,
    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
    /// words
    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
    ///     .into_stream()
    /// # }, |mut stream| async move {
    /// // "HELLOWORLD"
    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
    /// # }));
    /// # }
    /// ```
    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
    ) -> Singleton<A, L, B2>
    where
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T),
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
        B: ApplyMonotoneStream<M, B2>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        // Record the caller-provided algebraic-property proof for the combinator.
        proof.register_proof(&comb);

        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        // The proof above justifies locally dropping ordering/retry guarantees.
        let ordered_etc: Stream<T, L::NoConsistency, B> =
            self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Fold {
            init,
            acc: comb.into(),
            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
            metadata: ordered_etc
                .location
                .new_node_metadata(Singleton::<A, L::NoConsistency, B2>::collection_kind()),
        };

        Singleton::new(ordered_etc.location.clone(), core)
            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
    }
1319
    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
    /// reference, so that it can be modified in place.
    ///
    /// Depending on the input stream guarantees, the closure may need to be commutative
    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let bools = process.source_iter(q!(vec![false, true, false]));
    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
    /// # }, |mut stream| async move {
    /// // true
    /// # assert_eq!(stream.next().await.unwrap(), true);
    /// # }));
    /// # }
    /// ```
    pub fn reduce<F, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Optional<T, L, B>
    where
        F: Fn(&mut T, T) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        // Record the caller-provided algebraic-property proof for the combinator.
        proof.register_proof(&f);

        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
        // The proof above justifies locally dropping ordering/retry guarantees.
        let ordered_etc: Stream<T, L::NoConsistency, B> =
            self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Reduce {
            f: f.into(),
            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
            metadata: ordered_etc
                .location
                .new_node_metadata(Optional::<T, L::NoConsistency, B>::collection_kind()),
        };

        Optional::new(ordered_etc.location.clone(), core)
            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
    }
1369
    /// Computes the maximum element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.max().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn max(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        // Max is insensitive to duplicates and (for the final value) to arrival
        // order, so stronger delivery guarantees can be assumed locally here.
        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
            .assume_ordering_trusted_bounded::<TotalOrder>(
                nondet!(/** max is commutative, but order affects intermediates */),
            )
            .reduce(q!(|curr, new| {
                if new > *curr {
                    *curr = new;
                }
            }))
    }
1403
1404 /// Computes the minimum element in the stream as an [`Optional`], which
1405 /// will be empty until the first element in the input arrives.
1406 ///
1407 /// # Example
1408 /// ```rust
1409 /// # #[cfg(feature = "deploy")] {
1410 /// # use hydro_lang::prelude::*;
1411 /// # use futures::StreamExt;
1412 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1413 /// let tick = process.tick();
1414 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1415 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1416 /// batch.min().all_ticks()
1417 /// # }, |mut stream| async move {
1418 /// // 1
1419 /// # assert_eq!(stream.next().await.unwrap(), 1);
1420 /// # }));
1421 /// # }
1422 /// ```
1423 pub fn min(self) -> Optional<T, L, B>
1424 where
1425 T: Ord,
1426 {
1427 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1428 .assume_ordering_trusted_bounded::<TotalOrder>(
1429 nondet!(/** max is commutative, but order affects intermediates */),
1430 )
1431 .reduce(q!(|curr, new| {
1432 if new < *curr {
1433 *curr = new;
1434 }
1435 }))
1436 }
1437
    /// Computes the first element in the stream as an [`Optional`], which
    /// will be empty until the first element in the input arrives.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
    /// re-ordering of elements may cause the first element to change.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.first().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # }));
    /// # }
    /// ```
    pub fn first(self) -> Optional<T, L, B>
    where
        O: IsOrdered,
    {
        self.make_totally_ordered()
            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
            // `Generate::Return` emits the first element and stops processing,
            // leaving a stream with at most one element.
            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
            // The no-op reduce just converts that singleton stream into an
            // `Optional` without changing the value.
            .reduce(q!(|_, _| {}))
    }
1469
    /// Computes the last element in the stream as an [`Optional`], which
    /// will be empty until an element in the input arrives.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
    /// re-ordering of elements may cause the last element to change.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.last().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn last(self) -> Optional<T, L, B>
    where
        O: IsOrdered,
    {
        self.make_totally_ordered()
            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
            // Each new element overwrites the accumulator, so the final value is
            // the last element seen.
            .reduce(q!(|curr, new| *curr = new))
    }
1500
    /// Returns a stream containing at most the first `n` elements of the input stream,
    /// preserving the original order. Similar to `LIMIT` in SQL.
    ///
    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
    /// retries, since the result depends on the order and cardinality of elements.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
    /// numbers.limit(q!(3))
    /// # }, |mut stream| async move {
    /// // 10, 20, 30
    /// # for w in vec![10, 20, 30] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn limit(
        self,
        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        self.generator(
            q!(|| 0usize),
            q!(move |count, item| {
                // Already emitted `n` items (also covers `n == 0`): stop now.
                if *count == n {
                    Generate::Break
                } else {
                    *count += 1;
                    // Emitting the n-th item: return it and terminate eagerly.
                    if *count == n {
                        Generate::Return(item)
                    } else {
                        Generate::Yield(item)
                    }
                }
            }),
        )
    }
1547
    /// Collects all the elements of this stream into a single [`Vec`] element.
    ///
    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
    /// the vector at an arbitrary point in time.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
    /// # }, |mut stream| async move {
    /// // [ vec![1, 2, 3, 4] ]
    /// # for w in vec![vec![1, 2, 3, 4]] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        // The `IsOrdered` / `IsExactlyOnce` bounds let us upgrade to the concrete
        // `TotalOrder` / `ExactlyOnce` markers required to fold deterministically.
        self.make_totally_ordered().make_exactly_once().fold(
            q!(|| vec![]),
            q!(|acc, v| {
                acc.push(v);
            }),
        )
    }
1585
    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
    /// and emitting each intermediate result.
    ///
    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
    /// containing all intermediate accumulated values. The scan operation can also terminate early
    /// by returning `None`.
    ///
    /// The function takes a mutable reference to the accumulator and the current element, and returns
    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
    /// If the function returns `None`, the stream is terminated and no more elements are processed.
    ///
    /// # Examples
    ///
    /// Basic usage - running sum:
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         Some(*acc)
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: 1, 3, 6, 10
    /// # for w in vec![1, 3, 6, 10] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    ///
    /// Early termination example:
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
    ///     q!(|| 1),
    ///     q!(|state, x| {
    ///         *state = *state * x;
    ///         if *state > 6 {
    ///             None // Terminate the stream
    ///         } else {
    ///             Some(-*state)
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: -1, -2, -6
    /// # for w in vec![-1, -2, -6] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn scan<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Option<U> + 'a,
    {
        // Splice both quoted closures in this location's context.
        let init = init.splice_fn0_ctx(&self.location).into();
        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::Scan {
                init,
                acc: f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Output metadata matches the return type's TotalOrder/ExactlyOnce markers.
                metadata: self.location.new_node_metadata(
                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
                ),
            },
        )
    }
1672
    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
    /// stream, maintaining an internal state (accumulator) and emitting the values returned
    /// by the function.
    ///
    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
    /// future. The future is polled to completion. If it resolves to `Some`, the value is
    /// emitted. If it resolves to `None`, the item is filtered out.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .scan_async_blocking(
    ///         q!(|| 0),
    ///         q!(|acc, x| {
    ///             *acc += x;
    ///             let val = *acc;
    ///             async move { Some(val) }
    ///         }),
    ///     )
    /// # }, |mut stream| async move {
    /// // Output: 1, 3, 6, 10
    /// # for w in vec![1, 3, 6, 10] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn scan_async_blocking<A, U, I, F, Fut>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Fut + 'a,
        Fut: Future<Output = Option<U>> + 'a,
    {
        // Splice both quoted closures in this location's context.
        let init = init.splice_fn0_ctx(&self.location).into();
        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();

        Stream::new(
            self.location.clone(),
            HydroNode::ScanAsyncBlocking {
                init,
                acc: f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // Output metadata matches the return type's TotalOrder/ExactlyOnce markers.
                metadata: self.location.new_node_metadata(
                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
                ),
            },
        )
    }
1733
    /// Iteratively processes the elements of the stream using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
    /// state. The second argument defines the processing logic, taking in a mutable reference
    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
    /// variants define what is emitted and whether further inputs should be processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         if *acc > 100 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
    ///         } else if *acc % 2 == 0 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
    ///         } else {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: "even", "done!"
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Wrap the user closures so they can be spliced into the quoted scan closure below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // State is Option<Option<A>>:
        // None = not yet initialized
        // Some(Some(a)) = active with state a
        // Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        // The scan closure returns Option<Option<U>>: the outer None terminates the
        // scan; the inner Option<U> is flattened below so `Continue` emits nothing.
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            if state.is_none() {
                // Lazily initialize the user state on the first element.
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    Generate::Return(out) => {
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, we can terminate the scan directly on
                    // Break/Return because there is only one state (no other keys
                    // that still need processing).
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the Option<U> layer so that `Continue` steps produce no output.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1846
1847 /// Given a time interval, returns a stream corresponding to samples taken from the
1848 /// stream roughly at that interval. The output will have elements in the same order
1849 /// as the input, but with arbitrary elements skipped between samples. There is also
1850 /// no guarantee on the exact timing of the samples.
1851 ///
1852 /// # Non-Determinism
1853 /// The output stream is non-deterministic in which elements are sampled, since this
1854 /// is controlled by a clock.
1855 pub fn sample_every(
1856 self,
1857 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1858 nondet: NonDet,
1859 ) -> Stream<T, L::NoConsistency, Unbounded, O, AtLeastOnce>
1860 where
1861 L: NoAtomic,
1862 {
1863 let samples = self.location.source_interval(interval, nondet);
1864
1865 let tick = self.location.tick();
1866 self.batch(&tick, nondet)
1867 .filter_if(samples.batch(&tick, nondet).first().is_some())
1868 .all_ticks()
1869 .weaken_retries()
1870 }
1871
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Non-Determinism
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::NoConsistency>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L::NoConsistency, Unbounded>
    where
        L: NoAtomic,
    {
        let tick = self.location.tick();

        // Fold the stream into the `Instant` of the most recent element;
        // `None` until the first element arrives.
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!(/** TODO */)
            ),
        );

        // Snapshot the latest-arrival time each tick: the timeout fires when the
        // last element is older than `duration`, or when nothing has arrived yet.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    Some(())
                }
            }))
            .latest()
    }
1916
1917 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1918 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1919 ///
1920 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1921 /// processed before an acknowledgement is emitted.
1922 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1923 let id = self.location.flow_state().borrow_mut().next_clock_id();
1924 let out_location = Atomic {
1925 tick: Tick {
1926 id,
1927 l: self.location.clone(),
1928 },
1929 };
1930 Stream::new(
1931 out_location.clone(),
1932 HydroNode::BeginAtomic {
1933 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1934 metadata: out_location
1935 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1936 },
1937 )
1938 }
1939
1940 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1941 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1942 /// the order of the input. The output stream will execute in the [`Tick`] that was
1943 /// used to create the atomic section.
1944 ///
1945 /// # Non-Determinism
1946 /// The batch boundaries are non-deterministic and may change across executions.
1947 pub fn batch<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1948 self,
1949 tick: &Tick<L2>,
1950 _nondet: NonDet,
1951 ) -> Stream<T, Tick<L::NoConsistency>, Bounded, O, R> {
1952 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1953 Stream::new(
1954 tick.drop_consistency(),
1955 HydroNode::Batch {
1956 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1957 metadata: tick
1958 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1959 },
1960 )
1961 }
1962
1963 /// An operator which allows you to "name" a `HydroNode`.
1964 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1965 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1966 {
1967 let mut node = self.ir_node.borrow_mut();
1968 let metadata = node.metadata_mut();
1969 metadata.tag = Some(name.to_owned());
1970 }
1971 self
1972 }
1973
1974 /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
1975 /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
1976 /// so uses must be carefully vetted.
1977 pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
1978 where
1979 B: IsBounded,
1980 {
1981 Optional::new(
1982 self.location.clone(),
1983 HydroNode::Cast {
1984 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1985 metadata: self
1986 .location
1987 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1988 },
1989 )
1990 }
1991
1992 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
1993 if O::ORDERING_KIND == O2::ORDERING_KIND {
1994 Stream::new(
1995 self.location.clone(),
1996 self.ir_node.replace(HydroNode::Placeholder),
1997 )
1998 } else {
1999 panic!(
2000 "Runtime ordering {:?} did not match requested cast {:?}.",
2001 O::ORDERING_KIND,
2002 O2::ORDERING_KIND
2003 )
2004 }
2005 }
2006
2007 /// Explicitly "casts" the stream to a type with a different ordering
2008 /// guarantee. Useful in unsafe code where the ordering cannot be proven
2009 /// by the type-system.
2010 ///
2011 /// # Non-Determinism
2012 /// This function is used as an escape hatch, and any mistakes in the
2013 /// provided ordering guarantee will propagate into the guarantees
2014 /// for the rest of the program.
2015 pub fn assume_ordering<O2: Ordering>(
2016 self,
2017 _nondet: NonDet,
2018 ) -> Stream<T, L::NoConsistency, B, O2, R> {
2019 if O::ORDERING_KIND == O2::ORDERING_KIND {
2020 self.use_ordering_type().weaken_consistency()
2021 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2022 // We can always weaken the ordering guarantee
2023 let target_location = self.location().drop_consistency();
2024 Stream::new(
2025 target_location.clone(),
2026 HydroNode::Cast {
2027 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2028 metadata: target_location
2029 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2030 },
2031 )
2032 } else {
2033 let target_location = self.location().drop_consistency();
2034 Stream::new(
2035 target_location.clone(),
2036 HydroNode::ObserveNonDet {
2037 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2038 trusted: false,
2039 metadata: target_location
2040 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2041 },
2042 )
2043 }
2044 }
2045
    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
    // intermediate states will not be revealed
    fn assume_ordering_trusted_bounded<O2: Ordering>(
        self,
        nondet: NonDet,
    ) -> Stream<T, L, B, O2, R> {
        if B::BOUNDED {
            self.assume_ordering_trusted(nondet)
        } else {
            // Unbounded input: go through the untrusted `assume_ordering` (which
            // drops consistency in its type), then move the resulting IR node back
            // under the original location type. Only the type label changes here;
            // the IR node produced by `assume_ordering` is reused as-is.
            let self_location = self.location.clone();
            let inner: Stream<T, L::NoConsistency, B, O2, R> = self.assume_ordering(nondet);
            Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
        }
    }
2060
2061 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2062 // is not observable
2063 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2064 self,
2065 _nondet: NonDet,
2066 ) -> Stream<T, L, B, O2, R> {
2067 if O::ORDERING_KIND == O2::ORDERING_KIND {
2068 self.use_ordering_type()
2069 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2070 // We can always weaken the ordering guarantee
2071 Stream::new(
2072 self.location.clone(),
2073 HydroNode::Cast {
2074 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2075 metadata: self
2076 .location
2077 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2078 },
2079 )
2080 } else {
2081 Stream::new(
2082 self.location.clone(),
2083 HydroNode::ObserveNonDet {
2084 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2085 trusted: true,
2086 metadata: self
2087 .location
2088 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2089 },
2090 )
2091 }
2092 }
2093
    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
    /// which is always safe because that is the weakest possible guarantee.
    ///
    /// Deprecated alias for [`Stream::weaken_ordering`] with `O2 = NoOrder`.
    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
        self.weaken_ordering::<NoOrder>()
    }
2100
2101 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2102 /// enforcing that `O2` is weaker than the input ordering guarantee.
2103 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2104 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2105 self.assume_ordering_trusted::<O2>(nondet)
2106 }
2107
    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
    /// implies that `O == TotalOrder`.
    ///
    /// Since the markers are equal, the trusted cast below is a pure type relabel.
    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
    where
        O: IsOrdered,
    {
        self.assume_ordering_trusted(nondet!(/** no-op */))
    }
2116
2117 /// Explicitly "casts" the stream to a type with a different retries
2118 /// guarantee. Useful in unsafe code where the lack of retries cannot
2119 /// be proven by the type-system.
2120 ///
2121 /// # Non-Determinism
2122 /// This function is used as an escape hatch, and any mistakes in the
2123 /// provided retries guarantee will propagate into the guarantees
2124 /// for the rest of the program.
2125 pub fn assume_retries<R2: Retries>(
2126 self,
2127 _nondet: NonDet,
2128 ) -> Stream<T, L::NoConsistency, B, O, R2> {
2129 if R::RETRIES_KIND == R2::RETRIES_KIND {
2130 Stream::new(
2131 self.location.drop_consistency(),
2132 self.ir_node.replace(HydroNode::Placeholder),
2133 )
2134 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2135 // We can always weaken the retries guarantee
2136 let target_location = self.location.drop_consistency();
2137 Stream::new(
2138 target_location.clone(),
2139 HydroNode::Cast {
2140 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2141 metadata: target_location
2142 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2143 },
2144 )
2145 } else {
2146 let target_location = self.location.drop_consistency();
2147 Stream::new(
2148 target_location.clone(),
2149 HydroNode::ObserveNonDet {
2150 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2151 trusted: false,
2152 metadata: target_location
2153 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2154 },
2155 )
2156 }
2157 }
2158
2159 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2160 // is not observable
2161 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2162 if R::RETRIES_KIND == R2::RETRIES_KIND {
2163 Stream::new(
2164 self.location.clone(),
2165 self.ir_node.replace(HydroNode::Placeholder),
2166 )
2167 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2168 // We can always weaken the retries guarantee
2169 Stream::new(
2170 self.location.clone(),
2171 HydroNode::Cast {
2172 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2173 metadata: self
2174 .location
2175 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2176 },
2177 )
2178 } else {
2179 Stream::new(
2180 self.location.clone(),
2181 HydroNode::ObserveNonDet {
2182 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2183 trusted: true,
2184 metadata: self
2185 .location
2186 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2187 },
2188 )
2189 }
2190 }
2191
    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
    /// which is always safe because that is the weakest possible guarantee.
    ///
    /// Deprecated alias for [`Stream::weaken_retries`] with `R2 = AtLeastOnce`.
    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
        self.weaken_retries::<AtLeastOnce>()
    }
2198
2199 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2200 /// enforcing that `R2` is weaker than the input retries guarantee.
2201 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2202 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2203 self.assume_retries_trusted::<R2>(nondet)
2204 }
2205
    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
    /// implies that `R == ExactlyOnce`.
    ///
    /// Since the markers are equal, the trusted cast below is a pure type relabel.
    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
    where
        R: IsExactlyOnce,
    {
        self.assume_retries_trusted(nondet!(/** no-op */))
    }
2214
    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
    /// implies that `B == Bounded`.
    ///
    /// Implemented via [`Stream::weaken_boundedness`], which relabels the type
    /// without an IR change when the markers already agree.
    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
    where
        B: IsBounded,
    {
        self.weaken_boundedness()
    }
2223
2224 /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2225 /// which implies that `B == Bounded`.
2226 pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2227 if B::BOUNDED == B2::BOUNDED {
2228 Stream::new(
2229 self.location.clone(),
2230 self.ir_node.replace(HydroNode::Placeholder),
2231 )
2232 } else {
2233 // We can always weaken the boundedness
2234 Stream::new(
2235 self.location.clone(),
2236 HydroNode::Cast {
2237 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2238 metadata: self
2239 .location
2240 .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2241 },
2242 )
2243 }
2244 }
2245}
2246
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        // Maps each borrowed element to an owned clone; all other guarantees
        // (location, boundedness, ordering, retries) are preserved.
        self.map(q!(|d| d.clone()))
    }
}
2275
impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
where
    L: Location<'a>,
{
    /// Computes the number of elements in the stream as a [`Singleton`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.count().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
        // Counting is order-insensitive, so the trusted cast to TotalOrder is
        // safe (see the proof annotation below).
        self.assume_ordering_trusted::<TotalOrder>(nondet!(
            /// Order does not affect eventual count, and also does not affect intermediate states.
        ))
        .fold(
            q!(|| 0usize),
            q!(
                |count, _| *count += 1,
                monotone = manual_proof!(/** += 1 is monotone */)
            ),
        )
    }
}
2311
impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
    /// Produces a new stream that merges the elements of the two input streams.
    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
    ///
    /// Currently, both input streams must be [`Unbounded`]. When the streams are
    /// [`Bounded`], you can use [`Stream::chain`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn merge_unordered<O2: Ordering, R2: Retries>(
        self,
        other: Stream<T, L, Unbounded, O2, R2>,
    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        // A Chain node typed with a NoOrder output: no cross-input ordering is
        // promised, and the retries guarantee is the weaker of the two inputs.
        Stream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    T,
                    L,
                    Unbounded,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Deprecated: use [`Stream::merge_unordered`] instead.
    #[deprecated(note = "use `merge_unordered` instead")]
    pub fn interleave<O2: Ordering, R2: Retries>(
        self,
        other: Stream<T, L, Unbounded, O2, R2>,
    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        self.merge_unordered(other)
    }
}
2371
impl<'a, T, L: Location<'a>, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
    /// Produces a new stream that combines the elements of the two input streams,
    /// preserving the relative order of elements within each input.
    ///
    /// Currently, both input streams must be [`Unbounded`]. When the streams are
    /// [`Bounded`], you can use [`Stream::chain`] instead.
    ///
    /// # Non-Determinism
    /// The order in which elements *across* the two streams will be interleaved is
    /// non-deterministic, so the order of elements will vary across runs. If the output order
    /// is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic but emits an
    /// unordered stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
    /// # process.source_iter(q!(vec![1, 3])).into();
    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
    /// # }, |mut stream| async move {
    /// // 1, 3 and 2, 4 in some order, preserving the original local order
    /// # for w in vec![1, 3, 2, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn merge_ordered<R2: Retries>(
        self,
        other: Stream<T, L, Unbounded, TotalOrder, R2>,
        nondet: NonDet,
    ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
    where
        R: MinRetries<R2>,
    {
        let self_location = self.location.clone();
        // Within each slice, `self`'s batch is chained before `other`'s; each
        // input's internal order is preserved, while the slicing makes the
        // cross-input interleaving non-deterministic (see doc above).
        let inner = super::sliced::sliced! {
            let self_batch = use(self, nondet);
            let other_batch = use(other, nondet);
            self_batch.chain(other_batch)
        };

        Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
    }
}
2420
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Produces a new stream that emits the input elements in sorted order.
    ///
    /// The input stream can have any ordering guarantee, but the output stream
    /// will have a [`TotalOrder`] guarantee. This operator will block until all
    /// elements in the input stream are available, so it requires the input stream
    /// to be [`Bounded`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.sort().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
    where
        B: IsBounded,
        T: Ord,
    {
        // `IsBounded` implies `B == Bounded`, so this cast is a no-op relabel.
        let this = self.make_bounded();
        Stream::new(
            this.location.clone(),
            HydroNode::Sort {
                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
                metadata: this
                    .location
                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
            },
        )
    }

    /// Produces a new stream that first emits the elements of the `self` stream,
    /// and then emits the elements of the `other` stream. The output stream has
    /// a [`TotalOrder`] guarantee if and only if both input streams have a
    /// [`TotalOrder`] guarantee.
    ///
    /// Currently, both input streams must be [`Bounded`]. This operator will block
    /// on the first stream until all its elements are available. In a future version,
    /// we will relax the requirement on the `other` stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, 1, 2, 3, 4
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// # assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
        self,
        other: Stream<T, L, B2, O2, R2>,
    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
    where
        B: IsBounded,
        O: MinOrder<O2>,
        R: MinRetries<R2>,
    {
        // Both inputs must live at the same location for a local chain.
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    T,
                    L,
                    B2,
                    <O as MinOrder<O2>>::Min,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
    /// because this is compiled into a nested loop.
    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
        self,
        other: Stream<T2, L, Bounded, O2, R>,
    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
    where
        B: IsBounded,
        T: Clone,
        T2: Clone,
    {
        // `IsBounded` implies `B == Bounded`, so this cast is a no-op relabel.
        let this = self.make_bounded();
        check_matching_location(&this.location, &other.location);

        Stream::new(
            this.location.clone(),
            HydroNode::CrossProduct {
                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: this.location.new_node_metadata(Stream::<
                    (T, T2),
                    L,
                    Bounded,
                    <O2 as MinOrder<O>>::Min,
                    R,
                >::collection_kind()),
            },
        )
    }

    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
    /// `self` used as the values for *each* key.
    ///
    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
    /// values. For example, it can be used to send the same set of elements to several cluster
    /// members, if the membership information is available as a [`KeyedSingleton`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # let tick = process.tick();
    /// let keyed_singleton = // { 1: (), 2: () }
    /// # process
    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .first();
    /// let stream = // [ "a", "b" ]
    /// # process
    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
    /// #     .batch(&tick, nondet!(/** test */));
    /// stream.repeat_with_keys(keyed_singleton)
    /// # .entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn repeat_with_keys<K, V2>(
        self,
        keys: KeyedSingleton<K, V2, L, Bounded>,
    ) -> KeyedStream<K, T, L, Bounded, O, R>
    where
        B: IsBounded,
        K: Clone,
        T: Clone,
    {
        // Pair every key with every element of `self` via a nested-loop cross
        // product, then re-key the result on the key column.
        keys.keys()
            .weaken_retries()
            .assume_ordering_trusted::<TotalOrder>(
                nondet!(/** keyed stream does not depend on ordering of keys */),
            )
            .cross_product_nested_loop(self.make_bounded())
            .into_keyed()
    }

    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
    /// execution until all results are available. The output order is based on when futures
    /// complete, and may be different than the input order.
    ///
    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
    /// while futures are pending, this variant blocks until the futures resolve.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_blocking()
    /// # },
    /// # |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// # let mut output = HashSet::new();
    /// # for _ in 1..10 {
    /// #     output.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     HashSet::<i32>::from_iter(1..10)
    /// # );
    /// # },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
    where
        T: Future,
    {
        // Output is typed NoOrder because completion order, not input order,
        // determines emission order (see doc above).
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesBlocking {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let empty: Stream<i32, _, Bounded> = process
    ///     .source_iter(q!(Vec::<i32>::new()))
    ///     .batch(&tick, nondet!(/** test */));
    /// empty.is_empty().all_ticks()
    /// # }, |mut stream| async move {
    /// // true
    /// # assert_eq!(stream.next().await.unwrap(), true);
    /// # }));
    /// # }
    /// ```
    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
    where
        B: IsBounded,
    {
        // Empty iff there is no first element; the ordering cast is safe because
        // existence of a first element does not depend on order.
        self.make_bounded()
            .assume_ordering_trusted::<TotalOrder>(
                nondet!(/** is_empty intermediates unaffected by order */),
            )
            .first()
            .is_none()
    }
}
2690
2691impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2692where
2693 L: Location<'a>,
2694{
    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
    /// and streams the left side through, preserving the left side's ordering. When both
    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
        self,
        n: Stream<(K, V2), L, B2, O2, R2>,
    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
    where
        K: Eq + Hash + Clone,
        R: MinRetries<R2>,
        V1: Clone,
        V2: Clone,
    {
        check_matching_location(&self.location, &n.location);

        // Bounded right side: use the one-sided JoinHalf node (accumulate right,
        // stream left); otherwise fall back to the symmetric Join node.
        let ir_node = if B2::BOUNDED {
            HydroNode::JoinHalf {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    (K, (V1, V2)),
                    L,
                    B,
                    B2::PreserveOrderIfBounded<O>,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            }
        } else {
            HydroNode::Join {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    (K, (V1, V2)),
                    L,
                    B,
                    B2::PreserveOrderIfBounded<O>,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            }
        };

        Stream::new(self.location.clone(), ir_node)
    }
2760
2761 /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2762 /// computes the anti-join of the items in the input -- i.e. returns
2763 /// unique items in the first input that do not have a matching key
2764 /// in the second input.
2765 ///
2766 /// # Example
2767 /// ```rust
2768 /// # #[cfg(feature = "deploy")] {
2769 /// # use hydro_lang::prelude::*;
2770 /// # use futures::StreamExt;
2771 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2772 /// let tick = process.tick();
2773 /// let stream = process
2774 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2775 /// .batch(&tick, nondet!(/** test */));
2776 /// let batch = process
2777 /// .source_iter(q!(vec![1, 2]))
2778 /// .batch(&tick, nondet!(/** test */));
2779 /// stream.anti_join(batch).all_ticks()
2780 /// # }, |mut stream| async move {
2781 /// # for w in vec![(3, 'c'), (4, 'd')] {
2782 /// # assert_eq!(stream.next().await.unwrap(), w);
2783 /// # }
2784 /// # }));
2785 /// # }
2786 pub fn anti_join<O2: Ordering, R2: Retries>(
2787 self,
2788 n: Stream<K, L, Bounded, O2, R2>,
2789 ) -> Stream<(K, V1), L, B, O, R>
2790 where
2791 K: Eq + Hash,
2792 {
2793 check_matching_location(&self.location, &n.location);
2794
2795 Stream::new(
2796 self.location.clone(),
2797 HydroNode::AntiJoin {
2798 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2799 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2800 metadata: self
2801 .location
2802 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2803 },
2804 )
2805 }
2806}
2807
2808impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2809 Stream<(K, V), L, B, O, R>
2810{
2811 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2812 /// is used as the key and the second element is added to the entries associated with that key.
2813 ///
2814 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2815 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2816 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2817 /// total ordering _within_ each group but no ordering _across_ groups.
2818 ///
2819 /// # Example
2820 /// ```rust
2821 /// # #[cfg(feature = "deploy")] {
2822 /// # use hydro_lang::prelude::*;
2823 /// # use futures::StreamExt;
2824 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2825 /// process
2826 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2827 /// .into_keyed()
2828 /// # .entries()
2829 /// # }, |mut stream| async move {
2830 /// // { 1: [2, 3], 2: [4] }
2831 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2832 /// # assert_eq!(stream.next().await.unwrap(), w);
2833 /// # }
2834 /// # }));
2835 /// # }
2836 /// ```
2837 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2838 KeyedStream::new(
2839 self.location.clone(),
2840 HydroNode::Cast {
2841 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2842 metadata: self
2843 .location
2844 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2845 },
2846 )
2847 }
2848}
2849
impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
where
    K: Eq + Hash,
    L: Location<'a>,
{
    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.keys().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # assert_eq!(stream.next().await.unwrap(), 2);
    /// # }));
    /// # }
    /// ```
    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
        // Deduplicate by folding every key's values into `()`: the fold is trivially
        // commutative and idempotent because values are discarded, leaving exactly one
        // entry per distinct key, whose keys are then extracted.
        self.into_keyed()
            .fold(
                q!(|| ()),
                q!(
                    |_, _| {},
                    commutative = manual_proof!(/** values are ignored */),
                    idempotent = manual_proof!(/** values are ignored */)
                ),
            )
            .keys()
    }
}
2886
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
where
    L: Location<'a>,
{
    /// Returns a stream corresponding to the latest batch of elements being atomically
    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
    /// the order of the input.
    ///
    /// # Non-Determinism
    /// The batch boundaries are non-deterministic and may change across executions.
    pub fn batch_atomic<L2: Location<'a, NoConsistency = L::NoConsistency>>(
        self,
        tick: &Tick<L2>,
        _nondet: NonDet,
    ) -> Stream<T, Tick<L::NoConsistency>, Bounded, O, R> {
        Stream::new(
            // The output location drops the consistency marker from the tick.
            tick.drop_consistency(),
            HydroNode::Batch {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                // NOTE(review): the metadata here is tagged with `Tick<L>` while the
                // returned stream lives in `Tick<L::NoConsistency>` — confirm this
                // mismatch is intentional (e.g. consistency markers erased in metadata).
                metadata: tick
                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
            },
        )
    }

    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
    /// See [`Stream::atomic`] for more details.
    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
        Stream::new(
            // Escape the atomic section: the result lives at the tick's underlying location.
            self.location.tick.l.clone(),
            HydroNode::EndAtomic {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .tick
                    .l
                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
            },
        )
    }
}
2928
impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
where
    L: Location<'a> + NoAtomic,
    F: Future<Output = T>,
{
    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced as available, regardless of input arrival order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures()
    /// # },
    /// # |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// # let mut output = HashSet::new();
    /// # for _ in 1..10 {
    /// #     output.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     HashSet::<i32>::from_iter(1..10)
    /// # );
    /// # },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFutures {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced in the same order as the input stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_ordered()
    /// # },
    /// # |mut stream| async move {
    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
    /// # let mut output = Vec::new();
    /// # for _ in 1..10 {
    /// #     output.push(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
    /// # );
    /// # },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesOrdered {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }
}
3018
impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
        Stream::new(
            self.location.outer().clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .outer()
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    ///
    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// stream's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
        // The output stays coupled to this tick via an `Atomic` wrapper location.
        let out_location = Atomic {
            tick: self.location.clone(),
        };

        Stream::new(
            out_location.clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: out_location
                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory across ticks rather than resetting across batches of
    /// input.
    ///
    /// This API is particularly useful for stateful computation on batches of data, such as
    /// maintaining an accumulated state that is up to date with the current batch.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #     .source_iter(q!(vec![1, 2, 3, 4]))
    /// #     .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #     .source_iter(q!(vec![5, 6, 7]))
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .defer_tick(); // appears on the second tick
    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
    ///
    /// input.batch(&tick, nondet!(/** test */))
    ///     .across_ticks(|s| s.count()).all_ticks()
    /// # }, |mut stream| async move {
    /// // [4, 7]
    /// assert_eq!(stream.next().await.unwrap(), 4);
    /// assert_eq!(stream.next().await.unwrap(), 7);
    /// # }));
    /// # }
    /// ```
    pub fn across_ticks<Out: BatchAtomic<'a>>(
        self,
        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
    ) -> Out::Batched {
        // Run the closure in the atomic (cross-tick) context, then bring the result
        // back into batched form so it lines up with the current tick.
        thunk(self.all_ticks_atomic()).batched_atomic()
    }

    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
    /// always has the elements of `self` at tick `T - 1`.
    ///
    /// At tick `0`, the output stream is empty, since there is no previous tick.
    ///
    /// This operator enables stateful iterative processing with ticks, by sending data from one
    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![0, 3, 4, 5, 6]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
    ///
    /// changes_across_ticks.clone().filter_not_in(
    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
    /// ).all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
            },
        )
    }
}
3151
3152#[cfg(test)]
3153mod tests {
3154 #[cfg(feature = "deploy")]
3155 use futures::{SinkExt, StreamExt};
3156 #[cfg(feature = "deploy")]
3157 use hydro_deploy::Deployment;
3158 #[cfg(feature = "deploy")]
3159 use serde::{Deserialize, Serialize};
3160 #[cfg(any(feature = "deploy", feature = "sim"))]
3161 use stageleft::q;
3162
3163 #[cfg(any(feature = "deploy", feature = "sim"))]
3164 use crate::compile::builder::FlowBuilder;
3165 #[cfg(feature = "deploy")]
3166 use crate::live_collections::sliced::sliced;
3167 #[cfg(feature = "deploy")]
3168 use crate::live_collections::stream::ExactlyOnce;
3169 #[cfg(feature = "sim")]
3170 use crate::live_collections::stream::NoOrder;
3171 #[cfg(any(feature = "deploy", feature = "sim"))]
3172 use crate::live_collections::stream::TotalOrder;
3173 #[cfg(any(feature = "deploy", feature = "sim"))]
3174 use crate::location::Location;
3175 #[cfg(feature = "sim")]
3176 use crate::networking::TCP;
3177 #[cfg(any(feature = "deploy", feature = "sim"))]
3178 use crate::nondet::nondet;
3179
3180 mod backtrace_chained_ops;
3181
    // Marker type naming the first process in multi-process deploy tests.
    #[cfg(feature = "deploy")]
    struct P1 {}
    // Marker type naming the second process in multi-process deploy tests.
    #[cfg(feature = "deploy")]
    struct P2 {}
3186
    // Payload type exercised over the network to verify serde/bincode round-tripping.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }
3192
    // Deploy test: ten integers travel process -> process over TCP, then out to an
    // external client; checks serialization and delivery order end-to-end.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.fail_stop().bincode())
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }
3227
    // Deploy test: `first()` on a flattened singleton yields exactly one element,
    // so the downstream `count()` reports cardinality 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
3261
    // Deploy test: a `reduce` over an unbounded stream keeps its accumulator across
    // inputs (1 then 1+2=3), rather than resetting between elements.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3295
    // Deploy test: crossing an unbounded input with a fold over a bounded source
    // pairs every input with the completed sum (6) at the top level.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3333
    // Deploy test: inside `sliced!`, a `reduce` over a bounded source produces a
    // single value, so its stream cardinality is 1 for every crossed input.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3371
    // Deploy test: same as `top_level_bounded_reduce_cardinality` but with
    // `into_singleton()` applied — cardinality must still be 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }
3409
    // Deploy test: a fold inside an atomic section, snapshotted per tick, exposes the
    // completed sum (6) to every batch that crosses with it.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic()
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(&tick, nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }
3452
    // Deploy test: `scan` keeps its accumulator across inputs on an unbounded stream,
    // emitting running sums (1 then 3).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3491
    // Deploy test: `enumerate` keeps a monotonically increasing index across inputs
    // on an unbounded stream ((0, 1) then (1, 2)).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input.enumerate().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (0, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 2));
    }
3522
    // Deploy test: `unique` remembers previously seen values across inputs, so a
    // repeated 1 is suppressed and only the fresh 3 comes through.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_unique_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let out = input.unique().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send(1).await.unwrap();
        external_in.send(3).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }
3558
    // Sim test: batch sizes are non-deterministic, so the exhaustive explorer must
    // find an interleaving where a single batch does NOT contain all 3 inputs.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }
3583
    // Sim test: batching a totally-ordered input never reorders elements, across
    // every batch boundary the explorer tries.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send.send(3);

            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }
3606
    // Sim test: with a `NoOrder` input, the explorer must find a shuffled batching
    // where min/max split as (1, 3) then (2, 2) — hence the expected panic.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }
3633
    // Sim test: counts the distinct executions explored when batching 4 unordered
    // elements — ordered set partitions of 4 give ∑ S(4,k)·k! = 75 instances.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
        )
    }
3656
    // Sim test: `assume_ordering` on an unordered batch does not make the order real —
    // the explorer finds a shuffled delivery, so the ordered assertion panics.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
        });
    }
3678
    // Sim test: counts executions when the order of 4 unordered elements is observed
    // through `assume_ordering` with batching — 4! permutations × 2^3 batch splits = 192.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! * 2^{4 - 1}
        )
    }
3704
3705 #[cfg(feature = "sim")]
3706 #[test]
3707 fn sim_unordered_count_instance_count() {
3708 let mut flow = FlowBuilder::new();
3709 let node = flow.process::<()>();
3710
3711 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3712
3713 let tick = node.tick();
3714 let out_recv = input
3715 .count()
3716 .snapshot(&tick, nondet!(/** test */))
3717 .all_ticks()
3718 .sim_output();
3719
3720 let instance_count = flow.sim().exhaustive(async || {
3721 in_send.send_many_unordered([1, 2, 3, 4]);
3722 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3723 });
3724
3725 assert_eq!(
3726 instance_count,
3727 16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3728 )
3729 }
3730
3731 #[cfg(feature = "sim")]
3732 #[test]
3733 fn sim_top_level_assume_ordering() {
3734 let mut flow = FlowBuilder::new();
3735 let node = flow.process::<()>();
3736
3737 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3738
3739 let out_recv = input
3740 .assume_ordering::<TotalOrder>(nondet!(/** test */))
3741 .sim_output();
3742
3743 let instance_count = flow.sim().exhaustive(async || {
3744 in_send.send_many_unordered([1, 2, 3]);
3745 let mut out = out_recv.collect::<Vec<_>>().await;
3746 out.sort();
3747 assert_eq!(out, vec![1, 2, 3]);
3748 });
3749
3750 assert_eq!(instance_count, 6)
3751 }
3752
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Feed output back into the same assume_ordering point via a forward
        // reference, so fed-back elements interleave with the external input.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        // Feedback: 0 -> 1 and 2 -> 3 pass the odd filter; 1 -> 2 and 3 -> 4
        // are dropped, so the loop terminates. Routing through node2 and back
        // exercises the network path.
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        // At least one schedule must slot the fed-back 1 between the external
        // 0 and 2, proving the feedback really interleaves.
        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 6);
    }
3791
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Same shape as sim_top_level_assume_ordering_cycle_back, except the
        // feedback path additionally goes through a tick batch, which adds
        // batch-boundary nondeterminism to the explored schedules.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        // More schedules than the batchless variant (6): batching in the loop
        // multiplies the interleavings.
        assert_eq!(instance_count, 58);
    }
3832
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_multiple() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        let (_, input2) = node.sim_input::<_, NoOrder, _>();

        // Two chained assume_ordering points: the first orders `input` merged
        // with the feedback stream; the second orders that result (weakened
        // back to NoOrder) merged with a second, never-fed input.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let input1_ordered = input
            .clone()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        let foo = input1_ordered
            .clone()
            .map(q!(|v| v + 3))
            .weaken_ordering::<NoOrder>()
            .merge_unordered(input2)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        // Only 3 (= external 0 mapped by +3) survives the filter and is fed
        // back, routed through node2 to exercise the network path.
        complete_cycle_back.complete(
            foo.filter(q!(|v| *v == 3))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = input1_ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 1]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 3, 1]) {
                saw = true;
            }
        });

        // Some schedule must slot the fed-back 3 between the external 0 and 1.
        assert!(saw, "did not see an instance with 0, 3, 1 in order");
        assert_eq!(instance_count, 24);
    }
3877
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let node2 = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        // Like sim_top_level_assume_ordering_cycle_back, but assume_ordering is
        // bracketed by atomic()/end_atomic(). NOTE(review): presumably the
        // atomic region constrains how the simulator can interleave schedules
        // (22 instances here vs 6 without) — confirm against the Atomic docs.
        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .atomic()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .end_atomic();
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1))
                .send(&node2, TCP.fail_stop().bincode())
                .send(&node, TCP.fail_stop().bincode()),
        );

        let out_recv = ordered.sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;
            // External 0, 2 plus fed-back 1, 3 (the odd successors) = 4 items.
            assert_eq!(out.len(), 4);
        });
        assert_eq!(instance_count, 22);
    }
3912
3913 #[cfg(feature = "deploy")]
3914 #[tokio::test]
3915 async fn partition_evens_odds() {
3916 let mut deployment = Deployment::new();
3917
3918 let mut flow = FlowBuilder::new();
3919 let node = flow.process::<()>();
3920 let external = flow.external::<()>();
3921
3922 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
3923 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
3924 let evens_port = evens.send_bincode_external(&external);
3925 let odds_port = odds.send_bincode_external(&external);
3926
3927 let nodes = flow
3928 .with_process(&node, deployment.Localhost())
3929 .with_external(&external, deployment.Localhost())
3930 .deploy(&mut deployment);
3931
3932 deployment.deploy().await.unwrap();
3933
3934 let mut evens_out = nodes.connect(evens_port).await;
3935 let mut odds_out = nodes.connect(odds_port).await;
3936
3937 deployment.start().await.unwrap();
3938
3939 let mut even_results = Vec::new();
3940 for _ in 0..3 {
3941 even_results.push(evens_out.next().await.unwrap());
3942 }
3943 even_results.sort();
3944 assert_eq!(even_results, vec![2, 4, 6]);
3945
3946 let mut odd_results = Vec::new();
3947 for _ in 0..3 {
3948 odd_results.push(odds_out.next().await.unwrap());
3949 }
3950 odd_results.sort();
3951 assert_eq!(odd_results, vec![1, 3, 5]);
3952 }
3953
3954 #[cfg(feature = "deploy")]
3955 #[tokio::test]
3956 async fn unconsumed_inspect_still_runs() {
3957 use crate::deploy::DeployCrateWrapper;
3958
3959 let mut deployment = Deployment::new();
3960
3961 let mut flow = FlowBuilder::new();
3962 let node = flow.process::<()>();
3963
3964 // The return value of .inspect() is intentionally dropped.
3965 // Before the Null-root fix, this would silently do nothing.
3966 node.source_iter(q!(0..5))
3967 .inspect(q!(|x| println!("inspect: {}", x)));
3968
3969 let nodes = flow
3970 .with_process(&node, deployment.Localhost())
3971 .deploy(&mut deployment);
3972
3973 deployment.deploy().await.unwrap();
3974
3975 let mut stdout = nodes.get_process(&node).stdout();
3976
3977 deployment.start().await.unwrap();
3978
3979 let mut lines = Vec::new();
3980 for _ in 0..5 {
3981 lines.push(stdout.recv().await.unwrap());
3982 }
3983 lines.sort();
3984 assert_eq!(
3985 lines,
3986 vec![
3987 "inspect: 0",
3988 "inspect: 1",
3989 "inspect: 2",
3990 "inspect: 3",
3991 "inspect: 4",
3992 ]
3993 );
3994 }
3995
3996 #[cfg(feature = "sim")]
3997 #[test]
3998 fn sim_limit() {
3999 let mut flow = FlowBuilder::new();
4000 let node = flow.process::<()>();
4001
4002 let (in_send, input) = node.sim_input();
4003
4004 let out_recv = input.limit(q!(3)).sim_output();
4005
4006 flow.sim().exhaustive(async || {
4007 in_send.send(1);
4008 in_send.send(2);
4009 in_send.send(3);
4010 in_send.send(4);
4011 in_send.send(5);
4012
4013 out_recv.assert_yields_only([1, 2, 3]).await;
4014 });
4015 }
4016
4017 #[cfg(feature = "sim")]
4018 #[test]
4019 fn sim_limit_zero() {
4020 let mut flow = FlowBuilder::new();
4021 let node = flow.process::<()>();
4022
4023 let (in_send, input) = node.sim_input();
4024
4025 let out_recv = input.limit(q!(0)).sim_output();
4026
4027 flow.sim().exhaustive(async || {
4028 in_send.send(1);
4029 in_send.send(2);
4030
4031 out_recv.assert_yields_only::<i32, _>([]).await;
4032 });
4033 }
4034
4035 #[cfg(feature = "sim")]
4036 #[test]
4037 fn sim_merge_ordered() {
4038 let mut flow = FlowBuilder::new();
4039 let node = flow.process::<()>();
4040
4041 let (in_send, input) = node.sim_input();
4042 let (in_send2, input2) = node.sim_input();
4043
4044 let out_recv = input
4045 .merge_ordered(input2, nondet!(/** test */))
4046 .sim_output();
4047
4048 let mut saw_out_of_order = false;
4049 let instances = flow.sim().exhaustive(async || {
4050 in_send.send(1);
4051 in_send.send(2);
4052 in_send2.send(3);
4053 in_send2.send(4);
4054
4055 let mut out = out_recv.collect::<Vec<_>>().await;
4056
4057 if out == [1, 3, 2, 4] {
4058 saw_out_of_order = true;
4059 }
4060
4061 out.sort();
4062 assert_eq!(out, vec![1, 2, 3, 4]);
4063 });
4064
4065 assert!(saw_out_of_order);
4066 assert_eq!(instances, 26);
4067 }
4068
4069 #[cfg(feature = "deploy")]
4070 #[tokio::test]
4071 async fn monotone_fold_threshold() {
4072 use crate::properties::manual_proof;
4073
4074 let mut deployment = Deployment::new();
4075
4076 let mut flow = FlowBuilder::new();
4077 let node = flow.process::<()>();
4078 let external = flow.external::<()>();
4079
4080 let in_unbounded: super::Stream<_, _> =
4081 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4082 let sum = in_unbounded.fold(
4083 q!(|| 0),
4084 q!(
4085 |sum, v| {
4086 *sum += v;
4087 },
4088 monotone = manual_proof!(/** test */)
4089 ),
4090 );
4091
4092 let threshold_out = sum
4093 .threshold_greater_or_equal(node.singleton(q!(7)))
4094 .send_bincode_external(&external);
4095
4096 let nodes = flow
4097 .with_process(&node, deployment.Localhost())
4098 .with_external(&external, deployment.Localhost())
4099 .deploy(&mut deployment);
4100
4101 deployment.deploy().await.unwrap();
4102
4103 let mut threshold_out = nodes.connect(threshold_out).await;
4104
4105 deployment.start().await.unwrap();
4106
4107 assert_eq!(threshold_out.next().await.unwrap(), 7);
4108 }
4109
4110 #[cfg(feature = "deploy")]
4111 #[tokio::test]
4112 async fn monotone_count_threshold() {
4113 let mut deployment = Deployment::new();
4114
4115 let mut flow = FlowBuilder::new();
4116 let node = flow.process::<()>();
4117 let external = flow.external::<()>();
4118
4119 let in_unbounded: super::Stream<_, _> =
4120 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4121 let sum = in_unbounded.count();
4122
4123 let threshold_out = sum
4124 .threshold_greater_or_equal(node.singleton(q!(3)))
4125 .send_bincode_external(&external);
4126
4127 let nodes = flow
4128 .with_process(&node, deployment.Localhost())
4129 .with_external(&external, deployment.Localhost())
4130 .deploy(&mut deployment);
4131
4132 deployment.deploy().await.unwrap();
4133
4134 let mut threshold_out = nodes.connect(threshold_out).await;
4135
4136 deployment.start().await.unwrap();
4137
4138 assert_eq!(threshold_out.next().await.unwrap(), 3);
4139 }
4140
4141 #[cfg(feature = "deploy")]
4142 #[tokio::test]
4143 async fn monotone_map_order_preserving_threshold() {
4144 use crate::properties::manual_proof;
4145
4146 let mut deployment = Deployment::new();
4147
4148 let mut flow = FlowBuilder::new();
4149 let node = flow.process::<()>();
4150 let external = flow.external::<()>();
4151
4152 let in_unbounded: super::Stream<_, _> =
4153 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4154 let sum = in_unbounded.fold(
4155 q!(|| 0),
4156 q!(
4157 |sum, v| {
4158 *sum += v;
4159 },
4160 monotone = manual_proof!(/** test */)
4161 ),
4162 );
4163
4164 // map with order_preserving should preserve monotonicity
4165 let doubled = sum.map(q!(
4166 |v| v * 2,
4167 order_preserving = manual_proof!(/** doubling preserves order */)
4168 ));
4169
4170 let threshold_out = doubled
4171 .threshold_greater_or_equal(node.singleton(q!(14)))
4172 .send_bincode_external(&external);
4173
4174 let nodes = flow
4175 .with_process(&node, deployment.Localhost())
4176 .with_external(&external, deployment.Localhost())
4177 .deploy(&mut deployment);
4178
4179 deployment.deploy().await.unwrap();
4180
4181 let mut threshold_out = nodes.connect(threshold_out).await;
4182
4183 deployment.start().await.unwrap();
4184
4185 assert_eq!(threshold_out.next().await.unwrap(), 14);
4186 }
4187
4188 // === Compile-time type tests for join/cross_product ordering ===
4189
    #[cfg(any(feature = "deploy", feature = "sim"))]
    mod join_ordering_type_tests {
        //! Compile-time-only checks that `join` and `cross_product` infer the
        //! expected `Ordering` type parameter for each combination of operand
        //! boundedness. None of these functions are ever called; a change to
        //! the ordering inference rules makes them fail to compile.

        use crate::live_collections::boundedness::{Bounded, Unbounded};
        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
        use crate::location::{Location, Process};

        /// Unbounded ⋈ Bounded keeps `TotalOrder` on the output.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_with_bounded_preserves_order<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
            left.join(right)
        }

        /// Unbounded ⋈ Unbounded degrades to `NoOrder` even when both inputs
        /// are totally ordered.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_with_unbounded_is_no_order<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.join(right)
        }

        /// Bounded ⋈ Bounded keeps `TotalOrder`, for any location type.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
            left.join(right)
        }

        /// `NoOrder` inputs stay `NoOrder` in the mixed-boundedness case.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_noorder_with_bounded<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.join(right)
        }

        // === Compile-time type tests for cross_product ordering ===

        /// Unbounded × Bounded keeps `TotalOrder` on the output.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
            left.cross_product(right)
        }

        /// Bounded × Bounded keeps `TotalOrder`.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_bounded_with_bounded_preserves_order<'a>(
            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
            left.cross_product(right)
        }

        /// Unbounded × Unbounded degrades to `NoOrder`.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.cross_product(right)
        }
    } // mod join_ordering_type_tests
4254
4255 // === Runtime correctness tests for bounded join/cross_product ===
4256
4257 #[cfg(feature = "sim")]
4258 #[test]
4259 fn cross_product_mixed_boundedness_correctness() {
4260 use stageleft::q;
4261
4262 use crate::compile::builder::FlowBuilder;
4263 use crate::nondet::nondet;
4264
4265 let mut flow = FlowBuilder::new();
4266 let process = flow.process::<()>();
4267 let tick = process.tick();
4268
4269 let left = process.source_iter(q!(vec![1, 2]));
4270 let right = process
4271 .source_iter(q!(vec!['a', 'b']))
4272 .batch(&tick, nondet!(/** test */))
4273 .all_ticks();
4274
4275 let out = left.cross_product(right).sim_output();
4276
4277 flow.sim().exhaustive(async || {
4278 out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4279 .await;
4280 });
4281 }
4282
4283 #[cfg(feature = "sim")]
4284 #[test]
4285 fn join_mixed_boundedness_correctness() {
4286 use stageleft::q;
4287
4288 use crate::compile::builder::FlowBuilder;
4289 use crate::nondet::nondet;
4290
4291 let mut flow = FlowBuilder::new();
4292 let process = flow.process::<()>();
4293 let tick = process.tick();
4294
4295 let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4296 let right = process
4297 .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4298 .batch(&tick, nondet!(/** test */))
4299 .all_ticks();
4300
4301 let out = left.join(right).sim_output();
4302
4303 flow.sim().exhaustive(async || {
4304 out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4305 .await;
4306 });
4307 }
4308}