1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
/// Newtype around a boxed [`syn::Expr`].
///
/// The impls in this file give it a token-stream based `Debug`, a `q!(...)`-style
/// `Display`, and serialization as a string.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl serde::Serialize for DebugExpr {
44 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
45 serializer.serialize_str(&self.to_string())
46 }
47}
48
49impl From<syn::Expr> for DebugExpr {
50 fn from(expr: syn::Expr) -> Self {
51 Self(Box::new(expr))
52 }
53}
54
55impl Deref for DebugExpr {
56 type Target = syn::Expr;
57
58 fn deref(&self) -> &Self::Target {
59 &self.0
60 }
61}
62
63impl ToTokens for DebugExpr {
64 fn to_tokens(&self, tokens: &mut TokenStream) {
65 self.0.to_tokens(tokens);
66 }
67}
68
69impl Debug for DebugExpr {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 write!(f, "{}", self.0.to_token_stream())
72 }
73}
74
75impl Display for DebugExpr {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 let original = self.0.as_ref().clone();
78 let simplified = simplify_q_macro(original);
79
80 write!(f, "q!({})", quote::quote!(#simplified))
83 }
84}
85
86fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
88 let mut simplifier = QMacroSimplifier::new();
91 simplifier.visit_expr_mut(&mut expr);
92
93 if let Some(simplified) = simplifier.simplified_result {
95 simplified
96 } else {
97 expr
98 }
99}
100
/// Mutable syntax-tree visitor used by `simplify_q_macro`.
///
/// It looks for a call whose path starts with `stageleft::runtime_support` and whose
/// last segment contains `_type_hint`, and records a closure found in that call's
/// arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The expression extracted by the visitor; `None` until a match is found.
    pub simplified_result: Option<syn::Expr>,
}
106
107impl QMacroSimplifier {
108 pub fn new() -> Self {
109 Self::default()
110 }
111}
112
113impl VisitMut for QMacroSimplifier {
114 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
115 if self.simplified_result.is_some() {
117 return;
118 }
119
120 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
121 && self.is_stageleft_runtime_support_call(&path_expr.path)
123 && let Some(closure) = self.extract_closure_from_args(&call.args)
125 {
126 self.simplified_result = Some(closure);
127 return;
128 }
129
130 syn::visit_mut::visit_expr_mut(self, expr);
133 }
134}
135
136impl QMacroSimplifier {
137 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
138 if let Some(last_segment) = path.segments.last() {
140 let fn_name = last_segment.ident.to_string();
141 fn_name.contains("_type_hint")
143 && path.segments.len() > 2
144 && path.segments[0].ident == "stageleft"
145 && path.segments[1].ident == "runtime_support"
146 } else {
147 false
148 }
149 }
150
151 fn extract_closure_from_args(
152 &self,
153 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
154 ) -> Option<syn::Expr> {
155 for arg in args {
157 if let syn::Expr::Closure(_) = arg {
158 return Some(arg.clone());
159 }
160 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
162 return Some(closure_expr);
163 }
164 }
165 None
166 }
167
168 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
169 let mut visitor = ClosureFinder {
170 found_closure: None,
171 prefer_inner_blocks: true,
172 };
173 visitor.visit_expr(expr);
174 visitor.found_closure
175 }
176}
177
/// Read-only syntax-tree visitor that records the first closure it encounters.
struct ClosureFinder {
    /// The recorded expression; `None` until something is found.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block statement that is itself a block containing a closure is
    /// recorded as a whole instead of only the closure inside it.
    prefer_inner_blocks: bool,
}
183
184impl<'ast> Visit<'ast> for ClosureFinder {
185 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
186 if self.found_closure.is_some() {
188 return;
189 }
190
191 match expr {
192 syn::Expr::Closure(_) => {
193 self.found_closure = Some(expr.clone());
194 }
195 syn::Expr::Block(block) if self.prefer_inner_blocks => {
196 for stmt in &block.block.stmts {
198 if let syn::Stmt::Expr(stmt_expr, _) = stmt
199 && let syn::Expr::Block(_) = stmt_expr
200 {
201 let mut inner_visitor = ClosureFinder {
203 found_closure: None,
204 prefer_inner_blocks: false, };
206 inner_visitor.visit_expr(stmt_expr);
207 if inner_visitor.found_closure.is_some() {
208 self.found_closure = Some(stmt_expr.clone());
210 return;
211 }
212 }
213 }
214
215 visit::visit_expr(self, expr);
217
218 if self.found_closure.is_some() {
221 }
223 }
224 _ => {
225 visit::visit_expr(self, expr);
227 }
228 }
229 }
230}
231
/// Newtype around a boxed [`syn::Type`].
///
/// The impls in this file format and serialize it as its token stream.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
237
238impl From<syn::Type> for DebugType {
239 fn from(t: syn::Type) -> Self {
240 Self(Box::new(t))
241 }
242}
243
244impl Deref for DebugType {
245 type Target = syn::Type;
246
247 fn deref(&self) -> &Self::Target {
248 &self.0
249 }
250}
251
252impl ToTokens for DebugType {
253 fn to_tokens(&self, tokens: &mut TokenStream) {
254 self.0.to_tokens(tokens);
255 }
256}
257
258impl Debug for DebugType {
259 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260 write!(f, "{}", self.0.to_token_stream())
261 }
262}
263
264impl serde::Serialize for DebugType {
265 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
266 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
267 }
268}
269
270fn serialize_backtrace_as_span<S: serde::Serializer>(
271 backtrace: &Backtrace,
272 serializer: S,
273) -> Result<S::Ok, S::Error> {
274 match backtrace.format_span() {
275 Some(span) => serializer.serialize_some(&span),
276 None => serializer.serialize_none(),
277 }
278}
279
280fn serialize_ident<S: serde::Serializer>(
281 ident: &syn::Ident,
282 serializer: S,
283) -> Result<S::Ok, S::Error> {
284 serializer.serialize_str(&ident.to_string())
285}
286
/// Instantiation state of a network-like IR element.
pub enum DebugInstantiate {
    /// Not yet instantiated; `compile_network` replaces this with `Finalized`.
    Building,
    /// Instantiated: carries the sink/source expressions and the connect closure.
    Finalized(Box<DebugInstantiateFinalized>),
}
291
292impl serde::Serialize for DebugInstantiate {
293 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
294 match self {
295 DebugInstantiate::Building => {
296 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
297 }
298 DebugInstantiate::Finalized(_) => {
299 panic!(
300 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
301 )
302 }
303 }
304 }
305}
306
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side.
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// One-shot closure invoked by `connect_network`, which `take`s it out of the
    /// `Option` before calling it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
319
320impl From<DebugInstantiateFinalized> for DebugInstantiate {
321 fn from(f: DebugInstantiateFinalized) -> Self {
322 Self::Finalized(Box::new(f))
323 }
324}
325
326impl Debug for DebugInstantiate {
327 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328 write!(f, "<network instantiate>")
329 }
330}
331
impl Hash for DebugInstantiate {
    /// Feeds nothing into the hasher, so every value hashes identically and this
    /// field never influences the hash of a containing type.
    // NOTE(review): presumably intentional so instantiation state is ignored when
    // hashing the IR — confirm.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
337
338impl Clone for DebugInstantiate {
339 fn clone(&self) -> Self {
340 match self {
341 DebugInstantiate::Building => DebugInstantiate::Building,
342 DebugInstantiate::Finalized(_) => {
343 panic!("DebugInstantiate::Finalized should not be cloned")
344 }
345 }
346 }
347}
348
/// Resolution state of a [`HydroSource::ClusterMembers`] source.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet resolved; `compile_network` replaces this with one of the other variants.
    Uninit,
    /// Set by `compile_network` the first time a given (location, cluster) pair is
    /// seen; holds the spliced membership stream expression.
    Stream(DebugExpr),
    /// Set by `compile_network` when the same (location, cluster) pair was already
    /// seen; holds the location the source is at and the cluster location id.
    Tee(LocationId, LocationId),
}
369
/// The kinds of source a `HydroNode::Source` can have.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// A source described by an expression.
    // NOTE(review): presumably an async stream expression — confirm.
    Stream(DebugExpr),
    ExternalNetwork(),
    /// A source described by an expression.
    // NOTE(review): presumably an iterator expression — confirm.
    Iter(DebugExpr),
    Spin(),
    /// Membership of the cluster identified by the `LocationId`, together with its
    /// resolution state.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Embedded stream input named by the identifier; `compile_network` requires the
    /// node's collection kind to be `Stream`.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Embedded singleton input named by the identifier; `compile_network` requires
    /// the node's collection kind to be `Singleton`.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
381
/// Backend interface through which IR elements add DFIR statements for a location.
///
/// The implementation for `SecondaryMap<LocationKey, FlatGraphBuilder>` in this file
/// keeps one `FlatGraphBuilder` per root location.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Backend flag.
    // NOTE(review): exact semantics are not visible in this part of the file; the
    // `SecondaryMap` implementation returns `false`.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the mutable graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Connects `in_ident` (at `in_location`, with collection kind `in_kind`) to
    /// `out_ident` (at `out_location`) for a batch operation.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident` for a value yielded out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Connects `in_ident` to `out_ident` at the start of an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident` at the end of an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Connects `in_ident` to `out_ident` at `location` for a non-determinism
    /// observation.
    // NOTE(review): the meaning of `trusted` is not visible here — confirm.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network link: `input_ident` feeds `sink` at `from`
    /// (through `serialize` if given), and `out_ident` is read from `source` at `to`
    /// (through `deserialize` if given).
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits an input from an external source at `on`: `out_ident` is read from
    /// `source_expr`, through `deserialize` if given.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits an output to an external sink at `on`: `input_ident` feeds `sink_expr`,
    /// through `serialize` if given.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Connects `in_ident` to `out_ident` at `location` for a consistency assertion.
    fn assert_is_consistent(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );
}
482
/// Appends the pass-through statement `#out_ident = #in_ident;` to `builder`,
/// passing `None` for both optional `add_dfir` arguments.
#[cfg(feature = "build")]
fn add_passthrough_dfir(
    builder: &mut FlatGraphBuilder,
    in_ident: syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}

/// [`DfirBuilder`] backed by one `FlatGraphBuilder` per root location key.
///
/// Most operations are emitted as a plain pass-through (`out = in;`); only bounded
/// singleton-like batches and the network/external endpoints emit real operators.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder for the root of `location`, creating a default one when
    /// absent.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" when `entry` yields `None` for the key.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        let slot = self.entry(root_key).expect("location was removed");
        slot.or_default()
    }

    /// Emits `out = in -> persist::<'static>();` for bounded `Singleton`, `Optional`
    /// and `KeyedSingleton` inputs (asserting the input location is top-level), and a
    /// plain pass-through otherwise.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if !needs_persist {
            add_passthrough_dfir(builder, in_ident, out_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Pass-through at the root of `in_location`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(in_location.root()), in_ident, out_ident);
    }

    /// Pass-through at the root of `in_location`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(in_location.root()), in_ident, out_ident);
    }

    /// Pass-through at the root of `in_location`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(in_location.root()), in_ident, out_ident);
    }

    /// Pass-through at `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(location), in_ident, out_ident);
    }

    /// Emits the sender half at `from` (tagged `send<tag_id>`) and then the receiver
    /// half at `to` (tagged `recv<tag_id>`), inserting a `map` stage on either side
    /// when the matching (de)serializer is provided.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let send_tag = format!("send{tag_id}");
        let recv_tag = format!("recv{tag_id}");

        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Emits `out = source_stream(source_expr)` at `on`, tagged `recv<tag_id>`, with
    /// an optional `map` deserialization stage.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{tag_id}");
        let receiver_builder = self.get_dfir_mut(on);

        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Emits `input -> dest_sink(sink_expr)` at `on`, tagged `send<tag_id>`, with an
    /// optional `map` serialization stage.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let send_tag = format!("send{tag_id}");
        let sender_builder = self.get_dfir_mut(on);

        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }
    }

    /// Pass-through at `location`.
    fn assert_is_consistent(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        add_passthrough_dfir(self.get_dfir_mut(location), in_ident, out_ident);
    }
}
732
/// Target of an `emit_core` traversal: either a real DFIR builder, or a pair of
/// callbacks invoked on each root/node together with the current statement id.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR statements into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke callbacks instead of emitting: `L` for roots, `N` for nodes.
    Callback(L, N),
}
742
/// A root of the IR: an operator that consumes a single `input` node and produces no
/// further IR output.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Emitted as `input -> for_each(f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to a port of an external location; instantiated by
    /// `compile_network` and connected by `connect_network`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        /// Optional serialization stage applied before the sink.
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> dest_sink(sink)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sink end of the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Embedded output named `ident`; `compile_network` requires `input` to have a
    /// `Stream` collection kind on a process or cluster.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    // NOTE(review): presumably a root that discards its input — confirm.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
