Skip to main content

hydro_lang/live_collections/
boundedness.rs

//! Type declarations for boundedness markers, which indicate whether a live collection is finite
//! and immutable ([`Bounded`]) or asynchronously arriving over time ([`Unbounded`]).
3
4use sealed::sealed;
5
6use super::keyed_singleton::KeyedSingletonBound;
7use crate::compile::ir::BoundKind;
8use crate::live_collections::singleton::SingletonBound;
9
10/// A marker trait indicating whether a stream's length is bounded (finite) or unbounded (potentially infinite).
11///
12/// Implementors of this trait use it to signal the boundedness property of a stream.
13#[sealed]
14pub trait Boundedness:
15    SingletonBound<UnderlyingBound = Self> + KeyedSingletonBound<UnderlyingBound = Self>
16{
17    /// `true` if the bound is [`Bounded`], `false` if it is [`Unbounded`].
18    const BOUNDED: bool;
19
20    /// The [`BoundKind`] corresponding to this type.
21    const BOUND_KIND: BoundKind = if Self::BOUNDED {
22        BoundKind::Bounded
23    } else {
24        BoundKind::Unbounded
25    };
26
27    /// Determines the output ordering of a join based on this (right/build) side's boundedness.
28    ///
29    /// When this side is [`Bounded`], the join accumulates this side first and then
30    /// streams the left side through, preserving the left side's ordering `InO`.
31    /// When this side is [`Unbounded`], a symmetric hash join is used and ordering is lost.
32    type PreserveOrderIfBounded<InO: crate::live_collections::stream::Ordering>: crate::live_collections::stream::Ordering;
33}
34
/// Boundedness marker for a stream that is unbounded: there is no guarantee
/// that it will be complete in finite time.
///
/// The enum has no variants, so it is used purely as a type-level tag.
pub enum Unbounded {}
38
39#[sealed]
40impl Boundedness for Unbounded {
41    const BOUNDED: bool = false;
42    type PreserveOrderIfBounded<InO: crate::live_collections::stream::Ordering> =
43        crate::live_collections::stream::NoOrder;
44}
45
/// Boundedness marker for a stream that is bounded: it is guaranteed to be
/// complete in finite time.
///
/// The enum has no variants, so it is used purely as a type-level tag.
pub enum Bounded {}
49
50#[sealed]
51impl Boundedness for Bounded {
52    const BOUNDED: bool = true;
53    type PreserveOrderIfBounded<InO: crate::live_collections::stream::Ordering> = InO;
54}
55
56#[sealed]
57#[diagnostic::on_unimplemented(
58    message = "The input collection must be bounded (`Bounded`), but has bound `{Self}`. Strengthen the boundedness upstream or consider a different API.",
59    label = "required here",
60    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
61)]
62/// Marker trait that is implemented for the [`Bounded`] boundedness guarantee.
63pub trait IsBounded: Boundedness {}
64
65#[sealed]
66#[diagnostic::do_not_recommend]
67impl IsBounded for Bounded {}