Skip to main content

hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25use crate::prelude::KeyedSingleton;
26
27/// A *nullable* Rust value that can asynchronously change over time.
28///
29/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
30/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
31/// asynchronously change over time, including becoming present of uninhabited.
32///
33/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
34/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
35///
36/// Type Parameters:
37/// - `Type`: the type of the value in this optional (when it is not null)
38/// - `Loc`: the [`Location`] where the optional is materialized
39/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
40pub struct Optional<Type, Loc, Bound: Boundedness> {
41    pub(crate) location: Loc,
42    pub(crate) ir_node: RefCell<HydroNode>,
43    pub(crate) flow_state: FlowState,
44
45    _phantom: PhantomData<(Type, Loc, Bound)>,
46}
47
48impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
49    fn drop(&mut self) {
50        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
51        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
52            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
53                input: Box::new(ir_node),
54                op_metadata: HydroIrOpMetadata::new(),
55            });
56        }
57    }
58}
59
60impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
61where
62    T: Clone,
63    L: Location<'a>,
64{
65    fn from(value: Optional<T, L, Bounded>) -> Self {
66        let tick = value.location().tick();
67        value.clone_into_tick(&tick).latest()
68    }
69}
70
71impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
72where
73    L: Location<'a>,
74{
75    fn defer_tick(self) -> Self {
76        Optional::defer_tick(self)
77    }
78}
79
80impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
81where
82    L: Location<'a>,
83{
84    type Location = Tick<L>;
85
86    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
87        Optional::new(
88            location.clone(),
89            HydroNode::CycleSource {
90                cycle_id,
91                metadata: location.new_node_metadata(Self::collection_kind()),
92            },
93        )
94    }
95}
96
97impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
98where
99    L: Location<'a>,
100{
101    type Location = Tick<L>;
102
103    fn location(&self) -> &Self::Location {
104        self.location()
105    }
106
107    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
108        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
109            location.clone(),
110            HydroNode::DeferTick {
111                input: Box::new(HydroNode::CycleSource {
112                    cycle_id,
113                    metadata: location.new_node_metadata(Self::collection_kind()),
114                }),
115                metadata: location
116                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
117            },
118        );
119
120        from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
121    }
122}
123
124impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
125where
126    L: Location<'a>,
127{
128    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
129        assert_eq!(
130            Location::id(&self.location),
131            expected_location,
132            "locations do not match"
133        );
134        self.location
135            .flow_state()
136            .borrow_mut()
137            .push_root(HydroRoot::CycleSink {
138                cycle_id,
139                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
140                op_metadata: HydroIrOpMetadata::new(),
141            });
142    }
143}
144
145impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
146where
147    L: Location<'a>,
148{
149    type Location = L;
150
151    fn create_source(cycle_id: CycleId, location: L) -> Self {
152        Optional::new(
153            location.clone(),
154            HydroNode::CycleSource {
155                cycle_id,
156                metadata: location.new_node_metadata(Self::collection_kind()),
157            },
158        )
159    }
160}
161
162impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
163where
164    L: Location<'a>,
165{
166    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
167        assert_eq!(
168            Location::id(&self.location),
169            expected_location,
170            "locations do not match"
171        );
172        self.location
173            .flow_state()
174            .borrow_mut()
175            .push_root(HydroRoot::CycleSink {
176                cycle_id,
177                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
178                op_metadata: HydroIrOpMetadata::new(),
179            });
180    }
181}
182
183impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
184where
185    L: Location<'a>,
186{
187    fn from(singleton: Singleton<T, L, B>) -> Self {
188        Optional::new(
189            singleton.location.clone(),
190            HydroNode::Cast {
191                inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
192                metadata: singleton
193                    .location
194                    .new_node_metadata(Self::collection_kind()),
195            },
196        )
197    }
198}
199
200#[cfg(stageleft_runtime)]
201pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
202    me: Optional<T, L, B>,
203    other: Optional<O, L, B>,
204) -> Optional<(T, O), L, B> {
205    check_matching_location(&me.location, &other.location);
206
207    Optional::new(
208        me.location.clone(),
209        HydroNode::CrossSingleton {
210            left: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
211            right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
212            metadata: me
213                .location
214                .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
215        },
216    )
217}
218
219#[cfg(stageleft_runtime)]
220fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
221    me: Optional<T, L, B>,
222    other: Optional<T, L, B>,
223) -> Optional<T, L, B> {
224    check_matching_location(&me.location, &other.location);
225
226    Optional::new(
227        me.location.clone(),
228        HydroNode::ChainFirst {
229            first: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
230            second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
231            metadata: me
232                .location
233                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
234        },
235    )
236}
237
238impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
239where
240    T: Clone,
241    L: Location<'a>,
242{
243    fn clone(&self) -> Self {
244        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
245            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
246            *self.ir_node.borrow_mut() = HydroNode::Tee {
247                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
248                metadata: self.location.new_node_metadata(Self::collection_kind()),
249            };
250        }
251
252        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
253            Optional {
254                location: self.location.clone(),
255                flow_state: self.flow_state.clone(),
256                ir_node: HydroNode::Tee {
257                    inner: SharedNode(inner.0.clone()),
258                    metadata: metadata.clone(),
259                }
260                .into(),
261                _phantom: PhantomData,
262            }
263        } else {
264            unreachable!()
265        }
266    }
267}
268
269impl<'a, T, L, B: Boundedness> Optional<T, L, B>
270where
271    L: Location<'a>,
272{
273    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276        let flow_state = location.flow_state().clone();
277        Optional {
278            location,
279            flow_state,
280            ir_node: RefCell::new(ir_node),
281            _phantom: PhantomData,
282        }
283    }
284
285    pub(crate) fn collection_kind() -> CollectionKind {
286        CollectionKind::Optional {
287            bound: B::BOUND_KIND,
288            element_type: stageleft::quote_type::<T>().into(),
289        }
290    }
291
292    /// Returns the [`Location`] where this optional is being materialized.
293    pub fn location(&self) -> &L {
294        &self.location
295    }
296
297    /// Weakens the consistency of this live collection to not guarantee any consistency across
298    /// cluster members (if this collection is on a cluster).
299    pub fn weaken_consistency(self) -> Optional<T, L::NoConsistency, B>
300    where
301        L: Location<'a>,
302    {
303        if L::consistency()
304            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
305        {
306            // already no consistency
307            Optional::new(
308                self.location.drop_consistency(),
309                self.ir_node.replace(HydroNode::Placeholder),
310            )
311        } else {
312            Optional::new(
313                self.location.drop_consistency(),
314                HydroNode::Cast {
315                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
316                    metadata: self
317                        .location
318                        .clone()
319                        .drop_consistency()
320                        .new_node_metadata(Optional::<T, L::NoConsistency, B>::collection_kind()),
321                },
322            )
323        }
324    }
325
326    /// Casts this live collection to have the consistency guarantees specified in the given
327    /// location type parameter. The developer must ensure that the strengthened consistency
328    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
329    pub fn assert_has_consistency_of<L2: Location<'a, NoConsistency = L::NoConsistency>>(
330        self,
331        _proof: impl crate::properties::ConsistencyProof,
332    ) -> Optional<T, L2, B>
333    where
334        L: Location<'a>,
335    {
336        if L::consistency() == L2::consistency() {
337            Optional::new(
338                self.location.with_consistency_of(),
339                self.ir_node.replace(HydroNode::Placeholder),
340            )
341        } else {
342            Optional::new(
343                self.location.with_consistency_of(),
344                HydroNode::AssertIsConsistent {
345                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
346                    metadata: self
347                        .location
348                        .clone()
349                        .with_consistency_of::<L2>()
350                        .new_node_metadata(Optional::<T, L2, B>::collection_kind()),
351                },
352            )
353        }
354    }
355
356    /// Transforms the optional value by applying a function `f` to it,
357    /// continuously as the input is updated.
358    ///
359    /// Whenever the optional is empty, the output optional is also empty.
360    ///
361    /// # Example
362    /// ```rust
363    /// # #[cfg(feature = "deploy")] {
364    /// # use hydro_lang::prelude::*;
365    /// # use futures::StreamExt;
366    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
367    /// let tick = process.tick();
368    /// let optional = tick.optional_first_tick(q!(1));
369    /// optional.map(q!(|v| v + 1)).all_ticks()
370    /// # }, |mut stream| async move {
371    /// // 2
372    /// # assert_eq!(stream.next().await.unwrap(), 2);
373    /// # }));
374    /// # }
375    /// ```
376    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
377    where
378        F: Fn(T) -> U + 'a,
379    {
380        let f = f.splice_fn1_ctx(&self.location).into();
381        Optional::new(
382            self.location.clone(),
383            HydroNode::Map {
384                f,
385                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
386                metadata: self
387                    .location
388                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
389            },
390        )
391    }
392
393    /// Transforms the optional value by applying a function `f` to it and then flattening
394    /// the result into a stream, preserving the order of elements.
395    ///
396    /// If the optional is empty, the output stream is also empty. If the optional contains
397    /// a value, `f` is applied to produce an iterator, and all items from that iterator
398    /// are emitted in the output stream in deterministic order.
399    ///
400    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
401    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
402    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
403    ///
404    /// # Example
405    /// ```rust
406    /// # #[cfg(feature = "deploy")] {
407    /// # use hydro_lang::prelude::*;
408    /// # use futures::StreamExt;
409    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
410    /// let tick = process.tick();
411    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
412    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
413    /// # }, |mut stream| async move {
414    /// // 1, 2, 3
415    /// # for w in vec![1, 2, 3] {
416    /// #     assert_eq!(stream.next().await.unwrap(), w);
417    /// # }
418    /// # }));
419    /// # }
420    /// ```
421    pub fn flat_map_ordered<U, I, F>(
422        self,
423        f: impl IntoQuotedMut<'a, F, L>,
424    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
425    where
426        B: IsBounded,
427        I: IntoIterator<Item = U>,
428        F: Fn(T) -> I + 'a,
429    {
430        self.into_stream().flat_map_ordered(f)
431    }
432
433    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
434    /// for the output type `I` to produce items in any order.
435    ///
436    /// If the optional is empty, the output stream is also empty. If the optional contains
437    /// a value, `f` is applied to produce an iterator, and all items from that iterator
438    /// are emitted in the output stream in non-deterministic order.
439    ///
440    /// # Example
441    /// ```rust
442    /// # #[cfg(feature = "deploy")] {
443    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
444    /// # use futures::StreamExt;
445    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
446    /// let tick = process.tick();
447    /// let optional = tick.optional_first_tick(q!(
448    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
449    /// ));
450    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
451    /// # }, |mut stream| async move {
452    /// // 1, 2, 3, but in no particular order
453    /// # let mut results = Vec::new();
454    /// # for _ in 0..3 {
455    /// #     results.push(stream.next().await.unwrap());
456    /// # }
457    /// # results.sort();
458    /// # assert_eq!(results, vec![1, 2, 3]);
459    /// # }));
460    /// # }
461    /// ```
462    pub fn flat_map_unordered<U, I, F>(
463        self,
464        f: impl IntoQuotedMut<'a, F, L>,
465    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
466    where
467        B: IsBounded,
468        I: IntoIterator<Item = U>,
469        F: Fn(T) -> I + 'a,
470    {
471        self.into_stream().flat_map_unordered(f)
472    }
473
474    /// Flattens the optional value into a stream, preserving the order of elements.
475    ///
476    /// If the optional is empty, the output stream is also empty. If the optional contains
477    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
478    /// in the output stream in deterministic order.
479    ///
480    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
481    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
482    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
483    ///
484    /// # Example
485    /// ```rust
486    /// # #[cfg(feature = "deploy")] {
487    /// # use hydro_lang::prelude::*;
488    /// # use futures::StreamExt;
489    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
490    /// let tick = process.tick();
491    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
492    /// optional.flatten_ordered().all_ticks()
493    /// # }, |mut stream| async move {
494    /// // 1, 2, 3
495    /// # for w in vec![1, 2, 3] {
496    /// #     assert_eq!(stream.next().await.unwrap(), w);
497    /// # }
498    /// # }));
499    /// # }
500    /// ```
501    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
502    where
503        B: IsBounded,
504        T: IntoIterator<Item = U>,
505    {
506        self.flat_map_ordered(q!(|v| v))
507    }
508
509    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
510    /// for the element type `T` to produce items in any order.
511    ///
512    /// If the optional is empty, the output stream is also empty. If the optional contains
513    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
514    /// in the output stream in non-deterministic order.
515    ///
516    /// # Example
517    /// ```rust
518    /// # #[cfg(feature = "deploy")] {
519    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
520    /// # use futures::StreamExt;
521    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
522    /// let tick = process.tick();
523    /// let optional = tick.optional_first_tick(q!(
524    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
525    /// ));
526    /// optional.flatten_unordered().all_ticks()
527    /// # }, |mut stream| async move {
528    /// // 1, 2, 3, but in no particular order
529    /// # let mut results = Vec::new();
530    /// # for _ in 0..3 {
531    /// #     results.push(stream.next().await.unwrap());
532    /// # }
533    /// # results.sort();
534    /// # assert_eq!(results, vec![1, 2, 3]);
535    /// # }));
536    /// # }
537    /// ```
538    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
539    where
540        B: IsBounded,
541        T: IntoIterator<Item = U>,
542    {
543        self.flat_map_unordered(q!(|v| v))
544    }
545
546    /// Creates an optional containing only the value if it satisfies a predicate `f`.
547    ///
548    /// If the optional is empty, the output optional is also empty. If the optional contains
549    /// a value and the predicate returns `true`, the output optional contains the same value.
550    /// If the predicate returns `false`, the output optional is empty.
551    ///
552    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
553    /// not modify or take ownership of the value. If you need to modify the value while filtering
554    /// use [`Optional::filter_map`] instead.
555    ///
556    /// # Example
557    /// ```rust
558    /// # #[cfg(feature = "deploy")] {
559    /// # use hydro_lang::prelude::*;
560    /// # use futures::StreamExt;
561    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
562    /// let tick = process.tick();
563    /// let optional = tick.optional_first_tick(q!(5));
564    /// optional.filter(q!(|&x| x > 3)).all_ticks()
565    /// # }, |mut stream| async move {
566    /// // 5
567    /// # assert_eq!(stream.next().await.unwrap(), 5);
568    /// # }));
569    /// # }
570    /// ```
571    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
572    where
573        F: Fn(&T) -> bool + 'a,
574    {
575        let f = f.splice_fn1_borrow_ctx(&self.location).into();
576        Optional::new(
577            self.location.clone(),
578            HydroNode::Filter {
579                f,
580                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
581                metadata: self.location.new_node_metadata(Self::collection_kind()),
582            },
583        )
584    }
585
586    /// An operator that both filters and maps. It yields only the value if the supplied
587    /// closure `f` returns `Some(value)`.
588    ///
589    /// If the optional is empty, the output optional is also empty. If the optional contains
590    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
591    /// If the closure returns `None`, the output optional is empty.
592    ///
593    /// # Example
594    /// ```rust
595    /// # #[cfg(feature = "deploy")] {
596    /// # use hydro_lang::prelude::*;
597    /// # use futures::StreamExt;
598    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
599    /// let tick = process.tick();
600    /// let optional = tick.optional_first_tick(q!("42"));
601    /// optional
602    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
603    ///     .all_ticks()
604    /// # }, |mut stream| async move {
605    /// // 42
606    /// # assert_eq!(stream.next().await.unwrap(), 42);
607    /// # }));
608    /// # }
609    /// ```
610    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
611    where
612        F: Fn(T) -> Option<U> + 'a,
613    {
614        let f = f.splice_fn1_ctx(&self.location).into();
615        Optional::new(
616            self.location.clone(),
617            HydroNode::FilterMap {
618                f,
619                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
620                metadata: self
621                    .location
622                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
623            },
624        )
625    }
626
627    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
628    ///
629    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
630    /// non-null. This is useful for combining several pieces of state together.
631    ///
632    /// # Example
633    /// ```rust
634    /// # #[cfg(feature = "deploy")] {
635    /// # use hydro_lang::prelude::*;
636    /// # use futures::StreamExt;
637    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
638    /// let tick = process.tick();
639    /// let numbers = process
640    ///   .source_iter(q!(vec![123, 456, 789]))
641    ///   .batch(&tick, nondet!(/** test */));
642    /// let min = numbers.clone().min(); // Optional
643    /// let max = numbers.max(); // Optional
644    /// min.zip(max).all_ticks()
645    /// # }, |mut stream| async move {
646    /// // [(123, 789)]
647    /// # for w in vec![(123, 789)] {
648    /// #     assert_eq!(stream.next().await.unwrap(), w);
649    /// # }
650    /// # }));
651    /// # }
652    /// ```
653    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
654    where
655        B: IsBounded,
656    {
657        let other: Optional<O, L, B> = other.into();
658        check_matching_location(&self.location, &other.location);
659
660        if L::is_top_level()
661            && let Some(tick) = self.location.try_tick()
662        {
663            let self_location = self.location().clone();
664            let out = zip_inside_tick(
665                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
666                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
667            )
668            .latest();
669
670            Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
671        } else {
672            zip_inside_tick(self, other)
673        }
674    }
675
676    /// Passes through `self` when it has a value, otherwise passes through `other`.
677    ///
678    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
679    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
680    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
681    ///
682    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
683    /// of the inputs change (including to/from null states).
684    ///
685    /// # Example
686    /// ```rust
687    /// # #[cfg(feature = "deploy")] {
688    /// # use hydro_lang::prelude::*;
689    /// # use futures::StreamExt;
690    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
691    /// let tick = process.tick();
692    /// // ticks are lazy by default, forces the second tick to run
693    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
694    ///
695    /// let some_first_tick = tick.optional_first_tick(q!(123));
696    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
697    /// some_first_tick.or(some_second_tick).all_ticks()
698    /// # }, |mut stream| async move {
699    /// // [123 /* first tick */, 456 /* second tick */]
700    /// # for w in vec![123, 456] {
701    /// #     assert_eq!(stream.next().await.unwrap(), w);
702    /// # }
703    /// # }));
704    /// # }
705    /// ```
706    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
707        check_matching_location(&self.location, &other.location);
708
709        if L::is_top_level()
710            && !B::BOUNDED // only if unbounded we need to use a tick
711            && let Some(tick) = self.location.try_tick()
712        {
713            let self_location = self.location().clone();
714            let out = or_inside_tick(
715                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
716                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
717            )
718            .latest();
719
720            Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
721        } else {
722            Optional::new(
723                self.location.clone(),
724                HydroNode::ChainFirst {
725                    first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
726                    second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
727                    metadata: self.location.new_node_metadata(Self::collection_kind()),
728                },
729            )
730        }
731    }
732
733    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
734    ///
735    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
736    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
737    ///
738    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
739    /// of the inputs change (including to/from null states).
740    ///
741    /// # Example
742    /// ```rust
743    /// # #[cfg(feature = "deploy")] {
744    /// # use hydro_lang::prelude::*;
745    /// # use futures::StreamExt;
746    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
747    /// let tick = process.tick();
748    /// // ticks are lazy by default, forces the later ticks to run
749    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
750    ///
751    /// let some_first_tick = tick.optional_first_tick(q!(123));
752    /// some_first_tick
753    ///     .unwrap_or(tick.singleton(q!(456)))
754    ///     .all_ticks()
755    /// # }, |mut stream| async move {
756    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
757    /// # for w in vec![123, 456, 456, 456] {
758    /// #     assert_eq!(stream.next().await.unwrap(), w);
759    /// # }
760    /// # }));
761    /// # }
762    /// ```
763    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
764        let res_option = self.or(other.into());
765        Singleton::new(
766            res_option.location.clone(),
767            HydroNode::Cast {
768                inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
769                metadata: res_option
770                    .location
771                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
772            },
773        )
774    }
775
776    /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
777    ///
778    /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
779    /// [`Optional`] when the default value of the type is a suitable fallback.
780    ///
781    /// # Example
782    /// ```rust
783    /// # #[cfg(feature = "deploy")] {
784    /// # use hydro_lang::prelude::*;
785    /// # use futures::StreamExt;
786    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
787    /// let tick = process.tick();
788    /// // ticks are lazy by default, forces the later ticks to run
789    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
790    ///
791    /// let some_first_tick = tick.optional_first_tick(q!(123i32));
792    /// some_first_tick.unwrap_or_default().all_ticks()
793    /// # }, |mut stream| async move {
794    /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
795    /// # for w in vec![123, 0, 0, 0] {
796    /// #     assert_eq!(stream.next().await.unwrap(), w);
797    /// # }
798    /// # }));
799    /// # }
800    /// ```
801    pub fn unwrap_or_default(self) -> Singleton<T, L, B>
802    where
803        T: Default + Clone,
804    {
805        self.into_singleton().map(q!(|v| v.unwrap_or_default()))
806    }
807
808    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
809    ///
810    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
811    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
812    /// so that Hydro can skip any computation on null values.
813    ///
814    /// # Example
815    /// ```rust
816    /// # #[cfg(feature = "deploy")] {
817    /// # use hydro_lang::prelude::*;
818    /// # use futures::StreamExt;
819    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
820    /// let tick = process.tick();
821    /// // ticks are lazy by default, forces the later ticks to run
822    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
823    ///
824    /// let some_first_tick = tick.optional_first_tick(q!(123));
825    /// some_first_tick.into_singleton().all_ticks()
826    /// # }, |mut stream| async move {
827    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
828    /// # for w in vec![Some(123), None, None, None] {
829    /// #     assert_eq!(stream.next().await.unwrap(), w);
830    /// # }
831    /// # }));
832    /// # }
833    /// ```
834    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
835    where
836        T: Clone,
837    {
838        let none: syn::Expr = parse_quote!(::std::option::Option::None);
839
840        let none_singleton = Singleton::new(
841            self.location.clone(),
842            HydroNode::SingletonSource {
843                value: none.into(),
844                first_tick_only: false,
845                metadata: self
846                    .location
847                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
848            },
849        );
850
851        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
852    }
853
854    /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
855    ///
856    /// # Example
857    /// ```rust
858    /// # #[cfg(feature = "deploy")] {
859    /// # use hydro_lang::prelude::*;
860    /// # use futures::StreamExt;
861    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
862    /// let tick = process.tick();
863    /// // ticks are lazy by default, forces the second tick to run
864    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
865    ///
866    /// let some_first_tick = tick.optional_first_tick(q!(42));
867    /// some_first_tick.is_some().all_ticks()
868    /// # }, |mut stream| async move {
869    /// // [true /* first tick */, false /* second tick */, ...]
870    /// # for w in vec![true, false] {
871    /// #     assert_eq!(stream.next().await.unwrap(), w);
872    /// # }
873    /// # }));
874    /// # }
875    /// ```
876    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
877    pub fn is_some(self) -> Singleton<bool, L, B> {
878        self.map(q!(|_| ()))
879            .into_singleton()
880            .map(q!(|o| o.is_some()))
881    }
882
883    /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
884    ///
885    /// # Example
886    /// ```rust
887    /// # #[cfg(feature = "deploy")] {
888    /// # use hydro_lang::prelude::*;
889    /// # use futures::StreamExt;
890    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
891    /// let tick = process.tick();
892    /// // ticks are lazy by default, forces the second tick to run
893    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
894    ///
895    /// let some_first_tick = tick.optional_first_tick(q!(42));
896    /// some_first_tick.is_none().all_ticks()
897    /// # }, |mut stream| async move {
898    /// // [false /* first tick */, true /* second tick */, ...]
899    /// # for w in vec![false, true] {
900    /// #     assert_eq!(stream.next().await.unwrap(), w);
901    /// # }
902    /// # }));
903    /// # }
904    /// ```
905    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
906    pub fn is_none(self) -> Singleton<bool, L, B> {
907        self.map(q!(|_| ()))
908            .into_singleton()
909            .map(q!(|o| o.is_none()))
910    }
911
912    /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
913    /// values are equal, `false` otherwise (including when either is null).
914    ///
915    /// # Example
916    /// ```rust
917    /// # #[cfg(feature = "deploy")] {
918    /// # use hydro_lang::prelude::*;
919    /// # use futures::StreamExt;
920    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
921    /// let tick = process.tick();
922    /// // ticks are lazy by default, forces the second tick to run
923    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
924    ///
925    /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
926    /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
927    /// a.is_some_and_equals(b).all_ticks()
928    /// # }, |mut stream| async move {
929    /// // [true, false]
930    /// # for w in vec![true, false] {
931    /// #     assert_eq!(stream.next().await.unwrap(), w);
932    /// # }
933    /// # }));
934    /// # }
935    /// ```
936    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
937    pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
938    where
939        T: PartialEq + Clone,
940        B: IsBounded,
941    {
942        self.into_singleton()
943            .zip(other.into_singleton())
944            .map(q!(|(a, b)| a.is_some() && a == b))
945    }
946
947    /// An operator which allows you to "name" a `HydroNode`.
948    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
949    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
950        {
951            let mut node = self.ir_node.borrow_mut();
952            let metadata = node.metadata_mut();
953            metadata.tag = Some(name.to_owned());
954        }
955        self
956    }
957
958    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
959    /// implies that `B == Bounded`.
960    pub fn make_bounded(self) -> Optional<T, L, Bounded>
961    where
962        B: IsBounded,
963    {
964        Optional::new(
965            self.location.clone(),
966            self.ir_node.replace(HydroNode::Placeholder),
967        )
968    }
969
970    /// Clones this bounded optional into a tick, returning a optional that has the
971    /// same value as the outer optional. Because the outer optional is bounded, this
972    /// is deterministic because there is only a single immutable version.
973    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
974    where
975        B: IsBounded,
976        T: Clone,
977    {
978        // TODO(shadaj): avoid printing simulator logs for this snapshot
979        let inner = self.snapshot(
980            tick,
981            nondet!(/** bounded top-level optional so deterministic */),
982        );
983        Optional::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
984    }
985
986    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
987    /// non-null. Otherwise, the stream is empty.
988    ///
989    /// # Example
990    /// ```rust
991    /// # #[cfg(feature = "deploy")] {
992    /// # use hydro_lang::prelude::*;
993    /// # use futures::StreamExt;
994    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
995    /// # let tick = process.tick();
996    /// # // ticks are lazy by default, forces the second tick to run
997    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
998    /// # let batch_first_tick = process
999    /// #   .source_iter(q!(vec![]))
1000    /// #   .batch(&tick, nondet!(/** test */));
1001    /// # let batch_second_tick = process
1002    /// #   .source_iter(q!(vec![123, 456]))
1003    /// #   .batch(&tick, nondet!(/** test */))
1004    /// #   .defer_tick(); // appears on the second tick
1005    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1006    /// input_batch // first tick: [], second tick: [123, 456]
1007    ///     .clone()
1008    ///     .max()
1009    ///     .into_stream()
1010    ///     .chain(input_batch)
1011    ///     .all_ticks()
1012    /// # }, |mut stream| async move {
1013    /// // [456, 123, 456]
1014    /// # for w in vec![456, 123, 456] {
1015    /// #     assert_eq!(stream.next().await.unwrap(), w);
1016    /// # }
1017    /// # }));
1018    /// # }
1019    /// ```
1020    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1021    where
1022        B: IsBounded,
1023    {
1024        Stream::new(
1025            self.location.clone(),
1026            HydroNode::Cast {
1027                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1028                metadata: self.location.new_node_metadata(Stream::<
1029                    T,
1030                    Tick<L>,
1031                    Bounded,
1032                    TotalOrder,
1033                    ExactlyOnce,
1034                >::collection_kind()),
1035            },
1036        )
1037    }
1038
1039    /// Filters this optional, passing through the value if the boolean signal is `true`,
1040    /// otherwise the output is null.
1041    ///
1042    /// # Example
1043    /// ```rust
1044    /// # #[cfg(feature = "deploy")] {
1045    /// # use hydro_lang::prelude::*;
1046    /// # use futures::StreamExt;
1047    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1048    /// let tick = process.tick();
1049    /// // ticks are lazy by default, forces the second tick to run
1050    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1051    ///
1052    /// let some_first_tick = tick.optional_first_tick(q!(()));
1053    /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1054    /// let batch_first_tick = process
1055    ///   .source_iter(q!(vec![456]))
1056    ///   .batch(&tick, nondet!(/** test */));
1057    /// let batch_second_tick = process
1058    ///   .source_iter(q!(vec![789]))
1059    ///   .batch(&tick, nondet!(/** test */))
1060    ///   .defer_tick();
1061    /// batch_first_tick.chain(batch_second_tick).first()
1062    ///   .filter_if(signal)
1063    ///   .unwrap_or(tick.singleton(q!(0)))
1064    ///   .all_ticks()
1065    /// # }, |mut stream| async move {
1066    /// // [456, 0]
1067    /// # for w in vec![456, 0] {
1068    /// #     assert_eq!(stream.next().await.unwrap(), w);
1069    /// # }
1070    /// # }));
1071    /// # }
1072    /// ```
1073    pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1074    where
1075        B: IsBounded,
1076    {
1077        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1078    }
1079
1080    /// Filters this optional, passing through the optional value if it is non-null **and** the
1081    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1082    ///
1083    /// Useful for conditionally processing, such as only emitting an optional's value outside
1084    /// a tick if some other condition is satisfied.
1085    ///
1086    /// # Example
1087    /// ```rust
1088    /// # #[cfg(feature = "deploy")] {
1089    /// # use hydro_lang::prelude::*;
1090    /// # use futures::StreamExt;
1091    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1092    /// let tick = process.tick();
1093    /// // ticks are lazy by default, forces the second tick to run
1094    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1095    ///
1096    /// let batch_first_tick = process
1097    ///   .source_iter(q!(vec![]))
1098    ///   .batch(&tick, nondet!(/** test */));
1099    /// let batch_second_tick = process
1100    ///   .source_iter(q!(vec![456]))
1101    ///   .batch(&tick, nondet!(/** test */))
1102    ///   .defer_tick(); // appears on the second tick
1103    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1104    /// batch_first_tick.chain(batch_second_tick).first()
1105    ///   .filter_if_some(some_on_first_tick)
1106    ///   .unwrap_or(tick.singleton(q!(789)))
1107    ///   .all_ticks()
1108    /// # }, |mut stream| async move {
1109    /// // [789, 789]
1110    /// # for w in vec![789, 789] {
1111    /// #     assert_eq!(stream.next().await.unwrap(), w);
1112    /// # }
1113    /// # }));
1114    /// # }
1115    /// ```
1116    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1117    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1118    where
1119        B: IsBounded,
1120    {
1121        self.filter_if(signal.is_some())
1122    }
1123
1124    /// Filters this optional, passing through the optional value if it is non-null **and** the
1125    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1126    ///
1127    /// Useful for conditionally processing, such as only emitting an optional's value outside
1128    /// a tick if some other condition is satisfied.
1129    ///
1130    /// # Example
1131    /// ```rust
1132    /// # #[cfg(feature = "deploy")] {
1133    /// # use hydro_lang::prelude::*;
1134    /// # use futures::StreamExt;
1135    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1136    /// let tick = process.tick();
1137    /// // ticks are lazy by default, forces the second tick to run
1138    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1139    ///
1140    /// let batch_first_tick = process
1141    ///   .source_iter(q!(vec![]))
1142    ///   .batch(&tick, nondet!(/** test */));
1143    /// let batch_second_tick = process
1144    ///   .source_iter(q!(vec![456]))
1145    ///   .batch(&tick, nondet!(/** test */))
1146    ///   .defer_tick(); // appears on the second tick
1147    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1148    /// batch_first_tick.chain(batch_second_tick).first()
1149    ///   .filter_if_none(some_on_first_tick)
1150    ///   .unwrap_or(tick.singleton(q!(789)))
1151    ///   .all_ticks()
1152    /// # }, |mut stream| async move {
1153    /// // [789, 789]
1154    /// # for w in vec![789, 456] {
1155    /// #     assert_eq!(stream.next().await.unwrap(), w);
1156    /// # }
1157    /// # }));
1158    /// # }
1159    /// ```
1160    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1161    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1162    where
1163        B: IsBounded,
1164    {
1165        self.filter_if(other.is_none())
1166    }
1167
1168    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1169    ///
1170    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1171    /// having a value, such as only releasing a piece of state if the node is the leader.
1172    ///
1173    /// # Example
1174    /// ```rust
1175    /// # #[cfg(feature = "deploy")] {
1176    /// # use hydro_lang::prelude::*;
1177    /// # use futures::StreamExt;
1178    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1179    /// let tick = process.tick();
1180    /// // ticks are lazy by default, forces the second tick to run
1181    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1182    ///
1183    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1184    /// some_on_first_tick
1185    ///     .if_some_then(tick.singleton(q!(456)))
1186    ///     .unwrap_or(tick.singleton(q!(123)))
1187    /// # .all_ticks()
1188    /// # }, |mut stream| async move {
1189    /// // 456 (first tick) ~> 123 (second tick onwards)
1190    /// # for w in vec![456, 123, 123] {
1191    /// #     assert_eq!(stream.next().await.unwrap(), w);
1192    /// # }
1193    /// # }));
1194    /// # }
1195    /// ```
1196    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1197    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1198    where
1199        B: IsBounded,
1200    {
1201        value.filter_if(self.is_some())
1202    }
1203}
1204
1205impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1206where
1207    L: Location<'a>,
1208{
1209    /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1210    /// key-value pair of this [`Optional`].
1211    ///
1212    /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1213    /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1214    /// the entry will be updated and appear / disappear according to the state of the
1215    /// [`Optional`].
1216    pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1217        KeyedSingleton::new(
1218            self.location.clone(),
1219            HydroNode::Cast {
1220                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1221                metadata: self
1222                    .location
1223                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1224            },
1225        )
1226    }
1227}
1228
1229impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1230where
1231    L: Location<'a>,
1232{
1233    /// Returns an optional value corresponding to the latest snapshot of the optional
1234    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1235    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1236    /// all snapshots of this optional into the atomic-associated tick will observe the
1237    /// same value each tick.
1238    ///
1239    /// # Non-Determinism
1240    /// Because this picks a snapshot of a optional whose value is continuously changing,
1241    /// the output optional has a non-deterministic value since the snapshot can be at an
1242    /// arbitrary point in time.
1243    pub fn snapshot_atomic<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1244        self,
1245        tick: &Tick<L2>,
1246        _nondet: NonDet,
1247    ) -> Optional<T, Tick<L::NoConsistency>, Bounded> {
1248        Optional::new(
1249            tick.drop_consistency(),
1250            HydroNode::Batch {
1251                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1252                metadata: tick
1253                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1254            },
1255        )
1256    }
1257
1258    /// Returns this optional back into a top-level, asynchronous execution context where updates
1259    /// to the value will be asynchronously propagated.
1260    pub fn end_atomic(self) -> Optional<T, L, B> {
1261        Optional::new(
1262            self.location.tick.l.clone(),
1263            HydroNode::EndAtomic {
1264                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1265                metadata: self
1266                    .location
1267                    .tick
1268                    .l
1269                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1270            },
1271        )
1272    }
1273}
1274
1275impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1276where
1277    L: Location<'a>,
1278{
1279    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1280    /// will observe the same version of the value and will be executed synchronously before any
1281    /// outputs are yielded (in [`Optional::end_atomic`]).
1282    ///
1283    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1284    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1285    /// a different version).
1286    pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1287        let id = self.location.flow_state().borrow_mut().next_clock_id();
1288        let out_location = Atomic {
1289            tick: Tick {
1290                id,
1291                l: self.location.clone(),
1292            },
1293        };
1294        Optional::new(
1295            out_location.clone(),
1296            HydroNode::BeginAtomic {
1297                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1298                metadata: out_location
1299                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1300            },
1301        )
1302    }
1303
1304    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1305    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1306    /// relevant data that contributed to the snapshot at tick `t`.
1307    ///
1308    /// # Non-Determinism
1309    /// Because this picks a snapshot of a optional whose value is continuously changing,
1310    /// the output optional has a non-deterministic value since the snapshot can be at an
1311    /// arbitrary point in time.
1312    pub fn snapshot<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1313        self,
1314        tick: &Tick<L2>,
1315        _nondet: NonDet,
1316    ) -> Optional<T, Tick<L::NoConsistency>, Bounded> {
1317        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1318        Optional::new(
1319            tick.drop_consistency(),
1320            HydroNode::Batch {
1321                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1322                metadata: tick
1323                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1324            },
1325        )
1326    }
1327
1328    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1329    /// with order corresponding to increasing prefixes of data contributing to the optional.
1330    ///
1331    /// # Non-Determinism
1332    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1333    /// to non-deterministic batching and arrival of inputs, the output stream is
1334    /// non-deterministic.
1335    pub fn sample_eager(
1336        self,
1337        nondet: NonDet,
1338    ) -> Stream<T, L::NoConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1339        let tick = self.location.tick();
1340        self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1341    }
1342
1343    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1344    /// value taken at various points in time. Because the input optional may be
1345    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1346    /// represent the value of the optional given some prefix of the streams leading up to
1347    /// it.
1348    ///
1349    /// # Non-Determinism
1350    /// The output stream is non-deterministic in which elements are sampled, since this
1351    /// is controlled by a clock.
1352    pub fn sample_every(
1353        self,
1354        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1355        nondet: NonDet,
1356    ) -> Stream<T, L::NoConsistency, Unbounded, TotalOrder, AtLeastOnce>
1357    where
1358        L: NoAtomic,
1359    {
1360        let samples = self.location.source_interval(interval, nondet);
1361        let tick = self.location.tick();
1362
1363        self.snapshot(&tick, nondet)
1364            .filter_if(samples.batch(&tick, nondet).first().is_some())
1365            .all_ticks()
1366            .weaken_retries()
1367    }
1368}
1369
1370impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1371where
1372    L: Location<'a>,
1373{
1374    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1375    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1376    /// null values).
1377    ///
1378    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1379    /// producing one element in the output for each (non-null) tick. This is useful for batched
1380    /// computations, where the results from each tick must be combined together.
1381    ///
1382    /// # Example
1383    /// ```rust
1384    /// # #[cfg(feature = "deploy")] {
1385    /// # use hydro_lang::prelude::*;
1386    /// # use futures::StreamExt;
1387    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1388    /// # let tick = process.tick();
1389    /// # // ticks are lazy by default, forces the second tick to run
1390    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1391    /// # let batch_first_tick = process
1392    /// #   .source_iter(q!(vec![]))
1393    /// #   .batch(&tick, nondet!(/** test */));
1394    /// # let batch_second_tick = process
1395    /// #   .source_iter(q!(vec![1, 2, 3]))
1396    /// #   .batch(&tick, nondet!(/** test */))
1397    /// #   .defer_tick(); // appears on the second tick
1398    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1399    /// input_batch // first tick: [], second tick: [1, 2, 3]
1400    ///     .max()
1401    ///     .all_ticks()
1402    /// # }, |mut stream| async move {
1403    /// // [3]
1404    /// # for w in vec![3] {
1405    /// #     assert_eq!(stream.next().await.unwrap(), w);
1406    /// # }
1407    /// # }));
1408    /// # }
1409    /// ```
1410    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1411        self.into_stream().all_ticks()
1412    }
1413
1414    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1415    /// which will stream the value computed in _each_ tick as a separate stream element.
1416    ///
1417    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1418    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1419    /// optional's [`Tick`] context.
1420    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1421        self.into_stream().all_ticks_atomic()
1422    }
1423
1424    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1425    /// be asynchronously updated with the latest value of the optional inside the tick, including
1426    /// whether the optional is null or not.
1427    ///
1428    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1429    /// tick that tracks the inner value. This is useful for getting the value as of the
1430    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1431    ///
1432    /// # Example
1433    /// ```rust
1434    /// # #[cfg(feature = "deploy")] {
1435    /// # use hydro_lang::prelude::*;
1436    /// # use futures::StreamExt;
1437    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1438    /// # let tick = process.tick();
1439    /// # // ticks are lazy by default, forces the second tick to run
1440    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1441    /// # let batch_first_tick = process
1442    /// #   .source_iter(q!(vec![]))
1443    /// #   .batch(&tick, nondet!(/** test */));
1444    /// # let batch_second_tick = process
1445    /// #   .source_iter(q!(vec![1, 2, 3]))
1446    /// #   .batch(&tick, nondet!(/** test */))
1447    /// #   .defer_tick(); // appears on the second tick
1448    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1449    /// input_batch // first tick: [], second tick: [1, 2, 3]
1450    ///     .max()
1451    ///     .latest()
1452    /// # .into_singleton()
1453    /// # .sample_eager(nondet!(/** test */))
1454    /// # }, |mut stream| async move {
1455    /// // asynchronously changes from None ~> 3
1456    /// # for w in vec![None, Some(3)] {
1457    /// #     assert_eq!(stream.next().await.unwrap(), w);
1458    /// # }
1459    /// # }));
1460    /// # }
1461    /// ```
1462    pub fn latest(self) -> Optional<T, L, Unbounded> {
1463        Optional::new(
1464            self.location.outer().clone(),
1465            HydroNode::YieldConcat {
1466                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1467                metadata: self
1468                    .location
1469                    .outer()
1470                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1471            },
1472        )
1473    }
1474
1475    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1476    /// be updated with the latest value of the optional inside the tick.
1477    ///
1478    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1479    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1480    /// optional's [`Tick`] context.
1481    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1482        let out_location = Atomic {
1483            tick: self.location.clone(),
1484        };
1485
1486        Optional::new(
1487            out_location.clone(),
1488            HydroNode::YieldConcat {
1489                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1490                metadata: out_location
1491                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1492            },
1493        )
1494    }
1495
1496    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1497    /// always has the state of `self` at tick `T - 1`.
1498    ///
1499    /// At tick `0`, the output optional is null, since there is no previous tick.
1500    ///
1501    /// This operator enables stateful iterative processing with ticks, by sending data from one
1502    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1503    ///
1504    /// # Example
1505    /// ```rust
1506    /// # #[cfg(feature = "deploy")] {
1507    /// # use hydro_lang::prelude::*;
1508    /// # use futures::StreamExt;
1509    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1510    /// let tick = process.tick();
1511    /// // ticks are lazy by default, forces the second tick to run
1512    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1513    ///
1514    /// let batch_first_tick = process
1515    ///   .source_iter(q!(vec![1, 2]))
1516    ///   .batch(&tick, nondet!(/** test */));
1517    /// let batch_second_tick = process
1518    ///   .source_iter(q!(vec![3, 4]))
1519    ///   .batch(&tick, nondet!(/** test */))
1520    ///   .defer_tick(); // appears on the second tick
1521    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1522    ///   .reduce(q!(|state, v| *state += v));
1523    ///
1524    /// current_tick_sum.clone().into_singleton().zip(
1525    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1526    /// ).all_ticks()
1527    /// # }, |mut stream| async move {
1528    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1529    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1530    /// #     assert_eq!(stream.next().await.unwrap(), w);
1531    /// # }
1532    /// # }));
1533    /// # }
1534    /// ```
1535    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1536        Optional::new(
1537            self.location.clone(),
1538            HydroNode::DeferTick {
1539                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1540                metadata: self.location.new_node_metadata(Self::collection_kind()),
1541            },
1542        )
1543    }
1544}
1545
1546#[cfg(test)]
1547mod tests {
1548    #[cfg(feature = "deploy")]
1549    use futures::StreamExt;
1550    #[cfg(feature = "deploy")]
1551    use hydro_deploy::Deployment;
1552    #[cfg(any(feature = "deploy", feature = "sim"))]
1553    use stageleft::q;
1554
1555    #[cfg(feature = "deploy")]
1556    use super::Optional;
1557    #[cfg(any(feature = "deploy", feature = "sim"))]
1558    use crate::compile::builder::FlowBuilder;
1559    #[cfg(any(feature = "deploy", feature = "sim"))]
1560    use crate::location::Location;
1561    #[cfg(feature = "deploy")]
1562    use crate::nondet::nondet;
1563
1564    #[cfg(feature = "deploy")]
1565    #[tokio::test]
1566    async fn optional_or_cardinality() {
1567        let mut deployment = Deployment::new();
1568
1569        let mut flow = FlowBuilder::new();
1570        let node = flow.process::<()>();
1571        let external = flow.external::<()>();
1572
1573        let node_tick = node.tick();
1574        let tick_singleton = node_tick.singleton(q!(123));
1575        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1576        let counts = tick_optional_inhabited
1577            .clone()
1578            .or(tick_optional_inhabited)
1579            .into_stream()
1580            .count()
1581            .all_ticks()
1582            .send_bincode_external(&external);
1583
1584        let nodes = flow
1585            .with_process(&node, deployment.Localhost())
1586            .with_external(&external, deployment.Localhost())
1587            .deploy(&mut deployment);
1588
1589        deployment.deploy().await.unwrap();
1590
1591        let mut external_out = nodes.connect(counts).await;
1592
1593        deployment.start().await.unwrap();
1594
1595        assert_eq!(external_out.next().await.unwrap(), 1);
1596    }
1597
1598    #[cfg(feature = "deploy")]
1599    #[tokio::test]
1600    async fn into_singleton_top_level_none_cardinality() {
1601        let mut deployment = Deployment::new();
1602
1603        let mut flow = FlowBuilder::new();
1604        let node = flow.process::<()>();
1605        let external = flow.external::<()>();
1606
1607        let node_tick = node.tick();
1608        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1609        let into_singleton = top_level_none.into_singleton();
1610
1611        let tick_driver = node.spin();
1612
1613        let counts = into_singleton
1614            .snapshot(&node_tick, nondet!(/** test */))
1615            .into_stream()
1616            .count()
1617            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1618            .map(q!(|(c, _)| c))
1619            .all_ticks()
1620            .send_bincode_external(&external);
1621
1622        let nodes = flow
1623            .with_process(&node, deployment.Localhost())
1624            .with_external(&external, deployment.Localhost())
1625            .deploy(&mut deployment);
1626
1627        deployment.deploy().await.unwrap();
1628
1629        let mut external_out = nodes.connect(counts).await;
1630
1631        deployment.start().await.unwrap();
1632
1633        assert_eq!(external_out.next().await.unwrap(), 1);
1634        assert_eq!(external_out.next().await.unwrap(), 1);
1635        assert_eq!(external_out.next().await.unwrap(), 1);
1636    }
1637
1638    #[cfg(feature = "deploy")]
1639    #[tokio::test]
1640    async fn into_singleton_unbounded_top_level_none_cardinality() {
1641        let mut deployment = Deployment::new();
1642
1643        let mut flow = FlowBuilder::new();
1644        let node = flow.process::<()>();
1645        let external = flow.external::<()>();
1646
1647        let node_tick = node.tick();
1648        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1649        let into_singleton = top_level_none.into_singleton();
1650
1651        let tick_driver = node.spin();
1652
1653        let counts = into_singleton
1654            .snapshot(&node_tick, nondet!(/** test */))
1655            .into_stream()
1656            .count()
1657            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1658            .map(q!(|(c, _)| c))
1659            .all_ticks()
1660            .send_bincode_external(&external);
1661
1662        let nodes = flow
1663            .with_process(&node, deployment.Localhost())
1664            .with_external(&external, deployment.Localhost())
1665            .deploy(&mut deployment);
1666
1667        deployment.deploy().await.unwrap();
1668
1669        let mut external_out = nodes.connect(counts).await;
1670
1671        deployment.start().await.unwrap();
1672
1673        assert_eq!(external_out.next().await.unwrap(), 1);
1674        assert_eq!(external_out.next().await.unwrap(), 1);
1675        assert_eq!(external_out.next().await.unwrap(), 1);
1676    }
1677
1678    #[cfg(feature = "sim")]
1679    #[test]
1680    fn top_level_optional_some_into_stream_no_replay() {
1681        let mut flow = FlowBuilder::new();
1682        let node = flow.process::<()>();
1683
1684        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1685        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1686        let filtered_some = folded.filter(q!(|_| true));
1687
1688        let out_recv = filtered_some.into_stream().sim_output();
1689
1690        flow.sim().exhaustive(async || {
1691            out_recv.assert_yields_only([10]).await;
1692        });
1693    }
1694
1695    #[cfg(feature = "sim")]
1696    #[test]
1697    fn top_level_optional_none_into_stream_no_replay() {
1698        let mut flow = FlowBuilder::new();
1699        let node = flow.process::<()>();
1700
1701        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1702        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1703        let filtered_none = folded.filter(q!(|_| false));
1704
1705        let out_recv = filtered_none.into_stream().sim_output();
1706
1707        flow.sim().exhaustive(async || {
1708            out_recv.assert_yields_only([] as [i32; 0]).await;
1709        });
1710    }
1711}