784
785impl HydroRoot {
786 #[cfg(feature = "build")]
787 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
788 pub fn compile_network<'a, D>(
789 &mut self,
790 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
791 seen_tees: &mut SeenSharedNodes,
792 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
793 processes: &SparseSecondaryMap<LocationKey, D::Process>,
794 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
795 externals: &SparseSecondaryMap<LocationKey, D::External>,
796 env: &mut D::InstantiateEnv,
797 ) where
798 D: Deploy<'a>,
799 {
800 let refcell_extra_stmts = RefCell::new(extra_stmts);
801 let refcell_env = RefCell::new(env);
802 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
803 self.transform_bottom_up(
804 &mut |l| {
805 if let HydroRoot::SendExternal {
806 input,
807 to_external_key,
808 to_port_id,
809 to_many,
810 unpaired,
811 instantiate_fn,
812 ..
813 } = l
814 {
815 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
816 DebugInstantiate::Building => {
817 let to_node = externals
818 .get(*to_external_key)
819 .unwrap_or_else(|| {
820 panic!("A external used in the graph was not instantiated: {}", to_external_key)
821 })
822 .clone();
823
824 match input.metadata().location_id.root() {
825 &LocationId::Process(process_key) => {
826 if *to_many {
827 (
828 (
829 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
830 parse_quote!(DUMMY),
831 ),
832 Box::new(|| {}) as Box<dyn FnOnce()>,
833 )
834 } else {
835 let from_node = processes
836 .get(process_key)
837 .unwrap_or_else(|| {
838 panic!("A process used in the graph was not instantiated: {}", process_key)
839 })
840 .clone();
841
842 let sink_port = from_node.next_port();
843 let source_port = to_node.next_port();
844
845 if *unpaired {
846 use stageleft::quote_type;
847 use tokio_util::codec::LengthDelimitedCodec;
848
849 to_node.register(*to_port_id, source_port.clone());
850
851 let _ = D::e2o_source(
852 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
853 &to_node, &source_port,
854 &from_node, &sink_port,
855 "e_type::<LengthDelimitedCodec>(),
856 format!("{}_{}", *to_external_key, *to_port_id)
857 );
858 }
859
860 (
861 (
862 D::o2e_sink(
863 &from_node,
864 &sink_port,
865 &to_node,
866 &source_port,
867 format!("{}_{}", *to_external_key, *to_port_id)
868 ),
869 parse_quote!(DUMMY),
870 ),
871 if *unpaired {
872 D::e2o_connect(
873 &to_node,
874 &source_port,
875 &from_node,
876 &sink_port,
877 *to_many,
878 NetworkHint::Auto,
879 )
880 } else {
881 Box::new(|| {}) as Box<dyn FnOnce()>
882 },
883 )
884 }
885 }
886 LocationId::Cluster(cluster_key, _) => {
887 let from_node = clusters
888 .get(*cluster_key)
889 .unwrap_or_else(|| {
890 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
891 })
892 .clone();
893
894 let sink_port = from_node.next_port();
895 let source_port = to_node.next_port();
896
897 if *unpaired {
898 to_node.register(*to_port_id, source_port.clone());
899 }
900
901 (
902 (
903 D::m2e_sink(
904 &from_node,
905 &sink_port,
906 &to_node,
907 &source_port,
908 format!("{}_{}", *to_external_key, *to_port_id)
909 ),
910 parse_quote!(DUMMY),
911 ),
912 Box::new(|| {}) as Box<dyn FnOnce()>,
913 )
914 }
915 _ => panic!()
916 }
917 },
918
919 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
920 };
921
922 *instantiate_fn = DebugInstantiateFinalized {
923 sink: sink_expr,
924 source: source_expr,
925 connect_fn: Some(connect_fn),
926 }
927 .into();
928 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
929 let element_type = match &input.metadata().collection_kind {
930 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
931 _ => panic!("Embedded output must have Stream collection kind"),
932 };
933 let location_key = match input.metadata().location_id.root() {
934 LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
935 _ => panic!("Embedded output must be on a process or cluster"),
936 };
937 D::register_embedded_output(
938 &mut refcell_env.borrow_mut(),
939 location_key,
940 ident,
941 &element_type,
942 );
943 }
944 },
945 &mut |n| {
946 if let HydroNode::Network {
947 name,
948 networking_info,
949 input,
950 instantiate_fn,
951 metadata,
952 ..
953 } = n
954 {
955 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
956 DebugInstantiate::Building => instantiate_network::<D>(
957 &mut refcell_env.borrow_mut(),
958 input.metadata().location_id.root(),
959 metadata.location_id.root(),
960 processes,
961 clusters,
962 name.as_deref(),
963 networking_info,
964 ),
965
966 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
967 };
968
969 *instantiate_fn = DebugInstantiateFinalized {
970 sink: sink_expr,
971 source: source_expr,
972 connect_fn: Some(connect_fn),
973 }
974 .into();
975 } else if let HydroNode::ExternalInput {
976 from_external_key,
977 from_port_id,
978 from_many,
979 codec_type,
980 port_hint,
981 instantiate_fn,
982 metadata,
983 ..
984 } = n
985 {
986 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
987 DebugInstantiate::Building => {
988 let from_node = externals
989 .get(*from_external_key)
990 .unwrap_or_else(|| {
991 panic!(
992 "A external used in the graph was not instantiated: {}",
993 from_external_key,
994 )
995 })
996 .clone();
997
998 match metadata.location_id.root() {
999 &LocationId::Process(process_key) => {
1000 let to_node = processes
1001 .get(process_key)
1002 .unwrap_or_else(|| {
1003 panic!("A process used in the graph was not instantiated: {}", process_key)
1004 })
1005 .clone();
1006
1007 let sink_port = from_node.next_port();
1008 let source_port = to_node.next_port();
1009
1010 from_node.register(*from_port_id, sink_port.clone());
1011
1012 (
1013 (
1014 parse_quote!(DUMMY),
1015 if *from_many {
1016 D::e2o_many_source(
1017 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1018 &to_node, &source_port,
1019 codec_type.0.as_ref(),
1020 format!("{}_{}", *from_external_key, *from_port_id)
1021 )
1022 } else {
1023 D::e2o_source(
1024 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1025 &from_node, &sink_port,
1026 &to_node, &source_port,
1027 codec_type.0.as_ref(),
1028 format!("{}_{}", *from_external_key, *from_port_id)
1029 )
1030 },
1031 ),
1032 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1033 )
1034 }
1035 LocationId::Cluster(cluster_key, _) => {
1036 let to_node = clusters
1037 .get(*cluster_key)
1038 .unwrap_or_else(|| {
1039 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1040 })
1041 .clone();
1042
1043 let sink_port = from_node.next_port();
1044 let source_port = to_node.next_port();
1045
1046 from_node.register(*from_port_id, sink_port.clone());
1047
1048 (
1049 (
1050 parse_quote!(DUMMY),
1051 D::e2m_source(
1052 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1053 &from_node, &sink_port,
1054 &to_node, &source_port,
1055 codec_type.0.as_ref(),
1056 format!("{}_{}", *from_external_key, *from_port_id)
1057 ),
1058 ),
1059 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1060 )
1061 }
1062 _ => panic!()
1063 }
1064 },
1065
1066 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1067 };
1068
1069 *instantiate_fn = DebugInstantiateFinalized {
1070 sink: sink_expr,
1071 source: source_expr,
1072 connect_fn: Some(connect_fn),
1073 }
1074 .into();
1075 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1076 let element_type = match &metadata.collection_kind {
1077 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1078 _ => panic!("Embedded source must have Stream collection kind"),
1079 };
1080 let location_key = match metadata.location_id.root() {
1081 LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
1082 _ => panic!("Embedded source must be on a process or cluster"),
1083 };
1084 D::register_embedded_stream_input(
1085 &mut refcell_env.borrow_mut(),
1086 location_key,
1087 ident,
1088 &element_type,
1089 );
1090 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1091 let element_type = match &metadata.collection_kind {
1092 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1093 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1094 };
1095 let location_key = match metadata.location_id.root() {
1096 LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
1097 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1098 };
1099 D::register_embedded_singleton_input(
1100 &mut refcell_env.borrow_mut(),
1101 location_key,
1102 ident,
1103 &element_type,
1104 );
1105 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1106 match state {
1107 ClusterMembersState::Uninit => {
1108 let at_location = metadata.location_id.root().clone();
1109 let key = (at_location.clone(), location_id.key());
1110 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1111 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1113 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1114 &(),
1115 );
1116 *state = ClusterMembersState::Stream(expr.into());
1117 } else {
1118 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1120 }
1121 }
1122 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1123 panic!("cluster members already finalized");
1124 }
1125 }
1126 }
1127 },
1128 seen_tees,
1129 false,
1130 );
1131 }
1132
1133 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1134 self.transform_bottom_up(
1135 &mut |l| {
1136 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1137 match instantiate_fn {
1138 DebugInstantiate::Building => panic!("network not built"),
1139
1140 DebugInstantiate::Finalized(finalized) => {
1141 (finalized.connect_fn.take().unwrap())();
1142 }
1143 }
1144 }
1145 },
1146 &mut |n| {
1147 if let HydroNode::Network { instantiate_fn, .. }
1148 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1149 {
1150 match instantiate_fn {
1151 DebugInstantiate::Building => panic!("network not built"),
1152
1153 DebugInstantiate::Finalized(finalized) => {
1154 (finalized.connect_fn.take().unwrap())();
1155 }
1156 }
1157 }
1158 },
1159 seen_tees,
1160 false,
1161 );
1162 }
1163
1164 pub fn transform_bottom_up(
1165 &mut self,
1166 transform_root: &mut impl FnMut(&mut HydroRoot),
1167 transform_node: &mut impl FnMut(&mut HydroNode),
1168 seen_tees: &mut SeenSharedNodes,
1169 check_well_formed: bool,
1170 ) {
1171 self.transform_children(
1172 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1173 seen_tees,
1174 );
1175
1176 transform_root(self);
1177 }
1178
1179 pub fn transform_children(
1180 &mut self,
1181 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1182 seen_tees: &mut SeenSharedNodes,
1183 ) {
1184 match self {
1185 HydroRoot::ForEach { input, .. }
1186 | HydroRoot::SendExternal { input, .. }
1187 | HydroRoot::DestSink { input, .. }
1188 | HydroRoot::CycleSink { input, .. }
1189 | HydroRoot::EmbeddedOutput { input, .. }
1190 | HydroRoot::Null { input, .. } => {
1191 transform(input, seen_tees);
1192 }
1193 }
1194 }
1195
1196 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1197 match self {
1198 HydroRoot::ForEach {
1199 f,
1200 input,
1201 op_metadata,
1202 } => HydroRoot::ForEach {
1203 f: f.clone(),
1204 input: Box::new(input.deep_clone(seen_tees)),
1205 op_metadata: op_metadata.clone(),
1206 },
1207 HydroRoot::SendExternal {
1208 to_external_key,
1209 to_port_id,
1210 to_many,
1211 unpaired,
1212 serialize_fn,
1213 instantiate_fn,
1214 input,
1215 op_metadata,
1216 } => HydroRoot::SendExternal {
1217 to_external_key: *to_external_key,
1218 to_port_id: *to_port_id,
1219 to_many: *to_many,
1220 unpaired: *unpaired,
1221 serialize_fn: serialize_fn.clone(),
1222 instantiate_fn: instantiate_fn.clone(),
1223 input: Box::new(input.deep_clone(seen_tees)),
1224 op_metadata: op_metadata.clone(),
1225 },
1226 HydroRoot::DestSink {
1227 sink,
1228 input,
1229 op_metadata,
1230 } => HydroRoot::DestSink {
1231 sink: sink.clone(),
1232 input: Box::new(input.deep_clone(seen_tees)),
1233 op_metadata: op_metadata.clone(),
1234 },
1235 HydroRoot::CycleSink {
1236 cycle_id,
1237 input,
1238 op_metadata,
1239 } => HydroRoot::CycleSink {
1240 cycle_id: *cycle_id,
1241 input: Box::new(input.deep_clone(seen_tees)),
1242 op_metadata: op_metadata.clone(),
1243 },
1244 HydroRoot::EmbeddedOutput {
1245 ident,
1246 input,
1247 op_metadata,
1248 } => HydroRoot::EmbeddedOutput {
1249 ident: ident.clone(),
1250 input: Box::new(input.deep_clone(seen_tees)),
1251 op_metadata: op_metadata.clone(),
1252 },
1253 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1254 input: Box::new(input.deep_clone(seen_tees)),
1255 op_metadata: op_metadata.clone(),
1256 },
1257 }
1258 }
1259
1260 #[cfg(feature = "build")]
1261 pub fn emit(
1262 &mut self,
1263 graph_builders: &mut dyn DfirBuilder,
1264 seen_tees: &mut SeenSharedNodes,
1265 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1266 next_stmt_id: &mut usize,
1267 ) {
1268 self.emit_core(
1269 &mut BuildersOrCallback::<
1270 fn(&mut HydroRoot, &mut usize),
1271 fn(&mut HydroNode, &mut usize),
1272 >::Builders(graph_builders),
1273 seen_tees,
1274 built_tees,
1275 next_stmt_id,
1276 );
1277 }
1278
    /// Core traversal shared by code generation and callback-driven walks.
    ///
    /// Every variant first recurses into its `input` via `HydroNode::emit_core`, which
    /// yields the identifier used on the left-hand side of the emitted DFIR statement.
    /// What happens next depends on `builders_or_callback`:
    ///
    /// * `Builders`: a DFIR statement for this root is added to the builder selected by
    ///   the input's `location_id`.
    /// * `Callback`: the root callback is invoked with `self` and `next_stmt_id`
    ///   (except for `CycleSink`, which invokes nothing).
    ///
    /// `next_stmt_id` is incremented once per root after it is handled, again with the
    /// exception of `CycleSink`, which neither consumes nor advances an id.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        seen_tees: &mut SeenSharedNodes,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // `input -> for_each(f)`, passing the current statement id as a string.
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink half is needed here; while still `Building`,
                        // placeholder identifiers stand in for the real expressions.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // `input -> dest_sink(sink)`.
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // The element type annotates the `identity` operator: keyed
                        // collections flow as `(key, value)` tuples, all others as the
                        // plain element type.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        // Binds the cycle's identifier to the input; no statement id is
                        // attached to this statement.
                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // No statement id is used, so no callback is invoked and the
                    // counter is left untouched.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // `input -> for_each(&mut ident)`.
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::Null { input, .. } => {
                let input_ident =
                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Drains the input with a closure that ignores every item.
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(|_| {});
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }
        }
    }
