Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
/// Wrapper around a parsed expression whose `Debug` output is only the
/// expression's tokens.
///
/// Boxes `syn::Expr` which is ~240 bytes.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl serde::Serialize for DebugExpr {
44    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
45        serializer.serialize_str(&self.to_string())
46    }
47}
48
49impl From<syn::Expr> for DebugExpr {
50    fn from(expr: syn::Expr) -> Self {
51        Self(Box::new(expr))
52    }
53}
54
55impl Deref for DebugExpr {
56    type Target = syn::Expr;
57
58    fn deref(&self) -> &Self::Target {
59        &self.0
60    }
61}
62
63impl ToTokens for DebugExpr {
64    fn to_tokens(&self, tokens: &mut TokenStream) {
65        self.0.to_tokens(tokens);
66    }
67}
68
69impl Debug for DebugExpr {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        write!(f, "{}", self.0.to_token_stream())
72    }
73}
74
75impl Display for DebugExpr {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        let original = self.0.as_ref().clone();
78        let simplified = simplify_q_macro(original);
79
80        // For now, just use quote formatting without trying to parse as a statement
81        // This avoids the syn::parse_quote! issues entirely
82        write!(f, "q!({})", quote::quote!(#simplified))
83    }
84}
85
86/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
87fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
88    // Try to parse the token string as a syn::Expr
89    // Use a visitor to simplify q! macro expansions
90    let mut simplifier = QMacroSimplifier::new();
91    simplifier.visit_expr_mut(&mut expr);
92
93    // If we found and simplified a q! macro, return the simplified version
94    if let Some(simplified) = simplifier.simplified_result {
95        simplified
96    } else {
97        expr
98    }
99}
100
/// AST visitor that simplifies q! macro expansions.
///
/// After visiting an expression, `simplified_result` holds the first
/// expression extracted from a matching `stageleft::runtime_support` call,
/// or `None` when no such call was found.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted replacement expression, once one has been found.
    pub simplified_result: Option<syn::Expr>,
}
106
107impl QMacroSimplifier {
108    pub fn new() -> Self {
109        Self::default()
110    }
111}
112
113impl VisitMut for QMacroSimplifier {
114    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
115        // Check if we already found a result to avoid further processing
116        if self.simplified_result.is_some() {
117            return;
118        }
119
120        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
121            // Look for calls to stageleft::runtime_support::fn*
122            && self.is_stageleft_runtime_support_call(&path_expr.path)
123            // Try to extract the closure from the arguments
124            && let Some(closure) = self.extract_closure_from_args(&call.args)
125        {
126            self.simplified_result = Some(closure);
127            return;
128        }
129
130        // Continue visiting child expressions using the default implementation
131        // Use the default visitor to avoid infinite recursion
132        syn::visit_mut::visit_expr_mut(self, expr);
133    }
134}
135
136impl QMacroSimplifier {
137    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
138        // Check if this is a call to stageleft::runtime_support::fn*
139        if let Some(last_segment) = path.segments.last() {
140            let fn_name = last_segment.ident.to_string();
141            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
142            fn_name.contains("_type_hint")
143                && path.segments.len() > 2
144                && path.segments[0].ident == "stageleft"
145                && path.segments[1].ident == "runtime_support"
146        } else {
147            false
148        }
149    }
150
151    fn extract_closure_from_args(
152        &self,
153        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
154    ) -> Option<syn::Expr> {
155        // Look through the arguments for a closure expression
156        for arg in args {
157            if let syn::Expr::Closure(_) = arg {
158                return Some(arg.clone());
159            }
160            // Also check for closures nested in other expressions (like blocks)
161            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
162                return Some(closure_expr);
163            }
164        }
165        None
166    }
167
168    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
169        let mut visitor = ClosureFinder {
170            found_closure: None,
171            prefer_inner_blocks: true,
172        };
173        visitor.visit_expr(expr);
174        visitor.found_closure
175    }
176}
177
/// Visitor that finds closures in expressions with special block handling.
struct ClosureFinder {
    /// The first match: either a closure expression or, when
    /// `prefer_inner_blocks` applies, the inner block expression containing it.
    found_closure: Option<syn::Expr>,
    /// When set, a block whose direct statements include a nested block that
    /// contains a closure yields that nested block instead of the closure.
    prefer_inner_blocks: bool,
}
183
184impl<'ast> Visit<'ast> for ClosureFinder {
185    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
186        // If we already found a closure, don't continue searching
187        if self.found_closure.is_some() {
188            return;
189        }
190
191        match expr {
192            syn::Expr::Closure(_) => {
193                self.found_closure = Some(expr.clone());
194            }
195            syn::Expr::Block(block) if self.prefer_inner_blocks => {
196                // Special handling for blocks - look for inner blocks that contain closures
197                for stmt in &block.block.stmts {
198                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
199                        && let syn::Expr::Block(_) = stmt_expr
200                    {
201                        // Check if this nested block contains a closure
202                        let mut inner_visitor = ClosureFinder {
203                            found_closure: None,
204                            prefer_inner_blocks: false, // Avoid infinite recursion
205                        };
206                        inner_visitor.visit_expr(stmt_expr);
207                        if inner_visitor.found_closure.is_some() {
208                            // Found a closure in an inner block, return that block
209                            self.found_closure = Some(stmt_expr.clone());
210                            return;
211                        }
212                    }
213                }
214
215                // If no inner block with closure found, continue with normal visitation
216                visit::visit_expr(self, expr);
217
218                // If we found a closure, just return the closure itself, not the whole block
219                // unless we're in the special case where we want the containing block
220                if self.found_closure.is_some() {
221                    // The closure was found during visitation, no need to wrap in block
222                }
223            }
224            _ => {
225                // Use default visitor behavior for all other expressions
226                visit::visit_expr(self, expr);
227            }
228        }
229    }
230}
231
/// Wrapper around a parsed type whose `Debug` output is only the type's tokens.
///
/// Boxes `syn::Type` which is ~320 bytes.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
237
238impl From<syn::Type> for DebugType {
239    fn from(t: syn::Type) -> Self {
240        Self(Box::new(t))
241    }
242}
243
244impl Deref for DebugType {
245    type Target = syn::Type;
246
247    fn deref(&self) -> &Self::Target {
248        &self.0
249    }
250}
251
252impl ToTokens for DebugType {
253    fn to_tokens(&self, tokens: &mut TokenStream) {
254        self.0.to_tokens(tokens);
255    }
256}
257
258impl Debug for DebugType {
259    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
260        write!(f, "{}", self.0.to_token_stream())
261    }
262}
263
264impl serde::Serialize for DebugType {
265    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
266        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
267    }
268}
269
270fn serialize_backtrace_as_span<S: serde::Serializer>(
271    backtrace: &Backtrace,
272    serializer: S,
273) -> Result<S::Ok, S::Error> {
274    match backtrace.format_span() {
275        Some(span) => serializer.serialize_some(&span),
276        None => serializer.serialize_none(),
277    }
278}
279
280fn serialize_ident<S: serde::Serializer>(
281    ident: &syn::Ident,
282    serializer: S,
283) -> Result<S::Ok, S::Error> {
284    serializer.serialize_str(&ident.to_string())
285}
286
/// Instantiation state of a network connection in the IR.
///
/// Nodes start as [`Self::Building`]; `compile_network` replaces that with
/// [`Self::Finalized`] once the sink/source expressions and the connect
/// callback are known.
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated; the payload is boxed.
    Finalized(Box<DebugInstantiateFinalized>),
}
291
292impl serde::Serialize for DebugInstantiate {
293    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
294        match self {
295            DebugInstantiate::Building => {
296                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
297            }
298            DebugInstantiate::Finalized(_) => {
299                panic!(
300                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
301                )
302            }
303        }
304    }
305}
306
/// Payload of [`DebugInstantiate::Finalized`]: the expressions and callback
/// produced when a network connection is instantiated.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side of the connection.
    sink: syn::Expr,
    /// Expression for the receiving side of the connection.
    source: syn::Expr,
    /// One-shot callback that connects the two sides.
    // NOTE(review): presumably taken (leaving `None`) when the connection is made — confirm at the use site.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
319
320impl From<DebugInstantiateFinalized> for DebugInstantiate {
321    fn from(f: DebugInstantiateFinalized) -> Self {
322        Self::Finalized(Box::new(f))
323    }
324}
325
326impl Debug for DebugInstantiate {
327    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328        write!(f, "<network instantiate>")
329    }
330}
331
332impl Hash for DebugInstantiate {
333    fn hash<H: Hasher>(&self, _state: &mut H) {
334        // Do nothing
335    }
336}
337
338impl Clone for DebugInstantiate {
339    fn clone(&self) -> Self {
340        match self {
341            DebugInstantiate::Building => DebugInstantiate::Building,
342            DebugInstantiate::Finalized(_) => {
343                panic!("DebugInstantiate::Finalized should not be cloned")
344            }
345        }
346    }
347}
348
/// Tracks the instantiation state of a `ClusterMembers` source.
///
/// During `compile_network`, the first `ClusterMembers` node for a given
/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
/// receives the expression returned by `Deploy::cluster_membership_stream`.
/// All subsequent nodes for the same pair are set to [`Self::Tee`], so that
/// during code-gen they simply reference the tee output of the first node
/// instead of creating a redundant `source_stream`.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet instantiated.
    Uninit,
    /// The primary instance: holds the stream expression and will emit
    /// `source_stream(expr) -> tee()` during code-gen.
    Stream(DebugExpr),
    /// A secondary instance that references the tee output of the primary.
    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
    /// can derive the deterministic tee ident without extra state.
    Tee(LocationId, LocationId),
}
369
/// A source in a Hydro graph, where data enters the graph.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// A source described by an expression.
    // NOTE(review): presumably an expression evaluating to a stream — confirm in code-gen.
    Stream(DebugExpr),
    /// A source fed from an external network; carries no data of its own.
    ExternalNetwork(),
    /// A source described by an expression.
    // NOTE(review): presumably an expression evaluating to an iterator — confirm in code-gen.
    Iter(DebugExpr),
    /// Carries no data of its own.
    // NOTE(review): semantics not visible here — confirm in code-gen.
    Spin(),
    /// The members of a cluster, together with the instantiation state of
    /// this source (see [`ClusterMembersState`]).
    ClusterMembers(LocationId, ClusterMembersState),
    /// An embedded source named by an identifier (serialized as a string).
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// An embedded singleton source named by an identifier (serialized as a string).
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
381
#[cfg(feature = "build")]
/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
/// and simulations.
///
/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
pub trait DfirBuilder {
    /// Whether the representation of singletons should include intermediate states.
    fn singleton_intermediates(&self) -> bool;

    /// Gets the DFIR builder for the given location, creating it if necessary.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the DFIR that defines `out_ident` (at `out_location`) from
    /// `in_ident` (at `in_location`, with collection kind `in_kind`) for a
    /// batch operation.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR that defines `out_ident` (at `out_location`) from
    /// `in_ident` (at `in_location`) when a collection is yielded out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the DFIR that defines `out_ident` (at `out_location`) from
    /// `in_ident` (at `in_location`) at the start of an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the DFIR that defines `out_ident` from `in_ident` (at
    /// `in_location`) at the end of an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the DFIR that defines `out_ident` from `in_ident` at `location`
    /// for an operator that observes non-determinism.
    // NOTE(review): the meaning of `trusted` is not visible here — confirm against callers.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits both halves of a network channel: on `from`, `input_ident` is
    /// sent into `sink` (optionally through `serialize`); on `to`,
    /// `out_ident` is defined from `source` (optionally through `deserialize`).
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits, at location `on`, the DFIR that defines `out_ident` from the
    /// external source `source_expr` (optionally through `deserialize`).
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits, at location `on`, the DFIR that sends `input_ident` into the
    /// external sink `sink_expr` (optionally through `serialize`).
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    );

    /// Emits the DFIR that defines `out_ident` from `in_ident` at `location`
    /// for a consistency assertion.
    fn assert_is_consistent(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );
}
482
/// Appends the pass-through statement `#out_ident = #in_ident;` to `builder`.
#[cfg(feature = "build")]
fn dfir_add_passthrough(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}

/// Appends a `dest_sink` statement fed by `input_ident`, mapping through
/// `serialize` first when one is given. The statement is tagged `send{tag_id}`.
#[cfg(feature = "build")]
fn dfir_add_send(
    builder: &mut FlatGraphBuilder,
    input_ident: &syn::Ident,
    serialize: Option<&DebugExpr>,
    sink: &syn::Expr,
    tag_id: usize,
) {
    // operator tag separates send and receive, which otherwise have the same next_stmt_id
    let op_tag = format!("send{tag_id}");
    if let Some(serialize_fn) = serialize {
        builder.add_dfir(
            parse_quote! {
                #input_ident -> map(#serialize_fn) -> dest_sink(#sink);
            },
            None,
            Some(&op_tag),
        );
    } else {
        builder.add_dfir(
            parse_quote! {
                #input_ident -> dest_sink(#sink);
            },
            None,
            Some(&op_tag),
        );
    }
}

/// Appends a `source_stream` statement defining `out_ident`, mapping through
/// `deserialize` afterwards when one is given. The statement is tagged
/// `recv{tag_id}`.
#[cfg(feature = "build")]
fn dfir_add_recv(
    builder: &mut FlatGraphBuilder,
    out_ident: &syn::Ident,
    source: &syn::Expr,
    deserialize: Option<&DebugExpr>,
    tag_id: usize,
) {
    let op_tag = format!("recv{tag_id}");
    if let Some(deserialize_fn) = deserialize {
        builder.add_dfir(
            parse_quote! {
                #out_ident = source_stream(#source) -> map(#deserialize_fn);
            },
            None,
            Some(&op_tag),
        );
    } else {
        builder.add_dfir(
            parse_quote! {
                #out_ident = source_stream(#source);
            },
            None,
            Some(&op_tag),
        );
    }
}

/// Code-gen with one `FlatGraphBuilder` per root location key. Apart from
/// `batch` on bounded singleton-like inputs and the network/external
/// endpoints, every hook emits a plain pass-through.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        // Bounded singleton-like inputs are persisted; everything else passes through.
        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        if !needs_persist {
            dfir_add_passthrough(builder, &in_ident, out_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        dfir_add_passthrough(builder, &in_ident, out_ident);
    }

    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        dfir_add_passthrough(builder, &in_ident, out_ident);
    }

    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        dfir_add_passthrough(builder, &in_ident, out_ident);
    }

    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(location);
        dfir_add_passthrough(builder, &in_ident, out_ident);
    }

    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        // Sending half first, then the receiving half.
        let sender_builder = self.get_dfir_mut(from);
        dfir_add_send(sender_builder, &input_ident, serialize, &sink, tag_id);

        let receiver_builder = self.get_dfir_mut(to);
        dfir_add_recv(receiver_builder, out_ident, &source, deserialize, tag_id);
    }

    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let receiver_builder = self.get_dfir_mut(on);
        dfir_add_recv(receiver_builder, out_ident, &source_expr, deserialize, tag_id);
    }

    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let sender_builder = self.get_dfir_mut(on);
        dfir_add_send(sender_builder, input_ident, serialize, &sink_expr, tag_id);
    }

    fn assert_is_consistent(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        let builder = self.get_dfir_mut(location);
        dfir_add_passthrough(builder, &in_ident, out_ident);
    }
}
732
/// Either a [`DfirBuilder`] to emit code into, or a pair of callbacks to run
/// on roots and nodes instead.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR through the borrowed builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Call `L` for roots and `N` for nodes; each receives a mutable counter.
    // NOTE(review): the counter is presumably the next statement id — confirm at the use site.
    Callback(L, N),
}
742
/// A root in a Hydro graph, which is a pipeline that doesn't emit
/// any downstream values. Traversals over the dataflow graph and
/// generating DFIR IR start from roots.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Consumes `input` with the expression `f`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to a port on an external location.
    ///
    /// `instantiate_fn` starts as `Building` and is replaced with the
    /// finalized sink/source expressions by `compile_network`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` into the sink described by the `sink` expression.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds `input` into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// An embedded output named `ident`; `compile_network` requires `input`
    /// to have the `Stream` collection kind and panics otherwise.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// A root that carries only its `input`.
    // NOTE(review): presumably discards the input's values — confirm in code-gen.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
