Skip to main content

hydro_lang/live_collections/keyed_stream/
networking.rs

1//! Networking APIs for [`KeyedStream`].
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5use stageleft::{q, quote_type};
6
7use super::KeyedStream;
8use crate::compile::ir::{DebugInstantiate, HydroNode};
9use crate::live_collections::boundedness::{Boundedness, Unbounded};
10use crate::live_collections::stream::{MinOrder, Ordering, Retries, Stream};
11use crate::location::cluster::NoConsistency;
12#[cfg(stageleft_runtime)]
13use crate::location::dynamic::DynLocation;
14use crate::location::{Cluster, MemberId, Process};
15use crate::networking::{NetworkFor, TCP};
16
17impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
18    KeyedStream<MemberId<L2>, T, Process<'a, L>, B, O, R>
19{
20    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
21    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
22    /// identifying the recipient for each group and using [`bincode`] to serialize/deserialize messages.
23    ///
24    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
25    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
26    /// API allows precise targeting of specific cluster members rather than broadcasting to
27    /// all members.
28    ///
29    /// # Example
30    /// ```rust
31    /// # #[cfg(feature = "deploy")] {
32    /// # use hydro_lang::prelude::*;
33    /// # use futures::StreamExt;
34    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
35    /// let p1 = flow.process::<()>();
36    /// let workers: Cluster<()> = flow.cluster::<()>();
37    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
38    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
39    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
40    ///     .into_keyed()
41    ///     .demux_bincode(&workers);
42    /// # on_worker.send_bincode(&p2).entries()
43    /// // if there are 4 members in the cluster, each receives one element
44    /// // - MemberId::<()>(0): [0]
45    /// // - MemberId::<()>(1): [1]
46    /// // - MemberId::<()>(2): [2]
47    /// // - MemberId::<()>(3): [3]
48    /// # }, |mut stream| async move {
49    /// # let mut results = Vec::new();
50    /// # for w in 0..4 {
51    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
52    /// # }
53    /// # results.sort();
54    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
55    /// # }));
56    /// # }
57    /// ```
58    pub fn demux_bincode(
59        self,
60        other: &Cluster<'a, L2>,
61    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
62    where
63        T: Serialize + DeserializeOwned,
64    {
65        self.demux(other, TCP.fail_stop().bincode())
66    }
67
68    /// Sends each group of this stream to a specific member of a cluster, with the [`MemberId`] key
69    /// identifying the recipient for each group and using the configuration in `via` to set up the
70    /// message transport.
71    ///
72    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
73    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
74    /// API allows precise targeting of specific cluster members rather than broadcasting to
75    /// all members.
76    ///
77    /// # Example
78    /// ```rust
79    /// # #[cfg(feature = "deploy")] {
80    /// # use hydro_lang::prelude::*;
81    /// # use futures::StreamExt;
82    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
83    /// let p1 = flow.process::<()>();
84    /// let workers: Cluster<()> = flow.cluster::<()>();
85    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
86    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
87    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
88    ///     .into_keyed()
89    ///     .demux(&workers, TCP.fail_stop().bincode());
90    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
91    /// // if there are 4 members in the cluster, each receives one element
92    /// // - MemberId::<()>(0): [0]
93    /// // - MemberId::<()>(1): [1]
94    /// // - MemberId::<()>(2): [2]
95    /// // - MemberId::<()>(3): [3]
96    /// # }, |mut stream| async move {
97    /// # let mut results = Vec::new();
98    /// # for w in 0..4 {
99    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
100    /// # }
101    /// # results.sort();
102    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
103    /// # }));
104    /// # }
105    /// ```
106    #[expect(clippy::type_complexity, reason = "NoConsistency type")]
107    pub fn demux<N: NetworkFor<T>>(
108        self,
109        to: &Cluster<'a, L2>,
110        via: N,
111    ) -> Stream<
112        T,
113        Cluster<'a, L2, NoConsistency>,
114        Unbounded,
115        <O as MinOrder<N::OrderingGuarantee>>::Min,
116        R,
117    >
118    where
119        T: Serialize + DeserializeOwned,
120        O: MinOrder<N::OrderingGuarantee>,
121    {
122        let serialize_pipeline = Some(N::serialize_thunk(true));
123
124        let deserialize_pipeline = Some(N::deserialize_thunk(None));
125
126        let name = via.name();
127        if to.multiversioned() && name.is_none() {
128            panic!(
129                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
130            );
131        }
132
133        Stream::new(
134            to.clone(),
135            HydroNode::Network {
136                name: name.map(ToOwned::to_owned),
137                networking_info: N::networking_info(),
138                serialize_fn: serialize_pipeline.map(|e| e.into()),
139                instantiate_fn: DebugInstantiate::Building,
140                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
141                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
142                metadata: to.new_node_metadata(Stream::<
143                    T,
144                    Cluster<'a, L2>,
145                    Unbounded,
146                    <O as MinOrder<N::OrderingGuarantee>>::Min,
147                    R,
148                >::collection_kind()),
149            },
150        )
151    }
152}
153
154impl<'a, K, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
155    KeyedStream<(MemberId<L2>, K), T, Process<'a, L>, B, O, R>
156{
157    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
158    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
159    /// compound key where the first element is the recipient's [`MemberId`] and the second element
160    /// is a key that will be sent along with the value, using [`bincode`] to serialize/deserialize
161    /// messages.
162    ///
163    /// # Example
164    /// ```rust
165    /// # #[cfg(feature = "deploy")] {
166    /// # use hydro_lang::prelude::*;
167    /// # use futures::StreamExt;
168    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
169    /// let p1 = flow.process::<()>();
170    /// let workers: Cluster<()> = flow.cluster::<()>();
171    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
172    ///     .source_iter(q!(vec![0, 1, 2, 3]))
173    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
174    ///     .into_keyed();
175    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux_bincode(&workers);
176    /// # on_worker.entries().send_bincode(&p2).entries()
177    /// // if there are 4 members in the cluster, each receives one element
178    /// // - MemberId::<()>(0): { 0: [123] }
179    /// // - MemberId::<()>(1): { 1: [124] }
180    /// // - ...
181    /// # }, |mut stream| async move {
182    /// # let mut results = Vec::new();
183    /// # for w in 0..4 {
184    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
185    /// # }
186    /// # results.sort();
187    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
188    /// # }));
189    /// # }
190    /// ```
191    pub fn demux_bincode(
192        self,
193        other: &Cluster<'a, L2>,
194    ) -> KeyedStream<K, T, Cluster<'a, L2>, Unbounded, O, R>
195    where
196        K: Serialize + DeserializeOwned,
197        T: Serialize + DeserializeOwned,
198    {
199        self.demux(other, TCP.fail_stop().bincode())
200    }
201
202    /// Sends each group of this stream to a specific member of a cluster. The input stream has a
203    /// compound key where the first element is the recipient's [`MemberId`] and the second element
204    /// is a key that will be sent along with the value, using the configuration in `via` to set up
205    /// the message transport.
206    ///
207    /// # Example
208    /// ```rust
209    /// # #[cfg(feature = "deploy")] {
210    /// # use hydro_lang::prelude::*;
211    /// # use futures::StreamExt;
212    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
213    /// let p1 = flow.process::<()>();
214    /// let workers: Cluster<()> = flow.cluster::<()>();
215    /// let to_send: KeyedStream<_, _, Process<_>, _> = p1
216    ///     .source_iter(q!(vec![0, 1, 2, 3]))
217    ///     .map(q!(|x| ((hydro_lang::location::MemberId::from_raw_id(x), x), x + 123)))
218    ///     .into_keyed();
219    /// let on_worker: KeyedStream<_, _, Cluster<_>, _> = to_send.demux(&workers, TCP.fail_stop().bincode());
220    /// # on_worker.entries().send(&p2, TCP.fail_stop().bincode()).entries()
221    /// // if there are 4 members in the cluster, each receives one element
222    /// // - MemberId::<()>(0): { 0: [123] }
223    /// // - MemberId::<()>(1): { 1: [124] }
224    /// // - ...
225    /// # }, |mut stream| async move {
226    /// # let mut results = Vec::new();
227    /// # for w in 0..4 {
228    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
229    /// # }
230    /// # results.sort();
231    /// # assert_eq!(results, vec!["(MemberId::<()>(0), (0, 123))", "(MemberId::<()>(1), (1, 124))", "(MemberId::<()>(2), (2, 125))", "(MemberId::<()>(3), (3, 126))"]);
232    /// # }));
233    /// # }
234    /// ```
235    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
236    pub fn demux<N: NetworkFor<(K, T)>>(
237        self,
238        to: &Cluster<'a, L2>,
239        via: N,
240    ) -> KeyedStream<
241        K,
242        T,
243        Cluster<'a, L2, NoConsistency>,
244        Unbounded,
245        <O as MinOrder<N::OrderingGuarantee>>::Min,
246        R,
247    >
248    where
249        K: Serialize + DeserializeOwned,
250        T: Serialize + DeserializeOwned,
251        O: MinOrder<N::OrderingGuarantee>,
252    {
253        let serialize_pipeline = Some(N::serialize_thunk(true));
254
255        let deserialize_pipeline = Some(N::deserialize_thunk(None));
256
257        let name = via.name();
258        if to.multiversioned() && name.is_none() {
259            panic!(
260                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
261            );
262        }
263
264        KeyedStream::new(
265            to.clone(),
266            HydroNode::Network {
267                name: name.map(ToOwned::to_owned),
268                networking_info: N::networking_info(),
269                serialize_fn: serialize_pipeline.map(|e| e.into()),
270                instantiate_fn: DebugInstantiate::Building,
271                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
272                input: Box::new(
273                    self.entries()
274                        .map(q!(|((id, k), v)| (id, (k, v))))
275                        .ir_node
276                        .replace(HydroNode::Placeholder),
277                ),
278                metadata: to.new_node_metadata(KeyedStream::<
279                    K,
280                    T,
281                    Cluster<'a, L2>,
282                    Unbounded,
283                    <O as MinOrder<N::OrderingGuarantee>>::Min,
284                    R,
285                >::collection_kind()),
286            },
287        )
288    }
289}
290
291impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
292    KeyedStream<MemberId<L2>, T, Cluster<'a, L>, B, O, R>
293{
294    #[deprecated = "use KeyedStream::demux(..., TCP.fail_stop().bincode()) instead"]
295    /// Sends each group of this stream at each source member to a specific member of a destination
296    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using
297    /// [`bincode`] to serialize/deserialize messages.
298    ///
299    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
300    /// which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`], this
301    /// API allows precise targeting of specific cluster members rather than broadcasting to all
302    /// members.
303    ///
304    /// Each cluster member sends its local stream elements, and they are collected at each
305    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
306    ///
307    /// # Example
308    /// ```rust
309    /// # #[cfg(feature = "deploy")] {
310    /// # use hydro_lang::prelude::*;
311    /// # use futures::StreamExt;
312    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
313    /// # type Source = ();
314    /// # type Destination = ();
315    /// let source: Cluster<Source> = flow.cluster::<Source>();
316    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
317    ///     .source_iter(q!(vec![0, 1, 2, 3]))
318    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
319    ///     .into_keyed();
320    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
321    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
322    /// # all_received.entries().send_bincode(&p2).entries()
323    /// # }, |mut stream| async move {
324    /// // if there are 4 members in the destination cluster, each receives one message from each source member
325    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
326    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
327    /// // - ...
328    /// # let mut results = Vec::new();
329    /// # for w in 0..16 {
330    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
331    /// # }
332    /// # results.sort();
333    /// # assert_eq!(results, vec![
334    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
335    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
336    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
337    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
338    /// # ]);
339    /// # }));
340    /// # }
341    /// ```
342    pub fn demux_bincode(
343        self,
344        other: &Cluster<'a, L2>,
345    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
346    where
347        T: Serialize + DeserializeOwned,
348    {
349        self.demux(other, TCP.fail_stop().bincode())
350    }
351
352    /// Sends each group of this stream at each source member to a specific member of a destination
353    /// cluster, with the [`MemberId`] key identifying the recipient for each group and using the
354    /// configuration in `via` to set up the message transport.
355    ///
356    /// Each key must be a `MemberId<L2>` and each value must be a `T` where the key specifies
357    /// which cluster member should receive the data. Unlike [`Stream::broadcast`], this
358    /// API allows precise targeting of specific cluster members rather than broadcasting to all
359    /// members.
360    ///
361    /// Each cluster member sends its local stream elements, and they are collected at each
362    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
363    ///
364    /// # Example
365    /// ```rust
366    /// # #[cfg(feature = "deploy")] {
367    /// # use hydro_lang::prelude::*;
368    /// # use futures::StreamExt;
369    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
370    /// # type Source = ();
371    /// # type Destination = ();
372    /// let source: Cluster<Source> = flow.cluster::<Source>();
373    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
374    ///     .source_iter(q!(vec![0, 1, 2, 3]))
375    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
376    ///     .into_keyed();
377    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
378    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
379    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
380    /// # }, |mut stream| async move {
381    /// // if there are 4 members in the destination cluster, each receives one message from each source member
382    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
383    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
384    /// // - ...
385    /// # let mut results = Vec::new();
386    /// # for w in 0..16 {
387    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
388    /// # }
389    /// # results.sort();
390    /// # assert_eq!(results, vec![
391    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
392    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
393    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
394    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
395    /// # ]);
396    /// # }));
397    /// # }
398    /// ```
399    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
400    pub fn demux<N: NetworkFor<T>>(
401        self,
402        to: &Cluster<'a, L2>,
403        via: N,
404    ) -> KeyedStream<
405        MemberId<L>,
406        T,
407        Cluster<'a, L2, NoConsistency>,
408        Unbounded,
409        <O as MinOrder<N::OrderingGuarantee>>::Min,
410        R,
411    >
412    where
413        T: Serialize + DeserializeOwned,
414        O: MinOrder<N::OrderingGuarantee>,
415    {
416        let serialize_pipeline = Some(N::serialize_thunk(true));
417
418        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
419
420        let name = via.name();
421        if to.multiversioned() && name.is_none() {
422            panic!(
423                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
424            );
425        }
426
427        KeyedStream::new(
428            to.clone(),
429            HydroNode::Network {
430                name: name.map(ToOwned::to_owned),
431                networking_info: N::networking_info(),
432                serialize_fn: serialize_pipeline.map(|e| e.into()),
433                instantiate_fn: DebugInstantiate::Building,
434                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
435                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
436                metadata: to.new_node_metadata(KeyedStream::<
437                    MemberId<L>,
438                    T,
439                    Cluster<'a, L2>,
440                    Unbounded,
441                    <O as MinOrder<N::OrderingGuarantee>>::Min,
442                    R,
443                >::collection_kind()),
444            },
445        )
446    }
447}
448
449impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries>
450    KeyedStream<K, V, Cluster<'a, L>, B, O, R>
451{
452    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
453    #[deprecated = "use KeyedStream::send(..., TCP.fail_stop().bincode()) instead"]
454    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
455    /// network, using [`bincode`] to serialize/deserialize messages. The resulting [`KeyedStream`]
456    /// has a compound key where the first element is the sender's [`MemberId`] and the second
457    /// element is the original key.
458    ///
459    /// # Example
460    /// ```rust
461    /// # #[cfg(feature = "deploy")] {
462    /// # use hydro_lang::prelude::*;
463    /// # use futures::StreamExt;
464    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
465    /// # type Source = ();
466    /// # type Destination = ();
467    /// let source: Cluster<Source> = flow.cluster::<Source>();
468    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
469    ///     .source_iter(q!(vec![0, 1, 2, 3]))
470    ///     .map(q!(|x| (x, x + 123)))
471    ///     .into_keyed();
472    /// let destination_process = flow.process::<Destination>();
473    /// let all_received = to_send.send_bincode(&destination_process); // KeyedStream<(MemberId<Source>, i32), i32, ...>
474    /// # all_received.entries().send_bincode(&p2)
475    /// # }, |mut stream| async move {
476    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
477    /// // {
478    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
479    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
480    /// //     ...
481    /// // }
482    /// # let mut results = Vec::new();
483    /// # for w in 0..16 {
484    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
485    /// # }
486    /// # results.sort();
487    /// # assert_eq!(results, vec![
488    /// #   "((MemberId::<()>(0), 0), 123)",
489    /// #   "((MemberId::<()>(0), 1), 124)",
490    /// #   "((MemberId::<()>(0), 2), 125)",
491    /// #   "((MemberId::<()>(0), 3), 126)",
492    /// #   "((MemberId::<()>(1), 0), 123)",
493    /// #   "((MemberId::<()>(1), 1), 124)",
494    /// #   "((MemberId::<()>(1), 2), 125)",
495    /// #   "((MemberId::<()>(1), 3), 126)",
496    /// #   "((MemberId::<()>(2), 0), 123)",
497    /// #   "((MemberId::<()>(2), 1), 124)",
498    /// #   "((MemberId::<()>(2), 2), 125)",
499    /// #   "((MemberId::<()>(2), 3), 126)",
500    /// #   "((MemberId::<()>(3), 0), 123)",
501    /// #   "((MemberId::<()>(3), 1), 124)",
502    /// #   "((MemberId::<()>(3), 2), 125)",
503    /// #   "((MemberId::<()>(3), 3), 126)",
504    /// # ]);
505    /// # }));
506    /// # }
507    /// ```
508    pub fn send_bincode<L2>(
509        self,
510        other: &Process<'a, L2>,
511    ) -> KeyedStream<(MemberId<L>, K), V, Process<'a, L2>, Unbounded, O, R>
512    where
513        K: Serialize + DeserializeOwned,
514        V: Serialize + DeserializeOwned,
515    {
516        self.send(other, TCP.fail_stop().bincode())
517    }
518
519    #[expect(clippy::type_complexity, reason = "compound key types with ordering")]
520    /// "Moves" elements of this keyed stream from a cluster to a process by sending them over the
521    /// network, using the configuration in `via` to set up the message transport. The resulting
522    /// [`KeyedStream`] has a compound key where the first element is the sender's [`MemberId`] and
523    /// the second element is the original key.
524    ///
525    /// # Example
526    /// ```rust
527    /// # #[cfg(feature = "deploy")] {
528    /// # use hydro_lang::prelude::*;
529    /// # use futures::StreamExt;
530    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
531    /// # type Source = ();
532    /// # type Destination = ();
533    /// let source: Cluster<Source> = flow.cluster::<Source>();
534    /// let to_send: KeyedStream<_, _, Cluster<_>, _> = source
535    ///     .source_iter(q!(vec![0, 1, 2, 3]))
536    ///     .map(q!(|x| (x, x + 123)))
537    ///     .into_keyed();
538    /// let destination_process = flow.process::<Destination>();
539    /// let all_received = to_send.send(&destination_process, TCP.fail_stop().bincode()); // KeyedStream<(MemberId<Source>, i32), i32, ...>
540    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode())
541    /// # }, |mut stream| async move {
542    /// // if there are 4 members in the source cluster, the destination process receives four messages from each source member
543    /// // {
544    /// //     (MemberId<Source>(0), 0): [123], (MemberId<Source>(1), 0): [123], ...,
545    /// //     (MemberId<Source>(0), 1): [124], (MemberId<Source>(1), 1): [124], ...,
546    /// //     ...
547    /// // }
548    /// # let mut results = Vec::new();
549    /// # for w in 0..16 {
550    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
551    /// # }
552    /// # results.sort();
553    /// # assert_eq!(results, vec![
554    /// #   "((MemberId::<()>(0), 0), 123)",
555    /// #   "((MemberId::<()>(0), 1), 124)",
556    /// #   "((MemberId::<()>(0), 2), 125)",
557    /// #   "((MemberId::<()>(0), 3), 126)",
558    /// #   "((MemberId::<()>(1), 0), 123)",
559    /// #   "((MemberId::<()>(1), 1), 124)",
560    /// #   "((MemberId::<()>(1), 2), 125)",
561    /// #   "((MemberId::<()>(1), 3), 126)",
562    /// #   "((MemberId::<()>(2), 0), 123)",
563    /// #   "((MemberId::<()>(2), 1), 124)",
564    /// #   "((MemberId::<()>(2), 2), 125)",
565    /// #   "((MemberId::<()>(2), 3), 126)",
566    /// #   "((MemberId::<()>(3), 0), 123)",
567    /// #   "((MemberId::<()>(3), 1), 124)",
568    /// #   "((MemberId::<()>(3), 2), 125)",
569    /// #   "((MemberId::<()>(3), 3), 126)",
570    /// # ]);
571    /// # }));
572    /// # }
573    /// ```
574    pub fn send<L2, N: NetworkFor<(K, V)>>(
575        self,
576        to: &Process<'a, L2>,
577        via: N,
578    ) -> KeyedStream<
579        (MemberId<L>, K),
580        V,
581        Process<'a, L2>,
582        Unbounded,
583        <O as MinOrder<N::OrderingGuarantee>>::Min,
584        R,
585    >
586    where
587        K: Serialize + DeserializeOwned,
588        V: Serialize + DeserializeOwned,
589        O: MinOrder<N::OrderingGuarantee>,
590    {
591        let serialize_pipeline = Some(N::serialize_thunk(false));
592
593        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
594
595        let name = via.name();
596        if to.multiversioned() && name.is_none() {
597            panic!(
598                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
599            );
600        }
601
602        let raw_stream: Stream<
603            (MemberId<L>, (K, V)),
604            Process<'a, L2>,
605            Unbounded,
606            <O as MinOrder<N::OrderingGuarantee>>::Min,
607            R,
608        > = Stream::new(
609            to.clone(),
610            HydroNode::Network {
611                name: name.map(ToOwned::to_owned),
612                networking_info: N::networking_info(),
613                serialize_fn: serialize_pipeline.map(|e| e.into()),
614                instantiate_fn: DebugInstantiate::Building,
615                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
616                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
617                metadata: to.new_node_metadata(Stream::<
618                    (MemberId<L>, (K, V)),
619                    Cluster<'a, L2>,
620                    Unbounded,
621                    <O as MinOrder<N::OrderingGuarantee>>::Min,
622                    R,
623                >::collection_kind()),
624            },
625        );
626
627        raw_stream
628            .map(q!(|(sender, (k, v))| ((sender, k), v)))
629            .into_keyed()
630    }
631}