hydro_lang/
forward_handle.rs1use sealed::sealed;
4
5use crate::compile::builder::CycleId;
6use crate::location::Location;
7use crate::location::dynamic::LocationId;
8use crate::staging_util::Invariant;
9
10#[sealed]
11pub(crate) trait ReceiverKind {}
12
13pub enum ForwardRef {}
18
19#[sealed]
20impl ReceiverKind for ForwardRef {}
21
22pub enum TickCycle {}
27
28#[sealed]
29impl ReceiverKind for TickCycle {}
30
31pub(crate) trait ReceiverComplete<'a, Marker>
32where
33 Marker: ReceiverKind,
34{
35 fn complete(self, cycle_id: CycleId, expected_location: LocationId);
36}
37
38pub(crate) trait CycleCollection<'a, Kind>: ReceiverComplete<'a, Kind>
39where
40 Kind: ReceiverKind,
41{
42 type Location: Location<'a>;
43
44 fn create_source(id: CycleId, location: Self::Location) -> Self;
45}
46
47pub(crate) trait CycleCollectionWithInitial<'a, Kind>: ReceiverComplete<'a, Kind>
48where
49 Kind: ReceiverKind,
50{
51 type Location: Location<'a>;
52
53 fn location(&self) -> &Self::Location;
54
55 fn create_source_with_initial(
56 cycle_id: CycleId,
57 initial: Self,
58 location: Self::Location,
59 ) -> Self;
60}
61
62#[expect(
66 private_bounds,
67 reason = "only Hydro collections can implement ReceiverComplete"
68)]
69pub struct ForwardHandle<'a, C: ReceiverComplete<'a, ForwardRef>> {
70 completed: bool,
71 cycle_id: CycleId,
72 expected_location: LocationId,
73 _phantom: Invariant<'a, C>,
74}
75
76#[expect(
77 private_bounds,
78 reason = "only Hydro collections can implement ReceiverComplete"
79)]
80impl<'a, C: ReceiverComplete<'a, ForwardRef>> ForwardHandle<'a, C> {
81 pub(crate) fn new(cycle_id: CycleId, expected_location: LocationId) -> Self {
82 Self {
83 completed: false,
84 cycle_id,
85 expected_location,
86 _phantom: std::marker::PhantomData,
87 }
88 }
89}
90
91impl<'a, C: ReceiverComplete<'a, ForwardRef>> Drop for ForwardHandle<'a, C> {
92 fn drop(&mut self) {
93 if !self.completed && !std::thread::panicking() {
94 panic!("ForwardHandle dropped without being completed");
95 }
96 }
97}
98
99#[expect(
100 private_bounds,
101 reason = "only Hydro collections can implement ReceiverComplete"
102)]
103impl<'a, C: ReceiverComplete<'a, ForwardRef>> ForwardHandle<'a, C> {
104 pub fn complete(mut self, stream: impl Into<C>) {
111 self.completed = true;
112 C::complete(stream.into(), self.cycle_id, self.expected_location.clone())
113 }
114}
115
116#[expect(
120 private_bounds,
121 reason = "only Hydro collections can implement ReceiverComplete"
122)]
123pub struct TickCycleHandle<'a, C: ReceiverComplete<'a, TickCycle>> {
124 completed: bool,
125 cycle_id: CycleId,
126 expected_location: LocationId,
127 _phantom: Invariant<'a, C>,
128}
129
130#[expect(
131 private_bounds,
132 reason = "only Hydro collections can implement ReceiverComplete"
133)]
134impl<'a, C: ReceiverComplete<'a, TickCycle>> TickCycleHandle<'a, C> {
135 pub(crate) fn new(cycle_id: CycleId, expected_location: LocationId) -> Self {
136 Self {
137 completed: false,
138 cycle_id,
139 expected_location,
140 _phantom: std::marker::PhantomData,
141 }
142 }
143}
144
145impl<'a, C: ReceiverComplete<'a, TickCycle>> Drop for TickCycleHandle<'a, C> {
146 fn drop(&mut self) {
147 if !self.completed && !std::thread::panicking() {
148 panic!("TickCycleHandle dropped without being completed");
149 }
150 }
151}
152
153#[expect(
154 private_bounds,
155 reason = "only Hydro collections can implement ReceiverComplete"
156)]
157impl<'a, C: ReceiverComplete<'a, TickCycle>> TickCycleHandle<'a, C> {
158 pub fn complete_next_tick(mut self, stream: impl Into<C>) {
162 self.completed = true;
163 C::complete(stream.into(), self.cycle_id, self.expected_location.clone())
164 }
165}
166
167#[doc(hidden)]
170pub trait CompleteCycle<S> {
171 fn complete_next_tick(self, state: S);
173}
174
175impl<'a, C: ReceiverComplete<'a, TickCycle>> CompleteCycle<C> for TickCycleHandle<'a, C> {
176 fn complete_next_tick(self, state: C) {
177 TickCycleHandle::complete_next_tick(self, state)
178 }
179}