Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35    ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46#[cfg(stageleft_runtime)]
47use crate::location::dynamic::DynLocation;
48use crate::location::dynamic::{ClusterConsistency, LocationId};
49use crate::location::external_process::{
50    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
51};
52use crate::nondet::NonDet;
53#[cfg(feature = "sim")]
54use crate::sim::SimSender;
55use crate::staging_util::get_this_crate;
56
// Provides `DynLocation`, `LocationId`, and `ClusterConsistency`.
pub mod dynamic;

// Provides `External` plus the external port/sink types (`ExternalBytesPort`,
// `ExternalBincodeSink`, `ExternalBincodeBidi`) and the `Many`/`NotMany` type parameters.
pub mod external_process;
pub use external_process::External;

// Provides the `Process` location type.
pub mod process;
pub use process::Process;

// Provides the `Cluster` location type.
pub mod cluster;
pub use cluster::Cluster;

// Provides `MemberId` and `TaglessMemberId`.
pub mod member_id;
pub use member_id::{MemberId, TaglessMemberId};

// Provides the `Tick` and `Atomic` types.
pub mod tick;
pub use tick::{Atomic, Tick};
73
/// An event indicating a change in membership status of a location in a group
/// (e.g. a node in a [`Cluster`] or an external client connection).
///
/// These events are the values of the keyed stream returned by
/// [`Location::source_cluster_membership_stream`], keyed by the member they refer to.
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum MembershipEvent {
    /// The member has joined the group and is now active.
    Joined,
    /// The member has left the group and is no longer active.
    Left,
}
83
/// A hint for configuring the network transport used by an external connection.
///
/// This controls how the underlying TCP listener is set up when binding
/// external client connections via methods like [`Location::bind_single_client`]
/// or [`Location::bidi_external_many_bytes`].
///
/// [`Location::source_external_bytes`] always passes [`NetworkHint::Auto`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// Automatically select the network configuration (e.g. an ephemeral port).
    Auto,
    /// Use a TCP port, optionally specifying a fixed port number.
    ///
    /// If `None`, an available port will be chosen automatically.
    /// If `Some(port)`, the given port number will be used.
    TcpPort(Option<u16>),
}
99
100pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
101    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
102}
103
#[stageleft::export(LocationKey)]
new_key_type! {
    /// A unique `slotmap` key identifying a location.
    pub struct LocationKey;
}
109
/// Formats the key as the literal prefix `loc` followed by the `Debug` form of its key data.
impl std::fmt::Display for LocationKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "loc{:?}", self.data()) // e.g. `"loc1v1"`
    }
}
115
116/// This is used for the ECS membership stream.
117/// TODO(mingwei): Make this more robust?
118impl std::str::FromStr for LocationKey {
119    type Err = Option<ParseIntError>;
120
121    fn from_str(s: &str) -> Result<Self, Self::Err> {
122        let nvn = s.strip_prefix("loc").ok_or(None)?;
123        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
124        let idx: u64 = idx.parse()?;
125        let ver: u64 = ver.parse()?;
126        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
127    }
128}
129
impl LocationKey {
    // TODO(mingwei): Remove this and avoid magic key for simulator external.
    /// The first location key, used by the simulator as the default external location.
    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`

    /// A key for testing with index 1 (version 255).
    #[cfg(test)]
    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001)); // `1v255`

    /// A key for testing with index 2 (version 255).
    #[cfg(test)]
    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002)); // `2v255`
}
143
144/// This is used within `q!` code in docker and ECS.
145impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
146    type O = LocationKey;
147
148    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
149    where
150        Self: Sized,
151    {
152        let root = get_this_crate();
153        let n = Key::data(&self).as_ffi();
154        (
155            QuoteTokens {
156                prelude: None,
157                expr: Some(quote! {
158                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
159                }),
160            },
161            (),
162        )
163    }
164}
165
/// A simple enum for the type of a root location.
///
/// Note that this type derives `Serialize` but not `Deserialize`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub enum LocationType {
    /// A process (single node).
    Process,
    /// A cluster (multiple nodes).
    Cluster,
    /// An external client.
    External,
}
176
177/// A location where data can be materialized and computation can be executed.
178///
179/// Hydro is a **global**, **distributed** programming model. This means that the data
180/// and computation in a Hydro program can be spread across multiple machines, data
181/// centers, and even continents. To achieve this, Hydro uses the concept of
182/// **locations** to keep track of _where_ data is located and computation is executed.
183///
184/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
185/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
186/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
187/// to allow live collections to be _moved_ between locations via network send/receive.
188///
189/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
190#[expect(
191    private_bounds,
192    reason = "only internal Hydro code can define location types"
193)]
194pub trait Location<'a>: DynLocation {
    /// The root location type for this location.
    ///
    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
    /// For nested locations like [`Tick`], this is the root location that contains it.
    type Root: Location<'a>;

    /// Location type with consistency guarantees dropped for the live collection on it.
    ///
    /// The bound requires that dropping consistency a second time yields the same type,
    /// i.e. `NoConsistency` is its own `NoConsistency`.
    type NoConsistency: Location<'a, NoConsistency = Self::NoConsistency>;
203
    /// Returns the root location for this location.
    ///
    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
    /// For nested locations like [`Tick`], this returns the root location that contains it.
    fn root(&self) -> Self::Root;

    /// Returns this location with consistency guarantees dropped for the live collection.
    fn drop_consistency(&self) -> Self::NoConsistency;

    /// Gets the runtime enum variant for the current consistency level, if this is a cluster.
    ///
    /// This is an associated function without a `self` receiver, so the result depends only
    /// on the location type.
    fn consistency() -> Option<ClusterConsistency>;
214
215    /// Updates the consistency guarantees to match that of the given location.
216    fn with_consistency_of<L2: Location<'a, NoConsistency = Self::NoConsistency>>(&self) -> L2 {
217        L2::make_from_nondet(self.drop_consistency())
218    }
219
    // Builds this location type from its consistency-free counterpart; called by
    // `with_consistency_of` on the target type.
    #[doc(hidden)]
    fn make_from_nondet(l2: Self::NoConsistency) -> Self;
222
223    /// Attempts to create a new [`Tick`] clock domain at this location.
224    ///
225    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
226    /// or `None` if this location is already inside a tick (nested ticks are not supported).
227    ///
228    /// Prefer using [`Location::tick`] when you know the location is top-level.
229    fn try_tick(&self) -> Option<Tick<Self>> {
230        if Self::is_top_level() {
231            let id = self.flow_state().borrow_mut().next_clock_id();
232            Some(Tick {
233                id,
234                l: self.clone(),
235            })
236        } else {
237            None
238        }
239    }
240
241    /// Returns the unique identifier for this location.
242    fn id(&self) -> LocationId {
243        DynLocation::dyn_id(self)
244    }
245
246    /// Creates a new [`Tick`] clock domain at this location.
247    ///
248    /// A tick represents a logical clock that can be used to batch streaming data
249    /// into discrete time steps. This is useful for implementing iterative algorithms
250    /// or for synchronizing data across multiple streams.
251    ///
252    /// # Example
253    /// ```rust
254    /// # #[cfg(feature = "deploy")] {
255    /// # use hydro_lang::prelude::*;
256    /// # use futures::StreamExt;
257    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
258    /// let tick = process.tick();
259    /// let inside_tick = process
260    ///     .source_iter(q!(vec![1, 2, 3, 4]))
261    ///     .batch(&tick, nondet!(/** test */));
262    /// inside_tick.all_ticks()
263    /// # }, |mut stream| async move {
264    /// // 1, 2, 3, 4
265    /// # for w in vec![1, 2, 3, 4] {
266    /// #     assert_eq!(stream.next().await.unwrap(), w);
267    /// # }
268    /// # }));
269    /// # }
270    /// ```
271    fn tick(&self) -> Tick<Self> {
272        if let LocationId::Tick(_, _) = self.id() {
273            panic!("cannot create nested ticks");
274        }
275
276        let id = self.flow_state().borrow_mut().next_clock_id();
277        Tick {
278            id,
279            l: self.clone(),
280        }
281    }
282
283    /// Creates an unbounded stream that continuously emits unit values `()`.
284    ///
285    /// This is useful for driving computations that need to run continuously,
286    /// such as polling or heartbeat mechanisms.
287    ///
288    /// # Example
289    /// ```rust
290    /// # #[cfg(feature = "deploy")] {
291    /// # use hydro_lang::prelude::*;
292    /// # use futures::StreamExt;
293    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
294    /// let tick = process.tick();
295    /// process.spin()
296    ///     .batch(&tick, nondet!(/** test */))
297    ///     .map(q!(|_| 42))
298    ///     .all_ticks()
299    /// # }, |mut stream| async move {
300    /// // 42, 42, 42, ...
301    /// # assert_eq!(stream.next().await.unwrap(), 42);
302    /// # assert_eq!(stream.next().await.unwrap(), 42);
303    /// # assert_eq!(stream.next().await.unwrap(), 42);
304    /// # }));
305    /// # }
306    /// ```
307    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
308    where
309        Self: Sized,
310    {
311        if let LocationId::Tick(_, _) = self.id() {
312            panic!("cannot call spin() inside a tick");
313        }
314
315        Stream::new(
316            self.clone(),
317            HydroNode::Source {
318                source: HydroSource::Spin(),
319                metadata: self.new_node_metadata(Stream::<
320                    (),
321                    Self,
322                    Unbounded,
323                    TotalOrder,
324                    ExactlyOnce,
325                >::collection_kind()),
326            },
327        )
328    }
329
330    /// Creates a stream from an async [`FuturesStream`].
331    ///
332    /// This is useful for integrating with external async data sources,
333    /// such as network connections or file readers.
334    ///
335    /// # Example
336    /// ```rust
337    /// # #[cfg(feature = "deploy")] {
338    /// # use hydro_lang::prelude::*;
339    /// # use futures::StreamExt;
340    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
341    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
342    /// # }, |mut stream| async move {
343    /// // 1, 2, 3
344    /// # for w in vec![1, 2, 3] {
345    /// #     assert_eq!(stream.next().await.unwrap(), w);
346    /// # }
347    /// # }));
348    /// # }
349    /// ```
350    fn source_stream<T, E>(
351        &self,
352        e: impl QuotedWithContext<'a, E, Self>,
353    ) -> Stream<T, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>
354    where
355        E: FuturesStream<Item = T> + Unpin,
356        Self: Sized,
357    {
358        if let LocationId::Tick(_, _) = self.id() {
359            panic!("cannot call source_stream() inside a tick");
360        }
361
362        let e = e.splice_untyped_ctx(self);
363
364        let target_location = self.drop_consistency();
365        Stream::new(
366            target_location.clone(),
367            HydroNode::Source {
368                source: HydroSource::Stream(e.into()),
369                metadata: target_location.new_node_metadata(Stream::<
370                    T,
371                    Self::NoConsistency,
372                    Unbounded,
373                    TotalOrder,
374                    ExactlyOnce,
375                >::collection_kind()),
376            },
377        )
378    }
379
380    /// Creates a bounded stream from an iterator.
381    ///
382    /// The iterator is evaluated once at runtime, and all elements are emitted
383    /// in order. This is useful for creating streams from static data or
384    /// for testing.
385    ///
386    /// # Example
387    /// ```rust
388    /// # #[cfg(feature = "deploy")] {
389    /// # use hydro_lang::prelude::*;
390    /// # use futures::StreamExt;
391    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
392    /// process.source_iter(q!(vec![1, 2, 3, 4]))
393    /// # }, |mut stream| async move {
394    /// // 1, 2, 3, 4
395    /// # for w in vec![1, 2, 3, 4] {
396    /// #     assert_eq!(stream.next().await.unwrap(), w);
397    /// # }
398    /// # }));
399    /// # }
400    /// ```
401    fn source_iter<T, E>(
402        &self,
403        e: impl QuotedWithContext<'a, E, Self>,
404    ) -> Stream<T, Self::NoConsistency, Bounded, TotalOrder, ExactlyOnce>
405    where
406        E: IntoIterator<Item = T>,
407        Self: Sized,
408    {
409        let e = e.splice_typed_ctx(self);
410
411        let target_location = self.drop_consistency();
412        Stream::new(
413            target_location.clone(),
414            HydroNode::Source {
415                source: HydroSource::Iter(e.into()),
416                metadata: target_location.new_node_metadata(Stream::<
417                    T,
418                    Self::NoConsistency,
419                    Bounded,
420                    TotalOrder,
421                    ExactlyOnce,
422                >::collection_kind()),
423            },
424        )
425    }
426
    #[deprecated(note = "use .source_cluster_membership_stream(...) instead")]
    /// Creates a stream of membership events for a cluster.
    ///
    /// This is a thin alias that forwards to [`Location::source_cluster_membership_stream`].
    ///
    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
    /// keyed by the [`MemberId`] of the cluster member.
    ///
    /// This is useful for implementing protocols that need to track cluster membership,
    /// such as broadcasting to all members or detecting failures.
    ///
    /// # Non-Determinism
    /// This stream is non-deterministic because of the timing of membership events: for
    /// example, if a node leaves before the stream is created, its membership event may
    /// never be received.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
    /// let p1 = flow.process::<()>();
    /// let workers: Cluster<()> = flow.cluster::<()>();
    /// # // do nothing on each worker
    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
    /// let cluster_members = p1.source_cluster_members(&workers, nondet!(/** late joiners may miss events */));
    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
    /// // if there are 4 members in the cluster, we would see a join event for each
    /// // { MemberId::<()>(0): [MembershipEvent::Joined], MemberId::<()>(1): [MembershipEvent::Joined], ... }
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for w in 0..4 {
    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
    /// # }));
    /// # }
    /// ```
    fn source_cluster_members<C: 'a>(
        &self,
        cluster: &Cluster<'a, C>,
        nondet_start: NonDet,
    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::NoConsistency, Unbounded>
    where
        Self: Sized,
    {
        self.source_cluster_membership_stream(cluster, nondet_start)
    }
476
477    /// Creates a stream of membership events for a cluster.
478    ///
479    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
480    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
481    /// keyed by the [`MemberId`] of the cluster member.
482    ///
483    /// This is useful for implementing protocols that need to track cluster membership,
484    /// such as broadcasting to all members or detecting failures.
485    ///
486    /// # Non-Determinism
487    /// This stream is non-deterministic because the timing of membership events, for example
488    /// if a node leaves, the membership event may not be received if the node left before the
489    /// stream was created.
490    ///
491    /// # Example
492    /// ```rust
493    /// # #[cfg(feature = "deploy")] {
494    /// # use hydro_lang::prelude::*;
495    /// # use futures::StreamExt;
496    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
497    /// let p1 = flow.process::<()>();
498    /// let workers: Cluster<()> = flow.cluster::<()>();
499    /// # // do nothing on each worker
500    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
501    /// let cluster_members = p1.source_cluster_membership_stream(&workers, nondet!(/** late joiners may miss events */));
502    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
503    /// // if there are 4 members in the cluster, we would see a join event for each
504    /// // { MemberId::<Worker>(0): [MembershipEvent::Join], MemberId::<Worker>(2): [MembershipEvent::Join], ... }
505    /// # }, |mut stream| async move {
506    /// # let mut results = Vec::new();
507    /// # for w in 0..4 {
508    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
509    /// # }
510    /// # results.sort();
511    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
512    /// # }));
513    /// # }
514    /// ```
515    fn source_cluster_membership_stream<C: 'a>(
516        &self,
517        cluster: &Cluster<'a, C>,
518        _nondet_start: NonDet,
519    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::NoConsistency, Unbounded>
520    where
521        Self: Sized,
522    {
523        if let LocationId::Tick(_, _) = self.id() {
524            panic!("cannot call source_cluster_membership_stream() inside a tick");
525        }
526
527        let target_consistency = self.drop_consistency();
528        Stream::new(
529            target_consistency.clone(),
530            HydroNode::Source {
531                source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
532                metadata: target_consistency.new_node_metadata(Stream::<
533                    (TaglessMemberId, MembershipEvent),
534                    Self,
535                    Unbounded,
536                    TotalOrder,
537                    ExactlyOnce,
538                >::collection_kind(
539                )),
540            },
541        )
542        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
543        .into_keyed()
544    }
545
546    /// Creates a one-way connection from an external process to receive raw bytes.
547    ///
548    /// Returns a port handle for the external process to connect to, and a stream
549    /// of received byte buffers.
550    ///
551    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
552    /// or [`Location::source_external_bincode`].
553    fn source_external_bytes<L>(
554        &self,
555        from: &External<L>,
556    ) -> (
557        ExternalBytesPort,
558        Stream<BytesMut, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
559    )
560    where
561        Self: Sized,
562    {
563        if let LocationId::Tick(_, _) = self.id() {
564            panic!("cannot call source_external_bytes() inside a tick");
565        }
566
567        let target_consistency = self.drop_consistency();
568        let (port, stream, sink) = target_consistency
569            .bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
570
571        sink.complete(target_consistency.source_iter(q!([])));
572
573        (port, stream)
574    }
575
576    /// Creates a one-way connection from an external process to receive bincode-serialized data.
577    ///
578    /// Returns a sink handle for the external process to send data to, and a stream
579    /// of received values.
580    ///
581    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
582    #[expect(clippy::type_complexity, reason = "stream markers")]
583    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
584        &self,
585        from: &External<L>,
586    ) -> (
587        ExternalBincodeSink<T, NotMany, O, R>,
588        Stream<T, Self::NoConsistency, Unbounded, O, R>,
589    )
590    where
591        Self: Sized,
592        T: Serialize + DeserializeOwned,
593    {
594        if let LocationId::Tick(_, _) = self.id() {
595            panic!("cannot call source_external_bincode() inside a tick");
596        }
597
598        let target_consistency = self.drop_consistency();
599        let (port, stream, sink) = target_consistency.bind_single_client_bincode::<_, T, ()>(from);
600        sink.complete(target_consistency.source_iter(q!([])));
601
602        (
603            ExternalBincodeSink {
604                process_key: from.key,
605                port_id: port.port_id,
606                _phantom: PhantomData,
607            },
608            stream.weaken_ordering().weaken_retries(),
609        )
610    }
611
612    /// Sets up a simulated input port on this location for testing.
613    ///
614    /// Returns a handle to send messages to the location as well as a stream
615    /// of received messages. This is only available when the `sim` feature is enabled.
616    #[cfg(feature = "sim")]
617    #[expect(clippy::type_complexity, reason = "stream markers")]
618    fn sim_input<T, O: Ordering, R: Retries>(
619        &self,
620    ) -> (
621        SimSender<T, O, R>,
622        Stream<T, Self::NoConsistency, Unbounded, O, R>,
623    )
624    where
625        Self: Sized,
626        T: Serialize + DeserializeOwned,
627    {
628        if let LocationId::Tick(_, _) = self.id() {
629            panic!("cannot call sim_input() inside a tick");
630        }
631
632        let external_location: External<'a, ()> = External {
633            key: LocationKey::FIRST,
634            flow_state: self.flow_state().clone(),
635            _phantom: PhantomData,
636        };
637
638        let (external, stream) = self.source_external_bincode(&external_location);
639
640        (SimSender(external.port_id, PhantomData), stream)
641    }
642
643    /// Creates an external input stream for embedded deployment mode.
644    ///
645    /// The `name` parameter specifies the name of the generated function parameter
646    /// that will supply data to this stream at runtime. The generated function will
647    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
648    fn embedded_input<T>(
649        &self,
650        name: impl Into<String>,
651    ) -> Stream<T, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>
652    where
653        Self: Sized,
654    {
655        if let LocationId::Tick(_, _) = self.id() {
656            panic!("cannot call embedded_input() inside a tick");
657        }
658
659        let ident = syn::Ident::new(&name.into(), Span::call_site());
660
661        let target_location = self.drop_consistency();
662        Stream::new(
663            target_location.clone(),
664            HydroNode::Source {
665                source: HydroSource::Embedded(ident),
666                metadata: target_location.new_node_metadata(Stream::<
667                    T,
668                    Self,
669                    Unbounded,
670                    TotalOrder,
671                    ExactlyOnce,
672                >::collection_kind()),
673            },
674        )
675    }
676
677    /// Creates an embedded singleton input for embedded deployment mode.
678    ///
679    /// The `name` parameter specifies the name of the generated function parameter
680    /// that will supply data to this singleton at runtime. The generated function will
681    /// accept a plain `T` parameter with this name.
682    fn embedded_singleton_input<T>(
683        &self,
684        name: impl Into<String>,
685    ) -> Singleton<T, Self::NoConsistency, Bounded>
686    where
687        Self: Sized,
688    {
689        if let LocationId::Tick(_, _) = self.id() {
690            panic!("cannot call embedded_singleton_input() inside a tick");
691        }
692
693        let ident = syn::Ident::new(&name.into(), Span::call_site());
694
695        let target_location = self.drop_consistency();
696        Singleton::new(
697            target_location.clone(),
698            HydroNode::Source {
699                source: HydroSource::EmbeddedSingleton(ident),
700                metadata: target_location
701                    .new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
702            },
703        )
704    }
705
    /// Establishes a server on this location to receive a bidirectional connection from a single
    /// client, identified by the given `External` handle. Returns a port handle for the external
    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
    /// messages.
    ///
    /// # Type Parameters
    /// - `T`: The type of outgoing messages, encoded by `Codec` via [`Encoder<T>`].
    /// - `Codec`: The codec for both directions; incoming messages have type
    ///   `<Codec as Decoder>::Item`.
    ///
    /// # Panics
    /// Panics with "cannot call bind_single_client() inside a tick" if this location is a tick.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_deploy::Deployment;
    /// # use futures::{SinkExt, StreamExt};
    /// # tokio_test::block_on(async {
    /// # use bytes::Bytes;
    /// # use hydro_lang::location::NetworkHint;
    /// # use tokio_util::codec::LengthDelimitedCodec;
    /// # let mut flow = FlowBuilder::new();
    /// let node = flow.process::<()>();
    /// let external = flow.external::<()>();
    /// let (port, incoming, outgoing) =
    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
    ///     let mut resp: Vec<u8> = data.into();
    ///     resp.push(42);
    ///     resp.into() // : Bytes
    /// })));
    ///
    /// # let mut deployment = Deployment::new();
    /// let nodes = flow // ... with_process and with_external
    /// #     .with_process(&node, deployment.Localhost())
    /// #     .with_external(&external, deployment.Localhost())
    /// #     .deploy(&mut deployment);
    ///
    /// deployment.deploy().await.unwrap();
    /// deployment.start().await.unwrap();
    ///
    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
    /// assert_eq!(
    ///     external_out.next().await.unwrap().unwrap(),
    ///     vec![1, 2, 3, 42]
    /// );
    /// # });
    /// # }
    /// ```
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
        &self,
        from: &External<L>,
        port_hint: NetworkHint,
    ) -> (
        ExternalBytesPort<NotMany>,
        Stream<<Codec as Decoder>::Item, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, Stream<T, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
    )
    where
        Self: Sized,
    {
        if let LocationId::Tick(_, _) = self.id() {
            panic!("cannot call bind_single_client() inside a tick");
        }

        // The port id is allocated from the external's flow state; this temporary borrow ends
        // with the statement.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
        let target_consistency = self.drop_consistency();

        // `fwd_ref` is handed back to the caller to supply the outgoing stream later;
        // `to_sink` is the not-yet-defined stream that feeds the `SendExternal` root below.
        let (fwd_ref, to_sink) =
            target_consistency
                .forward_ref::<Stream<T, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>>(
                );
        // NOTE(review): this mutable borrow stays alive until the end of the function, i.e.
        // across `new_node_metadata` and `flatten_ordered` below — confirm that neither
        // re-borrows the flow state.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        // Outgoing half: register a root that sends the forward-referenced stream to the
        // external port. The stream's IR node is moved into the root, leaving a placeholder.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: false,
            unpaired: false,
            serialize_fn: None,
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
            op_metadata: HydroIrOpMetadata::new(),
        });

        // Incoming half: an external input on the same port id, yielding the codec's raw
        // decode results.
        let raw_stream: Stream<
            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
            Self::NoConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = Stream::new(
            target_consistency.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: false,
                codec_type: quote_type::<Codec>().into(),
                port_hint,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                metadata: target_consistency.new_node_metadata(Stream::<
                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
                    Self::NoConsistency,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind(
                )),
            },
        );

        (
            ExternalBytesPort {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            // Flattens each `Result` into its item; presumably decode errors are dropped
            // here because `Result` iterates only over its `Ok` value — TODO confirm.
            raw_stream.flatten_ordered(),
            fwd_ref,
        )
    }
