hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25use crate::prelude::KeyedSingleton;
26
27/// A *nullable* Rust value that can asynchronously change over time.
28///
29/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
30/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
31/// asynchronously change over time, including becoming present of uninhabited.
32///
33/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
34/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
35///
36/// Type Parameters:
37/// - `Type`: the type of the value in this optional (when it is not null)
38/// - `Loc`: the [`Location`] where the optional is materialized
39/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
40pub struct Optional<Type, Loc, Bound: Boundedness> {
41 pub(crate) location: Loc,
42 pub(crate) ir_node: RefCell<HydroNode>,
43 pub(crate) flow_state: FlowState,
44
45 _phantom: PhantomData<(Type, Loc, Bound)>,
46}
47
48impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
49 fn drop(&mut self) {
50 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
51 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
52 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
53 input: Box::new(ir_node),
54 op_metadata: HydroIrOpMetadata::new(),
55 });
56 }
57 }
58}
59
60impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
61where
62 T: Clone,
63 L: Location<'a>,
64{
65 fn from(value: Optional<T, L, Bounded>) -> Self {
66 let tick = value.location().tick();
67 value.clone_into_tick(&tick).latest()
68 }
69}
70
71impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
72where
73 L: Location<'a>,
74{
75 fn defer_tick(self) -> Self {
76 Optional::defer_tick(self)
77 }
78}
79
80impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
81where
82 L: Location<'a>,
83{
84 type Location = Tick<L>;
85
86 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
87 Optional::new(
88 location.clone(),
89 HydroNode::CycleSource {
90 cycle_id,
91 metadata: location.new_node_metadata(Self::collection_kind()),
92 },
93 )
94 }
95}
96
97impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
98where
99 L: Location<'a>,
100{
101 type Location = Tick<L>;
102
103 fn location(&self) -> &Self::Location {
104 self.location()
105 }
106
107 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
108 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
109 location.clone(),
110 HydroNode::DeferTick {
111 input: Box::new(HydroNode::CycleSource {
112 cycle_id,
113 metadata: location.new_node_metadata(Self::collection_kind()),
114 }),
115 metadata: location
116 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
117 },
118 );
119
120 from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
121 }
122}
123
124impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
125where
126 L: Location<'a>,
127{
128 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
129 assert_eq!(
130 Location::id(&self.location),
131 expected_location,
132 "locations do not match"
133 );
134 self.location
135 .flow_state()
136 .borrow_mut()
137 .push_root(HydroRoot::CycleSink {
138 cycle_id,
139 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
140 op_metadata: HydroIrOpMetadata::new(),
141 });
142 }
143}
144
145impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
146where
147 L: Location<'a>,
148{
149 type Location = L;
150
151 fn create_source(cycle_id: CycleId, location: L) -> Self {
152 Optional::new(
153 location.clone(),
154 HydroNode::CycleSource {
155 cycle_id,
156 metadata: location.new_node_metadata(Self::collection_kind()),
157 },
158 )
159 }
160}
161
162impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
163where
164 L: Location<'a>,
165{
166 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
167 assert_eq!(
168 Location::id(&self.location),
169 expected_location,
170 "locations do not match"
171 );
172 self.location
173 .flow_state()
174 .borrow_mut()
175 .push_root(HydroRoot::CycleSink {
176 cycle_id,
177 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
178 op_metadata: HydroIrOpMetadata::new(),
179 });
180 }
181}
182
183impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
184where
185 L: Location<'a>,
186{
187 fn from(singleton: Singleton<T, L, B>) -> Self {
188 Optional::new(
189 singleton.location.clone(),
190 HydroNode::Cast {
191 inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
192 metadata: singleton
193 .location
194 .new_node_metadata(Self::collection_kind()),
195 },
196 )
197 }
198}
199
200#[cfg(stageleft_runtime)]
201pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
202 me: Optional<T, L, B>,
203 other: Optional<O, L, B>,
204) -> Optional<(T, O), L, B> {
205 check_matching_location(&me.location, &other.location);
206
207 Optional::new(
208 me.location.clone(),
209 HydroNode::CrossSingleton {
210 left: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
211 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
212 metadata: me
213 .location
214 .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
215 },
216 )
217}
218
219#[cfg(stageleft_runtime)]
220fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
221 me: Optional<T, L, B>,
222 other: Optional<T, L, B>,
223) -> Optional<T, L, B> {
224 check_matching_location(&me.location, &other.location);
225
226 Optional::new(
227 me.location.clone(),
228 HydroNode::ChainFirst {
229 first: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
230 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
231 metadata: me
232 .location
233 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
234 },
235 )
236}
237
238impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
239where
240 T: Clone,
241 L: Location<'a>,
242{
243 fn clone(&self) -> Self {
244 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
245 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
246 *self.ir_node.borrow_mut() = HydroNode::Tee {
247 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
248 metadata: self.location.new_node_metadata(Self::collection_kind()),
249 };
250 }
251
252 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
253 Optional {
254 location: self.location.clone(),
255 flow_state: self.flow_state.clone(),
256 ir_node: HydroNode::Tee {
257 inner: SharedNode(inner.0.clone()),
258 metadata: metadata.clone(),
259 }
260 .into(),
261 _phantom: PhantomData,
262 }
263 } else {
264 unreachable!()
265 }
266 }
267}
268
269impl<'a, T, L, B: Boundedness> Optional<T, L, B>
270where
271 L: Location<'a>,
272{
273 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276 let flow_state = location.flow_state().clone();
277 Optional {
278 location,
279 flow_state,
280 ir_node: RefCell::new(ir_node),
281 _phantom: PhantomData,
282 }
283 }
284
285 pub(crate) fn collection_kind() -> CollectionKind {
286 CollectionKind::Optional {
287 bound: B::BOUND_KIND,
288 element_type: stageleft::quote_type::<T>().into(),
289 }
290 }
291
292 /// Returns the [`Location`] where this optional is being materialized.
293 pub fn location(&self) -> &L {
294 &self.location
295 }
296
297 /// Weakens the consistency of this live collection to not guarantee any consistency across
298 /// cluster members (if this collection is on a cluster).
299 pub fn weaken_consistency(self) -> Optional<T, L::NoConsistency, B>
300 where
301 L: Location<'a>,
302 {
303 if L::consistency()
304 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
305 {
306 // already no consistency
307 Optional::new(
308 self.location.drop_consistency(),
309 self.ir_node.replace(HydroNode::Placeholder),
310 )
311 } else {
312 Optional::new(
313 self.location.drop_consistency(),
314 HydroNode::Cast {
315 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
316 metadata: self
317 .location
318 .clone()
319 .drop_consistency()
320 .new_node_metadata(Optional::<T, L::NoConsistency, B>::collection_kind()),
321 },
322 )
323 }
324 }
325
326 /// Casts this live collection to have the consistency guarantees specified in the given
327 /// location type parameter. The developer must ensure that the strengthened consistency
328 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
329 pub fn assert_has_consistency_of<L2: Location<'a, NoConsistency = L::NoConsistency>>(
330 self,
331 _proof: impl crate::properties::ConsistencyProof,
332 ) -> Optional<T, L2, B>
333 where
334 L: Location<'a>,
335 {
336 if L::consistency() == L2::consistency() {
337 Optional::new(
338 self.location.with_consistency_of(),
339 self.ir_node.replace(HydroNode::Placeholder),
340 )
341 } else {
342 Optional::new(
343 self.location.with_consistency_of(),
344 HydroNode::AssertIsConsistent {
345 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
346 metadata: self
347 .location
348 .clone()
349 .with_consistency_of::<L2>()
350 .new_node_metadata(Optional::<T, L2, B>::collection_kind()),
351 },
352 )
353 }
354 }
355
356 /// Transforms the optional value by applying a function `f` to it,
357 /// continuously as the input is updated.
358 ///
359 /// Whenever the optional is empty, the output optional is also empty.
360 ///
361 /// # Example
362 /// ```rust
363 /// # #[cfg(feature = "deploy")] {
364 /// # use hydro_lang::prelude::*;
365 /// # use futures::StreamExt;
366 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
367 /// let tick = process.tick();
368 /// let optional = tick.optional_first_tick(q!(1));
369 /// optional.map(q!(|v| v + 1)).all_ticks()
370 /// # }, |mut stream| async move {
371 /// // 2
372 /// # assert_eq!(stream.next().await.unwrap(), 2);
373 /// # }));
374 /// # }
375 /// ```
376 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
377 where
378 F: Fn(T) -> U + 'a,
379 {
380 let f = f.splice_fn1_ctx(&self.location).into();
381 Optional::new(
382 self.location.clone(),
383 HydroNode::Map {
384 f,
385 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
386 metadata: self
387 .location
388 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
389 },
390 )
391 }
392
393 /// Transforms the optional value by applying a function `f` to it and then flattening
394 /// the result into a stream, preserving the order of elements.
395 ///
396 /// If the optional is empty, the output stream is also empty. If the optional contains
397 /// a value, `f` is applied to produce an iterator, and all items from that iterator
398 /// are emitted in the output stream in deterministic order.
399 ///
400 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
401 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
402 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
403 ///
404 /// # Example
405 /// ```rust
406 /// # #[cfg(feature = "deploy")] {
407 /// # use hydro_lang::prelude::*;
408 /// # use futures::StreamExt;
409 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
410 /// let tick = process.tick();
411 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
412 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
413 /// # }, |mut stream| async move {
414 /// // 1, 2, 3
415 /// # for w in vec![1, 2, 3] {
416 /// # assert_eq!(stream.next().await.unwrap(), w);
417 /// # }
418 /// # }));
419 /// # }
420 /// ```
421 pub fn flat_map_ordered<U, I, F>(
422 self,
423 f: impl IntoQuotedMut<'a, F, L>,
424 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
425 where
426 B: IsBounded,
427 I: IntoIterator<Item = U>,
428 F: Fn(T) -> I + 'a,
429 {
430 self.into_stream().flat_map_ordered(f)
431 }
432
433 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
434 /// for the output type `I` to produce items in any order.
435 ///
436 /// If the optional is empty, the output stream is also empty. If the optional contains
437 /// a value, `f` is applied to produce an iterator, and all items from that iterator
438 /// are emitted in the output stream in non-deterministic order.
439 ///
440 /// # Example
441 /// ```rust
442 /// # #[cfg(feature = "deploy")] {
443 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
444 /// # use futures::StreamExt;
445 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
446 /// let tick = process.tick();
447 /// let optional = tick.optional_first_tick(q!(
448 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
449 /// ));
450 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
451 /// # }, |mut stream| async move {
452 /// // 1, 2, 3, but in no particular order
453 /// # let mut results = Vec::new();
454 /// # for _ in 0..3 {
455 /// # results.push(stream.next().await.unwrap());
456 /// # }
457 /// # results.sort();
458 /// # assert_eq!(results, vec![1, 2, 3]);
459 /// # }));
460 /// # }
461 /// ```
462 pub fn flat_map_unordered<U, I, F>(
463 self,
464 f: impl IntoQuotedMut<'a, F, L>,
465 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
466 where
467 B: IsBounded,
468 I: IntoIterator<Item = U>,
469 F: Fn(T) -> I + 'a,
470 {
471 self.into_stream().flat_map_unordered(f)
472 }
473
474 /// Flattens the optional value into a stream, preserving the order of elements.
475 ///
476 /// If the optional is empty, the output stream is also empty. If the optional contains
477 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
478 /// in the output stream in deterministic order.
479 ///
480 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
481 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
482 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
483 ///
484 /// # Example
485 /// ```rust
486 /// # #[cfg(feature = "deploy")] {
487 /// # use hydro_lang::prelude::*;
488 /// # use futures::StreamExt;
489 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
490 /// let tick = process.tick();
491 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
492 /// optional.flatten_ordered().all_ticks()
493 /// # }, |mut stream| async move {
494 /// // 1, 2, 3
495 /// # for w in vec![1, 2, 3] {
496 /// # assert_eq!(stream.next().await.unwrap(), w);
497 /// # }
498 /// # }));
499 /// # }
500 /// ```
501 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
502 where
503 B: IsBounded,
504 T: IntoIterator<Item = U>,
505 {
506 self.flat_map_ordered(q!(|v| v))
507 }
508
509 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
510 /// for the element type `T` to produce items in any order.
511 ///
512 /// If the optional is empty, the output stream is also empty. If the optional contains
513 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
514 /// in the output stream in non-deterministic order.
515 ///
516 /// # Example
517 /// ```rust
518 /// # #[cfg(feature = "deploy")] {
519 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
520 /// # use futures::StreamExt;
521 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
522 /// let tick = process.tick();
523 /// let optional = tick.optional_first_tick(q!(
524 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
525 /// ));
526 /// optional.flatten_unordered().all_ticks()
527 /// # }, |mut stream| async move {
528 /// // 1, 2, 3, but in no particular order
529 /// # let mut results = Vec::new();
530 /// # for _ in 0..3 {
531 /// # results.push(stream.next().await.unwrap());
532 /// # }
533 /// # results.sort();
534 /// # assert_eq!(results, vec![1, 2, 3]);
535 /// # }));
536 /// # }
537 /// ```
538 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
539 where
540 B: IsBounded,
541 T: IntoIterator<Item = U>,
542 {
543 self.flat_map_unordered(q!(|v| v))
544 }
545
546 /// Creates an optional containing only the value if it satisfies a predicate `f`.
547 ///
548 /// If the optional is empty, the output optional is also empty. If the optional contains
549 /// a value and the predicate returns `true`, the output optional contains the same value.
550 /// If the predicate returns `false`, the output optional is empty.
551 ///
552 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
553 /// not modify or take ownership of the value. If you need to modify the value while filtering
554 /// use [`Optional::filter_map`] instead.
555 ///
556 /// # Example
557 /// ```rust
558 /// # #[cfg(feature = "deploy")] {
559 /// # use hydro_lang::prelude::*;
560 /// # use futures::StreamExt;
561 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
562 /// let tick = process.tick();
563 /// let optional = tick.optional_first_tick(q!(5));
564 /// optional.filter(q!(|&x| x > 3)).all_ticks()
565 /// # }, |mut stream| async move {
566 /// // 5
567 /// # assert_eq!(stream.next().await.unwrap(), 5);
568 /// # }));
569 /// # }
570 /// ```
571 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
572 where
573 F: Fn(&T) -> bool + 'a,
574 {
575 let f = f.splice_fn1_borrow_ctx(&self.location).into();
576 Optional::new(
577 self.location.clone(),
578 HydroNode::Filter {
579 f,
580 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
581 metadata: self.location.new_node_metadata(Self::collection_kind()),
582 },
583 )
584 }
585
586 /// An operator that both filters and maps. It yields only the value if the supplied
587 /// closure `f` returns `Some(value)`.
588 ///
589 /// If the optional is empty, the output optional is also empty. If the optional contains
590 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
591 /// If the closure returns `None`, the output optional is empty.
592 ///
593 /// # Example
594 /// ```rust
595 /// # #[cfg(feature = "deploy")] {
596 /// # use hydro_lang::prelude::*;
597 /// # use futures::StreamExt;
598 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
599 /// let tick = process.tick();
600 /// let optional = tick.optional_first_tick(q!("42"));
601 /// optional
602 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
603 /// .all_ticks()
604 /// # }, |mut stream| async move {
605 /// // 42
606 /// # assert_eq!(stream.next().await.unwrap(), 42);
607 /// # }));
608 /// # }
609 /// ```
610 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
611 where
612 F: Fn(T) -> Option<U> + 'a,
613 {
614 let f = f.splice_fn1_ctx(&self.location).into();
615 Optional::new(
616 self.location.clone(),
617 HydroNode::FilterMap {
618 f,
619 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
620 metadata: self
621 .location
622 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
623 },
624 )
625 }
626
627 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
628 ///
629 /// If the other value is a [`Optional`], the output will be non-null only if the argument is
630 /// non-null. This is useful for combining several pieces of state together.
631 ///
632 /// # Example
633 /// ```rust
634 /// # #[cfg(feature = "deploy")] {
635 /// # use hydro_lang::prelude::*;
636 /// # use futures::StreamExt;
637 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
638 /// let tick = process.tick();
639 /// let numbers = process
640 /// .source_iter(q!(vec![123, 456, 789]))
641 /// .batch(&tick, nondet!(/** test */));
642 /// let min = numbers.clone().min(); // Optional
643 /// let max = numbers.max(); // Optional
644 /// min.zip(max).all_ticks()
645 /// # }, |mut stream| async move {
646 /// // [(123, 789)]
647 /// # for w in vec![(123, 789)] {
648 /// # assert_eq!(stream.next().await.unwrap(), w);
649 /// # }
650 /// # }));
651 /// # }
652 /// ```
653 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
654 where
655 B: IsBounded,
656 {
657 let other: Optional<O, L, B> = other.into();
658 check_matching_location(&self.location, &other.location);
659
660 if L::is_top_level()
661 && let Some(tick) = self.location.try_tick()
662 {
663 let self_location = self.location().clone();
664 let out = zip_inside_tick(
665 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
666 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
667 )
668 .latest();
669
670 Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
671 } else {
672 zip_inside_tick(self, other)
673 }
674 }
675
676 /// Passes through `self` when it has a value, otherwise passes through `other`.
677 ///
678 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
679 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
680 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
681 ///
682 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
683 /// of the inputs change (including to/from null states).
684 ///
685 /// # Example
686 /// ```rust
687 /// # #[cfg(feature = "deploy")] {
688 /// # use hydro_lang::prelude::*;
689 /// # use futures::StreamExt;
690 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
691 /// let tick = process.tick();
692 /// // ticks are lazy by default, forces the second tick to run
693 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
694 ///
695 /// let some_first_tick = tick.optional_first_tick(q!(123));
696 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
697 /// some_first_tick.or(some_second_tick).all_ticks()
698 /// # }, |mut stream| async move {
699 /// // [123 /* first tick */, 456 /* second tick */]
700 /// # for w in vec![123, 456] {
701 /// # assert_eq!(stream.next().await.unwrap(), w);
702 /// # }
703 /// # }));
704 /// # }
705 /// ```
706 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
707 check_matching_location(&self.location, &other.location);
708
709 if L::is_top_level()
710 && !B::BOUNDED // only if unbounded we need to use a tick
711 && let Some(tick) = self.location.try_tick()
712 {
713 let self_location = self.location().clone();
714 let out = or_inside_tick(
715 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
716 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
717 )
718 .latest();
719
720 Optional::new(self_location, out.ir_node.replace(HydroNode::Placeholder))
721 } else {
722 Optional::new(
723 self.location.clone(),
724 HydroNode::ChainFirst {
725 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
726 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
727 metadata: self.location.new_node_metadata(Self::collection_kind()),
728 },
729 )
730 }
731 }
732
733 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
734 ///
735 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
736 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
737 ///
738 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
739 /// of the inputs change (including to/from null states).
740 ///
741 /// # Example
742 /// ```rust
743 /// # #[cfg(feature = "deploy")] {
744 /// # use hydro_lang::prelude::*;
745 /// # use futures::StreamExt;
746 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
747 /// let tick = process.tick();
748 /// // ticks are lazy by default, forces the later ticks to run
749 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
750 ///
751 /// let some_first_tick = tick.optional_first_tick(q!(123));
752 /// some_first_tick
753 /// .unwrap_or(tick.singleton(q!(456)))
754 /// .all_ticks()
755 /// # }, |mut stream| async move {
756 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
757 /// # for w in vec![123, 456, 456, 456] {
758 /// # assert_eq!(stream.next().await.unwrap(), w);
759 /// # }
760 /// # }));
761 /// # }
762 /// ```
763 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
764 let res_option = self.or(other.into());
765 Singleton::new(
766 res_option.location.clone(),
767 HydroNode::Cast {
768 inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
769 metadata: res_option
770 .location
771 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
772 },
773 )
774 }
775
776 /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
777 ///
778 /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
779 /// [`Optional`] when the default value of the type is a suitable fallback.
780 ///
781 /// # Example
782 /// ```rust
783 /// # #[cfg(feature = "deploy")] {
784 /// # use hydro_lang::prelude::*;
785 /// # use futures::StreamExt;
786 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
787 /// let tick = process.tick();
788 /// // ticks are lazy by default, forces the later ticks to run
789 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
790 ///
791 /// let some_first_tick = tick.optional_first_tick(q!(123i32));
792 /// some_first_tick.unwrap_or_default().all_ticks()
793 /// # }, |mut stream| async move {
794 /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
795 /// # for w in vec![123, 0, 0, 0] {
796 /// # assert_eq!(stream.next().await.unwrap(), w);
797 /// # }
798 /// # }));
799 /// # }
800 /// ```
801 pub fn unwrap_or_default(self) -> Singleton<T, L, B>
802 where
803 T: Default + Clone,
804 {
805 self.into_singleton().map(q!(|v| v.unwrap_or_default()))
806 }
807
808 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
809 ///
810 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
811 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
812 /// so that Hydro can skip any computation on null values.
813 ///
814 /// # Example
815 /// ```rust
816 /// # #[cfg(feature = "deploy")] {
817 /// # use hydro_lang::prelude::*;
818 /// # use futures::StreamExt;
819 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
820 /// let tick = process.tick();
821 /// // ticks are lazy by default, forces the later ticks to run
822 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
823 ///
824 /// let some_first_tick = tick.optional_first_tick(q!(123));
825 /// some_first_tick.into_singleton().all_ticks()
826 /// # }, |mut stream| async move {
827 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
828 /// # for w in vec![Some(123), None, None, None] {
829 /// # assert_eq!(stream.next().await.unwrap(), w);
830 /// # }
831 /// # }));
832 /// # }
833 /// ```
834 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
835 where
836 T: Clone,
837 {
838 let none: syn::Expr = parse_quote!(::std::option::Option::None);
839
840 let none_singleton = Singleton::new(
841 self.location.clone(),
842 HydroNode::SingletonSource {
843 value: none.into(),
844 first_tick_only: false,
845 metadata: self
846 .location
847 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
848 },
849 );
850
851 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
852 }
853
854 /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
855 ///
856 /// # Example
857 /// ```rust
858 /// # #[cfg(feature = "deploy")] {
859 /// # use hydro_lang::prelude::*;
860 /// # use futures::StreamExt;
861 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
862 /// let tick = process.tick();
863 /// // ticks are lazy by default, forces the second tick to run
864 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
865 ///
866 /// let some_first_tick = tick.optional_first_tick(q!(42));
867 /// some_first_tick.is_some().all_ticks()
868 /// # }, |mut stream| async move {
869 /// // [true /* first tick */, false /* second tick */, ...]
870 /// # for w in vec![true, false] {
871 /// # assert_eq!(stream.next().await.unwrap(), w);
872 /// # }
873 /// # }));
874 /// # }
875 /// ```
876 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
877 pub fn is_some(self) -> Singleton<bool, L, B> {
878 self.map(q!(|_| ()))
879 .into_singleton()
880 .map(q!(|o| o.is_some()))
881 }
882
883 /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
884 ///
885 /// # Example
886 /// ```rust
887 /// # #[cfg(feature = "deploy")] {
888 /// # use hydro_lang::prelude::*;
889 /// # use futures::StreamExt;
890 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
891 /// let tick = process.tick();
892 /// // ticks are lazy by default, forces the second tick to run
893 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
894 ///
895 /// let some_first_tick = tick.optional_first_tick(q!(42));
896 /// some_first_tick.is_none().all_ticks()
897 /// # }, |mut stream| async move {
898 /// // [false /* first tick */, true /* second tick */, ...]
899 /// # for w in vec![false, true] {
900 /// # assert_eq!(stream.next().await.unwrap(), w);
901 /// # }
902 /// # }));
903 /// # }
904 /// ```
905 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
906 pub fn is_none(self) -> Singleton<bool, L, B> {
907 self.map(q!(|_| ()))
908 .into_singleton()
909 .map(q!(|o| o.is_none()))
910 }
911
912 /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
913 /// values are equal, `false` otherwise (including when either is null).
914 ///
915 /// # Example
916 /// ```rust
917 /// # #[cfg(feature = "deploy")] {
918 /// # use hydro_lang::prelude::*;
919 /// # use futures::StreamExt;
920 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
921 /// let tick = process.tick();
922 /// // ticks are lazy by default, forces the second tick to run
923 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
924 ///
925 /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
926 /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
927 /// a.is_some_and_equals(b).all_ticks()
928 /// # }, |mut stream| async move {
929 /// // [true, false]
930 /// # for w in vec![true, false] {
931 /// # assert_eq!(stream.next().await.unwrap(), w);
932 /// # }
933 /// # }));
934 /// # }
935 /// ```
936 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
937 pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
938 where
939 T: PartialEq + Clone,
940 B: IsBounded,
941 {
942 self.into_singleton()
943 .zip(other.into_singleton())
944 .map(q!(|(a, b)| a.is_some() && a == b))
945 }
946
947 /// An operator which allows you to "name" a `HydroNode`.
948 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
949 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
950 {
951 let mut node = self.ir_node.borrow_mut();
952 let metadata = node.metadata_mut();
953 metadata.tag = Some(name.to_owned());
954 }
955 self
956 }
957
958 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
959 /// implies that `B == Bounded`.
960 pub fn make_bounded(self) -> Optional<T, L, Bounded>
961 where
962 B: IsBounded,
963 {
964 Optional::new(
965 self.location.clone(),
966 self.ir_node.replace(HydroNode::Placeholder),
967 )
968 }
969
970 /// Clones this bounded optional into a tick, returning a optional that has the
971 /// same value as the outer optional. Because the outer optional is bounded, this
972 /// is deterministic because there is only a single immutable version.
973 pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
974 where
975 B: IsBounded,
976 T: Clone,
977 {
978 // TODO(shadaj): avoid printing simulator logs for this snapshot
979 let inner = self.snapshot(
980 tick,
981 nondet!(/** bounded top-level optional so deterministic */),
982 );
983 Optional::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
984 }
985
986 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
987 /// non-null. Otherwise, the stream is empty.
988 ///
989 /// # Example
990 /// ```rust
991 /// # #[cfg(feature = "deploy")] {
992 /// # use hydro_lang::prelude::*;
993 /// # use futures::StreamExt;
994 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
995 /// # let tick = process.tick();
996 /// # // ticks are lazy by default, forces the second tick to run
997 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
998 /// # let batch_first_tick = process
999 /// # .source_iter(q!(vec![]))
1000 /// # .batch(&tick, nondet!(/** test */));
1001 /// # let batch_second_tick = process
1002 /// # .source_iter(q!(vec![123, 456]))
1003 /// # .batch(&tick, nondet!(/** test */))
1004 /// # .defer_tick(); // appears on the second tick
1005 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1006 /// input_batch // first tick: [], second tick: [123, 456]
1007 /// .clone()
1008 /// .max()
1009 /// .into_stream()
1010 /// .chain(input_batch)
1011 /// .all_ticks()
1012 /// # }, |mut stream| async move {
1013 /// // [456, 123, 456]
1014 /// # for w in vec![456, 123, 456] {
1015 /// # assert_eq!(stream.next().await.unwrap(), w);
1016 /// # }
1017 /// # }));
1018 /// # }
1019 /// ```
1020 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1021 where
1022 B: IsBounded,
1023 {
1024 Stream::new(
1025 self.location.clone(),
1026 HydroNode::Cast {
1027 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1028 metadata: self.location.new_node_metadata(Stream::<
1029 T,
1030 Tick<L>,
1031 Bounded,
1032 TotalOrder,
1033 ExactlyOnce,
1034 >::collection_kind()),
1035 },
1036 )
1037 }
1038
1039 /// Filters this optional, passing through the value if the boolean signal is `true`,
1040 /// otherwise the output is null.
1041 ///
1042 /// # Example
1043 /// ```rust
1044 /// # #[cfg(feature = "deploy")] {
1045 /// # use hydro_lang::prelude::*;
1046 /// # use futures::StreamExt;
1047 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1048 /// let tick = process.tick();
1049 /// // ticks are lazy by default, forces the second tick to run
1050 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1051 ///
1052 /// let some_first_tick = tick.optional_first_tick(q!(()));
1053 /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1054 /// let batch_first_tick = process
1055 /// .source_iter(q!(vec![456]))
1056 /// .batch(&tick, nondet!(/** test */));
1057 /// let batch_second_tick = process
1058 /// .source_iter(q!(vec![789]))
1059 /// .batch(&tick, nondet!(/** test */))
1060 /// .defer_tick();
1061 /// batch_first_tick.chain(batch_second_tick).first()
1062 /// .filter_if(signal)
1063 /// .unwrap_or(tick.singleton(q!(0)))
1064 /// .all_ticks()
1065 /// # }, |mut stream| async move {
1066 /// // [456, 0]
1067 /// # for w in vec![456, 0] {
1068 /// # assert_eq!(stream.next().await.unwrap(), w);
1069 /// # }
1070 /// # }));
1071 /// # }
1072 /// ```
1073 pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1074 where
1075 B: IsBounded,
1076 {
1077 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1078 }
1079
1080 /// Filters this optional, passing through the optional value if it is non-null **and** the
1081 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1082 ///
1083 /// Useful for conditionally processing, such as only emitting an optional's value outside
1084 /// a tick if some other condition is satisfied.
1085 ///
1086 /// # Example
1087 /// ```rust
1088 /// # #[cfg(feature = "deploy")] {
1089 /// # use hydro_lang::prelude::*;
1090 /// # use futures::StreamExt;
1091 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1092 /// let tick = process.tick();
1093 /// // ticks are lazy by default, forces the second tick to run
1094 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1095 ///
1096 /// let batch_first_tick = process
1097 /// .source_iter(q!(vec![]))
1098 /// .batch(&tick, nondet!(/** test */));
1099 /// let batch_second_tick = process
1100 /// .source_iter(q!(vec![456]))
1101 /// .batch(&tick, nondet!(/** test */))
1102 /// .defer_tick(); // appears on the second tick
1103 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1104 /// batch_first_tick.chain(batch_second_tick).first()
1105 /// .filter_if_some(some_on_first_tick)
1106 /// .unwrap_or(tick.singleton(q!(789)))
1107 /// .all_ticks()
1108 /// # }, |mut stream| async move {
1109 /// // [789, 789]
1110 /// # for w in vec![789, 789] {
1111 /// # assert_eq!(stream.next().await.unwrap(), w);
1112 /// # }
1113 /// # }));
1114 /// # }
1115 /// ```
1116 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1117 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1118 where
1119 B: IsBounded,
1120 {
1121 self.filter_if(signal.is_some())
1122 }
1123
1124 /// Filters this optional, passing through the optional value if it is non-null **and** the
1125 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1126 ///
1127 /// Useful for conditionally processing, such as only emitting an optional's value outside
1128 /// a tick if some other condition is satisfied.
1129 ///
1130 /// # Example
1131 /// ```rust
1132 /// # #[cfg(feature = "deploy")] {
1133 /// # use hydro_lang::prelude::*;
1134 /// # use futures::StreamExt;
1135 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1136 /// let tick = process.tick();
1137 /// // ticks are lazy by default, forces the second tick to run
1138 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1139 ///
1140 /// let batch_first_tick = process
1141 /// .source_iter(q!(vec![]))
1142 /// .batch(&tick, nondet!(/** test */));
1143 /// let batch_second_tick = process
1144 /// .source_iter(q!(vec![456]))
1145 /// .batch(&tick, nondet!(/** test */))
1146 /// .defer_tick(); // appears on the second tick
1147 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1148 /// batch_first_tick.chain(batch_second_tick).first()
1149 /// .filter_if_none(some_on_first_tick)
1150 /// .unwrap_or(tick.singleton(q!(789)))
1151 /// .all_ticks()
1152 /// # }, |mut stream| async move {
1153 /// // [789, 789]
1154 /// # for w in vec![789, 456] {
1155 /// # assert_eq!(stream.next().await.unwrap(), w);
1156 /// # }
1157 /// # }));
1158 /// # }
1159 /// ```
1160 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1161 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1162 where
1163 B: IsBounded,
1164 {
1165 self.filter_if(other.is_none())
1166 }
1167
1168 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1169 ///
1170 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1171 /// having a value, such as only releasing a piece of state if the node is the leader.
1172 ///
1173 /// # Example
1174 /// ```rust
1175 /// # #[cfg(feature = "deploy")] {
1176 /// # use hydro_lang::prelude::*;
1177 /// # use futures::StreamExt;
1178 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1179 /// let tick = process.tick();
1180 /// // ticks are lazy by default, forces the second tick to run
1181 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1182 ///
1183 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1184 /// some_on_first_tick
1185 /// .if_some_then(tick.singleton(q!(456)))
1186 /// .unwrap_or(tick.singleton(q!(123)))
1187 /// # .all_ticks()
1188 /// # }, |mut stream| async move {
1189 /// // 456 (first tick) ~> 123 (second tick onwards)
1190 /// # for w in vec![456, 123, 123] {
1191 /// # assert_eq!(stream.next().await.unwrap(), w);
1192 /// # }
1193 /// # }));
1194 /// # }
1195 /// ```
1196 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1197 pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1198 where
1199 B: IsBounded,
1200 {
1201 value.filter_if(self.is_some())
1202 }
1203}
1204
1205impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1206where
1207 L: Location<'a>,
1208{
1209 /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1210 /// key-value pair of this [`Optional`].
1211 ///
1212 /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1213 /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1214 /// the entry will be updated and appear / disappear according to the state of the
1215 /// [`Optional`].
1216 pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1217 KeyedSingleton::new(
1218 self.location.clone(),
1219 HydroNode::Cast {
1220 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1221 metadata: self
1222 .location
1223 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1224 },
1225 )
1226 }
1227}
1228
1229impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1230where
1231 L: Location<'a>,
1232{
1233 /// Returns an optional value corresponding to the latest snapshot of the optional
1234 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1235 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1236 /// all snapshots of this optional into the atomic-associated tick will observe the
1237 /// same value each tick.
1238 ///
1239 /// # Non-Determinism
1240 /// Because this picks a snapshot of a optional whose value is continuously changing,
1241 /// the output optional has a non-deterministic value since the snapshot can be at an
1242 /// arbitrary point in time.
1243 pub fn snapshot_atomic<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1244 self,
1245 tick: &Tick<L2>,
1246 _nondet: NonDet,
1247 ) -> Optional<T, Tick<L::NoConsistency>, Bounded> {
1248 Optional::new(
1249 tick.drop_consistency(),
1250 HydroNode::Batch {
1251 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1252 metadata: tick
1253 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1254 },
1255 )
1256 }
1257
1258 /// Returns this optional back into a top-level, asynchronous execution context where updates
1259 /// to the value will be asynchronously propagated.
1260 pub fn end_atomic(self) -> Optional<T, L, B> {
1261 Optional::new(
1262 self.location.tick.l.clone(),
1263 HydroNode::EndAtomic {
1264 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1265 metadata: self
1266 .location
1267 .tick
1268 .l
1269 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1270 },
1271 )
1272 }
1273}
1274
1275impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1276where
1277 L: Location<'a>,
1278{
1279 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1280 /// will observe the same version of the value and will be executed synchronously before any
1281 /// outputs are yielded (in [`Optional::end_atomic`]).
1282 ///
1283 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1284 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1285 /// a different version).
1286 pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1287 let id = self.location.flow_state().borrow_mut().next_clock_id();
1288 let out_location = Atomic {
1289 tick: Tick {
1290 id,
1291 l: self.location.clone(),
1292 },
1293 };
1294 Optional::new(
1295 out_location.clone(),
1296 HydroNode::BeginAtomic {
1297 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1298 metadata: out_location
1299 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1300 },
1301 )
1302 }
1303
1304 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1305 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1306 /// relevant data that contributed to the snapshot at tick `t`.
1307 ///
1308 /// # Non-Determinism
1309 /// Because this picks a snapshot of a optional whose value is continuously changing,
1310 /// the output optional has a non-deterministic value since the snapshot can be at an
1311 /// arbitrary point in time.
1312 pub fn snapshot<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1313 self,
1314 tick: &Tick<L2>,
1315 _nondet: NonDet,
1316 ) -> Optional<T, Tick<L::NoConsistency>, Bounded> {
1317 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1318 Optional::new(
1319 tick.drop_consistency(),
1320 HydroNode::Batch {
1321 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1322 metadata: tick
1323 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1324 },
1325 )
1326 }
1327
1328 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1329 /// with order corresponding to increasing prefixes of data contributing to the optional.
1330 ///
1331 /// # Non-Determinism
1332 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1333 /// to non-deterministic batching and arrival of inputs, the output stream is
1334 /// non-deterministic.
1335 pub fn sample_eager(
1336 self,
1337 nondet: NonDet,
1338 ) -> Stream<T, L::NoConsistency, Unbounded, TotalOrder, AtLeastOnce> {
1339 let tick = self.location.tick();
1340 self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1341 }
1342
1343 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1344 /// value taken at various points in time. Because the input optional may be
1345 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1346 /// represent the value of the optional given some prefix of the streams leading up to
1347 /// it.
1348 ///
1349 /// # Non-Determinism
1350 /// The output stream is non-deterministic in which elements are sampled, since this
1351 /// is controlled by a clock.
1352 pub fn sample_every(
1353 self,
1354 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1355 nondet: NonDet,
1356 ) -> Stream<T, L::NoConsistency, Unbounded, TotalOrder, AtLeastOnce>
1357 where
1358 L: NoAtomic,
1359 {
1360 let samples = self.location.source_interval(interval, nondet);
1361 let tick = self.location.tick();
1362
1363 self.snapshot(&tick, nondet)
1364 .filter_if(samples.batch(&tick, nondet).first().is_some())
1365 .all_ticks()
1366 .weaken_retries()
1367 }
1368}
1369
1370impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1371where
1372 L: Location<'a>,
1373{
1374 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1375 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1376 /// null values).
1377 ///
1378 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1379 /// producing one element in the output for each (non-null) tick. This is useful for batched
1380 /// computations, where the results from each tick must be combined together.
1381 ///
1382 /// # Example
1383 /// ```rust
1384 /// # #[cfg(feature = "deploy")] {
1385 /// # use hydro_lang::prelude::*;
1386 /// # use futures::StreamExt;
1387 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1388 /// # let tick = process.tick();
1389 /// # // ticks are lazy by default, forces the second tick to run
1390 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1391 /// # let batch_first_tick = process
1392 /// # .source_iter(q!(vec![]))
1393 /// # .batch(&tick, nondet!(/** test */));
1394 /// # let batch_second_tick = process
1395 /// # .source_iter(q!(vec![1, 2, 3]))
1396 /// # .batch(&tick, nondet!(/** test */))
1397 /// # .defer_tick(); // appears on the second tick
1398 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1399 /// input_batch // first tick: [], second tick: [1, 2, 3]
1400 /// .max()
1401 /// .all_ticks()
1402 /// # }, |mut stream| async move {
1403 /// // [3]
1404 /// # for w in vec![3] {
1405 /// # assert_eq!(stream.next().await.unwrap(), w);
1406 /// # }
1407 /// # }));
1408 /// # }
1409 /// ```
1410 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1411 self.into_stream().all_ticks()
1412 }
1413
1414 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1415 /// which will stream the value computed in _each_ tick as a separate stream element.
1416 ///
1417 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1418 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1419 /// optional's [`Tick`] context.
1420 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1421 self.into_stream().all_ticks_atomic()
1422 }
1423
1424 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1425 /// be asynchronously updated with the latest value of the optional inside the tick, including
1426 /// whether the optional is null or not.
1427 ///
1428 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1429 /// tick that tracks the inner value. This is useful for getting the value as of the
1430 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1431 ///
1432 /// # Example
1433 /// ```rust
1434 /// # #[cfg(feature = "deploy")] {
1435 /// # use hydro_lang::prelude::*;
1436 /// # use futures::StreamExt;
1437 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1438 /// # let tick = process.tick();
1439 /// # // ticks are lazy by default, forces the second tick to run
1440 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1441 /// # let batch_first_tick = process
1442 /// # .source_iter(q!(vec![]))
1443 /// # .batch(&tick, nondet!(/** test */));
1444 /// # let batch_second_tick = process
1445 /// # .source_iter(q!(vec![1, 2, 3]))
1446 /// # .batch(&tick, nondet!(/** test */))
1447 /// # .defer_tick(); // appears on the second tick
1448 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1449 /// input_batch // first tick: [], second tick: [1, 2, 3]
1450 /// .max()
1451 /// .latest()
1452 /// # .into_singleton()
1453 /// # .sample_eager(nondet!(/** test */))
1454 /// # }, |mut stream| async move {
1455 /// // asynchronously changes from None ~> 3
1456 /// # for w in vec![None, Some(3)] {
1457 /// # assert_eq!(stream.next().await.unwrap(), w);
1458 /// # }
1459 /// # }));
1460 /// # }
1461 /// ```
1462 pub fn latest(self) -> Optional<T, L, Unbounded> {
1463 Optional::new(
1464 self.location.outer().clone(),
1465 HydroNode::YieldConcat {
1466 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1467 metadata: self
1468 .location
1469 .outer()
1470 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1471 },
1472 )
1473 }
1474
1475 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1476 /// be updated with the latest value of the optional inside the tick.
1477 ///
1478 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1479 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1480 /// optional's [`Tick`] context.
1481 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1482 let out_location = Atomic {
1483 tick: self.location.clone(),
1484 };
1485
1486 Optional::new(
1487 out_location.clone(),
1488 HydroNode::YieldConcat {
1489 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1490 metadata: out_location
1491 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1492 },
1493 )
1494 }
1495
1496 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1497 /// always has the state of `self` at tick `T - 1`.
1498 ///
1499 /// At tick `0`, the output optional is null, since there is no previous tick.
1500 ///
1501 /// This operator enables stateful iterative processing with ticks, by sending data from one
1502 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1503 ///
1504 /// # Example
1505 /// ```rust
1506 /// # #[cfg(feature = "deploy")] {
1507 /// # use hydro_lang::prelude::*;
1508 /// # use futures::StreamExt;
1509 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1510 /// let tick = process.tick();
1511 /// // ticks are lazy by default, forces the second tick to run
1512 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1513 ///
1514 /// let batch_first_tick = process
1515 /// .source_iter(q!(vec![1, 2]))
1516 /// .batch(&tick, nondet!(/** test */));
1517 /// let batch_second_tick = process
1518 /// .source_iter(q!(vec![3, 4]))
1519 /// .batch(&tick, nondet!(/** test */))
1520 /// .defer_tick(); // appears on the second tick
1521 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1522 /// .reduce(q!(|state, v| *state += v));
1523 ///
1524 /// current_tick_sum.clone().into_singleton().zip(
1525 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1526 /// ).all_ticks()
1527 /// # }, |mut stream| async move {
1528 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1529 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1530 /// # assert_eq!(stream.next().await.unwrap(), w);
1531 /// # }
1532 /// # }));
1533 /// # }
1534 /// ```
1535 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1536 Optional::new(
1537 self.location.clone(),
1538 HydroNode::DeferTick {
1539 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1540 metadata: self.location.new_node_metadata(Self::collection_kind()),
1541 },
1542 )
1543 }
1544}
1545
1546#[cfg(test)]
1547mod tests {
1548 #[cfg(feature = "deploy")]
1549 use futures::StreamExt;
1550 #[cfg(feature = "deploy")]
1551 use hydro_deploy::Deployment;
1552 #[cfg(any(feature = "deploy", feature = "sim"))]
1553 use stageleft::q;
1554
1555 #[cfg(feature = "deploy")]
1556 use super::Optional;
1557 #[cfg(any(feature = "deploy", feature = "sim"))]
1558 use crate::compile::builder::FlowBuilder;
1559 #[cfg(any(feature = "deploy", feature = "sim"))]
1560 use crate::location::Location;
1561 #[cfg(feature = "deploy")]
1562 use crate::nondet::nondet;
1563
1564 #[cfg(feature = "deploy")]
1565 #[tokio::test]
1566 async fn optional_or_cardinality() {
1567 let mut deployment = Deployment::new();
1568
1569 let mut flow = FlowBuilder::new();
1570 let node = flow.process::<()>();
1571 let external = flow.external::<()>();
1572
1573 let node_tick = node.tick();
1574 let tick_singleton = node_tick.singleton(q!(123));
1575 let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1576 let counts = tick_optional_inhabited
1577 .clone()
1578 .or(tick_optional_inhabited)
1579 .into_stream()
1580 .count()
1581 .all_ticks()
1582 .send_bincode_external(&external);
1583
1584 let nodes = flow
1585 .with_process(&node, deployment.Localhost())
1586 .with_external(&external, deployment.Localhost())
1587 .deploy(&mut deployment);
1588
1589 deployment.deploy().await.unwrap();
1590
1591 let mut external_out = nodes.connect(counts).await;
1592
1593 deployment.start().await.unwrap();
1594
1595 assert_eq!(external_out.next().await.unwrap(), 1);
1596 }
1597
1598 #[cfg(feature = "deploy")]
1599 #[tokio::test]
1600 async fn into_singleton_top_level_none_cardinality() {
1601 let mut deployment = Deployment::new();
1602
1603 let mut flow = FlowBuilder::new();
1604 let node = flow.process::<()>();
1605 let external = flow.external::<()>();
1606
1607 let node_tick = node.tick();
1608 let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1609 let into_singleton = top_level_none.into_singleton();
1610
1611 let tick_driver = node.spin();
1612
1613 let counts = into_singleton
1614 .snapshot(&node_tick, nondet!(/** test */))
1615 .into_stream()
1616 .count()
1617 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1618 .map(q!(|(c, _)| c))
1619 .all_ticks()
1620 .send_bincode_external(&external);
1621
1622 let nodes = flow
1623 .with_process(&node, deployment.Localhost())
1624 .with_external(&external, deployment.Localhost())
1625 .deploy(&mut deployment);
1626
1627 deployment.deploy().await.unwrap();
1628
1629 let mut external_out = nodes.connect(counts).await;
1630
1631 deployment.start().await.unwrap();
1632
1633 assert_eq!(external_out.next().await.unwrap(), 1);
1634 assert_eq!(external_out.next().await.unwrap(), 1);
1635 assert_eq!(external_out.next().await.unwrap(), 1);
1636 }
1637
1638 #[cfg(feature = "deploy")]
1639 #[tokio::test]
1640 async fn into_singleton_unbounded_top_level_none_cardinality() {
1641 let mut deployment = Deployment::new();
1642
1643 let mut flow = FlowBuilder::new();
1644 let node = flow.process::<()>();
1645 let external = flow.external::<()>();
1646
1647 let node_tick = node.tick();
1648 let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1649 let into_singleton = top_level_none.into_singleton();
1650
1651 let tick_driver = node.spin();
1652
1653 let counts = into_singleton
1654 .snapshot(&node_tick, nondet!(/** test */))
1655 .into_stream()
1656 .count()
1657 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1658 .map(q!(|(c, _)| c))
1659 .all_ticks()
1660 .send_bincode_external(&external);
1661
1662 let nodes = flow
1663 .with_process(&node, deployment.Localhost())
1664 .with_external(&external, deployment.Localhost())
1665 .deploy(&mut deployment);
1666
1667 deployment.deploy().await.unwrap();
1668
1669 let mut external_out = nodes.connect(counts).await;
1670
1671 deployment.start().await.unwrap();
1672
1673 assert_eq!(external_out.next().await.unwrap(), 1);
1674 assert_eq!(external_out.next().await.unwrap(), 1);
1675 assert_eq!(external_out.next().await.unwrap(), 1);
1676 }
1677
1678 #[cfg(feature = "sim")]
1679 #[test]
1680 fn top_level_optional_some_into_stream_no_replay() {
1681 let mut flow = FlowBuilder::new();
1682 let node = flow.process::<()>();
1683
1684 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1685 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1686 let filtered_some = folded.filter(q!(|_| true));
1687
1688 let out_recv = filtered_some.into_stream().sim_output();
1689
1690 flow.sim().exhaustive(async || {
1691 out_recv.assert_yields_only([10]).await;
1692 });
1693 }
1694
1695 #[cfg(feature = "sim")]
1696 #[test]
1697 fn top_level_optional_none_into_stream_no_replay() {
1698 let mut flow = FlowBuilder::new();
1699 let node = flow.process::<()>();
1700
1701 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1702 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1703 let filtered_none = folded.filter(q!(|_| false));
1704
1705 let out_recv = filtered_none.into_stream().sim_output();
1706
1707 flow.sim().exhaustive(async || {
1708 out_recv.assert_yields_only([] as [i32; 0]).await;
1709 });
1710 }
1711}