Skip to main content

hydro_lang/location/
dynamic.rs

1//! Definitions for interacting with locations using an untyped interface.
2//!
3//! Under the hood, locations are associated with a [`LocationId`] value that
4//! uniquely identifies the location. Manipulating these values is useful for
5//! observability and transforming the Hydro IR.
6
7use serde::{Deserialize, Serialize};
8
9use super::LocationKey;
10use crate::compile::builder::ClockId;
11#[cfg(stageleft_runtime)]
12use crate::compile::{
13    builder::FlowState,
14    ir::{CollectionKind, HydroIrMetadata},
15};
16use crate::location::LocationType;
17
18/// An enumeration representing the consistency guarantee of a live collection on a cluster.
19#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Serialize, Deserialize)]
20pub enum ClusterConsistency {
21    /// No consistency is guaranteed, see [`super::cluster::NoConsistency`].
22    NoConsistency,
23    /// Eventual consistency is guaranteed, see [`super::cluster::EventualConsistency`].
24    EventuallyConsistent,
25}
26
27/// An enumeration representing a location heirarchy, including "virtual" locations (atomic/tick).
28#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Serialize, Deserialize)]
29pub enum LocationId {
30    /// A process root location (i.e. a single node).
31    Process(LocationKey),
32    /// A cluster root location (i.e. multiple nodes).
33    Cluster(LocationKey, ClusterConsistency),
34    /// An atomic region, within a tick.
35    Atomic(
36        /// The tick that the atomic region is associated with.
37        Box<LocationId>,
38    ),
39    /// A tick within a location.
40    Tick(ClockId, Box<LocationId>),
41}
42
43/// Implement Debug to Display-print the key, reduces snapshot verbosity.
44impl std::fmt::Debug for LocationId {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            LocationId::Process(key) => write!(f, "Process({key})"),
48            LocationId::Cluster(key, c) => write!(f, "Cluster({key}, {c:?})"),
49            LocationId::Atomic(tick) => write!(f, "Atomic({tick:?})"),
50            LocationId::Tick(tick, id) => write!(f, "Tick({tick}, {id:?})"),
51        }
52    }
53}
54
55impl LocationId {
56    /// The [`LocationType`] of this location ID. `None` if this is not a root location.
57    pub fn location_type(&self) -> Option<LocationType> {
58        match self {
59            LocationId::Process(_) => Some(LocationType::Process),
60            LocationId::Cluster(_, _) => Some(LocationType::Cluster),
61            _ => None,
62        }
63    }
64}
65
66#[expect(missing_docs, reason = "TODO")]
67impl LocationId {
68    pub fn root(&self) -> &LocationId {
69        match self {
70            LocationId::Process(_) => self,
71            LocationId::Cluster(_, _) => self,
72            LocationId::Atomic(tick) => tick.root(),
73            LocationId::Tick(_, id) => id.root(),
74        }
75    }
76
77    pub fn is_root(&self) -> bool {
78        match self {
79            LocationId::Process(_) | LocationId::Cluster(_, _) => true,
80            LocationId::Atomic(_) => false,
81            LocationId::Tick(_, _) => false,
82        }
83    }
84
85    pub fn is_top_level(&self) -> bool {
86        match self {
87            LocationId::Process(_) | LocationId::Cluster(_, _) => true,
88            LocationId::Atomic(_) => true,
89            LocationId::Tick(_, _) => false,
90        }
91    }
92
93    pub fn key(&self) -> LocationKey {
94        match self {
95            LocationId::Process(id) => *id,
96            LocationId::Cluster(id, _) => *id,
97            LocationId::Atomic(_) => panic!("cannot get raw id for atomic"),
98            LocationId::Tick(_, _) => panic!("cannot get raw id for tick"),
99        }
100    }
101
102    pub fn swap_root(&mut self, new_root: LocationId) {
103        match self {
104            LocationId::Tick(_, id) => {
105                id.swap_root(new_root);
106            }
107            LocationId::Atomic(tick) => {
108                tick.swap_root(new_root);
109            }
110            _ => {
111                assert!(new_root.is_root());
112                *self = new_root;
113            }
114        }
115    }
116
117    pub fn new_node_metadata(self, collection_kind: CollectionKind) -> HydroIrMetadata {
118        use crate::compile::ir::HydroIrOpMetadata;
119        use crate::compile::ir::backtrace::Backtrace;
120
121        HydroIrMetadata {
122            location_id: self,
123            collection_kind,
124            cardinality: None,
125            tag: None,
126            op: HydroIrOpMetadata {
127                backtrace: Backtrace::get_backtrace(3),
128                cpu_usage: None,
129                network_recv_cpu_usage: None,
130                id: None,
131            },
132        }
133    }
134}
135
136#[cfg(stageleft_runtime)]
137pub(crate) trait DynLocation: Clone {
138    fn dyn_id(&self) -> LocationId;
139
140    fn flow_state(&self) -> &FlowState;
141    fn is_top_level() -> bool;
142    fn multiversioned(&self) -> bool;
143
144    fn new_node_metadata(&self, collection_kind: CollectionKind) -> HydroIrMetadata {
145        self.dyn_id().new_node_metadata(collection_kind)
146    }
147}