824
825    /// Establishes a bidirectional connection from a single external client using bincode serialization.
826    ///
827    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
828    /// and a handle to send outgoing messages. This is a convenience wrapper around
829    /// [`Location::bind_single_client`] that uses bincode for serialization.
830    ///
831    /// # Type Parameters
832    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
833    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
834    #[expect(clippy::type_complexity, reason = "stream markers")]
835    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
836        &self,
837        from: &External<L>,
838    ) -> (
839        ExternalBincodeBidi<InT, OutT, NotMany>,
840        Stream<InT, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
841        ForwardHandle<'a, Stream<OutT, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
842    )
843    where
844        Self: Sized,
845    {
846        if let LocationId::Tick(_, _) = self.id() {
847            panic!("cannot call bind_single_client_bincode() inside a tick");
848        }
849
850        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
851
852        let target_consistency = self.drop_consistency();
853        let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
854            OutT,
855            Self::NoConsistency,
856            Unbounded,
857            TotalOrder,
858            ExactlyOnce,
859        >>();
860        let mut flow_state_borrow = self.flow_state().borrow_mut();
861
862        let root = get_this_crate();
863
864        let out_t_type = quote_type::<OutT>();
865        let ser_fn: syn::Expr = syn::parse_quote! {
866            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
867                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
868            )
869        };
870
871        flow_state_borrow.push_root(HydroRoot::SendExternal {
872            to_external_key: from.key,
873            to_port_id: next_external_port_id,
874            to_many: false,
875            unpaired: false,
876            serialize_fn: Some(ser_fn.into()),
877            instantiate_fn: DebugInstantiate::Building,
878            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
879            op_metadata: HydroIrOpMetadata::new(),
880        });
881
882        let in_t_type = quote_type::<InT>();
883
884        let deser_fn: syn::Expr = syn::parse_quote! {
885            |res| {
886                let b = res.unwrap();
887                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
888            }
889        };
890
891        let raw_stream: Stream<InT, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce> =
892            Stream::new(
893                target_consistency.clone(),
894                HydroNode::ExternalInput {
895                    from_external_key: from.key,
896                    from_port_id: next_external_port_id,
897                    from_many: false,
898                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
899                    port_hint: NetworkHint::Auto,
900                    instantiate_fn: DebugInstantiate::Building,
901                    deserialize_fn: Some(deser_fn.into()),
902                    metadata: target_consistency.new_node_metadata(Stream::<
903                        InT,
904                        Self::NoConsistency,
905                        Unbounded,
906                        TotalOrder,
907                        ExactlyOnce,
908                    >::collection_kind(
909                    )),
910                },
911            );
912
913        (
914            ExternalBincodeBidi {
915                process_key: from.key,
916                port_id: next_external_port_id,
917                _phantom: PhantomData,
918            },
919            raw_stream,
920            fwd_ref,
921        )
922    }
923
924    /// Establishes a server on this location to receive bidirectional connections from multiple
925    /// external clients using raw bytes.
926    ///
927    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
928    /// connections. Each client is assigned a unique `u64` identifier.
929    ///
930    /// Returns:
931    /// - A port handle for external processes to connect to
932    /// - A keyed stream of incoming messages, keyed by client ID
933    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
934    /// - A handle to send outgoing messages, keyed by client ID
935    #[expect(clippy::type_complexity, reason = "stream markers")]
936    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
937        &self,
938        from: &External<L>,
939        port_hint: NetworkHint,
940    ) -> (
941        ExternalBytesPort<Many>,
942        KeyedStream<
943            u64,
944            <Codec as Decoder>::Item,
945            Self::NoConsistency,
946            Unbounded,
947            TotalOrder,
948            ExactlyOnce,
949        >,
950        KeyedStream<u64, MembershipEvent, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
951        ForwardHandle<
952            'a,
953            KeyedStream<u64, T, Self::NoConsistency, Unbounded, NoOrder, ExactlyOnce>,
954        >,
955    )
956    where
957        Self: Sized,
958    {
959        if let LocationId::Tick(_, _) = self.id() {
960            panic!("cannot call bidi_external_many_bytes() inside a tick");
961        }
962
963        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
964
965        let target_consistency = self.drop_consistency();
966        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
967            u64,
968            T,
969            Self::NoConsistency,
970            Unbounded,
971            NoOrder,
972            ExactlyOnce,
973        >>();
974        let mut flow_state_borrow = self.flow_state().borrow_mut();
975
976        flow_state_borrow.push_root(HydroRoot::SendExternal {
977            to_external_key: from.key,
978            to_port_id: next_external_port_id,
979            to_many: true,
980            unpaired: false,
981            serialize_fn: None,
982            instantiate_fn: DebugInstantiate::Building,
983            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
984            op_metadata: HydroIrOpMetadata::new(),
985        });
986
987        let raw_stream: Stream<
988            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
989            Self::NoConsistency,
990            Unbounded,
991            TotalOrder,
992            ExactlyOnce,
993        > = Stream::new(
994            target_consistency.clone(),
995            HydroNode::ExternalInput {
996                from_external_key: from.key,
997                from_port_id: next_external_port_id,
998                from_many: true,
999                codec_type: quote_type::<Codec>().into(),
1000                port_hint,
1001                instantiate_fn: DebugInstantiate::Building,
1002                deserialize_fn: None,
1003                metadata: target_consistency.new_node_metadata(Stream::<
1004                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
1005                    Self::NoConsistency,
1006                    Unbounded,
1007                    TotalOrder,
1008                    ExactlyOnce,
1009                >::collection_kind(
1010                )),
1011            },
1012        );
1013
1014        let membership_stream_ident = syn::Ident::new(
1015            &format!(
1016                "__hydro_deploy_many_{}_{}_membership",
1017                from.key, next_external_port_id
1018            ),
1019            Span::call_site(),
1020        );
1021        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
1022        let raw_membership_stream: KeyedStream<
1023            u64,
1024            bool,
1025            Self::NoConsistency,
1026            Unbounded,
1027            TotalOrder,
1028            ExactlyOnce,
1029        > = KeyedStream::new(
1030            target_consistency.clone(),
1031            HydroNode::Source {
1032                source: HydroSource::Stream(membership_stream_expr.into()),
1033                metadata: target_consistency.new_node_metadata(KeyedStream::<
1034                    u64,
1035                    bool,
1036                    Self::NoConsistency,
1037                    Unbounded,
1038                    TotalOrder,
1039                    ExactlyOnce,
1040                >::collection_kind(
1041                )),
1042            },
1043        );
1044
1045        (
1046            ExternalBytesPort {
1047                process_key: from.key,
1048                port_id: next_external_port_id,
1049                _phantom: PhantomData,
1050            },
1051            raw_stream
1052                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
1053                .into_keyed(),
1054            raw_membership_stream.map(q!(|join| {
1055                if join {
1056                    MembershipEvent::Joined
1057                } else {
1058                    MembershipEvent::Left
1059                }
1060            })),
1061            fwd_ref,
1062        )
1063    }
1064
1065    /// Establishes a server on this location to receive bidirectional connections from multiple
1066    /// external clients using bincode serialization.
1067    ///
1068    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
1069    /// client connections. Each client is assigned a unique `u64` identifier.
1070    ///
1071    /// Returns:
1072    /// - A port handle for external processes to connect to
1073    /// - A keyed stream of incoming messages, keyed by client ID
1074    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
1075    /// - A handle to send outgoing messages, keyed by client ID
1076    ///
1077    /// # Type Parameters
1078    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
1079    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
1080    #[expect(clippy::type_complexity, reason = "stream markers")]
1081    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
1082        &self,
1083        from: &External<L>,
1084    ) -> (
1085        ExternalBincodeBidi<InT, OutT, Many>,
1086        KeyedStream<u64, InT, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
1087        KeyedStream<u64, MembershipEvent, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
1088        ForwardHandle<
1089            'a,
1090            KeyedStream<u64, OutT, Self::NoConsistency, Unbounded, NoOrder, ExactlyOnce>,
1091        >,
1092    )
1093    where
1094        Self: Sized,
1095    {
1096        if let LocationId::Tick(_, _) = self.id() {
1097            panic!("cannot call bidi_external_many_bincode() inside a tick");
1098        }
1099
1100        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
1101
1102        let target_consistency = self.drop_consistency();
1103        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
1104            u64,
1105            OutT,
1106            Self::NoConsistency,
1107            Unbounded,
1108            NoOrder,
1109            ExactlyOnce,
1110        >>();
1111        let mut flow_state_borrow = self.flow_state().borrow_mut();
1112
1113        let root = get_this_crate();
1114
1115        let out_t_type = quote_type::<OutT>();
1116        let ser_fn: syn::Expr = syn::parse_quote! {
1117            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
1118                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
1119            )
1120        };
1121
1122        flow_state_borrow.push_root(HydroRoot::SendExternal {
1123            to_external_key: from.key,
1124            to_port_id: next_external_port_id,
1125            to_many: true,
1126            unpaired: false,
1127            serialize_fn: Some(ser_fn.into()),
1128            instantiate_fn: DebugInstantiate::Building,
1129            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
1130            op_metadata: HydroIrOpMetadata::new(),
1131        });
1132
1133        let in_t_type = quote_type::<InT>();
1134
1135        let deser_fn: syn::Expr = syn::parse_quote! {
1136            |res| {
1137                let (id, b) = res.unwrap();
1138                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
1139            }
1140        };
1141
1142        let raw_stream: KeyedStream<
1143            u64,
1144            InT,
1145            Self::NoConsistency,
1146            Unbounded,
1147            TotalOrder,
1148            ExactlyOnce,
1149        > = KeyedStream::new(
1150            target_consistency.clone(),
1151            HydroNode::ExternalInput {
1152                from_external_key: from.key,
1153                from_port_id: next_external_port_id,
1154                from_many: true,
1155                codec_type: quote_type::<LengthDelimitedCodec>().into(),
1156                port_hint: NetworkHint::Auto,
1157                instantiate_fn: DebugInstantiate::Building,
1158                deserialize_fn: Some(deser_fn.into()),
1159                metadata: target_consistency.new_node_metadata(KeyedStream::<
1160                    u64,
1161                    InT,
1162                    Self::NoConsistency,
1163                    Unbounded,
1164                    TotalOrder,
1165                    ExactlyOnce,
1166                >::collection_kind(
1167                )),
1168            },
1169        );
1170
1171        let membership_stream_ident = syn::Ident::new(
1172            &format!(
1173                "__hydro_deploy_many_{}_{}_membership",
1174                from.key, next_external_port_id
1175            ),
1176            Span::call_site(),
1177        );
1178        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
1179        let raw_membership_stream: KeyedStream<
1180            u64,
1181            bool,
1182            Self::NoConsistency,
1183            Unbounded,
1184            TotalOrder,
1185            ExactlyOnce,
1186        > = KeyedStream::new(
1187            target_consistency.clone(),
1188            HydroNode::Source {
1189                source: HydroSource::Stream(membership_stream_expr.into()),
1190                metadata: target_consistency.new_node_metadata(KeyedStream::<
1191                    u64,
1192                    bool,
1193                    Self::NoConsistency,
1194                    Unbounded,
1195                    TotalOrder,
1196                    ExactlyOnce,
1197                >::collection_kind(
1198                )),
1199            },
1200        );
1201
1202        (
1203            ExternalBincodeBidi {
1204                process_key: from.key,
1205                port_id: next_external_port_id,
1206                _phantom: PhantomData,
1207            },
1208            raw_stream,
1209            raw_membership_stream.map(q!(|join| {
1210                if join {
1211                    MembershipEvent::Joined
1212                } else {
1213                    MembershipEvent::Left
1214                }
1215            })),
1216            fwd_ref,
1217        )
1218    }
1219
1220    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1221    ///
1222    /// See also: [`Tick::singleton`], for creating a singleton _within_ a tick, which requires
1223    /// `T: Clone`.
1224    ///
1225    /// # Example
1226    /// ```rust
1227    /// # #[cfg(feature = "deploy")] {
1228    /// # use hydro_lang::prelude::*;
1229    /// # use futures::StreamExt;
1230    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1231    /// let singleton = process.singleton(q!(5));
1232    /// # singleton.into_stream()
1233    /// # }, |mut stream| async move {
1234    /// // 5
1235    /// # assert_eq!(stream.next().await.unwrap(), 5);
1236    /// # }));
1237    /// # }
1238    /// ```
1239    fn singleton<T>(
1240        &self,
1241        e: impl QuotedWithContext<'a, T, Self>,
1242    ) -> Singleton<T, Self::NoConsistency, Bounded>
1243    where
1244        Self: Sized,
1245    {
1246        let e = e.splice_untyped_ctx(self);
1247
1248        let target_location = self.drop_consistency();
1249        Singleton::new(
1250            target_location.clone(),
1251            HydroNode::SingletonSource {
1252                value: e.into(),
1253                first_tick_only: false,
1254                metadata: target_location.new_node_metadata(Singleton::<
1255                    T,
1256                    Self::NoConsistency,
1257                    Bounded,
1258                >::collection_kind()),
1259            },
1260        )
1261    }
1262
1263    /// Constructs a [`Singleton`] by resolving an async [`Future`] to completion.
1264    ///
1265    /// This is a convenience method equivalent to
1266    /// `self.singleton(future_expr).resolve_future_blocking()`, which is a common
1267    /// pattern when initializing a singleton from an async computation.
1268    ///
1269    /// # Example
1270    /// ```rust
1271    /// # #[cfg(feature = "deploy")] {
1272    /// # use hydro_lang::prelude::*;
1273    /// # use futures::StreamExt;
1274    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1275    /// let singleton = process.singleton_future(q!(async { 42 }));
1276    /// singleton.into_stream()
1277    /// # }, |mut stream| async move {
1278    /// // 42
1279    /// # assert_eq!(stream.next().await.unwrap(), 42);
1280    /// # }));
1281    /// # }
1282    /// ```
1283    ///
1284    /// [`Future`]: std::future::Future
1285    fn singleton_future<F>(
1286        &self,
1287        e: impl QuotedWithContext<'a, F, Self>,
1288    ) -> Singleton<F::Output, Self::NoConsistency, Bounded>
1289    where
1290        F: Future,
1291        Self: Sized,
1292    {
1293        self.singleton(e).resolve_future_blocking()
1294    }
1295
1296    /// Generates a stream with values emitted at a fixed interval, with
1297    /// each value being the current time (as an [`tokio::time::Instant`]).
1298    ///
1299    /// The clock source used is monotonic, so elements will be emitted in
1300    /// increasing order.
1301    ///
1302    /// # Non-Determinism
1303    /// Because this stream is generated by an OS timer, it will be
1304    /// non-deterministic because each timestamp will be arbitrary.
1305    fn source_interval(
1306        &self,
1307        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1308        _nondet: NonDet,
1309    ) -> Stream<tokio::time::Instant, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>
1310    where
1311        Self: Sized,
1312    {
1313        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1314            tokio::time::interval(interval)
1315        )))
1316    }
1317
1318    /// Generates a stream with values emitted at a fixed interval (with an
1319    /// initial delay), with each value being the current time
1320    /// (as an [`tokio::time::Instant`]).
1321    ///
1322    /// The clock source used is monotonic, so elements will be emitted in
1323    /// increasing order.
1324    ///
1325    /// # Non-Determinism
1326    /// Because this stream is generated by an OS timer, it will be
1327    /// non-deterministic because each timestamp will be arbitrary.
1328    fn source_interval_delayed(
1329        &self,
1330        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1331        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1332        _nondet: NonDet,
1333    ) -> Stream<tokio::time::Instant, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>
1334    where
1335        Self: Sized,
1336    {
1337        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1338            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1339        )))
1340    }
1341
1342    /// Creates a forward reference, allowing a stream to be used before its source is defined.
1343    ///
1344    /// Returns a `(handle, placeholder)` pair. Use the placeholder in the dataflow graph,
1345    /// then call `handle.complete(actual_stream)` to wire in the real source.
1346    ///
1347    /// This is useful for mutually-dependent dataflows or when the definition order
1348    /// doesn't match the data flow direction. For feedback loops, prefer [`Tick::cycle`]
1349    /// instead, which automatically defers values by one tick.
1350    ///
1351    /// # Panics
1352    /// Panics if the forward reference creates a synchronous cycle (i.e., the completed
1353    /// stream transitively depends on the placeholder without a `defer_tick` or network
1354    /// hop in between).
1355    ///
1356    /// # Example
1357    /// ```rust
1358    /// # #[cfg(feature = "deploy")] {
1359    /// # use hydro_lang::prelude::*;
1360    /// # use hydro_lang::live_collections::stream::NoOrder;
1361    /// # use futures::StreamExt;
1362    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1363    /// // Create a forward reference to define a stream that will be completed later
1364    /// let (complete, forward_stream) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1365    ///
1366    /// // Use the forward reference as input to another computation
1367    /// let output: Stream<_, _, _, NoOrder> = forward_stream.map(q!(|x| x * 2));
1368    ///
1369    /// // Complete the forward reference with the actual source
1370    /// let source: Stream<_, _, Unbounded> = process.source_iter(q!([1, 2, 3])).into();
1371    /// complete.complete(source);
1372    /// output
1373    /// # }, |mut stream| async move {
1374    /// // 2, 4, 6
1375    /// # assert_eq!(stream.next().await.unwrap(), 2);
1376    /// # assert_eq!(stream.next().await.unwrap(), 4);
1377    /// # assert_eq!(stream.next().await.unwrap(), 6);
1378    /// # }));
1379    /// # }
1380    /// ```
1381    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1382    where
1383        S: CycleCollection<'a, ForwardRef, Location = Self>,
1384    {
1385        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1386        (
1387            ForwardHandle::new(cycle_id, Location::id(self)),
1388            S::create_source(cycle_id, self.clone()),
1389        )
1390    }
1391}
1392
1393#[cfg(feature = "deploy")]
1394#[cfg(test)]
1395mod tests {
1396    use std::collections::HashSet;
1397
1398    use futures::{SinkExt, StreamExt};
1399    use hydro_deploy::Deployment;
1400    use stageleft::q;
1401    use tokio_util::codec::LengthDelimitedCodec;
1402
1403    use crate::compile::builder::FlowBuilder;
1404    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1405    use crate::location::{Location, NetworkHint};
1406    use crate::nondet::nondet;
1407
1408    #[tokio::test]
1409    async fn top_level_singleton_replay_cardinality() {
1410        let mut deployment = Deployment::new();
1411
1412        let mut flow = FlowBuilder::new();
1413        let node = flow.process::<()>();
1414        let external = flow.external::<()>();
1415
1416        let (in_port, input) =
1417            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1418        let singleton = node.singleton(q!(123));
1419        let tick = node.tick();
1420        let out = input
1421            .batch(&tick, nondet!(/** test */))
1422            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
1423            .cross_singleton(
1424                singleton
1425                    .snapshot(&tick, nondet!(/** test */))
1426                    .into_stream()
1427                    .count(),
1428            )
1429            .all_ticks()
1430            .send_bincode_external(&external);
1431
1432        let nodes = flow
1433            .with_process(&node, deployment.Localhost())
1434            .with_external(&external, deployment.Localhost())
1435            .deploy(&mut deployment);
1436
1437        deployment.deploy().await.unwrap();
1438
1439        let mut external_in = nodes.connect(in_port).await;
1440        let mut external_out = nodes.connect(out).await;
1441
1442        deployment.start().await.unwrap();
1443
1444        external_in.send(1).await.unwrap();
1445        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1446
1447        external_in.send(2).await.unwrap();
1448        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1449    }
1450
1451    #[tokio::test]
1452    async fn tick_singleton_replay_cardinality() {
1453        let mut deployment = Deployment::new();
1454
1455        let mut flow = FlowBuilder::new();
1456        let node = flow.process::<()>();
1457        let external = flow.external::<()>();
1458
1459        let (in_port, input) =
1460            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1461        let tick = node.tick();
1462        let singleton = tick.singleton(q!(123));
1463        let out = input
1464            .batch(&tick, nondet!(/** test */))
1465            .cross_singleton(singleton.clone())
1466            .cross_singleton(singleton.into_stream().count())
1467            .all_ticks()
1468            .send_bincode_external(&external);
1469
1470        let nodes = flow
1471            .with_process(&node, deployment.Localhost())
1472            .with_external(&external, deployment.Localhost())
1473            .deploy(&mut deployment);
1474
1475        deployment.deploy().await.unwrap();
1476
1477        let mut external_in = nodes.connect(in_port).await;
1478        let mut external_out = nodes.connect(out).await;
1479
1480        deployment.start().await.unwrap();
1481
1482        external_in.send(1).await.unwrap();
1483        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1484
1485        external_in.send(2).await.unwrap();
1486        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1487    }
1488
1489    #[tokio::test]
1490    async fn external_bytes() {
1491        let mut deployment = Deployment::new();
1492
1493        let mut flow = FlowBuilder::new();
1494        let first_node = flow.process::<()>();
1495        let external = flow.external::<()>();
1496
1497        let (in_port, input) = first_node.source_external_bytes(&external);
1498        let out = input.send_bincode_external(&external);
1499
1500        let nodes = flow
1501            .with_process(&first_node, deployment.Localhost())
1502            .with_external(&external, deployment.Localhost())
1503            .deploy(&mut deployment);
1504
1505        deployment.deploy().await.unwrap();
1506
1507        let mut external_in = nodes.connect(in_port).await.1;
1508        let mut external_out = nodes.connect(out).await;
1509
1510        deployment.start().await.unwrap();
1511
1512        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1513
1514        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1515    }
1516
1517    #[tokio::test]
1518    async fn multi_external_source() {
1519        let mut deployment = Deployment::new();
1520
1521        let mut flow = FlowBuilder::new();
1522        let first_node = flow.process::<()>();
1523        let external = flow.external::<()>();
1524
1525        let (in_port, input, _membership, complete_sink) =
1526            first_node.bidi_external_many_bincode(&external);
1527        let out = input.entries().send_bincode_external(&external);
1528        complete_sink.complete(
1529            first_node
1530                .source_iter::<(u64, ()), _>(q!([]))
1531                .into_keyed()
1532                .weaken_ordering(),
1533        );
1534
1535        let nodes = flow
1536            .with_process(&first_node, deployment.Localhost())
1537            .with_external(&external, deployment.Localhost())
1538            .deploy(&mut deployment);
1539
1540        deployment.deploy().await.unwrap();
1541
1542        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1543        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1544        let external_out = nodes.connect(out).await;
1545
1546        deployment.start().await.unwrap();
1547
1548        external_in_1.send(123).await.unwrap();
1549        external_in_2.send(456).await.unwrap();
1550
1551        assert_eq!(
1552            external_out.take(2).collect::<HashSet<_>>().await,
1553            vec![(0, 123), (1, 456)].into_iter().collect()
1554        );
1555    }
1556
1557    #[tokio::test]
1558    async fn second_connection_only_multi_source() {
1559        let mut deployment = Deployment::new();
1560
1561        let mut flow = FlowBuilder::new();
1562        let first_node = flow.process::<()>();
1563        let external = flow.external::<()>();
1564
1565        let (in_port, input, _membership, complete_sink) =
1566            first_node.bidi_external_many_bincode(&external);
1567        let out = input.entries().send_bincode_external(&external);
1568        complete_sink.complete(
1569            first_node
1570                .source_iter::<(u64, ()), _>(q!([]))
1571                .into_keyed()
1572                .weaken_ordering(),
1573        );
1574
1575        let nodes = flow
1576            .with_process(&first_node, deployment.Localhost())
1577            .with_external(&external, deployment.Localhost())
1578            .deploy(&mut deployment);
1579
1580        deployment.deploy().await.unwrap();
1581
1582        // intentionally skipped to test stream waking logic
1583        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1584        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1585        let mut external_out = nodes.connect(out).await;
1586
1587        deployment.start().await.unwrap();
1588
1589        external_in_2.send(456).await.unwrap();
1590
1591        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1592    }
1593
1594    #[tokio::test]
1595    async fn multi_external_bytes() {
1596        let mut deployment = Deployment::new();
1597
1598        let mut flow = FlowBuilder::new();
1599        let first_node = flow.process::<()>();
1600        let external = flow.external::<()>();
1601
1602        let (in_port, input, _membership, complete_sink) = first_node
1603            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1604        let out = input.entries().send_bincode_external(&external);
1605        complete_sink.complete(
1606            first_node
1607                .source_iter(q!([]))
1608                .into_keyed()
1609                .weaken_ordering(),
1610        );
1611
1612        let nodes = flow
1613            .with_process(&first_node, deployment.Localhost())
1614            .with_external(&external, deployment.Localhost())
1615            .deploy(&mut deployment);
1616
1617        deployment.deploy().await.unwrap();
1618
1619        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1620        let mut external_in_2 = nodes.connect(in_port).await.1;
1621        let external_out = nodes.connect(out).await;
1622
1623        deployment.start().await.unwrap();
1624
1625        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1626        external_in_2.send(vec![4, 5].into()).await.unwrap();
1627
1628        assert_eq!(
1629            external_out.take(2).collect::<HashSet<_>>().await,
1630            vec![
1631                (0, (&[1u8, 2, 3] as &[u8]).into()),
1632                (1, (&[4u8, 5] as &[u8]).into())
1633            ]
1634            .into_iter()
1635            .collect()
1636        );
1637    }
1638
1639    #[tokio::test]
1640    async fn single_client_external_bytes() {
1641        let mut deployment = Deployment::new();
1642        let mut flow = FlowBuilder::new();
1643        let first_node = flow.process::<()>();
1644        let external = flow.external::<()>();
1645        let (port, input, complete_sink) = first_node
1646            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1647        complete_sink.complete(input.map(q!(|data| {
1648            let mut resp: Vec<u8> = data.into();
1649            resp.push(42);
1650            resp.into() // : Bytes
1651        })));
1652
1653        let nodes = flow
1654            .with_process(&first_node, deployment.Localhost())
1655            .with_external(&external, deployment.Localhost())
1656            .deploy(&mut deployment);
1657
1658        deployment.deploy().await.unwrap();
1659        deployment.start().await.unwrap();
1660
1661        let (mut external_out, mut external_in) = nodes.connect(port).await;
1662
1663        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1664        assert_eq!(
1665            external_out.next().await.unwrap().unwrap(),
1666            vec![1, 2, 3, 42]
1667        );
1668    }
1669
1670    #[tokio::test]
1671    async fn echo_external_bytes() {
1672        let mut deployment = Deployment::new();
1673
1674        let mut flow = FlowBuilder::new();
1675        let first_node = flow.process::<()>();
1676        let external = flow.external::<()>();
1677
1678        let (port, input, _membership, complete_sink) = first_node
1679            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1680        complete_sink
1681            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1682
1683        let nodes = flow
1684            .with_process(&first_node, deployment.Localhost())
1685            .with_external(&external, deployment.Localhost())
1686            .deploy(&mut deployment);
1687
1688        deployment.deploy().await.unwrap();
1689
1690        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1691        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1692
1693        deployment.start().await.unwrap();
1694
1695        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1696        external_in_2.send(vec![4, 5].into()).await.unwrap();
1697
1698        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1699        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1700    }
1701
1702    #[tokio::test]
1703    async fn echo_external_bincode() {
1704        let mut deployment = Deployment::new();
1705
1706        let mut flow = FlowBuilder::new();
1707        let first_node = flow.process::<()>();
1708        let external = flow.external::<()>();
1709
1710        let (port, input, _membership, complete_sink) =
1711            first_node.bidi_external_many_bincode(&external);
1712        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1713
1714        let nodes = flow
1715            .with_process(&first_node, deployment.Localhost())
1716            .with_external(&external, deployment.Localhost())
1717            .deploy(&mut deployment);
1718
1719        deployment.deploy().await.unwrap();
1720
1721        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1722        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1723
1724        deployment.start().await.unwrap();
1725
1726        external_in_1.send("hi".to_owned()).await.unwrap();
1727        external_in_2.send("hello".to_owned()).await.unwrap();
1728
1729        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1730        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1731    }
1732
1733    #[tokio::test]
1734    async fn closure_location_name() {
1735        let mut deployment = Deployment::new();
1736        let mut flow = FlowBuilder::new();
1737
1738        enum ClosureProcess {}
1739
1740        let node = flow.process::<ClosureProcess>();
1741        let external = flow.external::<()>();
1742
1743        let (in_port, input) =
1744            node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1745        let out = input.send_bincode_external(&external);
1746
1747        let nodes = flow
1748            .with_process(&node, deployment.Localhost())
1749            .with_external(&external, deployment.Localhost())
1750            .deploy(&mut deployment);
1751
1752        deployment.deploy().await.unwrap();
1753
1754        let mut external_in = nodes.connect(in_port).await;
1755        let mut external_out = nodes.connect(out).await;
1756
1757        deployment.start().await.unwrap();
1758
1759        external_in.send(42).await.unwrap();
1760        assert_eq!(external_out.next().await.unwrap(), 42);
1761    }
1762}