hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19use crate::location::cluster::NoConsistency;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::DynLocation;
22use crate::location::external_process::ExternalBincodeStream;
23use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, Process};
24use crate::networking::{NetworkFor, TCP};
25use crate::nondet::{NonDet, nondet};
26#[cfg(feature = "sim")]
27use crate::sim::SimReceiver;
28use crate::staging_util::get_this_crate;
29
30// same as the one in `hydro_std`, but internal use only
31fn track_membership<'a, C, L: Location<'a>>(
32 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
33) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
34 membership.fold(
35 q!(|| false),
36 q!(|present, event| {
37 match event {
38 MembershipEvent::Joined => *present = true,
39 MembershipEvent::Left => *present = false,
40 }
41 }),
42 )
43}
44
45fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
46 let root = get_this_crate();
47
48 if is_demux {
49 parse_quote! {
50 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
51 |(id, data)| {
52 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
53 }
54 )
55 }
56 } else {
57 parse_quote! {
58 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
59 |data| {
60 #root::runtime_support::bincode::serialize(&data).unwrap().into()
61 }
62 )
63 }
64 }
65}
66
67pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
68 serialize_bincode_with_type(is_demux, "e_type::<T>())
69}
70
71fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
72 let root = get_this_crate();
73 if let Some(c_type) = tagged {
74 parse_quote! {
75 |res| {
76 let (id, b) = res.unwrap();
77 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
78 }
79 }
80 } else {
81 parse_quote! {
82 |res| {
83 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
84 }
85 }
86 }
87}
88
89pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
90 deserialize_bincode_with_type(tagged, "e_type::<T>())
91}
92
93impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
94 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
95 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
96 /// using [`bincode`] to serialize/deserialize messages.
97 ///
98 /// The returned stream captures the elements received at the destination, where values will
99 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
100 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
101 /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
102 /// dropped no further messages will be sent.
103 ///
104 /// # Example
105 /// ```rust
106 /// # #[cfg(feature = "deploy")] {
107 /// # use hydro_lang::prelude::*;
108 /// # use futures::StreamExt;
109 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
110 /// let p1 = flow.process::<()>();
111 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
112 /// let p2 = flow.process::<()>();
113 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
114 /// // 1, 2, 3
115 /// # on_p2.send_bincode(&p_out)
116 /// # }, |mut stream| async move {
117 /// # for w in 1..=3 {
118 /// # assert_eq!(stream.next().await, Some(w));
119 /// # }
120 /// # }));
121 /// # }
122 /// ```
123 pub fn send_bincode<L2>(
124 self,
125 other: &Process<'a, L2>,
126 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
127 where
128 T: Serialize + DeserializeOwned,
129 {
130 self.send(other, TCP.fail_stop().bincode())
131 }
132
133 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
134 /// using the configuration in `via` to set up the message transport.
135 ///
136 /// The returned stream captures the elements received at the destination, where values will
137 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
138 /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
139 /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
140 /// dropped no further messages will be sent.
141 ///
142 /// # Example
143 /// ```rust
144 /// # #[cfg(feature = "deploy")] {
145 /// # use hydro_lang::prelude::*;
146 /// # use futures::StreamExt;
147 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
148 /// let p1 = flow.process::<()>();
149 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
150 /// let p2 = flow.process::<()>();
151 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
152 /// // 1, 2, 3
153 /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
154 /// # }, |mut stream| async move {
155 /// # for w in 1..=3 {
156 /// # assert_eq!(stream.next().await, Some(w));
157 /// # }
158 /// # }));
159 /// # }
160 /// ```
161 pub fn send<L2, N: NetworkFor<T>>(
162 self,
163 to: &Process<'a, L2>,
164 via: N,
165 ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
166 where
167 T: Serialize + DeserializeOwned,
168 O: MinOrder<N::OrderingGuarantee>,
169 {
170 let serialize_pipeline = Some(N::serialize_thunk(false));
171 let deserialize_pipeline = Some(N::deserialize_thunk(None));
172
173 let name = via.name();
174 if to.multiversioned() && name.is_none() {
175 panic!(
176 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
177 );
178 }
179
180 Stream::new(
181 to.clone(),
182 HydroNode::Network {
183 name: name.map(ToOwned::to_owned),
184 networking_info: N::networking_info(),
185 serialize_fn: serialize_pipeline.map(|e| e.into()),
186 instantiate_fn: DebugInstantiate::Building,
187 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
188 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
189 metadata: to.new_node_metadata(Stream::<
190 T,
191 Process<'a, L2>,
192 Unbounded,
193 <O as MinOrder<N::OrderingGuarantee>>::Min,
194 R,
195 >::collection_kind()),
196 },
197 )
198 }
199
200 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
201 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
202 /// using [`bincode`] to serialize/deserialize messages.
203 ///
204 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
205 /// membership information. This is a common pattern in distributed systems for broadcasting data to
206 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
207 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
208 /// each element to all cluster members.
209 ///
210 /// # Non-Determinism
211 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
212 /// to the current cluster members _at that point in time_. Depending on when we are notified of
213 /// membership changes, we will broadcast each element to different members.
214 ///
215 /// # Example
216 /// ```rust
217 /// # #[cfg(feature = "deploy")] {
218 /// # use hydro_lang::prelude::*;
219 /// # use futures::StreamExt;
220 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
221 /// let p1 = flow.process::<()>();
222 /// let workers: Cluster<()> = flow.cluster::<()>();
223 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
224 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
225 /// # on_worker.send_bincode(&p2).entries()
226 /// // if there are 4 members in the cluster, each receives one element
227 /// // - MemberId::<()>(0): [123]
228 /// // - MemberId::<()>(1): [123]
229 /// // - MemberId::<()>(2): [123]
230 /// // - MemberId::<()>(3): [123]
231 /// # }, |mut stream| async move {
232 /// # let mut results = Vec::new();
233 /// # for w in 0..4 {
234 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
235 /// # }
236 /// # results.sort();
237 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
238 /// # }));
239 /// # }
240 /// ```
241 pub fn broadcast_bincode<L2: 'a>(
242 self,
243 other: &Cluster<'a, L2>,
244 nondet_membership: NonDet,
245 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
246 where
247 T: Clone + Serialize + DeserializeOwned,
248 {
249 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
250 }
251
252 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
253 /// using the configuration in `via` to set up the message transport.
254 ///
255 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
256 /// membership information. This is a common pattern in distributed systems for broadcasting data to
257 /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
258 /// target specific members, `broadcast` takes a stream of **only data elements** and sends
259 /// each element to all cluster members.
260 ///
261 /// # Non-Determinism
262 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
263 /// to the current cluster members _at that point in time_. Depending on when we are notified of
264 /// membership changes, we will broadcast each element to different members.
265 ///
266 /// # Example
267 /// ```rust
268 /// # #[cfg(feature = "deploy")] {
269 /// # use hydro_lang::prelude::*;
270 /// # use futures::StreamExt;
271 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
272 /// let p1 = flow.process::<()>();
273 /// let workers: Cluster<()> = flow.cluster::<()>();
274 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
275 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
276 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
277 /// // if there are 4 members in the cluster, each receives one element
278 /// // - MemberId::<()>(0): [123]
279 /// // - MemberId::<()>(1): [123]
280 /// // - MemberId::<()>(2): [123]
281 /// // - MemberId::<()>(3): [123]
282 /// # }, |mut stream| async move {
283 /// # let mut results = Vec::new();
284 /// # for w in 0..4 {
285 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
286 /// # }
287 /// # results.sort();
288 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
289 /// # }));
290 /// # }
291 /// ```
292 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
293 self,
294 to: &Cluster<'a, L2>,
295 via: N,
296 nondet_membership: NonDet,
297 ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
298 where
299 T: Clone + Serialize + DeserializeOwned,
300 O: MinOrder<N::OrderingGuarantee>,
301 {
302 let ids = track_membership(self.location.source_cluster_membership_stream(
303 to,
304 nondet!(/** droppped prefixes don't affect broadcast */),
305 ));
306 sliced! {
307 let members_snapshot = use(ids, nondet_membership);
308 let elements = use(self, nondet_membership);
309
310 let current_members = members_snapshot.filter(q!(|b| *b));
311 elements.repeat_with_keys(current_members)
312 }
313 .demux(to, via)
314 }
315
316 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
317 /// serialization. The external process can receive these elements by establishing a TCP
318 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
319 ///
320 /// # Example
321 /// ```rust
322 /// # #[cfg(feature = "deploy")] {
323 /// # use hydro_lang::prelude::*;
324 /// # use futures::StreamExt;
325 /// # tokio_test::block_on(async move {
326 /// let mut flow = FlowBuilder::new();
327 /// let process = flow.process::<()>();
328 /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
329 /// let external = flow.external::<()>();
330 /// let external_handle = numbers.send_bincode_external(&external);
331 ///
332 /// let mut deployment = hydro_deploy::Deployment::new();
333 /// let nodes = flow
334 /// .with_process(&process, deployment.Localhost())
335 /// .with_external(&external, deployment.Localhost())
336 /// .deploy(&mut deployment);
337 ///
338 /// deployment.deploy().await.unwrap();
339 /// // establish the TCP connection
340 /// let mut external_recv_stream = nodes.connect(external_handle).await;
341 /// deployment.start().await.unwrap();
342 ///
343 /// for w in 1..=3 {
344 /// assert_eq!(external_recv_stream.next().await, Some(w));
345 /// }
346 /// # });
347 /// # }
348 /// ```
349 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
350 where
351 T: Serialize + DeserializeOwned,
352 {
353 let serialize_pipeline = Some(serialize_bincode::<T>(false));
354
355 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
356
357 let external_port_id = flow_state_borrow.next_external_port();
358
359 flow_state_borrow.push_root(HydroRoot::SendExternal {
360 to_external_key: other.key,
361 to_port_id: external_port_id,
362 to_many: false,
363 unpaired: true,
364 serialize_fn: serialize_pipeline.map(|e| e.into()),
365 instantiate_fn: DebugInstantiate::Building,
366 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
367 op_metadata: HydroIrOpMetadata::new(),
368 });
369
370 ExternalBincodeStream {
371 process_key: other.key,
372 port_id: external_port_id,
373 _phantom: PhantomData,
374 }
375 }
376
377 #[cfg(feature = "sim")]
378 /// Sets up a simulation output port for this stream, allowing test code to receive elements
379 /// sent to this stream during simulation.
380 pub fn sim_output(self) -> SimReceiver<T, O, R>
381 where
382 T: Serialize + DeserializeOwned,
383 {
384 let external_location: External<'a, ()> = External {
385 key: LocationKey::FIRST,
386 flow_state: self.location.flow_state().clone(),
387 _phantom: PhantomData,
388 };
389
390 let external = self.send_bincode_external(&external_location);
391
392 SimReceiver(external.port_id, PhantomData)
393 }
394}
395
396impl<'a, T, L: Location<'a>, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
397 /// Creates an external output for embedded deployment mode.
398 ///
399 /// The `name` parameter specifies the name of the field in the generated
400 /// `EmbeddedOutputs` struct that will receive elements from this stream.
401 /// The generated function will accept an `EmbeddedOutputs` struct with an
402 /// `impl FnMut(T)` field with this name.
403 pub fn embedded_output(self, name: impl Into<String>) {
404 let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
405
406 self.location
407 .flow_state()
408 .borrow_mut()
409 .push_root(HydroRoot::EmbeddedOutput {
410 ident,
411 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
412 op_metadata: HydroIrOpMetadata::new(),
413 });
414 }
415}
416
417impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
418 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
419{
420 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
421 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
422 /// using [`bincode`] to serialize/deserialize messages.
423 ///
424 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
425 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
426 /// this API allows precise targeting of specific cluster members rather than broadcasting to
427 /// all members.
428 ///
429 /// # Example
430 /// ```rust
431 /// # #[cfg(feature = "deploy")] {
432 /// # use hydro_lang::prelude::*;
433 /// # use futures::StreamExt;
434 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
435 /// let p1 = flow.process::<()>();
436 /// let workers: Cluster<()> = flow.cluster::<()>();
437 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
438 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
439 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
440 /// .demux_bincode(&workers);
441 /// # on_worker.send_bincode(&p2).entries()
442 /// // if there are 4 members in the cluster, each receives one element
443 /// // - MemberId::<()>(0): [0]
444 /// // - MemberId::<()>(1): [1]
445 /// // - MemberId::<()>(2): [2]
446 /// // - MemberId::<()>(3): [3]
447 /// # }, |mut stream| async move {
448 /// # let mut results = Vec::new();
449 /// # for w in 0..4 {
450 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
451 /// # }
452 /// # results.sort();
453 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
454 /// # }));
455 /// # }
456 /// ```
457 pub fn demux_bincode(
458 self,
459 other: &Cluster<'a, L2>,
460 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
461 where
462 T: Serialize + DeserializeOwned,
463 {
464 self.demux(other, TCP.fail_stop().bincode())
465 }
466
467 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
468 /// using the configuration in `via` to set up the message transport.
469 ///
470 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
471 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
472 /// this API allows precise targeting of specific cluster members rather than broadcasting to
473 /// all members.
474 ///
475 /// # Example
476 /// ```rust
477 /// # #[cfg(feature = "deploy")] {
478 /// # use hydro_lang::prelude::*;
479 /// # use futures::StreamExt;
480 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
481 /// let p1 = flow.process::<()>();
482 /// let workers: Cluster<()> = flow.cluster::<()>();
483 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
484 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
485 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
486 /// .demux(&workers, TCP.fail_stop().bincode());
487 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
488 /// // if there are 4 members in the cluster, each receives one element
489 /// // - MemberId::<()>(0): [0]
490 /// // - MemberId::<()>(1): [1]
491 /// // - MemberId::<()>(2): [2]
492 /// // - MemberId::<()>(3): [3]
493 /// # }, |mut stream| async move {
494 /// # let mut results = Vec::new();
495 /// # for w in 0..4 {
496 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
497 /// # }
498 /// # results.sort();
499 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
500 /// # }));
501 /// # }
502 /// ```
503 #[expect(clippy::type_complexity, reason = "NoConsistency type")]
504 pub fn demux<N: NetworkFor<T>>(
505 self,
506 to: &Cluster<'a, L2>,
507 via: N,
508 ) -> Stream<
509 T,
510 Cluster<'a, L2, NoConsistency>,
511 Unbounded,
512 <O as MinOrder<N::OrderingGuarantee>>::Min,
513 R,
514 >
515 where
516 T: Serialize + DeserializeOwned,
517 O: MinOrder<N::OrderingGuarantee>,
518 {
519 self.into_keyed().demux(to, via)
520 }
521}
522
523impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
524 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
525 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
526 /// [`bincode`] to serialize/deserialize messages.
527 ///
528 /// This provides load balancing by evenly distributing work across cluster members. The
529 /// distribution is deterministic based on element order - the first element goes to member 0,
530 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
531 ///
532 /// # Non-Determinism
533 /// The set of cluster members may asynchronously change over time. Each element is distributed
534 /// based on the current cluster membership _at that point in time_. Depending on when cluster
535 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
536 /// membership is stable, the order of members in the round-robin pattern may change across runs.
537 ///
538 /// # Ordering Requirements
539 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
540 /// order of messages and retries affects the round-robin pattern.
541 ///
542 /// # Example
543 /// ```rust
544 /// # #[cfg(feature = "deploy")] {
545 /// # use hydro_lang::prelude::*;
546 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
547 /// # use futures::StreamExt;
548 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
549 /// let p1 = flow.process::<()>();
550 /// let workers: Cluster<()> = flow.cluster::<()>();
551 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
552 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
553 /// on_worker.send_bincode(&p2)
554 /// # .first().values() // we use first to assert that each member gets one element
555 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
556 /// // - MemberId::<()>(?): [1]
557 /// // - MemberId::<()>(?): [2]
558 /// // - MemberId::<()>(?): [3]
559 /// // - MemberId::<()>(?): [4]
560 /// # }, |mut stream| async move {
561 /// # let mut results = Vec::new();
562 /// # for w in 0..4 {
563 /// # results.push(stream.next().await.unwrap());
564 /// # }
565 /// # results.sort();
566 /// # assert_eq!(results, vec![1, 2, 3, 4]);
567 /// # }));
568 /// # }
569 /// ```
570 pub fn round_robin_bincode<L2: 'a>(
571 self,
572 other: &Cluster<'a, L2>,
573 nondet_membership: NonDet,
574 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
575 where
576 T: Serialize + DeserializeOwned,
577 {
578 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
579 }
580
581 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
582 /// the configuration in `via` to set up the message transport.
583 ///
584 /// This provides load balancing by evenly distributing work across cluster members. The
585 /// distribution is deterministic based on element order - the first element goes to member 0,
586 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
587 ///
588 /// # Non-Determinism
589 /// The set of cluster members may asynchronously change over time. Each element is distributed
590 /// based on the current cluster membership _at that point in time_. Depending on when cluster
591 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
592 /// membership is stable, the order of members in the round-robin pattern may change across runs.
593 ///
594 /// # Ordering Requirements
595 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
596 /// order of messages and retries affects the round-robin pattern.
597 ///
598 /// # Example
599 /// ```rust
600 /// # #[cfg(feature = "deploy")] {
601 /// # use hydro_lang::prelude::*;
602 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
603 /// # use futures::StreamExt;
604 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
605 /// let p1 = flow.process::<()>();
606 /// let workers: Cluster<()> = flow.cluster::<()>();
607 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
608 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
609 /// on_worker.send(&p2, TCP.fail_stop().bincode())
610 /// # .first().values() // we use first to assert that each member gets one element
611 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
612 /// // - MemberId::<()>(?): [1]
613 /// // - MemberId::<()>(?): [2]
614 /// // - MemberId::<()>(?): [3]
615 /// // - MemberId::<()>(?): [4]
616 /// # }, |mut stream| async move {
617 /// # let mut results = Vec::new();
618 /// # for w in 0..4 {
619 /// # results.push(stream.next().await.unwrap());
620 /// # }
621 /// # results.sort();
622 /// # assert_eq!(results, vec![1, 2, 3, 4]);
623 /// # }));
624 /// # }
625 /// ```
626 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
627 self,
628 to: &Cluster<'a, L2>,
629 via: N,
630 nondet_membership: NonDet,
631 ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
632 where
633 T: Serialize + DeserializeOwned,
634 {
635 let ids = track_membership(self.location.source_cluster_membership_stream(
636 to,
637 nondet!(/** droppped prefixes don't affect broadcast */),
638 ));
639 sliced! {
640 let members_snapshot = use(ids, nondet_membership);
641 let elements = use(self.enumerate(), nondet_membership);
642
643 let current_members = members_snapshot
644 .filter(q!(|b| *b))
645 .keys()
646 .assume_ordering::<TotalOrder>(nondet_membership)
647 .collect_vec();
648
649 elements
650 .cross_singleton(current_members)
651 .filter_map(q!(|(data, members)| {
652 if members.is_empty() {
653 None
654 } else {
655 Some((members[data.0 % members.len()].clone(), data.1))
656 }
657 }))
658 }
659 .demux(to, via)
660 }
661}
662
663impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
664 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
665 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
666 /// [`bincode`] to serialize/deserialize messages.
667 ///
668 /// This provides load balancing by evenly distributing work across cluster members. The
669 /// distribution is deterministic based on element order - the first element goes to member 0,
670 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
671 ///
672 /// # Non-Determinism
673 /// The set of cluster members may asynchronously change over time. Each element is distributed
674 /// based on the current cluster membership _at that point in time_. Depending on when cluster
675 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
676 /// membership is stable, the order of members in the round-robin pattern may change across runs.
677 ///
678 /// # Ordering Requirements
679 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
680 /// order of messages and retries affects the round-robin pattern.
681 ///
682 /// # Example
683 /// ```rust
684 /// # #[cfg(feature = "deploy")] {
685 /// # use hydro_lang::prelude::*;
686 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
687 /// # use hydro_lang::location::MemberId;
688 /// # use futures::StreamExt;
689 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
690 /// let p1 = flow.process::<()>();
691 /// let workers1: Cluster<()> = flow.cluster::<()>();
692 /// let workers2: Cluster<()> = flow.cluster::<()>();
693 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
694 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
695 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
696 /// on_worker2.send_bincode(&p2)
697 /// # .entries()
698 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
699 /// # }, |mut stream| async move {
700 /// # let mut results = Vec::new();
701 /// # let mut locations = std::collections::HashSet::new();
702 /// # for w in 0..=16 {
703 /// # let (location, v) = stream.next().await.unwrap();
704 /// # locations.insert(location);
705 /// # results.push(v);
706 /// # }
707 /// # results.sort();
708 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
709 /// # assert_eq!(locations.len(), 16);
710 /// # }));
711 /// # }
712 /// ```
713 pub fn round_robin_bincode<L2: 'a>(
714 self,
715 other: &Cluster<'a, L2>,
716 nondet_membership: NonDet,
717 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
718 where
719 T: Serialize + DeserializeOwned,
720 {
721 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
722 }
723
724 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
725 /// the configuration in `via` to set up the message transport.
726 ///
727 /// This provides load balancing by evenly distributing work across cluster members. The
728 /// distribution is deterministic based on element order - the first element goes to member 0,
729 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
730 ///
731 /// # Non-Determinism
732 /// The set of cluster members may asynchronously change over time. Each element is distributed
733 /// based on the current cluster membership _at that point in time_. Depending on when cluster
734 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
735 /// membership is stable, the order of members in the round-robin pattern may change across runs.
736 ///
737 /// # Ordering Requirements
738 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
739 /// order of messages and retries affects the round-robin pattern.
740 ///
741 /// # Example
742 /// ```rust
743 /// # #[cfg(feature = "deploy")] {
744 /// # use hydro_lang::prelude::*;
745 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
746 /// # use hydro_lang::location::MemberId;
747 /// # use futures::StreamExt;
748 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
749 /// let p1 = flow.process::<()>();
750 /// let workers1: Cluster<()> = flow.cluster::<()>();
751 /// let workers2: Cluster<()> = flow.cluster::<()>();
752 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
753 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
754 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
755 /// on_worker2.send(&p2, TCP.fail_stop().bincode())
756 /// # .entries()
757 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
758 /// # }, |mut stream| async move {
759 /// # let mut results = Vec::new();
760 /// # let mut locations = std::collections::HashSet::new();
761 /// # for w in 0..=16 {
762 /// # let (location, v) = stream.next().await.unwrap();
763 /// # locations.insert(location);
764 /// # results.push(v);
765 /// # }
766 /// # results.sort();
767 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
768 /// # assert_eq!(locations.len(), 16);
769 /// # }));
770 /// # }
771 /// ```
pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
    self,
    to: &Cluster<'a, L2>,
    via: N,
    nondet_membership: NonDet,
) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
where
    T: Serialize + DeserializeOwned,
{
    // Per-member presence flag for the destination cluster: `track_membership` folds the
    // join/leave events into a `bool` for each `MemberId` (`true` after `Joined`, `false`
    // after `Left`).
    let ids = track_membership(self.location.source_cluster_membership_stream(
        to,
        nondet!(/** dropped prefixes don't affect broadcast */),
    ));
    sliced! {
        // Both inputs enter the slice under the caller-supplied `nondet_membership` guard.
        let members_snapshot = use(ids, nondet_membership);
        // `enumerate` attaches an index to every element; that index (`data.0` below)
        // drives the rotation across members.
        let elements = use(self.enumerate(), nondet_membership);

        // Keep only the members whose flag is `true` and gather their ids into a single
        // vector. The vector's order is asserted via `assume_ordering` rather than derived,
        // which is why the nondet guard is passed again here.
        let current_members = members_snapshot
            .filter(q!(|b| *b))
            .keys()
            .assume_ordering::<TotalOrder>(nondet_membership)
            .collect_vec();

        // Pair each indexed element with the member list and pick `members[index % len]`
        // as its destination. When the list is empty the element yields `None` and is
        // therefore dropped by `filter_map`.
        elements
            .cross_singleton(current_members)
            .filter_map(q!(|(data, members)| {
                if members.is_empty() {
                    None
                } else {
                    Some((members[data.0 % members.len()].clone(), data.1))
                }
            }))
    }
    // Ship every `(destination, payload)` pair to its chosen member over `via`.
    .demux(to, via)
}
807}
808
809impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
810 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
811 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
812 /// using [`bincode`] to serialize/deserialize messages.
813 ///
814 /// Each cluster member sends its local stream elements, and they are collected at the destination
815 /// as a [`KeyedStream`] where keys identify the source cluster member.
816 ///
817 /// # Example
818 /// ```rust
819 /// # #[cfg(feature = "deploy")] {
820 /// # use hydro_lang::prelude::*;
821 /// # use futures::StreamExt;
822 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
823 /// let workers: Cluster<()> = flow.cluster::<()>();
824 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
825 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
826 /// # all_received.entries()
827 /// # }, |mut stream| async move {
828 /// // if there are 4 members in the cluster, we should receive 4 elements
829 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
830 /// # let mut results = Vec::new();
831 /// # for w in 0..4 {
832 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
833 /// # }
834 /// # results.sort();
835 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
836 /// # }));
837 /// # }
838 /// ```
839 ///
840 /// If you don't need to know the source for each element, you can use `.values()`
841 /// to get just the data:
842 /// ```rust
843 /// # #[cfg(feature = "deploy")] {
844 /// # use hydro_lang::prelude::*;
845 /// # use hydro_lang::live_collections::stream::NoOrder;
846 /// # use futures::StreamExt;
847 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
848 /// # let workers: Cluster<()> = flow.cluster::<()>();
849 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
850 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
851 /// # values
852 /// # }, |mut stream| async move {
853 /// # let mut results = Vec::new();
854 /// # for w in 0..4 {
855 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
856 /// # }
857 /// # results.sort();
858 /// // if there are 4 members in the cluster, we should receive 4 elements
859 /// // 1, 1, 1, 1
860 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
861 /// # }));
862 /// # }
863 /// ```
864 pub fn send_bincode<L2>(
865 self,
866 other: &Process<'a, L2>,
867 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
868 where
869 T: Serialize + DeserializeOwned,
870 {
871 self.send(other, TCP.fail_stop().bincode())
872 }
873
874 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
875 /// using the configuration in `via` to set up the message transport.
876 ///
877 /// Each cluster member sends its local stream elements, and they are collected at the destination
878 /// as a [`KeyedStream`] where keys identify the source cluster member.
879 ///
880 /// # Example
881 /// ```rust
882 /// # #[cfg(feature = "deploy")] {
883 /// # use hydro_lang::prelude::*;
884 /// # use futures::StreamExt;
885 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
886 /// let workers: Cluster<()> = flow.cluster::<()>();
887 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
888 /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
889 /// # all_received.entries()
890 /// # }, |mut stream| async move {
891 /// // if there are 4 members in the cluster, we should receive 4 elements
892 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
893 /// # let mut results = Vec::new();
894 /// # for w in 0..4 {
895 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
896 /// # }
897 /// # results.sort();
898 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
899 /// # }));
900 /// # }
901 /// ```
902 ///
903 /// If you don't need to know the source for each element, you can use `.values()`
904 /// to get just the data:
905 /// ```rust
906 /// # #[cfg(feature = "deploy")] {
907 /// # use hydro_lang::prelude::*;
908 /// # use hydro_lang::live_collections::stream::NoOrder;
909 /// # use futures::StreamExt;
910 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
911 /// # let workers: Cluster<()> = flow.cluster::<()>();
912 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
913 /// let values: Stream<i32, _, _, NoOrder> =
914 /// numbers.send(&process, TCP.fail_stop().bincode()).values();
915 /// # values
916 /// # }, |mut stream| async move {
917 /// # let mut results = Vec::new();
918 /// # for w in 0..4 {
919 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
920 /// # }
921 /// # results.sort();
922 /// // if there are 4 members in the cluster, we should receive 4 elements
923 /// // 1, 1, 1, 1
924 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
925 /// # }));
926 /// # }
927 /// ```
928 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
929 pub fn send<L2, N: NetworkFor<T>>(
930 self,
931 to: &Process<'a, L2>,
932 via: N,
933 ) -> KeyedStream<
934 MemberId<L>,
935 T,
936 Process<'a, L2>,
937 Unbounded,
938 <O as MinOrder<N::OrderingGuarantee>>::Min,
939 R,
940 >
941 where
942 T: Serialize + DeserializeOwned,
943 O: MinOrder<N::OrderingGuarantee>,
944 {
945 let serialize_pipeline = Some(N::serialize_thunk(false));
946
947 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
948
949 let name = via.name();
950 if to.multiversioned() && name.is_none() {
951 panic!(
952 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
953 );
954 }
955
956 let raw_stream: Stream<
957 (MemberId<L>, T),
958 Process<'a, L2>,
959 Unbounded,
960 <O as MinOrder<N::OrderingGuarantee>>::Min,
961 R,
962 > = Stream::new(
963 to.clone(),
964 HydroNode::Network {
965 name: name.map(ToOwned::to_owned),
966 networking_info: N::networking_info(),
967 serialize_fn: serialize_pipeline.map(|e| e.into()),
968 instantiate_fn: DebugInstantiate::Building,
969 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
970 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
971 metadata: to.new_node_metadata(Stream::<
972 (MemberId<L>, T),
973 Process<'a, L2>,
974 Unbounded,
975 <O as MinOrder<N::OrderingGuarantee>>::Min,
976 R,
977 >::collection_kind()),
978 },
979 );
980
981 raw_stream.into_keyed()
982 }
983
984 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
985 /// Broadcasts elements of this stream at each source member to all members of a destination
986 /// cluster, using [`bincode`] to serialize/deserialize messages.
987 ///
988 /// Each source member sends each of its stream elements to **every** member of the cluster
989 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
990 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
991 /// **only data elements** and sends each element to all cluster members.
992 ///
993 /// # Non-Determinism
994 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
995 /// to the current cluster members known _at that point in time_ at the source member. Depending
996 /// on when each source member is notified of membership changes, it will broadcast each element
997 /// to different members.
998 ///
999 /// # Example
1000 /// ```rust
1001 /// # #[cfg(feature = "deploy")] {
1002 /// # use hydro_lang::prelude::*;
1003 /// # use hydro_lang::location::MemberId;
1004 /// # use futures::StreamExt;
1005 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1006 /// # type Source = ();
1007 /// # type Destination = ();
1008 /// let source: Cluster<Source> = flow.cluster::<Source>();
1009 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1010 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1011 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
1012 /// # on_destination.entries().send_bincode(&p2).entries()
/// // if there are 4 members in the destination, each receives one element from each source member
1014 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1015 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1016 /// // - ...
1017 /// # }, |mut stream| async move {
1018 /// # let mut results = Vec::new();
1019 /// # for w in 0..16 {
1020 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1021 /// # }
1022 /// # results.sort();
1023 /// # assert_eq!(results, vec![
1024 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1025 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1026 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1027 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1028 /// # ]);
1029 /// # }));
1030 /// # }
1031 /// ```
1032 pub fn broadcast_bincode<L2: 'a>(
1033 self,
1034 other: &Cluster<'a, L2>,
1035 nondet_membership: NonDet,
1036 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1037 where
1038 T: Clone + Serialize + DeserializeOwned,
1039 {
1040 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1041 }
1042
1043 /// Broadcasts elements of this stream at each source member to all members of a destination
1044 /// cluster, using the configuration in `via` to set up the message transport.
1045 ///
1046 /// Each source member sends each of its stream elements to **every** member of the cluster
1047 /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1048 /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1049 /// **only data elements** and sends each element to all cluster members.
1050 ///
1051 /// # Non-Determinism
1052 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1053 /// to the current cluster members known _at that point in time_ at the source member. Depending
1054 /// on when each source member is notified of membership changes, it will broadcast each element
1055 /// to different members.
1056 ///
1057 /// # Example
1058 /// ```rust
1059 /// # #[cfg(feature = "deploy")] {
1060 /// # use hydro_lang::prelude::*;
1061 /// # use hydro_lang::location::MemberId;
1062 /// # use futures::StreamExt;
1063 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1064 /// # type Source = ();
1065 /// # type Destination = ();
1066 /// let source: Cluster<Source> = flow.cluster::<Source>();
1067 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1068 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1069 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1070 /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
/// // if there are 4 members in the destination, each receives one element from each source member
1072 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1073 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1074 /// // - ...
1075 /// # }, |mut stream| async move {
1076 /// # let mut results = Vec::new();
1077 /// # for w in 0..16 {
1078 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1079 /// # }
1080 /// # results.sort();
1081 /// # assert_eq!(results, vec![
1082 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1083 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1084 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1085 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1086 /// # ]);
1087 /// # }));
1088 /// # }
1089 /// ```
1090 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
    self,
    to: &Cluster<'a, L2>,
    via: N,
    nondet_membership: NonDet,
) -> KeyedStream<
    MemberId<L>,
    T,
    Cluster<'a, L2>,
    Unbounded,
    <O as MinOrder<N::OrderingGuarantee>>::Min,
    R,
