hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, NoAtomic};
25use crate::location::{Location, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28 ApplyMonotoneStream, ApplyOrderPreservingSingleton, MapFuncAlgebra, Proved,
29};
30
31/// A marker trait indicating which components of a [`Singleton`] may change.
32///
33/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
34/// includes an additional variant [`Monotonic`], which means that the value will only grow.
35pub trait SingletonBound {
36 /// The [`Boundedness`] that this [`Singleton`] would be erased to.
37 type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;
38
39 /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
40 type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;
41
42 /// Returns the [`SingletonBoundKind`] corresponding to this type.
43 fn bound_kind() -> SingletonBoundKind;
44}
45
46impl SingletonBound for Unbounded {
47 type UnderlyingBound = Unbounded;
48
49 type StreamToMonotone = Monotonic;
50
51 fn bound_kind() -> SingletonBoundKind {
52 SingletonBoundKind::Unbounded
53 }
54}
55
56impl SingletonBound for Bounded {
57 type UnderlyingBound = Bounded;
58
59 type StreamToMonotone = Bounded;
60
61 fn bound_kind() -> SingletonBoundKind {
62 SingletonBoundKind::Bounded
63 }
64}
65
66/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
67pub struct Monotonic;
68
69impl SingletonBound for Monotonic {
70 type UnderlyingBound = Unbounded;
71
72 type StreamToMonotone = Monotonic;
73
74 fn bound_kind() -> SingletonBoundKind {
75 SingletonBoundKind::Monotonic
76 }
77}
78
79#[sealed]
80#[diagnostic::on_unimplemented(
81 message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
82 label = "required here",
83 note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
84)]
85/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee.
86pub trait IsMonotonic: SingletonBound {}
87
88#[sealed]
89#[diagnostic::do_not_recommend]
90impl IsMonotonic for Monotonic {}
91
92#[sealed]
93#[diagnostic::do_not_recommend]
94impl<B: IsBounded> IsMonotonic for B {}
95
96/// A single Rust value that can asynchronously change over time.
97///
98/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
99/// [`Unbounded`], the value will asynchronously change over time.
100///
101/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
102/// a single number that will asynchronously change as events are processed. Singletons also appear
103/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
104/// such as getting the length of a batch of requests.
105///
106/// Type Parameters:
107/// - `Type`: the type of the value in this singleton
108/// - `Loc`: the [`Location`] where the singleton is materialized
109/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
110pub struct Singleton<Type, Loc, Bound: SingletonBound> {
111 pub(crate) location: Loc,
112 pub(crate) ir_node: RefCell<HydroNode>,
113 pub(crate) flow_state: FlowState,
114
115 _phantom: PhantomData<(Type, Loc, Bound)>,
116}
117
118impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
119 fn drop(&mut self) {
120 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
121 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
122 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
123 input: Box::new(ir_node),
124 op_metadata: HydroIrOpMetadata::new(),
125 });
126 }
127 }
128}
129
130impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
131where
132 T: Clone,
133 L: Location<'a>,
134{
135 fn from(value: Singleton<T, L, Bounded>) -> Self {
136 let tick = value.location().tick();
137 value.clone_into_tick(&tick).latest()
138 }
139}
140
141impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
142where
143 L: Location<'a>,
144{
145 type Location = Tick<L>;
146
147 fn location(&self) -> &Self::Location {
148 self.location()
149 }
150
151 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
152 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
153 location.clone(),
154 HydroNode::DeferTick {
155 input: Box::new(HydroNode::CycleSource {
156 cycle_id,
157 metadata: location.new_node_metadata(Self::collection_kind()),
158 }),
159 metadata: location
160 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
161 },
162 );
163
164 from_previous_tick.unwrap_or(initial)
165 }
166}
167
168impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
169where
170 L: Location<'a>,
171{
172 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
173 assert_eq!(
174 Location::id(&self.location),
175 expected_location,
176 "locations do not match"
177 );
178 self.location
179 .flow_state()
180 .borrow_mut()
181 .push_root(HydroRoot::CycleSink {
182 cycle_id,
183 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
184 op_metadata: HydroIrOpMetadata::new(),
185 });
186 }
187}
188
189impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
190where
191 L: Location<'a>,
192{
193 type Location = L;
194
195 fn create_source(cycle_id: CycleId, location: L) -> Self {
196 Singleton::new(
197 location.clone(),
198 HydroNode::CycleSource {
199 cycle_id,
200 metadata: location.new_node_metadata(Self::collection_kind()),
201 },
202 )
203 }
204}
205
206impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
207where
208 L: Location<'a>,
209{
210 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
211 assert_eq!(
212 Location::id(&self.location),
213 expected_location,
214 "locations do not match"
215 );
216 self.location
217 .flow_state()
218 .borrow_mut()
219 .push_root(HydroRoot::CycleSink {
220 cycle_id,
221 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
222 op_metadata: HydroIrOpMetadata::new(),
223 });
224 }
225}
226
227impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
228where
229 T: Clone,
230 L: Location<'a>,
231{
232 fn clone(&self) -> Self {
233 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
234 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
235 *self.ir_node.borrow_mut() = HydroNode::Tee {
236 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
237 metadata: self.location.new_node_metadata(Self::collection_kind()),
238 };
239 }
240
241 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
242 Singleton {
243 location: self.location.clone(),
244 flow_state: self.flow_state.clone(),
245 ir_node: HydroNode::Tee {
246 inner: SharedNode(inner.0.clone()),
247 metadata: metadata.clone(),
248 }
249 .into(),
250 _phantom: PhantomData,
251 }
252 } else {
253 unreachable!()
254 }
255 }
256}
257
258#[cfg(stageleft_runtime)]
259fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
260 me: Singleton<T, Tick<L>, B>,
261 other: Optional<O, Tick<L>, B::UnderlyingBound>,
262) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
263 let me_as_optional: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
264 super::optional::zip_inside_tick(me_as_optional, other)
265}
266
267impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
268where
269 L: Location<'a>,
270{
271 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
272 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
273 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
274 let flow_state = location.flow_state().clone();
275 Singleton {
276 location,
277 flow_state,
278 ir_node: RefCell::new(ir_node),
279 _phantom: PhantomData,
280 }
281 }
282
283 pub(crate) fn collection_kind() -> CollectionKind {
284 CollectionKind::Singleton {
285 bound: B::bound_kind(),
286 element_type: stageleft::quote_type::<T>().into(),
287 }
288 }
289
290 /// Returns the [`Location`] where this singleton is being materialized.
291 pub fn location(&self) -> &L {
292 &self.location
293 }
294
295 /// Weakens the consistency of this live collection to not guarantee any consistency across
296 /// cluster members (if this collection is on a cluster).
297 pub fn weaken_consistency(self) -> Singleton<T, L::NoConsistency, B>
298 where
299 L: Location<'a>,
300 {
301 if L::consistency()
302 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
303 {
304 // already no consistency
305 Singleton::new(
306 self.location.drop_consistency(),
307 self.ir_node.replace(HydroNode::Placeholder),
308 )
309 } else {
310 Singleton::new(
311 self.location.drop_consistency(),
312 HydroNode::Cast {
313 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
314 metadata: self
315 .location
316 .clone()
317 .drop_consistency()
318 .new_node_metadata(Singleton::<T, L::NoConsistency, B>::collection_kind()),
319 },
320 )
321 }
322 }
323
324 /// Casts this live collection to have the consistency guarantees specified in the given
325 /// location type parameter. The developer must ensure that the strengthened consistency
326 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
327 pub fn assert_has_consistency_of<L2: Location<'a, NoConsistency = L::NoConsistency>>(
328 self,
329 _proof: impl crate::properties::ConsistencyProof,
330 ) -> Singleton<T, L2, B>
331 where
332 L: Location<'a>,
333 {
334 if L::consistency() == L2::consistency() {
335 Singleton::new(
336 self.location.with_consistency_of(),
337 self.ir_node.replace(HydroNode::Placeholder),
338 )
339 } else {
340 Singleton::new(
341 self.location.with_consistency_of(),
342 HydroNode::AssertIsConsistent {
343 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
344 metadata: self
345 .location
346 .clone()
347 .with_consistency_of::<L2>()
348 .new_node_metadata(Singleton::<T, L2, B>::collection_kind()),
349 },
350 )
351 }
352 }
353
354 /// Drops the monotonicity property of the [`Singleton`].
355 pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
356 if B::bound_kind() == B::UnderlyingBound::bound_kind() {
357 Singleton::new(
358 self.location.clone(),
359 self.ir_node.replace(HydroNode::Placeholder),
360 )
361 } else {
362 Singleton::new(
363 self.location.clone(),
364 HydroNode::Cast {
365 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
366 metadata:
367 self.location.new_node_metadata(
368 Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
369 ),
370 },
371 )
372 }
373 }
374
375 /// Transforms the singleton value by applying a function `f` to it,
376 /// continuously as the input is updated.
377 ///
378 /// # Example
379 /// ```rust
380 /// # #[cfg(feature = "deploy")] {
381 /// # use hydro_lang::prelude::*;
382 /// # use futures::StreamExt;
383 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
384 /// let tick = process.tick();
385 /// let singleton = tick.singleton(q!(5));
386 /// singleton.map(q!(|v| v * 2)).all_ticks()
387 /// # }, |mut stream| async move {
388 /// // 10
389 /// # assert_eq!(stream.next().await.unwrap(), 10);
390 /// # }));
391 /// # }
392 /// ```
393 pub fn map<U, F, OP, B2: SingletonBound>(
394 self,
395 f: impl IntoQuotedMut<'a, F, L, MapFuncAlgebra<OP>>,
396 ) -> Singleton<U, L, B2>
397 where
398 F: Fn(T) -> U + 'a,
399 B: ApplyOrderPreservingSingleton<OP, B2>,
400 {
401 let (f, proof) = f.splice_fn1_ctx_props(&self.location);
402 proof.register_proof(&f);
403 let f = f.into();
404 Singleton::new(
405 self.location.clone(),
406 HydroNode::Map {
407 f,
408 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
409 metadata: self
410 .location
411 .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
412 },
413 )
414 }
415
416 /// Transforms the singleton value by applying a function `f` to it and then flattening
417 /// the result into a stream, preserving the order of elements.
418 ///
419 /// The function `f` is applied to the singleton value to produce an iterator, and all items
420 /// from that iterator are emitted in the output stream in deterministic order.
421 ///
422 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
423 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
424 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
425 ///
426 /// # Example
427 /// ```rust
428 /// # #[cfg(feature = "deploy")] {
429 /// # use hydro_lang::prelude::*;
430 /// # use futures::StreamExt;
431 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
432 /// let tick = process.tick();
433 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
434 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
435 /// # }, |mut stream| async move {
436 /// // 1, 2, 3
437 /// # for w in vec![1, 2, 3] {
438 /// # assert_eq!(stream.next().await.unwrap(), w);
439 /// # }
440 /// # }));
441 /// # }
442 /// ```
443 pub fn flat_map_ordered<U, I, F>(
444 self,
445 f: impl IntoQuotedMut<'a, F, L>,
446 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
447 where
448 B: IsBounded,
449 I: IntoIterator<Item = U>,
450 F: Fn(T) -> I + 'a,
451 {
452 self.into_stream().flat_map_ordered(f)
453 }
454
455 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
456 /// for the output type `I` to produce items in any order.
457 ///
458 /// The function `f` is applied to the singleton value to produce an iterator, and all items
459 /// from that iterator are emitted in the output stream in non-deterministic order.
460 ///
461 /// # Example
462 /// ```rust
463 /// # #[cfg(feature = "deploy")] {
464 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
465 /// # use futures::StreamExt;
466 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
467 /// let tick = process.tick();
468 /// let singleton = tick.singleton(q!(
469 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
470 /// ));
471 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
472 /// # }, |mut stream| async move {
473 /// // 1, 2, 3, but in no particular order
474 /// # let mut results = Vec::new();
475 /// # for _ in 0..3 {
476 /// # results.push(stream.next().await.unwrap());
477 /// # }
478 /// # results.sort();
479 /// # assert_eq!(results, vec![1, 2, 3]);
480 /// # }));
481 /// # }
482 /// ```
483 pub fn flat_map_unordered<U, I, F>(
484 self,
485 f: impl IntoQuotedMut<'a, F, L>,
486 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
487 where
488 B: IsBounded,
489 I: IntoIterator<Item = U>,
490 F: Fn(T) -> I + 'a,
491 {
492 self.into_stream().flat_map_unordered(f)
493 }
494
495 /// Flattens the singleton value into a stream, preserving the order of elements.
496 ///
497 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
498 /// are emitted in the output stream in deterministic order.
499 ///
500 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
501 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
502 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
503 ///
504 /// # Example
505 /// ```rust
506 /// # #[cfg(feature = "deploy")] {
507 /// # use hydro_lang::prelude::*;
508 /// # use futures::StreamExt;
509 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
510 /// let tick = process.tick();
511 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
512 /// singleton.flatten_ordered().all_ticks()
513 /// # }, |mut stream| async move {
514 /// // 1, 2, 3
515 /// # for w in vec![1, 2, 3] {
516 /// # assert_eq!(stream.next().await.unwrap(), w);
517 /// # }
518 /// # }));
519 /// # }
520 /// ```
521 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
522 where
523 B: IsBounded,
524 T: IntoIterator<Item = U>,
525 {
526 self.flat_map_ordered(q!(|x| x))
527 }
528
529 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
530 /// for the element type `T` to produce items in any order.
531 ///
532 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
533 /// are emitted in the output stream in non-deterministic order.
534 ///
535 /// # Example
536 /// ```rust
537 /// # #[cfg(feature = "deploy")] {
538 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
539 /// # use futures::StreamExt;
540 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
541 /// let tick = process.tick();
542 /// let singleton = tick.singleton(q!(
543 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
544 /// ));
545 /// singleton.flatten_unordered().all_ticks()
546 /// # }, |mut stream| async move {
547 /// // 1, 2, 3, but in no particular order
548 /// # let mut results = Vec::new();
549 /// # for _ in 0..3 {
550 /// # results.push(stream.next().await.unwrap());
551 /// # }
552 /// # results.sort();
553 /// # assert_eq!(results, vec![1, 2, 3]);
554 /// # }));
555 /// # }
556 /// ```
557 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
558 where
559 B: IsBounded,
560 T: IntoIterator<Item = U>,
561 {
562 self.flat_map_unordered(q!(|x| x))
563 }
564
565 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
566 ///
567 /// If the predicate returns `true`, the output optional contains the same value.
568 /// If the predicate returns `false`, the output optional is empty.
569 ///
570 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
571 /// not modify or take ownership of the value. If you need to modify the value while filtering
572 /// use [`Singleton::filter_map`] instead.
573 ///
574 /// # Example
575 /// ```rust
576 /// # #[cfg(feature = "deploy")] {
577 /// # use hydro_lang::prelude::*;
578 /// # use futures::StreamExt;
579 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
580 /// let tick = process.tick();
581 /// let singleton = tick.singleton(q!(5));
582 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
583 /// # }, |mut stream| async move {
584 /// // 5
585 /// # assert_eq!(stream.next().await.unwrap(), 5);
586 /// # }));
587 /// # }
588 /// ```
589 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
590 where
591 F: Fn(&T) -> bool + 'a,
592 {
593 let f = f.splice_fn1_borrow_ctx(&self.location).into();
594 Optional::new(
595 self.location.clone(),
596 HydroNode::Filter {
597 f,
598 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
599 metadata: self
600 .location
601 .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
602 },
603 )
604 }
605
606 /// An operator that both filters and maps. It yields the value only if the supplied
607 /// closure `f` returns `Some(value)`.
608 ///
609 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
610 /// If the closure returns `None`, the output optional is empty.
611 ///
612 /// # Example
613 /// ```rust
614 /// # #[cfg(feature = "deploy")] {
615 /// # use hydro_lang::prelude::*;
616 /// # use futures::StreamExt;
617 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
618 /// let tick = process.tick();
619 /// let singleton = tick.singleton(q!("42"));
620 /// singleton
621 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
622 /// .all_ticks()
623 /// # }, |mut stream| async move {
624 /// // 42
625 /// # assert_eq!(stream.next().await.unwrap(), 42);
626 /// # }));
627 /// # }
628 /// ```
629 pub fn filter_map<U, F>(
630 self,
631 f: impl IntoQuotedMut<'a, F, L>,
632 ) -> Optional<U, L, B::UnderlyingBound>
633 where
634 F: Fn(T) -> Option<U> + 'a,
635 {
636 let f = f.splice_fn1_ctx(&self.location).into();
637 Optional::new(
638 self.location.clone(),
639 HydroNode::FilterMap {
640 f,
641 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
642 metadata: self
643 .location
644 .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
645 },
646 )
647 }
648
649 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
650 ///
651 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
652 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
653 /// non-null. This is useful for combining several pieces of state together.
654 ///
655 /// # Example
656 /// ```rust
657 /// # #[cfg(feature = "deploy")] {
658 /// # use hydro_lang::prelude::*;
659 /// # use futures::StreamExt;
660 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
661 /// let tick = process.tick();
662 /// let numbers = process
663 /// .source_iter(q!(vec![123, 456]))
664 /// .batch(&tick, nondet!(/** test */));
665 /// let count = numbers.clone().count(); // Singleton
666 /// let max = numbers.max(); // Optional
667 /// count.zip(max).all_ticks()
668 /// # }, |mut stream| async move {
669 /// // [(2, 456)]
670 /// # for w in vec![(2, 456)] {
671 /// # assert_eq!(stream.next().await.unwrap(), w);
672 /// # }
673 /// # }));
674 /// # }
675 /// ```
676 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
677 where
678 Self: ZipResult<'a, O, Location = L>,
679 B: IsBounded,
680 {
681 check_matching_location(&self.location, &Self::other_location(&other));
682
683 if L::is_top_level()
684 && let Some(tick) = self.location.try_tick()
685 {
686 let self_location = self.location().clone();
687 let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
688 let out = zip_inside_tick(
689 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
690 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
691 other_location.clone(),
692 HydroNode::Cast {
693 inner: Box::new(Self::other_ir_node(other)),
694 metadata: other_location.new_node_metadata(Optional::<
695 <Self as ZipResult<'a, O>>::OtherType,
696 Tick<L>,
697 Bounded,
698 >::collection_kind(
699 )),
700 },
701 )
702 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
703 )
704 .latest();
705
706 Self::make(self_location, out.ir_node.replace(HydroNode::Placeholder))
707 } else {
708 Self::make(
709 self.location.clone(),
710 HydroNode::CrossSingleton {
711 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
712 right: Box::new(Self::other_ir_node(other)),
713 metadata: self.location.new_node_metadata(CollectionKind::Optional {
714 bound: B::BOUND_KIND,
715 element_type: stageleft::quote_type::<
716 <Self as ZipResult<'a, O>>::ElementType,
717 >()
718 .into(),
719 }),
720 },
721 )
722 }
723 }
724
725 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
726 /// boolean signal is `true`, otherwise the output is null.
727 ///
728 /// # Example
729 /// ```rust
730 /// # #[cfg(feature = "deploy")] {
731 /// # use hydro_lang::prelude::*;
732 /// # use futures::StreamExt;
733 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
734 /// let tick = process.tick();
735 /// // ticks are lazy by default, forces the second tick to run
736 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
737 ///
738 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
739 /// let batch_first_tick = process
740 /// .source_iter(q!(vec![1]))
741 /// .batch(&tick, nondet!(/** test */));
742 /// let batch_second_tick = process
743 /// .source_iter(q!(vec![1, 2, 3]))
744 /// .batch(&tick, nondet!(/** test */))
745 /// .defer_tick();
746 /// batch_first_tick.chain(batch_second_tick).count()
747 /// .filter_if(signal)
748 /// .all_ticks()
749 /// # }, |mut stream| async move {
750 /// // [1]
751 /// # for w in vec![1] {
752 /// # assert_eq!(stream.next().await.unwrap(), w);
753 /// # }
754 /// # }));
755 /// # }
756 /// ```
757 pub fn filter_if(
758 self,
759 signal: Singleton<bool, L, B>,
760 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
761 where
762 B: IsBounded,
763 {
764 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
765 }
766
767 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
768 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
769 ///
770 /// Useful for conditionally processing, such as only emitting a singleton's value outside
771 /// a tick if some other condition is satisfied.
772 ///
773 /// # Example
774 /// ```rust
775 /// # #[cfg(feature = "deploy")] {
776 /// # use hydro_lang::prelude::*;
777 /// # use futures::StreamExt;
778 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
779 /// let tick = process.tick();
780 /// // ticks are lazy by default, forces the second tick to run
781 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
782 ///
783 /// let batch_first_tick = process
784 /// .source_iter(q!(vec![1]))
785 /// .batch(&tick, nondet!(/** test */));
786 /// let batch_second_tick = process
787 /// .source_iter(q!(vec![1, 2, 3]))
788 /// .batch(&tick, nondet!(/** test */))
789 /// .defer_tick(); // appears on the second tick
790 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
791 /// batch_first_tick.chain(batch_second_tick).count()
792 /// .filter_if_some(some_on_first_tick)
793 /// .all_ticks()
794 /// # }, |mut stream| async move {
795 /// // [1]
796 /// # for w in vec![1] {
797 /// # assert_eq!(stream.next().await.unwrap(), w);
798 /// # }
799 /// # }));
800 /// # }
801 /// ```
802 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
803 pub fn filter_if_some<U>(
804 self,
805 signal: Optional<U, L, B>,
806 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
807 where
808 B: IsBounded,
809 {
810 self.filter_if(signal.is_some())
811 }
812
813 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
814 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
815 ///
816 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
817 /// the condition.
818 ///
819 /// # Example
820 /// ```rust
821 /// # #[cfg(feature = "deploy")] {
822 /// # use hydro_lang::prelude::*;
823 /// # use futures::StreamExt;
824 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
825 /// let tick = process.tick();
826 /// // ticks are lazy by default, forces the second tick to run
827 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
828 ///
829 /// let batch_first_tick = process
830 /// .source_iter(q!(vec![1]))
831 /// .batch(&tick, nondet!(/** test */));
832 /// let batch_second_tick = process
833 /// .source_iter(q!(vec![1, 2, 3]))
834 /// .batch(&tick, nondet!(/** test */))
835 /// .defer_tick(); // appears on the second tick
836 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
837 /// batch_first_tick.chain(batch_second_tick).count()
838 /// .filter_if_none(some_on_first_tick)
839 /// .all_ticks()
840 /// # }, |mut stream| async move {
841 /// // [3]
842 /// # for w in vec![3] {
843 /// # assert_eq!(stream.next().await.unwrap(), w);
844 /// # }
845 /// # }));
846 /// # }
847 /// ```
848 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
849 pub fn filter_if_none<U>(
850 self,
851 other: Optional<U, L, B>,
852 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
853 where
854 B: IsBounded,
855 {
856 self.filter_if(other.is_none())
857 }
858
859 /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
860 ///
861 /// # Example
862 /// ```rust
863 /// # #[cfg(feature = "deploy")] {
864 /// # use hydro_lang::prelude::*;
865 /// # use futures::StreamExt;
866 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
867 /// let tick = process.tick();
868 /// let a = tick.singleton(q!(5));
869 /// let b = tick.singleton(q!(5));
870 /// a.equals(b).all_ticks()
871 /// # }, |mut stream| async move {
872 /// // [true]
873 /// # assert_eq!(stream.next().await.unwrap(), true);
874 /// # }));
875 /// # }
876 /// ```
877 pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
878 where
879 T: PartialEq,
880 B: IsBounded,
881 {
882 self.zip(other).map(q!(|(a, b)| a == b))
883 }
884
885 /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
886 /// greater than or equal to the provided threshold. The event will have the value of the
887 /// given threshold.
888 ///
889 /// This requires the incoming singleton to be monotonic, because otherwise the detection of
890 /// the threshold would be non-deterministic.
891 ///
892 /// # Example
893 /// ```rust
894 /// # #[cfg(feature = "deploy")] {
895 /// # use hydro_lang::prelude::*;
896 /// # use futures::StreamExt;
897 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
898 /// let a = // singleton 1 ~> 5 ~> 10
899 /// # process.singleton(q!(5));
900 /// let b = process.singleton(q!(4));
901 /// a.threshold_greater_or_equal(b)
902 /// # }, |mut stream| async move {
903 /// // [4]
904 /// # assert_eq!(stream.next().await.unwrap(), 4);
905 /// # }));
906 /// # }
907 /// ```
908 pub fn threshold_greater_or_equal<B2: IsBounded>(
909 self,
910 threshold: Singleton<T, L, B2>,
911 ) -> Stream<T, L, B::UnderlyingBound>
912 where
913 T: Clone + PartialOrd,
914 B: IsMonotonic,
915 {
916 let threshold = threshold.make_bounded();
917 let self_location = self.location().clone();
918 match self.try_make_bounded() {
919 Ok(bounded) => {
920 let uncasted = threshold
921 .zip(bounded)
922 .into_stream()
923 .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
924
925 Stream::new(
926 uncasted.location.clone(),
927 uncasted.ir_node.replace(HydroNode::Placeholder),
928 )
929 }
930 Err(me) => {
931 let uncasted = sliced! {
932 let me = use(me, nondet!(/** thresholds are deterministic */));
933 let mut remaining_threshold = use::state(|l| {
934 let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
935 as_option
936 });
937
938 let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
939 remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
940 passed.map(q!(|(t, _)| t))
941 };
942
943 Stream::new(
944 self_location,
945 uncasted.ir_node.replace(HydroNode::Placeholder),
946 )
947 }
948 }
949 }
950
951 /// An operator which allows you to "name" a `HydroNode`.
952 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
953 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
954 {
955 let mut node = self.ir_node.borrow_mut();
956 let metadata = node.metadata_mut();
957 metadata.tag = Some(name.to_owned());
958 }
959 self
960 }
961}
962
963impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
964 type Output = Singleton<bool, L, B::UnderlyingBound>;
965
966 fn not(self) -> Self::Output {
967 self.map(q!(|b| !b))
968 }
969}
970
971impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
972where
973 L: Location<'a>,
974{
975 /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
976 /// the inner `Option`.
977 ///
978 /// This is implemented as an identity [`Singleton::filter_map`], passing through the
979 /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
980 /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
981 ///
982 /// # Example
983 /// ```rust
984 /// # #[cfg(feature = "deploy")] {
985 /// # use hydro_lang::prelude::*;
986 /// # use futures::StreamExt;
987 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
988 /// let tick = process.tick();
989 /// let singleton = tick.singleton(q!(Some(42)));
990 /// singleton.into_optional().all_ticks()
991 /// # }, |mut stream| async move {
992 /// // 42
993 /// # assert_eq!(stream.next().await.unwrap(), 42);
994 /// # }));
995 /// # }
996 /// ```
997 pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
998 self.filter_map(q!(|v| v))
999 }
1000}
1001
1002impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1003where
1004 L: Location<'a>,
1005{
1006 /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1007 ///
1008 /// # Example
1009 /// ```rust
1010 /// # #[cfg(feature = "deploy")] {
1011 /// # use hydro_lang::prelude::*;
1012 /// # use futures::StreamExt;
1013 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1014 /// let tick = process.tick();
1015 /// // ticks are lazy by default, forces the second tick to run
1016 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1017 ///
1018 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1019 /// let b = tick.singleton(q!(true)); // true, true
1020 /// a.and(b).all_ticks()
1021 /// # }, |mut stream| async move {
1022 /// // [true, false]
1023 /// # for w in vec![true, false] {
1024 /// # assert_eq!(stream.next().await.unwrap(), w);
1025 /// # }
1026 /// # }));
1027 /// # }
1028 /// ```
1029 pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1030 where
1031 B: IsBounded,
1032 {
1033 self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1034 }
1035
1036 /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1037 ///
1038 /// # Example
1039 /// ```rust
1040 /// # #[cfg(feature = "deploy")] {
1041 /// # use hydro_lang::prelude::*;
1042 /// # use futures::StreamExt;
1043 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1044 /// let tick = process.tick();
1045 /// // ticks are lazy by default, forces the second tick to run
1046 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1047 ///
1048 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1049 /// let b = tick.singleton(q!(false)); // false, false
1050 /// a.or(b).all_ticks()
1051 /// # }, |mut stream| async move {
1052 /// // [true, false]
1053 /// # for w in vec![true, false] {
1054 /// # assert_eq!(stream.next().await.unwrap(), w);
1055 /// # }
1056 /// # }));
1057 /// # }
1058 /// ```
1059 pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1060 where
1061 B: IsBounded,
1062 {
1063 self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1064 }
1065}
1066
1067impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1068where
1069 L: Location<'a>,
1070{
1071 /// Returns a singleton value corresponding to the latest snapshot of the singleton
1072 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1073 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1074 /// all snapshots of this singleton into the atomic-associated tick will observe the
1075 /// same value each tick.
1076 ///
1077 /// # Non-Determinism
1078 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1079 /// the output singleton has a non-deterministic value since the snapshot can be at an
1080 /// arbitrary point in time.
1081 pub fn snapshot_atomic<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1082 self,
1083 tick: &Tick<L2>,
1084 _nondet: NonDet,
1085 ) -> Singleton<T, Tick<L::NoConsistency>, Bounded> {
1086 Singleton::new(
1087 tick.drop_consistency(),
1088 HydroNode::Batch {
1089 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1090 metadata: tick
1091 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1092 },
1093 )
1094 }
1095
1096 /// Returns this singleton back into a top-level, asynchronous execution context where updates
1097 /// to the value will be asynchronously propagated.
1098 pub fn end_atomic(self) -> Singleton<T, L, B> {
1099 Singleton::new(
1100 self.location.tick.l.clone(),
1101 HydroNode::EndAtomic {
1102 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1103 metadata: self
1104 .location
1105 .tick
1106 .l
1107 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1108 },
1109 )
1110 }
1111}
1112
1113impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1114where
1115 L: Location<'a>,
1116{
1117 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1118 /// will observe the same version of the value and will be executed synchronously before any
1119 /// outputs are yielded (in [`Optional::end_atomic`]).
1120 ///
1121 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1122 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1123 /// a different version).
1124 pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1125 let id = self.location.flow_state().borrow_mut().next_clock_id();
1126 let out_location = Atomic {
1127 tick: Tick {
1128 id,
1129 l: self.location.clone(),
1130 },
1131 };
1132 Singleton::new(
1133 out_location.clone(),
1134 HydroNode::BeginAtomic {
1135 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1136 metadata: out_location
1137 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1138 },
1139 )
1140 }
1141
1142 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1143 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1144 /// relevant data that contributed to the snapshot at tick `t`.
1145 ///
1146 /// # Non-Determinism
1147 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1148 /// the output singleton has a non-deterministic value since the snapshot can be at an
1149 /// arbitrary point in time.
1150 pub fn snapshot<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1151 self,
1152 tick: &Tick<L2>,
1153 _nondet: NonDet,
1154 ) -> Singleton<T, Tick<L::NoConsistency>, Bounded> {
1155 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1156 Singleton::new(
1157 tick.drop_consistency(),
1158 HydroNode::Batch {
1159 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1160 metadata: tick
1161 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1162 },
1163 )
1164 }
1165
1166 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1167 /// with order corresponding to increasing prefixes of data contributing to the singleton.
1168 ///
1169 /// # Non-Determinism
1170 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1171 /// to non-deterministic batching and arrival of inputs, the output stream is
1172 /// non-deterministic.
1173 pub fn sample_eager(
1174 self,
1175 nondet: NonDet,
1176 ) -> Stream<T, L::NoConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1177 sliced! {
1178 let snapshot = use(self, nondet);
1179 snapshot.into_stream()
1180 }
1181 .weaken_retries()
1182 }
1183
1184 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1185 /// value taken at various points in time. Because the input singleton may be
1186 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1187 /// represent the value of the singleton given some prefix of the streams leading up to
1188 /// it.
1189 ///
1190 /// # Non-Determinism
1191 /// The output stream is non-deterministic in which elements are sampled, since this
1192 /// is controlled by a clock.
1193 pub fn sample_every(
1194 self,
1195 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1196 nondet: NonDet,
1197 ) -> Stream<T, L::NoConsistency, Unbounded, TotalOrder, AtLeastOnce>
1198 where
1199 L: NoAtomic,
1200 {
1201 let samples = self.location.source_interval(interval, nondet);
1202 sliced! {
1203 let snapshot = use(self, nondet);
1204 let sample_batch = use(samples, nondet);
1205
1206 snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1207 }
1208 .weaken_retries()
1209 }
1210
1211 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1212 /// implies that `B == Bounded`.
1213 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1214 where
1215 B: IsBounded,
1216 {
1217 Singleton::new(
1218 self.location.clone(),
1219 self.ir_node.replace(HydroNode::Placeholder),
1220 )
1221 }
1222
1223 #[expect(clippy::result_large_err, reason = "internal use only")]
1224 fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1225 if B::UnderlyingBound::BOUNDED {
1226 Ok(Singleton::new(
1227 self.location.clone(),
1228 self.ir_node.replace(HydroNode::Placeholder),
1229 ))
1230 } else {
1231 Err(self)
1232 }
1233 }
1234
1235 /// Clones this bounded singleton into a tick, returning a singleton that has the
1236 /// same value as the outer singleton. Because the outer singleton is bounded, this
1237 /// is deterministic because there is only a single immutable version.
1238 pub fn clone_into_tick<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1239 self,
1240 tick: &Tick<L2>,
1241 ) -> Singleton<T, Tick<L2>, Bounded>
1242 where
1243 B: IsBounded,
1244 T: Clone,
1245 {
1246 // TODO(shadaj): avoid printing simulator logs for this snapshot
1247 let inner = self.snapshot(
1248 tick,
1249 nondet!(/** bounded top-level singleton so deterministic */),
1250 );
1251 Singleton::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1252 }
1253
1254 /// Converts this singleton into a [`Stream`] containing a single element, the value.
1255 ///
1256 /// # Example
1257 /// ```rust
1258 /// # #[cfg(feature = "deploy")] {
1259 /// # use hydro_lang::prelude::*;
1260 /// # use futures::StreamExt;
1261 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1262 /// let tick = process.tick();
1263 /// let batch_input = process
1264 /// .source_iter(q!(vec![123, 456]))
1265 /// .batch(&tick, nondet!(/** test */));
1266 /// batch_input.clone().chain(
1267 /// batch_input.count().into_stream()
1268 /// ).all_ticks()
1269 /// # }, |mut stream| async move {
1270 /// // [123, 456, 2]
1271 /// # for w in vec![123, 456, 2] {
1272 /// # assert_eq!(stream.next().await.unwrap(), w);
1273 /// # }
1274 /// # }));
1275 /// # }
1276 /// ```
1277 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1278 where
1279 B: IsBounded,
1280 {
1281 Stream::new(
1282 self.location.clone(),
1283 HydroNode::Cast {
1284 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1285 metadata: self.location.new_node_metadata(Stream::<
1286 T,
1287 Tick<L>,
1288 Bounded,
1289 TotalOrder,
1290 ExactlyOnce,
1291 >::collection_kind()),
1292 },
1293 )
1294 }
1295
1296 /// Resolves the singleton's [`Future`] value by blocking until it completes,
1297 /// producing a singleton of the resolved output.
1298 ///
1299 /// This is useful when the singleton contains an async computation that must
1300 /// be awaited before further processing. The future is polled to completion
1301 /// before the output value is emitted.
1302 ///
1303 /// # Example
1304 /// ```rust
1305 /// # #[cfg(feature = "deploy")] {
1306 /// # use hydro_lang::prelude::*;
1307 /// # use futures::StreamExt;
1308 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1309 /// let tick = process.tick();
1310 /// let singleton = tick.singleton(q!(5));
1311 /// singleton
1312 /// .map(q!(|v| async move { v * 2 }))
1313 /// .resolve_future_blocking()
1314 /// .all_ticks()
1315 /// # }, |mut stream| async move {
1316 /// // 10
1317 /// # assert_eq!(stream.next().await.unwrap(), 10);
1318 /// # }));
1319 /// # }
1320 /// ```
1321 pub fn resolve_future_blocking(
1322 self,
1323 ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1324 where
1325 T: Future,
1326 B: IsBounded,
1327 {
1328 Singleton::new(
1329 self.location.clone(),
1330 HydroNode::ResolveFuturesBlocking {
1331 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1332 metadata: self
1333 .location
1334 .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1335 },
1336 )
1337 }
1338}
1339
1340impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1341where
1342 L: Location<'a>,
1343{
1344 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1345 /// which will stream the value computed in _each_ tick as a separate stream element.
1346 ///
1347 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1348 /// producing one element in the output for each tick. This is useful for batched computations,
1349 /// where the results from each tick must be combined together.
1350 ///
1351 /// # Example
1352 /// ```rust
1353 /// # #[cfg(feature = "deploy")] {
1354 /// # use hydro_lang::prelude::*;
1355 /// # use futures::StreamExt;
1356 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1357 /// let tick = process.tick();
1358 /// # // ticks are lazy by default, forces the second tick to run
1359 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1360 /// # let batch_first_tick = process
1361 /// # .source_iter(q!(vec![1]))
1362 /// # .batch(&tick, nondet!(/** test */));
1363 /// # let batch_second_tick = process
1364 /// # .source_iter(q!(vec![1, 2, 3]))
1365 /// # .batch(&tick, nondet!(/** test */))
1366 /// # .defer_tick(); // appears on the second tick
1367 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1368 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1369 /// .count()
1370 /// .all_ticks()
1371 /// # }, |mut stream| async move {
1372 /// // [1, 3]
1373 /// # for w in vec![1, 3] {
1374 /// # assert_eq!(stream.next().await.unwrap(), w);
1375 /// # }
1376 /// # }));
1377 /// # }
1378 /// ```
1379 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1380 self.into_stream().all_ticks()
1381 }
1382
1383 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1384 /// which will stream the value computed in _each_ tick as a separate stream element.
1385 ///
1386 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1387 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1388 /// singleton's [`Tick`] context.
1389 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1390 self.into_stream().all_ticks_atomic()
1391 }
1392
1393 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1394 /// be asynchronously updated with the latest value of the singleton inside the tick.
1395 ///
1396 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1397 /// tick that tracks the inner value. This is useful for getting the value as of the
1398 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1399 ///
1400 /// # Example
1401 /// ```rust
1402 /// # #[cfg(feature = "deploy")] {
1403 /// # use hydro_lang::prelude::*;
1404 /// # use futures::StreamExt;
1405 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1406 /// let tick = process.tick();
1407 /// # // ticks are lazy by default, forces the second tick to run
1408 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1409 /// # let batch_first_tick = process
1410 /// # .source_iter(q!(vec![1]))
1411 /// # .batch(&tick, nondet!(/** test */));
1412 /// # let batch_second_tick = process
1413 /// # .source_iter(q!(vec![1, 2, 3]))
1414 /// # .batch(&tick, nondet!(/** test */))
1415 /// # .defer_tick(); // appears on the second tick
1416 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1417 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1418 /// .count()
1419 /// .latest()
1420 /// # .sample_eager(nondet!(/** test */))
1421 /// # }, |mut stream| async move {
1422 /// // asynchronously changes from 1 ~> 3
1423 /// # for w in vec![1, 3] {
1424 /// # assert_eq!(stream.next().await.unwrap(), w);
1425 /// # }
1426 /// # }));
1427 /// # }
1428 /// ```
1429 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1430 Singleton::new(
1431 self.location.outer().clone(),
1432 HydroNode::YieldConcat {
1433 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1434 metadata: self
1435 .location
1436 .outer()
1437 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1438 },
1439 )
1440 }
1441
1442 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1443 /// be updated with the latest value of the singleton inside the tick.
1444 ///
1445 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1446 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1447 /// singleton's [`Tick`] context.
1448 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1449 let out_location = Atomic {
1450 tick: self.location.clone(),
1451 };
1452 Singleton::new(
1453 out_location.clone(),
1454 HydroNode::YieldConcat {
1455 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1456 metadata: out_location
1457 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1458 },
1459 )
1460 }
1461}
1462
1463#[doc(hidden)]
1464/// Helper trait that determines the output collection type for [`Singleton::zip`].
1465///
1466/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1467/// [`Singleton`].
1468#[sealed::sealed]
1469pub trait ZipResult<'a, Other> {
1470 /// The output collection type.
1471 type Out;
1472 /// The type of the tupled output value.
1473 type ElementType;
1474 /// The type of the other collection's value.
1475 type OtherType;
1476 /// The location where the tupled result will be materialized.
1477 type Location: Location<'a>;
1478
1479 /// The location of the second input to the `zip`.
1480 fn other_location(other: &Other) -> Self::Location;
1481 /// The IR node of the second input to the `zip`.
1482 fn other_ir_node(other: Other) -> HydroNode;
1483
1484 /// Constructs the output live collection given an IR node containing the zip result.
1485 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1486}
1487
1488#[sealed::sealed]
1489impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1490where
1491 L: Location<'a>,
1492{
1493 type Out = Singleton<(T, U), L, B>;
1494 type ElementType = (T, U);
1495 type OtherType = U;
1496 type Location = L;
1497
1498 fn other_location(other: &Singleton<U, L, B>) -> L {
1499 other.location.clone()
1500 }
1501
1502 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1503 other.ir_node.replace(HydroNode::Placeholder)
1504 }
1505
1506 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1507 Singleton::new(
1508 location.clone(),
1509 HydroNode::Cast {
1510 inner: Box::new(ir_node),
1511 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1512 },
1513 )
1514 }
1515}
1516
1517#[sealed::sealed]
1518impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1519 for Singleton<T, L, B>
1520where
1521 L: Location<'a>,
1522{
1523 type Out = Optional<(T, U), L, B::UnderlyingBound>;
1524 type ElementType = (T, U);
1525 type OtherType = U;
1526 type Location = L;
1527
1528 fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1529 other.location.clone()
1530 }
1531
1532 fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1533 other.ir_node.replace(HydroNode::Placeholder)
1534 }
1535
1536 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1537 Optional::new(location, ir_node)
1538 }
1539}
1540
1541#[cfg(test)]
1542mod tests {
1543 #[cfg(feature = "deploy")]
1544 use futures::{SinkExt, StreamExt};
1545 #[cfg(feature = "deploy")]
1546 use hydro_deploy::Deployment;
1547 #[cfg(any(feature = "deploy", feature = "sim"))]
1548 use stageleft::q;
1549
1550 #[cfg(any(feature = "deploy", feature = "sim"))]
1551 use crate::compile::builder::FlowBuilder;
1552 #[cfg(feature = "deploy")]
1553 use crate::live_collections::stream::ExactlyOnce;
1554 #[cfg(any(feature = "deploy", feature = "sim"))]
1555 use crate::location::Location;
1556 #[cfg(any(feature = "deploy", feature = "sim"))]
1557 use crate::nondet::nondet;
1558
1559 #[cfg(feature = "deploy")]
1560 #[tokio::test]
1561 async fn tick_cycle_cardinality() {
1562 let mut deployment = Deployment::new();
1563
1564 let mut flow = FlowBuilder::new();
1565 let node = flow.process::<()>();
1566 let external = flow.external::<()>();
1567
1568 let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1569
1570 let node_tick = node.tick();
1571 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1572 let counts = singleton
1573 .clone()
1574 .into_stream()
1575 .count()
1576 .filter_if(
1577 input
1578 .batch(&node_tick, nondet!(/** testing */))
1579 .first()
1580 .is_some(),
1581 )
1582 .all_ticks()
1583 .send_bincode_external(&external);
1584 complete_cycle.complete_next_tick(singleton);
1585
1586 let nodes = flow
1587 .with_process(&node, deployment.Localhost())
1588 .with_external(&external, deployment.Localhost())
1589 .deploy(&mut deployment);
1590
1591 deployment.deploy().await.unwrap();
1592
1593 let mut tick_trigger = nodes.connect(input_send).await;
1594 let mut external_out = nodes.connect(counts).await;
1595
1596 deployment.start().await.unwrap();
1597
1598 tick_trigger.send(()).await.unwrap();
1599
1600 assert_eq!(external_out.next().await.unwrap(), 1);
1601
1602 tick_trigger.send(()).await.unwrap();
1603
1604 assert_eq!(external_out.next().await.unwrap(), 1);
1605 }
1606
1607 #[cfg(feature = "sim")]
1608 #[test]
1609 #[should_panic]
1610 fn sim_fold_intermediate_states() {
1611 let mut flow = FlowBuilder::new();
1612 let node = flow.process::<()>();
1613
1614 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1615 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1616
1617 let tick = node.tick();
1618 let batch = folded.snapshot(&tick, nondet!(/** test */));
1619 let out_recv = batch.all_ticks().sim_output();
1620
1621 flow.sim().exhaustive(async || {
1622 assert_eq!(out_recv.next().await.unwrap(), 10);
1623 });
1624 }
1625
1626 #[cfg(feature = "sim")]
1627 #[test]
1628 fn sim_fold_intermediate_state_count() {
1629 let mut flow = FlowBuilder::new();
1630 let node = flow.process::<()>();
1631
1632 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1633 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1634
1635 let tick = node.tick();
1636 let batch = folded.snapshot(&tick, nondet!(/** test */));
1637 let out_recv = batch.all_ticks().sim_output();
1638
1639 let instance_count = flow.sim().exhaustive(async || {
1640 let out = out_recv.collect::<Vec<_>>().await;
1641 assert_eq!(out.last(), Some(&10));
1642 });
1643
1644 assert_eq!(
1645 instance_count,
1646 16 // 2^4 possible subsets of intermediates (including initial state)
1647 )
1648 }
1649
1650 #[cfg(feature = "sim")]
1651 #[test]
1652 fn sim_fold_no_repeat_initial() {
1653 // check that we don't repeat the initial state of the fold in autonomous decisions
1654
1655 let mut flow = FlowBuilder::new();
1656 let node = flow.process::<()>();
1657
1658 let (in_port, input) = node.sim_input();
1659 let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1660
1661 let tick = node.tick();
1662 let batch = folded.snapshot(&tick, nondet!(/** test */));
1663 let out_recv = batch.all_ticks().sim_output();
1664
1665 flow.sim().exhaustive(async || {
1666 assert_eq!(out_recv.next().await.unwrap(), 0);
1667
1668 in_port.send(123);
1669
1670 assert_eq!(out_recv.next().await.unwrap(), 123);
1671 });
1672 }
1673
1674 #[cfg(feature = "sim")]
1675 #[test]
1676 #[should_panic]
1677 fn sim_fold_repeats_snapshots() {
1678 // when the tick is driven by a snapshot AND something else, the snapshot can
1679 // "stutter" and repeat the same state multiple times
1680
1681 let mut flow = FlowBuilder::new();
1682 let node = flow.process::<()>();
1683
1684 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1685 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1686
1687 let tick = node.tick();
1688 let batch = source
1689 .batch(&tick, nondet!(/** test */))
1690 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1691 let out_recv = batch.all_ticks().sim_output();
1692
1693 flow.sim().exhaustive(async || {
1694 if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1695 {
1696 panic!("repeated snapshot");
1697 }
1698 });
1699 }
1700
1701 #[cfg(feature = "sim")]
1702 #[test]
1703 fn sim_fold_repeats_snapshots_count() {
1704 // check the number of instances
1705 let mut flow = FlowBuilder::new();
1706 let node = flow.process::<()>();
1707
1708 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1709 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1710
1711 let tick = node.tick();
1712 let batch = source
1713 .batch(&tick, nondet!(/** test */))
1714 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1715 let out_recv = batch.all_ticks().sim_output();
1716
1717 let count = flow.sim().exhaustive(async || {
1718 let _ = out_recv.collect::<Vec<_>>().await;
1719 });
1720
1721 assert_eq!(count, 52);
1722 // don't have a combinatorial explanation for this number yet, but checked via logs
1723 }
1724
1725 #[cfg(feature = "sim")]
1726 #[test]
1727 fn sim_top_level_singleton_exhaustive() {
1728 // ensures that top-level singletons have only one snapshot
1729 let mut flow = FlowBuilder::new();
1730 let node = flow.process::<()>();
1731
1732 let singleton = node.singleton(q!(1));
1733 let tick = node.tick();
1734 let batch = singleton.snapshot(&tick, nondet!(/** test */));
1735 let out_recv = batch.all_ticks().sim_output();
1736
1737 let count = flow.sim().exhaustive(async || {
1738 let _ = out_recv.collect::<Vec<_>>().await;
1739 });
1740
1741 assert_eq!(count, 1);
1742 }
1743
1744 #[cfg(feature = "sim")]
1745 #[test]
1746 fn sim_top_level_singleton_join_count() {
1747 // if a tick consumes a static snapshot and a stream batch, only the batch require space
1748 // exploration
1749
1750 let mut flow = FlowBuilder::new();
1751 let node = flow.process::<()>();
1752
1753 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1754 let tick = node.tick();
1755 let batch = source_iter
1756 .batch(&tick, nondet!(/** test */))
1757 .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1758 let out_recv = batch.all_ticks().sim_output();
1759
1760 let instance_count = flow.sim().exhaustive(async || {
1761 let _ = out_recv.collect::<Vec<_>>().await;
1762 });
1763
1764 assert_eq!(
1765 instance_count,
1766 16 // 2^4 ways to split up (including a possibly empty first batch)
1767 )
1768 }
1769
1770 #[cfg(feature = "sim")]
1771 #[test]
1772 fn top_level_singleton_into_stream_no_replay() {
1773 let mut flow = FlowBuilder::new();
1774 let node = flow.process::<()>();
1775
1776 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1777 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1778
1779 let out_recv = folded.into_stream().sim_output();
1780
1781 flow.sim().exhaustive(async || {
1782 out_recv.assert_yields_only([10]).await;
1783 });
1784 }
1785
1786 #[cfg(feature = "sim")]
1787 #[test]
1788 fn inside_tick_singleton_zip() {
1789 use crate::live_collections::Stream;
1790 use crate::live_collections::sliced::sliced;
1791
1792 let mut flow = FlowBuilder::new();
1793 let node = flow.process::<()>();
1794
1795 let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1796 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1797
1798 let out_recv = sliced! {
1799 let v = use(folded, nondet!(/** test */));
1800 v.clone().zip(v).into_stream()
1801 }
1802 .sim_output();
1803
1804 let count = flow.sim().exhaustive(async || {
1805 let out = out_recv.collect::<Vec<_>>().await;
1806 assert_eq!(out.last(), Some(&(3, 3)));
1807 });
1808
1809 assert_eq!(count, 4);
1810 }
1811}