1470
1471 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1472 match self {
1473 HydroRoot::ForEach { op_metadata, .. }
1474 | HydroRoot::SendExternal { op_metadata, .. }
1475 | HydroRoot::DestSink { op_metadata, .. }
1476 | HydroRoot::CycleSink { op_metadata, .. }
1477 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1478 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1479 }
1480 }
1481
1482 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1483 match self {
1484 HydroRoot::ForEach { op_metadata, .. }
1485 | HydroRoot::SendExternal { op_metadata, .. }
1486 | HydroRoot::DestSink { op_metadata, .. }
1487 | HydroRoot::CycleSink { op_metadata, .. }
1488 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1489 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1490 }
1491 }
1492
1493 pub fn input(&self) -> &HydroNode {
1494 match self {
1495 HydroRoot::ForEach { input, .. }
1496 | HydroRoot::SendExternal { input, .. }
1497 | HydroRoot::DestSink { input, .. }
1498 | HydroRoot::CycleSink { input, .. }
1499 | HydroRoot::EmbeddedOutput { input, .. }
1500 | HydroRoot::Null { input, .. } => input,
1501 }
1502 }
1503
1504 pub fn input_metadata(&self) -> &HydroIrMetadata {
1505 self.input().metadata()
1506 }
1507
1508 pub fn print_root(&self) -> String {
1509 match self {
1510 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1511 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1512 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1513 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1514 HydroRoot::EmbeddedOutput { ident, .. } => {
1515 format!("EmbeddedOutput({})", ident)
1516 }
1517 HydroRoot::Null { .. } => "Null".to_owned(),
1518 }
1519 }
1520
1521 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1522 match self {
1523 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1524 transform(f);
1525 }
1526 HydroRoot::SendExternal { .. }
1527 | HydroRoot::CycleSink { .. }
1528 | HydroRoot::EmbeddedOutput { .. }
1529 | HydroRoot::Null { .. } => {}
1530 }
1531 }
1532}
1533
/// Returns the clock id of the outermost `Tick` reachable from `loc` by peeling off
/// `Atomic` wrappers, or `None` if no `Tick` is reached that way.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    if let LocationId::Tick(clock, _) = loc {
        return Some(*clock);
    }

    if let LocationId::Atomic(wrapped) = loc {
        return tick_of(wrapped);
    }

    None
}
1542
/// Rewrites every clock id inside `loc` to its union-find representative in `uf`.
///
/// Recurses through `Tick` and `Atomic` wrappers; `Process` and `Cluster` locations
/// carry no clock id and are left unchanged.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    match loc {
        // Leaves: nothing to remap.
        LocationId::Process(..) | LocationId::Cluster(..) => {}
        LocationId::Atomic(wrapped) => remap_location(wrapped, uf),
        LocationId::Tick(clock, wrapped) => {
            let representative = uf_find(uf, *clock);
            *clock = representative;
            remap_location(wrapped, uf);
        }
    }
}
1556
/// Union-find lookup with path compression.
///
/// An id that is absent from `parent`, or mapped to itself, is its own representative.
/// Otherwise the representative of its parent is found recursively and `x` is
/// re-pointed directly at it before returning.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    match parent.get(&x).copied() {
        Some(up) if up != x => {
            let root = uf_find(parent, up);
            // Path compression: skip the intermediate hops on later lookups.
            parent.insert(x, root);
            root
        }
        _ => x,
    }
}
1567
/// Merges the sets containing `a` and `b` by pointing `a`'s representative at `b`'s.
/// Does nothing when both already share a representative.
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let root_a = uf_find(parent, a);
    let root_b = uf_find(parent, b);

    if root_a == root_b {
        return;
    }

    parent.insert(root_a, root_b);
}
1576
/// Unifies clock ids that are connected through `Batch` / `YieldConcat` nodes.
///
/// Pass 1 walks the IR and, for every `Batch` or `YieldConcat` whose inner node and
/// own metadata both resolve to a tick (see `tick_of`), unions the two clock ids.
/// Pass 2 walks the IR again and rewrites every node's `location_id` so each clock
/// id is replaced by its set representative.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect the equivalences.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let (inner, metadata) = match node {
                HydroNode::Batch { inner, metadata }
                | HydroNode::YieldConcat { inner, metadata } => (inner, metadata),
                _ => return,
            };

            let inner_tick = tick_of(&inner.metadata().location_id);
            let outer_tick = tick_of(&metadata.location_id);
            if let (Some(a), Some(b)) = (inner_tick, outer_tick) {
                uf_union(&mut uf, a, b);
            }
        },
        false,
    );

    // Pass 2: apply them to every node's location.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let metadata = node.metadata_mut();
            remap_location(&mut metadata.location_id, &mut uf);
        },
        false,
    );
}
1612
/// Emits every root in `ir` and returns the per-location graph builders that were
/// populated along the way.
///
/// The shared-node tables and the statement id counter are created once and threaded
/// through all roots, so ids keep counting up across the whole IR.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|leaf| {
        leaf.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        )
    });

    builders
}
1629
/// Walks `ir` in emission order without generating code, handing the two callbacks to
/// `emit_core` in `Callback` mode.
///
/// Both callbacks also receive the running statement id counter, which starts at 0
/// and is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut next_stmt_id = 0;

    for leaf in ir.iter_mut() {
        leaf.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1649
1650pub fn transform_bottom_up(
1651 ir: &mut [HydroRoot],
1652 transform_root: &mut impl FnMut(&mut HydroRoot),
1653 transform_node: &mut impl FnMut(&mut HydroNode),
1654 check_well_formed: bool,
1655) {
1656 let mut seen_tees = HashMap::new();
1657 ir.iter_mut().for_each(|leaf| {
1658 leaf.transform_bottom_up(
1659 transform_root,
1660 transform_node,
1661 &mut seen_tees,
1662 check_well_formed,
1663 );
1664 });
1665}
1666
1667pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1668 let mut seen_tees = HashMap::new();
1669 ir.iter()
1670 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1671 .collect()
1672}
1673
/// Per-thread de-duplication state for shared nodes: `None` while no scope is active,
/// otherwise `(next id to assign, shared-node pointer -> id already assigned)`.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Consulted by `SharedNode`'s `Debug` impl; switched on by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    // Consulted by `SharedNode`'s `Serialize` impl; switched on by
    // `serialize_dedup_shared` (via `SerializedSharedGuard`).
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1683
1684pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1685 PRINTED_TEES.with(|printed_tees| {
1686 let mut printed_tees_mut = printed_tees.borrow_mut();
1687 *printed_tees_mut = Some((0, HashMap::new()));
1688 drop(printed_tees_mut);
1689
1690 let ret = f();
1691
1692 let mut printed_tees_mut = printed_tees.borrow_mut();
1693 *printed_tees_mut = None;
1694
1695 ret
1696 })
1697}
1698
1699pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1704 let _guard = SerializedSharedGuard::enter();
1705 f()
1706}
1707
/// Scope guard for `SERIALIZED_SHARED`: created by `enter`, undone by `Drop`.
struct SerializedSharedGuard {
    /// The state that was in `SERIALIZED_SHARED` before this guard replaced it;
    /// written back when the guard is dropped.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
1713
1714impl SerializedSharedGuard {
1715 fn enter() -> Self {
1716 let previous = SERIALIZED_SHARED.with(|cell| {
1717 let mut guard = cell.borrow_mut();
1718 guard.replace((0, HashMap::new()))
1719 });
1720 Self { previous }
1721 }
1722}
1723
1724impl Drop for SerializedSharedGuard {
1725 fn drop(&mut self) {
1726 SERIALIZED_SHARED.with(|cell| {
1727 *cell.borrow_mut() = self.previous.take();
1728 });
1729 }
1730}
1731
/// A reference-counted, interior-mutable handle to a [`HydroNode`]. The handle's
/// pointer (see `as_ptr`) serves as the node's identity in the de-duplication tables
/// used when printing, serializing and transforming the IR.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1733
1734impl serde::Serialize for SharedNode {
1735 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1746 SERIALIZED_SHARED.with(|cell| {
1747 let mut guard = cell.borrow_mut();
1748 let state = guard.as_mut().ok_or_else(|| {
1750 serde::ser::Error::custom(
1751 "SharedNode serialization requires an active serialize_dedup_shared scope",
1752 )
1753 })?;
1754 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1755
1756 if let Some(&id) = state.1.get(&ptr) {
1757 drop(guard);
1758 use serde::ser::SerializeMap;
1759 let mut map = serializer.serialize_map(Some(1))?;
1760 map.serialize_entry("$shared_ref", &id)?;
1761 map.end()
1762 } else {
1763 let id = state.0;
1764 state.0 += 1;
1765 state.1.insert(ptr, id);
1766 drop(guard);
1767
1768 use serde::ser::SerializeMap;
1769 let mut map = serializer.serialize_map(Some(2))?;
1770 map.serialize_entry("$shared", &id)?;
1771 map.serialize_entry("node", &*self.0.borrow())?;
1772 map.end()
1773 }
1774 })
1775 }
1776}
1777
1778impl SharedNode {
1779 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1780 Rc::as_ptr(&self.0)
1781 }
1782}
1783
1784impl Debug for SharedNode {
1785 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1786 PRINTED_TEES.with(|printed_tees| {
1787 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1788 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1789
1790 if let Some(printed_tees_mut) = printed_tees_mut {
1791 if let Some(existing) = printed_tees_mut
1792 .1
1793 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1794 {
1795 write!(f, "<shared {}>", existing)
1796 } else {
1797 let next_id = printed_tees_mut.0;
1798 printed_tees_mut.0 += 1;
1799 printed_tees_mut
1800 .1
1801 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1802 drop(printed_tees_mut_borrow);
1803 write!(f, "<shared {}>: ", next_id)?;
1804 Debug::fmt(&self.0.borrow(), f)
1805 }
1806 } else {
1807 drop(printed_tees_mut_borrow);
1808 write!(f, "<shared>: ")?;
1809 Debug::fmt(&self.0.borrow(), f)
1810 }
1811 })
1812 }
1813}
1814
1815impl Hash for SharedNode {
1816 fn hash<H: Hasher>(&self, state: &mut H) {
1817 self.0.borrow_mut().hash(state);
1818 }
1819}
1820
/// Boundedness marker used by the `Stream`, `Optional` and `KeyedStream` variants of
/// [`CollectionKind`]. `CollectionKind::is_bounded` reports `true` only for `Bounded`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1826
/// Ordering marker stored as `order` on `CollectionKind::Stream` and as `value_order`
/// on `CollectionKind::KeyedStream`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1832
/// Retry marker stored as `retry` on `CollectionKind::Stream` and as `value_retry`
/// on `CollectionKind::KeyedStream`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1838
/// Boundedness marker for `CollectionKind::KeyedSingleton`.
/// `CollectionKind::is_bounded` reports `true` only for the `Bounded` variant; the
/// intermediate variants count as not bounded there.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
1846
/// Boundedness marker for `CollectionKind::Singleton`.
/// `CollectionKind::is_bounded` reports `true` only for the `Bounded` variant;
/// `Monotonic` counts as not bounded there.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
1853
/// Describes the kind of collection a node produces, together with its element
/// type(s) and guarantee markers.
///
/// Keyed variants carry separate `key_type` / `value_type`; the others carry a single
/// `element_type`. `HydroRoot::emit_core` relies on this split to build the element
/// type of a cycle (`(key, value)` for keyed kinds, the element type otherwise).
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1883
1884impl CollectionKind {
1885 pub fn is_bounded(&self) -> bool {
1886 matches!(
1887 self,
1888 CollectionKind::Stream {
1889 bound: BoundKind::Bounded,
1890 ..
1891 } | CollectionKind::Singleton {
1892 bound: SingletonBoundKind::Bounded,
1893 ..
1894 } | CollectionKind::Optional {
1895 bound: BoundKind::Bounded,
1896 ..
1897 } | CollectionKind::KeyedStream {
1898 bound: BoundKind::Bounded,
1899 ..
1900 } | CollectionKind::KeyedSingleton {
1901 bound: KeyedSingletonBoundKind::Bounded,
1902 ..
1903 }
1904 )
1905 }
1906}
1907
/// Metadata attached to every [`HydroNode`] variant except `Placeholder`.
///
/// Note that the `Hash` and `PartialEq` impls for this type deliberately ignore all
/// fields, and `Debug` prints only `location_id` and `collection_kind`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Where the node lives; rewritten in place by `unify_atomic_ticks`.
    pub location_id: LocationId,
    /// Kind and element type(s) of the collection the node produces.
    pub collection_kind: CollectionKind,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Operator-level metadata (backtrace, measurements, id).
    pub op: HydroIrOpMetadata,
}
1916
impl Hash for HydroIrMetadata {
    /// Intentionally feeds nothing into the hasher, so metadata never influences the
    /// hash of the node that contains it. This matches `PartialEq`, which treats all
    /// metadata values as equal.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1921
impl PartialEq for HydroIrMetadata {
    /// Always `true`: any two metadata values compare equal regardless of their
    /// fields. This matches `Hash`, which contributes nothing to the hash.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}
1927
// `eq` is unconditionally `true`, which is trivially reflexive, symmetric and
// transitive, so the marker trait is sound.
impl Eq for HydroIrMetadata {}
1929
1930impl Debug for HydroIrMetadata {
1931 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1932 f.debug_struct("HydroIrMetadata")
1933 .field("location_id", &self.location_id)
1934 .field("collection_kind", &self.collection_kind)
1935 .finish()
1936 }
1937}
1938
/// Operator-level metadata, stored on every `HydroRoot` variant and inside
/// [`HydroIrMetadata`].
///
/// Its `Debug` output shows no fields and its `Hash` impl contributes nothing.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured at construction; serialized under the name `span` through
    /// `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    /// Starts out as `None` in `new`.
    pub cpu_usage: Option<f64>,
    /// Starts out as `None` in `new`.
    pub network_recv_cpu_usage: Option<f64>,
    /// Starts out as `None` in `new`.
    pub id: Option<usize>,
}
1949
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and all optional fields
    /// set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Same as `new`, but passes `2 + skip_count` to `Backtrace::get_backtrace`.
    // NOTE(review): the constant 2 presumably accounts for this function's own frames
    // and `skip_count` for wrappers such as `new` — confirm against `get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1968