>
where
    T: Clone + Serialize + DeserializeOwned,
    O: MinOrder<N::OrderingGuarantee>,
{
    // Per-member presence flag for the destination cluster: `track_membership` folds the
    // join/leave events into a `bool` for each `MemberId`.
    let ids = track_membership(self.location.source_cluster_membership_stream(
        to,
        nondet!(/** dropped prefixes don't affect broadcast */),
    ));
    sliced! {
        // Both the membership view and the elements enter the slice under the
        // caller-supplied `nondet_membership` guard.
        let members_snapshot = use(ids, nondet_membership);
        let elements = use(self, nondet_membership);

        // Only members whose flag is currently `true` are targeted.
        let current_members = members_snapshot.filter(q!(|b| *b));
        // NOTE(review): going by its name, `repeat_with_keys` emits each element once per
        // key in `current_members` — confirm against its definition.
        elements.repeat_with_keys(current_members)
    }
    // Deliver every `(destination, payload)` pair to its member over `via`.
    .demux(to, via)
}
1121
1122 #[cfg(feature = "sim")]
1123 /// Sends elements of this cluster stream to an external location using bincode serialization.
1124 fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
1125 where
1126 T: Serialize + DeserializeOwned,
1127 {
1128 let serialize_pipeline = Some(serialize_bincode::<T>(false));
1129
1130 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1131
1132 let external_port_id = flow_state_borrow.next_external_port();
1133
1134 flow_state_borrow.push_root(HydroRoot::SendExternal {
1135 to_external_key: other.key,
1136 to_port_id: external_port_id,
1137 to_many: false,
1138 unpaired: true,
1139 serialize_fn: serialize_pipeline.map(|e| e.into()),
1140 instantiate_fn: DebugInstantiate::Building,
1141 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1142 op_metadata: HydroIrOpMetadata::new(),
1143 });
1144
1145 ExternalBincodeStream {
1146 process_key: other.key,
1147 port_id: external_port_id,
1148 _phantom: PhantomData,
1149 }
1150 }
1151
1152 #[cfg(feature = "sim")]
1153 /// Sets up a simulation output port for this cluster stream, allowing test code
1154 /// to receive `(member_id, T)` pairs during simulation.
1155 pub fn sim_cluster_output(self) -> crate::sim::SimClusterReceiver<T, O, R>
1156 where
1157 T: Serialize + DeserializeOwned,
1158 {
1159 let external_location: External<'a, ()> = External {
1160 key: LocationKey::FIRST,
1161 flow_state: self.location.flow_state().clone(),
1162 _phantom: PhantomData,
1163 };
1164
1165 let external = self.send_bincode_external(&external_location);
1166
1167 crate::sim::SimClusterReceiver(external.port_id, PhantomData)
1168 }
1169}
1170
1171impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1172 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1173{
1174 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1175 /// Sends elements of this stream at each source member to specific members of a destination
1176 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1177 ///
1178 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1179 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1180 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1181 /// all members.
1182 ///
1183 /// Each cluster member sends its local stream elements, and they are collected at each
1184 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1185 ///
1186 /// # Example
1187 /// ```rust
1188 /// # #[cfg(feature = "deploy")] {
1189 /// # use hydro_lang::prelude::*;
1190 /// # use futures::StreamExt;
1191 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1192 /// # type Source = ();
1193 /// # type Destination = ();
1194 /// let source: Cluster<Source> = flow.cluster::<Source>();
1195 /// let to_send: Stream<_, Cluster<_>, _> = source
1196 /// .source_iter(q!(vec![0, 1, 2, 3]))
1197 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1198 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1199 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1200 /// # all_received.entries().send_bincode(&p2).entries()
1201 /// # }, |mut stream| async move {
1202 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1203 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1204 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1205 /// // - ...
1206 /// # let mut results = Vec::new();
1207 /// # for w in 0..16 {
1208 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1209 /// # }
1210 /// # results.sort();
1211 /// # assert_eq!(results, vec![
1212 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1213 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1214 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1215 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1216 /// # ]);
1217 /// # }));
1218 /// # }
1219 /// ```
1220 pub fn demux_bincode(
1221 self,
1222 other: &Cluster<'a, L2>,
1223 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1224 where
1225 T: Serialize + DeserializeOwned,
1226 {
1227 self.demux(other, TCP.fail_stop().bincode())
1228 }
1229
1230 /// Sends elements of this stream at each source member to specific members of a destination
1231 /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1232 /// message transport.
1233 ///
1234 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1235 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1236 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1237 /// all members.
1238 ///
1239 /// Each cluster member sends its local stream elements, and they are collected at each
1240 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1241 ///
1242 /// # Example
1243 /// ```rust
1244 /// # #[cfg(feature = "deploy")] {
1245 /// # use hydro_lang::prelude::*;
1246 /// # use futures::StreamExt;
1247 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1248 /// # type Source = ();
1249 /// # type Destination = ();
1250 /// let source: Cluster<Source> = flow.cluster::<Source>();
1251 /// let to_send: Stream<_, Cluster<_>, _> = source
1252 /// .source_iter(q!(vec![0, 1, 2, 3]))
1253 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1254 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1255 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1256 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1257 /// # }, |mut stream| async move {
1258 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1259 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1260 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1261 /// // - ...
1262 /// # let mut results = Vec::new();
1263 /// # for w in 0..16 {
1264 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1265 /// # }
1266 /// # results.sort();
1267 /// # assert_eq!(results, vec![
1268 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1269 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1270 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1271 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1272 /// # ]);
1273 /// # }));
1274 /// # }
1275 /// ```
1276 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1277 pub fn demux<N: NetworkFor<T>>(
1278 self,
1279 to: &Cluster<'a, L2>,
1280 via: N,
1281 ) -> KeyedStream<
1282 MemberId<L>,
1283 T,
1284 Cluster<'a, L2, NoConsistency>,
1285 Unbounded,
1286 <O as MinOrder<N::OrderingGuarantee>>::Min,
1287 R,
1288 >
1289 where
1290 T: Serialize + DeserializeOwned,
1291 O: MinOrder<N::OrderingGuarantee>,
1292 {
1293 self.into_keyed().demux(to, via)
1294 }
1295}
1296
1297#[cfg(test)]
1298mod tests {
1299 #[cfg(feature = "sim")]
1300 use stageleft::q;
1301
1302 #[cfg(feature = "sim")]
1303 use crate::live_collections::sliced::sliced;
1304 #[cfg(feature = "sim")]
1305 use crate::location::{Location, MemberId};
1306 #[cfg(feature = "sim")]
1307 use crate::networking::TCP;
1308 #[cfg(feature = "sim")]
1309 use crate::nondet::nondet;
1310 #[cfg(feature = "sim")]
1311 use crate::prelude::FlowBuilder;
1312
1313 #[cfg(feature = "sim")]
1314 #[test]
1315 fn sim_send_bincode_o2o() {
1316 use crate::networking::TCP;
1317
1318 let mut flow = FlowBuilder::new();
1319 let node = flow.process::<()>();
1320 let node2 = flow.process::<()>();
1321
1322 let (in_send, input) = node.sim_input();
1323
1324 let out_recv = input
1325 .send(&node2, TCP.fail_stop().bincode())
1326 .batch(&node2.tick(), nondet!(/** test */))
1327 .count()
1328 .all_ticks()
1329 .sim_output();
1330
1331 let instances = flow.sim().exhaustive(async || {
1332 in_send.send(());
1333 in_send.send(());
1334 in_send.send(());
1335
1336 let received = out_recv.collect::<Vec<_>>().await;
1337 assert!(received.into_iter().sum::<usize>() == 3);
1338 });
1339
1340 assert_eq!(instances, 4); // 2^{3 - 1}
1341 }
1342
1343 #[cfg(feature = "sim")]
1344 #[test]
1345 fn sim_send_bincode_m2o() {
1346 let mut flow = FlowBuilder::new();
1347 let cluster = flow.cluster::<()>();
1348 let node = flow.process::<()>();
1349
1350 let input = cluster.source_iter(q!(vec![1]));
1351
1352 let out_recv = input
1353 .send(&node, TCP.fail_stop().bincode())
1354 .entries()
1355 .batch(&node.tick(), nondet!(/** test */))
1356 .all_ticks()
1357 .sim_output();
1358
1359 let instances = flow
1360 .sim()
1361 .with_cluster_size(&cluster, 4)
1362 .exhaustive(async || {
1363 out_recv
1364 .assert_yields_only_unordered(vec![
1365 (MemberId::from_raw_id(0), 1),
1366 (MemberId::from_raw_id(1), 1),
1367 (MemberId::from_raw_id(2), 1),
1368 (MemberId::from_raw_id(3), 1),
1369 ])
1370 .await
1371 });
1372
1373 assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1374 }
1375
1376 #[cfg(feature = "sim")]
1377 #[test]
1378 fn sim_send_bincode_multiple_m2o() {
1379 let mut flow = FlowBuilder::new();
1380 let cluster1 = flow.cluster::<()>();
1381 let cluster2 = flow.cluster::<()>();
1382 let node = flow.process::<()>();
1383
1384 let out_recv_1 = cluster1
1385 .source_iter(q!(vec![1]))
1386 .send(&node, TCP.fail_stop().bincode())
1387 .entries()
1388 .sim_output();
1389
1390 let out_recv_2 = cluster2
1391 .source_iter(q!(vec![2]))
1392 .send(&node, TCP.fail_stop().bincode())
1393 .entries()
1394 .sim_output();
1395
1396 let instances = flow
1397 .sim()
1398 .with_cluster_size(&cluster1, 3)
1399 .with_cluster_size(&cluster2, 4)
1400 .exhaustive(async || {
1401 out_recv_1
1402 .assert_yields_only_unordered(vec![
1403 (MemberId::from_raw_id(0), 1),
1404 (MemberId::from_raw_id(1), 1),
1405 (MemberId::from_raw_id(2), 1),
1406 ])
1407 .await;
1408
1409 out_recv_2
1410 .assert_yields_only_unordered(vec![
1411 (MemberId::from_raw_id(0), 2),
1412 (MemberId::from_raw_id(1), 2),
1413 (MemberId::from_raw_id(2), 2),
1414 (MemberId::from_raw_id(3), 2),
1415 ])
1416 .await;
1417 });
1418
1419 assert_eq!(instances, 1);
1420 }
1421
1422 #[cfg(feature = "sim")]
1423 #[test]
1424 fn sim_send_bincode_o2m() {
1425 let mut flow = FlowBuilder::new();
1426 let cluster = flow.cluster::<()>();
1427 let node = flow.process::<()>();
1428
1429 let input = node.source_iter(q!(vec![
1430 (MemberId::from_raw_id(0), 123),
1431 (MemberId::from_raw_id(1), 456),
1432 ]));
1433
1434 let out_recv = input
1435 .demux(&cluster, TCP.fail_stop().bincode())
1436 .map(q!(|x| x + 1))
1437 .send(&node, TCP.fail_stop().bincode())
1438 .entries()
1439 .sim_output();
1440
1441 flow.sim()
1442 .with_cluster_size(&cluster, 4)
1443 .exhaustive(async || {
1444 out_recv
1445 .assert_yields_only_unordered(vec![
1446 (MemberId::from_raw_id(0), 124),
1447 (MemberId::from_raw_id(1), 457),
1448 ])
1449 .await
1450 });
1451 }
1452
1453 #[cfg(feature = "sim")]
1454 #[test]
1455 fn sim_broadcast_bincode_o2m() {
1456 let mut flow = FlowBuilder::new();
1457 let cluster = flow.cluster::<()>();
1458 let node = flow.process::<()>();
1459
1460 let input = node.source_iter(q!(vec![123, 456]));
1461
1462 let out_recv = input
1463 .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1464 .map(q!(|x| x + 1))
1465 .send(&node, TCP.fail_stop().bincode())
1466 .entries()
1467 .sim_output();
1468
1469 let mut c_1_produced = false;
1470 let mut c_2_produced = false;
1471
1472 flow.sim()
1473 .with_cluster_size(&cluster, 2)
1474 .exhaustive(async || {
1475 let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1476
1477 // check that order is preserved
1478 if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1479 assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1480 c_1_produced = true;
1481 }
1482
1483 if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1484 assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1485 c_2_produced = true;
1486 }
1487 });
1488
1489 assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1490 }
1491
1492 #[cfg(feature = "sim")]
1493 #[test]
1494 fn sim_send_bincode_m2m() {
1495 let mut flow = FlowBuilder::new();
1496 let cluster = flow.cluster::<()>();
1497 let node = flow.process::<()>();
1498
1499 let input = node.source_iter(q!(vec![
1500 (MemberId::from_raw_id(0), 123),
1501 (MemberId::from_raw_id(1), 456),
1502 ]));
1503
1504 let out_recv = input
1505 .demux(&cluster, TCP.fail_stop().bincode())
1506 .map(q!(|x| x + 1))
1507 .flat_map_ordered(q!(|x| vec![
1508 (MemberId::from_raw_id(0), x),
1509 (MemberId::from_raw_id(1), x),
1510 ]))
1511 .demux(&cluster, TCP.fail_stop().bincode())
1512 .entries()
1513 .send(&node, TCP.fail_stop().bincode())
1514 .entries()
1515 .sim_output();
1516
1517 flow.sim()
1518 .with_cluster_size(&cluster, 4)
1519 .exhaustive(async || {
1520 out_recv
1521 .assert_yields_only_unordered(vec![
1522 (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1523 (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1524 (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1525 (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1526 ])
1527 .await
1528 });
1529 }
1530
1531 #[cfg(feature = "sim")]
1532 #[test]
1533 fn sim_lossy_delayed_forever_o2o() {
1534 use std::collections::HashSet;
1535
1536 use crate::properties::manual_proof;
1537
1538 let mut flow = FlowBuilder::new();
1539 let node = flow.process::<()>();
1540 let node2 = flow.process::<()>();
1541
1542 let received = node
1543 .source_iter(q!(0..3_u32))
1544 .send(&node2, TCP.lossy_delayed_forever().bincode())
1545 .fold(
1546 q!(|| std::collections::HashSet::<u32>::new()),
1547 q!(
1548 |set, v| {
1549 set.insert(v);
1550 },
1551 commutative = manual_proof!(/** set insert is commutative */)
1552 ),
1553 );
1554
1555 let out_recv = sliced! {
1556 let snapshot = use(received, nondet!(/** test */));
1557 snapshot.into_stream()
1558 }
1559 .sim_output();
1560
1561 let mut saw_non_contiguous = false;
1562
1563 flow.sim().test_safety_only().exhaustive(async || {
1564 let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;
1565
1566 // Check each individual snapshot for a non-contiguous subset.
1567 for set in &snapshots {
1568 #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
1569 if set.len() >= 2 && set.len() < 3 {
1570 let min = *set.iter().min().unwrap();
1571 let max = *set.iter().max().unwrap();
1572 if set.len() < (max - min + 1) as usize {
1573 saw_non_contiguous = true;
1574 }
1575 }
1576 }
1577 });
1578
1579 assert!(
1580 saw_non_contiguous,
1581 "Expected at least one execution with a non-contiguous subset of inputs"
1582 );
1583 }
1584}