hydro_lang/live_collections/keyed_stream/networking.rs
1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{MinOrder, Ordering, Retries, Stream};
11use crate::location::cluster::NoConsistency;
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15use crate::networking::{NetworkFor, TCP};
16
17impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
18 KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
19{
20 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
21 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
22 /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
23 ///
24 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
25 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
26 /// API allows precise targeting of specific cluster members rather than broadcasting to
27 /// all members.
28 ///
29 /// # Example
30 /// ```rust
31 /// # #[cfg(feature = "deploy")] {
32 /// # use hydro_lang::prelude::*;
33 /// # use futures::StreamExt;
34 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
35 /// let p1 = flow.process::<()>();
36 /// let workers: Cluster<()> = flow.cluster::<()>();
37 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
38 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
39 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
40 /// .into_keyed()
41 /// .demux_bincode(&workers);
42 /// # on_worker.send_bincode(&p2).entries()
43 /// // if there are 4 members in the cluster, each receives one element
44 /// // - MemberId::<()>(0): [0]
45 /// // - MemberId::<()>(1): [1]
46 /// // - MemberId::<()>(2): [2]
47 /// // - MemberId::<()>(3): [3]
48 /// # }, |mut stream| async move {
49 /// # let mut results = Vec::new();
50 /// # for w in 0..4 {
51 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
52 /// # }
53 /// # results.sort();
54 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
55 /// # }));
56 /// # }
57 /// ```
58 pub fn demux_bincode(
59 self,
60 other: &Cluster<'a, L2>,
61 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
62 where
63 T: Serialize + DeserializeOwned,
64 {
65 self.demux(other, TCP.fail_stop().bincode())
66 }
67
68 /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
69 /// identifying the recipient for each group and using the configuration in `via` to set up the
70 /// message transport.
71 ///
72 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
73 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
74 /// API allows precise targeting of specific cluster members rather than broadcasting to
75 /// all members.
76 ///
77 /// # Example
78 /// ```rust
79 /// # #[cfg(feature = "deploy")] {
80 /// # use hydro_lang::prelude::*;
81 /// # use futures::StreamExt;
82 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
83 /// let p1 = flow.process::<()>();
84 /// let workers: Cluster<()> = flow.cluster::<()>();
85 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
86 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
87 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
88 /// .into_keyed()
89 /// .demux(&workers, TCP.fail_stop().bincode());
90 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
91 /// // if there are 4 members in the cluster, each receives one element
92 /// // - MemberId::<()>(0): [0]
93 /// // - MemberId::<()>(1): [1]
94 /// // - MemberId::<()>(2): [2]
95 /// // - MemberId::<()>(3): [3]
96 /// # }, |mut stream| async move {
97 /// # let mut results = Vec::new();
98 /// # for w in 0..4 {
99 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
100 /// # }
101 /// # results.sort();
102 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
103 /// # }));
104 /// # }
105 /// ```
106 #[expect(clippy::type_complexity, reason = "NoConsistency type")]
107 pub fn demux<N: NetworkFor<T>>(
108 self,
109 to: &Cluster<'a, L2>,
110 via: N,
111 ) -> Stream<
112 T,
113 Cluster<'a, L2, NoConsistency>,
114 Unbounded,
115 <O as MinOrder<N::OrderingGuarantee>>::Min,
116 R,
117 >
118 where
119 T: Serialize + DeserializeOwned,
120 O: MinOrder<N::OrderingGuarantee>,
121 {
122 let serialize_pipeline = Some(N::serialize_thunk(true));
123
124 let deserialize_pipeline = Some(N::deserialize_thunk(None));
125
126 let name = via.name();
127 if to.multiversioned() && name.is_none() {
128 panic!(
129 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
130 );
131 }
132
133 Stream::new(
134 to.clone(),
135 HydroNode::Network {
136 name: name.map(ToOwned::to_owned),
137 networking_info: N::networking_info(),
138 serialize_fn: serialize_pipeline.map(|e| e.into()),
139 instantiate_fn: DebugInstantiate::Building,
140 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
141 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
142 metadata: to.new_node_metadata(Stream::<
143 T,
144 Cluster<'a, L2>,
145 Unbounded,
146 <O as MinOrder<N::OrderingGuarantee>>::Min,
147 R,
148 >::collection_kind()),
149 },
150 )
151 }
152}
153
154impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
155 KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
156{
157 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
158 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
159 /// compound key where the first element is the recipient's [`MemberId`] and the second element
160 /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
161 /// messages.
162 ///
163 /// # Example
164 /// ```rust
165 /// # #[cfg(feature = "deploy")] {
166 /// # use hydro_lang::prelude::*;
167 /// # use futures::StreamExt;
168 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
169 /// let p1 = flow.process::<()>();
170 /// let workers: Cluster<()> = flow.cluster::<()>();
171 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
172 /// .source_iter(q!(vec![0, 1, 2, 3]))
173 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
174 /// .into_keyed();
175 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
176 /// # on_worker.entries().send_bincode(&p2).entries()
177 /// // if there are 4 members in the cluster, each receives one element
178 /// // - MemberId::<()>(0): { 0: [123] }
179 /// // - MemberId::<()>(1): { 1: [124] }
180 /// // - ...
181 /// # }, |mut stream| async move {
182 /// # let mut results = Vec::new();
183 /// # for w in 0..4 {
184 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
185 /// # }
186 /// # results.sort();
187 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
188 /// # }));
189 /// # }
190 /// ```
191 pub fn demux_bincode(
192 self,
193 other: &Cluster<'a, L2>,
194 ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
195 where
196 K: Serialize + DeserializeOwned,
197 T: Serialize + DeserializeOwned,
198 {
199 self.demux(other, TCP.fail_stop().bincode())
200 }
201
202 /// Sends each group of this stream to a specific member of a cluster. The input stream has a
203 /// compound key where the first element is the recipient's [`MemberId`] and the second element
204 /// is a key that will be sent along with the value, using the configuration in `via` to set up
205 /// the message transport.
206 ///
207 /// # Example
208 /// ```rust
209 /// # #[cfg(feature = "deploy")] {
210 /// # use hydro_lang::prelude::*;
211 /// # use futures::StreamExt;
212 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
213 /// let p1 = flow.process::<()>();
214 /// let workers: Cluster<()> = flow.cluster::<()>();
215 /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
216 /// .source_iter(q!(vec![0, 1, 2, 3]))
217 /// .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
218 /// .into_keyed();
219 /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
220 /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
221 /// // if there are 4 members in the cluster, each receives one element
222 /// // - MemberId::<()>(0): { 0: [123] }
223 /// // - MemberId::<()>(1): { 1: [124] }
224 /// // - ...
225 /// # }, |mut stream| async move {
226 /// # let mut results = Vec::new();
227 /// # for w in 0..4 {
228 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
229 /// # }
230 /// # results.sort();
231 /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
232 /// # }));
233 /// # }
234 /// ```
235 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
236 pub fn demux<N: NetworkFor<(K, T)>>(
237 self,
238 to: &Cluster<'a, L2>,
239 via: N,
240 ) -> KeyedStream<
241 K,
242 T,
243 Cluster<'a, L2, NoConsistency>,
244 Unbounded,
245 <O as MinOrder<N::OrderingGuarantee>>::Min,
246 R,
247 >
248 where
249 K: Serialize + DeserializeOwned,
250 T: Serialize + DeserializeOwned,
251 O: MinOrder<N::OrderingGuarantee>,
252 {
253 let serialize_pipeline = Some(N::serialize_thunk(true));
254
255 let deserialize_pipeline = Some(N::deserialize_thunk(None));
256
257 let name = via.name();
258 if to.multiversioned() && name.is_none() {
259 panic!(
260 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
261 );
262 }
263
264 KeyedStream::new(
265 to.clone(),
266 HydroNode::Network {
267 name: name.map(ToOwned::to_owned),
268 networking_info: N::networking_info(),
269 serialize_fn: serialize_pipeline.map(|e| e.into()),
270 instantiate_fn: DebugInstantiate::Building,
271 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
272 input: Box::new(
273 self.entries()
274 .map(q!(|((id, k), v)| (id, (k, v))))
275 .ir_node
276 .replace(HydroNode::Placeholder),
277 ),
278 metadata: to.new_node_metadata(KeyedStream::<
279 K,
280 T,
281 Cluster<'a, L2>,
282 Unbounded,
283 <O as MinOrder<N::OrderingGuarantee>>::Min,
284 R,
285 >::collection_kind()),
286 },
287 )
288 }
289}
290
291impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
292 KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
293{
294 #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
295 /// Sends each group of this stream at each source member to a specific member of a destination
296 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
297 /// [`bincode`] to serialize/deserialize messages.
298 ///
299 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
300 /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
301 /// API allows precise targeting of specific cluster members rather than broadcasting to all
302 /// members.
303 ///
304 /// Each cluster member sends its local stream elements, and they are collected at each
305 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
306 ///
307 /// # Example
308 /// ```rust
309 /// # #[cfg(feature = "deploy")] {
310 /// # use hydro_lang::prelude::*;
311 /// # use futures::StreamExt;
312 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
313 /// # type Source = ();
314 /// # type Destination = ();
315 /// let source: Cluster<Source> = flow.cluster::<Source>();
316 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
317 /// .source_iter(q!(vec![0, 1, 2, 3]))
318 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
319 /// .into_keyed();
320 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
321 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
322 /// # all_received.entries().send_bincode(&p2).entries()
323 /// # }, |mut stream| async move {
324 /// // if there are 4 members in the destination cluster, each receives one message from each source member
325 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
326 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
327 /// // - ...
328 /// # let mut results = Vec::new();
329 /// # for w in 0..16 {
330 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
331 /// # }
332 /// # results.sort();
333 /// # assert_eq!(results, vec![
334 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
335 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
336 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
337 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
338 /// # ]);
339 /// # }));
340 /// # }
341 /// ```
342 pub fn demux_bincode(
343 self,
344 other: &Cluster<'a, L2>,
345 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
346 where
347 T: Serialize + DeserializeOwned,
348 {
349 self.demux(other, TCP.fail_stop().bincode())
350 }
351
352 /// Sends each group of this stream at each source member to a specific member of a destination
353 /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
354 /// configuration in `via` to set up the message transport.
355 ///
356 /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
357 /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
358 /// API allows precise targeting of specific cluster members rather than broadcasting to all
359 /// members.
360 ///
361 /// Each cluster member sends its local stream elements, and they are collected at each
362 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
363 ///
364 /// # Example
365 /// ```rust
366 /// # #[cfg(feature = "deploy")] {
367 /// # use hydro_lang::prelude::*;
368 /// # use futures::StreamExt;
369 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
370 /// # type Source = ();
371 /// # type Destination = ();
372 /// let source: Cluster<Source> = flow.cluster::<Source>();
373 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
374 /// .source_iter(q!(vec![0, 1, 2, 3]))
375 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
376 /// .into_keyed();
377 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
378 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
379 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
380 /// # }, |mut stream| async move {
381 /// // if there are 4 members in the destination cluster, each receives one message from each source member
382 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
383 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
384 /// // - ...
385 /// # let mut results = Vec::new();
386 /// # for w in 0..16 {
387 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
388 /// # }
389 /// # results.sort();
390 /// # assert_eq!(results, vec![
391 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
392 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
393 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
394 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
395 /// # ]);
396 /// # }));
397 /// # }
398 /// ```
399 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
400 pub fn demux<N: NetworkFor<T>>(
401 self,
402 to: &Cluster<'a, L2>,
403 via: N,
404 ) -> KeyedStream<
405 MemberId<L>,
406 T,
407 Cluster<'a, L2, NoConsistency>,
408 Unbounded,
409 <O as MinOrder<N::OrderingGuarantee>>::Min,
410 R,
411 >
412 where
413 T: Serialize + DeserializeOwned,
414 O: MinOrder<N::OrderingGuarantee>,
415 {
416 let serialize_pipeline = Some(N::serialize_thunk(true));
417
418 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
419
420 let name = via.name();
421 if to.multiversioned() && name.is_none() {
422 panic!(
423 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
424 );
425 }
426
427 KeyedStream::new(
428 to.clone(),
429 HydroNode::Network {
430 name: name.map(ToOwned::to_owned),
431 networking_info: N::networking_info(),
432 serialize_fn: serialize_pipeline.map(|e| e.into()),
433 instantiate_fn: DebugInstantiate::Building,
434 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
435 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
436 metadata: to.new_node_metadata(KeyedStream::<
437 MemberId<L>,
438 T,
439 Cluster<'a, L2>,
440 Unbounded,
441 <O as MinOrder<N::OrderingGuarantee>>::Min,
442 R,
443 >::collection_kind()),
444 },
445 )
446 }
447}
448
449impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
450 KeyedStream<K, V, Cluster<'a, L>, B, O, R>
451{
452 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
453 #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
454 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
455 /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
456 /// has a compound key where the first element is the sender's [`MemberId`] and the second
457 /// element is the original key.
458 ///
459 /// # Example
460 /// ```rust
461 /// # #[cfg(feature = "deploy")] {
462 /// # use hydro_lang::prelude::*;
463 /// # use futures::StreamExt;
464 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
465 /// # type Source = ();
466 /// # type Destination = ();
467 /// let source: Cluster<Source> = flow.cluster::<Source>();
468 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
469 /// .source_iter(q!(vec![0, 1, 2, 3]))
470 /// .map(q!(|x| (x, x + 123)))
471 /// .into_keyed();
472 /// let destination_process = flow.process::<Destination>();
473 /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
474 /// # all_received.entries().send_bincode(&p2)
475 /// # }, |mut stream| async move {
476 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
477 /// // {
478 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
479 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
480 /// // ...
481 /// // }
482 /// # let mut results = Vec::new();
483 /// # for w in 0..16 {
484 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
485 /// # }
486 /// # results.sort();
487 /// # assert_eq!(results, vec![
488 /// # "((MemberId::<()>(0), 0), 123)",
489 /// # "((MemberId::<()>(0), 1), 124)",
490 /// # "((MemberId::<()>(0), 2), 125)",
491 /// # "((MemberId::<()>(0), 3), 126)",
492 /// # "((MemberId::<()>(1), 0), 123)",
493 /// # "((MemberId::<()>(1), 1), 124)",
494 /// # "((MemberId::<()>(1), 2), 125)",
495 /// # "((MemberId::<()>(1), 3), 126)",
496 /// # "((MemberId::<()>(2), 0), 123)",
497 /// # "((MemberId::<()>(2), 1), 124)",
498 /// # "((MemberId::<()>(2), 2), 125)",
499 /// # "((MemberId::<()>(2), 3), 126)",
500 /// # "((MemberId::<()>(3), 0), 123)",
501 /// # "((MemberId::<()>(3), 1), 124)",
502 /// # "((MemberId::<()>(3), 2), 125)",
503 /// # "((MemberId::<()>(3), 3), 126)",
504 /// # ]);
505 /// # }));
506 /// # }
507 /// ```
508 pub fn send_bincode<L2>(
509 self,
510 other: &Process<'a, L2>,
511 ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
512 where
513 K: Serialize + DeserializeOwned,
514 V: Serialize + DeserializeOwned,
515 {
516 self.send(other, TCP.fail_stop().bincode())
517 }
518
519 #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
520 /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
521 /// network, using the configuration in `via` to set up the message transport. The resulting
522 /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
523 /// the second element is the original key.
524 ///
525 /// # Example
526 /// ```rust
527 /// # #[cfg(feature = "deploy")] {
528 /// # use hydro_lang::prelude::*;
529 /// # use futures::StreamExt;
530 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
531 /// # type Source = ();
532 /// # type Destination = ();
533 /// let source: Cluster<Source> = flow.cluster::<Source>();
534 /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
535 /// .source_iter(q!(vec![0, 1, 2, 3]))
536 /// .map(q!(|x| (x, x + 123)))
537 /// .into_keyed();
538 /// let destination_process = flow.process::<Destination>();
539 /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
540 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
541 /// # }, |mut stream| async move {
542 /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
543 /// // {
544 /// // (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
545 /// // (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
546 /// // ...
547 /// // }
548 /// # let mut results = Vec::new();
549 /// # for w in 0..16 {
550 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
551 /// # }
552 /// # results.sort();
553 /// # assert_eq!(results, vec![
554 /// # "((MemberId::<()>(0), 0), 123)",
555 /// # "((MemberId::<()>(0), 1), 124)",
556 /// # "((MemberId::<()>(0), 2), 125)",
557 /// # "((MemberId::<()>(0), 3), 126)",
558 /// # "((MemberId::<()>(1), 0), 123)",
559 /// # "((MemberId::<()>(1), 1), 124)",
560 /// # "((MemberId::<()>(1), 2), 125)",
561 /// # "((MemberId::<()>(1), 3), 126)",
562 /// # "((MemberId::<()>(2), 0), 123)",
563 /// # "((MemberId::<()>(2), 1), 124)",
564 /// # "((MemberId::<()>(2), 2), 125)",
565 /// # "((MemberId::<()>(2), 3), 126)",
566 /// # "((MemberId::<()>(3), 0), 123)",
567 /// # "((MemberId::<()>(3), 1), 124)",
568 /// # "((MemberId::<()>(3), 2), 125)",
569 /// # "((MemberId::<()>(3), 3), 126)",
570 /// # ]);
571 /// # }));
572 /// # }
573 /// ```
574 pub fn send<L2, N: NetworkFor<(K, V)>>(
575 self,
576 to: &Process<'a, L2>,
577 via: N,
578 ) -> KeyedStream<
579 (MemberId<L>, K),
580 V,
581 Process<'a, L2>,
582 Unbounded,
583 <O as MinOrder<N::OrderingGuarantee>>::Min,
584 R,
585 >
586 where
587 K: Serialize + DeserializeOwned,
588 V: Serialize + DeserializeOwned,
589 O: MinOrder<N::OrderingGuarantee>,
590 {
591 let serialize_pipeline = Some(N::serialize_thunk(false));
592
593 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
594
595 let name = via.name();
596 if to.multiversioned() && name.is_none() {
597 panic!(
598 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
599 );
600 }
601
602 let raw_stream: Stream<
603 (MemberId<L>, (K, V)),
604 Process<'a, L2>,
605 Unbounded,
606 <O as MinOrder<N::OrderingGuarantee>>::Min,
607 R,
608 > = Stream::new(
609 to.clone(),
610 HydroNode::Network {
611 name: name.map(ToOwned::to_owned),
612 networking_info: N::networking_info(),
613 serialize_fn: serialize_pipeline.map(|e| e.into()),
614 instantiate_fn: DebugInstantiate::Building,
615 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
616 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
617 metadata: to.new_node_metadata(Stream::<
618 (MemberId<L>, (K, V)),
619 Cluster<'a, L2>,
620 Unbounded,
621 <O as MinOrder<N::OrderingGuarantee>>::Min,
622 R,
623 >::collection_kind()),
624 },
625 );
626
627 raw_stream
628 .map(q!(|(sender, (k, v))| ((sender, k), v)))
629 .into_keyed()
630 }
631}