1969impl Debug for HydroIrOpMetadata {
1970 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1971 f.debug_struct("HydroIrOpMetadata").finish()
1972 }
1973}
1974
impl Hash for HydroIrOpMetadata {
    /// Intentionally feeds nothing into the hasher, so operator metadata never
    /// influences the hash of whatever contains it.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
1978
/// A node of the Hydro IR graph.
///
/// Except for `Placeholder`, every variant carries a `metadata` field. Children are
/// owned through `Box<HydroNode>`, except for `Tee` and `Partition`, whose `inner` is
/// a [`SharedNode`] that several parents may point to.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Temporary stand-in written into a shared cell while its real contents are
    /// being transformed or cloned; `transform_children` panics if it meets one.
    Placeholder,

    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, metadata: HydroIrMetadata,
    },

    // --- Leaves: `transform_children` visits no children for these ---
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    // --- Nodes with a shared child ---
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    Partition {
        inner: SharedNode,
        f: DebugExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// `unify_atomic_ticks` unions the tick of `inner` with this node's own tick.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// `unify_atomic_ticks` unions the tick of `inner` with this node's own tick.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // --- Two-input nodes ---
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // --- Single-input nodes ---
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Has two children: `input` and `watermark` (visited in that order).
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only variant exempt from the location check in `transform_bottom_up`:
    /// its input may live at a different root location than the node itself.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A leaf (no child nodes). `port_hint` is skipped during serialization.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AssertIsConsistent {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2247
/// Maps the pointer of an already-visited shared node to the cell that replaces it,
/// so that every parent of the same shared node ends up pointing at one result
/// (used by `transform_children` and `deep_clone`).
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the pointer of a shared node to a `LocationId`.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2250
2251impl HydroNode {
2252 pub fn transform_bottom_up(
2253 &mut self,
2254 transform: &mut impl FnMut(&mut HydroNode),
2255 seen_tees: &mut SeenSharedNodes,
2256 check_well_formed: bool,
2257 ) {
2258 self.transform_children(
2259 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2260 seen_tees,
2261 );
2262
2263 transform(self);
2264
2265 let self_location = self.metadata().location_id.root();
2266
2267 if check_well_formed {
2268 match &*self {
2269 HydroNode::Network { .. } => {}
2270 _ => {
2271 self.input_metadata().iter().for_each(|i| {
2272 if i.location_id.root() != self_location {
2273 panic!(
2274 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2275 i,
2276 i.location_id.root(),
2277 self,
2278 self_location
2279 )
2280 }
2281 });
2282 }
2283 }
2284 }
2285 }
2286
2287 #[inline(always)]
2288 pub fn transform_children(
2289 &mut self,
2290 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2291 seen_tees: &mut SeenSharedNodes,
2292 ) {
2293 match self {
2294 HydroNode::Placeholder => {
2295 panic!();
2296 }
2297
2298 HydroNode::Source { .. }
2299 | HydroNode::SingletonSource { .. }
2300 | HydroNode::CycleSource { .. }
2301 | HydroNode::ExternalInput { .. } => {}
2302
2303 HydroNode::Tee { inner, .. } => {
2304 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2305 *inner = SharedNode(transformed.clone());
2306 } else {
2307 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2308 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2309 let mut orig = inner.0.replace(HydroNode::Placeholder);
2310 transform(&mut orig, seen_tees);
2311 *transformed_cell.borrow_mut() = orig;
2312 *inner = SharedNode(transformed_cell);
2313 }
2314 }
2315
2316 HydroNode::Partition { inner, .. } => {
2317 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2318 *inner = SharedNode(transformed.clone());
2319 } else {
2320 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2321 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2322 let mut orig = inner.0.replace(HydroNode::Placeholder);
2323 transform(&mut orig, seen_tees);
2324 *transformed_cell.borrow_mut() = orig;
2325 *inner = SharedNode(transformed_cell);
2326 }
2327 }
2328
2329 HydroNode::Cast { inner, .. }
2330 | HydroNode::ObserveNonDet { inner, .. }
2331 | HydroNode::BeginAtomic { inner, .. }
2332 | HydroNode::EndAtomic { inner, .. }
2333 | HydroNode::Batch { inner, .. }
2334 | HydroNode::YieldConcat { inner, .. }
2335 | HydroNode::AssertIsConsistent { inner, .. } => {
2336 transform(inner.as_mut(), seen_tees);
2337 }
2338
2339 HydroNode::Chain { first, second, .. } => {
2340 transform(first.as_mut(), seen_tees);
2341 transform(second.as_mut(), seen_tees);
2342 }
2343
2344 HydroNode::ChainFirst { first, second, .. } => {
2345 transform(first.as_mut(), seen_tees);
2346 transform(second.as_mut(), seen_tees);
2347 }
2348
2349 HydroNode::CrossSingleton { left, right, .. }
2350 | HydroNode::CrossProduct { left, right, .. }
2351 | HydroNode::Join { left, right, .. }
2352 | HydroNode::JoinHalf { left, right, .. } => {
2353 transform(left.as_mut(), seen_tees);
2354 transform(right.as_mut(), seen_tees);
2355 }
2356
2357 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2358 transform(pos.as_mut(), seen_tees);
2359 transform(neg.as_mut(), seen_tees);
2360 }
2361
2362 HydroNode::ReduceKeyedWatermark {
2363 input, watermark, ..
2364 } => {
2365 transform(input.as_mut(), seen_tees);
2366 transform(watermark.as_mut(), seen_tees);
2367 }
2368
2369 HydroNode::Map { input, .. }
2370 | HydroNode::ResolveFutures { input, .. }
2371 | HydroNode::ResolveFuturesBlocking { input, .. }
2372 | HydroNode::ResolveFuturesOrdered { input, .. }
2373 | HydroNode::FlatMap { input, .. }
2374 | HydroNode::FlatMapStreamBlocking { input, .. }
2375 | HydroNode::Filter { input, .. }
2376 | HydroNode::FilterMap { input, .. }
2377 | HydroNode::Sort { input, .. }
2378 | HydroNode::DeferTick { input, .. }
2379 | HydroNode::Enumerate { input, .. }
2380 | HydroNode::Inspect { input, .. }
2381 | HydroNode::Unique { input, .. }
2382 | HydroNode::Network { input, .. }
2383 | HydroNode::Fold { input, .. }
2384 | HydroNode::Scan { input, .. }
2385 | HydroNode::ScanAsyncBlocking { input, .. }
2386 | HydroNode::FoldKeyed { input, .. }
2387 | HydroNode::Reduce { input, .. }
2388 | HydroNode::ReduceKeyed { input, .. }
2389 | HydroNode::Counter { input, .. } => {
2390 transform(input.as_mut(), seen_tees);
2391 }
2392 }
2393 }
2394
2395 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2396 match self {
2397 HydroNode::Placeholder => HydroNode::Placeholder,
2398 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2399 inner: Box::new(inner.deep_clone(seen_tees)),
2400 metadata: metadata.clone(),
2401 },
2402 HydroNode::ObserveNonDet {
2403 inner,
2404 trusted,
2405 metadata,
2406 } => HydroNode::ObserveNonDet {
2407 inner: Box::new(inner.deep_clone(seen_tees)),
2408 trusted: *trusted,
2409 metadata: metadata.clone(),
2410 },
2411 HydroNode::AssertIsConsistent { inner, metadata } => HydroNode::AssertIsConsistent {
2412 inner: Box::new(inner.deep_clone(seen_tees)),
2413 metadata: metadata.clone(),
2414 },
2415 HydroNode::Source { source, metadata } => HydroNode::Source {
2416 source: source.clone(),
2417 metadata: metadata.clone(),
2418 },
2419 HydroNode::SingletonSource {
2420 value,
2421 first_tick_only,
2422 metadata,
2423 } => HydroNode::SingletonSource {
2424 value: value.clone(),
2425 first_tick_only: *first_tick_only,
2426 metadata: metadata.clone(),
2427 },
2428 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2429 cycle_id: *cycle_id,
2430 metadata: metadata.clone(),
2431 },
2432 HydroNode::Tee { inner, metadata } => {
2433 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2434 HydroNode::Tee {
2435 inner: SharedNode(transformed.clone()),
2436 metadata: metadata.clone(),
2437 }
2438 } else {
2439 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2440 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2441 let cloned = inner.0.borrow().deep_clone(seen_tees);
2442 *new_rc.borrow_mut() = cloned;
2443 HydroNode::Tee {
2444 inner: SharedNode(new_rc),
2445 metadata: metadata.clone(),
2446 }
2447 }
2448 }
2449 HydroNode::Partition {
2450 inner,
2451 f,
2452 is_true,
2453 metadata,
2454 } => {
2455 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2456 HydroNode::Partition {
2457 inner: SharedNode(transformed.clone()),
2458 f: f.clone(),
2459 is_true: *is_true,
2460 metadata: metadata.clone(),
2461 }
2462 } else {
2463 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2464 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2465 let cloned = inner.0.borrow().deep_clone(seen_tees);
2466 *new_rc.borrow_mut() = cloned;
2467 HydroNode::Partition {
2468 inner: SharedNode(new_rc),
2469 f: f.clone(),
2470 is_true: *is_true,
2471 metadata: metadata.clone(),
2472 }
2473 }
2474 }
2475 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2476 inner: Box::new(inner.deep_clone(seen_tees)),
2477 metadata: metadata.clone(),
2478 },
2479 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2480 inner: Box::new(inner.deep_clone(seen_tees)),
2481 metadata: metadata.clone(),
2482 },
2483 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2484 inner: Box::new(inner.deep_clone(seen_tees)),
2485 metadata: metadata.clone(),
2486 },
2487 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2488 inner: Box::new(inner.deep_clone(seen_tees)),
2489 metadata: metadata.clone(),
2490 },
2491 HydroNode::Chain {
2492 first,
2493 second,
2494 metadata,
2495 } => HydroNode::Chain {
2496 first: Box::new(first.deep_clone(seen_tees)),
2497 second: Box::new(second.deep_clone(seen_tees)),
2498 metadata: metadata.clone(),
2499 },
2500 HydroNode::ChainFirst {
2501 first,
2502 second,
2503 metadata,
2504 } => HydroNode::ChainFirst {
2505 first: Box::new(first.deep_clone(seen_tees)),
2506 second: Box::new(second.deep_clone(seen_tees)),
2507 metadata: metadata.clone(),
2508 },
2509 HydroNode::CrossProduct {
2510 left,
2511 right,
2512 metadata,
2513 } => HydroNode::CrossProduct {
2514 left: Box::new(left.deep_clone(seen_tees)),
2515 right: Box::new(right.deep_clone(seen_tees)),
2516 metadata: metadata.clone(),
2517 },
2518 HydroNode::CrossSingleton {
2519 left,
2520 right,
2521 metadata,
2522 } => HydroNode::CrossSingleton {
2523 left: Box::new(left.deep_clone(seen_tees)),
2524 right: Box::new(right.deep_clone(seen_tees)),
2525 metadata: metadata.clone(),
2526 },
2527 HydroNode::Join {
2528 left,
2529 right,
2530 metadata,
2531 } => HydroNode::Join {
2532 left: Box::new(left.deep_clone(seen_tees)),
2533 right: Box::new(right.deep_clone(seen_tees)),
2534 metadata: metadata.clone(),
2535 },
2536 HydroNode::JoinHalf {
2537 left,
2538 right,
2539 metadata,
2540 } => HydroNode::JoinHalf {
2541 left: Box::new(left.deep_clone(seen_tees)),
2542 right: Box::new(right.deep_clone(seen_tees)),
2543 metadata: metadata.clone(),
2544 },
2545 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2546 pos: Box::new(pos.deep_clone(seen_tees)),
2547 neg: Box::new(neg.deep_clone(seen_tees)),
2548 metadata: metadata.clone(),
2549 },
2550 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2551 pos: Box::new(pos.deep_clone(seen_tees)),
2552 neg: Box::new(neg.deep_clone(seen_tees)),
2553 metadata: metadata.clone(),
2554 },
2555 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2556 input: Box::new(input.deep_clone(seen_tees)),
2557 metadata: metadata.clone(),
2558 },
2559 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2560 HydroNode::ResolveFuturesBlocking {
2561 input: Box::new(input.deep_clone(seen_tees)),
2562 metadata: metadata.clone(),
2563 }
2564 }
2565 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2566 HydroNode::ResolveFuturesOrdered {
2567 input: Box::new(input.deep_clone(seen_tees)),
2568 metadata: metadata.clone(),
2569 }
2570 }
2571 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2572 f: f.clone(),
2573 input: Box::new(input.deep_clone(seen_tees)),
2574 metadata: metadata.clone(),
2575 },
2576 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2577 f: f.clone(),
2578 input: Box::new(input.deep_clone(seen_tees)),
2579 metadata: metadata.clone(),
2580 },
2581 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2582 HydroNode::FlatMapStreamBlocking {
2583 f: f.clone(),
2584 input: Box::new(input.deep_clone(seen_tees)),
2585 metadata: metadata.clone(),
2586 }
2587 }
2588 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2589 f: f.clone(),
2590 input: Box::new(input.deep_clone(seen_tees)),
2591 metadata: metadata.clone(),
2592 },
2593 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2594 f: f.clone(),
2595 input: Box::new(input.deep_clone(seen_tees)),
2596 metadata: metadata.clone(),
2597 },
2598 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2599 input: Box::new(input.deep_clone(seen_tees)),
2600 metadata: metadata.clone(),
2601 },
2602 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2603 input: Box::new(input.deep_clone(seen_tees)),
2604 metadata: metadata.clone(),
2605 },
2606 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2607 f: f.clone(),
2608 input: Box::new(input.deep_clone(seen_tees)),
2609 metadata: metadata.clone(),
2610 },
2611 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2612 input: Box::new(input.deep_clone(seen_tees)),
2613 metadata: metadata.clone(),
2614 },
2615 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2616 input: Box::new(input.deep_clone(seen_tees)),
2617 metadata: metadata.clone(),
2618 },
2619 HydroNode::Fold {
2620 init,
2621 acc,
2622 input,
2623 metadata,
2624 } => HydroNode::Fold {
2625 init: init.clone(),
2626 acc: acc.clone(),
2627 input: Box::new(input.deep_clone(seen_tees)),
2628 metadata: metadata.clone(),
2629 },
2630 HydroNode::Scan {
2631 init,
2632 acc,
2633 input,
2634 metadata,
2635 } => HydroNode::Scan {
2636 init: init.clone(),
2637 acc: acc.clone(),
2638 input: Box::new(input.deep_clone(seen_tees)),
2639 metadata: metadata.clone(),
2640 },
2641 HydroNode::ScanAsyncBlocking {
2642 init,
2643 acc,
2644 input,
2645 metadata,
2646 } => HydroNode::ScanAsyncBlocking {
2647 init: init.clone(),
2648 acc: acc.clone(),
2649 input: Box::new(input.deep_clone(seen_tees)),
2650 metadata: metadata.clone(),
2651 },
2652 HydroNode::FoldKeyed {
2653 init,
2654 acc,
2655 input,
2656 metadata,
2657 } => HydroNode::FoldKeyed {
2658 init: init.clone(),
2659 acc: acc.clone(),
2660 input: Box::new(input.deep_clone(seen_tees)),
2661 metadata: metadata.clone(),
2662 },
2663 HydroNode::ReduceKeyedWatermark {
2664 f,
2665 input,
2666 watermark,
2667 metadata,
2668 } => HydroNode::ReduceKeyedWatermark {
2669 f: f.clone(),
2670 input: Box::new(input.deep_clone(seen_tees)),
2671 watermark: Box::new(watermark.deep_clone(seen_tees)),
2672 metadata: metadata.clone(),
2673 },
2674 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2675 f: f.clone(),
2676 input: Box::new(input.deep_clone(seen_tees)),
2677 metadata: metadata.clone(),
2678 },
2679 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2680 f: f.clone(),
2681 input: Box::new(input.deep_clone(seen_tees)),
2682 metadata: metadata.clone(),
2683 },
2684 HydroNode::Network {
2685 name,
2686 networking_info,
2687 serialize_fn,
2688 instantiate_fn,
2689 deserialize_fn,
2690 input,
2691 metadata,
2692 } => HydroNode::Network {
2693 name: name.clone(),
2694 networking_info: networking_info.clone(),
2695 serialize_fn: serialize_fn.clone(),
2696 instantiate_fn: instantiate_fn.clone(),
2697 deserialize_fn: deserialize_fn.clone(),
2698 input: Box::new(input.deep_clone(seen_tees)),
2699 metadata: metadata.clone(),
2700 },
2701 HydroNode::ExternalInput {
2702 from_external_key,
2703 from_port_id,
2704 from_many,
2705 codec_type,
2706 port_hint,
2707 instantiate_fn,
2708 deserialize_fn,
2709 metadata,
2710 } => HydroNode::ExternalInput {
2711 from_external_key: *from_external_key,
2712 from_port_id: *from_port_id,
2713 from_many: *from_many,
2714 codec_type: codec_type.clone(),
2715 port_hint: *port_hint,
2716 instantiate_fn: instantiate_fn.clone(),
2717 deserialize_fn: deserialize_fn.clone(),
2718 metadata: metadata.clone(),
2719 },
2720 HydroNode::Counter {
2721 tag,
2722 duration,
2723 prefix,
2724 input,
2725 metadata,
2726 } => HydroNode::Counter {
2727 tag: tag.clone(),
2728 duration: duration.clone(),
2729 prefix: prefix.clone(),
2730 input: Box::new(input.deep_clone(seen_tees)),
2731 metadata: metadata.clone(),
2732 },
2733 }
2734 }
2735
2736 #[cfg(feature = "build")]
2737 pub fn emit_core(
2738 &mut self,
2739 builders_or_callback: &mut BuildersOrCallback<
2740 impl FnMut(&mut HydroRoot, &mut usize),
2741 impl FnMut(&mut HydroNode, &mut usize),
2742 >,
2743 seen_tees: &mut SeenSharedNodes,
2744 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2745 next_stmt_id: &mut usize,
2746 ) -> syn::Ident {
2747 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2748
2749 self.transform_bottom_up(
2750 &mut |node: &mut HydroNode| {
2751 let out_location = node.metadata().location_id.clone();
2752 match node {
2753 HydroNode::Placeholder => {
2754 panic!()
2755 }
2756
2757 HydroNode::Cast { .. } => {
2758 match builders_or_callback {
2761 BuildersOrCallback::Builders(_) => {}
2762 BuildersOrCallback::Callback(_, node_callback) => {
2763 node_callback(node, next_stmt_id);
2764 }
2765 }
2766
2767 *next_stmt_id += 1;
2768 }
2770
2771 HydroNode::AssertIsConsistent { inner, .. } => {
2772 let inner_ident = ident_stack.pop().unwrap();
2773
2774 let out_ident =
2775 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2776
2777 match builders_or_callback {
2778 BuildersOrCallback::Builders(graph_builders) => {
2779 graph_builders.assert_is_consistent(
2780 &inner.metadata().location_id,
2781 inner_ident,
2782 &out_ident,
2783 );
2784 }
2785 BuildersOrCallback::Callback(_, node_callback) => {
2786 node_callback(node, next_stmt_id);
2787 }
2788 }
2789
2790 *next_stmt_id += 1;
2791
2792 ident_stack.push(out_ident);
2793 }
2794
2795 HydroNode::ObserveNonDet {
2796 inner,
2797 trusted,
2798 metadata,
2799 ..
2800 } => {
2801 let inner_ident = ident_stack.pop().unwrap();
2802
2803 let observe_ident =
2804 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2805
2806 match builders_or_callback {
2807 BuildersOrCallback::Builders(graph_builders) => {
2808 graph_builders.observe_nondet(
2809 *trusted,
2810 &inner.metadata().location_id,
2811 inner_ident,
2812 &inner.metadata().collection_kind,
2813 &observe_ident,
2814 &metadata.collection_kind,
2815 &metadata.op,
2816 );
2817 }
2818 BuildersOrCallback::Callback(_, node_callback) => {
2819 node_callback(node, next_stmt_id);
2820 }
2821 }
2822
2823 *next_stmt_id += 1;
2824
2825 ident_stack.push(observe_ident);
2826 }
2827
2828 HydroNode::Batch {
2829 inner, metadata, ..
2830 } => {
2831 let inner_ident = ident_stack.pop().unwrap();
2832
2833 let batch_ident =
2834 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2835
2836 match builders_or_callback {
2837 BuildersOrCallback::Builders(graph_builders) => {
2838 graph_builders.batch(
2839 inner_ident,
2840 &inner.metadata().location_id,
2841 &inner.metadata().collection_kind,
2842 &batch_ident,
2843 &out_location,
2844 &metadata.op,
2845 );
2846 }
2847 BuildersOrCallback::Callback(_, node_callback) => {
2848 node_callback(node, next_stmt_id);
2849 }
2850 }
2851
2852 *next_stmt_id += 1;
2853
2854 ident_stack.push(batch_ident);
2855 }
2856
2857 HydroNode::YieldConcat { inner, .. } => {
2858 let inner_ident = ident_stack.pop().unwrap();
2859
2860 let yield_ident =
2861 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2862
2863 match builders_or_callback {
2864 BuildersOrCallback::Builders(graph_builders) => {
2865 graph_builders.yield_from_tick(
2866 inner_ident,
2867 &inner.metadata().location_id,
2868 &inner.metadata().collection_kind,
2869 &yield_ident,
2870 &out_location,
2871 );
2872 }
2873 BuildersOrCallback::Callback(_, node_callback) => {
2874 node_callback(node, next_stmt_id);
2875 }
2876 }
2877
2878 *next_stmt_id += 1;
2879
2880 ident_stack.push(yield_ident);
2881 }
2882
2883 HydroNode::BeginAtomic { inner, metadata } => {
2884 let inner_ident = ident_stack.pop().unwrap();
2885
2886 let begin_ident =
2887 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2888
2889 match builders_or_callback {
2890 BuildersOrCallback::Builders(graph_builders) => {
2891 graph_builders.begin_atomic(
2892 inner_ident,
2893 &inner.metadata().location_id,
2894 &inner.metadata().collection_kind,
2895 &begin_ident,
2896 &out_location,
2897 &metadata.op,
2898 );
2899 }
2900 BuildersOrCallback::Callback(_, node_callback) => {
2901 node_callback(node, next_stmt_id);
2902 }
2903 }
2904
2905 *next_stmt_id += 1;
2906
2907 ident_stack.push(begin_ident);
2908 }
2909
2910 HydroNode::EndAtomic { inner, .. } => {
2911 let inner_ident = ident_stack.pop().unwrap();
2912
2913 let end_ident =
2914 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2915
2916 match builders_or_callback {
2917 BuildersOrCallback::Builders(graph_builders) => {
2918 graph_builders.end_atomic(
2919 inner_ident,
2920 &inner.metadata().location_id,
2921 &inner.metadata().collection_kind,
2922 &end_ident,
2923 );
2924 }
2925 BuildersOrCallback::Callback(_, node_callback) => {
2926 node_callback(node, next_stmt_id);
2927 }
2928 }
2929
2930 *next_stmt_id += 1;
2931
2932 ident_stack.push(end_ident);
2933 }
2934
2935 HydroNode::Source {
2936 source, metadata, ..
2937 } => {
2938 if let HydroSource::ExternalNetwork() = source {
2939 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2940 } else {
2941 let source_ident =
2942 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2943
2944 let source_stmt = match source {
2945 HydroSource::Stream(expr) => {
2946 debug_assert!(metadata.location_id.is_top_level());
2947 parse_quote! {
2948 #source_ident = source_stream(#expr);
2949 }
2950 }
2951
2952 HydroSource::ExternalNetwork() => {
2953 unreachable!()
2954 }
2955
2956 HydroSource::Iter(expr) => {
2957 if metadata.location_id.is_top_level() {
2958 parse_quote! {
2959 #source_ident = source_iter(#expr);
2960 }
2961 } else {
2962 parse_quote! {
2964 #source_ident = source_iter(#expr) -> persist::<'static>();
2965 }
2966 }
2967 }
2968
2969 HydroSource::Spin() => {
2970 debug_assert!(metadata.location_id.is_top_level());
2971 parse_quote! {
2972 #source_ident = spin();
2973 }
2974 }
2975
2976 HydroSource::ClusterMembers(target_loc, state) => {
2977 debug_assert!(metadata.location_id.is_top_level());
2978
2979 let members_tee_ident = syn::Ident::new(
2980 &format!(
2981 "__cluster_members_tee_{}_{}",
2982 metadata.location_id.root().key(),
2983 target_loc.key(),
2984 ),
2985 Span::call_site(),
2986 );
2987
2988 match state {
2989 ClusterMembersState::Stream(d) => {
2990 parse_quote! {
2991 #members_tee_ident = source_stream(#d) -> tee();
2992 #source_ident = #members_tee_ident;
2993 }
2994 },
2995 ClusterMembersState::Uninit => syn::parse_quote! {
2996 #source_ident = source_stream(DUMMY);
2997 },
2998 ClusterMembersState::Tee(..) => parse_quote! {
2999 #source_ident = #members_tee_ident;
3000 },
3001 }
3002 }
3003
3004 HydroSource::Embedded(ident) => {
3005 parse_quote! {
3006 #source_ident = source_stream(#ident);
3007 }
3008 }
3009
3010 HydroSource::EmbeddedSingleton(ident) => {
3011 parse_quote! {
3012 #source_ident = source_iter([#ident]);
3013 }
3014 }
3015 };
3016
3017 match builders_or_callback {
3018 BuildersOrCallback::Builders(graph_builders) => {
3019 let builder = graph_builders.get_dfir_mut(&out_location);
3020 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3021 }
3022 BuildersOrCallback::Callback(_, node_callback) => {
3023 node_callback(node, next_stmt_id);
3024 }
3025 }
3026
3027 *next_stmt_id += 1;
3028
3029 ident_stack.push(source_ident);
3030 }
3031 }
3032
3033 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3034 let source_ident =
3035 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3036
3037 match builders_or_callback {
3038 BuildersOrCallback::Builders(graph_builders) => {
3039 let builder = graph_builders.get_dfir_mut(&out_location);
3040
3041 if *first_tick_only {
3042 assert!(
3043 !metadata.location_id.is_top_level(),
3044 "first_tick_only SingletonSource must be inside a tick"
3045 );
3046 }
3047
3048 if *first_tick_only
3049 || (metadata.location_id.is_top_level()
3050 && metadata.collection_kind.is_bounded())
3051 {
3052 builder.add_dfir(
3053 parse_quote! {
3054 #source_ident = source_iter([#value]);
3055 },
3056 None,
3057 Some(&next_stmt_id.to_string()),
3058 );
3059 } else {
3060 builder.add_dfir(
3061 parse_quote! {
3062 #source_ident = source_iter([#value]) -> persist::<'static>();
3063 },
3064 None,
3065 Some(&next_stmt_id.to_string()),
3066 );
3067 }
3068 }
3069 BuildersOrCallback::Callback(_, node_callback) => {
3070 node_callback(node, next_stmt_id);
3071 }
3072 }
3073
3074 *next_stmt_id += 1;
3075
3076 ident_stack.push(source_ident);
3077 }
3078
3079 HydroNode::CycleSource { cycle_id, .. } => {
3080 let ident = cycle_id.as_ident();
3081
3082 match builders_or_callback {
3083 BuildersOrCallback::Builders(_) => {}
3084 BuildersOrCallback::Callback(_, node_callback) => {
3085 node_callback(node, next_stmt_id);
3086 }
3087 }
3088
3089 *next_stmt_id += 1;
3091
3092 ident_stack.push(ident);
3093 }
3094
3095 HydroNode::Tee { inner, .. } => {
3096 let ret_ident = if let Some(built_idents) =
3097 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3098 {
3099 match builders_or_callback {
3100 BuildersOrCallback::Builders(_) => {}
3101 BuildersOrCallback::Callback(_, node_callback) => {
3102 node_callback(node, next_stmt_id);
3103 }
3104 }
3105
3106 built_idents[0].clone()
3107 } else {
3108 let inner_ident = ident_stack.pop().unwrap();
3111
3112 let tee_ident =
3113 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3114
3115 built_tees.insert(
3116 inner.0.as_ref() as *const RefCell<HydroNode>,
3117 vec![tee_ident.clone()],
3118 );
3119
3120 match builders_or_callback {
3121 BuildersOrCallback::Builders(graph_builders) => {
3122 let builder = graph_builders.get_dfir_mut(&out_location);
3123 builder.add_dfir(
3124 parse_quote! {
3125 #tee_ident = #inner_ident -> tee();
3126 },
3127 None,
3128 Some(&next_stmt_id.to_string()),
3129 );
3130 }
3131 BuildersOrCallback::Callback(_, node_callback) => {
3132 node_callback(node, next_stmt_id);
3133 }
3134 }
3135
3136 tee_ident
3137 };
3138
3139 *next_stmt_id += 1;
3143 ident_stack.push(ret_ident);
3144 }
3145
3146 HydroNode::Partition {
3147 inner, f, is_true, ..
3148 } => {
3149 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3151 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3152 match builders_or_callback {
3153 BuildersOrCallback::Builders(_) => {}
3154 BuildersOrCallback::Callback(_, node_callback) => {
3155 node_callback(node, next_stmt_id);
3156 }
3157 }
3158
3159 let idx = if is_true { 0 } else { 1 };
3160 built_idents[idx].clone()
3161 } else {
3162 let inner_ident = ident_stack.pop().unwrap();
3165
3166 let partition_ident = syn::Ident::new(
3167 &format!("stream_{}_partition", *next_stmt_id),
3168 Span::call_site(),
3169 );
3170 let true_ident = syn::Ident::new(
3171 &format!("stream_{}_true", *next_stmt_id),
3172 Span::call_site(),
3173 );
3174 let false_ident = syn::Ident::new(
3175 &format!("stream_{}_false", *next_stmt_id),
3176 Span::call_site(),
3177 );
3178
3179 built_tees.insert(
3180 ptr,
3181 vec![true_ident.clone(), false_ident.clone()],
3182 );
3183
3184 match builders_or_callback {
3185 BuildersOrCallback::Builders(graph_builders) => {
3186 let builder = graph_builders.get_dfir_mut(&out_location);
3187 builder.add_dfir(
3188 parse_quote! {
3189 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
3190 #true_ident = #partition_ident[0];
3191 #false_ident = #partition_ident[1];
3192 },
3193 None,
3194 Some(&next_stmt_id.to_string()),
3195 );
3196 }
3197 BuildersOrCallback::Callback(_, node_callback) => {
3198 node_callback(node, next_stmt_id);
3199 }
3200 }
3201
3202 if is_true { true_ident } else { false_ident }
3203 };
3204
3205 *next_stmt_id += 1;
3206 ident_stack.push(ret_ident);
3207 }
3208
3209 HydroNode::Chain { .. } => {
3210 let second_ident = ident_stack.pop().unwrap();
3212 let first_ident = ident_stack.pop().unwrap();
3213
3214 let chain_ident =
3215 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3216
3217 match builders_or_callback {
3218 BuildersOrCallback::Builders(graph_builders) => {
3219 let builder = graph_builders.get_dfir_mut(&out_location);
3220 builder.add_dfir(
3221 parse_quote! {
3222 #chain_ident = chain();
3223 #first_ident -> [0]#chain_ident;
3224 #second_ident -> [1]#chain_ident;
3225 },
3226 None,
3227 Some(&next_stmt_id.to_string()),
3228 );
3229 }
3230 BuildersOrCallback::Callback(_, node_callback) => {
3231 node_callback(node, next_stmt_id);
3232 }
3233 }
3234
3235 *next_stmt_id += 1;
3236
3237 ident_stack.push(chain_ident);
3238 }
3239
3240 HydroNode::ChainFirst { .. } => {
3241 let second_ident = ident_stack.pop().unwrap();
3242 let first_ident = ident_stack.pop().unwrap();
3243
3244 let chain_ident =
3245 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3246
3247 match builders_or_callback {
3248 BuildersOrCallback::Builders(graph_builders) => {
3249 let builder = graph_builders.get_dfir_mut(&out_location);
3250 builder.add_dfir(
3251 parse_quote! {
3252 #chain_ident = chain_first_n(1);
3253 #first_ident -> [0]#chain_ident;
3254 #second_ident -> [1]#chain_ident;
3255 },
3256 None,
3257 Some(&next_stmt_id.to_string()),
3258 );
3259 }
3260 BuildersOrCallback::Callback(_, node_callback) => {
3261 node_callback(node, next_stmt_id);
3262 }
3263 }
3264
3265 *next_stmt_id += 1;
3266
3267 ident_stack.push(chain_ident);
3268 }
3269
3270 HydroNode::CrossSingleton { right, .. } => {
3271 let right_ident = ident_stack.pop().unwrap();
3272 let left_ident = ident_stack.pop().unwrap();
3273
3274 let cross_ident =
3275 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3276
3277 match builders_or_callback {
3278 BuildersOrCallback::Builders(graph_builders) => {
3279 let builder = graph_builders.get_dfir_mut(&out_location);
3280
3281 if right.metadata().location_id.is_top_level()
3282 && right.metadata().collection_kind.is_bounded()
3283 {
3284 builder.add_dfir(
3285 parse_quote! {
3286 #cross_ident = cross_singleton();
3287 #left_ident -> [input]#cross_ident;
3288 #right_ident -> persist::<'static>() -> [single]#cross_ident;
3289 },
3290 None,
3291 Some(&next_stmt_id.to_string()),
3292 );
3293 } else {
3294 builder.add_dfir(
3295 parse_quote! {
3296 #cross_ident = cross_singleton();
3297 #left_ident -> [input]#cross_ident;
3298 #right_ident -> [single]#cross_ident;
3299 },
3300 None,
3301 Some(&next_stmt_id.to_string()),
3302 );
3303 }
3304 }
3305 BuildersOrCallback::Callback(_, node_callback) => {
3306 node_callback(node, next_stmt_id);
3307 }
3308 }
3309
3310 *next_stmt_id += 1;
3311
3312 ident_stack.push(cross_ident);
3313 }
3314
3315 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3316 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3317 parse_quote!(cross_join_multiset)
3318 } else {
3319 parse_quote!(join_multiset)
3320 };
3321
3322 let (HydroNode::CrossProduct { left, right, .. }
3323 | HydroNode::Join { left, right, .. }) = node
3324 else {
3325 unreachable!()
3326 };
3327
3328 let is_top_level = left.metadata().location_id.is_top_level()
3329 && right.metadata().location_id.is_top_level();
3330 let left_lifetime = if left.metadata().location_id.is_top_level() {
3331 quote!('static)
3332 } else {
3333 quote!('tick)
3334 };
3335
3336 let right_lifetime = if right.metadata().location_id.is_top_level() {
3337 quote!('static)
3338 } else {
3339 quote!('tick)
3340 };
3341
3342 let right_ident = ident_stack.pop().unwrap();
3343 let left_ident = ident_stack.pop().unwrap();
3344
3345 let stream_ident =
3346 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3347
3348 match builders_or_callback {
3349 BuildersOrCallback::Builders(graph_builders) => {
3350 let builder = graph_builders.get_dfir_mut(&out_location);
3351 builder.add_dfir(
3352 if is_top_level {
3353 parse_quote! {
3356 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3357 #left_ident -> [0]#stream_ident;
3358 #right_ident -> [1]#stream_ident;
3359 }
3360 } else {
3361 parse_quote! {
3362 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3363 #left_ident -> [0]#stream_ident;
3364 #right_ident -> [1]#stream_ident;
3365 }
3366 }
3367 ,
3368 None,
3369 Some(&next_stmt_id.to_string()),
3370 );
3371 }
3372 BuildersOrCallback::Callback(_, node_callback) => {
3373 node_callback(node, next_stmt_id);
3374 }
3375 }
3376
3377 *next_stmt_id += 1;
3378
3379 ident_stack.push(stream_ident);
3380 }
3381
3382 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3383 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3384 parse_quote!(difference)
3385 } else {
3386 parse_quote!(anti_join)
3387 };
3388
3389 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3390 node
3391 else {
3392 unreachable!()
3393 };
3394
3395 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3396 quote!('static)
3397 } else {
3398 quote!('tick)
3399 };
3400
3401 let neg_ident = ident_stack.pop().unwrap();
3402 let pos_ident = ident_stack.pop().unwrap();
3403
3404 let stream_ident =
3405 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3406
3407 match builders_or_callback {
3408 BuildersOrCallback::Builders(graph_builders) => {
3409 let builder = graph_builders.get_dfir_mut(&out_location);
3410 builder.add_dfir(
3411 parse_quote! {
3412 #stream_ident = #operator::<'tick, #neg_lifetime>();
3413 #pos_ident -> [pos]#stream_ident;
3414 #neg_ident -> [neg]#stream_ident;
3415 },
3416 None,
3417 Some(&next_stmt_id.to_string()),
3418 );
3419 }
3420 BuildersOrCallback::Callback(_, node_callback) => {
3421 node_callback(node, next_stmt_id);
3422 }
3423 }
3424
3425 *next_stmt_id += 1;
3426
3427 ident_stack.push(stream_ident);
3428 }
3429
3430 HydroNode::JoinHalf { .. } => {
3431 let HydroNode::JoinHalf { right, .. } = node else {
3432 unreachable!()
3433 };
3434
3435 assert!(
3436 right.metadata().collection_kind.is_bounded(),
3437 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3438 right.metadata().collection_kind
3439 );
3440
3441 let build_lifetime = if right.metadata().location_id.is_top_level() {
3442 quote!('static)
3443 } else {
3444 quote!('tick)
3445 };
3446
3447 let build_ident = ident_stack.pop().unwrap();
3448 let probe_ident = ident_stack.pop().unwrap();
3449
3450 let stream_ident =
3451 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3452
3453 match builders_or_callback {
3454 BuildersOrCallback::Builders(graph_builders) => {
3455 let builder = graph_builders.get_dfir_mut(&out_location);
3456 builder.add_dfir(
3457 parse_quote! {
3458 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3459 #probe_ident -> [probe]#stream_ident;
3460 #build_ident -> [build]#stream_ident;
3461 },
3462 None,
3463 Some(&next_stmt_id.to_string()),
3464 );
3465 }
3466 BuildersOrCallback::Callback(_, node_callback) => {
3467 node_callback(node, next_stmt_id);
3468 }
3469 }
3470
3471 *next_stmt_id += 1;
3472
3473 ident_stack.push(stream_ident);
3474 }
3475
3476 HydroNode::ResolveFutures { .. } => {
3477 let input_ident = ident_stack.pop().unwrap();
3478
3479 let futures_ident =
3480 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3481
3482 match builders_or_callback {
3483 BuildersOrCallback::Builders(graph_builders) => {
3484 let builder = graph_builders.get_dfir_mut(&out_location);
3485 builder.add_dfir(
3486 parse_quote! {
3487 #futures_ident = #input_ident -> resolve_futures();
3488 },
3489 None,
3490 Some(&next_stmt_id.to_string()),
3491 );
3492 }
3493 BuildersOrCallback::Callback(_, node_callback) => {
3494 node_callback(node, next_stmt_id);
3495 }
3496 }
3497
3498 *next_stmt_id += 1;
3499
3500 ident_stack.push(futures_ident);
3501 }
3502
3503 HydroNode::ResolveFuturesBlocking { .. } => {
3504 let input_ident = ident_stack.pop().unwrap();
3505
3506 let futures_ident =
3507 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3508
3509 match builders_or_callback {
3510 BuildersOrCallback::Builders(graph_builders) => {
3511 let builder = graph_builders.get_dfir_mut(&out_location);
3512 builder.add_dfir(
3513 parse_quote! {
3514 #futures_ident = #input_ident -> resolve_futures_blocking();
3515 },
3516 None,
3517 Some(&next_stmt_id.to_string()),
3518 );
3519 }
3520 BuildersOrCallback::Callback(_, node_callback) => {
3521 node_callback(node, next_stmt_id);
3522 }
3523 }
3524
3525 *next_stmt_id += 1;
3526
3527 ident_stack.push(futures_ident);
3528 }
3529
3530 HydroNode::ResolveFuturesOrdered { .. } => {
3531 let input_ident = ident_stack.pop().unwrap();
3532
3533 let futures_ident =
3534 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3535
3536 match builders_or_callback {
3537 BuildersOrCallback::Builders(graph_builders) => {
3538 let builder = graph_builders.get_dfir_mut(&out_location);
3539 builder.add_dfir(
3540 parse_quote! {
3541 #futures_ident = #input_ident -> resolve_futures_ordered();
3542 },
3543 None,
3544 Some(&next_stmt_id.to_string()),
3545 );
3546 }
3547 BuildersOrCallback::Callback(_, node_callback) => {
3548 node_callback(node, next_stmt_id);
3549 }
3550 }
3551
3552 *next_stmt_id += 1;
3553
3554 ident_stack.push(futures_ident);
3555 }
3556
3557 HydroNode::Map { f, .. } => {
3558 let input_ident = ident_stack.pop().unwrap();
3559
3560 let map_ident =
3561 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3562
3563 match builders_or_callback {
3564 BuildersOrCallback::Builders(graph_builders) => {
3565 let builder = graph_builders.get_dfir_mut(&out_location);
3566 builder.add_dfir(
3567 parse_quote! {
3568 #map_ident = #input_ident -> map(#f);
3569 },
3570 None,
3571 Some(&next_stmt_id.to_string()),
3572 );
3573 }
3574 BuildersOrCallback::Callback(_, node_callback) => {
3575 node_callback(node, next_stmt_id);
3576 }
3577 }
3578
3579 *next_stmt_id += 1;
3580
3581 ident_stack.push(map_ident);
3582 }
3583
3584 HydroNode::FlatMap { f, .. } => {
3585 let input_ident = ident_stack.pop().unwrap();
3586
3587 let flat_map_ident =
3588 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3589
3590 match builders_or_callback {
3591 BuildersOrCallback::Builders(graph_builders) => {
3592 let builder = graph_builders.get_dfir_mut(&out_location);
3593 builder.add_dfir(
3594 parse_quote! {
3595 #flat_map_ident = #input_ident -> flat_map(#f);
3596 },
3597 None,
3598 Some(&next_stmt_id.to_string()),
3599 );
3600 }
3601 BuildersOrCallback::Callback(_, node_callback) => {
3602 node_callback(node, next_stmt_id);
3603 }
3604 }
3605
3606 *next_stmt_id += 1;
3607
3608 ident_stack.push(flat_map_ident);
3609 }
3610
3611 HydroNode::FlatMapStreamBlocking { f, .. } => {
3612 let input_ident = ident_stack.pop().unwrap();
3613
3614 let flat_map_stream_blocking_ident =
3615 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3616
3617 match builders_or_callback {
3618 BuildersOrCallback::Builders(graph_builders) => {
3619 let builder = graph_builders.get_dfir_mut(&out_location);
3620 builder.add_dfir(
3621 parse_quote! {
3622 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3623 },
3624 None,
3625 Some(&next_stmt_id.to_string()),
3626 );
3627 }
3628 BuildersOrCallback::Callback(_, node_callback) => {
3629 node_callback(node, next_stmt_id);
3630 }
3631 }
3632
3633 *next_stmt_id += 1;
3634
3635 ident_stack.push(flat_map_stream_blocking_ident);
3636 }
3637
3638 HydroNode::Filter { f, .. } => {
3639 let input_ident = ident_stack.pop().unwrap();
3640
3641 let filter_ident =
3642 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3643
3644 match builders_or_callback {
3645 BuildersOrCallback::Builders(graph_builders) => {
3646 let builder = graph_builders.get_dfir_mut(&out_location);
3647 builder.add_dfir(
3648 parse_quote! {
3649 #filter_ident = #input_ident -> filter(#f);
3650 },
3651 None,
3652 Some(&next_stmt_id.to_string()),
3653 );
3654 }
3655 BuildersOrCallback::Callback(_, node_callback) => {
3656 node_callback(node, next_stmt_id);
3657 }
3658 }
3659
3660 *next_stmt_id += 1;
3661
3662 ident_stack.push(filter_ident);
3663 }
3664
3665 HydroNode::FilterMap { f, .. } => {
3666 let input_ident = ident_stack.pop().unwrap();
3667
3668 let filter_map_ident =
3669 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3670
3671 match builders_or_callback {
3672 BuildersOrCallback::Builders(graph_builders) => {
3673 let builder = graph_builders.get_dfir_mut(&out_location);
3674 builder.add_dfir(
3675 parse_quote! {
3676 #filter_map_ident = #input_ident -> filter_map(#f);
3677 },
3678 None,
3679 Some(&next_stmt_id.to_string()),
3680 );
3681 }
3682 BuildersOrCallback::Callback(_, node_callback) => {
3683 node_callback(node, next_stmt_id);
3684 }
3685 }
3686
3687 *next_stmt_id += 1;
3688
3689 ident_stack.push(filter_map_ident);
3690 }
3691
3692 HydroNode::Sort { .. } => {
3693 let input_ident = ident_stack.pop().unwrap();
3694
3695 let sort_ident =
3696 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3697
3698 match builders_or_callback {
3699 BuildersOrCallback::Builders(graph_builders) => {
3700 let builder = graph_builders.get_dfir_mut(&out_location);
3701 builder.add_dfir(
3702 parse_quote! {
3703 #sort_ident = #input_ident -> sort();
3704 },
3705 None,
3706 Some(&next_stmt_id.to_string()),
3707 );
3708 }
3709 BuildersOrCallback::Callback(_, node_callback) => {
3710 node_callback(node, next_stmt_id);
3711 }
3712 }
3713
3714 *next_stmt_id += 1;
3715
3716 ident_stack.push(sort_ident);
3717 }
3718
3719 HydroNode::DeferTick { .. } => {
3720 let input_ident = ident_stack.pop().unwrap();
3721
3722 let defer_tick_ident =
3723 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3724
3725 match builders_or_callback {
3726 BuildersOrCallback::Builders(graph_builders) => {
3727 let builder = graph_builders.get_dfir_mut(&out_location);
3728 builder.add_dfir(
3729 parse_quote! {
3730 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3731 },
3732 None,
3733 Some(&next_stmt_id.to_string()),
3734 );
3735 }
3736 BuildersOrCallback::Callback(_, node_callback) => {
3737 node_callback(node, next_stmt_id);
3738 }
3739 }
3740
3741 *next_stmt_id += 1;
3742
3743 ident_stack.push(defer_tick_ident);
3744 }
3745
3746 HydroNode::Enumerate { input, .. } => {
3747 let input_ident = ident_stack.pop().unwrap();
3748
3749 let enumerate_ident =
3750 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3751
3752 match builders_or_callback {
3753 BuildersOrCallback::Builders(graph_builders) => {
3754 let builder = graph_builders.get_dfir_mut(&out_location);
3755 let lifetime = if input.metadata().location_id.is_top_level() {
3756 quote!('static)
3757 } else {
3758 quote!('tick)
3759 };
3760 builder.add_dfir(
3761 parse_quote! {
3762 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3763 },
3764 None,
3765 Some(&next_stmt_id.to_string()),
3766 );
3767 }
3768 BuildersOrCallback::Callback(_, node_callback) => {
3769 node_callback(node, next_stmt_id);
3770 }
3771 }
3772
3773 *next_stmt_id += 1;
3774
3775 ident_stack.push(enumerate_ident);
3776 }
3777
3778 HydroNode::Inspect { f, .. } => {
3779 let input_ident = ident_stack.pop().unwrap();
3780
3781 let inspect_ident =
3782 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3783
3784 match builders_or_callback {
3785 BuildersOrCallback::Builders(graph_builders) => {
3786 let builder = graph_builders.get_dfir_mut(&out_location);
3787 builder.add_dfir(
3788 parse_quote! {
3789 #inspect_ident = #input_ident -> inspect(#f);
3790 },
3791 None,
3792 Some(&next_stmt_id.to_string()),
3793 );
3794 }
3795 BuildersOrCallback::Callback(_, node_callback) => {
3796 node_callback(node, next_stmt_id);
3797 }
3798 }
3799
3800 *next_stmt_id += 1;
3801
3802 ident_stack.push(inspect_ident);
3803 }
3804
3805 HydroNode::Unique { input, .. } => {
3806 let input_ident = ident_stack.pop().unwrap();
3807
3808 let unique_ident =
3809 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3810
3811 match builders_or_callback {
3812 BuildersOrCallback::Builders(graph_builders) => {
3813 let builder = graph_builders.get_dfir_mut(&out_location);
3814 let lifetime = if input.metadata().location_id.is_top_level() {
3815 quote!('static)
3816 } else {
3817 quote!('tick)
3818 };
3819
3820 builder.add_dfir(
3821 parse_quote! {
3822 #unique_ident = #input_ident -> unique::<#lifetime>();
3823 },
3824 None,
3825 Some(&next_stmt_id.to_string()),
3826 );
3827 }
3828 BuildersOrCallback::Callback(_, node_callback) => {
3829 node_callback(node, next_stmt_id);
3830 }
3831 }
3832
3833 *next_stmt_id += 1;
3834
3835 ident_stack.push(unique_ident);
3836 }
3837
3838 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
3839 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3840 if input.metadata().location_id.is_top_level()
3841 && input.metadata().collection_kind.is_bounded()
3842 {
3843 parse_quote!(fold_no_replay)
3844 } else {
3845 parse_quote!(fold)
3846 }
3847 } else if matches!(node, HydroNode::Scan { .. }) {
3848 parse_quote!(scan)
3849 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
3850 parse_quote!(scan_async_blocking)
3851 } else if let HydroNode::FoldKeyed { input, .. } = node {
3852 if input.metadata().location_id.is_top_level()
3853 && input.metadata().collection_kind.is_bounded()
3854 {
3855 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3856 } else {
3857 parse_quote!(fold_keyed)
3858 }
3859 } else {
3860 unreachable!()
3861 };
3862
3863 let (HydroNode::Fold { input, .. }
3864 | HydroNode::FoldKeyed { input, .. }
3865 | HydroNode::Scan { input, .. }
3866 | HydroNode::ScanAsyncBlocking { input, .. }) = node
3867 else {
3868 unreachable!()
3869 };
3870
3871 let lifetime = if input.metadata().location_id.is_top_level() {
3872 quote!('static)
3873 } else {
3874 quote!('tick)
3875 };
3876
3877 let input_ident = ident_stack.pop().unwrap();
3878
3879 let (HydroNode::Fold { init, acc, .. }
3880 | HydroNode::FoldKeyed { init, acc, .. }
3881 | HydroNode::Scan { init, acc, .. }
3882 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
3883 else {
3884 unreachable!()
3885 };
3886
3887 let fold_ident =
3888 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3889
3890 match builders_or_callback {
3891 BuildersOrCallback::Builders(graph_builders) => {
3892 if matches!(node, HydroNode::Fold { .. })
3893 && node.metadata().location_id.is_top_level()
3894 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3895 && graph_builders.singleton_intermediates()
3896 && !node.metadata().collection_kind.is_bounded()
3897 {
3898 let builder = graph_builders.get_dfir_mut(&out_location);
3899
3900 let acc: syn::Expr = parse_quote!({
3901 let mut __inner = #acc;
3902 move |__state, __value| {
3903 __inner(__state, __value);
3904 Some(__state.clone())
3905 }
3906 });
3907
3908 builder.add_dfir(
3909 parse_quote! {
3910 source_iter([(#init)()]) -> [0]#fold_ident;
3911 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3912 #fold_ident = chain();
3913 },
3914 None,
3915 Some(&next_stmt_id.to_string()),
3916 );
3917 } else if matches!(node, HydroNode::FoldKeyed { .. })
3918 && node.metadata().location_id.is_top_level()
3919 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3920 && graph_builders.singleton_intermediates()
3921 && !node.metadata().collection_kind.is_bounded()
3922 {
3923 let builder = graph_builders.get_dfir_mut(&out_location);
3924
3925 let acc: syn::Expr = parse_quote!({
3926 let mut __init = #init;
3927 let mut __inner = #acc;
3928 move |__state, __kv: (_, _)| {
3929 let __state = __state
3931 .entry(::std::clone::Clone::clone(&__kv.0))
3932 .or_insert_with(|| (__init)());
3933 __inner(__state, __kv.1);
3934 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3935 }
3936 });
3937
3938 builder.add_dfir(
3939 parse_quote! {
3940 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3941 },
3942 None,
3943 Some(&next_stmt_id.to_string()),
3944 );
3945 } else {
3946 let builder = graph_builders.get_dfir_mut(&out_location);
3947 builder.add_dfir(
3948 parse_quote! {
3949 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3950 },
3951 None,
3952 Some(&next_stmt_id.to_string()),
3953 );
3954 }
3955 }
3956 BuildersOrCallback::Callback(_, node_callback) => {
3957 node_callback(node, next_stmt_id);
3958 }
3959 }
3960
3961 *next_stmt_id += 1;
3962
3963 ident_stack.push(fold_ident);
3964 }
3965
3966 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3967 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3968 if input.metadata().location_id.is_top_level()
3969 && input.metadata().collection_kind.is_bounded()
3970 {
3971 parse_quote!(reduce_no_replay)
3972 } else {
3973 parse_quote!(reduce)
3974 }
3975 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3976 if input.metadata().location_id.is_top_level()
3977 && input.metadata().collection_kind.is_bounded()
3978 {
3979 todo!(
3980 "Calling keyed reduce on a top-level bounded collection is not supported"
3981 )
3982 } else {
3983 parse_quote!(reduce_keyed)
3984 }
3985 } else {
3986 unreachable!()
3987 };
3988
3989 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3990 else {
3991 unreachable!()
3992 };
3993
3994 let lifetime = if input.metadata().location_id.is_top_level() {
3995 quote!('static)
3996 } else {
3997 quote!('tick)
3998 };
3999
4000 let input_ident = ident_stack.pop().unwrap();
4001
4002 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4003 else {
4004 unreachable!()
4005 };
4006
4007 let reduce_ident =
4008 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4009
4010 match builders_or_callback {
4011 BuildersOrCallback::Builders(graph_builders) => {
4012 if matches!(node, HydroNode::Reduce { .. })
4013 && node.metadata().location_id.is_top_level()
4014 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4015 && graph_builders.singleton_intermediates()
4016 && !node.metadata().collection_kind.is_bounded()
4017 {
4018 todo!(
4019 "Reduce with optional intermediates is not yet supported in simulator"
4020 );
4021 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4022 && node.metadata().location_id.is_top_level()
4023 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4024 && graph_builders.singleton_intermediates()
4025 && !node.metadata().collection_kind.is_bounded()
4026 {
4027 todo!(
4028 "Reduce keyed with optional intermediates is not yet supported in simulator"
4029 );
4030 } else {
4031 let builder = graph_builders.get_dfir_mut(&out_location);
4032 builder.add_dfir(
4033 parse_quote! {
4034 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
4035 },
4036 None,
4037 Some(&next_stmt_id.to_string()),
4038 );
4039 }
4040 }
4041 BuildersOrCallback::Callback(_, node_callback) => {
4042 node_callback(node, next_stmt_id);
4043 }
4044 }
4045
4046 *next_stmt_id += 1;
4047
4048 ident_stack.push(reduce_ident);
4049 }
4050
4051 HydroNode::ReduceKeyedWatermark {
4052 f,
4053 input,
4054 metadata,
4055 ..
4056 } => {
4057 let lifetime = if input.metadata().location_id.is_top_level() {
4058 quote!('static)
4059 } else {
4060 quote!('tick)
4061 };
4062
4063 let watermark_ident = ident_stack.pop().unwrap();
4065 let input_ident = ident_stack.pop().unwrap();
4066
4067 let chain_ident = syn::Ident::new(
4068 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4069 Span::call_site(),
4070 );
4071
4072 let fold_ident =
4073 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4074
4075 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4076 && input.metadata().collection_kind.is_bounded()
4077 {
4078 parse_quote!(fold_no_replay)
4079 } else {
4080 parse_quote!(fold)
4081 };
4082
4083 match builders_or_callback {
4084 BuildersOrCallback::Builders(graph_builders) => {
4085 if metadata.location_id.is_top_level()
4086 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4087 && graph_builders.singleton_intermediates()
4088 && !metadata.collection_kind.is_bounded()
4089 {
4090 todo!(
4091 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4092 )
4093 } else {
4094 let builder = graph_builders.get_dfir_mut(&out_location);
4095 builder.add_dfir(
4096 parse_quote! {
4097 #chain_ident = chain();
4098 #input_ident
4099 -> map(|x| (Some(x), None))
4100 -> [0]#chain_ident;
4101 #watermark_ident
4102 -> map(|watermark| (None, Some(watermark)))
4103 -> [1]#chain_ident;
4104
4105 #fold_ident = #chain_ident
4106 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4107 let __reduce_keyed_fn = #f;
4108 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4109 if let Some((k, v)) = opt_payload {
4110 if let Some(curr_watermark) = *opt_curr_watermark {
4111 if k < curr_watermark {
4112 return;
4113 }
4114 }
4115 match map.entry(k) {
4116 ::std::collections::hash_map::Entry::Vacant(e) => {
4117 e.insert(v);
4118 }
4119 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4120 __reduce_keyed_fn(e.get_mut(), v);
4121 }
4122 }
4123 } else {
4124 let watermark = opt_watermark.unwrap();
4125 if let Some(curr_watermark) = *opt_curr_watermark {
4126 if watermark <= curr_watermark {
4127 return;
4128 }
4129 }
4130 map.retain(|k, _| *k >= watermark);
4131 *opt_curr_watermark = Some(watermark);
4132 }
4133 }
4134 })
4135 -> flat_map(|(map, _curr_watermark)| map);
4136 },
4137 None,
4138 Some(&next_stmt_id.to_string()),
4139 );
4140 }
4141 }
4142 BuildersOrCallback::Callback(_, node_callback) => {
4143 node_callback(node, next_stmt_id);
4144 }
4145 }
4146
4147 *next_stmt_id += 1;
4148
4149 ident_stack.push(fold_ident);
4150 }
4151
4152 HydroNode::Network {
4153 networking_info,
4154 serialize_fn: serialize_pipeline,
4155 instantiate_fn,
4156 deserialize_fn: deserialize_pipeline,
4157 input,
4158 ..
4159 } => {
4160 let input_ident = ident_stack.pop().unwrap();
4161
4162 let receiver_stream_ident =
4163 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4164
4165 match builders_or_callback {
4166 BuildersOrCallback::Builders(graph_builders) => {
4167 let (sink_expr, source_expr) = match instantiate_fn {
4168 DebugInstantiate::Building => (
4169 syn::parse_quote!(DUMMY_SINK),
4170 syn::parse_quote!(DUMMY_SOURCE),
4171 ),
4172
4173 DebugInstantiate::Finalized(finalized) => {
4174 (finalized.sink.clone(), finalized.source.clone())
4175 }
4176 };
4177
4178 graph_builders.create_network(
4179 &input.metadata().location_id,
4180 &out_location,
4181 input_ident,
4182 &receiver_stream_ident,
4183 serialize_pipeline.as_ref(),
4184 sink_expr,
4185 source_expr,
4186 deserialize_pipeline.as_ref(),
4187 *next_stmt_id,
4188 networking_info,
4189 );
4190 }
4191 BuildersOrCallback::Callback(_, node_callback) => {
4192 node_callback(node, next_stmt_id);
4193 }
4194 }
4195
4196 *next_stmt_id += 1;
4197
4198 ident_stack.push(receiver_stream_ident);
4199 }
4200
4201 HydroNode::ExternalInput {
4202 instantiate_fn,
4203 deserialize_fn: deserialize_pipeline,
4204 ..
4205 } => {
4206 let receiver_stream_ident =
4207 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4208
4209 match builders_or_callback {
4210 BuildersOrCallback::Builders(graph_builders) => {
4211 let (_, source_expr) = match instantiate_fn {
4212 DebugInstantiate::Building => (
4213 syn::parse_quote!(DUMMY_SINK),
4214 syn::parse_quote!(DUMMY_SOURCE),
4215 ),
4216
4217 DebugInstantiate::Finalized(finalized) => {
4218 (finalized.sink.clone(), finalized.source.clone())
4219 }
4220 };
4221
4222 graph_builders.create_external_source(
4223 &out_location,
4224 source_expr,
4225 &receiver_stream_ident,
4226 deserialize_pipeline.as_ref(),
4227 *next_stmt_id,
4228 );
4229 }
4230 BuildersOrCallback::Callback(_, node_callback) => {
4231 node_callback(node, next_stmt_id);
4232 }
4233 }
4234
4235 *next_stmt_id += 1;
4236
4237 ident_stack.push(receiver_stream_ident);
4238 }
4239
4240 HydroNode::Counter {
4241 tag,
4242 duration,
4243 prefix,
4244 ..
4245 } => {
4246 let input_ident = ident_stack.pop().unwrap();
4247
4248 let counter_ident =
4249 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4250
4251 match builders_or_callback {
4252 BuildersOrCallback::Builders(graph_builders) => {
4253 let arg = format!("{}({})", prefix, tag);
4254 let builder = graph_builders.get_dfir_mut(&out_location);
4255 builder.add_dfir(
4256 parse_quote! {
4257 #counter_ident = #input_ident -> _counter(#arg, #duration);
4258 },
4259 None,
4260 Some(&next_stmt_id.to_string()),
4261 );
4262 }
4263 BuildersOrCallback::Callback(_, node_callback) => {
4264 node_callback(node, next_stmt_id);
4265 }
4266 }
4267
4268 *next_stmt_id += 1;
4269
4270 ident_stack.push(counter_ident);
4271 }
4272 }
4273 },
4274 seen_tees,
4275 false,
4276 );
4277
4278 ident_stack
4279 .pop()
4280 .expect("ident_stack should have exactly one element after traversal")
4281 }
4282
4283 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4284 match self {
4285 HydroNode::Placeholder => {
4286 panic!()
4287 }
4288 HydroNode::Cast { .. }
4289 | HydroNode::ObserveNonDet { .. }
4290 | HydroNode::AssertIsConsistent { .. } => {}
4291 HydroNode::Source { source, .. } => match source {
4292 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4293 HydroSource::ExternalNetwork()
4294 | HydroSource::Spin()
4295 | HydroSource::ClusterMembers(_, _)
4296 | HydroSource::Embedded(_)
4297 | HydroSource::EmbeddedSingleton(_) => {} },
4299 HydroNode::SingletonSource { value, .. } => {
4300 transform(value);
4301 }
4302 HydroNode::CycleSource { .. }
4303 | HydroNode::Tee { .. }
4304 | HydroNode::YieldConcat { .. }
4305 | HydroNode::BeginAtomic { .. }
4306 | HydroNode::EndAtomic { .. }
4307 | HydroNode::Batch { .. }
4308 | HydroNode::Chain { .. }
4309 | HydroNode::ChainFirst { .. }
4310 | HydroNode::CrossProduct { .. }
4311 | HydroNode::CrossSingleton { .. }
4312 | HydroNode::ResolveFutures { .. }
4313 | HydroNode::ResolveFuturesBlocking { .. }
4314 | HydroNode::ResolveFuturesOrdered { .. }
4315 | HydroNode::Join { .. }
4316 | HydroNode::JoinHalf { .. }
4317 | HydroNode::Difference { .. }
4318 | HydroNode::AntiJoin { .. }
4319 | HydroNode::DeferTick { .. }
4320 | HydroNode::Enumerate { .. }
4321 | HydroNode::Unique { .. }
4322 | HydroNode::Sort { .. } => {}
4323 HydroNode::Map { f, .. }
4324 | HydroNode::FlatMap { f, .. }
4325 | HydroNode::FlatMapStreamBlocking { f, .. }
4326 | HydroNode::Filter { f, .. }
4327 | HydroNode::FilterMap { f, .. }
4328 | HydroNode::Inspect { f, .. }
4329 | HydroNode::Partition { f, .. }
4330 | HydroNode::Reduce { f, .. }
4331 | HydroNode::ReduceKeyed { f, .. }
4332 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4333 transform(f);
4334 }
4335 HydroNode::Fold { init, acc, .. }
4336 | HydroNode::Scan { init, acc, .. }
4337 | HydroNode::ScanAsyncBlocking { init, acc, .. }
4338 | HydroNode::FoldKeyed { init, acc, .. } => {
4339 transform(init);
4340 transform(acc);
4341 }
4342 HydroNode::Network {
4343 serialize_fn,
4344 deserialize_fn,
4345 ..
4346 } => {
4347 if let Some(serialize_fn) = serialize_fn {
4348 transform(serialize_fn);
4349 }
4350 if let Some(deserialize_fn) = deserialize_fn {
4351 transform(deserialize_fn);
4352 }
4353 }
4354 HydroNode::ExternalInput { deserialize_fn, .. } => {
4355 if let Some(deserialize_fn) = deserialize_fn {
4356 transform(deserialize_fn);
4357 }
4358 }
4359 HydroNode::Counter { duration, .. } => {
4360 transform(duration);
4361 }
4362 }
4363 }
4364
4365 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4366 &self.metadata().op
4367 }
4368
4369 pub fn metadata(&self) -> &HydroIrMetadata {
4370 match self {
4371 HydroNode::Placeholder => {
4372 panic!()
4373 }
4374 HydroNode::Cast { metadata, .. } => metadata,
4375 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4376 HydroNode::AssertIsConsistent { metadata, .. } => metadata,
4377 HydroNode::Source { metadata, .. } => metadata,
4378 HydroNode::SingletonSource { metadata, .. } => metadata,
4379 HydroNode::CycleSource { metadata, .. } => metadata,
4380 HydroNode::Tee { metadata, .. } => metadata,
4381 HydroNode::Partition { metadata, .. } => metadata,
4382 HydroNode::YieldConcat { metadata, .. } => metadata,
4383 HydroNode::BeginAtomic { metadata, .. } => metadata,
4384 HydroNode::EndAtomic { metadata, .. } => metadata,
4385 HydroNode::Batch { metadata, .. } => metadata,
4386 HydroNode::Chain { metadata, .. } => metadata,
4387 HydroNode::ChainFirst { metadata, .. } => metadata,
4388 HydroNode::CrossProduct { metadata, .. } => metadata,
4389 HydroNode::CrossSingleton { metadata, .. } => metadata,
4390 HydroNode::Join { metadata, .. } => metadata,
4391 HydroNode::JoinHalf { metadata, .. } => metadata,
4392 HydroNode::Difference { metadata, .. } => metadata,
4393 HydroNode::AntiJoin { metadata, .. } => metadata,
4394 HydroNode::ResolveFutures { metadata, .. } => metadata,
4395 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4396 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4397 HydroNode::Map { metadata, .. } => metadata,
4398 HydroNode::FlatMap { metadata, .. } => metadata,
4399 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4400 HydroNode::Filter { metadata, .. } => metadata,
4401 HydroNode::FilterMap { metadata, .. } => metadata,
4402 HydroNode::DeferTick { metadata, .. } => metadata,
4403 HydroNode::Enumerate { metadata, .. } => metadata,
4404 HydroNode::Inspect { metadata, .. } => metadata,
4405 HydroNode::Unique { metadata, .. } => metadata,
4406 HydroNode::Sort { metadata, .. } => metadata,
4407 HydroNode::Scan { metadata, .. } => metadata,
4408 HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4409 HydroNode::Fold { metadata, .. } => metadata,
4410 HydroNode::FoldKeyed { metadata, .. } => metadata,
4411 HydroNode::Reduce { metadata, .. } => metadata,
4412 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4413 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4414 HydroNode::ExternalInput { metadata, .. } => metadata,
4415 HydroNode::Network { metadata, .. } => metadata,
4416 HydroNode::Counter { metadata, .. } => metadata,
4417 }
4418 }
4419
4420 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4421 &mut self.metadata_mut().op
4422 }
4423
4424 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4425 match self {
4426 HydroNode::Placeholder => {
4427 panic!()
4428 }
4429 HydroNode::Cast { metadata, .. } => metadata,
4430 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4431 HydroNode::AssertIsConsistent { metadata, .. } => metadata,
4432 HydroNode::Source { metadata, .. } => metadata,
4433 HydroNode::SingletonSource { metadata, .. } => metadata,
4434 HydroNode::CycleSource { metadata, .. } => metadata,
4435 HydroNode::Tee { metadata, .. } => metadata,
4436 HydroNode::Partition { metadata, .. } => metadata,
4437 HydroNode::YieldConcat { metadata, .. } => metadata,
4438 HydroNode::BeginAtomic { metadata, .. } => metadata,
4439 HydroNode::EndAtomic { metadata, .. } => metadata,
4440 HydroNode::Batch { metadata, .. } => metadata,
4441 HydroNode::Chain { metadata, .. } => metadata,
4442 HydroNode::ChainFirst { metadata, .. } => metadata,
4443 HydroNode::CrossProduct { metadata, .. } => metadata,
4444 HydroNode::CrossSingleton { metadata, .. } => metadata,
4445 HydroNode::Join { metadata, .. } => metadata,
4446 HydroNode::JoinHalf { metadata, .. } => metadata,
4447 HydroNode::Difference { metadata, .. } => metadata,
4448 HydroNode::AntiJoin { metadata, .. } => metadata,
4449 HydroNode::ResolveFutures { metadata, .. } => metadata,
4450 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4451 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4452 HydroNode::Map { metadata, .. } => metadata,
4453 HydroNode::FlatMap { metadata, .. } => metadata,
4454 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4455 HydroNode::Filter { metadata, .. } => metadata,
4456 HydroNode::FilterMap { metadata, .. } => metadata,
4457 HydroNode::DeferTick { metadata, .. } => metadata,
4458 HydroNode::Enumerate { metadata, .. } => metadata,
4459 HydroNode::Inspect { metadata, .. } => metadata,
4460 HydroNode::Unique { metadata, .. } => metadata,
4461 HydroNode::Sort { metadata, .. } => metadata,
4462 HydroNode::Scan { metadata, .. } => metadata,
4463 HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4464 HydroNode::Fold { metadata, .. } => metadata,
4465 HydroNode::FoldKeyed { metadata, .. } => metadata,
4466 HydroNode::Reduce { metadata, .. } => metadata,
4467 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4468 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4469 HydroNode::ExternalInput { metadata, .. } => metadata,
4470 HydroNode::Network { metadata, .. } => metadata,
4471 HydroNode::Counter { metadata, .. } => metadata,
4472 }
4473 }
4474
4475 pub fn input(&self) -> Vec<&HydroNode> {
4476 match self {
4477 HydroNode::Placeholder => {
4478 panic!()
4479 }
4480 HydroNode::Source { .. }
4481 | HydroNode::SingletonSource { .. }
4482 | HydroNode::ExternalInput { .. }
4483 | HydroNode::CycleSource { .. }
4484 | HydroNode::Tee { .. }
4485 | HydroNode::Partition { .. } => {
4486 vec![]
4488 }
4489 HydroNode::Cast { inner, .. }
4490 | HydroNode::ObserveNonDet { inner, .. }
4491 | HydroNode::YieldConcat { inner, .. }
4492 | HydroNode::BeginAtomic { inner, .. }
4493 | HydroNode::EndAtomic { inner, .. }
4494 | HydroNode::Batch { inner, .. }
4495 | HydroNode::AssertIsConsistent { inner, .. } => {
4496 vec![inner]
4497 }
4498 HydroNode::Chain { first, second, .. } => {
4499 vec![first, second]
4500 }
4501 HydroNode::ChainFirst { first, second, .. } => {
4502 vec![first, second]
4503 }
4504 HydroNode::CrossProduct { left, right, .. }
4505 | HydroNode::CrossSingleton { left, right, .. }
4506 | HydroNode::Join { left, right, .. }
4507 | HydroNode::JoinHalf { left, right, .. } => {
4508 vec![left, right]
4509 }
4510 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4511 vec![pos, neg]
4512 }
4513 HydroNode::Map { input, .. }
4514 | HydroNode::FlatMap { input, .. }
4515 | HydroNode::FlatMapStreamBlocking { input, .. }
4516 | HydroNode::Filter { input, .. }
4517 | HydroNode::FilterMap { input, .. }
4518 | HydroNode::Sort { input, .. }
4519 | HydroNode::DeferTick { input, .. }
4520 | HydroNode::Enumerate { input, .. }
4521 | HydroNode::Inspect { input, .. }
4522 | HydroNode::Unique { input, .. }
4523 | HydroNode::Network { input, .. }
4524 | HydroNode::Counter { input, .. }
4525 | HydroNode::ResolveFutures { input, .. }
4526 | HydroNode::ResolveFuturesBlocking { input, .. }
4527 | HydroNode::ResolveFuturesOrdered { input, .. }
4528 | HydroNode::Fold { input, .. }
4529 | HydroNode::FoldKeyed { input, .. }
4530 | HydroNode::Reduce { input, .. }
4531 | HydroNode::ReduceKeyed { input, .. }
4532 | HydroNode::Scan { input, .. }
4533 | HydroNode::ScanAsyncBlocking { input, .. } => {
4534 vec![input]
4535 }
4536 HydroNode::ReduceKeyedWatermark {
4537 input, watermark, ..
4538 } => {
4539 vec![input, watermark]
4540 }
4541 }
4542 }
4543
4544 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4545 self.input()
4546 .iter()
4547 .map(|input_node| input_node.metadata())
4548 .collect()
4549 }
4550
4551 pub fn is_shared_with_others(&self) -> bool {
4555 match self {
4556 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4557 Rc::strong_count(&inner.0) > 1
4558 }
4559 _ => false,
4560 }
4561 }
4562
4563 pub fn print_root(&self) -> String {
4564 match self {
4565 HydroNode::Placeholder => {
4566 panic!()
4567 }
4568 HydroNode::Cast { .. } => "Cast()".to_owned(),
4569 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4570 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
4571 HydroNode::Source { source, .. } => format!("Source({:?})", source),
4572 HydroNode::SingletonSource {
4573 value,
4574 first_tick_only,
4575 ..
4576 } => format!(
4577 "SingletonSource({:?}, first_tick_only={})",
4578 value, first_tick_only
4579 ),
4580 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4581 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4582 HydroNode::Partition { f, is_true, .. } => {
4583 format!("Partition({:?}, is_true={})", f, is_true)
4584 }
4585 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4586 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4587 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4588 HydroNode::Batch { .. } => "Batch()".to_owned(),
4589 HydroNode::Chain { first, second, .. } => {
4590 format!("Chain({}, {})", first.print_root(), second.print_root())
4591 }
4592 HydroNode::ChainFirst { first, second, .. } => {
4593 format!(
4594 "ChainFirst({}, {})",
4595 first.print_root(),
4596 second.print_root()
4597 )
4598 }
4599 HydroNode::CrossProduct { left, right, .. } => {
4600 format!(
4601 "CrossProduct({}, {})",
4602 left.print_root(),
4603 right.print_root()
4604 )
4605 }
4606 HydroNode::CrossSingleton { left, right, .. } => {
4607 format!(
4608 "CrossSingleton({}, {})",
4609 left.print_root(),
4610 right.print_root()
4611 )
4612 }
4613 HydroNode::Join { left, right, .. } => {
4614 format!("Join({}, {})", left.print_root(), right.print_root())
4615 }
4616 HydroNode::JoinHalf { left, right, .. } => {
4617 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
4618 }
4619 HydroNode::Difference { pos, neg, .. } => {
4620 format!("Difference({}, {})", pos.print_root(), neg.print_root())
4621 }
4622 HydroNode::AntiJoin { pos, neg, .. } => {
4623 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4624 }
4625 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4626 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4627 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4628 HydroNode::Map { f, .. } => format!("Map({:?})", f),
4629 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4630 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4631 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4632 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4633 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4634 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4635 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4636 HydroNode::Unique { .. } => "Unique()".to_owned(),
4637 HydroNode::Sort { .. } => "Sort()".to_owned(),
4638 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4639 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4640 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
4641 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
4642 }
4643 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4644 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4645 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4646 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4647 HydroNode::Network { .. } => "Network()".to_owned(),
4648 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4649 HydroNode::Counter { tag, duration, .. } => {
4650 format!("Counter({:?}, {:?})", tag, duration)
4651 }
4652 }
4653 }
4654}
4655
/// Instantiates the deployment-specific pieces of a network channel going
/// from `from_location` to `to_location`.
///
/// The endpoint nodes are looked up in `processes` / `clusters`, one port is
/// allocated on each via `next_port` (sender first, then receiver), and the
/// matching pair of [`Deploy`] hooks is invoked for the endpoint kinds:
///
/// | from    | to      | hooks                                |
/// |---------|---------|--------------------------------------|
/// | process | process | `o2o_sink_source` / `o2o_connect`    |
/// | process | cluster | `o2m_sink_source` / `o2m_connect`    |
/// | cluster | process | `m2o_sink_source` / `m2o_connect`    |
/// | cluster | cluster | `m2m_sink_source` / `m2m_connect`    |
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by the
/// `*_sink_source` hook followed by the closure produced by the `*_connect`
/// hook.
///
/// # Panics
///
/// * if a process or cluster key referenced by either location is missing
///   from `processes` / `clusters`;
/// * if either location is a `Tick` or `Atomic` location, since only
///   process and cluster endpoints are handled here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Shared lookup helpers: fetch a clone of the instantiated node or panic
    // with the key that was missing.
    let process = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    let cluster = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    // In every arm the sender is resolved before the receiver and the sink
    // port is allocated before the source port; this matters when both
    // endpoints are the same node.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process(from);
            let to_node = process(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to, _)) => {
            let from_node = process(from);
            let to_node = cluster(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from, _), &LocationId::Process(to)) => {
            let from_node = cluster(from);
            let to_node = process(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from, _), &LocationId::Cluster(to, _)) => {
            let from_node = cluster(from);
            let to_node = cluster(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only process and cluster endpoints can be networked. The arm order
        // below is preserved so the same arm fires as before.
        (LocationId::Tick(_, _), _) => {
            panic!("cannot instantiate a network from a Tick location; expected a process or cluster")
        }
        (_, LocationId::Tick(_, _)) => {
            panic!("cannot instantiate a network to a Tick location; expected a process or cluster")
        }
        (LocationId::Atomic(_), _) => {
            panic!("cannot instantiate a network from an Atomic location; expected a process or cluster")
        }
        (_, LocationId::Atomic(_)) => {
            panic!("cannot instantiate a network to an Atomic location; expected a process or cluster")
        }
    };
    (sink, source, connect_fn)
}
4797
// Out-of-line submodule: its body lives in a separate `serde_test` source file
// and, because of the `cfg(test)` gate, is only compiled for test builds.
#[cfg(test)]
mod serde_test;
4800
#[cfg(test)]
mod test {
    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of [`HydroNode`] so that any change to it is
    /// noticed. Ignored unless the `build` feature is enabled, since the
    /// expected value counts the feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let actual = std::mem::size_of::<HydroNode>();
        assert_eq!(actual, 248);
    }

    /// Pins the in-memory size of [`HydroRoot`]; same feature gating as
    /// `hydro_node_size`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let actual = std::mem::size_of::<HydroRoot>();
        assert_eq!(actual, 136);
    }

    /// An expression parsed straight from source text (no `q!` involved) must
    /// come back from `simplify_q_macro` equal to what went in.
    #[test]
    fn test_simplify_q_macro_basic() {
        let input = syn::parse_str::<syn::Expr>("x + y").unwrap();
        let output = simplify_q_macro(input.clone());
        assert_eq!(output, input);
    }

    /// Snapshots the simplified form of a closure quoted with `q!` and spliced
    /// with a unit context.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let result = simplify_q_macro(q!(|x: usize| x + 1).splice_fn1_ctx(&()));
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshots the simplified form of a quoted *block* whose tail is a `move`
    /// closure, i.e. a quoted expression that does not begin with `|`.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let result = simplify_q_macro(
            q!({
                let foo = 123;
                move |b: usize| b + foo
            })
            .splice_fn1_ctx(&()),
        );
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}