1use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35 ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46#[cfg(stageleft_runtime)]
47use crate::location::dynamic::DynLocation;
48use crate::location::dynamic::{ClusterConsistency, LocationId};
49use crate::location::external_process::{
50 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
51};
52use crate::nondet::NonDet;
53#[cfg(feature = "sim")]
54use crate::sim::SimSender;
55use crate::staging_util::get_this_crate;
56
57pub mod dynamic;
58
59pub mod external_process;
60pub use external_process::External;
61
62pub mod process;
63pub use process::Process;
64
65pub mod cluster;
66pub use cluster::Cluster;
67
68pub mod member_id;
69pub use member_id::{MemberId, TaglessMemberId};
70
71pub mod tick;
72pub use tick::{Atomic, Tick};
73
74#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
77pub enum MembershipEvent {
78 Joined,
80 Left,
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
90pub enum NetworkHint {
91 Auto,
93 TcpPort(Option<u16>),
98}
99
100pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
101 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
102}
103
104#[stageleft::export(LocationKey)]
105new_key_type! {
106 pub struct LocationKey;
108}
109
110impl std::fmt::Display for LocationKey {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 write!(f, "loc{:?}", self.data()) }
114}
115
116impl std::str::FromStr for LocationKey {
119 type Err = Option<ParseIntError>;
120
121 fn from_str(s: &str) -> Result<Self, Self::Err> {
122 let nvn = s.strip_prefix("loc").ok_or(None)?;
123 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
124 let idx: u64 = idx.parse()?;
125 let ver: u64 = ver.parse()?;
126 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
127 }
128}
129
130impl LocationKey {
131 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
137 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001)); #[cfg(test)]
141 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002)); }
143
144impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
146 type O = LocationKey;
147
148 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
149 where
150 Self: Sized,
151 {
152 let root = get_this_crate();
153 let n = Key::data(&self).as_ffi();
154 (
155 QuoteTokens {
156 prelude: None,
157 expr: Some(quote! {
158 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
159 }),
160 },
161 (),
162 )
163 }
164}
165
166#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
168pub enum LocationType {
169 Process,
171 Cluster,
173 External,
175}
176
177#[expect(
191 private_bounds,
192 reason = "only internal Hydro code can define location types"
193)]
194pub trait Location<'a>: DynLocation {
195 type Root: Location<'a>;
200
201 type NoConsistency: Location<'a, NoConsistency = Self::NoConsistency>;
203
204 fn root(&self) -> Self::Root;
209
210 fn drop_consistency(&self) -> Self::NoConsistency;
212 fn consistency() -> Option<ClusterConsistency>;
214
215 fn with_consistency_of<L2: Location<'a, NoConsistency = Self::NoConsistency>>(&self) -> L2 {
217 L2::make_from_nondet(self.drop_consistency())
218 }
219
220 #[doc(hidden)]
221 fn make_from_nondet(l2: Self::NoConsistency) -> Self;
222
223 fn try_tick(&self) -> Option<Tick<Self>> {
230 if Self::is_top_level() {
231 let id = self.flow_state().borrow_mut().next_clock_id();
232 Some(Tick {
233 id,
234 l: self.clone(),
235 })
236 } else {
237 None
238 }
239 }
240
241 fn id(&self) -> LocationId {
243 DynLocation::dyn_id(self)
244 }
245
246 fn tick(&self) -> Tick<Self> {
272 if let LocationId::Tick(_, _) = self.id() {
273 panic!("cannot create nested ticks");
274 }
275
276 let id = self.flow_state().borrow_mut().next_clock_id();
277 Tick {
278 id,
279 l: self.clone(),
280 }
281 }
282
283 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
308 where
309 Self: Sized,
310 {
311 if let LocationId::Tick(_, _) = self.id() {
312 panic!("cannot call spin() inside a tick");
313 }
314
315 Stream::new(
316 self.clone(),
317 HydroNode::Source {
318 source: HydroSource::Spin(),
319 metadata: self.new_node_metadata(Stream::<
320 (),
321 Self,
322 Unbounded,
323 TotalOrder,
324 ExactlyOnce,
325 >::collection_kind()),
326 },
327 )
328 }
329
330 fn source_stream<T, E>(
351 &self,
352 e: impl QuotedWithContext<'a, E, Self>,
353 ) -> Stream<T, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>
354 where
355 E: FuturesStream<Item = T> + Unpin,
356 Self: Sized,
357 {
358 if let LocationId::Tick(_, _) = self.id() {
359 panic!("cannot call source_stream() inside a tick");
360 }
361
362 let e = e.splice_untyped_ctx(self);
363
364 let target_location = self.drop_consistency();
365 Stream::new(
366 target_location.clone(),
367 HydroNode::Source {
368 source: HydroSource::Stream(e.into()),
369 metadata: target_location.new_node_metadata(Stream::<
370 T,
371 Self::NoConsistency,
372 Unbounded,
373 TotalOrder,
374 ExactlyOnce,
375 >::collection_kind()),
376 },
377 )
378 }
379
380 fn source_iter<T, E>(
402 &self,
403 e: impl QuotedWithContext<'a, E, Self>,
404 ) -> Stream<T, Self::NoConsistency, Bounded, TotalOrder, ExactlyOnce>
405 where
406 E: IntoIterator<Item = T>,
407 Self: Sized,
408 {
409 let e = e.splice_typed_ctx(self);
410
411 let target_location = self.drop_consistency();
412 Stream::new(
413 target_location.clone(),
414 HydroNode::Source {
415 source: HydroSource::Iter(e.into()),
416 metadata: target_location.new_node_metadata(Stream::<
417 T,
418 Self::NoConsistency,
419 Bounded,
420 TotalOrder,
421 ExactlyOnce,
422 >::collection_kind()),
423 },
424 )
425 }
426
427 #[deprecated(note = "use .source_cluster_membership_stream(...) instead")]
428 fn source_cluster_members<C: 'a>(
467 &self,
468 cluster: &Cluster<'a, C>,
469 nondet_start: NonDet,
470 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::NoConsistency, Unbounded>
471 where
472 Self: Sized,
473 {
474 self.source_cluster_membership_stream(cluster, nondet_start)
475 }
476
477 fn source_cluster_membership_stream<C: 'a>(
516 &self,
517 cluster: &Cluster<'a, C>,
518 _nondet_start: NonDet,
519 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::NoConsistency, Unbounded>
520 where
521 Self: Sized,
522 {
523 if let LocationId::Tick(_, _) = self.id() {
524 panic!("cannot call source_cluster_membership_stream() inside a tick");
525 }
526
527 let target_consistency = self.drop_consistency();
528 Stream::new(
529 target_consistency.clone(),
530 HydroNode::Source {
531 source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
532 metadata: target_consistency.new_node_metadata(Stream::<
533 (TaglessMemberId, MembershipEvent),
534 Self,
535 Unbounded,
536 TotalOrder,
537 ExactlyOnce,
538 >::collection_kind(
539 )),
540 },
541 )
542 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
543 .into_keyed()
544 }
545
546 fn source_external_bytes<L>(
554 &self,
555 from: &External<L>,
556 ) -> (
557 ExternalBytesPort,
558 Stream<BytesMut, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
559 )
560 where
561 Self: Sized,
562 {
563 if let LocationId::Tick(_, _) = self.id() {
564 panic!("cannot call source_external_bytes() inside a tick");
565 }
566
567 let target_consistency = self.drop_consistency();
568 let (port, stream, sink) = target_consistency
569 .bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
570
571 sink.complete(target_consistency.source_iter(q!([])));
572
573 (port, stream)
574 }
575
576 #[expect(clippy::type_complexity, reason = "stream markers")]
583 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
584 &self,
585 from: &External<L>,
586 ) -> (
587 ExternalBincodeSink<T, NotMany, O, R>,
588 Stream<T, Self::NoConsistency, Unbounded, O, R>,
589 )
590 where
591 Self: Sized,
592 T: Serialize + DeserializeOwned,
593 {
594 if let LocationId::Tick(_, _) = self.id() {
595 panic!("cannot call source_external_bincode() inside a tick");
596 }
597
598 let target_consistency = self.drop_consistency();
599 let (port, stream, sink) = target_consistency.bind_single_client_bincode::<_, T, ()>(from);
600 sink.complete(target_consistency.source_iter(q!([])));
601
602 (
603 ExternalBincodeSink {
604 process_key: from.key,
605 port_id: port.port_id,
606 _phantom: PhantomData,
607 },
608 stream.weaken_ordering().weaken_retries(),
609 )
610 }
611
612 #[cfg(feature = "sim")]
617 #[expect(clippy::type_complexity, reason = "stream markers")]
618 fn sim_input<T, O: Ordering, R: Retries>(
619 &self,
620 ) -> (
621 SimSender<T, O, R>,
622 Stream<T, Self::NoConsistency, Unbounded, O, R>,
623 )
624 where
625 Self: Sized,
626 T: Serialize + DeserializeOwned,
627 {
628 if let LocationId::Tick(_, _) = self.id() {
629 panic!("cannot call sim_input() inside a tick");
630 }
631
632 let external_location: External<'a, ()> = External {
633 key: LocationKey::FIRST,
634 flow_state: self.flow_state().clone(),
635 _phantom: PhantomData,
636 };
637
638 let (external, stream) = self.source_external_bincode(&external_location);
639
640 (SimSender(external.port_id, PhantomData), stream)
641 }
642
643 fn embedded_input<T>(
649 &self,
650 name: impl Into<String>,
651 ) -> Stream<T, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>
652 where
653 Self: Sized,
654 {
655 if let LocationId::Tick(_, _) = self.id() {
656 panic!("cannot call embedded_input() inside a tick");
657 }
658
659 let ident = syn::Ident::new(&name.into(), Span::call_site());
660
661 let target_location = self.drop_consistency();
662 Stream::new(
663 target_location.clone(),
664 HydroNode::Source {
665 source: HydroSource::Embedded(ident),
666 metadata: target_location.new_node_metadata(Stream::<
667 T,
668 Self,
669 Unbounded,
670 TotalOrder,
671 ExactlyOnce,
672 >::collection_kind()),
673 },
674 )
675 }
676
677 fn embedded_singleton_input<T>(
683 &self,
684 name: impl Into<String>,
685 ) -> Singleton<T, Self::NoConsistency, Bounded>
686 where
687 Self: Sized,
688 {
689 if let LocationId::Tick(_, _) = self.id() {
690 panic!("cannot call embedded_singleton_input() inside a tick");
691 }
692
693 let ident = syn::Ident::new(&name.into(), Span::call_site());
694
695 let target_location = self.drop_consistency();
696 Singleton::new(
697 target_location.clone(),
698 HydroNode::Source {
699 source: HydroSource::EmbeddedSingleton(ident),
700 metadata: target_location
701 .new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
702 },
703 )
704 }
705
706 #[expect(clippy::type_complexity, reason = "stream markers")]
751 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
752 &self,
753 from: &External<L>,
754 port_hint: NetworkHint,
755 ) -> (
756 ExternalBytesPort<NotMany>,
757 Stream<<Codec as Decoder>::Item, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
758 ForwardHandle<'a, Stream<T, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
759 )
760 where
761 Self: Sized,
762 {
763 if let LocationId::Tick(_, _) = self.id() {
764 panic!("cannot call bind_single_client() inside a tick");
765 }
766
767 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
768 let target_consistency = self.drop_consistency();
769
770 let (fwd_ref, to_sink) =
771 target_consistency
772 .forward_ref::<Stream<T, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>>(
773 );
774 let mut flow_state_borrow = self.flow_state().borrow_mut();
775
776 flow_state_borrow.push_root(HydroRoot::SendExternal {
777 to_external_key: from.key,
778 to_port_id: next_external_port_id,
779 to_many: false,
780 unpaired: false,
781 serialize_fn: None,
782 instantiate_fn: DebugInstantiate::Building,
783 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
784 op_metadata: HydroIrOpMetadata::new(),
785 });
786
787 let raw_stream: Stream<
788 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
789 Self::NoConsistency,
790 Unbounded,
791 TotalOrder,
792 ExactlyOnce,
793 > = Stream::new(
794 target_consistency.clone(),
795 HydroNode::ExternalInput {
796 from_external_key: from.key,
797 from_port_id: next_external_port_id,
798 from_many: false,
799 codec_type: quote_type::<Codec>().into(),
800 port_hint,
801 instantiate_fn: DebugInstantiate::Building,
802 deserialize_fn: None,
803 metadata: target_consistency.new_node_metadata(Stream::<
804 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
805 Self::NoConsistency,
806 Unbounded,
807 TotalOrder,
808 ExactlyOnce,
809 >::collection_kind(
810 )),
811 },
812 );
813
814 (
815 ExternalBytesPort {
816 process_key: from.key,
817 port_id: next_external_port_id,
818 _phantom: PhantomData,
819 },
820 raw_stream.flatten_ordered(),
821 fwd_ref,
822 )
823 }
824
825 #[expect(clippy::type_complexity, reason = "stream markers")]
835 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
836 &self,
837 from: &External<L>,
838 ) -> (
839 ExternalBincodeBidi<InT, OutT, NotMany>,
840 Stream<InT, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
841 ForwardHandle<'a, Stream<OutT, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
842 )
843 where
844 Self: Sized,
845 {
846 if let LocationId::Tick(_, _) = self.id() {
847 panic!("cannot call bind_single_client_bincode() inside a tick");
848 }
849
850 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
851
852 let target_consistency = self.drop_consistency();
853 let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
854 OutT,
855 Self::NoConsistency,
856 Unbounded,
857 TotalOrder,
858 ExactlyOnce,
859 >>();
860 let mut flow_state_borrow = self.flow_state().borrow_mut();
861
862 let root = get_this_crate();
863
864 let out_t_type = quote_type::<OutT>();
865 let ser_fn: syn::Expr = syn::parse_quote! {
866 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
867 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
868 )
869 };
870
871 flow_state_borrow.push_root(HydroRoot::SendExternal {
872 to_external_key: from.key,
873 to_port_id: next_external_port_id,
874 to_many: false,
875 unpaired: false,
876 serialize_fn: Some(ser_fn.into()),
877 instantiate_fn: DebugInstantiate::Building,
878 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
879 op_metadata: HydroIrOpMetadata::new(),
880 });
881
882 let in_t_type = quote_type::<InT>();
883
884 let deser_fn: syn::Expr = syn::parse_quote! {
885 |res| {
886 let b = res.unwrap();
887 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
888 }
889 };
890
891 let raw_stream: Stream<InT, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce> =
892 Stream::new(
893 target_consistency.clone(),
894 HydroNode::ExternalInput {
895 from_external_key: from.key,
896 from_port_id: next_external_port_id,
897 from_many: false,
898 codec_type: quote_type::<LengthDelimitedCodec>().into(),
899 port_hint: NetworkHint::Auto,
900 instantiate_fn: DebugInstantiate::Building,
901 deserialize_fn: Some(deser_fn.into()),
902 metadata: target_consistency.new_node_metadata(Stream::<
903 InT,
904 Self::NoConsistency,
905 Unbounded,
906 TotalOrder,
907 ExactlyOnce,
908 >::collection_kind(
909 )),
910 },
911 );
912
913 (
914 ExternalBincodeBidi {
915 process_key: from.key,
916 port_id: next_external_port_id,
917 _phantom: PhantomData,
918 },
919 raw_stream,
920 fwd_ref,
921 )
922 }
923
924 #[expect(clippy::type_complexity, reason = "stream markers")]
936 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
937 &self,
938 from: &External<L>,
939 port_hint: NetworkHint,
940 ) -> (
941 ExternalBytesPort<Many>,
942 KeyedStream<
943 u64,
944 <Codec as Decoder>::Item,
945 Self::NoConsistency,
946 Unbounded,
947 TotalOrder,
948 ExactlyOnce,
949 >,
950 KeyedStream<u64, MembershipEvent, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
951 ForwardHandle<
952 'a,
953 KeyedStream<u64, T, Self::NoConsistency, Unbounded, NoOrder, ExactlyOnce>,
954 >,
955 )
956 where
957 Self: Sized,
958 {
959 if let LocationId::Tick(_, _) = self.id() {
960 panic!("cannot call bidi_external_many_bytes() inside a tick");
961 }
962
963 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
964
965 let target_consistency = self.drop_consistency();
966 let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
967 u64,
968 T,
969 Self::NoConsistency,
970 Unbounded,
971 NoOrder,
972 ExactlyOnce,
973 >>();
974 let mut flow_state_borrow = self.flow_state().borrow_mut();
975
976 flow_state_borrow.push_root(HydroRoot::SendExternal {
977 to_external_key: from.key,
978 to_port_id: next_external_port_id,
979 to_many: true,
980 unpaired: false,
981 serialize_fn: None,
982 instantiate_fn: DebugInstantiate::Building,
983 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
984 op_metadata: HydroIrOpMetadata::new(),
985 });
986
987 let raw_stream: Stream<
988 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
989 Self::NoConsistency,
990 Unbounded,
991 TotalOrder,
992 ExactlyOnce,
993 > = Stream::new(
994 target_consistency.clone(),
995 HydroNode::ExternalInput {
996 from_external_key: from.key,
997 from_port_id: next_external_port_id,
998 from_many: true,
999 codec_type: quote_type::<Codec>().into(),
1000 port_hint,
1001 instantiate_fn: DebugInstantiate::Building,
1002 deserialize_fn: None,
1003 metadata: target_consistency.new_node_metadata(Stream::<
1004 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
1005 Self::NoConsistency,
1006 Unbounded,
1007 TotalOrder,
1008 ExactlyOnce,
1009 >::collection_kind(
1010 )),
1011 },
1012 );
1013
1014 let membership_stream_ident = syn::Ident::new(
1015 &format!(
1016 "__hydro_deploy_many_{}_{}_membership",
1017 from.key, next_external_port_id
1018 ),
1019 Span::call_site(),
1020 );
1021 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
1022 let raw_membership_stream: KeyedStream<
1023 u64,
1024 bool,
1025 Self::NoConsistency,
1026 Unbounded,
1027 TotalOrder,
1028 ExactlyOnce,
1029 > = KeyedStream::new(
1030 target_consistency.clone(),
1031 HydroNode::Source {
1032 source: HydroSource::Stream(membership_stream_expr.into()),
1033 metadata: target_consistency.new_node_metadata(KeyedStream::<
1034 u64,
1035 bool,
1036 Self::NoConsistency,
1037 Unbounded,
1038 TotalOrder,
1039 ExactlyOnce,
1040 >::collection_kind(
1041 )),
1042 },
1043 );
1044
1045 (
1046 ExternalBytesPort {
1047 process_key: from.key,
1048 port_id: next_external_port_id,
1049 _phantom: PhantomData,
1050 },
1051 raw_stream
1052 .flatten_ordered() .into_keyed(),
1054 raw_membership_stream.map(q!(|join| {
1055 if join {
1056 MembershipEvent::Joined
1057 } else {
1058 MembershipEvent::Left
1059 }
1060 })),
1061 fwd_ref,
1062 )
1063 }
1064
1065 #[expect(clippy::type_complexity, reason = "stream markers")]
1081 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
1082 &self,
1083 from: &External<L>,
1084 ) -> (
1085 ExternalBincodeBidi<InT, OutT, Many>,
1086 KeyedStream<u64, InT, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
1087 KeyedStream<u64, MembershipEvent, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>,
1088 ForwardHandle<
1089 'a,
1090 KeyedStream<u64, OutT, Self::NoConsistency, Unbounded, NoOrder, ExactlyOnce>,
1091 >,
1092 )
1093 where
1094 Self: Sized,
1095 {
1096 if let LocationId::Tick(_, _) = self.id() {
1097 panic!("cannot call bidi_external_many_bincode() inside a tick");
1098 }
1099
1100 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
1101
1102 let target_consistency = self.drop_consistency();
1103 let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
1104 u64,
1105 OutT,
1106 Self::NoConsistency,
1107 Unbounded,
1108 NoOrder,
1109 ExactlyOnce,
1110 >>();
1111 let mut flow_state_borrow = self.flow_state().borrow_mut();
1112
1113 let root = get_this_crate();
1114
1115 let out_t_type = quote_type::<OutT>();
1116 let ser_fn: syn::Expr = syn::parse_quote! {
1117 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
1118 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
1119 )
1120 };
1121
1122 flow_state_borrow.push_root(HydroRoot::SendExternal {
1123 to_external_key: from.key,
1124 to_port_id: next_external_port_id,
1125 to_many: true,
1126 unpaired: false,
1127 serialize_fn: Some(ser_fn.into()),
1128 instantiate_fn: DebugInstantiate::Building,
1129 input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
1130 op_metadata: HydroIrOpMetadata::new(),
1131 });
1132
1133 let in_t_type = quote_type::<InT>();
1134
1135 let deser_fn: syn::Expr = syn::parse_quote! {
1136 |res| {
1137 let (id, b) = res.unwrap();
1138 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
1139 }
1140 };
1141
1142 let raw_stream: KeyedStream<
1143 u64,
1144 InT,
1145 Self::NoConsistency,
1146 Unbounded,
1147 TotalOrder,
1148 ExactlyOnce,
1149 > = KeyedStream::new(
1150 target_consistency.clone(),
1151 HydroNode::ExternalInput {
1152 from_external_key: from.key,
1153 from_port_id: next_external_port_id,
1154 from_many: true,
1155 codec_type: quote_type::<LengthDelimitedCodec>().into(),
1156 port_hint: NetworkHint::Auto,
1157 instantiate_fn: DebugInstantiate::Building,
1158 deserialize_fn: Some(deser_fn.into()),
1159 metadata: target_consistency.new_node_metadata(KeyedStream::<
1160 u64,
1161 InT,
1162 Self::NoConsistency,
1163 Unbounded,
1164 TotalOrder,
1165 ExactlyOnce,
1166 >::collection_kind(
1167 )),
1168 },
1169 );
1170
1171 let membership_stream_ident = syn::Ident::new(
1172 &format!(
1173 "__hydro_deploy_many_{}_{}_membership",
1174 from.key, next_external_port_id
1175 ),
1176 Span::call_site(),
1177 );
1178 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
1179 let raw_membership_stream: KeyedStream<
1180 u64,
1181 bool,
1182 Self::NoConsistency,
1183 Unbounded,
1184 TotalOrder,
1185 ExactlyOnce,
1186 > = KeyedStream::new(
1187 target_consistency.clone(),
1188 HydroNode::Source {
1189 source: HydroSource::Stream(membership_stream_expr.into()),
1190 metadata: target_consistency.new_node_metadata(KeyedStream::<
1191 u64,
1192 bool,
1193 Self::NoConsistency,
1194 Unbounded,
1195 TotalOrder,
1196 ExactlyOnce,
1197 >::collection_kind(
1198 )),
1199 },
1200 );
1201
1202 (
1203 ExternalBincodeBidi {
1204 process_key: from.key,
1205 port_id: next_external_port_id,
1206 _phantom: PhantomData,
1207 },
1208 raw_stream,
1209 raw_membership_stream.map(q!(|join| {
1210 if join {
1211 MembershipEvent::Joined
1212 } else {
1213 MembershipEvent::Left
1214 }
1215 })),
1216 fwd_ref,
1217 )
1218 }
1219
1220 fn singleton<T>(
1240 &self,
1241 e: impl QuotedWithContext<'a, T, Self>,
1242 ) -> Singleton<T, Self::NoConsistency, Bounded>
1243 where
1244 Self: Sized,
1245 {
1246 let e = e.splice_untyped_ctx(self);
1247
1248 let target_location = self.drop_consistency();
1249 Singleton::new(
1250 target_location.clone(),
1251 HydroNode::SingletonSource {
1252 value: e.into(),
1253 first_tick_only: false,
1254 metadata: target_location.new_node_metadata(Singleton::<
1255 T,
1256 Self::NoConsistency,
1257 Bounded,
1258 >::collection_kind()),
1259 },
1260 )
1261 }
1262
1263 fn singleton_future<F>(
1286 &self,
1287 e: impl QuotedWithContext<'a, F, Self>,
1288 ) -> Singleton<F::Output, Self::NoConsistency, Bounded>
1289 where
1290 F: Future,
1291 Self: Sized,
1292 {
1293 self.singleton(e).resolve_future_blocking()
1294 }
1295
1296 fn source_interval(
1306 &self,
1307 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1308 _nondet: NonDet,
1309 ) -> Stream<tokio::time::Instant, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>
1310 where
1311 Self: Sized,
1312 {
1313 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1314 tokio::time::interval(interval)
1315 )))
1316 }
1317
1318 fn source_interval_delayed(
1329 &self,
1330 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1331 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1332 _nondet: NonDet,
1333 ) -> Stream<tokio::time::Instant, Self::NoConsistency, Unbounded, TotalOrder, ExactlyOnce>
1334 where
1335 Self: Sized,
1336 {
1337 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1338 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1339 )))
1340 }
1341
1342 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1382 where
1383 S: CycleCollection<'a, ForwardRef, Location = Self>,
1384 {
1385 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1386 (
1387 ForwardHandle::new(cycle_id, Location::id(self)),
1388 S::create_source(cycle_id, self.clone()),
1389 )
1390 }
1391}
1392
1393#[cfg(feature = "deploy")]
1394#[cfg(test)]
1395mod tests {
1396 use std::collections::HashSet;
1397
1398 use futures::{SinkExt, StreamExt};
1399 use hydro_deploy::Deployment;
1400 use stageleft::q;
1401 use tokio_util::codec::LengthDelimitedCodec;
1402
1403 use crate::compile::builder::FlowBuilder;
1404 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1405 use crate::location::{Location, NetworkHint};
1406 use crate::nondet::nondet;
1407
1408 #[tokio::test]
1409 async fn top_level_singleton_replay_cardinality() {
1410 let mut deployment = Deployment::new();
1411
1412 let mut flow = FlowBuilder::new();
1413 let node = flow.process::<()>();
1414 let external = flow.external::<()>();
1415
1416 let (in_port, input) =
1417 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1418 let singleton = node.singleton(q!(123));
1419 let tick = node.tick();
1420 let out = input
1421 .batch(&tick, nondet!())
1422 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
1423 .cross_singleton(
1424 singleton
1425 .snapshot(&tick, nondet!())
1426 .into_stream()
1427 .count(),
1428 )
1429 .all_ticks()
1430 .send_bincode_external(&external);
1431
1432 let nodes = flow
1433 .with_process(&node, deployment.Localhost())
1434 .with_external(&external, deployment.Localhost())
1435 .deploy(&mut deployment);
1436
1437 deployment.deploy().await.unwrap();
1438
1439 let mut external_in = nodes.connect(in_port).await;
1440 let mut external_out = nodes.connect(out).await;
1441
1442 deployment.start().await.unwrap();
1443
1444 external_in.send(1).await.unwrap();
1445 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1446
1447 external_in.send(2).await.unwrap();
1448 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1449 }
1450
1451 #[tokio::test]
1452 async fn tick_singleton_replay_cardinality() {
1453 let mut deployment = Deployment::new();
1454
1455 let mut flow = FlowBuilder::new();
1456 let node = flow.process::<()>();
1457 let external = flow.external::<()>();
1458
1459 let (in_port, input) =
1460 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1461 let tick = node.tick();
1462 let singleton = tick.singleton(q!(123));
1463 let out = input
1464 .batch(&tick, nondet!())
1465 .cross_singleton(singleton.clone())
1466 .cross_singleton(singleton.into_stream().count())
1467 .all_ticks()
1468 .send_bincode_external(&external);
1469
1470 let nodes = flow
1471 .with_process(&node, deployment.Localhost())
1472 .with_external(&external, deployment.Localhost())
1473 .deploy(&mut deployment);
1474
1475 deployment.deploy().await.unwrap();
1476
1477 let mut external_in = nodes.connect(in_port).await;
1478 let mut external_out = nodes.connect(out).await;
1479
1480 deployment.start().await.unwrap();
1481
1482 external_in.send(1).await.unwrap();
1483 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1484
1485 external_in.send(2).await.unwrap();
1486 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1487 }
1488
1489 #[tokio::test]
1490 async fn external_bytes() {
1491 let mut deployment = Deployment::new();
1492
1493 let mut flow = FlowBuilder::new();
1494 let first_node = flow.process::<()>();
1495 let external = flow.external::<()>();
1496
1497 let (in_port, input) = first_node.source_external_bytes(&external);
1498 let out = input.send_bincode_external(&external);
1499
1500 let nodes = flow
1501 .with_process(&first_node, deployment.Localhost())
1502 .with_external(&external, deployment.Localhost())
1503 .deploy(&mut deployment);
1504
1505 deployment.deploy().await.unwrap();
1506
1507 let mut external_in = nodes.connect(in_port).await.1;
1508 let mut external_out = nodes.connect(out).await;
1509
1510 deployment.start().await.unwrap();
1511
1512 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1513
1514 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1515 }
1516
1517 #[tokio::test]
1518 async fn multi_external_source() {
1519 let mut deployment = Deployment::new();
1520
1521 let mut flow = FlowBuilder::new();
1522 let first_node = flow.process::<()>();
1523 let external = flow.external::<()>();
1524
1525 let (in_port, input, _membership, complete_sink) =
1526 first_node.bidi_external_many_bincode(&external);
1527 let out = input.entries().send_bincode_external(&external);
1528 complete_sink.complete(
1529 first_node
1530 .source_iter::<(u64, ()), _>(q!([]))
1531 .into_keyed()
1532 .weaken_ordering(),
1533 );
1534
1535 let nodes = flow
1536 .with_process(&first_node, deployment.Localhost())
1537 .with_external(&external, deployment.Localhost())
1538 .deploy(&mut deployment);
1539
1540 deployment.deploy().await.unwrap();
1541
1542 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1543 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1544 let external_out = nodes.connect(out).await;
1545
1546 deployment.start().await.unwrap();
1547
1548 external_in_1.send(123).await.unwrap();
1549 external_in_2.send(456).await.unwrap();
1550
1551 assert_eq!(
1552 external_out.take(2).collect::<HashSet<_>>().await,
1553 vec![(0, 123), (1, 456)].into_iter().collect()
1554 );
1555 }
1556
1557 #[tokio::test]
1558 async fn second_connection_only_multi_source() {
1559 let mut deployment = Deployment::new();
1560
1561 let mut flow = FlowBuilder::new();
1562 let first_node = flow.process::<()>();
1563 let external = flow.external::<()>();
1564
1565 let (in_port, input, _membership, complete_sink) =
1566 first_node.bidi_external_many_bincode(&external);
1567 let out = input.entries().send_bincode_external(&external);
1568 complete_sink.complete(
1569 first_node
1570 .source_iter::<(u64, ()), _>(q!([]))
1571 .into_keyed()
1572 .weaken_ordering(),
1573 );
1574
1575 let nodes = flow
1576 .with_process(&first_node, deployment.Localhost())
1577 .with_external(&external, deployment.Localhost())
1578 .deploy(&mut deployment);
1579
1580 deployment.deploy().await.unwrap();
1581
1582 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1584 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1585 let mut external_out = nodes.connect(out).await;
1586
1587 deployment.start().await.unwrap();
1588
1589 external_in_2.send(456).await.unwrap();
1590
1591 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1592 }
1593
1594 #[tokio::test]
1595 async fn multi_external_bytes() {
1596 let mut deployment = Deployment::new();
1597
1598 let mut flow = FlowBuilder::new();
1599 let first_node = flow.process::<()>();
1600 let external = flow.external::<()>();
1601
1602 let (in_port, input, _membership, complete_sink) = first_node
1603 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1604 let out = input.entries().send_bincode_external(&external);
1605 complete_sink.complete(
1606 first_node
1607 .source_iter(q!([]))
1608 .into_keyed()
1609 .weaken_ordering(),
1610 );
1611
1612 let nodes = flow
1613 .with_process(&first_node, deployment.Localhost())
1614 .with_external(&external, deployment.Localhost())
1615 .deploy(&mut deployment);
1616
1617 deployment.deploy().await.unwrap();
1618
1619 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1620 let mut external_in_2 = nodes.connect(in_port).await.1;
1621 let external_out = nodes.connect(out).await;
1622
1623 deployment.start().await.unwrap();
1624
1625 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1626 external_in_2.send(vec![4, 5].into()).await.unwrap();
1627
1628 assert_eq!(
1629 external_out.take(2).collect::<HashSet<_>>().await,
1630 vec![
1631 (0, (&[1u8, 2, 3] as &[u8]).into()),
1632 (1, (&[4u8, 5] as &[u8]).into())
1633 ]
1634 .into_iter()
1635 .collect()
1636 );
1637 }
1638
1639 #[tokio::test]
1640 async fn single_client_external_bytes() {
1641 let mut deployment = Deployment::new();
1642 let mut flow = FlowBuilder::new();
1643 let first_node = flow.process::<()>();
1644 let external = flow.external::<()>();
1645 let (port, input, complete_sink) = first_node
1646 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1647 complete_sink.complete(input.map(q!(|data| {
1648 let mut resp: Vec<u8> = data.into();
1649 resp.push(42);
1650 resp.into() })));
1652
1653 let nodes = flow
1654 .with_process(&first_node, deployment.Localhost())
1655 .with_external(&external, deployment.Localhost())
1656 .deploy(&mut deployment);
1657
1658 deployment.deploy().await.unwrap();
1659 deployment.start().await.unwrap();
1660
1661 let (mut external_out, mut external_in) = nodes.connect(port).await;
1662
1663 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1664 assert_eq!(
1665 external_out.next().await.unwrap().unwrap(),
1666 vec![1, 2, 3, 42]
1667 );
1668 }
1669
1670 #[tokio::test]
1671 async fn echo_external_bytes() {
1672 let mut deployment = Deployment::new();
1673
1674 let mut flow = FlowBuilder::new();
1675 let first_node = flow.process::<()>();
1676 let external = flow.external::<()>();
1677
1678 let (port, input, _membership, complete_sink) = first_node
1679 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1680 complete_sink
1681 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1682
1683 let nodes = flow
1684 .with_process(&first_node, deployment.Localhost())
1685 .with_external(&external, deployment.Localhost())
1686 .deploy(&mut deployment);
1687
1688 deployment.deploy().await.unwrap();
1689
1690 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1691 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1692
1693 deployment.start().await.unwrap();
1694
1695 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1696 external_in_2.send(vec![4, 5].into()).await.unwrap();
1697
1698 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1699 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1700 }
1701
1702 #[tokio::test]
1703 async fn echo_external_bincode() {
1704 let mut deployment = Deployment::new();
1705
1706 let mut flow = FlowBuilder::new();
1707 let first_node = flow.process::<()>();
1708 let external = flow.external::<()>();
1709
1710 let (port, input, _membership, complete_sink) =
1711 first_node.bidi_external_many_bincode(&external);
1712 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1713
1714 let nodes = flow
1715 .with_process(&first_node, deployment.Localhost())
1716 .with_external(&external, deployment.Localhost())
1717 .deploy(&mut deployment);
1718
1719 deployment.deploy().await.unwrap();
1720
1721 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1722 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1723
1724 deployment.start().await.unwrap();
1725
1726 external_in_1.send("hi".to_owned()).await.unwrap();
1727 external_in_2.send("hello".to_owned()).await.unwrap();
1728
1729 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1730 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1731 }
1732
1733 #[tokio::test]
1734 async fn closure_location_name() {
1735 let mut deployment = Deployment::new();
1736 let mut flow = FlowBuilder::new();
1737
1738 enum ClosureProcess {}
1739
1740 let node = flow.process::<ClosureProcess>();
1741 let external = flow.external::<()>();
1742
1743 let (in_port, input) =
1744 node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1745 let out = input.send_bincode_external(&external);
1746
1747 let nodes = flow
1748 .with_process(&node, deployment.Localhost())
1749 .with_external(&external, deployment.Localhost())
1750 .deploy(&mut deployment);
1751
1752 deployment.deploy().await.unwrap();
1753
1754 let mut external_in = nodes.connect(in_port).await;
1755 let mut external_out = nodes.connect(out).await;
1756
1757 deployment.start().await.unwrap();
1758
1759 external_in.send(42).await.unwrap();
1760 assert_eq!(external_out.next().await.unwrap(), 42);
1761 }
1762}