Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28#[cfg(feature = "build")]
29use crate::compile::builder::StmtId;
30use crate::compile::builder::{CycleId, ExternalPortId};
31#[cfg(feature = "build")]
32use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39/// A closure expression bundled with any singleton references it captures.
40///
41/// When a `q!()` closure captures a `SingletonRef`, the reference is recorded here
42/// alongside the closure's expression. This allows per-closure tracking of singleton
43/// captures, which is important for nodes with multiple closures (e.g. Fold has `init` and `acc`).
44pub struct ClosureExpr {
45    pub expr: DebugExpr,
46    pub singleton_refs: Vec<(syn::Ident, HydroNode)>,
47}
48
49impl Clone for ClosureExpr {
50    fn clone(&self) -> Self {
51        Self {
52            expr: self.expr.clone(),
53            singleton_refs: self
54                .singleton_refs
55                .iter()
56                .map(|(ident, node)| {
57                    let cloned_node = match node {
58                        HydroNode::Singleton { inner, metadata } => HydroNode::Singleton {
59                            inner: SharedNode(inner.0.clone()),
60                            metadata: metadata.clone(),
61                        },
62                        _ => panic!("singleton_refs should only contain HydroNode::Singleton"),
63                    };
64                    (ident.clone(), cloned_node)
65                })
66                .collect(),
67        }
68    }
69}
70
71impl Hash for ClosureExpr {
72    fn hash<H: Hasher>(&self, state: &mut H) {
73        self.expr.hash(state);
74        // singleton_refs are structural children (like HydroIrMetadata), not
75        // identity-defining. Two closures with the same expr but different
76        // captured refs are the same closure text — the refs only affect codegen.
77    }
78}
79
80impl serde::Serialize for ClosureExpr {
81    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
82        use serde::ser::SerializeStruct;
83        let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
84        s.serialize_field("expr", &self.expr)?;
85        s.serialize_field(
86            "singleton_refs",
87            &SerializableSingletonRefs(&self.singleton_refs),
88        )?;
89        s.end()
90    }
91}
92
93struct SerializableSingletonRefs<'a>(&'a [(syn::Ident, HydroNode)]);
94
95impl serde::Serialize for SerializableSingletonRefs<'_> {
96    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
97        use serde::ser::SerializeSeq;
98        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
99        for (ident, node) in self.0 {
100            seq.serialize_element(&(ident.to_string(), node))?;
101        }
102        seq.end()
103    }
104}
105
106impl Debug for ClosureExpr {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        Debug::fmt(&self.expr, f)
109    }
110}
111
112impl Display for ClosureExpr {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        Display::fmt(&self.expr, f)
115    }
116}
117
118impl From<syn::Expr> for ClosureExpr {
119    fn from(expr: syn::Expr) -> Self {
120        Self {
121            expr: DebugExpr(Box::new(expr)),
122            singleton_refs: Vec::new(),
123        }
124    }
125}
126
127impl From<DebugExpr> for ClosureExpr {
128    fn from(expr: DebugExpr) -> Self {
129        Self {
130            expr,
131            singleton_refs: Vec::new(),
132        }
133    }
134}
135
136impl ClosureExpr {
137    pub fn new(expr: DebugExpr, singleton_refs: Vec<(syn::Ident, HydroNode)>) -> Self {
138        Self {
139            expr,
140            singleton_refs,
141        }
142    }
143
144    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
145        Self {
146            expr: self.expr.clone(),
147            singleton_refs: self
148                .singleton_refs
149                .iter()
150                .map(|(ident, node)| (ident.clone(), node.deep_clone(seen_tees)))
151                .collect(),
152        }
153    }
154
155    pub fn transform_children(
156        &mut self,
157        transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
158        seen_tees: &mut SeenSharedNodes,
159    ) {
160        for (_ident, ref_node) in self.singleton_refs.iter_mut() {
161            transform(ref_node, seen_tees);
162        }
163    }
164
165    /// Pop singleton ref idents from the stack and rewrite the closure's token stream,
166    /// replacing local singleton ref idents with `#dfir_ident` references.
167    #[cfg(feature = "build")]
168    pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
169        if self.singleton_refs.is_empty() {
170            self.expr.0.to_token_stream()
171        } else {
172            let ref_idents = (0..self.singleton_refs.len())
173                .map(|_| ident_stack.pop().unwrap())
174                .collect::<Vec<_>>()
175                .into_iter()
176                .rev()
177                .collect::<Vec<_>>();
178            let local_idents = self
179                .singleton_refs
180                .iter()
181                .map(|(local_ident, _)| local_ident);
182            let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
183            let expr = &self.expr.0;
184            quote! {
185                {
186                    #(
187                        let #local_idents = #hash #ref_idents;
188                    )*
189                    #expr
190                }
191            }
192        }
193    }
194}
195
196/// Wrapper that displays only the tokens of a parsed expr.
197///
198/// Boxes `syn::Type` which is ~240 bytes.
199#[derive(Clone, Hash)]
200pub struct DebugExpr(pub Box<syn::Expr>);
201
202impl serde::Serialize for DebugExpr {
203    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
204        serializer.serialize_str(&self.to_string())
205    }
206}
207
208impl From<syn::Expr> for DebugExpr {
209    fn from(expr: syn::Expr) -> Self {
210        Self(Box::new(expr))
211    }
212}
213
214impl Deref for DebugExpr {
215    type Target = syn::Expr;
216
217    fn deref(&self) -> &Self::Target {
218        &self.0
219    }
220}
221
222impl ToTokens for DebugExpr {
223    fn to_tokens(&self, tokens: &mut TokenStream) {
224        self.0.to_tokens(tokens);
225    }
226}
227
228impl Debug for DebugExpr {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        write!(f, "{}", self.0.to_token_stream())
231    }
232}
233
234impl Display for DebugExpr {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        let original = self.0.as_ref().clone();
237        let simplified = simplify_q_macro(original);
238
239        // For now, just use quote formatting without trying to parse as a statement
240        // This avoids the syn::parse_quote! issues entirely
241        write!(f, "q!({})", quote::quote!(#simplified))
242    }
243}
244
245/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
246fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
247    // Try to parse the token string as a syn::Expr
248    // Use a visitor to simplify q! macro expansions
249    let mut simplifier = QMacroSimplifier::new();
250    simplifier.visit_expr_mut(&mut expr);
251
252    // If we found and simplified a q! macro, return the simplified version
253    if let Some(simplified) = simplifier.simplified_result {
254        simplified
255    } else {
256        expr
257    }
258}
259
260/// AST visitor that simplifies q! macro expansions
261#[derive(Default)]
262pub struct QMacroSimplifier {
263    pub simplified_result: Option<syn::Expr>,
264}
265
266impl QMacroSimplifier {
267    pub fn new() -> Self {
268        Self::default()
269    }
270}
271
272impl VisitMut for QMacroSimplifier {
273    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
274        // Check if we already found a result to avoid further processing
275        if self.simplified_result.is_some() {
276            return;
277        }
278
279        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
280            // Look for calls to stageleft::runtime_support::fn*
281            && self.is_stageleft_runtime_support_call(&path_expr.path)
282            // Try to extract the closure from the arguments
283            && let Some(closure) = self.extract_closure_from_args(&call.args)
284        {
285            self.simplified_result = Some(closure);
286            return;
287        }
288
289        // Continue visiting child expressions using the default implementation
290        // Use the default visitor to avoid infinite recursion
291        syn::visit_mut::visit_expr_mut(self, expr);
292    }
293}
294
295impl QMacroSimplifier {
296    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
297        // Check if this is a call to stageleft::runtime_support::fn*
298        if let Some(last_segment) = path.segments.last() {
299            let fn_name = last_segment.ident.to_string();
300            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
301            fn_name.contains("_type_hint")
302                && path.segments.len() > 2
303                && path.segments[0].ident == "stageleft"
304                && path.segments[1].ident == "runtime_support"
305        } else {
306            false
307        }
308    }
309
310    fn extract_closure_from_args(
311        &self,
312        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
313    ) -> Option<syn::Expr> {
314        // Look through the arguments for a closure expression
315        for arg in args {
316            if let syn::Expr::Closure(_) = arg {
317                return Some(arg.clone());
318            }
319            // Also check for closures nested in other expressions (like blocks)
320            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
321                return Some(closure_expr);
322            }
323        }
324        None
325    }
326
327    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
328        let mut visitor = ClosureFinder {
329            found_closure: None,
330            prefer_inner_blocks: true,
331        };
332        visitor.visit_expr(expr);
333        visitor.found_closure
334    }
335}
336
337/// Visitor that finds closures in expressions with special block handling
338struct ClosureFinder {
339    found_closure: Option<syn::Expr>,
340    prefer_inner_blocks: bool,
341}
342
343impl<'ast> Visit<'ast> for ClosureFinder {
344    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
345        // If we already found a closure, don't continue searching
346        if self.found_closure.is_some() {
347            return;
348        }
349
350        match expr {
351            syn::Expr::Closure(_) => {
352                self.found_closure = Some(expr.clone());
353            }
354            syn::Expr::Block(block) if self.prefer_inner_blocks => {
355                // Special handling for blocks - look for inner blocks that contain closures
356                for stmt in &block.block.stmts {
357                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
358                        && let syn::Expr::Block(_) = stmt_expr
359                    {
360                        // Check if this nested block contains a closure
361                        let mut inner_visitor = ClosureFinder {
362                            found_closure: None,
363                            prefer_inner_blocks: false, // Avoid infinite recursion
364                        };
365                        inner_visitor.visit_expr(stmt_expr);
366                        if inner_visitor.found_closure.is_some() {
367                            // Found a closure in an inner block, return that block
368                            self.found_closure = Some(stmt_expr.clone());
369                            return;
370                        }
371                    }
372                }
373
374                // If no inner block with closure found, continue with normal visitation
375                visit::visit_expr(self, expr);
376
377                // If we found a closure, just return the closure itself, not the whole block
378                // unless we're in the special case where we want the containing block
379                if self.found_closure.is_some() {
380                    // The closure was found during visitation, no need to wrap in block
381                }
382            }
383            _ => {
384                // Use default visitor behavior for all other expressions
385                visit::visit_expr(self, expr);
386            }
387        }
388    }
389}
390
391/// Debug displays the type's tokens.
392///
393/// Boxes `syn::Type` which is ~320 bytes.
394#[derive(Clone, PartialEq, Eq, Hash)]
395pub struct DebugType(pub Box<syn::Type>);
396
397impl From<syn::Type> for DebugType {
398    fn from(t: syn::Type) -> Self {
399        Self(Box::new(t))
400    }
401}
402
403impl Deref for DebugType {
404    type Target = syn::Type;
405
406    fn deref(&self) -> &Self::Target {
407        &self.0
408    }
409}
410
411impl ToTokens for DebugType {
412    fn to_tokens(&self, tokens: &mut TokenStream) {
413        self.0.to_tokens(tokens);
414    }
415}
416
417impl Debug for DebugType {
418    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
419        write!(f, "{}", self.0.to_token_stream())
420    }
421}
422
423impl serde::Serialize for DebugType {
424    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
425        serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
426    }
427}
428
429fn serialize_backtrace_as_span<S: serde::Serializer>(
430    backtrace: &Backtrace,
431    serializer: S,
432) -> Result<S::Ok, S::Error> {
433    match backtrace.format_span() {
434        Some(span) => serializer.serialize_some(&span),
435        None => serializer.serialize_none(),
436    }
437}
438
439fn serialize_ident<S: serde::Serializer>(
440    ident: &syn::Ident,
441    serializer: S,
442) -> Result<S::Ok, S::Error> {
443    serializer.serialize_str(&ident.to_string())
444}
445
446pub enum DebugInstantiate {
447    Building,
448    Finalized(Box<DebugInstantiateFinalized>),
449}
450
451impl serde::Serialize for DebugInstantiate {
452    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
453        match self {
454            DebugInstantiate::Building => {
455                serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
456            }
457            DebugInstantiate::Finalized(_) => {
458                panic!(
459                    "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
460                )
461            }
462        }
463    }
464}
465
466#[cfg_attr(
467    not(feature = "build"),
468    expect(
469        dead_code,
470        reason = "sink, source unused without `feature = \"build\"`."
471    )
472)]
473pub struct DebugInstantiateFinalized {
474    sink: syn::Expr,
475    source: syn::Expr,
476    connect_fn: Option<Box<dyn FnOnce()>>,
477}
478
479impl From<DebugInstantiateFinalized> for DebugInstantiate {
480    fn from(f: DebugInstantiateFinalized) -> Self {
481        Self::Finalized(Box::new(f))
482    }
483}
484
485impl Debug for DebugInstantiate {
486    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
487        write!(f, "<network instantiate>")
488    }
489}
490
491impl Hash for DebugInstantiate {
492    fn hash<H: Hasher>(&self, _state: &mut H) {
493        // Do nothing
494    }
495}
496
497impl Clone for DebugInstantiate {
498    fn clone(&self) -> Self {
499        match self {
500            DebugInstantiate::Building => DebugInstantiate::Building,
501            DebugInstantiate::Finalized(_) => {
502                panic!("DebugInstantiate::Finalized should not be cloned")
503            }
504        }
505    }
506}
507
508/// Tracks the instantiation state of a `ClusterMembers` source.
509///
510/// During `compile_network`, the first `ClusterMembers` node for a given
511/// `(at_location, target_cluster)` pair is promoted to [`Self::Stream`] and
512/// receives the expression returned by `Deploy::cluster_membership_stream`.
513/// All subsequent nodes for the same pair are set to [`Self::Tee`] so that
514/// during code-gen they simply reference the tee output of the first node
515/// instead of creating a redundant `source_stream`.
516#[derive(Debug, Hash, Clone, serde::Serialize)]
517pub enum ClusterMembersState {
518    /// Not yet instantiated.
519    Uninit,
520    /// The primary instance: holds the stream expression and will emit
521    /// `source_stream(expr) -> tee()` during code-gen.
522    Stream(DebugExpr),
523    /// A secondary instance that references the tee output of the primary.
524    /// Stores `(at_location_root, target_cluster_location)` so that `emit_core`
525    /// can derive the deterministic tee ident without extra state.
526    Tee(LocationId, LocationId),
527}
528
529/// A source in a Hydro graph, where data enters the graph.
530#[derive(Debug, Hash, Clone, serde::Serialize)]
531pub enum HydroSource {
532    Stream(DebugExpr),
533    ExternalNetwork(),
534    Iter(DebugExpr),
535    Spin(),
536    ClusterMembers(LocationId, ClusterMembersState),
537    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
538    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
539}
540
541#[cfg(feature = "build")]
542/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
543/// and simulations.
544///
545/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
546/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
547pub trait DfirBuilder {
548    /// Whether the representation of singletons should include intermediate states.
549    fn singleton_intermediates(&self) -> bool;
550
551    /// Gets the DFIR builder for the given location, creating it if necessary.
552    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
553
554    #[expect(clippy::too_many_arguments, reason = "TODO")]
555    fn batch(
556        &mut self,
557        in_ident: syn::Ident,
558        in_location: &LocationId,
559        in_kind: &CollectionKind,
560        out_ident: &syn::Ident,
561        out_location: &LocationId,
562        op_meta: &HydroIrOpMetadata,
563        fold_hooked_idents: &HashSet<String>,
564    );
565    fn yield_from_tick(
566        &mut self,
567        in_ident: syn::Ident,
568        in_location: &LocationId,
569        in_kind: &CollectionKind,
570        out_ident: &syn::Ident,
571        out_location: &LocationId,
572    );
573
574    fn begin_atomic(
575        &mut self,
576        in_ident: syn::Ident,
577        in_location: &LocationId,
578        in_kind: &CollectionKind,
579        out_ident: &syn::Ident,
580        out_location: &LocationId,
581        op_meta: &HydroIrOpMetadata,
582    );
583    fn end_atomic(
584        &mut self,
585        in_ident: syn::Ident,
586        in_location: &LocationId,
587        in_kind: &CollectionKind,
588        out_ident: &syn::Ident,
589    );
590
591    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
592    fn observe_nondet(
593        &mut self,
594        trusted: bool,
595        location: &LocationId,
596        in_ident: syn::Ident,
597        in_kind: &CollectionKind,
598        out_ident: &syn::Ident,
599        out_kind: &CollectionKind,
600        op_meta: &HydroIrOpMetadata,
601    );
602
603    #[expect(clippy::too_many_arguments, reason = "TODO")]
604    fn merge_ordered(
605        &mut self,
606        location: &LocationId,
607        first_ident: syn::Ident,
608        second_ident: syn::Ident,
609        out_ident: &syn::Ident,
610        in_kind: &CollectionKind,
611        op_meta: &HydroIrOpMetadata,
612        operator_tag: Option<&str>,
613    );
614
615    #[expect(clippy::too_many_arguments, reason = "TODO")]
616    fn create_network(
617        &mut self,
618        from: &LocationId,
619        to: &LocationId,
620        input_ident: syn::Ident,
621        out_ident: &syn::Ident,
622        serialize: Option<&DebugExpr>,
623        sink: syn::Expr,
624        source: syn::Expr,
625        deserialize: Option<&DebugExpr>,
626        tag_id: StmtId,
627        networking_info: &crate::networking::NetworkingInfo,
628    );
629
630    fn create_external_source(
631        &mut self,
632        on: &LocationId,
633        source_expr: syn::Expr,
634        out_ident: &syn::Ident,
635        deserialize: Option<&DebugExpr>,
636        tag_id: StmtId,
637    );
638
639    fn create_external_output(
640        &mut self,
641        on: &LocationId,
642        sink_expr: syn::Expr,
643        input_ident: &syn::Ident,
644        serialize: Option<&DebugExpr>,
645        tag_id: StmtId,
646    );
647
648    /// Optionally emit a fold hook that buffers and permutes inputs before the fold.
649    /// Returns the new input ident to use for the fold if a hook was emitted.
650    fn emit_fold_hook(
651        &mut self,
652        location: &LocationId,
653        in_ident: &syn::Ident,
654        in_kind: &CollectionKind,
655        op_meta: &HydroIrOpMetadata,
656    ) -> Option<syn::Ident>;
657
658    /// Inserts necessary code to validate a manual assertion that at this point the
659    /// input live collection is consistent. In production, this is a no-op, but in simulation
660    /// this will (not yet implemented) inject assertions that validate consistency.
661    fn assert_is_consistent(
662        &mut self,
663        trusted: bool,
664        location: &LocationId,
665        in_ident: syn::Ident,
666        out_ident: &syn::Ident,
667    );
668}
669
670#[cfg(feature = "build")]
671impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
672    fn singleton_intermediates(&self) -> bool {
673        false
674    }
675
676    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
677        self.entry(location.root().key())
678            .expect("location was removed")
679            .or_default()
680    }
681
682    fn batch(
683        &mut self,
684        in_ident: syn::Ident,
685        in_location: &LocationId,
686        in_kind: &CollectionKind,
687        out_ident: &syn::Ident,
688        _out_location: &LocationId,
689        _op_meta: &HydroIrOpMetadata,
690        _fold_hooked_idents: &HashSet<String>,
691    ) {
692        let builder = self.get_dfir_mut(in_location.root());
693        if in_kind.is_bounded()
694            && matches!(
695                in_kind,
696                CollectionKind::Singleton { .. }
697                    | CollectionKind::Optional { .. }
698                    | CollectionKind::KeyedSingleton { .. }
699            )
700        {
701            assert!(in_location.is_top_level());
702            builder.add_dfir(
703                parse_quote! {
704                    #out_ident = #in_ident -> persist::<'static>();
705                },
706                None,
707                None,
708            );
709        } else {
710            builder.add_dfir(
711                parse_quote! {
712                    #out_ident = #in_ident;
713                },
714                None,
715                None,
716            );
717        }
718    }
719
720    fn yield_from_tick(
721        &mut self,
722        in_ident: syn::Ident,
723        in_location: &LocationId,
724        _in_kind: &CollectionKind,
725        out_ident: &syn::Ident,
726        _out_location: &LocationId,
727    ) {
728        let builder = self.get_dfir_mut(in_location.root());
729        builder.add_dfir(
730            parse_quote! {
731                #out_ident = #in_ident;
732            },
733            None,
734            None,
735        );
736    }
737
738    fn begin_atomic(
739        &mut self,
740        in_ident: syn::Ident,
741        in_location: &LocationId,
742        _in_kind: &CollectionKind,
743        out_ident: &syn::Ident,
744        _out_location: &LocationId,
745        _op_meta: &HydroIrOpMetadata,
746    ) {
747        let builder = self.get_dfir_mut(in_location.root());
748        builder.add_dfir(
749            parse_quote! {
750                #out_ident = #in_ident;
751            },
752            None,
753            None,
754        );
755    }
756
757    fn end_atomic(
758        &mut self,
759        in_ident: syn::Ident,
760        in_location: &LocationId,
761        _in_kind: &CollectionKind,
762        out_ident: &syn::Ident,
763    ) {
764        let builder = self.get_dfir_mut(in_location.root());
765        builder.add_dfir(
766            parse_quote! {
767                #out_ident = #in_ident;
768            },
769            None,
770            None,
771        );
772    }
773
774    fn observe_nondet(
775        &mut self,
776        _trusted: bool,
777        location: &LocationId,
778        in_ident: syn::Ident,
779        _in_kind: &CollectionKind,
780        out_ident: &syn::Ident,
781        _out_kind: &CollectionKind,
782        _op_meta: &HydroIrOpMetadata,
783    ) {
784        let builder = self.get_dfir_mut(location);
785        builder.add_dfir(
786            parse_quote! {
787                #out_ident = #in_ident;
788            },
789            None,
790            None,
791        );
792    }
793
794    fn merge_ordered(
795        &mut self,
796        location: &LocationId,
797        first_ident: syn::Ident,
798        second_ident: syn::Ident,
799        out_ident: &syn::Ident,
800        _in_kind: &CollectionKind,
801        _op_meta: &HydroIrOpMetadata,
802        operator_tag: Option<&str>,
803    ) {
804        let builder = self.get_dfir_mut(location);
805        builder.add_dfir(
806            parse_quote! {
807                #out_ident = union();
808                #first_ident -> [0]#out_ident;
809                #second_ident -> [1]#out_ident;
810            },
811            None,
812            operator_tag,
813        );
814    }
815
816    fn create_network(
817        &mut self,
818        from: &LocationId,
819        to: &LocationId,
820        input_ident: syn::Ident,
821        out_ident: &syn::Ident,
822        serialize: Option<&DebugExpr>,
823        sink: syn::Expr,
824        source: syn::Expr,
825        deserialize: Option<&DebugExpr>,
826        tag_id: StmtId,
827        _networking_info: &crate::networking::NetworkingInfo,
828    ) {
829        let sender_builder = self.get_dfir_mut(from);
830        if let Some(serialize_pipeline) = serialize {
831            sender_builder.add_dfir(
832                parse_quote! {
833                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
834                },
835                None,
836                // operator tag separates send and receive, which otherwise have the same next_stmt_id
837                Some(&format!("send{}", tag_id)),
838            );
839        } else {
840            sender_builder.add_dfir(
841                parse_quote! {
842                    #input_ident -> dest_sink(#sink);
843                },
844                None,
845                Some(&format!("send{}", tag_id)),
846            );
847        }
848
849        let receiver_builder = self.get_dfir_mut(to);
850        if let Some(deserialize_pipeline) = deserialize {
851            receiver_builder.add_dfir(
852                parse_quote! {
853                    #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
854                },
855                None,
856                Some(&format!("recv{}", tag_id)),
857            );
858        } else {
859            receiver_builder.add_dfir(
860                parse_quote! {
861                    #out_ident = source_stream(#source);
862                },
863                None,
864                Some(&format!("recv{}", tag_id)),
865            );
866        }
867    }
868
869    fn create_external_source(
870        &mut self,
871        on: &LocationId,
872        source_expr: syn::Expr,
873        out_ident: &syn::Ident,
874        deserialize: Option<&DebugExpr>,
875        tag_id: StmtId,
876    ) {
877        let receiver_builder = self.get_dfir_mut(on);
878        if let Some(deserialize_pipeline) = deserialize {
879            receiver_builder.add_dfir(
880                parse_quote! {
881                    #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
882                },
883                None,
884                Some(&format!("recv{}", tag_id)),
885            );
886        } else {
887            receiver_builder.add_dfir(
888                parse_quote! {
889                    #out_ident = source_stream(#source_expr);
890                },
891                None,
892                Some(&format!("recv{}", tag_id)),
893            );
894        }
895    }
896
897    fn create_external_output(
898        &mut self,
899        on: &LocationId,
900        sink_expr: syn::Expr,
901        input_ident: &syn::Ident,
902        serialize: Option<&DebugExpr>,
903        tag_id: StmtId,
904    ) {
905        let sender_builder = self.get_dfir_mut(on);
906        if let Some(serialize_fn) = serialize {
907            sender_builder.add_dfir(
908                parse_quote! {
909                    #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
910                },
911                None,
912                // operator tag separates send and receive, which otherwise have the same next_stmt_id
913                Some(&format!("send{}", tag_id)),
914            );
915        } else {
916            sender_builder.add_dfir(
917                parse_quote! {
918                    #input_ident -> dest_sink(#sink_expr);
919                },
920                None,
921                Some(&format!("send{}", tag_id)),
922            );
923        }
924    }
925
926    fn emit_fold_hook(
927        &mut self,
928        _location: &LocationId,
929        _in_ident: &syn::Ident,
930        _in_kind: &CollectionKind,
931        _op_meta: &HydroIrOpMetadata,
932    ) -> Option<syn::Ident> {
933        None
934    }
935
936    fn assert_is_consistent(
937        &mut self,
938        _trusted: bool,
939        location: &LocationId,
940        in_ident: syn::Ident,
941        out_ident: &syn::Ident,
942    ) {
943        let builder = self.get_dfir_mut(location);
944        builder.add_dfir(
945            parse_quote! {
946                #out_ident = #in_ident;
947            },
948            None,
949            None,
950        );
951    }
952}
953
954#[cfg(feature = "build")]
955pub enum BuildersOrCallback<'a, L, N>
956where
957    L: FnMut(&mut HydroRoot, &mut StmtId),
958    N: FnMut(&mut HydroNode, &mut StmtId),
959{
960    Builders(&'a mut dyn DfirBuilder),
961    Callback(L, N),
962}
963
964/// An root in a Hydro graph, which is an pipeline that doesn't emit
965/// any downstream values. Traversals over the dataflow graph and
966/// generating DFIR IR start from roots.
967#[derive(Debug, Hash, serde::Serialize)]
968pub enum HydroRoot {
969    ForEach {
970        f: DebugExpr,
971        input: Box<HydroNode>,
972        op_metadata: HydroIrOpMetadata,
973    },
974    SendExternal {
975        to_external_key: LocationKey,
976        to_port_id: ExternalPortId,
977        to_many: bool,
978        unpaired: bool,
979        serialize_fn: Option<DebugExpr>,
980        instantiate_fn: DebugInstantiate,
981        input: Box<HydroNode>,
982        op_metadata: HydroIrOpMetadata,
983    },
984    DestSink {
985        sink: DebugExpr,
986        input: Box<HydroNode>,
987        op_metadata: HydroIrOpMetadata,
988    },
989    CycleSink {
990        cycle_id: CycleId,
991        input: Box<HydroNode>,
992        op_metadata: HydroIrOpMetadata,
993    },
994    EmbeddedOutput {
995        #[serde(serialize_with = "serialize_ident")]
996        ident: syn::Ident,
997        input: Box<HydroNode>,
998        op_metadata: HydroIrOpMetadata,
999    },
1000    Null {
1001        input: Box<HydroNode>,
1002        op_metadata: HydroIrOpMetadata,
1003    },
1004}
1005
1006impl HydroRoot {
1007    #[cfg(feature = "build")]
1008    #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
1009    pub fn compile_network<'a, D>(
1010        &mut self,
1011        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
1012        seen_tees: &mut SeenSharedNodes,
1013        seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
1014        processes: &SparseSecondaryMap<LocationKey, D::Process>,
1015        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
1016        externals: &SparseSecondaryMap<LocationKey, D::External>,
1017        env: &mut D::InstantiateEnv,
1018    ) where
1019        D: Deploy<'a>,
1020    {
1021        let refcell_extra_stmts = RefCell::new(extra_stmts);
1022        let refcell_env = RefCell::new(env);
1023        let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
1024        self.transform_bottom_up(
1025            &mut |l| {
1026                if let HydroRoot::SendExternal {
1027                    input,
1028                    to_external_key,
1029                    to_port_id,
1030                    to_many,
1031                    unpaired,
1032                    instantiate_fn,
1033                    ..
1034                } = l
1035                {
1036                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1037                        DebugInstantiate::Building => {
1038                            let to_node = externals
1039                                .get(*to_external_key)
1040                                .unwrap_or_else(|| {
1041                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
1042                                })
1043                                .clone();
1044
1045                            match input.metadata().location_id.root() {
1046                                &LocationId::Process(process_key) => {
1047                                    if *to_many {
1048                                        (
1049                                            (
1050                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1051                                                parse_quote!(DUMMY),
1052                                            ),
1053                                            Box::new(|| {}) as Box<dyn FnOnce()>,
1054                                        )
1055                                    } else {
1056                                        let from_node = processes
1057                                            .get(process_key)
1058                                            .unwrap_or_else(|| {
1059                                                panic!("A process used in the graph was not instantiated: {}", process_key)
1060                                            })
1061                                            .clone();
1062
1063                                        let sink_port = from_node.next_port();
1064                                        let source_port = to_node.next_port();
1065
1066                                        if *unpaired {
1067                                            use stageleft::quote_type;
1068                                            use tokio_util::codec::LengthDelimitedCodec;
1069
1070                                            to_node.register(*to_port_id, source_port.clone());
1071
1072                                            let _ = D::e2o_source(
1073                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1074                                                &to_node, &source_port,
1075                                                &from_node, &sink_port,
1076                                                &quote_type::<LengthDelimitedCodec>(),
1077                                                format!("{}_{}", *to_external_key, *to_port_id)
1078                                            );
1079                                        }
1080
1081                                        (
1082                                            (
1083                                                D::o2e_sink(
1084                                                    &from_node,
1085                                                    &sink_port,
1086                                                    &to_node,
1087                                                    &source_port,
1088                                                    format!("{}_{}", *to_external_key, *to_port_id)
1089                                                ),
1090                                                parse_quote!(DUMMY),
1091                                            ),
1092                                            if *unpaired {
1093                                                D::e2o_connect(
1094                                                    &to_node,
1095                                                    &source_port,
1096                                                    &from_node,
1097                                                    &sink_port,
1098                                                    *to_many,
1099                                                    NetworkHint::Auto,
1100                                                )
1101                                            } else {
1102                                                Box::new(|| {}) as Box<dyn FnOnce()>
1103                                            },
1104                                        )
1105                                    }
1106                                }
1107                                LocationId::Cluster(cluster_key) => {
1108                                    let from_node = clusters
1109                                        .get(*cluster_key)
1110                                        .unwrap_or_else(|| {
1111                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1112                                        })
1113                                        .clone();
1114
1115                                    let sink_port = from_node.next_port();
1116                                    let source_port = to_node.next_port();
1117
1118                                    if *unpaired {
1119                                        to_node.register(*to_port_id, source_port.clone());
1120                                    }
1121
1122                                    (
1123                                        (
1124                                            D::m2e_sink(
1125                                                &from_node,
1126                                                &sink_port,
1127                                                &to_node,
1128                                                &source_port,
1129                                                format!("{}_{}", *to_external_key, *to_port_id)
1130                                            ),
1131                                            parse_quote!(DUMMY),
1132                                        ),
1133                                        Box::new(|| {}) as Box<dyn FnOnce()>,
1134                                    )
1135                                }
1136                                _ => panic!()
1137                            }
1138                        },
1139
1140                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1141                    };
1142
1143                    *instantiate_fn = DebugInstantiateFinalized {
1144                        sink: sink_expr,
1145                        source: source_expr,
1146                        connect_fn: Some(connect_fn),
1147                    }
1148                    .into();
1149                } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1150                    let element_type = match &input.metadata().collection_kind {
1151                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1152                        _ => panic!("Embedded output must have Stream collection kind"),
1153                    };
1154                    let location_key = match input.metadata().location_id.root() {
1155                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1156                        _ => panic!("Embedded output must be on a process or cluster"),
1157                    };
1158                    D::register_embedded_output(
1159                        &mut refcell_env.borrow_mut(),
1160                        location_key,
1161                        ident,
1162                        &element_type,
1163                    );
1164                }
1165            },
1166            &mut |n| {
1167                if let HydroNode::Network {
1168                    name,
1169                    networking_info,
1170                    input,
1171                    instantiate_fn,
1172                    metadata,
1173                    ..
1174                } = n
1175                {
1176                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1177                        DebugInstantiate::Building => instantiate_network::<D>(
1178                            &mut refcell_env.borrow_mut(),
1179                            input.metadata().location_id.root(),
1180                            metadata.location_id.root(),
1181                            processes,
1182                            clusters,
1183                            name.as_deref(),
1184                            networking_info,
1185                        ),
1186
1187                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1188                    };
1189
1190                    *instantiate_fn = DebugInstantiateFinalized {
1191                        sink: sink_expr,
1192                        source: source_expr,
1193                        connect_fn: Some(connect_fn),
1194                    }
1195                    .into();
1196                } else if let HydroNode::ExternalInput {
1197                    from_external_key,
1198                    from_port_id,
1199                    from_many,
1200                    codec_type,
1201                    port_hint,
1202                    instantiate_fn,
1203                    metadata,
1204                    ..
1205                } = n
1206                {
1207                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1208                        DebugInstantiate::Building => {
1209                            let from_node = externals
1210                                .get(*from_external_key)
1211                                .unwrap_or_else(|| {
1212                                    panic!(
1213                                        "A external used in the graph was not instantiated: {}",
1214                                        from_external_key,
1215                                    )
1216                                })
1217                                .clone();
1218
1219                            match metadata.location_id.root() {
1220                                &LocationId::Process(process_key) => {
1221                                    let to_node = processes
1222                                        .get(process_key)
1223                                        .unwrap_or_else(|| {
1224                                            panic!("A process used in the graph was not instantiated: {}", process_key)
1225                                        })
1226                                        .clone();
1227
1228                                    let sink_port = from_node.next_port();
1229                                    let source_port = to_node.next_port();
1230
1231                                    from_node.register(*from_port_id, sink_port.clone());
1232
1233                                    (
1234                                        (
1235                                            parse_quote!(DUMMY),
1236                                            if *from_many {
1237                                                D::e2o_many_source(
1238                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1239                                                    &to_node, &source_port,
1240                                                    codec_type.0.as_ref(),
1241                                                    format!("{}_{}", *from_external_key, *from_port_id)
1242                                                )
1243                                            } else {
1244                                                D::e2o_source(
1245                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1246                                                    &from_node, &sink_port,
1247                                                    &to_node, &source_port,
1248                                                    codec_type.0.as_ref(),
1249                                                    format!("{}_{}", *from_external_key, *from_port_id)
1250                                                )
1251                                            },
1252                                        ),
1253                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1254                                    )
1255                                }
1256                                LocationId::Cluster(cluster_key) => {
1257                                    let to_node = clusters
1258                                        .get(*cluster_key)
1259                                        .unwrap_or_else(|| {
1260                                            panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1261                                        })
1262                                        .clone();
1263
1264                                    let sink_port = from_node.next_port();
1265                                    let source_port = to_node.next_port();
1266
1267                                    from_node.register(*from_port_id, sink_port.clone());
1268
1269                                    (
1270                                        (
1271                                            parse_quote!(DUMMY),
1272                                            D::e2m_source(
1273                                                refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1274                                                &from_node, &sink_port,
1275                                                &to_node, &source_port,
1276                                                codec_type.0.as_ref(),
1277                                                format!("{}_{}", *from_external_key, *from_port_id)
1278                                            ),
1279                                        ),
1280                                        D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1281                                    )
1282                                }
1283                                _ => panic!()
1284                            }
1285                        },
1286
1287                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1288                    };
1289
1290                    *instantiate_fn = DebugInstantiateFinalized {
1291                        sink: sink_expr,
1292                        source: source_expr,
1293                        connect_fn: Some(connect_fn),
1294                    }
1295                    .into();
1296                } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1297                    let element_type = match &metadata.collection_kind {
1298                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1299                        _ => panic!("Embedded source must have Stream collection kind"),
1300                    };
1301                    let location_key = match metadata.location_id.root() {
1302                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1303                        _ => panic!("Embedded source must be on a process or cluster"),
1304                    };
1305                    D::register_embedded_stream_input(
1306                        &mut refcell_env.borrow_mut(),
1307                        location_key,
1308                        ident,
1309                        &element_type,
1310                    );
1311                } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1312                    let element_type = match &metadata.collection_kind {
1313                        CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1314                        _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1315                    };
1316                    let location_key = match metadata.location_id.root() {
1317                        LocationId::Process(key) | LocationId::Cluster(key) => *key,
1318                        _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1319                    };
1320                    D::register_embedded_singleton_input(
1321                        &mut refcell_env.borrow_mut(),
1322                        location_key,
1323                        ident,
1324                        &element_type,
1325                    );
1326                } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1327                    match state {
1328                        ClusterMembersState::Uninit => {
1329                            let at_location = metadata.location_id.root().clone();
1330                            let key = (at_location.clone(), location_id.key());
1331                            if refcell_seen_cluster_members.borrow_mut().insert(key) {
1332                                // First occurrence: call cluster_membership_stream and mark as Stream.
1333                                let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1334                                    D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1335                                    &(),
1336                                );
1337                                *state = ClusterMembersState::Stream(expr.into());
1338                            } else {
1339                                // Already instantiated for this (at, target) pair: just tee.
1340                                *state = ClusterMembersState::Tee(at_location, location_id.clone());
1341                            }
1342                        }
1343                        ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1344                            panic!("cluster members already finalized");
1345                        }
1346                    }
1347                }
1348            },
1349            seen_tees,
1350            false,
1351        );
1352    }
1353
1354    pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1355        self.transform_bottom_up(
1356            &mut |l| {
1357                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1358                    match instantiate_fn {
1359                        DebugInstantiate::Building => panic!("network not built"),
1360
1361                        DebugInstantiate::Finalized(finalized) => {
1362                            (finalized.connect_fn.take().unwrap())();
1363                        }
1364                    }
1365                }
1366            },
1367            &mut |n| {
1368                if let HydroNode::Network { instantiate_fn, .. }
1369                | HydroNode::ExternalInput { instantiate_fn, .. } = n
1370                {
1371                    match instantiate_fn {
1372                        DebugInstantiate::Building => panic!("network not built"),
1373
1374                        DebugInstantiate::Finalized(finalized) => {
1375                            (finalized.connect_fn.take().unwrap())();
1376                        }
1377                    }
1378                }
1379            },
1380            seen_tees,
1381            false,
1382        );
1383    }
1384
1385    pub fn transform_bottom_up(
1386        &mut self,
1387        transform_root: &mut impl FnMut(&mut HydroRoot),
1388        transform_node: &mut impl FnMut(&mut HydroNode),
1389        seen_tees: &mut SeenSharedNodes,
1390        check_well_formed: bool,
1391    ) {
1392        self.transform_children(
1393            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1394            seen_tees,
1395        );
1396
1397        transform_root(self);
1398    }
1399
1400    pub fn transform_children(
1401        &mut self,
1402        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1403        seen_tees: &mut SeenSharedNodes,
1404    ) {
1405        match self {
1406            HydroRoot::ForEach { input, .. }
1407            | HydroRoot::SendExternal { input, .. }
1408            | HydroRoot::DestSink { input, .. }
1409            | HydroRoot::CycleSink { input, .. }
1410            | HydroRoot::EmbeddedOutput { input, .. }
1411            | HydroRoot::Null { input, .. } => {
1412                transform(input, seen_tees);
1413            }
1414        }
1415    }
1416
1417    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1418        match self {
1419            HydroRoot::ForEach {
1420                f,
1421                input,
1422                op_metadata,
1423            } => HydroRoot::ForEach {
1424                f: f.clone(),
1425                input: Box::new(input.deep_clone(seen_tees)),
1426                op_metadata: op_metadata.clone(),
1427            },
1428            HydroRoot::SendExternal {
1429                to_external_key,
1430                to_port_id,
1431                to_many,
1432                unpaired,
1433                serialize_fn,
1434                instantiate_fn,
1435                input,
1436                op_metadata,
1437            } => HydroRoot::SendExternal {
1438                to_external_key: *to_external_key,
1439                to_port_id: *to_port_id,
1440                to_many: *to_many,
1441                unpaired: *unpaired,
1442                serialize_fn: serialize_fn.clone(),
1443                instantiate_fn: instantiate_fn.clone(),
1444                input: Box::new(input.deep_clone(seen_tees)),
1445                op_metadata: op_metadata.clone(),
1446            },
1447            HydroRoot::DestSink {
1448                sink,
1449                input,
1450                op_metadata,
1451            } => HydroRoot::DestSink {
1452                sink: sink.clone(),
1453                input: Box::new(input.deep_clone(seen_tees)),
1454                op_metadata: op_metadata.clone(),
1455            },
1456            HydroRoot::CycleSink {
1457                cycle_id,
1458                input,
1459                op_metadata,
1460            } => HydroRoot::CycleSink {
1461                cycle_id: *cycle_id,
1462                input: Box::new(input.deep_clone(seen_tees)),
1463                op_metadata: op_metadata.clone(),
1464            },
1465            HydroRoot::EmbeddedOutput {
1466                ident,
1467                input,
1468                op_metadata,
1469            } => HydroRoot::EmbeddedOutput {
1470                ident: ident.clone(),
1471                input: Box::new(input.deep_clone(seen_tees)),
1472                op_metadata: op_metadata.clone(),
1473            },
1474            HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1475                input: Box::new(input.deep_clone(seen_tees)),
1476                op_metadata: op_metadata.clone(),
1477            },
1478        }
1479    }
1480
1481    #[cfg(feature = "build")]
1482    pub fn emit(
1483        &mut self,
1484        graph_builders: &mut dyn DfirBuilder,
1485        seen_tees: &mut SeenSharedNodes,
1486        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1487        next_stmt_id: &mut StmtId,
1488        fold_hooked_idents: &mut HashSet<String>,
1489    ) {
1490        self.emit_core(
1491            &mut BuildersOrCallback::<
1492                fn(&mut HydroRoot, &mut StmtId),
1493                fn(&mut HydroNode, &mut StmtId),
1494            >::Builders(graph_builders),
1495            seen_tees,
1496            built_tees,
1497            next_stmt_id,
1498            fold_hooked_idents,
1499        );
1500    }
1501
1502    #[cfg(feature = "build")]
1503    pub fn emit_core(
1504        &mut self,
1505        builders_or_callback: &mut BuildersOrCallback<
1506            impl FnMut(&mut HydroRoot, &mut StmtId),
1507            impl FnMut(&mut HydroNode, &mut StmtId),
1508        >,
1509        seen_tees: &mut SeenSharedNodes,
1510        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1511        next_stmt_id: &mut StmtId,
1512        fold_hooked_idents: &mut HashSet<String>,
1513    ) {
1514        match self {
1515            HydroRoot::ForEach { f, input, .. } => {
1516                let input_ident = input.emit_core(
1517                    builders_or_callback,
1518                    seen_tees,
1519                    built_tees,
1520                    next_stmt_id,
1521                    fold_hooked_idents,
1522                );
1523
1524                match builders_or_callback {
1525                    BuildersOrCallback::Builders(graph_builders) => {
1526                        graph_builders
1527                            .get_dfir_mut(&input.metadata().location_id)
1528                            .add_dfir(
1529                                parse_quote! {
1530                                    #input_ident -> for_each(#f);
1531                                },
1532                                None,
1533                                Some(&next_stmt_id.to_string()),
1534                            );
1535                    }
1536                    BuildersOrCallback::Callback(leaf_callback, _) => {
1537                        leaf_callback(self, next_stmt_id);
1538                    }
1539                }
1540
1541                let _ = next_stmt_id.get_and_increment();
1542            }
1543
1544            HydroRoot::SendExternal {
1545                serialize_fn,
1546                instantiate_fn,
1547                input,
1548                ..
1549            } => {
1550                let input_ident = input.emit_core(
1551                    builders_or_callback,
1552                    seen_tees,
1553                    built_tees,
1554                    next_stmt_id,
1555                    fold_hooked_idents,
1556                );
1557
1558                match builders_or_callback {
1559                    BuildersOrCallback::Builders(graph_builders) => {
1560                        let (sink_expr, _) = match instantiate_fn {
1561                            DebugInstantiate::Building => (
1562                                syn::parse_quote!(DUMMY_SINK),
1563                                syn::parse_quote!(DUMMY_SOURCE),
1564                            ),
1565
1566                            DebugInstantiate::Finalized(finalized) => {
1567                                (finalized.sink.clone(), finalized.source.clone())
1568                            }
1569                        };
1570
1571                        graph_builders.create_external_output(
1572                            &input.metadata().location_id,
1573                            sink_expr,
1574                            &input_ident,
1575                            serialize_fn.as_ref(),
1576                            *next_stmt_id,
1577                        );
1578                    }
1579                    BuildersOrCallback::Callback(leaf_callback, _) => {
1580                        leaf_callback(self, next_stmt_id);
1581                    }
1582                }
1583
1584                let _ = next_stmt_id.get_and_increment();
1585            }
1586
1587            HydroRoot::DestSink { sink, input, .. } => {
1588                let input_ident = input.emit_core(
1589                    builders_or_callback,
1590                    seen_tees,
1591                    built_tees,
1592                    next_stmt_id,
1593                    fold_hooked_idents,
1594                );
1595
1596                match builders_or_callback {
1597                    BuildersOrCallback::Builders(graph_builders) => {
1598                        graph_builders
1599                            .get_dfir_mut(&input.metadata().location_id)
1600                            .add_dfir(
1601                                parse_quote! {
1602                                    #input_ident -> dest_sink(#sink);
1603                                },
1604                                None,
1605                                Some(&next_stmt_id.to_string()),
1606                            );
1607                    }
1608                    BuildersOrCallback::Callback(leaf_callback, _) => {
1609                        leaf_callback(self, next_stmt_id);
1610                    }
1611                }
1612
1613                let _ = next_stmt_id.get_and_increment();
1614            }
1615
1616            HydroRoot::CycleSink {
1617                cycle_id, input, ..
1618            } => {
1619                let input_ident = input.emit_core(
1620                    builders_or_callback,
1621                    seen_tees,
1622                    built_tees,
1623                    next_stmt_id,
1624                    fold_hooked_idents,
1625                );
1626
1627                match builders_or_callback {
1628                    BuildersOrCallback::Builders(graph_builders) => {
1629                        let elem_type: syn::Type = match &input.metadata().collection_kind {
1630                            CollectionKind::KeyedSingleton {
1631                                key_type,
1632                                value_type,
1633                                ..
1634                            }
1635                            | CollectionKind::KeyedStream {
1636                                key_type,
1637                                value_type,
1638                                ..
1639                            } => {
1640                                parse_quote!((#key_type, #value_type))
1641                            }
1642                            CollectionKind::Stream { element_type, .. }
1643                            | CollectionKind::Singleton { element_type, .. }
1644                            | CollectionKind::Optional { element_type, .. } => {
1645                                parse_quote!(#element_type)
1646                            }
1647                        };
1648
1649                        let cycle_id_ident = cycle_id.as_ident();
1650                        graph_builders
1651                            .get_dfir_mut(&input.metadata().location_id)
1652                            .add_dfir(
1653                                parse_quote! {
1654                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1655                                },
1656                                None,
1657                                None,
1658                            );
1659                    }
1660                    // No ID, no callback
1661                    BuildersOrCallback::Callback(_, _) => {}
1662                }
1663            }
1664
1665            HydroRoot::EmbeddedOutput { ident, input, .. } => {
1666                let input_ident = input.emit_core(
1667                    builders_or_callback,
1668                    seen_tees,
1669                    built_tees,
1670                    next_stmt_id,
1671                    fold_hooked_idents,
1672                );
1673
1674                match builders_or_callback {
1675                    BuildersOrCallback::Builders(graph_builders) => {
1676                        graph_builders
1677                            .get_dfir_mut(&input.metadata().location_id)
1678                            .add_dfir(
1679                                parse_quote! {
1680                                    #input_ident -> for_each(&mut #ident);
1681                                },
1682                                None,
1683                                Some(&next_stmt_id.to_string()),
1684                            );
1685                    }
1686                    BuildersOrCallback::Callback(leaf_callback, _) => {
1687                        leaf_callback(self, next_stmt_id);
1688                    }
1689                }
1690
1691                let _ = next_stmt_id.get_and_increment();
1692            }
1693
1694            HydroRoot::Null { input, .. } => {
1695                let input_ident = input.emit_core(
1696                    builders_or_callback,
1697                    seen_tees,
1698                    built_tees,
1699                    next_stmt_id,
1700                    fold_hooked_idents,
1701                );
1702
1703                match builders_or_callback {
1704                    BuildersOrCallback::Builders(graph_builders) => {
1705                        graph_builders
1706                            .get_dfir_mut(&input.metadata().location_id)
1707                            .add_dfir(
1708                                parse_quote! {
1709                                    #input_ident -> for_each(|_| {});
1710                                },
1711                                None,
1712                                Some(&next_stmt_id.to_string()),
1713                            );
1714                    }
1715                    BuildersOrCallback::Callback(leaf_callback, _) => {
1716                        leaf_callback(self, next_stmt_id);
1717                    }
1718                }
1719
1720                let _ = next_stmt_id.get_and_increment();
1721            }
1722        }
1723    }
1724
1725    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1726        match self {
1727            HydroRoot::ForEach { op_metadata, .. }
1728            | HydroRoot::SendExternal { op_metadata, .. }
1729            | HydroRoot::DestSink { op_metadata, .. }
1730            | HydroRoot::CycleSink { op_metadata, .. }
1731            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1732            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1733        }
1734    }
1735
1736    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1737        match self {
1738            HydroRoot::ForEach { op_metadata, .. }
1739            | HydroRoot::SendExternal { op_metadata, .. }
1740            | HydroRoot::DestSink { op_metadata, .. }
1741            | HydroRoot::CycleSink { op_metadata, .. }
1742            | HydroRoot::EmbeddedOutput { op_metadata, .. }
1743            | HydroRoot::Null { op_metadata, .. } => op_metadata,
1744        }
1745    }
1746
1747    pub fn input(&self) -> &HydroNode {
1748        match self {
1749            HydroRoot::ForEach { input, .. }
1750            | HydroRoot::SendExternal { input, .. }
1751            | HydroRoot::DestSink { input, .. }
1752            | HydroRoot::CycleSink { input, .. }
1753            | HydroRoot::EmbeddedOutput { input, .. }
1754            | HydroRoot::Null { input, .. } => input,
1755        }
1756    }
1757
1758    pub fn input_metadata(&self) -> &HydroIrMetadata {
1759        self.input().metadata()
1760    }
1761
1762    pub fn print_root(&self) -> String {
1763        match self {
1764            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1765            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1766            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1767            HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1768            HydroRoot::EmbeddedOutput { ident, .. } => {
1769                format!("EmbeddedOutput({})", ident)
1770            }
1771            HydroRoot::Null { .. } => "Null".to_owned(),
1772        }
1773    }
1774
1775    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1776        match self {
1777            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1778                transform(f);
1779            }
1780            HydroRoot::SendExternal { .. }
1781            | HydroRoot::CycleSink { .. }
1782            | HydroRoot::EmbeddedOutput { .. }
1783            | HydroRoot::Null { .. } => {}
1784        }
1785    }
1786}
1787
1788#[cfg(feature = "build")]
1789fn tick_of(loc: &LocationId) -> Option<ClockId> {
1790    match loc {
1791        LocationId::Tick(id, _) => Some(*id),
1792        LocationId::Atomic(inner) => tick_of(inner),
1793        _ => None,
1794    }
1795}
1796
1797#[cfg(feature = "build")]
1798fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1799    match loc {
1800        LocationId::Tick(id, inner) => {
1801            *id = uf_find(uf, *id);
1802            remap_location(inner, uf);
1803        }
1804        LocationId::Atomic(inner) => {
1805            remap_location(inner, uf);
1806        }
1807        LocationId::Process(_) | LocationId::Cluster(_) => {}
1808    }
1809}
1810
1811#[cfg(feature = "build")]
1812fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1813    let p = *parent.get(&x).unwrap_or(&x);
1814    if p == x {
1815        return x;
1816    }
1817    let root = uf_find(parent, p);
1818    parent.insert(x, root);
1819    root
1820}
1821
1822#[cfg(feature = "build")]
1823fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1824    let ra = uf_find(parent, a);
1825    let rb = uf_find(parent, b);
1826    if ra != rb {
1827        parent.insert(ra, rb);
1828    }
1829}
1830
1831/// Traverse the IR to build a union-find that unifies tick IDs connected
1832/// through `Batch` and `YieldConcat` nodes at atomic boundaries, then
1833/// rewrite all `LocationId`s to use the representative tick ID.
1834#[cfg(feature = "build")]
1835pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1836    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1837
1838    // Pass 1: collect unifications.
1839    transform_bottom_up(
1840        ir,
1841        &mut |_| {},
1842        &mut |node: &mut HydroNode| match node {
1843            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
1844                if let (Some(a), Some(b)) = (
1845                    tick_of(&inner.metadata().location_id),
1846                    tick_of(&metadata.location_id),
1847                ) {
1848                    uf_union(&mut uf, a, b);
1849                }
1850            }
1851            HydroNode::Chain {
1852                first,
1853                second,
1854                metadata,
1855            }
1856            | HydroNode::ChainFirst {
1857                first,
1858                second,
1859                metadata,
1860            }
1861            | HydroNode::MergeOrdered {
1862                first,
1863                second,
1864                metadata,
1865            } => {
1866                if let (Some(a), Some(b)) = (
1867                    tick_of(&first.metadata().location_id),
1868                    tick_of(&metadata.location_id),
1869                ) {
1870                    uf_union(&mut uf, a, b);
1871                }
1872                if let (Some(a), Some(b)) = (
1873                    tick_of(&second.metadata().location_id),
1874                    tick_of(&metadata.location_id),
1875                ) {
1876                    uf_union(&mut uf, a, b);
1877                }
1878            }
1879            _ => {}
1880        },
1881        false,
1882    );
1883
1884    // Pass 2: rewrite all LocationIds.
1885    transform_bottom_up(
1886        ir,
1887        &mut |_| {},
1888        &mut |node: &mut HydroNode| {
1889            remap_location(&mut node.metadata_mut().location_id, &mut uf);
1890        },
1891        false,
1892    );
1893}
1894
1895#[cfg(feature = "build")]
1896pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1897    let mut builders = SecondaryMap::new();
1898    let mut seen_tees = HashMap::new();
1899    let mut built_tees = HashMap::new();
1900    let mut next_stmt_id = StmtId::default();
1901    let mut fold_hooked_idents = HashSet::new();
1902    for leaf in ir {
1903        leaf.emit(
1904            &mut builders,
1905            &mut seen_tees,
1906            &mut built_tees,
1907            &mut next_stmt_id,
1908            &mut fold_hooked_idents,
1909        );
1910    }
1911    builders
1912}
1913
1914#[cfg(feature = "build")]
1915pub fn traverse_dfir(
1916    ir: &mut [HydroRoot],
1917    transform_root: impl FnMut(&mut HydroRoot, &mut StmtId),
1918    transform_node: impl FnMut(&mut HydroNode, &mut StmtId),
1919) {
1920    let mut seen_tees = HashMap::new();
1921    let mut built_tees = HashMap::new();
1922    let mut next_stmt_id = StmtId::default();
1923    let mut fold_hooked_idents = HashSet::new();
1924    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1925    ir.iter_mut().for_each(|leaf| {
1926        leaf.emit_core(
1927            &mut callback,
1928            &mut seen_tees,
1929            &mut built_tees,
1930            &mut next_stmt_id,
1931            &mut fold_hooked_idents,
1932        );
1933    });
1934}
1935
1936pub fn transform_bottom_up(
1937    ir: &mut [HydroRoot],
1938    transform_root: &mut impl FnMut(&mut HydroRoot),
1939    transform_node: &mut impl FnMut(&mut HydroNode),
1940    check_well_formed: bool,
1941) {
1942    let mut seen_tees = HashMap::new();
1943    ir.iter_mut().for_each(|leaf| {
1944        leaf.transform_bottom_up(
1945            transform_root,
1946            transform_node,
1947            &mut seen_tees,
1948            check_well_formed,
1949        );
1950    });
1951}
1952
1953pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1954    let mut seen_tees = HashMap::new();
1955    ir.iter()
1956        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1957        .collect()
1958}
1959
1960type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1961thread_local! {
1962    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1963    /// Tracks shared nodes already serialized so that `SharedNode::serialize`
1964    /// emits the full subtree only once and uses a `"<shared N>"` back-reference
1965    /// on subsequent encounters, preventing infinite loops.
1966    static SERIALIZED_SHARED: PrintedTees
1967        = const { RefCell::new(None) };
1968}
1969
1970pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1971    PRINTED_TEES.with(|printed_tees| {
1972        let mut printed_tees_mut = printed_tees.borrow_mut();
1973        *printed_tees_mut = Some((0, HashMap::new()));
1974        drop(printed_tees_mut);
1975
1976        let ret = f();
1977
1978        let mut printed_tees_mut = printed_tees.borrow_mut();
1979        *printed_tees_mut = None;
1980
1981        ret
1982    })
1983}
1984
1985/// Runs `f` with a fresh shared-node deduplication scope for serialization.
1986/// Any `SharedNode` serialized inside `f` will be tracked; the first occurrence
1987/// emits the full subtree while later occurrences emit a `{"$shared_ref": id}`
1988/// back-reference.  The tracking state is restored when `f` returns or panics.
1989pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1990    let _guard = SerializedSharedGuard::enter();
1991    f()
1992}
1993
1994/// Serialize a slice of [`HydroRoot`]s to a JSON string with shared-node deduplication.
1995#[cfg(feature = "viz")]
1996pub fn ir_to_json(ir: &[HydroRoot]) -> Result<String, serde_json::Error> {
1997    serialize_dedup_shared(|| serde_json::to_string(ir))
1998}
1999
2000/// RAII guard that saves/restores the `SERIALIZED_SHARED` thread-local,
2001/// making `serialize_dedup_shared` re-entrant and panic-safe.
2002struct SerializedSharedGuard {
2003    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
2004}
2005
2006impl SerializedSharedGuard {
2007    fn enter() -> Self {
2008        let previous = SERIALIZED_SHARED.with(|cell| {
2009            let mut guard = cell.borrow_mut();
2010            guard.replace((0, HashMap::new()))
2011        });
2012        Self { previous }
2013    }
2014}
2015
2016impl Drop for SerializedSharedGuard {
2017    fn drop(&mut self) {
2018        SERIALIZED_SHARED.with(|cell| {
2019            *cell.borrow_mut() = self.previous.take();
2020        });
2021    }
2022}
2023
2024pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2025
2026impl serde::Serialize for SharedNode {
2027    /// Multiple `SharedNode`s can point to the same underlying `HydroNode` (via
2028    /// `Tee` / `Partition`).  A naïve recursive serialization would revisit the
2029    /// same subtree every time and, if the graph ever contains a cycle, loop
2030    /// forever.
2031    ///
2032    /// We keep a thread-local map (`SERIALIZED_SHARED`) from raw `Rc` pointer →
2033    /// integer id.  The first time we see a pointer we assign it the next id and
2034    /// emit the full subtree as `{"$shared": <id>, "node": …}`.  Every later
2035    /// encounter of the same pointer emits `{"$shared_ref": <id>}`, cutting the
2036    /// recursion.  Requires an active `serialize_dedup_shared` scope.
2037    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2038        SERIALIZED_SHARED.with(|cell| {
2039            let mut guard = cell.borrow_mut();
2040            // (next_id, pointer → assigned_id)
2041            let state = guard.as_mut().ok_or_else(|| {
2042                serde::ser::Error::custom(
2043                    "SharedNode serialization requires an active serialize_dedup_shared scope",
2044                )
2045            })?;
2046            let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2047
2048            if let Some(&id) = state.1.get(&ptr) {
2049                drop(guard);
2050                use serde::ser::SerializeMap;
2051                let mut map = serializer.serialize_map(Some(1))?;
2052                map.serialize_entry("$shared_ref", &id)?;
2053                map.end()
2054            } else {
2055                let id = state.0;
2056                state.0 += 1;
2057                state.1.insert(ptr, id);
2058                drop(guard);
2059
2060                use serde::ser::SerializeMap;
2061                let mut map = serializer.serialize_map(Some(2))?;
2062                map.serialize_entry("$shared", &id)?;
2063                map.serialize_entry("node", &*self.0.borrow())?;
2064                map.end()
2065            }
2066        })
2067    }
2068}
2069
2070impl SharedNode {
2071    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2072        Rc::as_ptr(&self.0)
2073    }
2074}
2075
2076impl Debug for SharedNode {
2077    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2078        PRINTED_TEES.with(|printed_tees| {
2079            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2080            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2081
2082            if let Some(printed_tees_mut) = printed_tees_mut {
2083                if let Some(existing) = printed_tees_mut
2084                    .1
2085                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2086                {
2087                    write!(f, "<shared {}>", existing)
2088                } else {
2089                    let next_id = printed_tees_mut.0;
2090                    printed_tees_mut.0 += 1;
2091                    printed_tees_mut
2092                        .1
2093                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2094                    drop(printed_tees_mut_borrow);
2095                    write!(f, "<shared {}>: ", next_id)?;
2096                    Debug::fmt(&self.0.borrow(), f)
2097                }
2098            } else {
2099                drop(printed_tees_mut_borrow);
2100                write!(f, "<shared>: ")?;
2101                Debug::fmt(&self.0.borrow(), f)
2102            }
2103        })
2104    }
2105}
2106
2107impl Hash for SharedNode {
2108    fn hash<H: Hasher>(&self, state: &mut H) {
2109        self.0.borrow_mut().hash(state);
2110    }
2111}
2112
2113#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2114pub enum BoundKind {
2115    Unbounded,
2116    Bounded,
2117}
2118
2119#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2120pub enum StreamOrder {
2121    NoOrder,
2122    TotalOrder,
2123}
2124
2125#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2126pub enum StreamRetry {
2127    AtLeastOnce,
2128    ExactlyOnce,
2129}
2130
2131#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2132pub enum KeyedSingletonBoundKind {
2133    Unbounded,
2134    MonotonicValue,
2135    BoundedValue,
2136    Bounded,
2137}
2138
2139#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2140pub enum SingletonBoundKind {
2141    Unbounded,
2142    Monotonic,
2143    Bounded,
2144}
2145
2146#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2147pub enum CollectionKind {
2148    Stream {
2149        bound: BoundKind,
2150        order: StreamOrder,
2151        retry: StreamRetry,
2152        element_type: DebugType,
2153    },
2154    Singleton {
2155        bound: SingletonBoundKind,
2156        element_type: DebugType,
2157    },
2158    Optional {
2159        bound: BoundKind,
2160        element_type: DebugType,
2161    },
2162    KeyedStream {
2163        bound: BoundKind,
2164        value_order: StreamOrder,
2165        value_retry: StreamRetry,
2166        key_type: DebugType,
2167        value_type: DebugType,
2168    },
2169    KeyedSingleton {
2170        bound: KeyedSingletonBoundKind,
2171        key_type: DebugType,
2172        value_type: DebugType,
2173    },
2174}
2175
2176impl CollectionKind {
2177    pub fn is_bounded(&self) -> bool {
2178        matches!(
2179            self,
2180            CollectionKind::Stream {
2181                bound: BoundKind::Bounded,
2182                ..
2183            } | CollectionKind::Singleton {
2184                bound: SingletonBoundKind::Bounded,
2185                ..
2186            } | CollectionKind::Optional {
2187                bound: BoundKind::Bounded,
2188                ..
2189            } | CollectionKind::KeyedStream {
2190                bound: BoundKind::Bounded,
2191                ..
2192            } | CollectionKind::KeyedSingleton {
2193                bound: KeyedSingletonBoundKind::Bounded,
2194                ..
2195            }
2196        )
2197    }
2198}
2199
2200#[derive(Clone, serde::Serialize)]
2201pub struct HydroIrMetadata {
2202    pub location_id: LocationId,
2203    pub collection_kind: CollectionKind,
2204    pub consistency: Option<ClusterConsistency>,
2205    pub cardinality: Option<usize>,
2206    pub tag: Option<String>,
2207    pub op: HydroIrOpMetadata,
2208}
2209
2210// HydroIrMetadata shouldn't be used to hash or compare
2211impl Hash for HydroIrMetadata {
2212    fn hash<H: Hasher>(&self, _: &mut H) {}
2213}
2214
2215impl PartialEq for HydroIrMetadata {
2216    fn eq(&self, _: &Self) -> bool {
2217        true
2218    }
2219}
2220
2221impl Eq for HydroIrMetadata {}
2222
2223impl Debug for HydroIrMetadata {
2224    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2225        f.debug_struct("HydroIrMetadata")
2226            .field("location_id", &self.location_id)
2227            .field("collection_kind", &self.collection_kind)
2228            .finish()
2229    }
2230}
2231
2232/// Metadata that is specific to the operator itself, rather than its outputs.
2233/// This is available on _both_ inner nodes and roots.
2234#[derive(Clone, serde::Serialize)]
2235pub struct HydroIrOpMetadata {
2236    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2237    pub backtrace: Backtrace,
2238    pub cpu_usage: Option<f64>,
2239    pub network_recv_cpu_usage: Option<f64>,
2240    pub id: Option<usize>,
2241}
2242
2243impl HydroIrOpMetadata {
2244    #[expect(
2245        clippy::new_without_default,
2246        reason = "explicit calls to new ensure correct backtrace bounds"
2247    )]
2248    pub fn new() -> HydroIrOpMetadata {
2249        Self::new_with_skip(1)
2250    }
2251
2252    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2253        HydroIrOpMetadata {
2254            backtrace: Backtrace::get_backtrace(2 + skip_count),
2255            cpu_usage: None,
2256            network_recv_cpu_usage: None,
2257            id: None,
2258        }
2259    }
2260}
2261
2262impl Debug for HydroIrOpMetadata {
2263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2264        f.debug_struct("HydroIrOpMetadata").finish()
2265    }
2266}
2267
2268impl Hash for HydroIrOpMetadata {
2269    fn hash<H: Hasher>(&self, _: &mut H) {}
2270}
2271
2272/// An intermediate node in a Hydro graph, which consumes data
2273/// from upstream nodes and emits data to downstream nodes.
2274#[derive(Debug, Hash, serde::Serialize)]
2275pub enum HydroNode {
2276    Placeholder,
2277
2278    /// Manually "casts" between two different collection kinds.
2279    ///
2280    /// Using this IR node requires special care, since it bypasses many of Hydro's core
2281    /// correctness checks. In particular, the user must ensure that every possible
2282    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
2283    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
2284    /// collection. This ensures that the simulator does not miss any possible outputs.
2285    Cast {
2286        inner: Box<HydroNode>,
2287        metadata: HydroIrMetadata,
2288    },
2289
2290    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
2291    /// interpretation of the input stream.
2292    ///
2293    /// In production, this simply passes through the input, but in simulation, this operator
2294    /// explicitly selects a randomized interpretation.
2295    ObserveNonDet {
2296        inner: Box<HydroNode>,
2297        trusted: bool, // if true, we do not need to simulate non-determinism
2298        metadata: HydroIrMetadata,
2299    },
2300
2301    Source {
2302        source: HydroSource,
2303        metadata: HydroIrMetadata,
2304    },
2305
2306    SingletonSource {
2307        value: DebugExpr,
2308        first_tick_only: bool,
2309        metadata: HydroIrMetadata,
2310    },
2311
2312    CycleSource {
2313        cycle_id: CycleId,
2314        metadata: HydroIrMetadata,
2315    },
2316
2317    Tee {
2318        inner: SharedNode,
2319        metadata: HydroIrMetadata,
2320    },
2321
2322    /// A singleton materialization point. Wraps a SharedNode so that:
2323    /// - The pipe output delivers the single item to one consumer
2324    /// - `#var` references can borrow the value from the singleton slot
2325    ///
2326    /// In DFIR codegen, emits `ident = inner_ident -> singleton()`.
2327    ///
2328    /// Uses the same `built_tees` dedup pattern as `Tee`.
2329    Singleton {
2330        inner: SharedNode,
2331        metadata: HydroIrMetadata,
2332    },
2333
2334    Partition {
2335        inner: SharedNode,
2336        f: ClosureExpr,
2337        is_true: bool,
2338        metadata: HydroIrMetadata,
2339    },
2340
2341    BeginAtomic {
2342        inner: Box<HydroNode>,
2343        metadata: HydroIrMetadata,
2344    },
2345
2346    EndAtomic {
2347        inner: Box<HydroNode>,
2348        metadata: HydroIrMetadata,
2349    },
2350
2351    Batch {
2352        inner: Box<HydroNode>,
2353        metadata: HydroIrMetadata,
2354    },
2355
2356    YieldConcat {
2357        inner: Box<HydroNode>,
2358        metadata: HydroIrMetadata,
2359    },
2360
2361    Chain {
2362        first: Box<HydroNode>,
2363        second: Box<HydroNode>,
2364        metadata: HydroIrMetadata,
2365    },
2366
2367    MergeOrdered {
2368        first: Box<HydroNode>,
2369        second: Box<HydroNode>,
2370        metadata: HydroIrMetadata,
2371    },
2372
2373    ChainFirst {
2374        first: Box<HydroNode>,
2375        second: Box<HydroNode>,
2376        metadata: HydroIrMetadata,
2377    },
2378
2379    CrossProduct {
2380        left: Box<HydroNode>,
2381        right: Box<HydroNode>,
2382        metadata: HydroIrMetadata,
2383    },
2384
2385    CrossSingleton {
2386        left: Box<HydroNode>,
2387        right: Box<HydroNode>,
2388        metadata: HydroIrMetadata,
2389    },
2390
2391    Join {
2392        left: Box<HydroNode>,
2393        right: Box<HydroNode>,
2394        metadata: HydroIrMetadata,
2395    },
2396
2397    /// Asymmetric join where the right (build) side is bounded.
2398    /// The build side is accumulated (stratum-delayed) into a hash table,
2399    /// then the left (probe) side streams through preserving its ordering.
2400    JoinHalf {
2401        left: Box<HydroNode>,
2402        right: Box<HydroNode>,
2403        metadata: HydroIrMetadata,
2404    },
2405
2406    Difference {
2407        pos: Box<HydroNode>,
2408        neg: Box<HydroNode>,
2409        metadata: HydroIrMetadata,
2410    },
2411
2412    AntiJoin {
2413        pos: Box<HydroNode>,
2414        neg: Box<HydroNode>,
2415        metadata: HydroIrMetadata,
2416    },
2417
2418    ResolveFutures {
2419        input: Box<HydroNode>,
2420        metadata: HydroIrMetadata,
2421    },
2422    ResolveFuturesBlocking {
2423        input: Box<HydroNode>,
2424        metadata: HydroIrMetadata,
2425    },
2426    ResolveFuturesOrdered {
2427        input: Box<HydroNode>,
2428        metadata: HydroIrMetadata,
2429    },
2430
2431    Map {
2432        f: ClosureExpr,
2433        input: Box<HydroNode>,
2434        metadata: HydroIrMetadata,
2435    },
2436    FlatMap {
2437        f: ClosureExpr,
2438        input: Box<HydroNode>,
2439        metadata: HydroIrMetadata,
2440    },
2441    FlatMapStreamBlocking {
2442        f: ClosureExpr,
2443        input: Box<HydroNode>,
2444        metadata: HydroIrMetadata,
2445    },
2446    Filter {
2447        f: ClosureExpr,
2448        input: Box<HydroNode>,
2449        metadata: HydroIrMetadata,
2450    },
2451    FilterMap {
2452        f: ClosureExpr,
2453        input: Box<HydroNode>,
2454        metadata: HydroIrMetadata,
2455    },
2456
2457    DeferTick {
2458        input: Box<HydroNode>,
2459        metadata: HydroIrMetadata,
2460    },
2461    Enumerate {
2462        input: Box<HydroNode>,
2463        metadata: HydroIrMetadata,
2464    },
2465    Inspect {
2466        f: ClosureExpr,
2467        input: Box<HydroNode>,
2468        metadata: HydroIrMetadata,
2469    },
2470
2471    Unique {
2472        input: Box<HydroNode>,
2473        metadata: HydroIrMetadata,
2474    },
2475
2476    Sort {
2477        input: Box<HydroNode>,
2478        metadata: HydroIrMetadata,
2479    },
2480    Fold {
2481        init: ClosureExpr,
2482        acc: ClosureExpr,
2483        input: Box<HydroNode>,
2484        metadata: HydroIrMetadata,
2485    },
2486
2487    Scan {
2488        init: ClosureExpr,
2489        acc: ClosureExpr,
2490        input: Box<HydroNode>,
2491        metadata: HydroIrMetadata,
2492    },
2493    ScanAsyncBlocking {
2494        init: ClosureExpr,
2495        acc: ClosureExpr,
2496        input: Box<HydroNode>,
2497        metadata: HydroIrMetadata,
2498    },
2499    FoldKeyed {
2500        init: ClosureExpr,
2501        acc: ClosureExpr,
2502        input: Box<HydroNode>,
2503        metadata: HydroIrMetadata,
2504    },
2505
2506    Reduce {
2507        f: ClosureExpr,
2508        input: Box<HydroNode>,
2509        metadata: HydroIrMetadata,
2510    },
2511    ReduceKeyed {
2512        f: ClosureExpr,
2513        input: Box<HydroNode>,
2514        metadata: HydroIrMetadata,
2515    },
2516    ReduceKeyedWatermark {
2517        f: ClosureExpr,
2518        input: Box<HydroNode>,
2519        watermark: Box<HydroNode>,
2520        metadata: HydroIrMetadata,
2521    },
2522
2523    Network {
2524        name: Option<String>,
2525        networking_info: crate::networking::NetworkingInfo,
2526        serialize_fn: Option<DebugExpr>,
2527        instantiate_fn: DebugInstantiate,
2528        deserialize_fn: Option<DebugExpr>,
2529        input: Box<HydroNode>,
2530        metadata: HydroIrMetadata,
2531    },
2532
2533    ExternalInput {
2534        from_external_key: LocationKey,
2535        from_port_id: ExternalPortId,
2536        from_many: bool,
2537        codec_type: DebugType,
2538        #[serde(skip)]
2539        port_hint: NetworkHint,
2540        instantiate_fn: DebugInstantiate,
2541        deserialize_fn: Option<DebugExpr>,
2542        metadata: HydroIrMetadata,
2543    },
2544
2545    Counter {
2546        tag: String,
2547        duration: DebugExpr,
2548        prefix: String,
2549        input: Box<HydroNode>,
2550        metadata: HydroIrMetadata,
2551    },
2552
2553    AssertIsConsistent {
2554        inner: Box<HydroNode>,
2555        trusted: bool,
2556        metadata: HydroIrMetadata,
2557    },
2558
2559    UnboundSingleton {
2560        inner: Box<HydroNode>,
2561        metadata: HydroIrMetadata,
2562    },
2563}
2564
2565pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2566pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2567
2568impl HydroNode {
2569    pub fn transform_bottom_up(
2570        &mut self,
2571        transform: &mut impl FnMut(&mut HydroNode),
2572        seen_tees: &mut SeenSharedNodes,
2573        check_well_formed: bool,
2574    ) {
2575        self.transform_children(
2576            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2577            seen_tees,
2578        );
2579
2580        transform(self);
2581
2582        let self_location = self.metadata().location_id.root();
2583
2584        if check_well_formed {
2585            match &*self {
2586                HydroNode::Network { .. } => {}
2587                _ => {
2588                    self.input_metadata().iter().for_each(|i| {
2589                        if i.location_id.root() != self_location {
2590                            panic!(
2591                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2592                                i,
2593                                i.location_id.root(),
2594                                self,
2595                                self_location
2596                            )
2597                        }
2598                    });
2599                }
2600            }
2601        }
2602    }
2603
2604    #[inline(always)]
2605    pub fn transform_children(
2606        &mut self,
2607        mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2608        seen_tees: &mut SeenSharedNodes,
2609    ) {
2610        match self {
2611            HydroNode::Placeholder => {
2612                panic!();
2613            }
2614
2615            HydroNode::Source { .. }
2616            | HydroNode::SingletonSource { .. }
2617            | HydroNode::CycleSource { .. }
2618            | HydroNode::ExternalInput { .. } => {}
2619
2620            HydroNode::Tee { inner, .. } | HydroNode::Singleton { inner, .. } => {
2621                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2622                    *inner = SharedNode(transformed.clone());
2623                } else {
2624                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2625                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2626                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2627                    transform(&mut orig, seen_tees);
2628                    *transformed_cell.borrow_mut() = orig;
2629                    *inner = SharedNode(transformed_cell);
2630                }
2631            }
2632
2633            HydroNode::Partition { inner, f, .. } => {
2634                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2635                    *inner = SharedNode(transformed.clone());
2636                } else {
2637                    f.transform_children(&mut transform, seen_tees);
2638                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2639                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2640                    let mut orig = inner.0.replace(HydroNode::Placeholder);
2641                    transform(&mut orig, seen_tees);
2642                    *transformed_cell.borrow_mut() = orig;
2643                    *inner = SharedNode(transformed_cell);
2644                }
2645            }
2646
2647            HydroNode::Cast { inner, .. }
2648            | HydroNode::ObserveNonDet { inner, .. }
2649            | HydroNode::BeginAtomic { inner, .. }
2650            | HydroNode::EndAtomic { inner, .. }
2651            | HydroNode::Batch { inner, .. }
2652            | HydroNode::YieldConcat { inner, .. }
2653            | HydroNode::UnboundSingleton { inner, .. }
2654            | HydroNode::AssertIsConsistent { inner, .. } => {
2655                transform(inner.as_mut(), seen_tees);
2656            }
2657
2658            HydroNode::Chain { first, second, .. } => {
2659                transform(first.as_mut(), seen_tees);
2660                transform(second.as_mut(), seen_tees);
2661            }
2662
2663            HydroNode::MergeOrdered { first, second, .. } => {
2664                transform(first.as_mut(), seen_tees);
2665                transform(second.as_mut(), seen_tees);
2666            }
2667
2668            HydroNode::ChainFirst { first, second, .. } => {
2669                transform(first.as_mut(), seen_tees);
2670                transform(second.as_mut(), seen_tees);
2671            }
2672
2673            HydroNode::CrossSingleton { left, right, .. }
2674            | HydroNode::CrossProduct { left, right, .. }
2675            | HydroNode::Join { left, right, .. }
2676            | HydroNode::JoinHalf { left, right, .. } => {
2677                transform(left.as_mut(), seen_tees);
2678                transform(right.as_mut(), seen_tees);
2679            }
2680
2681            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2682                transform(pos.as_mut(), seen_tees);
2683                transform(neg.as_mut(), seen_tees);
2684            }
2685
2686            HydroNode::Map { f, input, .. } => {
2687                f.transform_children(&mut transform, seen_tees);
2688                transform(input.as_mut(), seen_tees);
2689            }
2690            HydroNode::FlatMap { f, input, .. }
2691            | HydroNode::FlatMapStreamBlocking { f, input, .. }
2692            | HydroNode::Filter { f, input, .. }
2693            | HydroNode::FilterMap { f, input, .. }
2694            | HydroNode::Inspect { f, input, .. }
2695            | HydroNode::Reduce { f, input, .. }
2696            | HydroNode::ReduceKeyed { f, input, .. } => {
2697                f.transform_children(&mut transform, seen_tees);
2698                transform(input.as_mut(), seen_tees);
2699            }
2700            HydroNode::ReduceKeyedWatermark {
2701                f,
2702                input,
2703                watermark,
2704                ..
2705            } => {
2706                f.transform_children(&mut transform, seen_tees);
2707                transform(input.as_mut(), seen_tees);
2708                transform(watermark.as_mut(), seen_tees);
2709            }
2710            HydroNode::Fold {
2711                init, acc, input, ..
2712            }
2713            | HydroNode::Scan {
2714                init, acc, input, ..
2715            }
2716            | HydroNode::ScanAsyncBlocking {
2717                init, acc, input, ..
2718            }
2719            | HydroNode::FoldKeyed {
2720                init, acc, input, ..
2721            } => {
2722                init.transform_children(&mut transform, seen_tees);
2723                acc.transform_children(&mut transform, seen_tees);
2724                transform(input.as_mut(), seen_tees);
2725            }
2726            HydroNode::ResolveFutures { input, .. }
2727            | HydroNode::ResolveFuturesBlocking { input, .. }
2728            | HydroNode::ResolveFuturesOrdered { input, .. }
2729            | HydroNode::Sort { input, .. }
2730            | HydroNode::DeferTick { input, .. }
2731            | HydroNode::Enumerate { input, .. }
2732            | HydroNode::Unique { input, .. }
2733            | HydroNode::Network { input, .. }
2734            | HydroNode::Counter { input, .. } => {
2735                transform(input.as_mut(), seen_tees);
2736            }
2737        }
2738    }
2739
2740    pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2741        match self {
2742            HydroNode::Placeholder => HydroNode::Placeholder,
2743            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2744                inner: Box::new(inner.deep_clone(seen_tees)),
2745                metadata: metadata.clone(),
2746            },
2747            HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2748                inner: Box::new(inner.deep_clone(seen_tees)),
2749                metadata: metadata.clone(),
2750            },
2751            HydroNode::ObserveNonDet {
2752                inner,
2753                trusted,
2754                metadata,
2755            } => HydroNode::ObserveNonDet {
2756                inner: Box::new(inner.deep_clone(seen_tees)),
2757                trusted: *trusted,
2758                metadata: metadata.clone(),
2759            },
2760            HydroNode::AssertIsConsistent {
2761                inner,
2762                trusted,
2763                metadata,
2764            } => HydroNode::AssertIsConsistent {
2765                inner: Box::new(inner.deep_clone(seen_tees)),
2766                trusted: *trusted,
2767                metadata: metadata.clone(),
2768            },
2769            HydroNode::Source { source, metadata } => HydroNode::Source {
2770                source: source.clone(),
2771                metadata: metadata.clone(),
2772            },
2773            HydroNode::SingletonSource {
2774                value,
2775                first_tick_only,
2776                metadata,
2777            } => HydroNode::SingletonSource {
2778                value: value.clone(),
2779                first_tick_only: *first_tick_only,
2780                metadata: metadata.clone(),
2781            },
2782            HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2783                cycle_id: *cycle_id,
2784                metadata: metadata.clone(),
2785            },
2786            HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
2787                let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2788                    SharedNode(transformed.clone())
2789                } else {
2790                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2791                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2792                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2793                    *new_rc.borrow_mut() = cloned;
2794                    SharedNode(new_rc)
2795                };
2796                if matches!(self, HydroNode::Singleton { .. }) {
2797                    HydroNode::Singleton {
2798                        inner: cloned_inner,
2799                        metadata: metadata.clone(),
2800                    }
2801                } else {
2802                    HydroNode::Tee {
2803                        inner: cloned_inner,
2804                        metadata: metadata.clone(),
2805                    }
2806                }
2807            }
2808            HydroNode::Partition {
2809                inner,
2810                f,
2811                is_true,
2812                metadata,
2813            } => {
2814                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2815                    HydroNode::Partition {
2816                        inner: SharedNode(transformed.clone()),
2817                        f: f.deep_clone(seen_tees),
2818                        is_true: *is_true,
2819                        metadata: metadata.clone(),
2820                    }
2821                } else {
2822                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2823                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
2824                    let cloned = inner.0.borrow().deep_clone(seen_tees);
2825                    *new_rc.borrow_mut() = cloned;
2826                    HydroNode::Partition {
2827                        inner: SharedNode(new_rc),
2828                        f: f.deep_clone(seen_tees),
2829                        is_true: *is_true,
2830                        metadata: metadata.clone(),
2831                    }
2832                }
2833            }
2834            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2835                inner: Box::new(inner.deep_clone(seen_tees)),
2836                metadata: metadata.clone(),
2837            },
2838            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2839                inner: Box::new(inner.deep_clone(seen_tees)),
2840                metadata: metadata.clone(),
2841            },
2842            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2843                inner: Box::new(inner.deep_clone(seen_tees)),
2844                metadata: metadata.clone(),
2845            },
2846            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2847                inner: Box::new(inner.deep_clone(seen_tees)),
2848                metadata: metadata.clone(),
2849            },
2850            HydroNode::Chain {
2851                first,
2852                second,
2853                metadata,
2854            } => HydroNode::Chain {
2855                first: Box::new(first.deep_clone(seen_tees)),
2856                second: Box::new(second.deep_clone(seen_tees)),
2857                metadata: metadata.clone(),
2858            },
2859            HydroNode::MergeOrdered {
2860                first,
2861                second,
2862                metadata,
2863            } => HydroNode::MergeOrdered {
2864                first: Box::new(first.deep_clone(seen_tees)),
2865                second: Box::new(second.deep_clone(seen_tees)),
2866                metadata: metadata.clone(),
2867            },
2868            HydroNode::ChainFirst {
2869                first,
2870                second,
2871                metadata,
2872            } => HydroNode::ChainFirst {
2873                first: Box::new(first.deep_clone(seen_tees)),
2874                second: Box::new(second.deep_clone(seen_tees)),
2875                metadata: metadata.clone(),
2876            },
2877            HydroNode::CrossProduct {
2878                left,
2879                right,
2880                metadata,
2881            } => HydroNode::CrossProduct {
2882                left: Box::new(left.deep_clone(seen_tees)),
2883                right: Box::new(right.deep_clone(seen_tees)),
2884                metadata: metadata.clone(),
2885            },
2886            HydroNode::CrossSingleton {
2887                left,
2888                right,
2889                metadata,
2890            } => HydroNode::CrossSingleton {
2891                left: Box::new(left.deep_clone(seen_tees)),
2892                right: Box::new(right.deep_clone(seen_tees)),
2893                metadata: metadata.clone(),
2894            },
2895            HydroNode::Join {
2896                left,
2897                right,
2898                metadata,
2899            } => HydroNode::Join {
2900                left: Box::new(left.deep_clone(seen_tees)),
2901                right: Box::new(right.deep_clone(seen_tees)),
2902                metadata: metadata.clone(),
2903            },
2904            HydroNode::JoinHalf {
2905                left,
2906                right,
2907                metadata,
2908            } => HydroNode::JoinHalf {
2909                left: Box::new(left.deep_clone(seen_tees)),
2910                right: Box::new(right.deep_clone(seen_tees)),
2911                metadata: metadata.clone(),
2912            },
2913            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2914                pos: Box::new(pos.deep_clone(seen_tees)),
2915                neg: Box::new(neg.deep_clone(seen_tees)),
2916                metadata: metadata.clone(),
2917            },
2918            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2919                pos: Box::new(pos.deep_clone(seen_tees)),
2920                neg: Box::new(neg.deep_clone(seen_tees)),
2921                metadata: metadata.clone(),
2922            },
2923            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2924                input: Box::new(input.deep_clone(seen_tees)),
2925                metadata: metadata.clone(),
2926            },
2927            HydroNode::ResolveFuturesBlocking { input, metadata } => {
2928                HydroNode::ResolveFuturesBlocking {
2929                    input: Box::new(input.deep_clone(seen_tees)),
2930                    metadata: metadata.clone(),
2931                }
2932            }
2933            HydroNode::ResolveFuturesOrdered { input, metadata } => {
2934                HydroNode::ResolveFuturesOrdered {
2935                    input: Box::new(input.deep_clone(seen_tees)),
2936                    metadata: metadata.clone(),
2937                }
2938            }
2939            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2940                f: f.deep_clone(seen_tees),
2941                input: Box::new(input.deep_clone(seen_tees)),
2942                metadata: metadata.clone(),
2943            },
2944            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2945                f: f.deep_clone(seen_tees),
2946                input: Box::new(input.deep_clone(seen_tees)),
2947                metadata: metadata.clone(),
2948            },
2949            HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2950                HydroNode::FlatMapStreamBlocking {
2951                    f: f.deep_clone(seen_tees),
2952                    input: Box::new(input.deep_clone(seen_tees)),
2953                    metadata: metadata.clone(),
2954                }
2955            }
2956            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2957                f: f.deep_clone(seen_tees),
2958                input: Box::new(input.deep_clone(seen_tees)),
2959                metadata: metadata.clone(),
2960            },
2961            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2962                f: f.deep_clone(seen_tees),
2963                input: Box::new(input.deep_clone(seen_tees)),
2964                metadata: metadata.clone(),
2965            },
2966            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2967                input: Box::new(input.deep_clone(seen_tees)),
2968                metadata: metadata.clone(),
2969            },
2970            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2971                input: Box::new(input.deep_clone(seen_tees)),
2972                metadata: metadata.clone(),
2973            },
2974            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2975                f: f.deep_clone(seen_tees),
2976                input: Box::new(input.deep_clone(seen_tees)),
2977                metadata: metadata.clone(),
2978            },
2979            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2980                input: Box::new(input.deep_clone(seen_tees)),
2981                metadata: metadata.clone(),
2982            },
2983            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2984                input: Box::new(input.deep_clone(seen_tees)),
2985                metadata: metadata.clone(),
2986            },
2987            HydroNode::Fold {
2988                init,
2989                acc,
2990                input,
2991                metadata,
2992            } => HydroNode::Fold {
2993                init: init.deep_clone(seen_tees),
2994                acc: acc.deep_clone(seen_tees),
2995                input: Box::new(input.deep_clone(seen_tees)),
2996                metadata: metadata.clone(),
2997            },
2998            HydroNode::Scan {
2999                init,
3000                acc,
3001                input,
3002                metadata,
3003            } => HydroNode::Scan {
3004                init: init.deep_clone(seen_tees),
3005                acc: acc.deep_clone(seen_tees),
3006                input: Box::new(input.deep_clone(seen_tees)),
3007                metadata: metadata.clone(),
3008            },
3009            HydroNode::ScanAsyncBlocking {
3010                init,
3011                acc,
3012                input,
3013                metadata,
3014            } => HydroNode::ScanAsyncBlocking {
3015                init: init.deep_clone(seen_tees),
3016                acc: acc.deep_clone(seen_tees),
3017                input: Box::new(input.deep_clone(seen_tees)),
3018                metadata: metadata.clone(),
3019            },
3020            HydroNode::FoldKeyed {
3021                init,
3022                acc,
3023                input,
3024                metadata,
3025            } => HydroNode::FoldKeyed {
3026                init: init.deep_clone(seen_tees),
3027                acc: acc.deep_clone(seen_tees),
3028                input: Box::new(input.deep_clone(seen_tees)),
3029                metadata: metadata.clone(),
3030            },
3031            HydroNode::ReduceKeyedWatermark {
3032                f,
3033                input,
3034                watermark,
3035                metadata,
3036            } => HydroNode::ReduceKeyedWatermark {
3037                f: f.deep_clone(seen_tees),
3038                input: Box::new(input.deep_clone(seen_tees)),
3039                watermark: Box::new(watermark.deep_clone(seen_tees)),
3040                metadata: metadata.clone(),
3041            },
3042            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3043                f: f.deep_clone(seen_tees),
3044                input: Box::new(input.deep_clone(seen_tees)),
3045                metadata: metadata.clone(),
3046            },
3047            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3048                f: f.deep_clone(seen_tees),
3049                input: Box::new(input.deep_clone(seen_tees)),
3050                metadata: metadata.clone(),
3051            },
3052            HydroNode::Network {
3053                name,
3054                networking_info,
3055                serialize_fn,
3056                instantiate_fn,
3057                deserialize_fn,
3058                input,
3059                metadata,
3060            } => HydroNode::Network {
3061                name: name.clone(),
3062                networking_info: networking_info.clone(),
3063                serialize_fn: serialize_fn.clone(),
3064                instantiate_fn: instantiate_fn.clone(),
3065                deserialize_fn: deserialize_fn.clone(),
3066                input: Box::new(input.deep_clone(seen_tees)),
3067                metadata: metadata.clone(),
3068            },
3069            HydroNode::ExternalInput {
3070                from_external_key,
3071                from_port_id,
3072                from_many,
3073                codec_type,
3074                port_hint,
3075                instantiate_fn,
3076                deserialize_fn,
3077                metadata,
3078            } => HydroNode::ExternalInput {
3079                from_external_key: *from_external_key,
3080                from_port_id: *from_port_id,
3081                from_many: *from_many,
3082                codec_type: codec_type.clone(),
3083                port_hint: *port_hint,
3084                instantiate_fn: instantiate_fn.clone(),
3085                deserialize_fn: deserialize_fn.clone(),
3086                metadata: metadata.clone(),
3087            },
3088            HydroNode::Counter {
3089                tag,
3090                duration,
3091                prefix,
3092                input,
3093                metadata,
3094            } => HydroNode::Counter {
3095                tag: tag.clone(),
3096                duration: duration.clone(),
3097                prefix: prefix.clone(),
3098                input: Box::new(input.deep_clone(seen_tees)),
3099                metadata: metadata.clone(),
3100            },
3101        }
3102    }
3103
3104    #[cfg(feature = "build")]
3105    pub fn emit_core(
3106        &mut self,
3107        builders_or_callback: &mut BuildersOrCallback<
3108            impl FnMut(&mut HydroRoot, &mut StmtId),
3109            impl FnMut(&mut HydroNode, &mut StmtId),
3110        >,
3111        seen_tees: &mut SeenSharedNodes,
3112        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3113        next_stmt_id: &mut StmtId,
3114        fold_hooked_idents: &mut HashSet<String>,
3115    ) -> syn::Ident {
3116        let mut ident_stack: Vec<syn::Ident> = Vec::new();
3117
3118        self.transform_bottom_up(
3119            &mut |node: &mut HydroNode| {
3120                let out_location = node.metadata().location_id.clone();
3121                match node {
3122                    HydroNode::Placeholder => {
3123                        panic!()
3124                    }
3125
3126                    HydroNode::Cast { .. } => {
3127                        // Cast passes through the input ident unchanged
3128                        // The input ident is already on the stack from processing the child
3129                        match builders_or_callback {
3130                            BuildersOrCallback::Builders(_) => {}
3131                            BuildersOrCallback::Callback(_, node_callback) => {
3132                                node_callback(node, next_stmt_id);
3133                            }
3134                        }
3135
3136                        let _ = next_stmt_id.get_and_increment();
3137                        // input_ident stays on stack as output
3138                    }
3139
3140                    HydroNode::UnboundSingleton { .. } => {
3141                        let inner_ident = ident_stack.pop().unwrap();
3142
3143                        let out_ident =
3144                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3145
3146                        match builders_or_callback {
3147                            BuildersOrCallback::Builders(graph_builders) => {
3148                                if graph_builders.singleton_intermediates() {
3149                                    let builder = graph_builders.get_dfir_mut(&out_location);
3150                                    builder.add_dfir(
3151                                        parse_quote! {
3152                                            #out_ident = #inner_ident;
3153                                        },
3154                                        None,
3155                                        None,
3156                                    );
3157                                } else {
3158                                    let builder = graph_builders.get_dfir_mut(&out_location);
3159                                    builder.add_dfir(
3160                                        parse_quote! {
3161                                            #out_ident = #inner_ident -> persist::<'static>();
3162                                        },
3163                                        None,
3164                                        None,
3165                                    );
3166                                }
3167                            }
3168                            BuildersOrCallback::Callback(_, node_callback) => {
3169                                node_callback(node, next_stmt_id);
3170                            }
3171                        }
3172
3173                        let _ = next_stmt_id.get_and_increment();
3174
3175                        ident_stack.push(out_ident);
3176                    }
3177
3178                    HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3179                        let inner_ident = ident_stack.pop().unwrap();
3180
3181                        let out_ident =
3182                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3183
3184                        match builders_or_callback {
3185                            BuildersOrCallback::Builders(graph_builders) => {
3186                                graph_builders.assert_is_consistent(
3187                                    *trusted,
3188                                    &inner.metadata().location_id,
3189                                    inner_ident,
3190                                    &out_ident,
3191                                );
3192                            }
3193                            BuildersOrCallback::Callback(_, node_callback) => {
3194                                node_callback(node, next_stmt_id);
3195                            }
3196                        }
3197
3198                        let _ = next_stmt_id.get_and_increment();
3199
3200                        ident_stack.push(out_ident);
3201                    }
3202
3203                    HydroNode::ObserveNonDet {
3204                        inner,
3205                        trusted,
3206                        metadata,
3207                        ..
3208                    } => {
3209                        let inner_ident = ident_stack.pop().unwrap();
3210
3211                        let observe_ident =
3212                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3213
3214                        match builders_or_callback {
3215                            BuildersOrCallback::Builders(graph_builders) => {
3216                                graph_builders.observe_nondet(
3217                                    *trusted,
3218                                    &inner.metadata().location_id,
3219                                    inner_ident,
3220                                    &inner.metadata().collection_kind,
3221                                    &observe_ident,
3222                                    &metadata.collection_kind,
3223                                    &metadata.op,
3224                                );
3225                            }
3226                            BuildersOrCallback::Callback(_, node_callback) => {
3227                                node_callback(node, next_stmt_id);
3228                            }
3229                        }
3230
3231                        let _ = next_stmt_id.get_and_increment();
3232
3233                        ident_stack.push(observe_ident);
3234                    }
3235
3236                    HydroNode::Batch {
3237                        inner, metadata, ..
3238                    } => {
3239                        let inner_ident = ident_stack.pop().unwrap();
3240
3241                        let batch_ident =
3242                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3243
3244                        match builders_or_callback {
3245                            BuildersOrCallback::Builders(graph_builders) => {
3246                                graph_builders.batch(
3247                                    inner_ident,
3248                                    &inner.metadata().location_id,
3249                                    &inner.metadata().collection_kind,
3250                                    &batch_ident,
3251                                    &out_location,
3252                                    &metadata.op,
3253                                    fold_hooked_idents,
3254                                );
3255                            }
3256                            BuildersOrCallback::Callback(_, node_callback) => {
3257                                node_callback(node, next_stmt_id);
3258                            }
3259                        }
3260
3261                        let _ = next_stmt_id.get_and_increment();
3262
3263                        ident_stack.push(batch_ident);
3264                    }
3265
3266                    HydroNode::YieldConcat { inner, .. } => {
3267                        let inner_ident = ident_stack.pop().unwrap();
3268
3269                        let yield_ident =
3270                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3271
3272                        match builders_or_callback {
3273                            BuildersOrCallback::Builders(graph_builders) => {
3274                                graph_builders.yield_from_tick(
3275                                    inner_ident,
3276                                    &inner.metadata().location_id,
3277                                    &inner.metadata().collection_kind,
3278                                    &yield_ident,
3279                                    &out_location,
3280                                );
3281                            }
3282                            BuildersOrCallback::Callback(_, node_callback) => {
3283                                node_callback(node, next_stmt_id);
3284                            }
3285                        }
3286
3287                        let _ = next_stmt_id.get_and_increment();
3288
3289                        ident_stack.push(yield_ident);
3290                    }
3291
3292                    HydroNode::BeginAtomic { inner, metadata } => {
3293                        let inner_ident = ident_stack.pop().unwrap();
3294
3295                        let begin_ident =
3296                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3297
3298                        match builders_or_callback {
3299                            BuildersOrCallback::Builders(graph_builders) => {
3300                                graph_builders.begin_atomic(
3301                                    inner_ident,
3302                                    &inner.metadata().location_id,
3303                                    &inner.metadata().collection_kind,
3304                                    &begin_ident,
3305                                    &out_location,
3306                                    &metadata.op,
3307                                );
3308                            }
3309                            BuildersOrCallback::Callback(_, node_callback) => {
3310                                node_callback(node, next_stmt_id);
3311                            }
3312                        }
3313
3314                        let _ = next_stmt_id.get_and_increment();
3315
3316                        ident_stack.push(begin_ident);
3317                    }
3318
3319                    HydroNode::EndAtomic { inner, .. } => {
3320                        let inner_ident = ident_stack.pop().unwrap();
3321
3322                        let end_ident =
3323                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3324
3325                        match builders_or_callback {
3326                            BuildersOrCallback::Builders(graph_builders) => {
3327                                graph_builders.end_atomic(
3328                                    inner_ident,
3329                                    &inner.metadata().location_id,
3330                                    &inner.metadata().collection_kind,
3331                                    &end_ident,
3332                                );
3333                            }
3334                            BuildersOrCallback::Callback(_, node_callback) => {
3335                                node_callback(node, next_stmt_id);
3336                            }
3337                        }
3338
3339                        let _ = next_stmt_id.get_and_increment();
3340
3341                        ident_stack.push(end_ident);
3342                    }
3343
3344                    HydroNode::Source {
3345                        source, metadata, ..
3346                    } => {
3347                        if let HydroSource::ExternalNetwork() = source {
3348                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3349                        } else {
3350                            let source_ident =
3351                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3352
3353                            let source_stmt = match source {
3354                                HydroSource::Stream(expr) => {
3355                                    debug_assert!(metadata.location_id.is_top_level());
3356                                    parse_quote! {
3357                                        #source_ident = source_stream(#expr);
3358                                    }
3359                                }
3360
3361                                HydroSource::ExternalNetwork() => {
3362                                    unreachable!()
3363                                }
3364
3365                                HydroSource::Iter(expr) => {
3366                                    if metadata.location_id.is_top_level() {
3367                                        parse_quote! {
3368                                            #source_ident = source_iter(#expr);
3369                                        }
3370                                    } else {
3371                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
3372                                        parse_quote! {
3373                                            #source_ident = source_iter(#expr) -> persist::<'static>();
3374                                        }
3375                                    }
3376                                }
3377
3378                                HydroSource::Spin() => {
3379                                    debug_assert!(metadata.location_id.is_top_level());
3380                                    parse_quote! {
3381                                        #source_ident = spin();
3382                                    }
3383                                }
3384
3385                                HydroSource::ClusterMembers(target_loc, state) => {
3386                                    debug_assert!(metadata.location_id.is_top_level());
3387
3388                                    let members_tee_ident = syn::Ident::new(
3389                                        &format!(
3390                                            "__cluster_members_tee_{}_{}",
3391                                            metadata.location_id.root().key(),
3392                                            target_loc.key(),
3393                                        ),
3394                                        Span::call_site(),
3395                                    );
3396
3397                                    match state {
3398                                        ClusterMembersState::Stream(d) => {
3399                                            parse_quote! {
3400                                                #members_tee_ident = source_stream(#d) -> tee();
3401                                                #source_ident = #members_tee_ident;
3402                                            }
3403                                        },
3404                                        ClusterMembersState::Uninit => syn::parse_quote! {
3405                                            #source_ident = source_stream(DUMMY);
3406                                        },
3407                                        ClusterMembersState::Tee(..) => parse_quote! {
3408                                            #source_ident = #members_tee_ident;
3409                                        },
3410                                    }
3411                                }
3412
3413                                HydroSource::Embedded(ident) => {
3414                                    parse_quote! {
3415                                        #source_ident = source_stream(#ident);
3416                                    }
3417                                }
3418
3419                                HydroSource::EmbeddedSingleton(ident) => {
3420                                    parse_quote! {
3421                                        #source_ident = source_iter([#ident]);
3422                                    }
3423                                }
3424                            };
3425
3426                            match builders_or_callback {
3427                                BuildersOrCallback::Builders(graph_builders) => {
3428                                    let builder = graph_builders.get_dfir_mut(&out_location);
3429                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3430                                }
3431                                BuildersOrCallback::Callback(_, node_callback) => {
3432                                    node_callback(node, next_stmt_id);
3433                                }
3434                            }
3435
3436                            let _ = next_stmt_id.get_and_increment();
3437
3438                            ident_stack.push(source_ident);
3439                        }
3440                    }
3441
3442                    HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3443                        let source_ident =
3444                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3445
3446                        match builders_or_callback {
3447                            BuildersOrCallback::Builders(graph_builders) => {
3448                                let builder = graph_builders.get_dfir_mut(&out_location);
3449
3450                                if *first_tick_only {
3451                                    assert!(
3452                                        !metadata.location_id.is_top_level(),
3453                                        "first_tick_only SingletonSource must be inside a tick"
3454                                    );
3455                                }
3456
3457                                if *first_tick_only
3458                                    || (metadata.location_id.is_top_level()
3459                                        && metadata.collection_kind.is_bounded())
3460                                {
3461                                    builder.add_dfir(
3462                                        parse_quote! {
3463                                            #source_ident = source_iter([#value]);
3464                                        },
3465                                        None,
3466                                        Some(&next_stmt_id.to_string()),
3467                                    );
3468                                } else {
3469                                    builder.add_dfir(
3470                                        parse_quote! {
3471                                            #source_ident = source_iter([#value]) -> persist::<'static>();
3472                                        },
3473                                        None,
3474                                        Some(&next_stmt_id.to_string()),
3475                                    );
3476                                }
3477                            }
3478                            BuildersOrCallback::Callback(_, node_callback) => {
3479                                node_callback(node, next_stmt_id);
3480                            }
3481                        }
3482
3483                        let _ = next_stmt_id.get_and_increment();
3484
3485                        ident_stack.push(source_ident);
3486                    }
3487
3488                    HydroNode::CycleSource { cycle_id, .. } => {
3489                        let ident = cycle_id.as_ident();
3490
3491                        match builders_or_callback {
3492                            BuildersOrCallback::Builders(_) => {}
3493                            BuildersOrCallback::Callback(_, node_callback) => {
3494                                node_callback(node, next_stmt_id);
3495                            }
3496                        }
3497
3498                        // consume a stmt id even though we did not emit anything so that we can instrument this
3499                        let _ = next_stmt_id.get_and_increment();
3500
3501                        ident_stack.push(ident);
3502                    }
3503
3504                    HydroNode::Tee { inner, .. } => {
3505                        let ret_ident = if let Some(built_idents) =
3506                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3507                        {
3508                            match builders_or_callback {
3509                                BuildersOrCallback::Builders(_) => {}
3510                                BuildersOrCallback::Callback(_, node_callback) => {
3511                                    node_callback(node, next_stmt_id);
3512                                }
3513                            }
3514
3515                            built_idents[0].clone()
3516                        } else {
3517                            // The inner node was already processed by transform_bottom_up,
3518                            // so its ident is on the stack
3519                            let inner_ident = ident_stack.pop().unwrap();
3520
3521                            let tee_ident =
3522                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3523
3524                            built_tees.insert(
3525                                inner.0.as_ref() as *const RefCell<HydroNode>,
3526                                vec![tee_ident.clone()],
3527                            );
3528
3529                            match builders_or_callback {
3530                                BuildersOrCallback::Builders(graph_builders) => {
3531                                    // NOTE: With `forward_ref`, the fold codegen may not have
3532                                    // run yet when we reach this tee, so `fold_hooked_idents`
3533                                    // might not contain the inner ident. In that case we won't
3534                                    // propagate the "hooked" status to the tee and the
3535                                    // downstream singleton batch will use the normal
3536                                    // `SingletonHook` instead of `PassthroughSingletonHook`.
3537                                    // This is not a soundness issue: the fallback hook still
3538                                    // produces correct behavior, just with a redundant decision
3539                                    // point. TODO(https://github.com/hydro-project/hydro/issues/2856):
3540                                    // fix ordering so forward_ref folds are always processed
3541                                    // before their downstream tees.
3542                                    if fold_hooked_idents.contains(&inner_ident.to_string()) {
3543                                        fold_hooked_idents.insert(tee_ident.to_string());
3544                                    }
3545                                    let builder = graph_builders.get_dfir_mut(&out_location);
3546                                    builder.add_dfir(
3547                                        parse_quote! {
3548                                            #tee_ident = #inner_ident -> tee();
3549                                        },
3550                                        None,
3551                                        Some(&next_stmt_id.to_string()),
3552                                    );
3553                                }
3554                                BuildersOrCallback::Callback(_, node_callback) => {
3555                                    node_callback(node, next_stmt_id);
3556                                }
3557                            }
3558
3559                            tee_ident
3560                        };
3561
3562                        // we consume a stmt id regardless of if we emit the tee() operator,
3563                        // so that during rewrites we touch all recipients of the tee()
3564
3565                        let _ = next_stmt_id.get_and_increment();
3566                        ident_stack.push(ret_ident);
3567                    }
3568
3569                    HydroNode::Singleton { inner, .. } => {
3570                        let ret_ident = if let Some(built_idents) =
3571                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3572                        {
3573                            built_idents[0].clone()
3574                        } else {
3575                            let inner_ident = ident_stack.pop().unwrap();
3576
3577                            let singleton_ident =
3578                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3579
3580                            built_tees.insert(
3581                                inner.0.as_ref() as *const RefCell<HydroNode>,
3582                                vec![singleton_ident.clone()],
3583                            );
3584
3585                            match builders_or_callback {
3586                                BuildersOrCallback::Builders(graph_builders) => {
3587                                    let builder = graph_builders.get_dfir_mut(&out_location);
3588                                    builder.add_dfir(
3589                                        parse_quote! {
3590                                            #singleton_ident = #inner_ident -> singleton();
3591                                        },
3592                                        None,
3593                                        Some(&next_stmt_id.to_string()),
3594                                    );
3595                                }
3596                                BuildersOrCallback::Callback(_, node_callback) => {
3597                                    node_callback(node, next_stmt_id);
3598                                }
3599                            }
3600
3601                            singleton_ident
3602                        };
3603
3604                        // we consume a stmt id regardless of if we emit the singleton() operator,
3605                        // so that during rewrites we touch all recipients of the singleton()
3606                        let _ = next_stmt_id.get_and_increment();
3607                        ident_stack.push(ret_ident);
3608                    }
3609
3610                    HydroNode::Partition {
3611                        inner, f, is_true, ..
3612                    } => {
3613                        let is_true = *is_true; // need to copy early to avoid borrow checking issues with node
3614                        let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3615                        let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3616                            match builders_or_callback {
3617                                BuildersOrCallback::Builders(_) => {}
3618                                BuildersOrCallback::Callback(_, node_callback) => {
3619                                    node_callback(node, next_stmt_id);
3620                                }
3621                            }
3622
3623                            let idx = if is_true { 0 } else { 1 };
3624                            built_idents[idx].clone()
3625                        } else {
3626                            // The inner node was already processed by transform_bottom_up,
3627                            // so its ident is on the stack
3628                            let inner_ident = ident_stack.pop().unwrap();
3629                            let f_tokens = f.emit_tokens(&mut ident_stack);
3630
3631                            let partition_ident = syn::Ident::new(
3632                                &format!("stream_{}_partition", *next_stmt_id),
3633                                Span::call_site(),
3634                            );
3635                            let true_ident = syn::Ident::new(
3636                                &format!("stream_{}_true", *next_stmt_id),
3637                                Span::call_site(),
3638                            );
3639                            let false_ident = syn::Ident::new(
3640                                &format!("stream_{}_false", *next_stmt_id),
3641                                Span::call_site(),
3642                            );
3643
3644                            built_tees.insert(
3645                                ptr,
3646                                vec![true_ident.clone(), false_ident.clone()],
3647                            );
3648
3649                            match builders_or_callback {
3650                                BuildersOrCallback::Builders(graph_builders) => {
3651                                    let builder = graph_builders.get_dfir_mut(&out_location);
3652                                    builder.add_dfir(
3653                                        parse_quote! {
3654                                            #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3655                                            #true_ident = #partition_ident[0];
3656                                            #false_ident = #partition_ident[1];
3657                                        },
3658                                        None,
3659                                        Some(&next_stmt_id.to_string()),
3660                                    );
3661                                }
3662                                BuildersOrCallback::Callback(_, node_callback) => {
3663                                    node_callback(node, next_stmt_id);
3664                                }
3665                            }
3666
3667                            if is_true { true_ident } else { false_ident }
3668                        };
3669
3670                        let _ = next_stmt_id.get_and_increment();
3671                        ident_stack.push(ret_ident);
3672                    }
3673
3674                    HydroNode::Chain { .. } => {
3675                        // Children are processed left-to-right, so second is on top
3676                        let second_ident = ident_stack.pop().unwrap();
3677                        let first_ident = ident_stack.pop().unwrap();
3678
3679                        let chain_ident =
3680                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3681
3682                        match builders_or_callback {
3683                            BuildersOrCallback::Builders(graph_builders) => {
3684                                let builder = graph_builders.get_dfir_mut(&out_location);
3685                                builder.add_dfir(
3686                                    parse_quote! {
3687                                        #chain_ident = chain();
3688                                        #first_ident -> [0]#chain_ident;
3689                                        #second_ident -> [1]#chain_ident;
3690                                    },
3691                                    None,
3692                                    Some(&next_stmt_id.to_string()),
3693                                );
3694                            }
3695                            BuildersOrCallback::Callback(_, node_callback) => {
3696                                node_callback(node, next_stmt_id);
3697                            }
3698                        }
3699
3700                        let _ = next_stmt_id.get_and_increment();
3701
3702                        ident_stack.push(chain_ident);
3703                    }
3704
3705                    HydroNode::MergeOrdered { first, metadata, .. } => {
3706                        let second_ident = ident_stack.pop().unwrap();
3707                        let first_ident = ident_stack.pop().unwrap();
3708
3709                        let merge_ident =
3710                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3711
3712                        match builders_or_callback {
3713                            BuildersOrCallback::Builders(graph_builders) => {
3714                                graph_builders.merge_ordered(
3715                                    &first.metadata().location_id,
3716                                    first_ident,
3717                                    second_ident,
3718                                    &merge_ident,
3719                                    &first.metadata().collection_kind,
3720                                    &metadata.op,
3721                                    Some(&next_stmt_id.to_string()),
3722                                );
3723                            }
3724                            BuildersOrCallback::Callback(_, node_callback) => {
3725                                node_callback(node, next_stmt_id);
3726                            }
3727                        }
3728
3729                        let _ = next_stmt_id.get_and_increment();
3730
3731                        ident_stack.push(merge_ident);
3732                    }
3733
3734                    HydroNode::ChainFirst { .. } => {
3735                        let second_ident = ident_stack.pop().unwrap();
3736                        let first_ident = ident_stack.pop().unwrap();
3737
3738                        let chain_ident =
3739                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3740
3741                        match builders_or_callback {
3742                            BuildersOrCallback::Builders(graph_builders) => {
3743                                let builder = graph_builders.get_dfir_mut(&out_location);
3744                                builder.add_dfir(
3745                                    parse_quote! {
3746                                        #chain_ident = chain_first_n(1);
3747                                        #first_ident -> [0]#chain_ident;
3748                                        #second_ident -> [1]#chain_ident;
3749                                    },
3750                                    None,
3751                                    Some(&next_stmt_id.to_string()),
3752                                );
3753                            }
3754                            BuildersOrCallback::Callback(_, node_callback) => {
3755                                node_callback(node, next_stmt_id);
3756                            }
3757                        }
3758
3759                        let _ = next_stmt_id.get_and_increment();
3760
3761                        ident_stack.push(chain_ident);
3762                    }
3763
3764                    HydroNode::CrossSingleton { right, .. } => {
3765                        let right_ident = ident_stack.pop().unwrap();
3766                        let left_ident = ident_stack.pop().unwrap();
3767
3768                        let cross_ident =
3769                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3770
3771                        match builders_or_callback {
3772                            BuildersOrCallback::Builders(graph_builders) => {
3773                                let builder = graph_builders.get_dfir_mut(&out_location);
3774
3775                                if right.metadata().location_id.is_top_level()
3776                                    && right.metadata().collection_kind.is_bounded()
3777                                {
3778                                    builder.add_dfir(
3779                                        parse_quote! {
3780                                            #cross_ident = cross_singleton::<'static>();
3781                                            #left_ident -> [input]#cross_ident;
3782                                            #right_ident -> [single]#cross_ident;
3783                                        },
3784                                        None,
3785                                        Some(&next_stmt_id.to_string()),
3786                                    );
3787                                } else {
3788                                    builder.add_dfir(
3789                                        parse_quote! {
3790                                            #cross_ident = cross_singleton();
3791                                            #left_ident -> [input]#cross_ident;
3792                                            #right_ident -> [single]#cross_ident;
3793                                        },
3794                                        None,
3795                                        Some(&next_stmt_id.to_string()),
3796                                    );
3797                                }
3798                            }
3799                            BuildersOrCallback::Callback(_, node_callback) => {
3800                                node_callback(node, next_stmt_id);
3801                            }
3802                        }
3803
3804                        let _ = next_stmt_id.get_and_increment();
3805
3806                        ident_stack.push(cross_ident);
3807                    }
3808
3809                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3810                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3811                            parse_quote!(cross_join_multiset)
3812                        } else {
3813                            parse_quote!(join_multiset)
3814                        };
3815
3816                        let (HydroNode::CrossProduct { left, right, .. }
3817                        | HydroNode::Join { left, right, .. }) = node
3818                        else {
3819                            unreachable!()
3820                        };
3821
3822                        let is_top_level = left.metadata().location_id.is_top_level()
3823                            && right.metadata().location_id.is_top_level();
3824                        let left_lifetime = if left.metadata().location_id.is_top_level() {
3825                            quote!('static)
3826                        } else {
3827                            quote!('tick)
3828                        };
3829
3830                        let right_lifetime = if right.metadata().location_id.is_top_level() {
3831                            quote!('static)
3832                        } else {
3833                            quote!('tick)
3834                        };
3835
3836                        let right_ident = ident_stack.pop().unwrap();
3837                        let left_ident = ident_stack.pop().unwrap();
3838
3839                        let stream_ident =
3840                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3841
3842                        match builders_or_callback {
3843                            BuildersOrCallback::Builders(graph_builders) => {
3844                                let builder = graph_builders.get_dfir_mut(&out_location);
3845                                builder.add_dfir(
3846                                    if is_top_level {
3847                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
3848                                        // a multiset_delta() to negate the replay behavior
3849                                        parse_quote! {
3850                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3851                                            #left_ident -> [0]#stream_ident;
3852                                            #right_ident -> [1]#stream_ident;
3853                                        }
3854                                    } else {
3855                                        parse_quote! {
3856                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3857                                            #left_ident -> [0]#stream_ident;
3858                                            #right_ident -> [1]#stream_ident;
3859                                        }
3860                                    }
3861                                    ,
3862                                    None,
3863                                    Some(&next_stmt_id.to_string()),
3864                                );
3865                            }
3866                            BuildersOrCallback::Callback(_, node_callback) => {
3867                                node_callback(node, next_stmt_id);
3868                            }
3869                        }
3870
3871                        let _ = next_stmt_id.get_and_increment();
3872
3873                        ident_stack.push(stream_ident);
3874                    }
3875
3876                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3877                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3878                            parse_quote!(difference)
3879                        } else {
3880                            parse_quote!(anti_join)
3881                        };
3882
3883                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3884                            node
3885                        else {
3886                            unreachable!()
3887                        };
3888
3889                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3890                            quote!('static)
3891                        } else {
3892                            quote!('tick)
3893                        };
3894
3895                        let neg_ident = ident_stack.pop().unwrap();
3896                        let pos_ident = ident_stack.pop().unwrap();
3897
3898                        let stream_ident =
3899                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3900
3901                        match builders_or_callback {
3902                            BuildersOrCallback::Builders(graph_builders) => {
3903                                let builder = graph_builders.get_dfir_mut(&out_location);
3904                                builder.add_dfir(
3905                                    parse_quote! {
3906                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
3907                                        #pos_ident -> [pos]#stream_ident;
3908                                        #neg_ident -> [neg]#stream_ident;
3909                                    },
3910                                    None,
3911                                    Some(&next_stmt_id.to_string()),
3912                                );
3913                            }
3914                            BuildersOrCallback::Callback(_, node_callback) => {
3915                                node_callback(node, next_stmt_id);
3916                            }
3917                        }
3918
3919                        let _ = next_stmt_id.get_and_increment();
3920
3921                        ident_stack.push(stream_ident);
3922                    }
3923
3924                    HydroNode::JoinHalf { .. } => {
3925                        let HydroNode::JoinHalf { right, .. } = node else {
3926                            unreachable!()
3927                        };
3928
3929                        assert!(
3930                            right.metadata().collection_kind.is_bounded(),
3931                            "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3932                            right.metadata().collection_kind
3933                        );
3934
3935                        let build_lifetime = if right.metadata().location_id.is_top_level() {
3936                            quote!('static)
3937                        } else {
3938                            quote!('tick)
3939                        };
3940
3941                        let build_ident = ident_stack.pop().unwrap();
3942                        let probe_ident = ident_stack.pop().unwrap();
3943
3944                        let stream_ident =
3945                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3946
3947                        match builders_or_callback {
3948                            BuildersOrCallback::Builders(graph_builders) => {
3949                                let builder = graph_builders.get_dfir_mut(&out_location);
3950                                builder.add_dfir(
3951                                    parse_quote! {
3952                                        #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3953                                        #probe_ident -> [probe]#stream_ident;
3954                                        #build_ident -> [build]#stream_ident;
3955                                    },
3956                                    None,
3957                                    Some(&next_stmt_id.to_string()),
3958                                );
3959                            }
3960                            BuildersOrCallback::Callback(_, node_callback) => {
3961                                node_callback(node, next_stmt_id);
3962                            }
3963                        }
3964
3965                        let _ = next_stmt_id.get_and_increment();
3966
3967                        ident_stack.push(stream_ident);
3968                    }
3969
3970                    HydroNode::ResolveFutures { .. } => {
3971                        let input_ident = ident_stack.pop().unwrap();
3972
3973                        let futures_ident =
3974                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3975
3976                        match builders_or_callback {
3977                            BuildersOrCallback::Builders(graph_builders) => {
3978                                let builder = graph_builders.get_dfir_mut(&out_location);
3979                                builder.add_dfir(
3980                                    parse_quote! {
3981                                        #futures_ident = #input_ident -> resolve_futures();
3982                                    },
3983                                    None,
3984                                    Some(&next_stmt_id.to_string()),
3985                                );
3986                            }
3987                            BuildersOrCallback::Callback(_, node_callback) => {
3988                                node_callback(node, next_stmt_id);
3989                            }
3990                        }
3991
3992                        let _ = next_stmt_id.get_and_increment();
3993
3994                        ident_stack.push(futures_ident);
3995                    }
3996
3997                    HydroNode::ResolveFuturesBlocking { .. } => {
3998                        let input_ident = ident_stack.pop().unwrap();
3999
4000                        let futures_ident =
4001                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4002
4003                        match builders_or_callback {
4004                            BuildersOrCallback::Builders(graph_builders) => {
4005                                let builder = graph_builders.get_dfir_mut(&out_location);
4006                                builder.add_dfir(
4007                                    parse_quote! {
4008                                        #futures_ident = #input_ident -> resolve_futures_blocking();
4009                                    },
4010                                    None,
4011                                    Some(&next_stmt_id.to_string()),
4012                                );
4013                            }
4014                            BuildersOrCallback::Callback(_, node_callback) => {
4015                                node_callback(node, next_stmt_id);
4016                            }
4017                        }
4018
4019                        let _ = next_stmt_id.get_and_increment();
4020
4021                        ident_stack.push(futures_ident);
4022                    }
4023
4024                    HydroNode::ResolveFuturesOrdered { .. } => {
4025                        let input_ident = ident_stack.pop().unwrap();
4026
4027                        let futures_ident =
4028                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4029
4030                        match builders_or_callback {
4031                            BuildersOrCallback::Builders(graph_builders) => {
4032                                let builder = graph_builders.get_dfir_mut(&out_location);
4033                                builder.add_dfir(
4034                                    parse_quote! {
4035                                        #futures_ident = #input_ident -> resolve_futures_ordered();
4036                                    },
4037                                    None,
4038                                    Some(&next_stmt_id.to_string()),
4039                                );
4040                            }
4041                            BuildersOrCallback::Callback(_, node_callback) => {
4042                                node_callback(node, next_stmt_id);
4043                            }
4044                        }
4045
4046                        let _ = next_stmt_id.get_and_increment();
4047
4048                        ident_stack.push(futures_ident);
4049                    }
4050
4051                    HydroNode::Map { f, .. } => {
4052                        // Pop input ident (pushed last by transform_children).
4053                        let input_ident = ident_stack.pop().unwrap();
4054                        let f_tokens = f.emit_tokens(&mut ident_stack);
4055
4056                        let map_ident =
4057                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4058
4059                        match builders_or_callback {
4060                            BuildersOrCallback::Builders(graph_builders) => {
4061                                let builder = graph_builders.get_dfir_mut(&out_location);
4062                                builder.add_dfir(
4063                                    parse_quote! {
4064                                        #map_ident = #input_ident -> map(#f_tokens);
4065                                    },
4066                                    None,
4067                                    Some(&next_stmt_id.to_string()),
4068                                );
4069                            }
4070                            BuildersOrCallback::Callback(_, node_callback) => {
4071                                node_callback(node, next_stmt_id);
4072                            }
4073                        }
4074
4075                        let _ = next_stmt_id.get_and_increment();
4076
4077                        ident_stack.push(map_ident);
4078                    }
4079
4080                    HydroNode::FlatMap { f, .. } => {
4081                        let input_ident = ident_stack.pop().unwrap();
4082                        let f_tokens = f.emit_tokens(&mut ident_stack);
4083
4084                        let flat_map_ident =
4085                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4086
4087                        match builders_or_callback {
4088                            BuildersOrCallback::Builders(graph_builders) => {
4089                                let builder = graph_builders.get_dfir_mut(&out_location);
4090                                builder.add_dfir(
4091                                    parse_quote! {
4092                                        #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4093                                    },
4094                                    None,
4095                                    Some(&next_stmt_id.to_string()),
4096                                );
4097                            }
4098                            BuildersOrCallback::Callback(_, node_callback) => {
4099                                node_callback(node, next_stmt_id);
4100                            }
4101                        }
4102
4103                        let _ = next_stmt_id.get_and_increment();
4104
4105                        ident_stack.push(flat_map_ident);
4106                    }
4107
4108                    HydroNode::FlatMapStreamBlocking { f, .. } => {
4109                        let input_ident = ident_stack.pop().unwrap();
4110                        let f_tokens = f.emit_tokens(&mut ident_stack);
4111
4112                        let flat_map_stream_blocking_ident =
4113                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4114
4115                        match builders_or_callback {
4116                            BuildersOrCallback::Builders(graph_builders) => {
4117                                let builder = graph_builders.get_dfir_mut(&out_location);
4118                                builder.add_dfir(
4119                                    parse_quote! {
4120                                        #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4121                                    },
4122                                    None,
4123                                    Some(&next_stmt_id.to_string()),
4124                                );
4125                            }
4126                            BuildersOrCallback::Callback(_, node_callback) => {
4127                                node_callback(node, next_stmt_id);
4128                            }
4129                        }
4130
4131                        let _ = next_stmt_id.get_and_increment();
4132
4133                        ident_stack.push(flat_map_stream_blocking_ident);
4134                    }
4135
4136                    HydroNode::Filter { f, .. } => {
4137                        let input_ident = ident_stack.pop().unwrap();
4138                        let f_tokens = f.emit_tokens(&mut ident_stack);
4139
4140                        let filter_ident =
4141                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4142
4143                        match builders_or_callback {
4144                            BuildersOrCallback::Builders(graph_builders) => {
4145                                let builder = graph_builders.get_dfir_mut(&out_location);
4146                                builder.add_dfir(
4147                                    parse_quote! {
4148                                        #filter_ident = #input_ident -> filter(#f_tokens);
4149                                    },
4150                                    None,
4151                                    Some(&next_stmt_id.to_string()),
4152                                );
4153                            }
4154                            BuildersOrCallback::Callback(_, node_callback) => {
4155                                node_callback(node, next_stmt_id);
4156                            }
4157                        }
4158
4159                        let _ = next_stmt_id.get_and_increment();
4160
4161                        ident_stack.push(filter_ident);
4162                    }
4163
4164                    HydroNode::FilterMap { f, .. } => {
4165                        let input_ident = ident_stack.pop().unwrap();
4166                        let f_tokens = f.emit_tokens(&mut ident_stack);
4167
4168                        let filter_map_ident =
4169                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4170
4171                        match builders_or_callback {
4172                            BuildersOrCallback::Builders(graph_builders) => {
4173                                let builder = graph_builders.get_dfir_mut(&out_location);
4174                                builder.add_dfir(
4175                                    parse_quote! {
4176                                        #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4177                                    },
4178                                    None,
4179                                    Some(&next_stmt_id.to_string()),
4180                                );
4181                            }
4182                            BuildersOrCallback::Callback(_, node_callback) => {
4183                                node_callback(node, next_stmt_id);
4184                            }
4185                        }
4186
4187                        let _ = next_stmt_id.get_and_increment();
4188
4189                        ident_stack.push(filter_map_ident);
4190                    }
4191
4192                    HydroNode::Sort { .. } => {
4193                        let input_ident = ident_stack.pop().unwrap();
4194
4195                        let sort_ident =
4196                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4197
4198                        match builders_or_callback {
4199                            BuildersOrCallback::Builders(graph_builders) => {
4200                                let builder = graph_builders.get_dfir_mut(&out_location);
4201                                builder.add_dfir(
4202                                    parse_quote! {
4203                                        #sort_ident = #input_ident -> sort();
4204                                    },
4205                                    None,
4206                                    Some(&next_stmt_id.to_string()),
4207                                );
4208                            }
4209                            BuildersOrCallback::Callback(_, node_callback) => {
4210                                node_callback(node, next_stmt_id);
4211                            }
4212                        }
4213
4214                        let _ = next_stmt_id.get_and_increment();
4215
4216                        ident_stack.push(sort_ident);
4217                    }
4218
4219                    HydroNode::DeferTick { .. } => {
4220                        let input_ident = ident_stack.pop().unwrap();
4221
4222                        let defer_tick_ident =
4223                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4224
4225                        match builders_or_callback {
4226                            BuildersOrCallback::Builders(graph_builders) => {
4227                                let builder = graph_builders.get_dfir_mut(&out_location);
4228                                builder.add_dfir(
4229                                    parse_quote! {
4230                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
4231                                    },
4232                                    None,
4233                                    Some(&next_stmt_id.to_string()),
4234                                );
4235                            }
4236                            BuildersOrCallback::Callback(_, node_callback) => {
4237                                node_callback(node, next_stmt_id);
4238                            }
4239                        }
4240
4241                        let _ = next_stmt_id.get_and_increment();
4242
4243                        ident_stack.push(defer_tick_ident);
4244                    }
4245
4246                    HydroNode::Enumerate { input, .. } => {
4247                        let input_ident = ident_stack.pop().unwrap();
4248
4249                        let enumerate_ident =
4250                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4251
4252                        match builders_or_callback {
4253                            BuildersOrCallback::Builders(graph_builders) => {
4254                                let builder = graph_builders.get_dfir_mut(&out_location);
4255                                let lifetime = if input.metadata().location_id.is_top_level() {
4256                                    quote!('static)
4257                                } else {
4258                                    quote!('tick)
4259                                };
4260                                builder.add_dfir(
4261                                    parse_quote! {
4262                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4263                                    },
4264                                    None,
4265                                    Some(&next_stmt_id.to_string()),
4266                                );
4267                            }
4268                            BuildersOrCallback::Callback(_, node_callback) => {
4269                                node_callback(node, next_stmt_id);
4270                            }
4271                        }
4272
4273                        let _ = next_stmt_id.get_and_increment();
4274
4275                        ident_stack.push(enumerate_ident);
4276                    }
4277
4278                    HydroNode::Inspect { f, .. } => {
4279                        let input_ident = ident_stack.pop().unwrap();
4280                        let f_tokens = f.emit_tokens(&mut ident_stack);
4281
4282                        let inspect_ident =
4283                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4284
4285                        match builders_or_callback {
4286                            BuildersOrCallback::Builders(graph_builders) => {
4287                                let builder = graph_builders.get_dfir_mut(&out_location);
4288                                builder.add_dfir(
4289                                    parse_quote! {
4290                                        #inspect_ident = #input_ident -> inspect(#f_tokens);
4291                                    },
4292                                    None,
4293                                    Some(&next_stmt_id.to_string()),
4294                                );
4295                            }
4296                            BuildersOrCallback::Callback(_, node_callback) => {
4297                                node_callback(node, next_stmt_id);
4298                            }
4299                        }
4300
4301                        let _ = next_stmt_id.get_and_increment();
4302
4303                        ident_stack.push(inspect_ident);
4304                    }
4305
4306                    HydroNode::Unique { input, .. } => {
4307                        let input_ident = ident_stack.pop().unwrap();
4308
4309                        let unique_ident =
4310                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4311
4312                        match builders_or_callback {
4313                            BuildersOrCallback::Builders(graph_builders) => {
4314                                let builder = graph_builders.get_dfir_mut(&out_location);
4315                                let lifetime = if input.metadata().location_id.is_top_level() {
4316                                    quote!('static)
4317                                } else {
4318                                    quote!('tick)
4319                                };
4320
4321                                builder.add_dfir(
4322                                    parse_quote! {
4323                                        #unique_ident = #input_ident -> unique::<#lifetime>();
4324                                    },
4325                                    None,
4326                                    Some(&next_stmt_id.to_string()),
4327                                );
4328                            }
4329                            BuildersOrCallback::Callback(_, node_callback) => {
4330                                node_callback(node, next_stmt_id);
4331                            }
4332                        }
4333
4334                        let _ = next_stmt_id.get_and_increment();
4335
4336                        ident_stack.push(unique_ident);
4337                    }
4338
4339                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4340                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4341                            if input.metadata().location_id.is_top_level()
4342                                && input.metadata().collection_kind.is_bounded()
4343                            {
4344                                parse_quote!(fold_no_replay)
4345                            } else {
4346                                parse_quote!(fold)
4347                            }
4348                        } else if matches!(node, HydroNode::Scan { .. }) {
4349                            parse_quote!(scan)
4350                        } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4351                            parse_quote!(scan_async_blocking)
4352                        } else if let HydroNode::FoldKeyed { input, .. } = node {
4353                            if input.metadata().location_id.is_top_level()
4354                                && input.metadata().collection_kind.is_bounded()
4355                            {
4356                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
4357                            } else {
4358                                parse_quote!(fold_keyed)
4359                            }
4360                        } else {
4361                            unreachable!()
4362                        };
4363
4364                        let (HydroNode::Fold { input, .. }
4365                        | HydroNode::FoldKeyed { input, .. }
4366                        | HydroNode::Scan { input, .. }
4367                        | HydroNode::ScanAsyncBlocking { input, .. }) = node
4368                        else {
4369                            unreachable!()
4370                        };
4371
4372                        let lifetime = if input.metadata().location_id.is_top_level() {
4373                            quote!('static)
4374                        } else {
4375                            quote!('tick)
4376                        };
4377
4378                        let input_ident = ident_stack.pop().unwrap();
4379
4380                        let (HydroNode::Fold { init, acc, .. }
4381                        | HydroNode::FoldKeyed { init, acc, .. }
4382                        | HydroNode::Scan { init, acc, .. }
4383                        | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4384                        else {
4385                            unreachable!()
4386                        };
4387
4388                        let acc_tokens = acc.emit_tokens(&mut ident_stack);
4389                        let init_tokens = init.emit_tokens(&mut ident_stack);
4390
4391                        let fold_ident =
4392                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4393
4394                        match builders_or_callback {
4395                            BuildersOrCallback::Builders(graph_builders) => {
4396                                if matches!(node, HydroNode::Fold { .. })
4397                                    && node.metadata().location_id.is_top_level()
4398                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4399                                    && graph_builders.singleton_intermediates()
4400                                    && !node.metadata().collection_kind.is_bounded()
4401                                {
4402                                    let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4403                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4404                                        &input.metadata().location_id,
4405                                        &input_ident,
4406                                        &input.metadata().collection_kind,
4407                                        &node.metadata().op,
4408                                    );
4409
4410                                    let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4411                                        let acc: syn::Expr = parse_quote!({
4412                                            let mut __inner = #acc_tokens;
4413                                            move |__state, __batch: Vec<_>| {
4414                                                if __batch.is_empty() {
4415                                                    return None;
4416                                                }
4417                                                for __value in __batch {
4418                                                    __inner(__state, __value);
4419                                                }
4420                                                Some(__state.clone())
4421                                            }
4422                                        });
4423                                        (hooked, acc)
4424                                    } else {
4425                                        let acc: syn::Expr = parse_quote!({
4426                                            let mut __inner = #acc_tokens;
4427                                            move |__state, __value| {
4428                                                __inner(__state, __value);
4429                                                Some(__state.clone())
4430                                            }
4431                                        });
4432                                        (&input_ident, acc)
4433                                    };
4434
4435                                    let builder = graph_builders.get_dfir_mut(&out_location);
4436                                    builder.add_dfir(
4437                                        parse_quote! {
4438                                            source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4439                                            #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4440                                            #fold_ident = chain();
4441                                        },
4442                                        None,
4443                                        Some(&next_stmt_id.to_string()),
4444                                    );
4445
4446                                    if hooked_input_ident.is_some() {
4447                                        fold_hooked_idents.insert(fold_ident.to_string());
4448                                    }
4449                                } else if matches!(node, HydroNode::FoldKeyed { .. })
4450                                    && node.metadata().location_id.is_top_level()
4451                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4452                                    && graph_builders.singleton_intermediates()
4453                                    && !node.metadata().collection_kind.is_bounded()
4454                                {
4455                                    let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4456                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4457                                        &input.metadata().location_id,
4458                                        &input_ident,
4459                                        &input.metadata().collection_kind,
4460                                        &node.metadata().op,
4461                                    );
4462                                    let builder = graph_builders.get_dfir_mut(&out_location);
4463
4464                                    let wrapped_acc: syn::Expr = parse_quote!({
4465                                        let mut __init = #init_tokens;
4466                                        let mut __inner = #acc_tokens;
4467                                        move |__state, __kv: (_, _)| {
4468                                            // TODO(shadaj): we can avoid the clone when the entry exists
4469                                            let __state = __state
4470                                                .entry(::std::clone::Clone::clone(&__kv.0))
4471                                                .or_insert_with(|| (__init)());
4472                                            __inner(__state, __kv.1);
4473                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4474                                        }
4475                                    });
4476
4477                                    if let Some(hooked_input_ident) = hooked_input_ident {
4478                                        builder.add_dfir(
4479                                            parse_quote! {
4480                                                #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4481                                            },
4482                                            None,
4483                                            Some(&next_stmt_id.to_string()),
4484                                        );
4485
4486                                        fold_hooked_idents.insert(fold_ident.to_string());
4487                                    } else {
4488                                        builder.add_dfir(
4489                                            parse_quote! {
4490                                                #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4491                                            },
4492                                            None,
4493                                            Some(&next_stmt_id.to_string()),
4494                                        );
4495                                    }
4496                                } else if (matches!(node, HydroNode::Fold { .. })
4497                                    || matches!(node, HydroNode::FoldKeyed { .. }))
4498                                    && !node.metadata().location_id.is_top_level()
4499                                    && graph_builders.singleton_intermediates()
4500                                {
4501                                    let input_ref = match &*node {
4502                                        HydroNode::Fold { input, .. } => input,
4503                                        HydroNode::FoldKeyed { input, .. } => input,
4504                                        _ => unreachable!(),
4505                                    };
4506                                    let hooked_input_ident = graph_builders.emit_fold_hook(
4507                                        &input_ref.metadata().location_id,
4508                                        &input_ident,
4509                                        &input_ref.metadata().collection_kind,
4510                                        &node.metadata().op,
4511                                    );
4512
4513                                    let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4514                                    let builder = graph_builders.get_dfir_mut(&out_location);
4515                                    builder.add_dfir(
4516                                        parse_quote! {
4517                                            #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4518                                        },
4519                                        None,
4520                                        Some(&next_stmt_id.to_string()),
4521                                    );
4522                                } else {
4523                                    let builder = graph_builders.get_dfir_mut(&out_location);
4524                                    builder.add_dfir(
4525                                        parse_quote! {
4526                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4527                                        },
4528                                        None,
4529                                        Some(&next_stmt_id.to_string()),
4530                                    );
4531                                }
4532                            }
4533                            BuildersOrCallback::Callback(_, node_callback) => {
4534                                node_callback(node, next_stmt_id);
4535                            }
4536                        }
4537
4538                        let _ = next_stmt_id.get_and_increment();
4539
4540                        ident_stack.push(fold_ident);
4541                    }
4542
4543                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4544                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4545                            if input.metadata().location_id.is_top_level()
4546                                && input.metadata().collection_kind.is_bounded()
4547                            {
4548                                parse_quote!(reduce_no_replay)
4549                            } else {
4550                                parse_quote!(reduce)
4551                            }
4552                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
4553                            if input.metadata().location_id.is_top_level()
4554                                && input.metadata().collection_kind.is_bounded()
4555                            {
4556                                todo!(
4557                                    "Calling keyed reduce on a top-level bounded collection is not supported"
4558                                )
4559                            } else {
4560                                parse_quote!(reduce_keyed)
4561                            }
4562                        } else {
4563                            unreachable!()
4564                        };
4565
4566                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4567                        else {
4568                            unreachable!()
4569                        };
4570
4571                        let lifetime = if input.metadata().location_id.is_top_level() {
4572                            quote!('static)
4573                        } else {
4574                            quote!('tick)
4575                        };
4576
4577                        let input_ident = ident_stack.pop().unwrap();
4578
4579                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4580                        else {
4581                            unreachable!()
4582                        };
4583
4584                        let f_tokens = f.emit_tokens(&mut ident_stack);
4585
4586                        let reduce_ident =
4587                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4588
4589                        match builders_or_callback {
4590                            BuildersOrCallback::Builders(graph_builders) => {
4591                                if matches!(node, HydroNode::Reduce { .. })
4592                                    && node.metadata().location_id.is_top_level()
4593                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4594                                    && graph_builders.singleton_intermediates()
4595                                    && !node.metadata().collection_kind.is_bounded()
4596                                {
4597                                    todo!(
4598                                        "Reduce with optional intermediates is not yet supported in simulator"
4599                                    );
4600                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
4601                                    && node.metadata().location_id.is_top_level()
4602                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4603                                    && graph_builders.singleton_intermediates()
4604                                    && !node.metadata().collection_kind.is_bounded()
4605                                {
4606                                    todo!(
4607                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
4608                                    );
4609                                } else {
4610                                    let builder = graph_builders.get_dfir_mut(&out_location);
4611                                    builder.add_dfir(
4612                                        parse_quote! {
4613                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4614                                        },
4615                                        None,
4616                                        Some(&next_stmt_id.to_string()),
4617                                    );
4618                                }
4619                            }
4620                            BuildersOrCallback::Callback(_, node_callback) => {
4621                                node_callback(node, next_stmt_id);
4622                            }
4623                        }
4624
4625                        let _ = next_stmt_id.get_and_increment();
4626
4627                        ident_stack.push(reduce_ident);
4628                    }
4629
4630                    HydroNode::ReduceKeyedWatermark {
4631                        f,
4632                        input,
4633                        metadata,
4634                        ..
4635                    } => {
4636                        let lifetime = if input.metadata().location_id.is_top_level() {
4637                            quote!('static)
4638                        } else {
4639                            quote!('tick)
4640                        };
4641
4642                        // watermark is processed second, so it's on top
4643                        let watermark_ident = ident_stack.pop().unwrap();
4644                        let input_ident = ident_stack.pop().unwrap();
4645                        let f_tokens = f.emit_tokens(&mut ident_stack);
4646
4647                        let chain_ident = syn::Ident::new(
4648                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4649                            Span::call_site(),
4650                        );
4651
4652                        let fold_ident =
4653                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4654
4655                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4656                            && input.metadata().collection_kind.is_bounded()
4657                        {
4658                            parse_quote!(fold_no_replay)
4659                        } else {
4660                            parse_quote!(fold)
4661                        };
4662
4663                        match builders_or_callback {
4664                            BuildersOrCallback::Builders(graph_builders) => {
4665                                if metadata.location_id.is_top_level()
4666                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4667                                    && graph_builders.singleton_intermediates()
4668                                    && !metadata.collection_kind.is_bounded()
4669                                {
4670                                    todo!(
4671                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4672                                    )
4673                                } else {
4674                                    let builder = graph_builders.get_dfir_mut(&out_location);
4675                                    builder.add_dfir(
4676                                        parse_quote! {
4677                                            #chain_ident = chain();
4678                                            #input_ident
4679                                                -> map(|x| (Some(x), None))
4680                                                -> [0]#chain_ident;
4681                                            #watermark_ident
4682                                                -> map(|watermark| (None, Some(watermark)))
4683                                                -> [1]#chain_ident;
4684
4685                                            #fold_ident = #chain_ident
4686                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4687                                                    let __reduce_keyed_fn = #f_tokens;
4688                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4689                                                        if let Some((k, v)) = opt_payload {
4690                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4691                                                                if k < curr_watermark {
4692                                                                    return;
4693                                                                }
4694                                                            }
4695                                                            match map.entry(k) {
4696                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
4697                                                                    e.insert(v);
4698                                                                }
4699                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
4700                                                                    __reduce_keyed_fn(e.get_mut(), v);
4701                                                                }
4702                                                            }
4703                                                        } else {
4704                                                            let watermark = opt_watermark.unwrap();
4705                                                            if let Some(curr_watermark) = *opt_curr_watermark {
4706                                                                if watermark <= curr_watermark {
4707                                                                    return;
4708                                                                }
4709                                                            }
4710                                                            map.retain(|k, _| *k >= watermark);
4711                                                            *opt_curr_watermark = Some(watermark);
4712                                                        }
4713                                                    }
4714                                                })
4715                                                -> flat_map(|(map, _curr_watermark)| map);
4716                                        },
4717                                        None,
4718                                        Some(&next_stmt_id.to_string()),
4719                                    );
4720                                }
4721                            }
4722                            BuildersOrCallback::Callback(_, node_callback) => {
4723                                node_callback(node, next_stmt_id);
4724                            }
4725                        }
4726
4727                        let _ = next_stmt_id.get_and_increment();
4728
4729                        ident_stack.push(fold_ident);
4730                    }
4731
4732                    HydroNode::Network {
4733                        networking_info,
4734                        serialize_fn: serialize_pipeline,
4735                        instantiate_fn,
4736                        deserialize_fn: deserialize_pipeline,
4737                        input,
4738                        ..
4739                    } => {
4740                        let input_ident = ident_stack.pop().unwrap();
4741
4742                        let receiver_stream_ident =
4743                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4744
4745                        match builders_or_callback {
4746                            BuildersOrCallback::Builders(graph_builders) => {
4747                                let (sink_expr, source_expr) = match instantiate_fn {
4748                                    DebugInstantiate::Building => (
4749                                        syn::parse_quote!(DUMMY_SINK),
4750                                        syn::parse_quote!(DUMMY_SOURCE),
4751                                    ),
4752
4753                                    DebugInstantiate::Finalized(finalized) => {
4754                                        (finalized.sink.clone(), finalized.source.clone())
4755                                    }
4756                                };
4757
4758                                graph_builders.create_network(
4759                                    &input.metadata().location_id,
4760                                    &out_location,
4761                                    input_ident,
4762                                    &receiver_stream_ident,
4763                                    serialize_pipeline.as_ref(),
4764                                    sink_expr,
4765                                    source_expr,
4766                                    deserialize_pipeline.as_ref(),
4767                                    *next_stmt_id,
4768                                    networking_info,
4769                                );
4770                            }
4771                            BuildersOrCallback::Callback(_, node_callback) => {
4772                                node_callback(node, next_stmt_id);
4773                            }
4774                        }
4775
4776                        let _ = next_stmt_id.get_and_increment();
4777
4778                        ident_stack.push(receiver_stream_ident);
4779                    }
4780
4781                    HydroNode::ExternalInput {
4782                        instantiate_fn,
4783                        deserialize_fn: deserialize_pipeline,
4784                        ..
4785                    } => {
4786                        let receiver_stream_ident =
4787                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4788
4789                        match builders_or_callback {
4790                            BuildersOrCallback::Builders(graph_builders) => {
4791                                let (_, source_expr) = match instantiate_fn {
4792                                    DebugInstantiate::Building => (
4793                                        syn::parse_quote!(DUMMY_SINK),
4794                                        syn::parse_quote!(DUMMY_SOURCE),
4795                                    ),
4796
4797                                    DebugInstantiate::Finalized(finalized) => {
4798                                        (finalized.sink.clone(), finalized.source.clone())
4799                                    }
4800                                };
4801
4802                                graph_builders.create_external_source(
4803                                    &out_location,
4804                                    source_expr,
4805                                    &receiver_stream_ident,
4806                                    deserialize_pipeline.as_ref(),
4807                                    *next_stmt_id,
4808                                );
4809                            }
4810                            BuildersOrCallback::Callback(_, node_callback) => {
4811                                node_callback(node, next_stmt_id);
4812                            }
4813                        }
4814
4815                        let _ = next_stmt_id.get_and_increment();
4816
4817                        ident_stack.push(receiver_stream_ident);
4818                    }
4819
4820                    HydroNode::Counter {
4821                        tag,
4822                        duration,
4823                        prefix,
4824                        ..
4825                    } => {
4826                        let input_ident = ident_stack.pop().unwrap();
4827
4828                        let counter_ident =
4829                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4830
4831                        match builders_or_callback {
4832                            BuildersOrCallback::Builders(graph_builders) => {
4833                                let arg = format!("{}({})", prefix, tag);
4834                                let builder = graph_builders.get_dfir_mut(&out_location);
4835                                builder.add_dfir(
4836                                    parse_quote! {
4837                                        #counter_ident = #input_ident -> _counter(#arg, #duration);
4838                                    },
4839                                    None,
4840                                    Some(&next_stmt_id.to_string()),
4841                                );
4842                            }
4843                            BuildersOrCallback::Callback(_, node_callback) => {
4844                                node_callback(node, next_stmt_id);
4845                            }
4846                        }
4847
4848                        let _ = next_stmt_id.get_and_increment();
4849
4850                        ident_stack.push(counter_ident);
4851                    }
4852                }
4853            },
4854            seen_tees,
4855            false,
4856        );
4857
4858        let ret = ident_stack
4859            .pop()
4860            .expect("ident_stack should have exactly one element after traversal");
4861        assert!(
4862            ident_stack.is_empty(),
4863            "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4864             This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4865            ident_stack.len()
4866        );
4867        ret
4868    }
4869
4870    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4871        match self {
4872            HydroNode::Placeholder => {
4873                panic!()
4874            }
4875            HydroNode::Cast { .. }
4876            | HydroNode::ObserveNonDet { .. }
4877            | HydroNode::UnboundSingleton { .. }
4878            | HydroNode::AssertIsConsistent { .. } => {}
4879            HydroNode::Source { source, .. } => match source {
4880                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4881                HydroSource::ExternalNetwork()
4882                | HydroSource::Spin()
4883                | HydroSource::ClusterMembers(_, _)
4884                | HydroSource::Embedded(_)
4885                | HydroSource::EmbeddedSingleton(_) => {} // TODO: what goes here?
4886            },
4887            HydroNode::SingletonSource { value, .. } => {
4888                transform(value);
4889            }
4890            HydroNode::CycleSource { .. }
4891            | HydroNode::Tee { .. }
4892            | HydroNode::Singleton { .. }
4893            | HydroNode::YieldConcat { .. }
4894            | HydroNode::BeginAtomic { .. }
4895            | HydroNode::EndAtomic { .. }
4896            | HydroNode::Batch { .. }
4897            | HydroNode::Chain { .. }
4898            | HydroNode::MergeOrdered { .. }
4899            | HydroNode::ChainFirst { .. }
4900            | HydroNode::CrossProduct { .. }
4901            | HydroNode::CrossSingleton { .. }
4902            | HydroNode::ResolveFutures { .. }
4903            | HydroNode::ResolveFuturesBlocking { .. }
4904            | HydroNode::ResolveFuturesOrdered { .. }
4905            | HydroNode::Join { .. }
4906            | HydroNode::JoinHalf { .. }
4907            | HydroNode::Difference { .. }
4908            | HydroNode::AntiJoin { .. }
4909            | HydroNode::DeferTick { .. }
4910            | HydroNode::Enumerate { .. }
4911            | HydroNode::Unique { .. }
4912            | HydroNode::Sort { .. } => {}
4913            HydroNode::Map { f, .. }
4914            | HydroNode::FlatMap { f, .. }
4915            | HydroNode::FlatMapStreamBlocking { f, .. }
4916            | HydroNode::Filter { f, .. }
4917            | HydroNode::FilterMap { f, .. }
4918            | HydroNode::Inspect { f, .. }
4919            | HydroNode::Partition { f, .. }
4920            | HydroNode::Reduce { f, .. }
4921            | HydroNode::ReduceKeyed { f, .. }
4922            | HydroNode::ReduceKeyedWatermark { f, .. } => {
4923                transform(&mut f.expr);
4924            }
4925            HydroNode::Fold { init, acc, .. }
4926            | HydroNode::Scan { init, acc, .. }
4927            | HydroNode::ScanAsyncBlocking { init, acc, .. }
4928            | HydroNode::FoldKeyed { init, acc, .. } => {
4929                transform(&mut init.expr);
4930                transform(&mut acc.expr);
4931            }
4932            HydroNode::Network {
4933                serialize_fn,
4934                deserialize_fn,
4935                ..
4936            } => {
4937                if let Some(serialize_fn) = serialize_fn {
4938                    transform(serialize_fn);
4939                }
4940                if let Some(deserialize_fn) = deserialize_fn {
4941                    transform(deserialize_fn);
4942                }
4943            }
4944            HydroNode::ExternalInput { deserialize_fn, .. } => {
4945                if let Some(deserialize_fn) = deserialize_fn {
4946                    transform(deserialize_fn);
4947                }
4948            }
4949            HydroNode::Counter { duration, .. } => {
4950                transform(duration);
4951            }
4952        }
4953    }
4954
4955    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4956        &self.metadata().op
4957    }
4958
4959    pub fn metadata(&self) -> &HydroIrMetadata {
4960        match self {
4961            HydroNode::Placeholder => {
4962                panic!()
4963            }
4964            HydroNode::Cast { metadata, .. }
4965            | HydroNode::ObserveNonDet { metadata, .. }
4966            | HydroNode::AssertIsConsistent { metadata, .. }
4967            | HydroNode::UnboundSingleton { metadata, .. }
4968            | HydroNode::Source { metadata, .. }
4969            | HydroNode::SingletonSource { metadata, .. }
4970            | HydroNode::CycleSource { metadata, .. }
4971            | HydroNode::Tee { metadata, .. }
4972            | HydroNode::Singleton { metadata, .. }
4973            | HydroNode::Partition { metadata, .. }
4974            | HydroNode::YieldConcat { metadata, .. }
4975            | HydroNode::BeginAtomic { metadata, .. }
4976            | HydroNode::EndAtomic { metadata, .. }
4977            | HydroNode::Batch { metadata, .. }
4978            | HydroNode::Chain { metadata, .. }
4979            | HydroNode::MergeOrdered { metadata, .. }
4980            | HydroNode::ChainFirst { metadata, .. }
4981            | HydroNode::CrossProduct { metadata, .. }
4982            | HydroNode::CrossSingleton { metadata, .. }
4983            | HydroNode::Join { metadata, .. }
4984            | HydroNode::JoinHalf { metadata, .. }
4985            | HydroNode::Difference { metadata, .. }
4986            | HydroNode::AntiJoin { metadata, .. }
4987            | HydroNode::ResolveFutures { metadata, .. }
4988            | HydroNode::ResolveFuturesBlocking { metadata, .. }
4989            | HydroNode::ResolveFuturesOrdered { metadata, .. }
4990            | HydroNode::Map { metadata, .. }
4991            | HydroNode::FlatMap { metadata, .. }
4992            | HydroNode::FlatMapStreamBlocking { metadata, .. }
4993            | HydroNode::Filter { metadata, .. }
4994            | HydroNode::FilterMap { metadata, .. }
4995            | HydroNode::DeferTick { metadata, .. }
4996            | HydroNode::Enumerate { metadata, .. }
4997            | HydroNode::Inspect { metadata, .. }
4998            | HydroNode::Unique { metadata, .. }
4999            | HydroNode::Sort { metadata, .. }
5000            | HydroNode::Scan { metadata, .. }
5001            | HydroNode::ScanAsyncBlocking { metadata, .. }
5002            | HydroNode::Fold { metadata, .. }
5003            | HydroNode::FoldKeyed { metadata, .. }
5004            | HydroNode::Reduce { metadata, .. }
5005            | HydroNode::ReduceKeyed { metadata, .. }
5006            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5007            | HydroNode::ExternalInput { metadata, .. }
5008            | HydroNode::Network { metadata, .. }
5009            | HydroNode::Counter { metadata, .. } => metadata,
5010        }
5011    }
5012
5013    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5014        &mut self.metadata_mut().op
5015    }
5016
5017    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5018        match self {
5019            HydroNode::Placeholder => {
5020                panic!()
5021            }
5022            HydroNode::Cast { metadata, .. }
5023            | HydroNode::ObserveNonDet { metadata, .. }
5024            | HydroNode::AssertIsConsistent { metadata, .. }
5025            | HydroNode::UnboundSingleton { metadata, .. }
5026            | HydroNode::Source { metadata, .. }
5027            | HydroNode::SingletonSource { metadata, .. }
5028            | HydroNode::CycleSource { metadata, .. }
5029            | HydroNode::Tee { metadata, .. }
5030            | HydroNode::Singleton { metadata, .. }
5031            | HydroNode::Partition { metadata, .. }
5032            | HydroNode::YieldConcat { metadata, .. }
5033            | HydroNode::BeginAtomic { metadata, .. }
5034            | HydroNode::EndAtomic { metadata, .. }
5035            | HydroNode::Batch { metadata, .. }
5036            | HydroNode::Chain { metadata, .. }
5037            | HydroNode::MergeOrdered { metadata, .. }
5038            | HydroNode::ChainFirst { metadata, .. }
5039            | HydroNode::CrossProduct { metadata, .. }
5040            | HydroNode::CrossSingleton { metadata, .. }
5041            | HydroNode::Join { metadata, .. }
5042            | HydroNode::JoinHalf { metadata, .. }
5043            | HydroNode::Difference { metadata, .. }
5044            | HydroNode::AntiJoin { metadata, .. }
5045            | HydroNode::ResolveFutures { metadata, .. }
5046            | HydroNode::ResolveFuturesBlocking { metadata, .. }
5047            | HydroNode::ResolveFuturesOrdered { metadata, .. }
5048            | HydroNode::Map { metadata, .. }
5049            | HydroNode::FlatMap { metadata, .. }
5050            | HydroNode::FlatMapStreamBlocking { metadata, .. }
5051            | HydroNode::Filter { metadata, .. }
5052            | HydroNode::FilterMap { metadata, .. }
5053            | HydroNode::DeferTick { metadata, .. }
5054            | HydroNode::Enumerate { metadata, .. }
5055            | HydroNode::Inspect { metadata, .. }
5056            | HydroNode::Unique { metadata, .. }
5057            | HydroNode::Sort { metadata, .. }
5058            | HydroNode::Scan { metadata, .. }
5059            | HydroNode::ScanAsyncBlocking { metadata, .. }
5060            | HydroNode::Fold { metadata, .. }
5061            | HydroNode::FoldKeyed { metadata, .. }
5062            | HydroNode::Reduce { metadata, .. }
5063            | HydroNode::ReduceKeyed { metadata, .. }
5064            | HydroNode::ReduceKeyedWatermark { metadata, .. }
5065            | HydroNode::ExternalInput { metadata, .. }
5066            | HydroNode::Network { metadata, .. }
5067            | HydroNode::Counter { metadata, .. } => metadata,
5068        }
5069    }
5070
5071    pub fn input(&self) -> Vec<&HydroNode> {
5072        match self {
5073            HydroNode::Placeholder => {
5074                panic!()
5075            }
5076            HydroNode::Source { .. }
5077            | HydroNode::SingletonSource { .. }
5078            | HydroNode::ExternalInput { .. }
5079            | HydroNode::CycleSource { .. }
5080            | HydroNode::Tee { .. }
5081            | HydroNode::Singleton { .. }
5082            | HydroNode::Partition { .. } => {
5083                // Tee/Partition should find their input in separate special ways
5084                vec![]
5085            }
5086            HydroNode::Cast { inner, .. }
5087            | HydroNode::ObserveNonDet { inner, .. }
5088            | HydroNode::YieldConcat { inner, .. }
5089            | HydroNode::BeginAtomic { inner, .. }
5090            | HydroNode::EndAtomic { inner, .. }
5091            | HydroNode::Batch { inner, .. }
5092            | HydroNode::UnboundSingleton { inner, .. }
5093            | HydroNode::AssertIsConsistent { inner, .. } => {
5094                vec![inner]
5095            }
5096            HydroNode::Chain { first, second, .. } => {
5097                vec![first, second]
5098            }
5099            HydroNode::MergeOrdered { first, second, .. } => {
5100                vec![first, second]
5101            }
5102            HydroNode::ChainFirst { first, second, .. } => {
5103                vec![first, second]
5104            }
5105            HydroNode::CrossProduct { left, right, .. }
5106            | HydroNode::CrossSingleton { left, right, .. }
5107            | HydroNode::Join { left, right, .. }
5108            | HydroNode::JoinHalf { left, right, .. } => {
5109                vec![left, right]
5110            }
5111            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5112                vec![pos, neg]
5113            }
5114            HydroNode::Map { input, .. }
5115            | HydroNode::FlatMap { input, .. }
5116            | HydroNode::FlatMapStreamBlocking { input, .. }
5117            | HydroNode::Filter { input, .. }
5118            | HydroNode::FilterMap { input, .. }
5119            | HydroNode::Sort { input, .. }
5120            | HydroNode::DeferTick { input, .. }
5121            | HydroNode::Enumerate { input, .. }
5122            | HydroNode::Inspect { input, .. }
5123            | HydroNode::Unique { input, .. }
5124            | HydroNode::Network { input, .. }
5125            | HydroNode::Counter { input, .. }
5126            | HydroNode::ResolveFutures { input, .. }
5127            | HydroNode::ResolveFuturesBlocking { input, .. }
5128            | HydroNode::ResolveFuturesOrdered { input, .. }
5129            | HydroNode::Fold { input, .. }
5130            | HydroNode::FoldKeyed { input, .. }
5131            | HydroNode::Reduce { input, .. }
5132            | HydroNode::ReduceKeyed { input, .. }
5133            | HydroNode::Scan { input, .. }
5134            | HydroNode::ScanAsyncBlocking { input, .. } => {
5135                vec![input]
5136            }
5137            HydroNode::ReduceKeyedWatermark {
5138                input, watermark, ..
5139            } => {
5140                vec![input, watermark]
5141            }
5142        }
5143    }
5144
5145    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5146        self.input()
5147            .iter()
5148            .map(|input_node| input_node.metadata())
5149            .collect()
5150    }
5151
5152    /// Returns `true` if this node is a Tee or Partition whose inner Rc
5153    /// has other live references, meaning the upstream is already driven
5154    /// by another consumer and does not need a Null sink.
5155    pub fn is_shared_with_others(&self) -> bool {
5156        match self {
5157            HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5158                Rc::strong_count(&inner.0) > 1
5159            }
5160            // A zero-output singleton() is valid in DFIR (it drains itself at
5161            // end of tick), so it doesn't need to be driven by another consumer.
5162            HydroNode::Singleton { .. } => false,
5163            _ => false,
5164        }
5165    }
5166
5167    pub fn print_root(&self) -> String {
5168        match self {
5169            HydroNode::Placeholder => {
5170                panic!()
5171            }
5172            HydroNode::Cast { .. } => "Cast()".to_owned(),
5173            HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5174            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5175            HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5176            HydroNode::Source { source, .. } => format!("Source({:?})", source),
5177            HydroNode::SingletonSource {
5178                value,
5179                first_tick_only,
5180                ..
5181            } => format!(
5182                "SingletonSource({:?}, first_tick_only={})",
5183                value, first_tick_only
5184            ),
5185            HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5186            HydroNode::Tee { inner, .. } => {
5187                format!("Tee({})", inner.0.borrow().print_root())
5188            }
5189            HydroNode::Singleton { inner, .. } => {
5190                format!("Singleton({})", inner.0.borrow().print_root())
5191            }
5192            HydroNode::Partition { f, is_true, .. } => {
5193                format!("Partition({:?}, is_true={})", f, is_true)
5194            }
5195            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5196            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5197            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5198            HydroNode::Batch { .. } => "Batch()".to_owned(),
5199            HydroNode::Chain { first, second, .. } => {
5200                format!("Chain({}, {})", first.print_root(), second.print_root())
5201            }
5202            HydroNode::MergeOrdered { first, second, .. } => {
5203                format!(
5204                    "MergeOrdered({}, {})",
5205                    first.print_root(),
5206                    second.print_root()
5207                )
5208            }
5209            HydroNode::ChainFirst { first, second, .. } => {
5210                format!(
5211                    "ChainFirst({}, {})",
5212                    first.print_root(),
5213                    second.print_root()
5214                )
5215            }
5216            HydroNode::CrossProduct { left, right, .. } => {
5217                format!(
5218                    "CrossProduct({}, {})",
5219                    left.print_root(),
5220                    right.print_root()
5221                )
5222            }
5223            HydroNode::CrossSingleton { left, right, .. } => {
5224                format!(
5225                    "CrossSingleton({}, {})",
5226                    left.print_root(),
5227                    right.print_root()
5228                )
5229            }
5230            HydroNode::Join { left, right, .. } => {
5231                format!("Join({}, {})", left.print_root(), right.print_root())
5232            }
5233            HydroNode::JoinHalf { left, right, .. } => {
5234                format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5235            }
5236            HydroNode::Difference { pos, neg, .. } => {
5237                format!("Difference({}, {})", pos.print_root(), neg.print_root())
5238            }
5239            HydroNode::AntiJoin { pos, neg, .. } => {
5240                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5241            }
5242            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5243            HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5244            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5245            HydroNode::Map { f, .. } => format!("Map({:?})", f),
5246            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5247            HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5248            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5249            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5250            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5251            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5252            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5253            HydroNode::Unique { .. } => "Unique()".to_owned(),
5254            HydroNode::Sort { .. } => "Sort()".to_owned(),
5255            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5256            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5257            HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5258                format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5259            }
5260            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5261            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5262            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5263            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5264            HydroNode::Network { .. } => "Network()".to_owned(),
5265            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5266            HydroNode::Counter { tag, duration, .. } => {
5267                format!("Counter({:?}, {:?})", tag, duration)
5268            }
5269        }
5270    }
5271}
5272
5273#[cfg(feature = "build")]
5274fn instantiate_network<'a, D>(
5275    env: &mut D::InstantiateEnv,
5276    from_location: &LocationId,
5277    to_location: &LocationId,
5278    processes: &SparseSecondaryMap<LocationKey, D::Process>,
5279    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
5280    name: Option<&str>,
5281    networking_info: &crate::networking::NetworkingInfo,
5282) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
5283where
5284    D: Deploy<'a>,
5285{
5286    let ((sink, source), connect_fn) = match (from_location, to_location) {
5287        (&LocationId::Process(from), &LocationId::Process(to)) => {
5288            let from_node = processes
5289                .get(from)
5290                .unwrap_or_else(|| {
5291                    panic!("A process used in the graph was not instantiated: {}", from)
5292                })
5293                .clone();
5294            let to_node = processes
5295                .get(to)
5296                .unwrap_or_else(|| {
5297                    panic!("A process used in the graph was not instantiated: {}", to)
5298                })
5299                .clone();
5300
5301            let sink_port = from_node.next_port();
5302            let source_port = to_node.next_port();
5303
5304            (
5305                D::o2o_sink_source(
5306                    env,
5307                    &from_node,
5308                    &sink_port,
5309                    &to_node,
5310                    &source_port,
5311                    name,
5312                    networking_info,
5313                ),
5314                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
5315            )
5316        }
5317        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
5318            let from_node = processes
5319                .get(from)
5320                .unwrap_or_else(|| {
5321                    panic!("A process used in the graph was not instantiated: {}", from)
5322                })
5323                .clone();
5324            let to_node = clusters
5325                .get(to)
5326                .unwrap_or_else(|| {
5327                    panic!("A cluster used in the graph was not instantiated: {}", to)
5328                })
5329                .clone();
5330
5331            let sink_port = from_node.next_port();
5332            let source_port = to_node.next_port();
5333
5334            (
5335                D::o2m_sink_source(
5336                    env,
5337                    &from_node,
5338                    &sink_port,
5339                    &to_node,
5340                    &source_port,
5341                    name,
5342                    networking_info,
5343                ),
5344                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
5345            )
5346        }
5347        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
5348            let from_node = clusters
5349                .get(from)
5350                .unwrap_or_else(|| {
5351                    panic!("A cluster used in the graph was not instantiated: {}", from)
5352                })
5353                .clone();
5354            let to_node = processes
5355                .get(to)
5356                .unwrap_or_else(|| {
5357                    panic!("A process used in the graph was not instantiated: {}", to)
5358                })
5359                .clone();
5360
5361            let sink_port = from_node.next_port();
5362            let source_port = to_node.next_port();
5363
5364            (
5365                D::m2o_sink_source(
5366                    env,
5367                    &from_node,
5368                    &sink_port,
5369                    &to_node,
5370                    &source_port,
5371                    name,
5372                    networking_info,
5373                ),
5374                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
5375            )
5376        }
5377        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
5378            let from_node = clusters
5379                .get(from)
5380                .unwrap_or_else(|| {
5381                    panic!("A cluster used in the graph was not instantiated: {}", from)
5382                })
5383                .clone();
5384            let to_node = clusters
5385                .get(to)
5386                .unwrap_or_else(|| {
5387                    panic!("A cluster used in the graph was not instantiated: {}", to)
5388                })
5389                .clone();
5390
5391            let sink_port = from_node.next_port();
5392            let source_port = to_node.next_port();
5393
5394            (
5395                D::m2m_sink_source(
5396                    env,
5397                    &from_node,
5398                    &sink_port,
5399                    &to_node,
5400                    &source_port,
5401                    name,
5402                    networking_info,
5403                ),
5404                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
5405            )
5406        }
5407        (LocationId::Tick(_, _), _) => panic!(),
5408        (_, LocationId::Tick(_, _)) => panic!(),
5409        (LocationId::Atomic(_), _) => panic!(),
5410        (_, LocationId::Atomic(_)) => panic!(),
5411    };
5412    (sink, source, connect_fn)
5413}
5414
5415#[cfg(test)]
5416mod serde_test;
5417
5418#[cfg(test)]
5419mod test {
5420    use std::mem::size_of;
5421
5422    use stageleft::{QuotedWithContext, q};
5423
5424    use super::*;
5425
5426    #[test]
5427    #[cfg_attr(
5428        not(feature = "build"),
5429        ignore = "expects inclusion of feature-gated fields"
5430    )]
5431    fn hydro_node_size() {
5432        assert_eq!(size_of::<HydroNode>(), 264);
5433    }
5434
5435    #[test]
5436    #[cfg_attr(
5437        not(feature = "build"),
5438        ignore = "expects inclusion of feature-gated fields"
5439    )]
5440    fn hydro_root_size() {
5441        assert_eq!(size_of::<HydroRoot>(), 136);
5442    }
5443
5444    #[test]
5445    fn test_simplify_q_macro_basic() {
5446        // Test basic non-q! expression
5447        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
5448        let result = simplify_q_macro(simple_expr.clone());
5449        assert_eq!(result, simple_expr);
5450    }
5451
5452    #[test]
5453    fn test_simplify_q_macro_actual_stageleft_call() {
5454        // Test a simplified version of what a real stageleft call might look like
5455        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
5456        let result = simplify_q_macro(stageleft_call);
5457        // This should be processed by our visitor and simplified to q!(...)
5458        // since we detect the stageleft::runtime_support::fn_* pattern
5459        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5460    }
5461
5462    #[test]
5463    fn test_closure_no_pipe_at_start() {
5464        // Test a closure that does not start with a pipe
5465        let stageleft_call = q!({
5466            let foo = 123;
5467            move |b: usize| b + foo
5468        })
5469        .splice_fn1_ctx(&());
5470        let result = simplify_q_macro(stageleft_call);
5471        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5472    }
5473}