784
785impl HydroRoot {
786    #[cfg(feature = "build")]
787    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
788    pub fn compile_network<'a, D>(
789        &mut self,
790        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
791        seen_tees: &mut SeenSharedNodes,
792        seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
793        processes: &SparseSecondaryMap<LocationKey, D::Process>,
794        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
795        externals: &SparseSecondaryMap<LocationKey, D::External>,
796        env: &mut D::InstantiateEnv,
797    ) where
798        D: Deploy<'a>,
799    {
800        let refcell_extra_stmts = RefCell::new(extra_stmts);
801        let refcell_env = RefCell::new(env);
802        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
803        self.transform_bottom_up(
804            &mut |l| {
805                if let HydroRoot::SendExternal {
806                    input,
807                    to_external_key,
808                    to_port_id,
809                    to_many,
810                    unpaired,
811                    instantiate_fn,
812                    ..
813                } = l
814                {
815                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
816                        DebugInstantiate::Building => {
817                            let to_node = externals
818                                .get(*to_external_key)
819                                .unwrap_or_else(|| {
820                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
821                                })
822                                .clone();
823
824                            match input.metadata().location_id.root() {
825                                &LocationId::Process(process_key) => {
826                                    if *to_many {
827                                        (
828                                            (
829                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
830                                                parse_quote!(DUMMY),
831                                            ),
832                                            Box::new(|| {}) as Box<dyn FnOnce()>,
833                                        )
834                                    } else {
835                                        let from_node = processes
836                                            .get(process_key)
837                                            .unwrap_or_else(|| {
838                                                panic!("A process used in the graph was not instantiated: {}", process_key)
839                                            })
840                                            .clone();
841
842                                        let sink_port = from_node.next_port();
843                                        let source_port = to_node.next_port();
844
845                                        if *unpaired {
846                                            use stageleft::quote_type;
847                                            use tokio_util::codec::LengthDelimitedCodec;
848
849                                            to_node.register(*to_port_id, source_port.clone());
850
851                                            let _ = D::e2o_source(
852                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
853                                                &to_node, &source_port,
854                                                &from_node, &sink_port,
855                                                &quote_type::<LengthDelimitedCodec>(),
856                                                format!("{}_{}", *to_external_key, *to_port_id)
857                                            );
858                                        }
859
860                                        (
861                                            (
862                                                D::o2e_sink(
863                                                    &from_node,
864                                                    &sink_port,
865                                                    &to_node,
866                                                    &source_port,
867                                                    format!("{}_{}", *to_external_key, *to_port_id)
868                                                ),
869                                                parse_quote!(DUMMY),
870                                            ),
871                                            if *unpaired {
872                                                D::e2o_connect(
873                                                    &to_node,
874                                                    &source_port,
875                                                    &from_node,
876                                                    &sink_port,
877                                                    *to_many,
878                                                    NetworkHint::Auto,
879                                                )
880                                            } else {
881                                                Box::new(|| {}) as Box<dyn FnOnce()>
882                                            },
883                                        )
884                                    }
885                                }
886                                LocationId::Cluster(cluster_key, _) => {
887                                    let from_node = clusters
888                                        .get(*cluster_key)
889                                        .unwrap_or_else(|| {
890                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
891                                        })
892                                        .clone();
893
894                                    let sink_port = from_node.next_port();
895                                    let source_port = to_node.next_port();
896
897                                    if *unpaired {
898                                        to_node.register(*to_port_id, source_port.clone());
899                                    }
900
901                                    (
902                                        (
903                                            D::m2e_sink(
904                                                &from_node,
905                                                &sink_port,
906                                                &to_node,
907                                                &source_port,
908                                                format!("{}_{}", *to_external_key, *to_port_id)
909                                            ),
910                                            parse_quote!(DUMMY),
911                                        ),
912                                        Box::new(|| {}) as Box<dyn FnOnce()>,
913                                    )
914                                }
915                                _ => panic!()
916                            }
917                        },
918
919                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
920                    };
921
922                    *instantiate_fn = DebugInstantiateFinalized {
923                        sink: sink_expr,
924                        source: source_expr,
925                        connect_fn: Some(connect_fn),
926                    }
927                    .into();
928                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
929                    let element_type = match &input.metadata().collection_kind {
930                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
931                        _ => panic!("Embedded output must have Stream collection kind"),
932                    };
933                    let location_key = match input.metadata().location_id.root() {
934                        LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
935                        _ => panic!("Embedded output must be on a process or cluster"),
936                    };
937                    D::register_embedded_output(
938                        &mut refcell_env.borrow_mut(),
939                        location_key,
940                        ident,
941                        &element_type,
942                    );
943                }
944            },
945            &mut |n| {
946                if let HydroNode::Network {
947                    name,
948                    networking_info,
949                    input,
950                    instantiate_fn,
951                    metadata,
952                    ..
953                } = n
954                {
955                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
956                        DebugInstantiate::Building => instantiate_network::<D>(
957                            &mut refcell_env.borrow_mut(),
958                            input.metadata().location_id.root(),
959                            metadata.location_id.root(),
960                            processes,
961                            clusters,
962                            name.as_deref(),
963                            networking_info,
964                        ),
965
966                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
967                    };
968
969                    *instantiate_fn = DebugInstantiateFinalized {
970                        sink: sink_expr,
971                        source: source_expr,
972                        connect_fn: Some(connect_fn),
973                    }
974                    .into();
975                } else if let HydroNode::ExternalInput {
976                    from_external_key,
977                    from_port_id,
978                    from_many,
979                    codec_type,
980                    port_hint,
981                    instantiate_fn,
982                    metadata,
983                    ..
984                } = n
985                {
986                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
987                        DebugInstantiate::Building => {
988                            let from_node = externals
989                                .get(*from_external_key)
990                                .unwrap_or_else(|| {
991                                    panic!(
992                                        "A external used in the graph was not instantiated: {}",
993                                        from_external_key,
994                                    )
995                                })
996                                .clone();
997
998                            match metadata.location_id.root() {
999                                &LocationId::Process(process_key) => {
1000                                    let to_node = processes
1001                                        .get(process_key)
1002                                        .unwrap_or_else(|| {
1003                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1004                                        })
1005                                        .clone();
1006
1007                                    let sink_port = from_node.next_port();
1008                                    let source_port = to_node.next_port();
1009
1010                                    from_node.register(*from_port_id, sink_port.clone());
1011
1012                                    (
1013                                        (
1014                                            parse_quote!(DUMMY),
1015                                            if *from_many {
1016                                                D::e2o_many_source(
1017                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1018                                                    &to_node, &source_port,
1019                                                    codec_type.0.as_ref(),
1020                                                    format!("{}_{}", *from_external_key, *from_port_id)
1021                                                )
1022                                            } else {
1023                                                D::e2o_source(
1024                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1025                                                    &from_node, &sink_port,
1026                                                    &to_node, &source_port,
1027                                                    codec_type.0.as_ref(),
1028                                                    format!("{}_{}", *from_external_key, *from_port_id)
1029                                                )
1030                                            },
1031                                        ),
1032                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1033                                    )
1034                                }
1035                                LocationId::Cluster(cluster_key, _) => {
1036                                    let to_node = clusters
1037                                        .get(*cluster_key)
1038                                        .unwrap_or_else(|| {
1039                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1040                                        })
1041                                        .clone();
1042
1043                                    let sink_port = from_node.next_port();
1044                                    let source_port = to_node.next_port();
1045
1046                                    from_node.register(*from_port_id, sink_port.clone());
1047
1048                                    (
1049                                        (
1050                                            parse_quote!(DUMMY),
1051                                            D::e2m_source(
1052                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1053                                                &from_node, &sink_port,
1054                                                &to_node, &source_port,
1055                                                codec_type.0.as_ref(),
1056                                                format!("{}_{}", *from_external_key, *from_port_id)
1057                                            ),
1058                                        ),
1059                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1060                                    )
1061                                }
1062                                _ => panic!()
1063                            }
1064                        },
1065
1066                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1067                    };
1068
1069                    *instantiate_fn = DebugInstantiateFinalized {
1070                        sink: sink_expr,
1071                        source: source_expr,
1072                        connect_fn: Some(connect_fn),
1073                    }
1074                    .into();
1075                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1076                    let element_type = match &metadata.collection_kind {
1077                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1078                        _ => panic!("Embedded source must have Stream collection kind"),
1079                    };
1080                    let location_key = match metadata.location_id.root() {
1081                        LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
1082                        _ => panic!("Embedded source must be on a process or cluster"),
1083                    };
1084                    D::register_embedded_stream_input(
1085                        &mut refcell_env.borrow_mut(),
1086                        location_key,
1087                        ident,
1088                        &element_type,
1089                    );
1090                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1091                    let element_type = match &metadata.collection_kind {
1092                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1093                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1094                    };
1095                    let location_key = match metadata.location_id.root() {
1096                        LocationId::Process(key) | LocationId::Cluster(key, _) => *key,
1097                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1098                    };
1099                    D::register_embedded_singleton_input(
1100                        &mut refcell_env.borrow_mut(),
1101                        location_key,
1102                        ident,
1103                        &element_type,
1104                    );
1105                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1106                    match state {
1107                        ClusterMembersState::Uninit => {
1108                            let at_location = metadata.location_id.root().clone();
1109                            let key = (at_location.clone(), location_id.key());
1110                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1111                                // First occurrence: call cluster_membership_stream and mark as Stream.
1112                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1113                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1114                                    &(),
1115                                );
1116                                *state = ClusterMembersState::Stream(expr.into());
1117                            } else {
1118                                // Already instantiated for this (at, target) pair: just tee.
1119                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1120                            }
1121                        }
1122                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1123                            panic!("cluster members already finalized");
1124                        }
1125                    }
1126                }
1127            },
1128            seen_tees,
1129            false,
1130        );
1131    }
1132
1133    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1134        self.transform_bottom_up(
1135            &mut |l| {
1136                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1137                    match instantiate_fn {
1138                        DebugInstantiate::Building => panic!("network not built"),
1139
1140                        DebugInstantiate::Finalized(finalized) => {
1141                            (finalized.connect_fn.take().unwrap())();
1142                        }
1143                    }
1144                }
1145            },
1146            &mut |n| {
1147                if let HydroNode::Network { instantiate_fn, .. }
1148                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1149                {
1150                    match instantiate_fn {
1151                        DebugInstantiate::Building => panic!("network not built"),
1152
1153                        DebugInstantiate::Finalized(finalized) => {
1154                            (finalized.connect_fn.take().unwrap())();
1155                        }
1156                    }
1157                }
1158            },
1159            seen_tees,
1160            false,
1161        );
1162    }
1163
1164    pub fn transform_bottom_up(
1165        &mut self,
1166        transform_root: &mut impl FnMut(&mut HydroRoot),
1167        transform_node: &mut impl FnMut(&mut HydroNode),
1168        seen_tees: &mut SeenSharedNodes,
1169        check_well_formed: bool,
1170    ) {
1171        self.transform_children(
1172            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1173            seen_tees,
1174        );
1175
1176        transform_root(self);
1177    }
1178
1179    pub fn transform_children(
1180        &mut self,
1181        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1182        seen_tees: &mut SeenSharedNodes,
1183    ) {
1184        match self {
1185            HydroRoot::ForEach { input, .. }
1186            | HydroRoot::SendExternal { input, .. }
1187            | HydroRoot::DestSink { input, .. }
1188            | HydroRoot::CycleSink { input, .. }
1189            | HydroRoot::EmbeddedOutput { input, .. }
1190            | HydroRoot::Null { input, .. } => {
1191                transform(input, seen_tees);
1192            }
1193        }
1194    }
1195
1196    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1197        match self {
1198            HydroRoot::ForEach {
1199                f,
1200                input,
1201                op_metadata,
1202            } => HydroRoot::ForEach {
1203                f: f.clone(),
1204                input: Box::new(input.deep_clone(seen_tees)),
1205                op_metadata: op_metadata.clone(),
1206            },
1207            HydroRoot::SendExternal {
1208                to_external_key,
1209                to_port_id,
1210                to_many,
1211                unpaired,
1212                serialize_fn,
1213                instantiate_fn,
1214                input,
1215                op_metadata,
1216            } => HydroRoot::SendExternal {
1217                to_external_key: *to_external_key,
1218                to_port_id: *to_port_id,
1219                to_many: *to_many,
1220                unpaired: *unpaired,
1221                serialize_fn: serialize_fn.clone(),
1222                instantiate_fn: instantiate_fn.clone(),
1223                input: Box::new(input.deep_clone(seen_tees)),
1224                op_metadata: op_metadata.clone(),
1225            },
1226            HydroRoot::DestSink {
1227                sink,
1228                input,
1229                op_metadata,
1230            } => HydroRoot::DestSink {
1231                sink: sink.clone(),
1232                input: Box::new(input.deep_clone(seen_tees)),
1233                op_metadata: op_metadata.clone(),
1234            },
1235            HydroRoot::CycleSink {
1236                cycle_id,
1237                input,
1238                op_metadata,
1239            } => HydroRoot::CycleSink {
1240                cycle_id: *cycle_id,
1241                input: Box::new(input.deep_clone(seen_tees)),
1242                op_metadata: op_metadata.clone(),
1243            },
1244            HydroRoot::EmbeddedOutput {
1245                ident,
1246                input,
1247                op_metadata,
1248            } => HydroRoot::EmbeddedOutput {
1249                ident: ident.clone(),
1250                input: Box::new(input.deep_clone(seen_tees)),
1251                op_metadata: op_metadata.clone(),
1252            },
1253            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1254                input: Box::new(input.deep_clone(seen_tees)),
1255                op_metadata: op_metadata.clone(),
1256            },
1257        }
1258    }
1259
1260    #[cfg(feature = "build")]
1261    pub fn emit(
1262        &mut self,
1263        graph_builders: &mut dyn DfirBuilder,
1264        seen_tees: &mut SeenSharedNodes,
1265        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1266        next_stmt_id: &mut usize,
1267    ) {
1268        self.emit_core(
1269            &mut BuildersOrCallback::<
1270                fn(&mut HydroRoot, &mut usize),
1271                fn(&mut HydroNode, &mut usize),
1272            >::Builders(graph_builders),
1273            seen_tees,
1274            built_tees,
1275            next_stmt_id,
1276        );
1277    }
1278
1279    #[cfg(feature = "build")]
1280    pub fn emit_core(
1281        &mut self,
1282        builders_or_callback: &mut BuildersOrCallback<
1283            impl FnMut(&mut HydroRoot, &mut usize),
1284            impl FnMut(&mut HydroNode, &mut usize),
1285        >,
1286        seen_tees: &mut SeenSharedNodes,
1287        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1288        next_stmt_id: &mut usize,
1289    ) {
1290        match self {
1291            HydroRoot::ForEach { f, input, .. } => {
1292                let input_ident =
1293                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1294
1295                match builders_or_callback {
1296                    BuildersOrCallback::Builders(graph_builders) => {
1297                        graph_builders
1298                            .get_dfir_mut(&input.metadata().location_id)
1299                            .add_dfir(
1300                                parse_quote! {
1301                                    #input_ident -> for_each(#f);
1302                                },
1303                                None,
1304                                Some(&next_stmt_id.to_string()),
1305                            );
1306                    }
1307                    BuildersOrCallback::Callback(leaf_callback, _) => {
1308                        leaf_callback(self, next_stmt_id);
1309                    }
1310                }
1311
1312                *next_stmt_id += 1;
1313            }
1314
1315            HydroRoot::SendExternal {
1316                serialize_fn,
1317                instantiate_fn,
1318                input,
1319                ..
1320            } => {
1321                let input_ident =
1322                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1323
1324                match builders_or_callback {
1325                    BuildersOrCallback::Builders(graph_builders) => {
1326                        let (sink_expr, _) = match instantiate_fn {
1327                            DebugInstantiate::Building => (
1328                                syn::parse_quote!(DUMMY_SINK),
1329                                syn::parse_quote!(DUMMY_SOURCE),
1330                            ),
1331
1332                            DebugInstantiate::Finalized(finalized) => {
1333                                (finalized.sink.clone(), finalized.source.clone())
1334                            }
1335                        };
1336
1337                        graph_builders.create_external_output(
1338                            &input.metadata().location_id,
1339                            sink_expr,
1340                            &input_ident,
1341                            serialize_fn.as_ref(),
1342                            *next_stmt_id,
1343                        );
1344                    }
1345                    BuildersOrCallback::Callback(leaf_callback, _) => {
1346                        leaf_callback(self, next_stmt_id);
1347                    }
1348                }
1349
1350                *next_stmt_id += 1;
1351            }
1352
1353            HydroRoot::DestSink { sink, input, .. } => {
1354                let input_ident =
1355                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1356
1357                match builders_or_callback {
1358                    BuildersOrCallback::Builders(graph_builders) => {
1359                        graph_builders
1360                            .get_dfir_mut(&input.metadata().location_id)
1361                            .add_dfir(
1362                                parse_quote! {
1363                                    #input_ident -> dest_sink(#sink);
1364                                },
1365                                None,
1366                                Some(&next_stmt_id.to_string()),
1367                            );
1368                    }
1369                    BuildersOrCallback::Callback(leaf_callback, _) => {
1370                        leaf_callback(self, next_stmt_id);
1371                    }
1372                }
1373
1374                *next_stmt_id += 1;
1375            }
1376
1377            HydroRoot::CycleSink {
1378                cycle_id, input, ..
1379            } => {
1380                let input_ident =
1381                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1382
1383                match builders_or_callback {
1384                    BuildersOrCallback::Builders(graph_builders) => {
1385                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1386                            CollectionKind::KeyedSingleton {
1387                                key_type,
1388                                value_type,
1389                                ..
1390                            }
1391                            | CollectionKind::KeyedStream {
1392                                key_type,
1393                                value_type,
1394                                ..
1395                            } => {
1396                                parse_quote!((#key_type, #value_type))
1397                            }
1398                            CollectionKind::Stream { element_type, .. }
1399                            | CollectionKind::Singleton { element_type, .. }
1400                            | CollectionKind::Optional { element_type, .. } => {
1401                                parse_quote!(#element_type)
1402                            }
1403                        };
1404
1405                        let cycle_id_ident = cycle_id.as_ident();
1406                        graph_builders
1407                            .get_dfir_mut(&input.metadata().location_id)
1408                            .add_dfir(
1409                                parse_quote! {
1410                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1411                                },
1412                                None,
1413                                None,
1414                            );
1415                    }
1416                    // No ID, no callback
1417                    BuildersOrCallback::Callback(_, _) => {}
1418                }
1419            }
1420
1421            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1422                let input_ident =
1423                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1424
1425                match builders_or_callback {
1426                    BuildersOrCallback::Builders(graph_builders) => {
1427                        graph_builders
1428                            .get_dfir_mut(&input.metadata().location_id)
1429                            .add_dfir(
1430                                parse_quote! {
1431                                    #input_ident -> for_each(&mut #ident);
1432                                },
1433                                None,
1434                                Some(&next_stmt_id.to_string()),
1435                            );
1436                    }
1437                    BuildersOrCallback::Callback(leaf_callback, _) => {
1438                        leaf_callback(self, next_stmt_id);
1439                    }
1440                }
1441
1442                *next_stmt_id += 1;
1443            }
1444
1445            HydroRoot::Null { input, .. } => {
1446                let input_ident =
1447                    input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1448
1449                match builders_or_callback {
1450                    BuildersOrCallback::Builders(graph_builders) => {
1451                        graph_builders
1452                            .get_dfir_mut(&input.metadata().location_id)
1453                            .add_dfir(
1454                                parse_quote! {
1455                                    #input_ident -> for_each(|_| {});
1456                                },
1457                                None,
1458                                Some(&next_stmt_id.to_string()),
1459                            );
1460                    }
1461                    BuildersOrCallback::Callback(leaf_callback, _) => {
1462                        leaf_callback(self, next_stmt_id);
1463                    }
1464                }
1465
1466                *next_stmt_id += 1;
1467            }
1468        }
1469    }
1470
1471    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1472        match self {
1473            HydroRoot::ForEach { op_metadata, .. }
1474            | HydroRoot::SendExternal { op_metadata, .. }
1475            | HydroRoot::DestSink { op_metadata, .. }
1476            | HydroRoot::CycleSink { op_metadata, .. }
1477            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1478            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1479        }
1480    }
1481
1482    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1483        match self {
1484            HydroRoot::ForEach { op_metadata, .. }
1485            | HydroRoot::SendExternal { op_metadata, .. }
1486            | HydroRoot::DestSink { op_metadata, .. }
1487            | HydroRoot::CycleSink { op_metadata, .. }
1488            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1489            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1490        }
1491    }
1492
1493    pub fn input(&self) -> &HydroNode {
1494        match self {
1495            HydroRoot::ForEach { input, .. }
1496            | HydroRoot::SendExternal { input, .. }
1497            | HydroRoot::DestSink { input, .. }
1498            | HydroRoot::CycleSink { input, .. }
1499            | HydroRoot::EmbeddedOutput { input, .. }
1500            | HydroRoot::Null { input, .. } => input,
1501        }
1502    }
1503
1504    pub fn input_metadata(&self) -> &HydroIrMetadata {
1505        self.input().metadata()
1506    }
1507
1508    pub fn print_root(&self) -> String {
1509        match self {
1510            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1511            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1512            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1513            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1514            HydroRoot::EmbeddedOutput { ident, .. } => {
1515                format!("EmbeddedOutput({})", ident)
1516            }
1517            HydroRoot::Null { .. } => "Null".to_owned(),
1518        }
1519    }
1520
1521    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1522        match self {
1523            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1524                transform(f);
1525            }
1526            HydroRoot::SendExternal { .. }
1527            | HydroRoot::CycleSink { .. }
1528            | HydroRoot::EmbeddedOutput { .. }
1529            | HydroRoot::Null { .. } => {}
1530        }
1531    }
1532}
1533
1534#[cfg(feature = "build")]
1535fn tick_of(loc: &LocationId) -> Option<ClockId> {
1536    match loc {
1537        LocationId::Tick(id, _) => Some(*id),
1538        LocationId::Atomic(inner) => tick_of(inner),
1539        _ => None,
1540    }
1541}
1542
1543#[cfg(feature = "build")]
1544fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1545    match loc {
1546        LocationId::Tick(id, inner) => {
1547            *id = uf_find(uf, *id);
1548            remap_location(inner, uf);
1549        }
1550        LocationId::Atomic(inner) => {
1551            remap_location(inner, uf);
1552        }
1553        LocationId::Process(_) | LocationId::Cluster(_, _) => {}
1554    }
1555}
1556
1557#[cfg(feature = "build")]
1558fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1559    let p = *parent.get(&x).unwrap_or(&x);
1560    if p == x {
1561        return x;
1562    }
1563    let root = uf_find(parent, p);
1564    parent.insert(x, root);
1565    root
1566}
1567
1568#[cfg(feature = "build")]
1569fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1570    let ra = uf_find(parent, a);
1571    let rb = uf_find(parent, b);
1572    if ra != rb {
1573        parent.insert(ra, rb);
1574    }
1575}
1576
/// Unifies tick IDs that are linked through `Batch` and `YieldConcat` nodes and
/// rewrites the IR to use one representative ID per group.
///
/// Works in two traversals: the first records, in a union-find, every pair of
/// ticks found on the two sides of a `Batch` / `YieldConcat` node (the node's
/// own location and its input's location); the second remaps each node's
/// `LocationId` to the representative tick ID.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect unifications.
    let mut collect_unions = |node: &mut HydroNode| {
        if let HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } =
            node
        {
            let input_tick = tick_of(&inner.metadata().location_id);
            let output_tick = tick_of(&metadata.location_id);
            // Only unify when both sides actually live in a tick.
            if let (Some(a), Some(b)) = (input_tick, output_tick) {
                uf_union(&mut uf, a, b);
            }
        }
    };
    transform_bottom_up(ir, &mut |_| {}, &mut collect_unions, false);

    // Pass 2: rewrite all LocationIds.
    let mut rewrite_location = |node: &mut HydroNode| {
        remap_location(&mut node.metadata_mut().location_id, &mut uf);
    };
    transform_bottom_up(ir, &mut |_| {}, &mut rewrite_location, false);
}
1612
/// Emits every root in `ir` into per-location graph builders and returns them.
///
/// The shared-node bookkeeping maps and the statement-id counter start fresh
/// and are threaded through all roots in order.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    });

    builders
}
1629
/// Walks `ir` through the same `emit_core` machinery used for emission, but
/// with the two callbacks in place of graph builders.
///
/// `transform_root` and `transform_node` each receive the item plus a mutable
/// reference to a `usize` (the running statement-id counter threaded through
/// `emit_core`).
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1649
1650pub fn transform_bottom_up(
1651    ir: &mut [HydroRoot],
1652    transform_root: &mut impl FnMut(&mut HydroRoot),
1653    transform_node: &mut impl FnMut(&mut HydroNode),
1654    check_well_formed: bool,
1655) {
1656    let mut seen_tees = HashMap::new();
1657    ir.iter_mut().for_each(|leaf| {
1658        leaf.transform_bottom_up(
1659            transform_root,
1660            transform_node,
1661            &mut seen_tees,
1662            check_well_formed,
1663        );
1664    });
1665}
1666
1667pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1668    let mut seen_tees = HashMap::new();
1669    ir.iter()
1670        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1671        .collect()
1672}
1673
/// Per-thread deduplication state: `None` when no scope is active, otherwise
/// `(next id to hand out, shared-node address → id already assigned)`.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    /// Tracks shared nodes already printed by `SharedNode`'s `Debug` impl so
    /// that repeated encounters print a short `<shared N>` marker instead of
    /// the full subtree. Activated by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
    /// emits the full subtree only once and uses a `{"$shared_ref": id}`
    /// back-reference on subsequent encounters, preventing infinite loops.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1683
1684pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1685    PRINTED_TEES.with(|printed_tees| {
1686        let mut printed_tees_mut = printed_tees.borrow_mut();
1687        *printed_tees_mut = Some((0, HashMap::new()));
1688        drop(printed_tees_mut);
1689
1690        let ret = f();
1691
1692        let mut printed_tees_mut = printed_tees.borrow_mut();
1693        *printed_tees_mut = None;
1694
1695        ret
1696    })
1697}
1698
1699/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1700/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1701/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1702/// back-reference.  The tracking state is restored when `f` returns or panics.
1703pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1704    let _guard = SerializedSharedGuard::enter();
1705    f()
1706}
1707
/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
/// making `serialize_dedup_shared` re-entrant and panic-safe.
///
/// Created by `SerializedSharedGuard::enter`, which installs a fresh scope;
/// dropping the guard writes `previous` back into the thread-local.
struct SerializedSharedGuard {
    // State that was in `SERIALIZED_SHARED` before this scope was entered
    // (`None` if no scope was active).
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
1713
1714impl SerializedSharedGuard {
1715    fn enter() -> Self {
1716        let previous = SERIALIZED_SHARED.with(|cell| {
1717            let mut guard = cell.borrow_mut();
1718            guard.replace((0, HashMap::new()))
1719        });
1720        Self { previous }
1721    }
1722}
1723
1724impl Drop for SerializedSharedGuard {
1725    fn drop(&mut self) {
1726        SERIALIZED_SHARED.with(|cell| {
1727            *cell.borrow_mut() = self.previous.take();
1728        });
1729    }
1730}
1731
/// A reference-counted, interior-mutable handle to a `HydroNode`.
///
/// Several `SharedNode`s may point at the same underlying node (this is how
/// `HydroNode::Tee` and `HydroNode::Partition` reference their input), so code
/// that walks the IR identifies shared nodes by the address from `as_ptr`.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1733
1734impl serde::Serialize for SharedNode {
1735    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
1736    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
1737    /// same subtree every time and, if the graph ever contains a cycle, loop
1738    /// forever.
1739    ///
1740    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
1741    /// integer id.  The first time we see a pointer we assign it the next id and
1742    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
1743    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
1744    /// recursion.  Requires an active `serialize_dedup_shared` scope.
1745    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1746        SERIALIZED_SHARED.with(|cell| {
1747            let mut guard = cell.borrow_mut();
1748            // (next_id, pointer → assigned_id)
1749            let state = guard.as_mut().ok_or_else(|| {
1750                serde::ser::Error::custom(
1751                    "SharedNode serialization requires an active serialize_dedup_shared scope",
1752                )
1753            })?;
1754            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1755
1756            if let Some(&id) = state.1.get(&ptr) {
1757                drop(guard);
1758                use serde::ser::SerializeMap;
1759                let mut map = serializer.serialize_map(Some(1))?;
1760                map.serialize_entry("$shared_ref", &id)?;
1761                map.end()
1762            } else {
1763                let id = state.0;
1764                state.0 += 1;
1765                state.1.insert(ptr, id);
1766                drop(guard);
1767
1768                use serde::ser::SerializeMap;
1769                let mut map = serializer.serialize_map(Some(2))?;
1770                map.serialize_entry("$shared", &id)?;
1771                map.serialize_entry("node", &*self.0.borrow())?;
1772                map.end()
1773            }
1774        })
1775    }
1776}
1777
1778impl SharedNode {
1779    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1780        Rc::as_ptr(&self.0)
1781    }
1782}
1783
1784impl Debug for SharedNode {
1785    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1786        PRINTED_TEES.with(|printed_tees| {
1787            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1788            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1789
1790            if let Some(printed_tees_mut) = printed_tees_mut {
1791                if let Some(existing) = printed_tees_mut
1792                    .1
1793                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1794                {
1795                    write!(f, "<shared {}>", existing)
1796                } else {
1797                    let next_id = printed_tees_mut.0;
1798                    printed_tees_mut.0 += 1;
1799                    printed_tees_mut
1800                        .1
1801                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1802                    drop(printed_tees_mut_borrow);
1803                    write!(f, "<shared {}>: ", next_id)?;
1804                    Debug::fmt(&self.0.borrow(), f)
1805                }
1806            } else {
1807                drop(printed_tees_mut_borrow);
1808                write!(f, "<shared>: ")?;
1809                Debug::fmt(&self.0.borrow(), f)
1810            }
1811        })
1812    }
1813}
1814
1815impl Hash for SharedNode {
1816    fn hash<H: Hasher>(&self, state: &mut H) {
1817        self.0.borrow_mut().hash(state);
1818    }
1819}
1820
/// Boundedness marker used as the `bound` of `CollectionKind::Stream`,
/// `CollectionKind::Optional` and `CollectionKind::KeyedStream`.
///
/// `CollectionKind::is_bounded` reports `true` only for `Bounded`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1826
/// Ordering marker stored as `order` on `CollectionKind::Stream` and as
/// `value_order` on `CollectionKind::KeyedStream`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1832
/// Retry marker stored as `retry` on `CollectionKind::Stream` and as
/// `value_retry` on `CollectionKind::KeyedStream`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1838
/// Boundedness marker used as the `bound` of `CollectionKind::KeyedSingleton`.
///
/// `CollectionKind::is_bounded` reports `true` only for `Bounded`; the
/// intermediate variants (`MonotonicValue`, `BoundedValue`) count as not bounded.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
1846
/// Boundedness marker used as the `bound` of `CollectionKind::Singleton`.
///
/// `CollectionKind::is_bounded` reports `true` only for `Bounded`; `Monotonic`
/// counts as not bounded.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
1853
/// Describes what kind of collection an IR node produces, together with the
/// guarantee markers and element types attached to that kind.
///
/// Stored in `HydroIrMetadata::collection_kind`.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values with boundedness, ordering and retry markers.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton holding an `element_type` value.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional `element_type` value.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton with separate key and value types.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1883
1884impl CollectionKind {
1885    pub fn is_bounded(&self) -> bool {
1886        matches!(
1887            self,
1888            CollectionKind::Stream {
1889                bound: BoundKind::Bounded,
1890                ..
1891            } | CollectionKind::Singleton {
1892                bound: SingletonBoundKind::Bounded,
1893                ..
1894            } | CollectionKind::Optional {
1895                bound: BoundKind::Bounded,
1896                ..
1897            } | CollectionKind::KeyedStream {
1898                bound: BoundKind::Bounded,
1899                ..
1900            } | CollectionKind::KeyedSingleton {
1901                bound: KeyedSingletonBoundKind::Bounded,
1902                ..
1903            }
1904        )
1905    }
1906}
1907
/// Metadata attached to every `HydroNode` variant other than `Placeholder`.
///
/// This type deliberately does not take part in hashing or comparison: its
/// `Hash` impl feeds nothing to the hasher and its `PartialEq` impl always
/// returns `true`. Its `Debug` output shows only `location_id` and
/// `collection_kind`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Where the node lives; rewritten by `unify_atomic_ticks`.
    pub location_id: LocationId,
    /// Kind (and guarantee markers) of the collection the node produces.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably an estimated output cardinality — confirm with the code that sets it.
    pub cardinality: Option<usize>,
    // Optional free-form tag; its consumers are not visible in this part of the file.
    pub tag: Option<String>,
    /// Operator-specific metadata (backtrace, CPU usage, id).
    pub op: HydroIrOpMetadata,
}
1916
1917// HydroIrMetadata shouldn't be used to hash or compare
1918impl Hash for HydroIrMetadata {
1919    fn hash<H: Hasher>(&self, _: &mut H) {}
1920}
1921
1922impl PartialEq for HydroIrMetadata {
1923    fn eq(&self, _: &Self) -> bool {
1924        true
1925    }
1926}
1927
// Marker impl: the always-`true` `PartialEq` above is trivially reflexive.
impl Eq for HydroIrMetadata {}
1929
1930impl Debug for HydroIrMetadata {
1931    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1932        f.debug_struct("HydroIrMetadata")
1933            .field("location_id", &self.location_id)
1934            .field("collection_kind", &self.collection_kind)
1935            .finish()
1936    }
1937}
1938
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
///
/// Like `HydroIrMetadata`, it contributes nothing when hashed, and its `Debug`
/// output shows only the type name.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created; serialized under the
    /// key `span` via `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    // The three fields below start as `None` in `new`; NOTE(review): presumably
    // filled in later by profiling / id-assignment passes — confirm.
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}
1949
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and every
    /// optional field set to `None`.
    ///
    /// Delegates to `new_with_skip(1)`. NOTE(review): the extra skipped frame
    /// presumably hides this constructor itself from the recorded backtrace —
    /// confirm against `Backtrace::get_backtrace`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Builds the metadata, asking `Backtrace::get_backtrace` to skip
    /// `2 + skip_count` frames.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            // The constant depends on the exact call chain into this function;
            // do not inline or wrap these constructors without adjusting it.
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1968
1969impl Debug for HydroIrOpMetadata {
1970    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1971        f.debug_struct("HydroIrOpMetadata").finish()
1972    }
1973}
1974
1975impl Hash for HydroIrOpMetadata {
1976    fn hash<H: Hasher>(&self, _: &mut H) {}
1977}
1978
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Every variant except `Placeholder` carries a `HydroIrMetadata`. Upstream
/// nodes are owned through `Box<HydroNode>`, except for `Tee` and `Partition`,
/// whose upstream is a `SharedNode` that several nodes may point to.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Temporary stand-in stored in a shared cell while its real contents are
    /// being transformed or cloned. `transform_children` panics if asked to
    /// process one.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no upstream `HydroNode`) described by a `HydroSource`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no upstream `HydroNode`) built from the expression `value`.
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no upstream `HydroNode`) identified by `cycle_id`.
    // NOTE(review): presumably paired with `HydroRoot::CycleSink` via the same id — confirm.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// Reads from a shared upstream node; several `Tee`s may point to the
    /// same `inner`.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// Reads from a shared upstream node, like `Tee`, with an additional
    /// expression `f` and flag `is_true`.
    // NOTE(review): presumably keeps the elements for which `f` evaluates to `is_true` — confirm.
    Partition {
        inner: SharedNode,
        f: DebugExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// `unify_atomic_ticks` treats the tick of `inner` and the tick of this
    /// node as the same tick.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// `unify_atomic_ticks` treats the tick of `inner` and the tick of this
    /// node as the same tick.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Asymmetric join where the right (build) side is bounded.
    /// The build side is accumulated (stratum-delayed) into a hash table,
    /// then the left (probe) side streams through preserving its ordering.
    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Single-input operators parameterized by a user expression `f`.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    // Single-input aggregations parameterized by an initial-value expression
    // (`init`) and an accumulator expression (`acc`).
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, but with a second upstream node, `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only node kind exempt from the "inputs share my root location"
    /// check performed by `transform_bottom_up` when `check_well_formed` is set.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no upstream `HydroNode`) fed from an external location/port.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        // Excluded from the serialized form.
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AssertIsConsistent {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2247
/// Maps the address of an original shared cell to the cell that replaces it.
/// Used by the transform / deep-clone traversals so that a node shared by
/// several `Tee` / `Partition` nodes is processed once and stays shared.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared cell to a `LocationId`.
// NOTE(review): its users are outside this part of the file — confirm the exact meaning there.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2250
2251impl HydroNode {
2252    pub fn transform_bottom_up(
2253        &mut self,
2254        transform: &mut impl FnMut(&mut HydroNode),
2255        seen_tees: &mut SeenSharedNodes,
2256        check_well_formed: bool,
2257    ) {
2258        self.transform_children(
2259            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2260            seen_tees,
2261        );
2262
2263        transform(self);
2264
2265        let self_location = self.metadata().location_id.root();
2266
2267        if check_well_formed {
2268            match &*self {
2269                HydroNode::Network { .. } => {}
2270                _ => {
2271                    self.input_metadata().iter().for_each(|i| {
2272                        if i.location_id.root() != self_location {
2273                            panic!(
2274                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2275                                i,
2276                                i.location_id.root(),
2277                                self,
2278                                self_location
2279                            )
2280                        }
2281                    });
2282                }
2283            }
2284        }
2285    }
2286
2287    #[inline(always)]
2288    pub fn transform_children(
2289        &mut self,
2290        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2291        seen_tees: &mut SeenSharedNodes,
2292    ) {
2293        match self {
2294            HydroNode::Placeholder => {
2295                panic!();
2296            }
2297
2298            HydroNode::Source { .. }
2299            | HydroNode::SingletonSource { .. }
2300            | HydroNode::CycleSource { .. }
2301            | HydroNode::ExternalInput { .. } => {}
2302
2303            HydroNode::Tee { inner, .. } => {
2304                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2305                    *inner = SharedNode(transformed.clone());
2306                } else {
2307                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2308                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2309                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2310                    transform(&mut orig, seen_tees);
2311                    *transformed_cell.borrow_mut() = orig;
2312                    *inner = SharedNode(transformed_cell);
2313                }
2314            }
2315
2316            HydroNode::Partition { inner, .. } => {
2317                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2318                    *inner = SharedNode(transformed.clone());
2319                } else {
2320                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2321                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2322                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2323                    transform(&mut orig, seen_tees);
2324                    *transformed_cell.borrow_mut() = orig;
2325                    *inner = SharedNode(transformed_cell);
2326                }
2327            }
2328
2329            HydroNode::Cast { inner, .. }
2330            | HydroNode::ObserveNonDet { inner, .. }
2331            | HydroNode::BeginAtomic { inner, .. }
2332            | HydroNode::EndAtomic { inner, .. }
2333            | HydroNode::Batch { inner, .. }
2334            | HydroNode::YieldConcat { inner, .. }
2335            | HydroNode::AssertIsConsistent { inner, .. } => {
2336                transform(inner.as_mut(), seen_tees);
2337            }
2338
2339            HydroNode::Chain { first, second, .. } => {
2340                transform(first.as_mut(), seen_tees);
2341                transform(second.as_mut(), seen_tees);
2342            }
2343
2344            HydroNode::ChainFirst { first, second, .. } => {
2345                transform(first.as_mut(), seen_tees);
2346                transform(second.as_mut(), seen_tees);
2347            }
2348
2349            HydroNode::CrossSingleton { left, right, .. }
2350            | HydroNode::CrossProduct { left, right, .. }
2351            | HydroNode::Join { left, right, .. }
2352            | HydroNode::JoinHalf { left, right, .. } => {
2353                transform(left.as_mut(), seen_tees);
2354                transform(right.as_mut(), seen_tees);
2355            }
2356
2357            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2358                transform(pos.as_mut(), seen_tees);
2359                transform(neg.as_mut(), seen_tees);
2360            }
2361
2362            HydroNode::ReduceKeyedWatermark {
2363                input, watermark, ..
2364            } => {
2365                transform(input.as_mut(), seen_tees);
2366                transform(watermark.as_mut(), seen_tees);
2367            }
2368
2369            HydroNode::Map { input, .. }
2370            | HydroNode::ResolveFutures { input, .. }
2371            | HydroNode::ResolveFuturesBlocking { input, .. }
2372            | HydroNode::ResolveFuturesOrdered { input, .. }
2373            | HydroNode::FlatMap { input, .. }
2374            | HydroNode::FlatMapStreamBlocking { input, .. }
2375            | HydroNode::Filter { input, .. }
2376            | HydroNode::FilterMap { input, .. }
2377            | HydroNode::Sort { input, .. }
2378            | HydroNode::DeferTick { input, .. }
2379            | HydroNode::Enumerate { input, .. }
2380            | HydroNode::Inspect { input, .. }
2381            | HydroNode::Unique { input, .. }
2382            | HydroNode::Network { input, .. }
2383            | HydroNode::Fold { input, .. }
2384            | HydroNode::Scan { input, .. }
2385            | HydroNode::ScanAsyncBlocking { input, .. }
2386            | HydroNode::FoldKeyed { input, .. }
2387            | HydroNode::Reduce { input, .. }
2388            | HydroNode::ReduceKeyed { input, .. }
2389            | HydroNode::Counter { input, .. } => {
2390                transform(input.as_mut(), seen_tees);
2391            }
2392        }
2393    }
2394
2395    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2396        match self {
2397            HydroNode::Placeholder => HydroNode::Placeholder,
2398            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2399                inner: Box::new(inner.deep_clone(seen_tees)),
2400                metadata: metadata.clone(),
2401            },
2402            HydroNode::ObserveNonDet {
2403                inner,
2404                trusted,
2405                metadata,
2406            } => HydroNode::ObserveNonDet {
2407                inner: Box::new(inner.deep_clone(seen_tees)),
2408                trusted: *trusted,
2409                metadata: metadata.clone(),
2410            },
2411            HydroNode::AssertIsConsistent { inner, metadata } => HydroNode::AssertIsConsistent {
2412                inner: Box::new(inner.deep_clone(seen_tees)),
2413                metadata: metadata.clone(),
2414            },
2415            HydroNode::Source { source, metadata } => HydroNode::Source {
2416                source: source.clone(),
2417                metadata: metadata.clone(),
2418            },
2419            HydroNode::SingletonSource {
2420                value,
2421                first_tick_only,
2422                metadata,
2423            } => HydroNode::SingletonSource {
2424                value: value.clone(),
2425                first_tick_only: *first_tick_only,
2426                metadata: metadata.clone(),
2427            },
2428            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2429                cycle_id: *cycle_id,
2430                metadata: metadata.clone(),
2431            },
2432            HydroNode::Tee { inner, metadata } => {
2433                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2434                    HydroNode::Tee {
2435                        inner: SharedNode(transformed.clone()),
2436                        metadata: metadata.clone(),
2437                    }
2438                } else {
2439                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2440                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2441                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2442                    *new_rc.borrow_mut() = cloned;
2443                    HydroNode::Tee {
2444                        inner: SharedNode(new_rc),
2445                        metadata: metadata.clone(),
2446                    }
2447                }
2448            }
2449            HydroNode::Partition {
2450                inner,
2451                f,
2452                is_true,
2453                metadata,
2454            } => {
2455                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2456                    HydroNode::Partition {
2457                        inner: SharedNode(transformed.clone()),
2458                        f: f.clone(),
2459                        is_true: *is_true,
2460                        metadata: metadata.clone(),
2461                    }
2462                } else {
2463                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2464                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2465                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2466                    *new_rc.borrow_mut() = cloned;
2467                    HydroNode::Partition {
2468                        inner: SharedNode(new_rc),
2469                        f: f.clone(),
2470                        is_true: *is_true,
2471                        metadata: metadata.clone(),
2472                    }
2473                }
2474            }
2475            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2476                inner: Box::new(inner.deep_clone(seen_tees)),
2477                metadata: metadata.clone(),
2478            },
2479            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2480                inner: Box::new(inner.deep_clone(seen_tees)),
2481                metadata: metadata.clone(),
2482            },
2483            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2484                inner: Box::new(inner.deep_clone(seen_tees)),
2485                metadata: metadata.clone(),
2486            },
2487            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2488                inner: Box::new(inner.deep_clone(seen_tees)),
2489                metadata: metadata.clone(),
2490            },
2491            HydroNode::Chain {
2492                first,
2493                second,
2494                metadata,
2495            } => HydroNode::Chain {
2496                first: Box::new(first.deep_clone(seen_tees)),
2497                second: Box::new(second.deep_clone(seen_tees)),
2498                metadata: metadata.clone(),
2499            },
2500            HydroNode::ChainFirst {
2501                first,
2502                second,
2503                metadata,
2504            } => HydroNode::ChainFirst {
2505                first: Box::new(first.deep_clone(seen_tees)),
2506                second: Box::new(second.deep_clone(seen_tees)),
2507                metadata: metadata.clone(),
2508            },
2509            HydroNode::CrossProduct {
2510                left,
2511                right,
2512                metadata,
2513            } => HydroNode::CrossProduct {
2514                left: Box::new(left.deep_clone(seen_tees)),
2515                right: Box::new(right.deep_clone(seen_tees)),
2516                metadata: metadata.clone(),
2517            },
2518            HydroNode::CrossSingleton {
2519                left,
2520                right,
2521                metadata,
2522            } => HydroNode::CrossSingleton {
2523                left: Box::new(left.deep_clone(seen_tees)),
2524                right: Box::new(right.deep_clone(seen_tees)),
2525                metadata: metadata.clone(),
2526            },
2527            HydroNode::Join {
2528                left,
2529                right,
2530                metadata,
2531            } => HydroNode::Join {
2532                left: Box::new(left.deep_clone(seen_tees)),
2533                right: Box::new(right.deep_clone(seen_tees)),
2534                metadata: metadata.clone(),
2535            },
2536            HydroNode::JoinHalf {
2537                left,
2538                right,
2539                metadata,
2540            } => HydroNode::JoinHalf {
2541                left: Box::new(left.deep_clone(seen_tees)),
2542                right: Box::new(right.deep_clone(seen_tees)),
2543                metadata: metadata.clone(),
2544            },
2545            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2546                pos: Box::new(pos.deep_clone(seen_tees)),
2547                neg: Box::new(neg.deep_clone(seen_tees)),
2548                metadata: metadata.clone(),
2549            },
2550            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2551                pos: Box::new(pos.deep_clone(seen_tees)),
2552                neg: Box::new(neg.deep_clone(seen_tees)),
2553                metadata: metadata.clone(),
2554            },
2555            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2556                input: Box::new(input.deep_clone(seen_tees)),
2557                metadata: metadata.clone(),
2558            },
2559            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2560                HydroNode::ResolveFuturesBlocking {
2561                    input: Box::new(input.deep_clone(seen_tees)),
2562                    metadata: metadata.clone(),
2563                }
2564            }
2565            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2566                HydroNode::ResolveFuturesOrdered {
2567                    input: Box::new(input.deep_clone(seen_tees)),
2568                    metadata: metadata.clone(),
2569                }
2570            }
2571            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2572                f: f.clone(),
2573                input: Box::new(input.deep_clone(seen_tees)),
2574                metadata: metadata.clone(),
2575            },
2576            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2577                f: f.clone(),
2578                input: Box::new(input.deep_clone(seen_tees)),
2579                metadata: metadata.clone(),
2580            },
2581            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2582                HydroNode::FlatMapStreamBlocking {
2583                    f: f.clone(),
2584                    input: Box::new(input.deep_clone(seen_tees)),
2585                    metadata: metadata.clone(),
2586                }
2587            }
2588            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2589                f: f.clone(),
2590                input: Box::new(input.deep_clone(seen_tees)),
2591                metadata: metadata.clone(),
2592            },
2593            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2594                f: f.clone(),
2595                input: Box::new(input.deep_clone(seen_tees)),
2596                metadata: metadata.clone(),
2597            },
2598            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2599                input: Box::new(input.deep_clone(seen_tees)),
2600                metadata: metadata.clone(),
2601            },
2602            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2603                input: Box::new(input.deep_clone(seen_tees)),
2604                metadata: metadata.clone(),
2605            },
2606            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2607                f: f.clone(),
2608                input: Box::new(input.deep_clone(seen_tees)),
2609                metadata: metadata.clone(),
2610            },
2611            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2612                input: Box::new(input.deep_clone(seen_tees)),
2613                metadata: metadata.clone(),
2614            },
2615            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2616                input: Box::new(input.deep_clone(seen_tees)),
2617                metadata: metadata.clone(),
2618            },
2619            HydroNode::Fold {
2620                init,
2621                acc,
2622                input,
2623                metadata,
2624            } => HydroNode::Fold {
2625                init: init.clone(),
2626                acc: acc.clone(),
2627                input: Box::new(input.deep_clone(seen_tees)),
2628                metadata: metadata.clone(),
2629            },
2630            HydroNode::Scan {
2631                init,
2632                acc,
2633                input,
2634                metadata,
2635            } => HydroNode::Scan {
2636                init: init.clone(),
2637                acc: acc.clone(),
2638                input: Box::new(input.deep_clone(seen_tees)),
2639                metadata: metadata.clone(),
2640            },
2641            HydroNode::ScanAsyncBlocking {
2642                init,
2643                acc,
2644                input,
2645                metadata,
2646            } => HydroNode::ScanAsyncBlocking {
2647                init: init.clone(),
2648                acc: acc.clone(),
2649                input: Box::new(input.deep_clone(seen_tees)),
2650                metadata: metadata.clone(),
2651            },
2652            HydroNode::FoldKeyed {
2653                init,
2654                acc,
2655                input,
2656                metadata,
2657            } => HydroNode::FoldKeyed {
2658                init: init.clone(),
2659                acc: acc.clone(),
2660                input: Box::new(input.deep_clone(seen_tees)),
2661                metadata: metadata.clone(),
2662            },
2663            HydroNode::ReduceKeyedWatermark {
2664                f,
2665                input,
2666                watermark,
2667                metadata,
2668            } => HydroNode::ReduceKeyedWatermark {
2669                f: f.clone(),
2670                input: Box::new(input.deep_clone(seen_tees)),
2671                watermark: Box::new(watermark.deep_clone(seen_tees)),
2672                metadata: metadata.clone(),
2673            },
2674            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2675                f: f.clone(),
2676                input: Box::new(input.deep_clone(seen_tees)),
2677                metadata: metadata.clone(),
2678            },
2679            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2680                f: f.clone(),
2681                input: Box::new(input.deep_clone(seen_tees)),
2682                metadata: metadata.clone(),
2683            },
2684            HydroNode::Network {
2685                name,
2686                networking_info,
2687                serialize_fn,
2688                instantiate_fn,
2689                deserialize_fn,
2690                input,
2691                metadata,
2692            } => HydroNode::Network {
2693                name: name.clone(),
2694                networking_info: networking_info.clone(),
2695                serialize_fn: serialize_fn.clone(),
2696                instantiate_fn: instantiate_fn.clone(),
2697                deserialize_fn: deserialize_fn.clone(),
2698                input: Box::new(input.deep_clone(seen_tees)),
2699                metadata: metadata.clone(),
2700            },
2701            HydroNode::ExternalInput {
2702                from_external_key,
2703                from_port_id,
2704                from_many,
2705                codec_type,
2706                port_hint,
2707                instantiate_fn,
2708                deserialize_fn,
2709                metadata,
2710            } => HydroNode::ExternalInput {
2711                from_external_key: *from_external_key,
2712                from_port_id: *from_port_id,
2713                from_many: *from_many,
2714                codec_type: codec_type.clone(),
2715                port_hint: *port_hint,
2716                instantiate_fn: instantiate_fn.clone(),
2717                deserialize_fn: deserialize_fn.clone(),
2718                metadata: metadata.clone(),
2719            },
2720            HydroNode::Counter {
2721                tag,
2722                duration,
2723                prefix,
2724                input,
2725                metadata,
2726            } => HydroNode::Counter {
2727                tag: tag.clone(),
2728                duration: duration.clone(),
2729                prefix: prefix.clone(),
2730                input: Box::new(input.deep_clone(seen_tees)),
2731                metadata: metadata.clone(),
2732            },
2733        }
2734    }
2735
2736    #[cfg(feature = "build")]
2737    pub fn emit_core(
2738        &mut self,
2739        builders_or_callback: &mut BuildersOrCallback<
2740            impl FnMut(&mut HydroRoot, &mut usize),
2741            impl FnMut(&mut HydroNode, &mut usize),
2742        >,
2743        seen_tees: &mut SeenSharedNodes,
2744        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2745        next_stmt_id: &mut usize,
2746    ) -> syn::Ident {
2747        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2748
2749        self.transform_bottom_up(
2750            &mut |node: &mut HydroNode| {
2751                let out_location = node.metadata().location_id.clone();
2752                match node {
2753                    HydroNode::Placeholder => {
2754                        panic!()
2755                    }
2756
2757                    HydroNode::Cast { .. } => {
2758                        // Cast passes through the input ident unchanged
2759                        // The input ident is already on the stack from processing the child
2760                        match builders_or_callback {
2761                            BuildersOrCallback::Builders(_) => {}
2762                            BuildersOrCallback::Callback(_, node_callback) => {
2763                                node_callback(node, next_stmt_id);
2764                            }
2765                        }
2766
2767                        *next_stmt_id += 1;
2768                        // input_ident stays on stack as output
2769                    }
2770
2771                    HydroNode::AssertIsConsistent { inner, .. } => {
2772                        let inner_ident = ident_stack.pop().unwrap();
2773
2774                        let out_ident =
2775                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2776
2777                        match builders_or_callback {
2778                            BuildersOrCallback::Builders(graph_builders) => {
2779                                graph_builders.assert_is_consistent(
2780                                    &inner.metadata().location_id,
2781                                    inner_ident,
2782                                    &out_ident,
2783                                );
2784                            }
2785                            BuildersOrCallback::Callback(_, node_callback) => {
2786                                node_callback(node, next_stmt_id);
2787                            }
2788                        }
2789
2790                        *next_stmt_id += 1;
2791
2792                        ident_stack.push(out_ident);
2793                    }
2794
2795                    HydroNode::ObserveNonDet {
2796                        inner,
2797                        trusted,
2798                        metadata,
2799                        ..
2800                    } => {
2801                        let inner_ident = ident_stack.pop().unwrap();
2802
2803                        let observe_ident =
2804                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2805
2806                        match builders_or_callback {
2807                            BuildersOrCallback::Builders(graph_builders) => {
2808                                graph_builders.observe_nondet(
2809                                    *trusted,
2810                                    &inner.metadata().location_id,
2811                                    inner_ident,
2812                                    &inner.metadata().collection_kind,
2813                                    &observe_ident,
2814                                    &metadata.collection_kind,
2815                                    &metadata.op,
2816                                );
2817                            }
2818                            BuildersOrCallback::Callback(_, node_callback) => {
2819                                node_callback(node, next_stmt_id);
2820                            }
2821                        }
2822
2823                        *next_stmt_id += 1;
2824
2825                        ident_stack.push(observe_ident);
2826                    }
2827
2828                    HydroNode::Batch {
2829                        inner, metadata, ..
2830                    } => {
2831                        let inner_ident = ident_stack.pop().unwrap();
2832
2833                        let batch_ident =
2834                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2835
2836                        match builders_or_callback {
2837                            BuildersOrCallback::Builders(graph_builders) => {
2838                                graph_builders.batch(
2839                                    inner_ident,
2840                                    &inner.metadata().location_id,
2841                                    &inner.metadata().collection_kind,
2842                                    &batch_ident,
2843                                    &out_location,
2844                                    &metadata.op,
2845                                );
2846                            }
2847                            BuildersOrCallback::Callback(_, node_callback) => {
2848                                node_callback(node, next_stmt_id);
2849                            }
2850                        }
2851
2852                        *next_stmt_id += 1;
2853
2854                        ident_stack.push(batch_ident);
2855                    }
2856
2857                    HydroNode::YieldConcat { inner, .. } => {
2858                        let inner_ident = ident_stack.pop().unwrap();
2859
2860                        let yield_ident =
2861                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2862
2863                        match builders_or_callback {
2864                            BuildersOrCallback::Builders(graph_builders) => {
2865                                graph_builders.yield_from_tick(
2866                                    inner_ident,
2867                                    &inner.metadata().location_id,
2868                                    &inner.metadata().collection_kind,
2869                                    &yield_ident,
2870                                    &out_location,
2871                                );
2872                            }
2873                            BuildersOrCallback::Callback(_, node_callback) => {
2874                                node_callback(node, next_stmt_id);
2875                            }
2876                        }
2877
2878                        *next_stmt_id += 1;
2879
2880                        ident_stack.push(yield_ident);
2881                    }
2882
2883                    HydroNode::BeginAtomic { inner, metadata } => {
2884                        let inner_ident = ident_stack.pop().unwrap();
2885
2886                        let begin_ident =
2887                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2888
2889                        match builders_or_callback {
2890                            BuildersOrCallback::Builders(graph_builders) => {
2891                                graph_builders.begin_atomic(
2892                                    inner_ident,
2893                                    &inner.metadata().location_id,
2894                                    &inner.metadata().collection_kind,
2895                                    &begin_ident,
2896                                    &out_location,
2897                                    &metadata.op,
2898                                );
2899                            }
2900                            BuildersOrCallback::Callback(_, node_callback) => {
2901                                node_callback(node, next_stmt_id);
2902                            }
2903                        }
2904
2905                        *next_stmt_id += 1;
2906
2907                        ident_stack.push(begin_ident);
2908                    }
2909
2910                    HydroNode::EndAtomic { inner, .. } => {
2911                        let inner_ident = ident_stack.pop().unwrap();
2912
2913                        let end_ident =
2914                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2915
2916                        match builders_or_callback {
2917                            BuildersOrCallback::Builders(graph_builders) => {
2918                                graph_builders.end_atomic(
2919                                    inner_ident,
2920                                    &inner.metadata().location_id,
2921                                    &inner.metadata().collection_kind,
2922                                    &end_ident,
2923                                );
2924                            }
2925                            BuildersOrCallback::Callback(_, node_callback) => {
2926                                node_callback(node, next_stmt_id);
2927                            }
2928                        }
2929
2930                        *next_stmt_id += 1;
2931
2932                        ident_stack.push(end_ident);
2933                    }
2934
2935                    HydroNode::Source {
2936                        source, metadata, ..
2937                    } => {
2938                        if let HydroSource::ExternalNetwork() = source {
2939                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2940                        } else {
2941                            let source_ident =
2942                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2943
2944                            let source_stmt = match source {
2945                                HydroSource::Stream(expr) => {
2946                                    debug_assert!(metadata.location_id.is_top_level());
2947                                    parse_quote! {
2948                                        #source_ident = source_stream(#expr);
2949                                    }
2950                                }
2951
2952                                HydroSource::ExternalNetwork() => {
2953                                    unreachable!()
2954                                }
2955
2956                                HydroSource::Iter(expr) => {
2957                                    if metadata.location_id.is_top_level() {
2958                                        parse_quote! {
2959                                            #source_ident = source_iter(#expr);
2960                                        }
2961                                    } else {
2962                                        // TODO(shadaj): a more natural semantics would be to re-evaluate the expression on each tick
2963                                        parse_quote! {
2964                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2965                                        }
2966                                    }
2967                                }
2968
2969                                HydroSource::Spin() => {
2970                                    debug_assert!(metadata.location_id.is_top_level());
2971                                    parse_quote! {
2972                                        #source_ident = spin();
2973                                    }
2974                                }
2975
2976                                HydroSource::ClusterMembers(target_loc, state) => {
2977                                    debug_assert!(metadata.location_id.is_top_level());
2978
2979                                    let members_tee_ident = syn::Ident::new(
2980                                        &format!(
2981                                            "__cluster_members_tee_{}_{}",
2982                                            metadata.location_id.root().key(),
2983                                            target_loc.key(),
2984                                        ),
2985                                        Span::call_site(),
2986                                    );
2987
2988                                    match state {
2989                                        ClusterMembersState::Stream(d) => {
2990                                            parse_quote! {
2991                                                #members_tee_ident = source_stream(#d) -> tee();
2992                                                #source_ident = #members_tee_ident;
2993                                            }
2994                                        },
2995                                        ClusterMembersState::Uninit => syn::parse_quote! {
2996                                            #source_ident = source_stream(DUMMY);
2997                                        },
2998                                        ClusterMembersState::Tee(..) => parse_quote! {
2999                                            #source_ident = #members_tee_ident;
3000                                        },
3001                                    }
3002                                }
3003
3004                                HydroSource::Embedded(ident) => {
3005                                    parse_quote! {
3006                                        #source_ident = source_stream(#ident);
3007                                    }
3008                                }
3009
3010                                HydroSource::EmbeddedSingleton(ident) => {
3011                                    parse_quote! {
3012                                        #source_ident = source_iter([#ident]);
3013                                    }
3014                                }
3015                            };
3016
3017                            match builders_or_callback {
3018                                BuildersOrCallback::Builders(graph_builders) => {
3019                                    let builder = graph_builders.get_dfir_mut(&out_location);
3020                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3021                                }
3022                                BuildersOrCallback::Callback(_, node_callback) => {
3023                                    node_callback(node, next_stmt_id);
3024                                }
3025                            }
3026
3027                            *next_stmt_id += 1;
3028
3029                            ident_stack.push(source_ident);
3030                        }
3031                    }
3032
3033                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3034                        let source_ident =
3035                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3036
3037                        match builders_or_callback {
3038                            BuildersOrCallback::Builders(graph_builders) => {
3039                                let builder = graph_builders.get_dfir_mut(&out_location);
3040
3041                                if *first_tick_only {
3042                                    assert!(
3043                                        !metadata.location_id.is_top_level(),
3044                                        "first_tick_only SingletonSource must be inside a tick"
3045                                    );
3046                                }
3047
3048                                if *first_tick_only
3049                                    || (metadata.location_id.is_top_level()
3050                                        && metadata.collection_kind.is_bounded())
3051                                {
3052                                    builder.add_dfir(
3053                                        parse_quote! {
3054                                            #source_ident = source_iter([#value]);
3055                                        },
3056                                        None,
3057                                        Some(&next_stmt_id.to_string()),
3058                                    );
3059                                } else {
3060                                    builder.add_dfir(
3061                                        parse_quote! {
3062                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3063                                        },
3064                                        None,
3065                                        Some(&next_stmt_id.to_string()),
3066                                    );
3067                                }
3068                            }
3069                            BuildersOrCallback::Callback(_, node_callback) => {
3070                                node_callback(node, next_stmt_id);
3071                            }
3072                        }
3073
3074                        *next_stmt_id += 1;
3075
3076                        ident_stack.push(source_ident);
3077                    }
3078
3079                    HydroNode::CycleSource { cycle_id, .. } => {
3080                        let ident = cycle_id.as_ident();
3081
3082                        match builders_or_callback {
3083                            BuildersOrCallback::Builders(_) => {}
3084                            BuildersOrCallback::Callback(_, node_callback) => {
3085                                node_callback(node, next_stmt_id);
3086                            }
3087                        }
3088
3089                        // consume a stmt id even though we did not emit anything so that we can instrument this
3090                        *next_stmt_id += 1;
3091
3092                        ident_stack.push(ident);
3093                    }
3094
3095                    HydroNode::Tee { inner, .. } => {
3096                        let ret_ident = if let Some(built_idents) =
3097                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3098                        {
3099                            match builders_or_callback {
3100                                BuildersOrCallback::Builders(_) => {}
3101                                BuildersOrCallback::Callback(_, node_callback) => {
3102                                    node_callback(node, next_stmt_id);
3103                                }
3104                            }
3105
3106                            built_idents[0].clone()
3107                        } else {
3108                            // The inner node was already processed by transform_bottom_up,
3109                            // so its ident is on the stack
3110                            let inner_ident = ident_stack.pop().unwrap();
3111
3112                            let tee_ident =
3113                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3114
3115                            built_tees.insert(
3116                                inner.0.as_ref() as *const RefCell<HydroNode>,
3117                                vec![tee_ident.clone()],
3118                            );
3119
3120                            match builders_or_callback {
3121                                BuildersOrCallback::Builders(graph_builders) => {
3122                                    let builder = graph_builders.get_dfir_mut(&out_location);
3123                                    builder.add_dfir(
3124                                        parse_quote! {
3125                                            #tee_ident = #inner_ident -> tee();
3126                                        },
3127                                        None,
3128                                        Some(&next_stmt_id.to_string()),
3129                                    );
3130                                }
3131                                BuildersOrCallback::Callback(_, node_callback) => {
3132                                    node_callback(node, next_stmt_id);
3133                                }
3134                            }
3135
3136                            tee_ident
3137                        };
3138
3139                        // we consume a stmt id regardless of if we emit the tee() operator,
3140                        // so that during rewrites we touch all recipients of the tee()
3141
3142                        *next_stmt_id += 1;
3143                        ident_stack.push(ret_ident);
3144                    }
3145
3146                    HydroNode::Partition {
3147                        inner, f, is_true, ..
3148                    } => {
3149                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3150                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3151                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3152                            match builders_or_callback {
3153                                BuildersOrCallback::Builders(_) => {}
3154                                BuildersOrCallback::Callback(_, node_callback) => {
3155                                    node_callback(node, next_stmt_id);
3156                                }
3157                            }
3158
3159                            let idx = if is_true { 0 } else { 1 };
3160                            built_idents[idx].clone()
3161                        } else {
3162                            // The inner node was already processed by transform_bottom_up,
3163                            // so its ident is on the stack
3164                            let inner_ident = ident_stack.pop().unwrap();
3165
3166                            let partition_ident = syn::Ident::new(
3167                                &format!("stream_{}_partition", *next_stmt_id),
3168                                Span::call_site(),
3169                            );
3170                            let true_ident = syn::Ident::new(
3171                                &format!("stream_{}_true", *next_stmt_id),
3172                                Span::call_site(),
3173                            );
3174                            let false_ident = syn::Ident::new(
3175                                &format!("stream_{}_false", *next_stmt_id),
3176                                Span::call_site(),
3177                            );
3178
3179                            built_tees.insert(
3180                                ptr,
3181                                vec![true_ident.clone(), false_ident.clone()],
3182                            );
3183
3184                            match builders_or_callback {
3185                                BuildersOrCallback::Builders(graph_builders) => {
3186                                    let builder = graph_builders.get_dfir_mut(&out_location);
3187                                    builder.add_dfir(
3188                                        parse_quote! {
3189                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
3190                                            #true_ident = #partition_ident[0];
3191                                            #false_ident = #partition_ident[1];
3192                                        },
3193                                        None,
3194                                        Some(&next_stmt_id.to_string()),
3195                                    );
3196                                }
3197                                BuildersOrCallback::Callback(_, node_callback) => {
3198                                    node_callback(node, next_stmt_id);
3199                                }
3200                            }
3201
3202                            if is_true { true_ident } else { false_ident }
3203                        };
3204
3205                        *next_stmt_id += 1;
3206                        ident_stack.push(ret_ident);
3207                    }
3208
3209                    HydroNode::Chain { .. } => {
3210                        // Children are processed left-to-right, so second is on top
3211                        let second_ident = ident_stack.pop().unwrap();
3212                        let first_ident = ident_stack.pop().unwrap();
3213
3214                        let chain_ident =
3215                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3216
3217                        match builders_or_callback {
3218                            BuildersOrCallback::Builders(graph_builders) => {
3219                                let builder = graph_builders.get_dfir_mut(&out_location);
3220                                builder.add_dfir(
3221                                    parse_quote! {
3222                                        #chain_ident = chain();
3223                                        #first_ident -> [0]#chain_ident;
3224                                        #second_ident -> [1]#chain_ident;
3225                                    },
3226                                    None,
3227                                    Some(&next_stmt_id.to_string()),
3228                                );
3229                            }
3230                            BuildersOrCallback::Callback(_, node_callback) => {
3231                                node_callback(node, next_stmt_id);
3232                            }
3233                        }
3234
3235                        *next_stmt_id += 1;
3236
3237                        ident_stack.push(chain_ident);
3238                    }
3239
3240                    HydroNode::ChainFirst { .. } => {
3241                        let second_ident = ident_stack.pop().unwrap();
3242                        let first_ident = ident_stack.pop().unwrap();
3243
3244                        let chain_ident =
3245                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3246
3247                        match builders_or_callback {
3248                            BuildersOrCallback::Builders(graph_builders) => {
3249                                let builder = graph_builders.get_dfir_mut(&out_location);
3250                                builder.add_dfir(
3251                                    parse_quote! {
3252                                        #chain_ident = chain_first_n(1);
3253                                        #first_ident -> [0]#chain_ident;
3254                                        #second_ident -> [1]#chain_ident;
3255                                    },
3256                                    None,
3257                                    Some(&next_stmt_id.to_string()),
3258                                );
3259                            }
3260                            BuildersOrCallback::Callback(_, node_callback) => {
3261                                node_callback(node, next_stmt_id);
3262                            }
3263                        }
3264
3265                        *next_stmt_id += 1;
3266
3267                        ident_stack.push(chain_ident);
3268                    }
3269
3270                    HydroNode::CrossSingleton { right, .. } => {
3271                        let right_ident = ident_stack.pop().unwrap();
3272                        let left_ident = ident_stack.pop().unwrap();
3273
3274                        let cross_ident =
3275                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3276
3277                        match builders_or_callback {
3278                            BuildersOrCallback::Builders(graph_builders) => {
3279                                let builder = graph_builders.get_dfir_mut(&out_location);
3280
3281                                if right.metadata().location_id.is_top_level()
3282                                    && right.metadata().collection_kind.is_bounded()
3283                                {
3284                                    builder.add_dfir(
3285                                        parse_quote! {
3286                                            #cross_ident = cross_singleton();
3287                                            #left_ident -> [input]#cross_ident;
3288                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
3289                                        },
3290                                        None,
3291                                        Some(&next_stmt_id.to_string()),
3292                                    );
3293                                } else {
3294                                    builder.add_dfir(
3295                                        parse_quote! {
3296                                            #cross_ident = cross_singleton();
3297                                            #left_ident -> [input]#cross_ident;
3298                                            #right_ident -> [single]#cross_ident;
3299                                        },
3300                                        None,
3301                                        Some(&next_stmt_id.to_string()),
3302                                    );
3303                                }
3304                            }
3305                            BuildersOrCallback::Callback(_, node_callback) => {
3306                                node_callback(node, next_stmt_id);
3307                            }
3308                        }
3309
3310                        *next_stmt_id += 1;
3311
3312                        ident_stack.push(cross_ident);
3313                    }
3314
3315                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3316                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3317                            parse_quote!(cross_join_multiset)
3318                        } else {
3319                            parse_quote!(join_multiset)
3320                        };
3321
3322                        let (HydroNode::CrossProduct { left, right, .. }
3323                        | HydroNode::Join { left, right, .. }) = node
3324                        else {
3325                            unreachable!()
3326                        };
3327
3328                        let is_top_level = left.metadata().location_id.is_top_level()
3329                            && right.metadata().location_id.is_top_level();
3330                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3331                            quote!('static)
3332                        } else {
3333                            quote!('tick)
3334                        };
3335
3336                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3337                            quote!('static)
3338                        } else {
3339                            quote!('tick)
3340                        };
3341
3342                        let right_ident = ident_stack.pop().unwrap();
3343                        let left_ident = ident_stack.pop().unwrap();
3344
3345                        let stream_ident =
3346                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3347
3348                        match builders_or_callback {
3349                            BuildersOrCallback::Builders(graph_builders) => {
3350                                let builder = graph_builders.get_dfir_mut(&out_location);
3351                                builder.add_dfir(
3352                                    if is_top_level {
3353                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3354                                        // a multiset_delta() to negate the replay behavior
3355                                        parse_quote! {
3356                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3357                                            #left_ident -> [0]#stream_ident;
3358                                            #right_ident -> [1]#stream_ident;
3359                                        }
3360                                    } else {
3361                                        parse_quote! {
3362                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3363                                            #left_ident -> [0]#stream_ident;
3364                                            #right_ident -> [1]#stream_ident;
3365                                        }
3366                                    }
3367                                    ,
3368                                    None,
3369                                    Some(&next_stmt_id.to_string()),
3370                                );
3371                            }
3372                            BuildersOrCallback::Callback(_, node_callback) => {
3373                                node_callback(node, next_stmt_id);
3374                            }
3375                        }
3376
3377                        *next_stmt_id += 1;
3378
3379                        ident_stack.push(stream_ident);
3380                    }
3381
3382                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3383                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3384                            parse_quote!(difference)
3385                        } else {
3386                            parse_quote!(anti_join)
3387                        };
3388
3389                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3390                            node
3391                        else {
3392                            unreachable!()
3393                        };
3394
3395                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3396                            quote!('static)
3397                        } else {
3398                            quote!('tick)
3399                        };
3400
3401                        let neg_ident = ident_stack.pop().unwrap();
3402                        let pos_ident = ident_stack.pop().unwrap();
3403
3404                        let stream_ident =
3405                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3406
3407                        match builders_or_callback {
3408                            BuildersOrCallback::Builders(graph_builders) => {
3409                                let builder = graph_builders.get_dfir_mut(&out_location);
3410                                builder.add_dfir(
3411                                    parse_quote! {
3412                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3413                                        #pos_ident -> [pos]#stream_ident;
3414                                        #neg_ident -> [neg]#stream_ident;
3415                                    },
3416                                    None,
3417                                    Some(&next_stmt_id.to_string()),
3418                                );
3419                            }
3420                            BuildersOrCallback::Callback(_, node_callback) => {
3421                                node_callback(node, next_stmt_id);
3422                            }
3423                        }
3424
3425                        *next_stmt_id += 1;
3426
3427                        ident_stack.push(stream_ident);
3428                    }
3429
3430                    HydroNode::JoinHalf { .. } => {
3431                        let HydroNode::JoinHalf { right, .. } = node else {
3432                            unreachable!()
3433                        };
3434
3435                        assert!(
3436                            right.metadata().collection_kind.is_bounded(),
3437                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3438                            right.metadata().collection_kind
3439                        );
3440
3441                        let build_lifetime = if right.metadata().location_id.is_top_level() {
3442                            quote!('static)
3443                        } else {
3444                            quote!('tick)
3445                        };
3446
3447                        let build_ident = ident_stack.pop().unwrap();
3448                        let probe_ident = ident_stack.pop().unwrap();
3449
3450                        let stream_ident =
3451                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3452
3453                        match builders_or_callback {
3454                            BuildersOrCallback::Builders(graph_builders) => {
3455                                let builder = graph_builders.get_dfir_mut(&out_location);
3456                                builder.add_dfir(
3457                                    parse_quote! {
3458                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3459                                        #probe_ident -> [probe]#stream_ident;
3460                                        #build_ident -> [build]#stream_ident;
3461                                    },
3462                                    None,
3463                                    Some(&next_stmt_id.to_string()),
3464                                );
3465                            }
3466                            BuildersOrCallback::Callback(_, node_callback) => {
3467                                node_callback(node, next_stmt_id);
3468                            }
3469                        }
3470
3471                        *next_stmt_id += 1;
3472
3473                        ident_stack.push(stream_ident);
3474                    }
3475
3476                    HydroNode::ResolveFutures { .. } => {
3477                        let input_ident = ident_stack.pop().unwrap();
3478
3479                        let futures_ident =
3480                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3481
3482                        match builders_or_callback {
3483                            BuildersOrCallback::Builders(graph_builders) => {
3484                                let builder = graph_builders.get_dfir_mut(&out_location);
3485                                builder.add_dfir(
3486                                    parse_quote! {
3487                                        #futures_ident = #input_ident -> resolve_futures();
3488                                    },
3489                                    None,
3490                                    Some(&next_stmt_id.to_string()),
3491                                );
3492                            }
3493                            BuildersOrCallback::Callback(_, node_callback) => {
3494                                node_callback(node, next_stmt_id);
3495                            }
3496                        }
3497
3498                        *next_stmt_id += 1;
3499
3500                        ident_stack.push(futures_ident);
3501                    }
3502
3503                    HydroNode::ResolveFuturesBlocking { .. } => {
3504                        let input_ident = ident_stack.pop().unwrap();
3505
3506                        let futures_ident =
3507                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3508
3509                        match builders_or_callback {
3510                            BuildersOrCallback::Builders(graph_builders) => {
3511                                let builder = graph_builders.get_dfir_mut(&out_location);
3512                                builder.add_dfir(
3513                                    parse_quote! {
3514                                        #futures_ident = #input_ident -> resolve_futures_blocking();
3515                                    },
3516                                    None,
3517                                    Some(&next_stmt_id.to_string()),
3518                                );
3519                            }
3520                            BuildersOrCallback::Callback(_, node_callback) => {
3521                                node_callback(node, next_stmt_id);
3522                            }
3523                        }
3524
3525                        *next_stmt_id += 1;
3526
3527                        ident_stack.push(futures_ident);
3528                    }
3529
3530                    HydroNode::ResolveFuturesOrdered { .. } => {
3531                        let input_ident = ident_stack.pop().unwrap();
3532
3533                        let futures_ident =
3534                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3535
3536                        match builders_or_callback {
3537                            BuildersOrCallback::Builders(graph_builders) => {
3538                                let builder = graph_builders.get_dfir_mut(&out_location);
3539                                builder.add_dfir(
3540                                    parse_quote! {
3541                                        #futures_ident = #input_ident -> resolve_futures_ordered();
3542                                    },
3543                                    None,
3544                                    Some(&next_stmt_id.to_string()),
3545                                );
3546                            }
3547                            BuildersOrCallback::Callback(_, node_callback) => {
3548                                node_callback(node, next_stmt_id);
3549                            }
3550                        }
3551
3552                        *next_stmt_id += 1;
3553
3554                        ident_stack.push(futures_ident);
3555                    }
3556
3557                    HydroNode::Map { f, .. } => {
3558                        let input_ident = ident_stack.pop().unwrap();
3559
3560                        let map_ident =
3561                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3562
3563                        match builders_or_callback {
3564                            BuildersOrCallback::Builders(graph_builders) => {
3565                                let builder = graph_builders.get_dfir_mut(&out_location);
3566                                builder.add_dfir(
3567                                    parse_quote! {
3568                                        #map_ident = #input_ident -> map(#f);
3569                                    },
3570                                    None,
3571                                    Some(&next_stmt_id.to_string()),
3572                                );
3573                            }
3574                            BuildersOrCallback::Callback(_, node_callback) => {
3575                                node_callback(node, next_stmt_id);
3576                            }
3577                        }
3578
3579                        *next_stmt_id += 1;
3580
3581                        ident_stack.push(map_ident);
3582                    }
3583
3584                    HydroNode::FlatMap { f, .. } => {
3585                        let input_ident = ident_stack.pop().unwrap();
3586
3587                        let flat_map_ident =
3588                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3589
3590                        match builders_or_callback {
3591                            BuildersOrCallback::Builders(graph_builders) => {
3592                                let builder = graph_builders.get_dfir_mut(&out_location);
3593                                builder.add_dfir(
3594                                    parse_quote! {
3595                                        #flat_map_ident = #input_ident -> flat_map(#f);
3596                                    },
3597                                    None,
3598                                    Some(&next_stmt_id.to_string()),
3599                                );
3600                            }
3601                            BuildersOrCallback::Callback(_, node_callback) => {
3602                                node_callback(node, next_stmt_id);
3603                            }
3604                        }
3605
3606                        *next_stmt_id += 1;
3607
3608                        ident_stack.push(flat_map_ident);
3609                    }
3610
3611                    HydroNode::FlatMapStreamBlocking { f, .. } => {
3612                        let input_ident = ident_stack.pop().unwrap();
3613
3614                        let flat_map_stream_blocking_ident =
3615                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3616
3617                        match builders_or_callback {
3618                            BuildersOrCallback::Builders(graph_builders) => {
3619                                let builder = graph_builders.get_dfir_mut(&out_location);
3620                                builder.add_dfir(
3621                                    parse_quote! {
3622                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3623                                    },
3624                                    None,
3625                                    Some(&next_stmt_id.to_string()),
3626                                );
3627                            }
3628                            BuildersOrCallback::Callback(_, node_callback) => {
3629                                node_callback(node, next_stmt_id);
3630                            }
3631                        }
3632
3633                        *next_stmt_id += 1;
3634
3635                        ident_stack.push(flat_map_stream_blocking_ident);
3636                    }
3637
3638                    HydroNode::Filter { f, .. } => {
3639                        let input_ident = ident_stack.pop().unwrap();
3640
3641                        let filter_ident =
3642                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3643
3644                        match builders_or_callback {
3645                            BuildersOrCallback::Builders(graph_builders) => {
3646                                let builder = graph_builders.get_dfir_mut(&out_location);
3647                                builder.add_dfir(
3648                                    parse_quote! {
3649                                        #filter_ident = #input_ident -> filter(#f);
3650                                    },
3651                                    None,
3652                                    Some(&next_stmt_id.to_string()),
3653                                );
3654                            }
3655                            BuildersOrCallback::Callback(_, node_callback) => {
3656                                node_callback(node, next_stmt_id);
3657                            }
3658                        }
3659
3660                        *next_stmt_id += 1;
3661
3662                        ident_stack.push(filter_ident);
3663                    }
3664
3665                    HydroNode::FilterMap { f, .. } => {
3666                        let input_ident = ident_stack.pop().unwrap();
3667
3668                        let filter_map_ident =
3669                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3670
3671                        match builders_or_callback {
3672                            BuildersOrCallback::Builders(graph_builders) => {
3673                                let builder = graph_builders.get_dfir_mut(&out_location);
3674                                builder.add_dfir(
3675                                    parse_quote! {
3676                                        #filter_map_ident = #input_ident -> filter_map(#f);
3677                                    },
3678                                    None,
3679                                    Some(&next_stmt_id.to_string()),
3680                                );
3681                            }
3682                            BuildersOrCallback::Callback(_, node_callback) => {
3683                                node_callback(node, next_stmt_id);
3684                            }
3685                        }
3686
3687                        *next_stmt_id += 1;
3688
3689                        ident_stack.push(filter_map_ident);
3690                    }
3691
3692                    HydroNode::Sort { .. } => {
3693                        let input_ident = ident_stack.pop().unwrap();
3694
3695                        let sort_ident =
3696                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3697
3698                        match builders_or_callback {
3699                            BuildersOrCallback::Builders(graph_builders) => {
3700                                let builder = graph_builders.get_dfir_mut(&out_location);
3701                                builder.add_dfir(
3702                                    parse_quote! {
3703                                        #sort_ident = #input_ident -> sort();
3704                                    },
3705                                    None,
3706                                    Some(&next_stmt_id.to_string()),
3707                                );
3708                            }
3709                            BuildersOrCallback::Callback(_, node_callback) => {
3710                                node_callback(node, next_stmt_id);
3711                            }
3712                        }
3713
3714                        *next_stmt_id += 1;
3715
3716                        ident_stack.push(sort_ident);
3717                    }
3718
3719                    HydroNode::DeferTick { .. } => {
3720                        let input_ident = ident_stack.pop().unwrap();
3721
3722                        let defer_tick_ident =
3723                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3724
3725                        match builders_or_callback {
3726                            BuildersOrCallback::Builders(graph_builders) => {
3727                                let builder = graph_builders.get_dfir_mut(&out_location);
3728                                builder.add_dfir(
3729                                    parse_quote! {
3730                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
3731                                    },
3732                                    None,
3733                                    Some(&next_stmt_id.to_string()),
3734                                );
3735                            }
3736                            BuildersOrCallback::Callback(_, node_callback) => {
3737                                node_callback(node, next_stmt_id);
3738                            }
3739                        }
3740
3741                        *next_stmt_id += 1;
3742
3743                        ident_stack.push(defer_tick_ident);
3744                    }
3745
3746                    HydroNode::Enumerate { input, .. } => {
3747                        let input_ident = ident_stack.pop().unwrap();
3748
3749                        let enumerate_ident =
3750                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3751
3752                        match builders_or_callback {
3753                            BuildersOrCallback::Builders(graph_builders) => {
3754                                let builder = graph_builders.get_dfir_mut(&out_location);
3755                                let lifetime = if input.metadata().location_id.is_top_level() {
3756                                    quote!('static)
3757                                } else {
3758                                    quote!('tick)
3759                                };
3760                                builder.add_dfir(
3761                                    parse_quote! {
3762                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3763                                    },
3764                                    None,
3765                                    Some(&next_stmt_id.to_string()),
3766                                );
3767                            }
3768                            BuildersOrCallback::Callback(_, node_callback) => {
3769                                node_callback(node, next_stmt_id);
3770                            }
3771                        }
3772
3773                        *next_stmt_id += 1;
3774
3775                        ident_stack.push(enumerate_ident);
3776                    }
3777
3778                    HydroNode::Inspect { f, .. } => {
3779                        let input_ident = ident_stack.pop().unwrap();
3780
3781                        let inspect_ident =
3782                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3783
3784                        match builders_or_callback {
3785                            BuildersOrCallback::Builders(graph_builders) => {
3786                                let builder = graph_builders.get_dfir_mut(&out_location);
3787                                builder.add_dfir(
3788                                    parse_quote! {
3789                                        #inspect_ident = #input_ident -> inspect(#f);
3790                                    },
3791                                    None,
3792                                    Some(&next_stmt_id.to_string()),
3793                                );
3794                            }
3795                            BuildersOrCallback::Callback(_, node_callback) => {
3796                                node_callback(node, next_stmt_id);
3797                            }
3798                        }
3799
3800                        *next_stmt_id += 1;
3801
3802                        ident_stack.push(inspect_ident);
3803                    }
3804
3805                    HydroNode::Unique { input, .. } => {
3806                        let input_ident = ident_stack.pop().unwrap();
3807
3808                        let unique_ident =
3809                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3810
3811                        match builders_or_callback {
3812                            BuildersOrCallback::Builders(graph_builders) => {
3813                                let builder = graph_builders.get_dfir_mut(&out_location);
3814                                let lifetime = if input.metadata().location_id.is_top_level() {
3815                                    quote!('static)
3816                                } else {
3817                                    quote!('tick)
3818                                };
3819
3820                                builder.add_dfir(
3821                                    parse_quote! {
3822                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3823                                    },
3824                                    None,
3825                                    Some(&next_stmt_id.to_string()),
3826                                );
3827                            }
3828                            BuildersOrCallback::Callback(_, node_callback) => {
3829                                node_callback(node, next_stmt_id);
3830                            }
3831                        }
3832
3833                        *next_stmt_id += 1;
3834
3835                        ident_stack.push(unique_ident);
3836                    }
3837
3838                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
3839                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3840                            if input.metadata().location_id.is_top_level()
3841                                && input.metadata().collection_kind.is_bounded()
3842                            {
3843                                parse_quote!(fold_no_replay)
3844                            } else {
3845                                parse_quote!(fold)
3846                            }
3847                        } else if matches!(node, HydroNode::Scan { .. }) {
3848                            parse_quote!(scan)
3849                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
3850                            parse_quote!(scan_async_blocking)
3851                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3852                            if input.metadata().location_id.is_top_level()
3853                                && input.metadata().collection_kind.is_bounded()
3854                            {
3855                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3856                            } else {
3857                                parse_quote!(fold_keyed)
3858                            }
3859                        } else {
3860                            unreachable!()
3861                        };
3862
3863                        let (HydroNode::Fold { input, .. }
3864                        | HydroNode::FoldKeyed { input, .. }
3865                        | HydroNode::Scan { input, .. }
3866                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
3867                        else {
3868                            unreachable!()
3869                        };
3870
3871                        let lifetime = if input.metadata().location_id.is_top_level() {
3872                            quote!('static)
3873                        } else {
3874                            quote!('tick)
3875                        };
3876
3877                        let input_ident = ident_stack.pop().unwrap();
3878
3879                        let (HydroNode::Fold { init, acc, .. }
3880                        | HydroNode::FoldKeyed { init, acc, .. }
3881                        | HydroNode::Scan { init, acc, .. }
3882                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
3883                        else {
3884                            unreachable!()
3885                        };
3886
3887                        let fold_ident =
3888                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3889
3890                        match builders_or_callback {
3891                            BuildersOrCallback::Builders(graph_builders) => {
3892                                if matches!(node, HydroNode::Fold { .. })
3893                                    && node.metadata().location_id.is_top_level()
3894                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3895                                    && graph_builders.singleton_intermediates()
3896                                    && !node.metadata().collection_kind.is_bounded()
3897                                {
3898                                    let builder = graph_builders.get_dfir_mut(&out_location);
3899
3900                                    let acc: syn::Expr = parse_quote!({
3901                                        let mut __inner = #acc;
3902                                        move |__state, __value| {
3903                                            __inner(__state, __value);
3904                                            Some(__state.clone())
3905                                        }
3906                                    });
3907
3908                                    builder.add_dfir(
3909                                        parse_quote! {
3910                                            source_iter([(#init)()]) -> [0]#fold_ident;
3911                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3912                                            #fold_ident = chain();
3913                                        },
3914                                        None,
3915                                        Some(&next_stmt_id.to_string()),
3916                                    );
3917                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3918                                    && node.metadata().location_id.is_top_level()
3919                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3920                                    && graph_builders.singleton_intermediates()
3921                                    && !node.metadata().collection_kind.is_bounded()
3922                                {
3923                                    let builder = graph_builders.get_dfir_mut(&out_location);
3924
3925                                    let acc: syn::Expr = parse_quote!({
3926                                        let mut __init = #init;
3927                                        let mut __inner = #acc;
3928                                        move |__state, __kv: (_, _)| {
3929                                            // TODO(shadaj): we can avoid the clone when the entry exists
3930                                            let __state = __state
3931                                                .entry(::std::clone::Clone::clone(&__kv.0))
3932                                                .or_insert_with(|| (__init)());
3933                                            __inner(__state, __kv.1);
3934                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3935                                        }
3936                                    });
3937
3938                                    builder.add_dfir(
3939                                        parse_quote! {
3940                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3941                                        },
3942                                        None,
3943                                        Some(&next_stmt_id.to_string()),
3944                                    );
3945                                } else {
3946                                    let builder = graph_builders.get_dfir_mut(&out_location);
3947                                    builder.add_dfir(
3948                                        parse_quote! {
3949                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3950                                        },
3951                                        None,
3952                                        Some(&next_stmt_id.to_string()),
3953                                    );
3954                                }
3955                            }
3956                            BuildersOrCallback::Callback(_, node_callback) => {
3957                                node_callback(node, next_stmt_id);
3958                            }
3959                        }
3960
3961                        *next_stmt_id += 1;
3962
3963                        ident_stack.push(fold_ident);
3964                    }
3965
3966                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3967                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3968                            if input.metadata().location_id.is_top_level()
3969                                && input.metadata().collection_kind.is_bounded()
3970                            {
3971                                parse_quote!(reduce_no_replay)
3972                            } else {
3973                                parse_quote!(reduce)
3974                            }
3975                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3976                            if input.metadata().location_id.is_top_level()
3977                                && input.metadata().collection_kind.is_bounded()
3978                            {
3979                                todo!(
3980                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3981                                )
3982                            } else {
3983                                parse_quote!(reduce_keyed)
3984                            }
3985                        } else {
3986                            unreachable!()
3987                        };
3988
3989                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3990                        else {
3991                            unreachable!()
3992                        };
3993
3994                        let lifetime = if input.metadata().location_id.is_top_level() {
3995                            quote!('static)
3996                        } else {
3997                            quote!('tick)
3998                        };
3999
4000                        let input_ident = ident_stack.pop().unwrap();
4001
4002                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4003                        else {
4004                            unreachable!()
4005                        };
4006
4007                        let reduce_ident =
4008                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4009
4010                        match builders_or_callback {
4011                            BuildersOrCallback::Builders(graph_builders) => {
4012                                if matches!(node, HydroNode::Reduce { .. })
4013                                    && node.metadata().location_id.is_top_level()
4014                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4015                                    && graph_builders.singleton_intermediates()
4016                                    && !node.metadata().collection_kind.is_bounded()
4017                                {
4018                                    todo!(
4019                                        "Reduce with optional intermediates is not yet supported in simulator"
4020                                    );
4021                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4022                                    && node.metadata().location_id.is_top_level()
4023                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4024                                    && graph_builders.singleton_intermediates()
4025                                    && !node.metadata().collection_kind.is_bounded()
4026                                {
4027                                    todo!(
4028                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4029                                    );
4030                                } else {
4031                                    let builder = graph_builders.get_dfir_mut(&out_location);
4032                                    builder.add_dfir(
4033                                        parse_quote! {
4034                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
4035                                        },
4036                                        None,
4037                                        Some(&next_stmt_id.to_string()),
4038                                    );
4039                                }
4040                            }
4041                            BuildersOrCallback::Callback(_, node_callback) => {
4042                                node_callback(node, next_stmt_id);
4043                            }
4044                        }
4045
4046                        *next_stmt_id += 1;
4047
4048                        ident_stack.push(reduce_ident);
4049                    }
4050
4051                    HydroNode::ReduceKeyedWatermark {
4052                        f,
4053                        input,
4054                        metadata,
4055                        ..
4056                    } => {
4057                        let lifetime = if input.metadata().location_id.is_top_level() {
4058                            quote!('static)
4059                        } else {
4060                            quote!('tick)
4061                        };
4062
4063                        // watermark is processed second, so it's on top
4064                        let watermark_ident = ident_stack.pop().unwrap();
4065                        let input_ident = ident_stack.pop().unwrap();
4066
4067                        let chain_ident = syn::Ident::new(
4068                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4069                            Span::call_site(),
4070                        );
4071
4072                        let fold_ident =
4073                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4074
4075                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4076                            && input.metadata().collection_kind.is_bounded()
4077                        {
4078                            parse_quote!(fold_no_replay)
4079                        } else {
4080                            parse_quote!(fold)
4081                        };
4082
4083                        match builders_or_callback {
4084                            BuildersOrCallback::Builders(graph_builders) => {
4085                                if metadata.location_id.is_top_level()
4086                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4087                                    && graph_builders.singleton_intermediates()
4088                                    && !metadata.collection_kind.is_bounded()
4089                                {
4090                                    todo!(
4091                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4092                                    )
4093                                } else {
4094                                    let builder = graph_builders.get_dfir_mut(&out_location);
4095                                    builder.add_dfir(
4096                                        parse_quote! {
4097                                            #chain_ident = chain();
4098                                            #input_ident
4099                                                -> map(|x| (Some(x), None))
4100                                                -> [0]#chain_ident;
4101                                            #watermark_ident
4102                                                -> map(|watermark| (None, Some(watermark)))
4103                                                -> [1]#chain_ident;
4104
4105                                            #fold_ident = #chain_ident
4106                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4107                                                    let __reduce_keyed_fn = #f;
4108                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4109                                                        if let Some((k, v)) = opt_payload {
4110                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4111                                                                if k < curr_watermark {
4112                                                                    return;
4113                                                                }
4114                                                            }
4115                                                            match map.entry(k) {
4116                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4117                                                                    e.insert(v);
4118                                                                }
4119                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4120                                                                    __reduce_keyed_fn(e.get_mut(), v);
4121                                                                }
4122                                                            }
4123                                                        } else {
4124                                                            let watermark = opt_watermark.unwrap();
4125                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4126                                                                if watermark <= curr_watermark {
4127                                                                    return;
4128                                                                }
4129                                                            }
4130                                                            map.retain(|k, _| *k >= watermark);
4131                                                            *opt_curr_watermark = Some(watermark);
4132                                                        }
4133                                                    }
4134                                                })
4135                                                -> flat_map(|(map, _curr_watermark)| map);
4136                                        },
4137                                        None,
4138                                        Some(&next_stmt_id.to_string()),
4139                                    );
4140                                }
4141                            }
4142                            BuildersOrCallback::Callback(_, node_callback) => {
4143                                node_callback(node, next_stmt_id);
4144                            }
4145                        }
4146
4147                        *next_stmt_id += 1;
4148
4149                        ident_stack.push(fold_ident);
4150                    }
4151
4152                    HydroNode::Network {
4153                        networking_info,
4154                        serialize_fn: serialize_pipeline,
4155                        instantiate_fn,
4156                        deserialize_fn: deserialize_pipeline,
4157                        input,
4158                        ..
4159                    } => {
4160                        let input_ident = ident_stack.pop().unwrap();
4161
4162                        let receiver_stream_ident =
4163                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4164
4165                        match builders_or_callback {
4166                            BuildersOrCallback::Builders(graph_builders) => {
4167                                let (sink_expr, source_expr) = match instantiate_fn {
4168                                    DebugInstantiate::Building => (
4169                                        syn::parse_quote!(DUMMY_SINK),
4170                                        syn::parse_quote!(DUMMY_SOURCE),
4171                                    ),
4172
4173                                    DebugInstantiate::Finalized(finalized) => {
4174                                        (finalized.sink.clone(), finalized.source.clone())
4175                                    }
4176                                };
4177
4178                                graph_builders.create_network(
4179                                    &input.metadata().location_id,
4180                                    &out_location,
4181                                    input_ident,
4182                                    &receiver_stream_ident,
4183                                    serialize_pipeline.as_ref(),
4184                                    sink_expr,
4185                                    source_expr,
4186                                    deserialize_pipeline.as_ref(),
4187                                    *next_stmt_id,
4188                                    networking_info,
4189                                );
4190                            }
4191                            BuildersOrCallback::Callback(_, node_callback) => {
4192                                node_callback(node, next_stmt_id);
4193                            }
4194                        }
4195
4196                        *next_stmt_id += 1;
4197
4198                        ident_stack.push(receiver_stream_ident);
4199                    }
4200
4201                    HydroNode::ExternalInput {
4202                        instantiate_fn,
4203                        deserialize_fn: deserialize_pipeline,
4204                        ..
4205                    } => {
4206                        let receiver_stream_ident =
4207                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4208
4209                        match builders_or_callback {
4210                            BuildersOrCallback::Builders(graph_builders) => {
4211                                let (_, source_expr) = match instantiate_fn {
4212                                    DebugInstantiate::Building => (
4213                                        syn::parse_quote!(DUMMY_SINK),
4214                                        syn::parse_quote!(DUMMY_SOURCE),
4215                                    ),
4216
4217                                    DebugInstantiate::Finalized(finalized) => {
4218                                        (finalized.sink.clone(), finalized.source.clone())
4219                                    }
4220                                };
4221
4222                                graph_builders.create_external_source(
4223                                    &out_location,
4224                                    source_expr,
4225                                    &receiver_stream_ident,
4226                                    deserialize_pipeline.as_ref(),
4227                                    *next_stmt_id,
4228                                );
4229                            }
4230                            BuildersOrCallback::Callback(_, node_callback) => {
4231                                node_callback(node, next_stmt_id);
4232                            }
4233                        }
4234
4235                        *next_stmt_id += 1;
4236
4237                        ident_stack.push(receiver_stream_ident);
4238                    }
4239
4240                    HydroNode::Counter {
4241                        tag,
4242                        duration,
4243                        prefix,
4244                        ..
4245                    } => {
4246                        let input_ident = ident_stack.pop().unwrap();
4247
4248                        let counter_ident =
4249                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4250
4251                        match builders_or_callback {
4252                            BuildersOrCallback::Builders(graph_builders) => {
4253                                let arg = format!("{}({})", prefix, tag);
4254                                let builder = graph_builders.get_dfir_mut(&out_location);
4255                                builder.add_dfir(
4256                                    parse_quote! {
4257                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4258                                    },
4259                                    None,
4260                                    Some(&next_stmt_id.to_string()),
4261                                );
4262                            }
4263                            BuildersOrCallback::Callback(_, node_callback) => {
4264                                node_callback(node, next_stmt_id);
4265                            }
4266                        }
4267
4268                        *next_stmt_id += 1;
4269
4270                        ident_stack.push(counter_ident);
4271                    }
4272                }
4273            },
4274            seen_tees,
4275            false,
4276        );
4277
4278        ident_stack
4279            .pop()
4280            .expect("ident_stack should have exactly one element after traversal")
4281    }
4282
4283    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4284        match self {
4285            HydroNode::Placeholder => {
4286                panic!()
4287            }
4288            HydroNode::Cast { .. }
4289            | HydroNode::ObserveNonDet { .. }
4290            | HydroNode::AssertIsConsistent { .. } => {}
4291            HydroNode::Source { source, .. } => match source {
4292                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4293                HydroSource::ExternalNetwork()
4294                | HydroSource::Spin()
4295                | HydroSource::ClusterMembers(_, _)
4296                | HydroSource::Embedded(_)
4297                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4298            },
4299            HydroNode::SingletonSource { value, .. } => {
4300                transform(value);
4301            }
4302            HydroNode::CycleSource { .. }
4303            | HydroNode::Tee { .. }
4304            | HydroNode::YieldConcat { .. }
4305            | HydroNode::BeginAtomic { .. }
4306            | HydroNode::EndAtomic { .. }
4307            | HydroNode::Batch { .. }
4308            | HydroNode::Chain { .. }
4309            | HydroNode::ChainFirst { .. }
4310            | HydroNode::CrossProduct { .. }
4311            | HydroNode::CrossSingleton { .. }
4312            | HydroNode::ResolveFutures { .. }
4313            | HydroNode::ResolveFuturesBlocking { .. }
4314            | HydroNode::ResolveFuturesOrdered { .. }
4315            | HydroNode::Join { .. }
4316            | HydroNode::JoinHalf { .. }
4317            | HydroNode::Difference { .. }
4318            | HydroNode::AntiJoin { .. }
4319            | HydroNode::DeferTick { .. }
4320            | HydroNode::Enumerate { .. }
4321            | HydroNode::Unique { .. }
4322            | HydroNode::Sort { .. } => {}
4323            HydroNode::Map { f, .. }
4324            | HydroNode::FlatMap { f, .. }
4325            | HydroNode::FlatMapStreamBlocking { f, .. }
4326            | HydroNode::Filter { f, .. }
4327            | HydroNode::FilterMap { f, .. }
4328            | HydroNode::Inspect { f, .. }
4329            | HydroNode::Partition { f, .. }
4330            | HydroNode::Reduce { f, .. }
4331            | HydroNode::ReduceKeyed { f, .. }
4332            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4333                transform(f);
4334            }
4335            HydroNode::Fold { init, acc, .. }
4336            | HydroNode::Scan { init, acc, .. }
4337            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4338            | HydroNode::FoldKeyed { init, acc, .. } => {
4339                transform(init);
4340                transform(acc);
4341            }
4342            HydroNode::Network {
4343                serialize_fn,
4344                deserialize_fn,
4345                ..
4346            } => {
4347                if let Some(serialize_fn) = serialize_fn {
4348                    transform(serialize_fn);
4349                }
4350                if let Some(deserialize_fn) = deserialize_fn {
4351                    transform(deserialize_fn);
4352                }
4353            }
4354            HydroNode::ExternalInput { deserialize_fn, .. } => {
4355                if let Some(deserialize_fn) = deserialize_fn {
4356                    transform(deserialize_fn);
4357                }
4358            }
4359            HydroNode::Counter { duration, .. } => {
4360                transform(duration);
4361            }
4362        }
4363    }
4364
4365    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4366        &self.metadata().op
4367    }
4368
4369    pub fn metadata(&self) -> &HydroIrMetadata {
4370        match self {
4371            HydroNode::Placeholder => {
4372                panic!()
4373            }
4374            HydroNode::Cast { metadata, .. } => metadata,
4375            HydroNode::ObserveNonDet { metadata, .. } => metadata,
4376            HydroNode::AssertIsConsistent { metadata, .. } => metadata,
4377            HydroNode::Source { metadata, .. } => metadata,
4378            HydroNode::SingletonSource { metadata, .. } => metadata,
4379            HydroNode::CycleSource { metadata, .. } => metadata,
4380            HydroNode::Tee { metadata, .. } => metadata,
4381            HydroNode::Partition { metadata, .. } => metadata,
4382            HydroNode::YieldConcat { metadata, .. } => metadata,
4383            HydroNode::BeginAtomic { metadata, .. } => metadata,
4384            HydroNode::EndAtomic { metadata, .. } => metadata,
4385            HydroNode::Batch { metadata, .. } => metadata,
4386            HydroNode::Chain { metadata, .. } => metadata,
4387            HydroNode::ChainFirst { metadata, .. } => metadata,
4388            HydroNode::CrossProduct { metadata, .. } => metadata,
4389            HydroNode::CrossSingleton { metadata, .. } => metadata,
4390            HydroNode::Join { metadata, .. } => metadata,
4391            HydroNode::JoinHalf { metadata, .. } => metadata,
4392            HydroNode::Difference { metadata, .. } => metadata,
4393            HydroNode::AntiJoin { metadata, .. } => metadata,
4394            HydroNode::ResolveFutures { metadata, .. } => metadata,
4395            HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4396            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4397            HydroNode::Map { metadata, .. } => metadata,
4398            HydroNode::FlatMap { metadata, .. } => metadata,
4399            HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4400            HydroNode::Filter { metadata, .. } => metadata,
4401            HydroNode::FilterMap { metadata, .. } => metadata,
4402            HydroNode::DeferTick { metadata, .. } => metadata,
4403            HydroNode::Enumerate { metadata, .. } => metadata,
4404            HydroNode::Inspect { metadata, .. } => metadata,
4405            HydroNode::Unique { metadata, .. } => metadata,
4406            HydroNode::Sort { metadata, .. } => metadata,
4407            HydroNode::Scan { metadata, .. } => metadata,
4408            HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4409            HydroNode::Fold { metadata, .. } => metadata,
4410            HydroNode::FoldKeyed { metadata, .. } => metadata,
4411            HydroNode::Reduce { metadata, .. } => metadata,
4412            HydroNode::ReduceKeyed { metadata, .. } => metadata,
4413            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4414            HydroNode::ExternalInput { metadata, .. } => metadata,
4415            HydroNode::Network { metadata, .. } => metadata,
4416            HydroNode::Counter { metadata, .. } => metadata,
4417        }
4418    }
4419
4420    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4421        &mut self.metadata_mut().op
4422    }
4423
4424    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4425        match self {
4426            HydroNode::Placeholder => {
4427                panic!()
4428            }
4429            HydroNode::Cast { metadata, .. } => metadata,
4430            HydroNode::ObserveNonDet { metadata, .. } => metadata,
4431            HydroNode::AssertIsConsistent { metadata, .. } => metadata,
4432            HydroNode::Source { metadata, .. } => metadata,
4433            HydroNode::SingletonSource { metadata, .. } => metadata,
4434            HydroNode::CycleSource { metadata, .. } => metadata,
4435            HydroNode::Tee { metadata, .. } => metadata,
4436            HydroNode::Partition { metadata, .. } => metadata,
4437            HydroNode::YieldConcat { metadata, .. } => metadata,
4438            HydroNode::BeginAtomic { metadata, .. } => metadata,
4439            HydroNode::EndAtomic { metadata, .. } => metadata,
4440            HydroNode::Batch { metadata, .. } => metadata,
4441            HydroNode::Chain { metadata, .. } => metadata,
4442            HydroNode::ChainFirst { metadata, .. } => metadata,
4443            HydroNode::CrossProduct { metadata, .. } => metadata,
4444            HydroNode::CrossSingleton { metadata, .. } => metadata,
4445            HydroNode::Join { metadata, .. } => metadata,
4446            HydroNode::JoinHalf { metadata, .. } => metadata,
4447            HydroNode::Difference { metadata, .. } => metadata,
4448            HydroNode::AntiJoin { metadata, .. } => metadata,
4449            HydroNode::ResolveFutures { metadata, .. } => metadata,
4450            HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4451            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4452            HydroNode::Map { metadata, .. } => metadata,
4453            HydroNode::FlatMap { metadata, .. } => metadata,
4454            HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4455            HydroNode::Filter { metadata, .. } => metadata,
4456            HydroNode::FilterMap { metadata, .. } => metadata,
4457            HydroNode::DeferTick { metadata, .. } => metadata,
4458            HydroNode::Enumerate { metadata, .. } => metadata,
4459            HydroNode::Inspect { metadata, .. } => metadata,
4460            HydroNode::Unique { metadata, .. } => metadata,
4461            HydroNode::Sort { metadata, .. } => metadata,
4462            HydroNode::Scan { metadata, .. } => metadata,
4463            HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4464            HydroNode::Fold { metadata, .. } => metadata,
4465            HydroNode::FoldKeyed { metadata, .. } => metadata,
4466            HydroNode::Reduce { metadata, .. } => metadata,
4467            HydroNode::ReduceKeyed { metadata, .. } => metadata,
4468            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4469            HydroNode::ExternalInput { metadata, .. } => metadata,
4470            HydroNode::Network { metadata, .. } => metadata,
4471            HydroNode::Counter { metadata, .. } => metadata,
4472        }
4473    }
4474
4475    pub fn input(&self) -> Vec<&HydroNode> {
4476        match self {
4477            HydroNode::Placeholder => {
4478                panic!()
4479            }
4480            HydroNode::Source { .. }
4481            | HydroNode::SingletonSource { .. }
4482            | HydroNode::ExternalInput { .. }
4483            | HydroNode::CycleSource { .. }
4484            | HydroNode::Tee { .. }
4485            | HydroNode::Partition { .. } => {
4486                // Tee/Partition should find their input in separate special ways
4487                vec![]
4488            }
4489            HydroNode::Cast { inner, .. }
4490            | HydroNode::ObserveNonDet { inner, .. }
4491            | HydroNode::YieldConcat { inner, .. }
4492            | HydroNode::BeginAtomic { inner, .. }
4493            | HydroNode::EndAtomic { inner, .. }
4494            | HydroNode::Batch { inner, .. }
4495            | HydroNode::AssertIsConsistent { inner, .. } => {
4496                vec![inner]
4497            }
4498            HydroNode::Chain { first, second, .. } => {
4499                vec![first, second]
4500            }
4501            HydroNode::ChainFirst { first, second, .. } => {
4502                vec![first, second]
4503            }
4504            HydroNode::CrossProduct { left, right, .. }
4505            | HydroNode::CrossSingleton { left, right, .. }
4506            | HydroNode::Join { left, right, .. }
4507            | HydroNode::JoinHalf { left, right, .. } => {
4508                vec![left, right]
4509            }
4510            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4511                vec![pos, neg]
4512            }
4513            HydroNode::Map { input, .. }
4514            | HydroNode::FlatMap { input, .. }
4515            | HydroNode::FlatMapStreamBlocking { input, .. }
4516            | HydroNode::Filter { input, .. }
4517            | HydroNode::FilterMap { input, .. }
4518            | HydroNode::Sort { input, .. }
4519            | HydroNode::DeferTick { input, .. }
4520            | HydroNode::Enumerate { input, .. }
4521            | HydroNode::Inspect { input, .. }
4522            | HydroNode::Unique { input, .. }
4523            | HydroNode::Network { input, .. }
4524            | HydroNode::Counter { input, .. }
4525            | HydroNode::ResolveFutures { input, .. }
4526            | HydroNode::ResolveFuturesBlocking { input, .. }
4527            | HydroNode::ResolveFuturesOrdered { input, .. }
4528            | HydroNode::Fold { input, .. }
4529            | HydroNode::FoldKeyed { input, .. }
4530            | HydroNode::Reduce { input, .. }
4531            | HydroNode::ReduceKeyed { input, .. }
4532            | HydroNode::Scan { input, .. }
4533            | HydroNode::ScanAsyncBlocking { input, .. } => {
4534                vec![input]
4535            }
4536            HydroNode::ReduceKeyedWatermark {
4537                input, watermark, ..
4538            } => {
4539                vec![input, watermark]
4540            }
4541        }
4542    }
4543
4544    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4545        self.input()
4546            .iter()
4547            .map(|input_node| input_node.metadata())
4548            .collect()
4549    }
4550
4551    /// Returns `true` if this node is a Tee or Partition whose inner Rc
4552    /// has other live references, meaning the upstream is already driven
4553    /// by another consumer and does not need a Null sink.
4554    pub fn is_shared_with_others(&self) -> bool {
4555        match self {
4556            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4557                Rc::strong_count(&inner.0) > 1
4558            }
4559            _ => false,
4560        }
4561    }
4562
4563    pub fn print_root(&self) -> String {
4564        match self {
4565            HydroNode::Placeholder => {
4566                panic!()
4567            }
4568            HydroNode::Cast { .. } => "Cast()".to_owned(),
4569            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4570            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
4571            HydroNode::Source { source, .. } => format!("Source({:?})", source),
4572            HydroNode::SingletonSource {
4573                value,
4574                first_tick_only,
4575                ..
4576            } => format!(
4577                "SingletonSource({:?}, first_tick_only={})",
4578                value, first_tick_only
4579            ),
4580            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4581            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4582            HydroNode::Partition { f, is_true, .. } => {
4583                format!("Partition({:?}, is_true={})", f, is_true)
4584            }
4585            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4586            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4587            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4588            HydroNode::Batch { .. } => "Batch()".to_owned(),
4589            HydroNode::Chain { first, second, .. } => {
4590                format!("Chain({}, {})", first.print_root(), second.print_root())
4591            }
4592            HydroNode::ChainFirst { first, second, .. } => {
4593                format!(
4594                    "ChainFirst({}, {})",
4595                    first.print_root(),
4596                    second.print_root()
4597                )
4598            }
4599            HydroNode::CrossProduct { left, right, .. } => {
4600                format!(
4601                    "CrossProduct({}, {})",
4602                    left.print_root(),
4603                    right.print_root()
4604                )
4605            }
4606            HydroNode::CrossSingleton { left, right, .. } => {
4607                format!(
4608                    "CrossSingleton({}, {})",
4609                    left.print_root(),
4610                    right.print_root()
4611                )
4612            }
4613            HydroNode::Join { left, right, .. } => {
4614                format!("Join({}, {})", left.print_root(), right.print_root())
4615            }
4616            HydroNode::JoinHalf { left, right, .. } => {
4617                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
4618            }
4619            HydroNode::Difference { pos, neg, .. } => {
4620                format!("Difference({}, {})", pos.print_root(), neg.print_root())
4621            }
4622            HydroNode::AntiJoin { pos, neg, .. } => {
4623                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4624            }
4625            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4626            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4627            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4628            HydroNode::Map { f, .. } => format!("Map({:?})", f),
4629            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4630            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4631            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4632            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4633            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4634            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4635            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4636            HydroNode::Unique { .. } => "Unique()".to_owned(),
4637            HydroNode::Sort { .. } => "Sort()".to_owned(),
4638            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4639            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4640            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
4641                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
4642            }
4643            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4644            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4645            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4646            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4647            HydroNode::Network { .. } => "Network()".to_owned(),
4648            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4649            HydroNode::Counter { tag, duration, .. } => {
4650                format!("Counter({:?}, {:?})", tag, duration)
4651            }
4652        }
4653    }
4654}
4655
/// Wires up one network channel between two top-level locations.
///
/// The sender and receiver are looked up by key in `processes` or `clusters`
/// depending on the kind of `from_location` and `to_location`. A fresh port is
/// requested from each via `next_port`, sender first. The call then goes to
/// the matching `o2o` / `o2m` / `m2o` / `m2m` pair of `D::*_sink_source` and
/// `D::*_connect` functions.
///
/// Returns `(sink, source, connect_fn)`: the two expressions produced by
/// `D::*_sink_source`, followed by the value produced by `D::*_connect`.
///
/// # Panics
/// * If a referenced process or cluster key is missing from the corresponding map.
/// * If either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetch a clone of the instantiated process for `key`, or abort with a diagnostic.
    let process_at = |key: LocationKey| match processes.get(key) {
        Some(node) => node.clone(),
        None => panic!("A process used in the graph was not instantiated: {}", key),
    };
    // Same as above, for clusters.
    let cluster_at = |key: LocationKey| match clusters.get(key) {
        Some(node) => node.clone(),
        None => panic!("A cluster used in the graph was not instantiated: {}", key),
    };

    let (endpoints, connect_fn) = match (from_location, to_location) {
        // process -> process
        (&LocationId::Process(src), &LocationId::Process(dst)) => {
            let sender = process_at(src);
            let receiver = process_at(dst);
            let sender_port = sender.next_port();
            let receiver_port = receiver.next_port();

            let endpoints = D::o2o_sink_source(
                env,
                &sender,
                &sender_port,
                &receiver,
                &receiver_port,
                name,
                networking_info,
            );
            let connect = D::o2o_connect(&sender, &sender_port, &receiver, &receiver_port);
            (endpoints, connect)
        }
        // process -> cluster
        (&LocationId::Process(src), &LocationId::Cluster(dst, _)) => {
            let sender = process_at(src);
            let receiver = cluster_at(dst);
            let sender_port = sender.next_port();
            let receiver_port = receiver.next_port();

            let endpoints = D::o2m_sink_source(
                env,
                &sender,
                &sender_port,
                &receiver,
                &receiver_port,
                name,
                networking_info,
            );
            let connect = D::o2m_connect(&sender, &sender_port, &receiver, &receiver_port);
            (endpoints, connect)
        }
        // cluster -> process
        (&LocationId::Cluster(src, _), &LocationId::Process(dst)) => {
            let sender = cluster_at(src);
            let receiver = process_at(dst);
            let sender_port = sender.next_port();
            let receiver_port = receiver.next_port();

            let endpoints = D::m2o_sink_source(
                env,
                &sender,
                &sender_port,
                &receiver,
                &receiver_port,
                name,
                networking_info,
            );
            let connect = D::m2o_connect(&sender, &sender_port, &receiver, &receiver_port);
            (endpoints, connect)
        }
        // cluster -> cluster
        (&LocationId::Cluster(src, _), &LocationId::Cluster(dst, _)) => {
            let sender = cluster_at(src);
            let receiver = cluster_at(dst);
            let sender_port = sender.next_port();
            let receiver_port = receiver.next_port();

            let endpoints = D::m2m_sink_source(
                env,
                &sender,
                &sender_port,
                &receiver,
                &receiver_port,
                name,
                networking_info,
            );
            let connect = D::m2m_connect(&sender, &sender_port, &receiver, &receiver_port);
            (endpoints, connect)
        }
        // Tick and Atomic locations are rejected on either side of the channel.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(),
    };

    let (sink, source) = endpoints;
    (sink, source, connect_fn)
}
4797
4798#[cfg(test)]
4799mod serde_test;
4800
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the byte size of `HydroNode` so that accidental growth is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_bytes = size_of::<HydroNode>();
        assert_eq!(node_bytes, 248);
    }

    /// Pins the byte size of `HydroRoot` so that accidental growth is noticed.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_bytes = size_of::<HydroRoot>();
        assert_eq!(root_bytes, 136);
    }

    /// An expression that contains no `q!` call must come back unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain = syn::parse_str::<syn::Expr>("x + y").unwrap();
        let simplified = simplify_q_macro(plain.clone());
        assert_eq!(simplified, plain);
    }

    /// Snapshot of what `simplify_q_macro` produces for a spliced `q!` closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Splice a small closure the way a real stageleft call site would.
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        // The visitor should detect the stageleft::runtime_support::fn_* pattern
        // and reduce the expression to q!(...).
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot for a `q!` body that is a block ending in a closure.
    #[test]
    fn test_closure_no_pipe_at_start() {
        // The quoted code opens with `{`, not with the closure's `|`.
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}