1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28#[cfg(feature = "build")]
29use crate::compile::builder::StmtId;
30use crate::compile::builder::{CycleId, ExternalPortId};
31#[cfg(feature = "build")]
32use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
/// A closure expression together with the nodes it refers to by name.
///
/// The `Clone` implementation expects every entry of `singleton_refs` to be a
/// `HydroNode::Singleton` and panics otherwise.
pub struct ClosureExpr {
    /// The expression's syntax tree.
    pub expr: DebugExpr,
    /// `(local identifier, referenced node)` pairs. When non-empty, `emit_tokens`
    /// emits one `let` binding per pair ahead of the expression.
    pub singleton_refs: Vec<(syn::Ident, HydroNode)>,
}
48
49impl Clone for ClosureExpr {
50 fn clone(&self) -> Self {
51 Self {
52 expr: self.expr.clone(),
53 singleton_refs: self
54 .singleton_refs
55 .iter()
56 .map(|(ident, node)| {
57 let cloned_node = match node {
58 HydroNode::Singleton { inner, metadata } => HydroNode::Singleton {
59 inner: SharedNode(inner.0.clone()),
60 metadata: metadata.clone(),
61 },
62 _ => panic!("singleton_refs should only contain HydroNode::Singleton"),
63 };
64 (ident.clone(), cloned_node)
65 })
66 .collect(),
67 }
68 }
69}
70
71impl Hash for ClosureExpr {
72 fn hash<H: Hasher>(&self, state: &mut H) {
73 self.expr.hash(state);
74 }
78}
79
80impl serde::Serialize for ClosureExpr {
81 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
82 use serde::ser::SerializeStruct;
83 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
84 s.serialize_field("expr", &self.expr)?;
85 s.serialize_field(
86 "singleton_refs",
87 &SerializableSingletonRefs(&self.singleton_refs),
88 )?;
89 s.end()
90 }
91}
92
/// Borrowed view over `(identifier, node)` pairs, used to serialize them as a sequence.
struct SerializableSingletonRefs<'a>(&'a [(syn::Ident, HydroNode)]);
94
95impl serde::Serialize for SerializableSingletonRefs<'_> {
96 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
97 use serde::ser::SerializeSeq;
98 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
99 for (ident, node) in self.0 {
100 seq.serialize_element(&(ident.to_string(), node))?;
101 }
102 seq.end()
103 }
104}
105
106impl Debug for ClosureExpr {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 Debug::fmt(&self.expr, f)
109 }
110}
111
112impl Display for ClosureExpr {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 Display::fmt(&self.expr, f)
115 }
116}
117
118impl From<syn::Expr> for ClosureExpr {
119 fn from(expr: syn::Expr) -> Self {
120 Self {
121 expr: DebugExpr(Box::new(expr)),
122 singleton_refs: Vec::new(),
123 }
124 }
125}
126
127impl From<DebugExpr> for ClosureExpr {
128 fn from(expr: DebugExpr) -> Self {
129 Self {
130 expr,
131 singleton_refs: Vec::new(),
132 }
133 }
134}
135
136impl ClosureExpr {
137 pub fn new(expr: DebugExpr, singleton_refs: Vec<(syn::Ident, HydroNode)>) -> Self {
138 Self {
139 expr,
140 singleton_refs,
141 }
142 }
143
144 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
145 Self {
146 expr: self.expr.clone(),
147 singleton_refs: self
148 .singleton_refs
149 .iter()
150 .map(|(ident, node)| (ident.clone(), node.deep_clone(seen_tees)))
151 .collect(),
152 }
153 }
154
155 pub fn transform_children(
156 &mut self,
157 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
158 seen_tees: &mut SeenSharedNodes,
159 ) {
160 for (_ident, ref_node) in self.singleton_refs.iter_mut() {
161 transform(ref_node, seen_tees);
162 }
163 }
164
165 #[cfg(feature = "build")]
168 pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
169 if self.singleton_refs.is_empty() {
170 self.expr.0.to_token_stream()
171 } else {
172 let ref_idents = (0..self.singleton_refs.len())
173 .map(|_| ident_stack.pop().unwrap())
174 .collect::<Vec<_>>()
175 .into_iter()
176 .rev()
177 .collect::<Vec<_>>();
178 let local_idents = self
179 .singleton_refs
180 .iter()
181 .map(|(local_ident, _)| local_ident);
182 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
183 let expr = &self.expr.0;
184 quote! {
185 {
186 #(
187 let #local_idents = #hash #ref_idents;
188 )*
189 #expr
190 }
191 }
192 }
193 }
194}
195
/// Newtype around a boxed [`syn::Expr`].
///
/// `Debug` prints the raw token stream, while `Display` prints a simplified
/// form wrapped in `q!(...)`.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
201
202impl serde::Serialize for DebugExpr {
203 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
204 serializer.serialize_str(&self.to_string())
205 }
206}
207
208impl From<syn::Expr> for DebugExpr {
209 fn from(expr: syn::Expr) -> Self {
210 Self(Box::new(expr))
211 }
212}
213
214impl Deref for DebugExpr {
215 type Target = syn::Expr;
216
217 fn deref(&self) -> &Self::Target {
218 &self.0
219 }
220}
221
222impl ToTokens for DebugExpr {
223 fn to_tokens(&self, tokens: &mut TokenStream) {
224 self.0.to_tokens(tokens);
225 }
226}
227
228impl Debug for DebugExpr {
229 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230 write!(f, "{}", self.0.to_token_stream())
231 }
232}
233
234impl Display for DebugExpr {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 let original = self.0.as_ref().clone();
237 let simplified = simplify_q_macro(original);
238
239 write!(f, "q!({})", quote::quote!(#simplified))
242 }
243}
244
245fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
247 let mut simplifier = QMacroSimplifier::new();
250 simplifier.visit_expr_mut(&mut expr);
251
252 if let Some(simplified) = simplifier.simplified_result {
254 simplified
255 } else {
256 expr
257 }
258}
259
/// Mutable AST visitor that looks for a `stageleft::runtime_support::*_type_hint*`
/// call and records the closure found in its arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted closure, once one has been found; `None` until then.
    pub simplified_result: Option<syn::Expr>,
}
265
266impl QMacroSimplifier {
267 pub fn new() -> Self {
268 Self::default()
269 }
270}
271
272impl VisitMut for QMacroSimplifier {
273 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
274 if self.simplified_result.is_some() {
276 return;
277 }
278
279 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
280 && self.is_stageleft_runtime_support_call(&path_expr.path)
282 && let Some(closure) = self.extract_closure_from_args(&call.args)
284 {
285 self.simplified_result = Some(closure);
286 return;
287 }
288
289 syn::visit_mut::visit_expr_mut(self, expr);
292 }
293}
294
295impl QMacroSimplifier {
296 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
297 if let Some(last_segment) = path.segments.last() {
299 let fn_name = last_segment.ident.to_string();
300 fn_name.contains("_type_hint")
302 && path.segments.len() > 2
303 && path.segments[0].ident == "stageleft"
304 && path.segments[1].ident == "runtime_support"
305 } else {
306 false
307 }
308 }
309
310 fn extract_closure_from_args(
311 &self,
312 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
313 ) -> Option<syn::Expr> {
314 for arg in args {
316 if let syn::Expr::Closure(_) = arg {
317 return Some(arg.clone());
318 }
319 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
321 return Some(closure_expr);
322 }
323 }
324 None
325 }
326
327 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
328 let mut visitor = ClosureFinder {
329 found_closure: None,
330 prefer_inner_blocks: true,
331 };
332 visitor.visit_expr(expr);
333 visitor.found_closure
334 }
335}
336
/// Read-only AST visitor that records the first closure-bearing expression it meets.
struct ClosureFinder {
    /// The expression recorded by the search; `None` while nothing has been found.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block statement that is itself a block containing a closure
    /// is recorded as a whole instead of the bare closure.
    prefer_inner_blocks: bool,
}
342
343impl<'ast> Visit<'ast> for ClosureFinder {
344 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
345 if self.found_closure.is_some() {
347 return;
348 }
349
350 match expr {
351 syn::Expr::Closure(_) => {
352 self.found_closure = Some(expr.clone());
353 }
354 syn::Expr::Block(block) if self.prefer_inner_blocks => {
355 for stmt in &block.block.stmts {
357 if let syn::Stmt::Expr(stmt_expr, _) = stmt
358 && let syn::Expr::Block(_) = stmt_expr
359 {
360 let mut inner_visitor = ClosureFinder {
362 found_closure: None,
363 prefer_inner_blocks: false, };
365 inner_visitor.visit_expr(stmt_expr);
366 if inner_visitor.found_closure.is_some() {
367 self.found_closure = Some(stmt_expr.clone());
369 return;
370 }
371 }
372 }
373
374 visit::visit_expr(self, expr);
376
377 if self.found_closure.is_some() {
380 }
382 }
383 _ => {
384 visit::visit_expr(self, expr);
386 }
387 }
388 }
389}
390
/// Newtype around a boxed [`syn::Type`] whose `Debug` output and serialized
/// form are the type's token stream.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
396
397impl From<syn::Type> for DebugType {
398 fn from(t: syn::Type) -> Self {
399 Self(Box::new(t))
400 }
401}
402
403impl Deref for DebugType {
404 type Target = syn::Type;
405
406 fn deref(&self) -> &Self::Target {
407 &self.0
408 }
409}
410
411impl ToTokens for DebugType {
412 fn to_tokens(&self, tokens: &mut TokenStream) {
413 self.0.to_tokens(tokens);
414 }
415}
416
417impl Debug for DebugType {
418 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
419 write!(f, "{}", self.0.to_token_stream())
420 }
421}
422
423impl serde::Serialize for DebugType {
424 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
425 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
426 }
427}
428
429fn serialize_backtrace_as_span<S: serde::Serializer>(
430 backtrace: &Backtrace,
431 serializer: S,
432) -> Result<S::Ok, S::Error> {
433 match backtrace.format_span() {
434 Some(span) => serializer.serialize_some(&span),
435 None => serializer.serialize_none(),
436 }
437}
438
439fn serialize_ident<S: serde::Serializer>(
440 ident: &syn::Ident,
441 serializer: S,
442) -> Result<S::Ok, S::Error> {
443 serializer.serialize_str(&ident.to_string())
444}
445
/// Instantiation state of a network-like node.
///
/// `HydroRoot::compile_network` replaces `Building` with `Finalized`, and
/// `HydroRoot::connect_network` later runs the stored connect callback.
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated; holds the sink/source expressions and the connect callback.
    /// Cloning or serializing this variant panics.
    Finalized(Box<DebugInstantiateFinalized>),
}
450
451impl serde::Serialize for DebugInstantiate {
452 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
453 match self {
454 DebugInstantiate::Building => {
455 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
456 }
457 DebugInstantiate::Finalized(_) => {
458 panic!(
459 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
460 )
461 }
462 }
463 }
464}
465
/// Payload of [`DebugInstantiate::Finalized`], filled in by
/// `HydroRoot::compile_network`.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side.
    sink: syn::Expr,
    /// Expression for the receiving side.
    source: syn::Expr,
    /// One-shot callback taken and invoked by `HydroRoot::connect_network`.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
478
479impl From<DebugInstantiateFinalized> for DebugInstantiate {
480 fn from(f: DebugInstantiateFinalized) -> Self {
481 Self::Finalized(Box::new(f))
482 }
483}
484
485impl Debug for DebugInstantiate {
486 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
487 write!(f, "<network instantiate>")
488 }
489}
490
impl Hash for DebugInstantiate {
    /// Contributes nothing to the hash: every value hashes identically.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
496
497impl Clone for DebugInstantiate {
498 fn clone(&self) -> Self {
499 match self {
500 DebugInstantiate::Building => DebugInstantiate::Building,
501 DebugInstantiate::Finalized(_) => {
502 panic!("DebugInstantiate::Finalized should not be cloned")
503 }
504 }
505 }
506}
507
/// Resolution state of a `HydroSource::ClusterMembers` source.
///
/// `HydroRoot::compile_network` moves `Uninit` to one of the other two states
/// and panics if it meets a source that was already resolved.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet resolved.
    Uninit,
    /// Holds the membership stream expression; chosen the first time a given
    /// (observing location, cluster) pair is encountered.
    Stream(DebugExpr),
    /// Chosen when the pair was already encountered; stores the observing
    /// location's root and the cluster's location id.
    Tee(LocationId, LocationId),
}
528
/// The kinds of source a `HydroNode::Source` can carry.
// NOTE(review): only the `ClusterMembers`, `Embedded` and `EmbeddedSingleton`
// variants are exercised in the visible part of this file; the notes on the
// other variants are limited to their payloads.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source described by an expression.
    Stream(DebugExpr),
    /// Source with no payload.
    ExternalNetwork(),
    /// Source described by an expression.
    Iter(DebugExpr),
    /// Source with no payload.
    Spin(),
    /// Members of the cluster at the given location, plus the resolution state
    /// updated by `HydroRoot::compile_network`.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Named embedded input; `compile_network` registers it as a stream input
    /// and requires a `Stream` collection kind.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Named embedded input; `compile_network` registers it as a singleton
    /// input and requires a `Singleton` collection kind.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
540
/// Back-end hooks used while lowering the IR into DFIR graphs.
///
/// Identifiers named `in_ident` / `input_ident` denote an already-emitted
/// upstream operator; `out_ident` is the name the emitted code must define.
// NOTE(review): the per-method notes below describe the signatures and what the
// `SecondaryMap<LocationKey, FlatGraphBuilder>` implementation in this file does;
// other implementors may behave differently.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Builder-specific flag; the `SecondaryMap` implementation returns `false`.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the code connecting `in_ident` (at `in_location`) to `out_ident`
    /// (at `out_location`) for a batch operation.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Emits the code connecting `in_ident` to `out_ident` when yielding from a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the code connecting `in_ident` to `out_ident` at the start of an
    /// atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the code connecting `in_ident` to `out_ident` at the end of an
    /// atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the code connecting `in_ident` to `out_ident` for a
    /// non-determinism observation at `location`.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the code merging `first_ident` and `second_ident` into `out_ident`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network edge: the sending side on `from` (fed by
    /// `input_ident`, writing to `sink`) and the receiving side on `to`
    /// (reading from `source`, defining `out_ident`). `serialize` and
    /// `deserialize` are optional mapping expressions.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a source on `on` that reads from `source_expr` and defines
    /// `out_ident`, optionally mapped through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits a sink on `on` that sends `input_ident` to `sink_expr`, optionally
    /// mapped through `serialize`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emits a fold hook for `in_ident` and returns an identifier
    /// for it; the `SecondaryMap` implementation always returns `None`.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Emits the code connecting `in_ident` to `out_ident` for a consistency
    /// assertion at `location`.
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );
}
669
/// Plain DFIR back end: one [`FlatGraphBuilder`] per root location key.
///
/// Most hooks emit a pass-through alias (`out = in;`); only `batch`,
/// `merge_ordered` and the network/external hooks emit real operators.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up (creating on demand) the builder for the root of `location`.
    ///
    /// # Panics
    /// Panics with "location was removed" when the map's `entry` lookup yields nothing.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Aliases the input, inserting `persist::<'static>()` for bounded
    /// singleton / optional / keyed-singleton inputs (asserted to be top-level).
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let graph = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if !needs_persist {
            graph.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits a `union()` named `out_ident` fed by the two inputs on ports 0 and 1.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits a `dest_sink` on `from` and a `source_stream` on `to`, tagged
    /// `send<tag_id>` / `recv<tag_id>`, with optional (de)serialization `map`s.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let sender = self.get_dfir_mut(from);
        let send_tag = format!("send{}", tag_id);
        match serialize {
            Some(serialize_pipeline) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        let receiver = self.get_dfir_mut(to);
        let recv_tag = format!("recv{}", tag_id);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits a `source_stream` on `on`, tagged `recv<tag_id>`, with an optional
    /// deserialization `map`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let receiver = self.get_dfir_mut(on);
        let recv_tag = format!("recv{}", tag_id);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits a `dest_sink` on `on`, tagged `send<tag_id>`, with an optional
    /// serialization `map`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let sender = self.get_dfir_mut(on);
        let send_tag = format!("send{}", tag_id);
        match serialize {
            Some(serialize_fn) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }

    /// This back end emits no fold hooks.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }

    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }
}
953
/// Either a mutable [`DfirBuilder`] to emit into, or a pair of callbacks: one
/// taking a [`HydroRoot`] and one taking a [`HydroNode`], each together with a
/// mutable [`StmtId`].
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut StmtId),
    N: FnMut(&mut HydroNode, &mut StmtId),
{
    /// Emit through the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Invoke the root callback (`L`) and node callback (`N`) instead.
    Callback(L, N),
}
963
/// A root of the IR: a terminal operation consuming the node tree under `input`.
///
/// Every variant carries its `input` subtree and per-operator metadata.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Terminal operation parameterized by the expression `f`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to a port of an external location.
    ///
    /// `instantiate_fn` starts as `Building`, is finalized by `compile_network`,
    /// and its connect callback is run by `connect_network`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Terminal operation parameterized by the expression `sink`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Terminal operation identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Named embedded output; `compile_network` registers it with the deploy
    /// back end and requires `input` to have a `Stream` collection kind.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Root carrying only an input and metadata.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
1005
1006impl HydroRoot {
1007 #[cfg(feature = "build")]
1008 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
1009 pub fn compile_network<'a, D>(
1010 &mut self,
1011 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
1012 seen_tees: &mut SeenSharedNodes,
1013 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
1014 processes: &SparseSecondaryMap<LocationKey, D::Process>,
1015 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
1016 externals: &SparseSecondaryMap<LocationKey, D::External>,
1017 env: &mut D::InstantiateEnv,
1018 ) where
1019 D: Deploy<'a>,
1020 {
1021 let refcell_extra_stmts = RefCell::new(extra_stmts);
1022 let refcell_env = RefCell::new(env);
1023 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
1024 self.transform_bottom_up(
1025 &mut |l| {
1026 if let HydroRoot::SendExternal {
1027 input,
1028 to_external_key,
1029 to_port_id,
1030 to_many,
1031 unpaired,
1032 instantiate_fn,
1033 ..
1034 } = l
1035 {
1036 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1037 DebugInstantiate::Building => {
1038 let to_node = externals
1039 .get(*to_external_key)
1040 .unwrap_or_else(|| {
1041 panic!("A external used in the graph was not instantiated: {}", to_external_key)
1042 })
1043 .clone();
1044
1045 match input.metadata().location_id.root() {
1046 &LocationId::Process(process_key) => {
1047 if *to_many {
1048 (
1049 (
1050 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1051 parse_quote!(DUMMY),
1052 ),
1053 Box::new(|| {}) as Box<dyn FnOnce()>,
1054 )
1055 } else {
1056 let from_node = processes
1057 .get(process_key)
1058 .unwrap_or_else(|| {
1059 panic!("A process used in the graph was not instantiated: {}", process_key)
1060 })
1061 .clone();
1062
1063 let sink_port = from_node.next_port();
1064 let source_port = to_node.next_port();
1065
1066 if *unpaired {
1067 use stageleft::quote_type;
1068 use tokio_util::codec::LengthDelimitedCodec;
1069
1070 to_node.register(*to_port_id, source_port.clone());
1071
1072 let _ = D::e2o_source(
1073 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1074 &to_node, &source_port,
1075 &from_node, &sink_port,
1076 "e_type::<LengthDelimitedCodec>(),
1077 format!("{}_{}", *to_external_key, *to_port_id)
1078 );
1079 }
1080
1081 (
1082 (
1083 D::o2e_sink(
1084 &from_node,
1085 &sink_port,
1086 &to_node,
1087 &source_port,
1088 format!("{}_{}", *to_external_key, *to_port_id)
1089 ),
1090 parse_quote!(DUMMY),
1091 ),
1092 if *unpaired {
1093 D::e2o_connect(
1094 &to_node,
1095 &source_port,
1096 &from_node,
1097 &sink_port,
1098 *to_many,
1099 NetworkHint::Auto,
1100 )
1101 } else {
1102 Box::new(|| {}) as Box<dyn FnOnce()>
1103 },
1104 )
1105 }
1106 }
1107 LocationId::Cluster(cluster_key) => {
1108 let from_node = clusters
1109 .get(*cluster_key)
1110 .unwrap_or_else(|| {
1111 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1112 })
1113 .clone();
1114
1115 let sink_port = from_node.next_port();
1116 let source_port = to_node.next_port();
1117
1118 if *unpaired {
1119 to_node.register(*to_port_id, source_port.clone());
1120 }
1121
1122 (
1123 (
1124 D::m2e_sink(
1125 &from_node,
1126 &sink_port,
1127 &to_node,
1128 &source_port,
1129 format!("{}_{}", *to_external_key, *to_port_id)
1130 ),
1131 parse_quote!(DUMMY),
1132 ),
1133 Box::new(|| {}) as Box<dyn FnOnce()>,
1134 )
1135 }
1136 _ => panic!()
1137 }
1138 },
1139
1140 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1141 };
1142
1143 *instantiate_fn = DebugInstantiateFinalized {
1144 sink: sink_expr,
1145 source: source_expr,
1146 connect_fn: Some(connect_fn),
1147 }
1148 .into();
1149 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1150 let element_type = match &input.metadata().collection_kind {
1151 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1152 _ => panic!("Embedded output must have Stream collection kind"),
1153 };
1154 let location_key = match input.metadata().location_id.root() {
1155 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1156 _ => panic!("Embedded output must be on a process or cluster"),
1157 };
1158 D::register_embedded_output(
1159 &mut refcell_env.borrow_mut(),
1160 location_key,
1161 ident,
1162 &element_type,
1163 );
1164 }
1165 },
1166 &mut |n| {
1167 if let HydroNode::Network {
1168 name,
1169 networking_info,
1170 input,
1171 instantiate_fn,
1172 metadata,
1173 ..
1174 } = n
1175 {
1176 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1177 DebugInstantiate::Building => instantiate_network::<D>(
1178 &mut refcell_env.borrow_mut(),
1179 input.metadata().location_id.root(),
1180 metadata.location_id.root(),
1181 processes,
1182 clusters,
1183 name.as_deref(),
1184 networking_info,
1185 ),
1186
1187 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1188 };
1189
1190 *instantiate_fn = DebugInstantiateFinalized {
1191 sink: sink_expr,
1192 source: source_expr,
1193 connect_fn: Some(connect_fn),
1194 }
1195 .into();
1196 } else if let HydroNode::ExternalInput {
1197 from_external_key,
1198 from_port_id,
1199 from_many,
1200 codec_type,
1201 port_hint,
1202 instantiate_fn,
1203 metadata,
1204 ..
1205 } = n
1206 {
1207 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1208 DebugInstantiate::Building => {
1209 let from_node = externals
1210 .get(*from_external_key)
1211 .unwrap_or_else(|| {
1212 panic!(
1213 "A external used in the graph was not instantiated: {}",
1214 from_external_key,
1215 )
1216 })
1217 .clone();
1218
1219 match metadata.location_id.root() {
1220 &LocationId::Process(process_key) => {
1221 let to_node = processes
1222 .get(process_key)
1223 .unwrap_or_else(|| {
1224 panic!("A process used in the graph was not instantiated: {}", process_key)
1225 })
1226 .clone();
1227
1228 let sink_port = from_node.next_port();
1229 let source_port = to_node.next_port();
1230
1231 from_node.register(*from_port_id, sink_port.clone());
1232
1233 (
1234 (
1235 parse_quote!(DUMMY),
1236 if *from_many {
1237 D::e2o_many_source(
1238 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1239 &to_node, &source_port,
1240 codec_type.0.as_ref(),
1241 format!("{}_{}", *from_external_key, *from_port_id)
1242 )
1243 } else {
1244 D::e2o_source(
1245 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1246 &from_node, &sink_port,
1247 &to_node, &source_port,
1248 codec_type.0.as_ref(),
1249 format!("{}_{}", *from_external_key, *from_port_id)
1250 )
1251 },
1252 ),
1253 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1254 )
1255 }
1256 LocationId::Cluster(cluster_key) => {
1257 let to_node = clusters
1258 .get(*cluster_key)
1259 .unwrap_or_else(|| {
1260 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1261 })
1262 .clone();
1263
1264 let sink_port = from_node.next_port();
1265 let source_port = to_node.next_port();
1266
1267 from_node.register(*from_port_id, sink_port.clone());
1268
1269 (
1270 (
1271 parse_quote!(DUMMY),
1272 D::e2m_source(
1273 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1274 &from_node, &sink_port,
1275 &to_node, &source_port,
1276 codec_type.0.as_ref(),
1277 format!("{}_{}", *from_external_key, *from_port_id)
1278 ),
1279 ),
1280 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1281 )
1282 }
1283 _ => panic!()
1284 }
1285 },
1286
1287 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1288 };
1289
1290 *instantiate_fn = DebugInstantiateFinalized {
1291 sink: sink_expr,
1292 source: source_expr,
1293 connect_fn: Some(connect_fn),
1294 }
1295 .into();
1296 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1297 let element_type = match &metadata.collection_kind {
1298 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1299 _ => panic!("Embedded source must have Stream collection kind"),
1300 };
1301 let location_key = match metadata.location_id.root() {
1302 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1303 _ => panic!("Embedded source must be on a process or cluster"),
1304 };
1305 D::register_embedded_stream_input(
1306 &mut refcell_env.borrow_mut(),
1307 location_key,
1308 ident,
1309 &element_type,
1310 );
1311 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1312 let element_type = match &metadata.collection_kind {
1313 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1314 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1315 };
1316 let location_key = match metadata.location_id.root() {
1317 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1318 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1319 };
1320 D::register_embedded_singleton_input(
1321 &mut refcell_env.borrow_mut(),
1322 location_key,
1323 ident,
1324 &element_type,
1325 );
1326 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1327 match state {
1328 ClusterMembersState::Uninit => {
1329 let at_location = metadata.location_id.root().clone();
1330 let key = (at_location.clone(), location_id.key());
1331 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1332 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1334 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1335 &(),
1336 );
1337 *state = ClusterMembersState::Stream(expr.into());
1338 } else {
1339 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1341 }
1342 }
1343 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1344 panic!("cluster members already finalized");
1345 }
1346 }
1347 }
1348 },
1349 seen_tees,
1350 false,
1351 );
1352 }
1353
1354 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1355 self.transform_bottom_up(
1356 &mut |l| {
1357 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1358 match instantiate_fn {
1359 DebugInstantiate::Building => panic!("network not built"),
1360
1361 DebugInstantiate::Finalized(finalized) => {
1362 (finalized.connect_fn.take().unwrap())();
1363 }
1364 }
1365 }
1366 },
1367 &mut |n| {
1368 if let HydroNode::Network { instantiate_fn, .. }
1369 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1370 {
1371 match instantiate_fn {
1372 DebugInstantiate::Building => panic!("network not built"),
1373
1374 DebugInstantiate::Finalized(finalized) => {
1375 (finalized.connect_fn.take().unwrap())();
1376 }
1377 }
1378 }
1379 },
1380 seen_tees,
1381 false,
1382 );
1383 }
1384
1385 pub fn transform_bottom_up(
1386 &mut self,
1387 transform_root: &mut impl FnMut(&mut HydroRoot),
1388 transform_node: &mut impl FnMut(&mut HydroNode),
1389 seen_tees: &mut SeenSharedNodes,
1390 check_well_formed: bool,
1391 ) {
1392 self.transform_children(
1393 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1394 seen_tees,
1395 );
1396
1397 transform_root(self);
1398 }
1399
1400 pub fn transform_children(
1401 &mut self,
1402 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1403 seen_tees: &mut SeenSharedNodes,
1404 ) {
1405 match self {
1406 HydroRoot::ForEach { input, .. }
1407 | HydroRoot::SendExternal { input, .. }
1408 | HydroRoot::DestSink { input, .. }
1409 | HydroRoot::CycleSink { input, .. }
1410 | HydroRoot::EmbeddedOutput { input, .. }
1411 | HydroRoot::Null { input, .. } => {
1412 transform(input, seen_tees);
1413 }
1414 }
1415 }
1416
1417 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1418 match self {
1419 HydroRoot::ForEach {
1420 f,
1421 input,
1422 op_metadata,
1423 } => HydroRoot::ForEach {
1424 f: f.clone(),
1425 input: Box::new(input.deep_clone(seen_tees)),
1426 op_metadata: op_metadata.clone(),
1427 },
1428 HydroRoot::SendExternal {
1429 to_external_key,
1430 to_port_id,
1431 to_many,
1432 unpaired,
1433 serialize_fn,
1434 instantiate_fn,
1435 input,
1436 op_metadata,
1437 } => HydroRoot::SendExternal {
1438 to_external_key: *to_external_key,
1439 to_port_id: *to_port_id,
1440 to_many: *to_many,
1441 unpaired: *unpaired,
1442 serialize_fn: serialize_fn.clone(),
1443 instantiate_fn: instantiate_fn.clone(),
1444 input: Box::new(input.deep_clone(seen_tees)),
1445 op_metadata: op_metadata.clone(),
1446 },
1447 HydroRoot::DestSink {
1448 sink,
1449 input,
1450 op_metadata,
1451 } => HydroRoot::DestSink {
1452 sink: sink.clone(),
1453 input: Box::new(input.deep_clone(seen_tees)),
1454 op_metadata: op_metadata.clone(),
1455 },
1456 HydroRoot::CycleSink {
1457 cycle_id,
1458 input,
1459 op_metadata,
1460 } => HydroRoot::CycleSink {
1461 cycle_id: *cycle_id,
1462 input: Box::new(input.deep_clone(seen_tees)),
1463 op_metadata: op_metadata.clone(),
1464 },
1465 HydroRoot::EmbeddedOutput {
1466 ident,
1467 input,
1468 op_metadata,
1469 } => HydroRoot::EmbeddedOutput {
1470 ident: ident.clone(),
1471 input: Box::new(input.deep_clone(seen_tees)),
1472 op_metadata: op_metadata.clone(),
1473 },
1474 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1475 input: Box::new(input.deep_clone(seen_tees)),
1476 op_metadata: op_metadata.clone(),
1477 },
1478 }
1479 }
1480
1481 #[cfg(feature = "build")]
1482 pub fn emit(
1483 &mut self,
1484 graph_builders: &mut dyn DfirBuilder,
1485 seen_tees: &mut SeenSharedNodes,
1486 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1487 next_stmt_id: &mut StmtId,
1488 fold_hooked_idents: &mut HashSet<String>,
1489 ) {
1490 self.emit_core(
1491 &mut BuildersOrCallback::<
1492 fn(&mut HydroRoot, &mut StmtId),
1493 fn(&mut HydroNode, &mut StmtId),
1494 >::Builders(graph_builders),
1495 seen_tees,
1496 built_tees,
1497 next_stmt_id,
1498 fold_hooked_idents,
1499 );
1500 }
1501
1502 #[cfg(feature = "build")]
1503 pub fn emit_core(
1504 &mut self,
1505 builders_or_callback: &mut BuildersOrCallback<
1506 impl FnMut(&mut HydroRoot, &mut StmtId),
1507 impl FnMut(&mut HydroNode, &mut StmtId),
1508 >,
1509 seen_tees: &mut SeenSharedNodes,
1510 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1511 next_stmt_id: &mut StmtId,
1512 fold_hooked_idents: &mut HashSet<String>,
1513 ) {
1514 match self {
1515 HydroRoot::ForEach { f, input, .. } => {
1516 let input_ident = input.emit_core(
1517 builders_or_callback,
1518 seen_tees,
1519 built_tees,
1520 next_stmt_id,
1521 fold_hooked_idents,
1522 );
1523
1524 match builders_or_callback {
1525 BuildersOrCallback::Builders(graph_builders) => {
1526 graph_builders
1527 .get_dfir_mut(&input.metadata().location_id)
1528 .add_dfir(
1529 parse_quote! {
1530 #input_ident -> for_each(#f);
1531 },
1532 None,
1533 Some(&next_stmt_id.to_string()),
1534 );
1535 }
1536 BuildersOrCallback::Callback(leaf_callback, _) => {
1537 leaf_callback(self, next_stmt_id);
1538 }
1539 }
1540
1541 let _ = next_stmt_id.get_and_increment();
1542 }
1543
1544 HydroRoot::SendExternal {
1545 serialize_fn,
1546 instantiate_fn,
1547 input,
1548 ..
1549 } => {
1550 let input_ident = input.emit_core(
1551 builders_or_callback,
1552 seen_tees,
1553 built_tees,
1554 next_stmt_id,
1555 fold_hooked_idents,
1556 );
1557
1558 match builders_or_callback {
1559 BuildersOrCallback::Builders(graph_builders) => {
1560 let (sink_expr, _) = match instantiate_fn {
1561 DebugInstantiate::Building => (
1562 syn::parse_quote!(DUMMY_SINK),
1563 syn::parse_quote!(DUMMY_SOURCE),
1564 ),
1565
1566 DebugInstantiate::Finalized(finalized) => {
1567 (finalized.sink.clone(), finalized.source.clone())
1568 }
1569 };
1570
1571 graph_builders.create_external_output(
1572 &input.metadata().location_id,
1573 sink_expr,
1574 &input_ident,
1575 serialize_fn.as_ref(),
1576 *next_stmt_id,
1577 );
1578 }
1579 BuildersOrCallback::Callback(leaf_callback, _) => {
1580 leaf_callback(self, next_stmt_id);
1581 }
1582 }
1583
1584 let _ = next_stmt_id.get_and_increment();
1585 }
1586
1587 HydroRoot::DestSink { sink, input, .. } => {
1588 let input_ident = input.emit_core(
1589 builders_or_callback,
1590 seen_tees,
1591 built_tees,
1592 next_stmt_id,
1593 fold_hooked_idents,
1594 );
1595
1596 match builders_or_callback {
1597 BuildersOrCallback::Builders(graph_builders) => {
1598 graph_builders
1599 .get_dfir_mut(&input.metadata().location_id)
1600 .add_dfir(
1601 parse_quote! {
1602 #input_ident -> dest_sink(#sink);
1603 },
1604 None,
1605 Some(&next_stmt_id.to_string()),
1606 );
1607 }
1608 BuildersOrCallback::Callback(leaf_callback, _) => {
1609 leaf_callback(self, next_stmt_id);
1610 }
1611 }
1612
1613 let _ = next_stmt_id.get_and_increment();
1614 }
1615
1616 HydroRoot::CycleSink {
1617 cycle_id, input, ..
1618 } => {
1619 let input_ident = input.emit_core(
1620 builders_or_callback,
1621 seen_tees,
1622 built_tees,
1623 next_stmt_id,
1624 fold_hooked_idents,
1625 );
1626
1627 match builders_or_callback {
1628 BuildersOrCallback::Builders(graph_builders) => {
1629 let elem_type: syn::Type = match &input.metadata().collection_kind {
1630 CollectionKind::KeyedSingleton {
1631 key_type,
1632 value_type,
1633 ..
1634 }
1635 | CollectionKind::KeyedStream {
1636 key_type,
1637 value_type,
1638 ..
1639 } => {
1640 parse_quote!((#key_type, #value_type))
1641 }
1642 CollectionKind::Stream { element_type, .. }
1643 | CollectionKind::Singleton { element_type, .. }
1644 | CollectionKind::Optional { element_type, .. } => {
1645 parse_quote!(#element_type)
1646 }
1647 };
1648
1649 let cycle_id_ident = cycle_id.as_ident();
1650 graph_builders
1651 .get_dfir_mut(&input.metadata().location_id)
1652 .add_dfir(
1653 parse_quote! {
1654 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1655 },
1656 None,
1657 None,
1658 );
1659 }
1660 BuildersOrCallback::Callback(_, _) => {}
1662 }
1663 }
1664
1665 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1666 let input_ident = input.emit_core(
1667 builders_or_callback,
1668 seen_tees,
1669 built_tees,
1670 next_stmt_id,
1671 fold_hooked_idents,
1672 );
1673
1674 match builders_or_callback {
1675 BuildersOrCallback::Builders(graph_builders) => {
1676 graph_builders
1677 .get_dfir_mut(&input.metadata().location_id)
1678 .add_dfir(
1679 parse_quote! {
1680 #input_ident -> for_each(&mut #ident);
1681 },
1682 None,
1683 Some(&next_stmt_id.to_string()),
1684 );
1685 }
1686 BuildersOrCallback::Callback(leaf_callback, _) => {
1687 leaf_callback(self, next_stmt_id);
1688 }
1689 }
1690
1691 let _ = next_stmt_id.get_and_increment();
1692 }
1693
1694 HydroRoot::Null { input, .. } => {
1695 let input_ident = input.emit_core(
1696 builders_or_callback,
1697 seen_tees,
1698 built_tees,
1699 next_stmt_id,
1700 fold_hooked_idents,
1701 );
1702
1703 match builders_or_callback {
1704 BuildersOrCallback::Builders(graph_builders) => {
1705 graph_builders
1706 .get_dfir_mut(&input.metadata().location_id)
1707 .add_dfir(
1708 parse_quote! {
1709 #input_ident -> for_each(|_| {});
1710 },
1711 None,
1712 Some(&next_stmt_id.to_string()),
1713 );
1714 }
1715 BuildersOrCallback::Callback(leaf_callback, _) => {
1716 leaf_callback(self, next_stmt_id);
1717 }
1718 }
1719
1720 let _ = next_stmt_id.get_and_increment();
1721 }
1722 }
1723 }
1724
1725 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1726 match self {
1727 HydroRoot::ForEach { op_metadata, .. }
1728 | HydroRoot::SendExternal { op_metadata, .. }
1729 | HydroRoot::DestSink { op_metadata, .. }
1730 | HydroRoot::CycleSink { op_metadata, .. }
1731 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1732 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1733 }
1734 }
1735
1736 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1737 match self {
1738 HydroRoot::ForEach { op_metadata, .. }
1739 | HydroRoot::SendExternal { op_metadata, .. }
1740 | HydroRoot::DestSink { op_metadata, .. }
1741 | HydroRoot::CycleSink { op_metadata, .. }
1742 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1743 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1744 }
1745 }
1746
1747 pub fn input(&self) -> &HydroNode {
1748 match self {
1749 HydroRoot::ForEach { input, .. }
1750 | HydroRoot::SendExternal { input, .. }
1751 | HydroRoot::DestSink { input, .. }
1752 | HydroRoot::CycleSink { input, .. }
1753 | HydroRoot::EmbeddedOutput { input, .. }
1754 | HydroRoot::Null { input, .. } => input,
1755 }
1756 }
1757
1758 pub fn input_metadata(&self) -> &HydroIrMetadata {
1759 self.input().metadata()
1760 }
1761
1762 pub fn print_root(&self) -> String {
1763 match self {
1764 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1765 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1766 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1767 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1768 HydroRoot::EmbeddedOutput { ident, .. } => {
1769 format!("EmbeddedOutput({})", ident)
1770 }
1771 HydroRoot::Null { .. } => "Null".to_owned(),
1772 }
1773 }
1774
1775 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1776 match self {
1777 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1778 transform(f);
1779 }
1780 HydroRoot::SendExternal { .. }
1781 | HydroRoot::CycleSink { .. }
1782 | HydroRoot::EmbeddedOutput { .. }
1783 | HydroRoot::Null { .. } => {}
1784 }
1785 }
1786}
1787
1788#[cfg(feature = "build")]
1789fn tick_of(loc: &LocationId) -> Option<ClockId> {
1790 match loc {
1791 LocationId::Tick(id, _) => Some(*id),
1792 LocationId::Atomic(inner) => tick_of(inner),
1793 _ => None,
1794 }
1795}
1796
1797#[cfg(feature = "build")]
1798fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1799 match loc {
1800 LocationId::Tick(id, inner) => {
1801 *id = uf_find(uf, *id);
1802 remap_location(inner, uf);
1803 }
1804 LocationId::Atomic(inner) => {
1805 remap_location(inner, uf);
1806 }
1807 LocationId::Process(_) | LocationId::Cluster(_) => {}
1808 }
1809}
1810
1811#[cfg(feature = "build")]
1812fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1813 let p = *parent.get(&x).unwrap_or(&x);
1814 if p == x {
1815 return x;
1816 }
1817 let root = uf_find(parent, p);
1818 parent.insert(x, root);
1819 root
1820}
1821
1822#[cfg(feature = "build")]
1823fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1824 let ra = uf_find(parent, a);
1825 let rb = uf_find(parent, b);
1826 if ra != rb {
1827 parent.insert(ra, rb);
1828 }
1829}
1830
#[cfg(feature = "build")]
/// Merges the clocks of ticks that are connected through certain operators, then rewrites all
/// node locations to use one representative clock id per merged group.
///
/// Pass 1 unions the tick of each `Batch` / `YieldConcat` node with the tick of its `inner`,
/// and the tick of each `Chain` / `ChainFirst` / `MergeOrdered` node with the ticks of both
/// of its inputs (only when both sides actually have a tick). Pass 2 remaps the
/// `location_id` of every node through the resulting union-find.
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    /// Unions the clocks of `child` and `parent` when both locations carry a tick.
    fn link(uf: &mut HashMap<ClockId, ClockId>, child: &LocationId, parent: &LocationId) {
        if let (Some(a), Some(b)) = (tick_of(child), tick_of(parent)) {
            uf_union(uf, a, b);
        }
    }

    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect equivalences between clocks.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                link(&mut uf, &inner.metadata().location_id, &metadata.location_id);
            }
            HydroNode::Chain {
                first,
                second,
                metadata,
            }
            | HydroNode::ChainFirst {
                first,
                second,
                metadata,
            }
            | HydroNode::MergeOrdered {
                first,
                second,
                metadata,
            } => {
                link(&mut uf, &first.metadata().location_id, &metadata.location_id);
                link(&mut uf, &second.metadata().location_id, &metadata.location_id);
            }
            _ => {}
        },
        false,
    );

    // Pass 2: rewrite every node's location with the representative clock ids.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1894
#[cfg(feature = "build")]
/// Emits every root in `ir` and returns the resulting graph builders, keyed by location.
///
/// All roots share one set of traversal state (seen/built shared nodes, hooked fold idents)
/// and one statement id counter that starts at its default value.
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut fold_hooked_idents = HashSet::new();
    let mut next_stmt_id = StmtId::default();

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    builders
}
1913
#[cfg(feature = "build")]
/// Walks `ir` with the same traversal used for emission, but in callback mode.
///
/// `transform_root` and `transform_node` are handed to `HydroRoot::emit_core` wrapped in
/// `BuildersOrCallback::Callback`; the statement id counter starts at its default value and
/// is shared across all roots.
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut StmtId),
    transform_node: impl FnMut(&mut HydroNode, &mut StmtId),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut fold_hooked_idents = HashSet::new();
    let mut next_stmt_id = StmtId::default();

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1935
1936pub fn transform_bottom_up(
1937 ir: &mut [HydroRoot],
1938 transform_root: &mut impl FnMut(&mut HydroRoot),
1939 transform_node: &mut impl FnMut(&mut HydroNode),
1940 check_well_formed: bool,
1941) {
1942 let mut seen_tees = HashMap::new();
1943 ir.iter_mut().for_each(|leaf| {
1944 leaf.transform_bottom_up(
1945 transform_root,
1946 transform_node,
1947 &mut seen_tees,
1948 check_well_formed,
1949 );
1950 });
1951}
1952
1953pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1954 let mut seen_tees = HashMap::new();
1955 ir.iter()
1956 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1957 .collect()
1958}
1959
// Per-thread de-duplication state for shared nodes: the next id to hand out, plus the ids
// already assigned, keyed by a pointer identifying the shared node. `None` means that no
// de-duplication scope is currently active.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Read by `Debug for SharedNode`; installed and cleared by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    // Read by `Serialize for SharedNode`; installed and restored by `SerializedSharedGuard`.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1969
1970pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1971 PRINTED_TEES.with(|printed_tees| {
1972 let mut printed_tees_mut = printed_tees.borrow_mut();
1973 *printed_tees_mut = Some((0, HashMap::new()));
1974 drop(printed_tees_mut);
1975
1976 let ret = f();
1977
1978 let mut printed_tees_mut = printed_tees.borrow_mut();
1979 *printed_tees_mut = None;
1980
1981 ret
1982 })
1983}
1984
1985pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1990 let _guard = SerializedSharedGuard::enter();
1991 f()
1992}
1993
#[cfg(feature = "viz")]
/// Serializes `ir` to a JSON string inside a `serialize_dedup_shared` scope.
///
/// # Errors
/// Returns whatever error `serde_json::to_string` reports.
pub fn ir_to_json(ir: &[HydroRoot]) -> Result<String, serde_json::Error> {
    let encode = || serde_json::to_string(ir);
    serialize_dedup_shared(encode)
}
1999
/// Scope guard for the `SERIALIZED_SHARED` thread-local.
///
/// Created by `enter`, which installs a fresh state; the `Drop` impl writes `previous` back.
struct SerializedSharedGuard {
    /// The thread-local's contents at the time `enter` was called.
    previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
}
2005
2006impl SerializedSharedGuard {
2007 fn enter() -> Self {
2008 let previous = SERIALIZED_SHARED.with(|cell| {
2009 let mut guard = cell.borrow_mut();
2010 guard.replace((0, HashMap::new()))
2011 });
2012 Self { previous }
2013 }
2014}
2015
2016impl Drop for SerializedSharedGuard {
2017 fn drop(&mut self) {
2018 SERIALIZED_SHARED.with(|cell| {
2019 *cell.borrow_mut() = self.previous.take();
2020 });
2021 }
2022}
2023
/// A reference-counted, interior-mutable handle to a `HydroNode`, so that the same node can
/// be referenced from more than one place in the IR.
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2025
2026impl serde::Serialize for SharedNode {
2027 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2038 SERIALIZED_SHARED.with(|cell| {
2039 let mut guard = cell.borrow_mut();
2040 let state = guard.as_mut().ok_or_else(|| {
2042 serde::ser::Error::custom(
2043 "SharedNode serialization requires an active serialize_dedup_shared scope",
2044 )
2045 })?;
2046 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2047
2048 if let Some(&id) = state.1.get(&ptr) {
2049 drop(guard);
2050 use serde::ser::SerializeMap;
2051 let mut map = serializer.serialize_map(Some(1))?;
2052 map.serialize_entry("$shared_ref", &id)?;
2053 map.end()
2054 } else {
2055 let id = state.0;
2056 state.0 += 1;
2057 state.1.insert(ptr, id);
2058 drop(guard);
2059
2060 use serde::ser::SerializeMap;
2061 let mut map = serializer.serialize_map(Some(2))?;
2062 map.serialize_entry("$shared", &id)?;
2063 map.serialize_entry("node", &*self.0.borrow())?;
2064 map.end()
2065 }
2066 })
2067 }
2068}
2069
2070impl SharedNode {
2071 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2072 Rc::as_ptr(&self.0)
2073 }
2074}
2075
2076impl Debug for SharedNode {
2077 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2078 PRINTED_TEES.with(|printed_tees| {
2079 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2080 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2081
2082 if let Some(printed_tees_mut) = printed_tees_mut {
2083 if let Some(existing) = printed_tees_mut
2084 .1
2085 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2086 {
2087 write!(f, "<shared {}>", existing)
2088 } else {
2089 let next_id = printed_tees_mut.0;
2090 printed_tees_mut.0 += 1;
2091 printed_tees_mut
2092 .1
2093 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2094 drop(printed_tees_mut_borrow);
2095 write!(f, "<shared {}>: ", next_id)?;
2096 Debug::fmt(&self.0.borrow(), f)
2097 }
2098 } else {
2099 drop(printed_tees_mut_borrow);
2100 write!(f, "<shared>: ")?;
2101 Debug::fmt(&self.0.borrow(), f)
2102 }
2103 })
2104 }
2105}
2106
2107impl Hash for SharedNode {
2108 fn hash<H: Hasher>(&self, state: &mut H) {
2109 self.0.borrow_mut().hash(state);
2110 }
2111}
2112
/// Boundedness marker used by the `Stream`, `Optional` and `KeyedStream` variants of
/// `CollectionKind`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
2118
/// Ordering marker stored as `order` on `CollectionKind::Stream` and as `value_order` on
/// `CollectionKind::KeyedStream`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
2124
/// Retry marker stored as `retry` on `CollectionKind::Stream` and as `value_retry` on
/// `CollectionKind::KeyedStream`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
2130
/// Boundedness marker used by `CollectionKind::KeyedSingleton`.
///
/// Only the `Bounded` variant makes `CollectionKind::is_bounded` return `true`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
2138
/// Boundedness marker used by `CollectionKind::Singleton`.
///
/// Only the `Bounded` variant makes `CollectionKind::is_bounded` return `true`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
2145
/// Describes what kind of collection an IR node produces, along with its element type(s) and
/// its boundedness/ordering/retry markers.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton holding an `element_type` value.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional holding an `element_type` value.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream with separate key and value types.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton with separate key and value types.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2175
2176impl CollectionKind {
2177 pub fn is_bounded(&self) -> bool {
2178 matches!(
2179 self,
2180 CollectionKind::Stream {
2181 bound: BoundKind::Bounded,
2182 ..
2183 } | CollectionKind::Singleton {
2184 bound: SingletonBoundKind::Bounded,
2185 ..
2186 } | CollectionKind::Optional {
2187 bound: BoundKind::Bounded,
2188 ..
2189 } | CollectionKind::KeyedStream {
2190 bound: BoundKind::Bounded,
2191 ..
2192 } | CollectionKind::KeyedSingleton {
2193 bound: KeyedSingletonBoundKind::Bounded,
2194 ..
2195 }
2196 )
2197 }
2198}
2199
/// Metadata attached to every `HydroNode` that carries a `metadata` field.
///
/// Note that the `Hash`, `PartialEq` and `Debug` impls in this file deliberately do not
/// reflect all of these fields: hashing contributes nothing, equality is always `true`, and
/// `Debug` prints only `location_id` and `collection_kind`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Where the node lives.
    pub location_id: LocationId,
    /// What kind of collection the node produces.
    pub collection_kind: CollectionKind,
    pub consistency: Option<ClusterConsistency>,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Per-operator metadata (see `HydroIrOpMetadata`).
    pub op: HydroIrOpMetadata,
}
2209
2210impl Hash for HydroIrMetadata {
2212 fn hash<H: Hasher>(&self, _: &mut H) {}
2213}
2214
2215impl PartialEq for HydroIrMetadata {
2216 fn eq(&self, _: &Self) -> bool {
2217 true
2218 }
2219}
2220
2221impl Eq for HydroIrMetadata {}
2222
2223impl Debug for HydroIrMetadata {
2224 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2225 f.debug_struct("HydroIrMetadata")
2226 .field("location_id", &self.location_id)
2227 .field("collection_kind", &self.collection_kind)
2228 .finish()
2229 }
2230}
2231
/// Per-operator metadata. Stored directly on every `HydroRoot` and inside `HydroIrMetadata`.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created; serialized under the name `span`
    /// through `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}
2242
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and all optional fields
    /// set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Like [`Self::new`], but passes `2 + skip_count` to `Backtrace::get_backtrace`.
    // NOTE(review): presumably that argument is the number of leading frames to leave out,
    // with the constant `2` accounting for this function and the capture itself; confirm
    // against `Backtrace::get_backtrace` before changing the call nesting here.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2261
2262impl Debug for HydroIrOpMetadata {
2263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2264 f.debug_struct("HydroIrOpMetadata").finish()
2265 }
2266}
2267
2268impl Hash for HydroIrOpMetadata {
2269 fn hash<H: Hasher>(&self, _: &mut H) {}
2270}
2271
/// An intermediate (non-root) node of the Hydro IR.
///
/// Apart from `Placeholder`, every variant carries a `metadata` field. Inputs are held either
/// exclusively (`Box<HydroNode>`) or through a `SharedNode` when the same upstream node may
/// be referenced from several places.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    /// Temporary stand-in. `transform_children` swaps it into a shared cell while the real
    /// node is being transformed, and panics if asked to traverse one.
    Placeholder,

    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child is traversed by `transform_children`).
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child is traversed by `transform_children`).
    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child is traversed by `transform_children`).
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// References a shared upstream node.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// References a shared upstream node.
    Singleton {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// References a shared upstream node, together with a closure and a flag.
    // NOTE(review): presumably `is_true` selects which side of the predicate `f` this node
    // receives; confirm against the emission code.
    Partition {
        inner: SharedNode,
        f: ClosureExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, with an additional `watermark` input node.
    ReduceKeyedWatermark {
        f: ClosureExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Exempt from the same-root-location check in `transform_bottom_up`, so its input may
    /// live at a different location than the node itself.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Leaf node (no child is traversed by `transform_children`). `port_hint` is skipped
    /// during serialization.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AssertIsConsistent {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    UnboundSingleton {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2564
/// Maps the address of an already-visited shared cell to the cell that replaces it.
/// `HydroNode::transform_children` consults this so each shared node is transformed once and
/// all of its referrers end up pointing at the same transformed cell.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a shared cell to a `LocationId`.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2567
2568impl HydroNode {
2569 pub fn transform_bottom_up(
2570 &mut self,
2571 transform: &mut impl FnMut(&mut HydroNode),
2572 seen_tees: &mut SeenSharedNodes,
2573 check_well_formed: bool,
2574 ) {
2575 self.transform_children(
2576 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2577 seen_tees,
2578 );
2579
2580 transform(self);
2581
2582 let self_location = self.metadata().location_id.root();
2583
2584 if check_well_formed {
2585 match &*self {
2586 HydroNode::Network { .. } => {}
2587 _ => {
2588 self.input_metadata().iter().for_each(|i| {
2589 if i.location_id.root() != self_location {
2590 panic!(
2591 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2592 i,
2593 i.location_id.root(),
2594 self,
2595 self_location
2596 )
2597 }
2598 });
2599 }
2600 }
2601 }
2602 }
2603
2604 #[inline(always)]
2605 pub fn transform_children(
2606 &mut self,
2607 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2608 seen_tees: &mut SeenSharedNodes,
2609 ) {
2610 match self {
2611 HydroNode::Placeholder => {
2612 panic!();
2613 }
2614
2615 HydroNode::Source { .. }
2616 | HydroNode::SingletonSource { .. }
2617 | HydroNode::CycleSource { .. }
2618 | HydroNode::ExternalInput { .. } => {}
2619
2620 HydroNode::Tee { inner, .. } | HydroNode::Singleton { inner, .. } => {
2621 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2622 *inner = SharedNode(transformed.clone());
2623 } else {
2624 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2625 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2626 let mut orig = inner.0.replace(HydroNode::Placeholder);
2627 transform(&mut orig, seen_tees);
2628 *transformed_cell.borrow_mut() = orig;
2629 *inner = SharedNode(transformed_cell);
2630 }
2631 }
2632
2633 HydroNode::Partition { inner, f, .. } => {
2634 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2635 *inner = SharedNode(transformed.clone());
2636 } else {
2637 f.transform_children(&mut transform, seen_tees);
2638 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2639 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2640 let mut orig = inner.0.replace(HydroNode::Placeholder);
2641 transform(&mut orig, seen_tees);
2642 *transformed_cell.borrow_mut() = orig;
2643 *inner = SharedNode(transformed_cell);
2644 }
2645 }
2646
2647 HydroNode::Cast { inner, .. }
2648 | HydroNode::ObserveNonDet { inner, .. }
2649 | HydroNode::BeginAtomic { inner, .. }
2650 | HydroNode::EndAtomic { inner, .. }
2651 | HydroNode::Batch { inner, .. }
2652 | HydroNode::YieldConcat { inner, .. }
2653 | HydroNode::UnboundSingleton { inner, .. }
2654 | HydroNode::AssertIsConsistent { inner, .. } => {
2655 transform(inner.as_mut(), seen_tees);
2656 }
2657
2658 HydroNode::Chain { first, second, .. } => {
2659 transform(first.as_mut(), seen_tees);
2660 transform(second.as_mut(), seen_tees);
2661 }
2662
2663 HydroNode::MergeOrdered { first, second, .. } => {
2664 transform(first.as_mut(), seen_tees);
2665 transform(second.as_mut(), seen_tees);
2666 }
2667
2668 HydroNode::ChainFirst { first, second, .. } => {
2669 transform(first.as_mut(), seen_tees);
2670 transform(second.as_mut(), seen_tees);
2671 }
2672
2673 HydroNode::CrossSingleton { left, right, .. }
2674 | HydroNode::CrossProduct { left, right, .. }
2675 | HydroNode::Join { left, right, .. }
2676 | HydroNode::JoinHalf { left, right, .. } => {
2677 transform(left.as_mut(), seen_tees);
2678 transform(right.as_mut(), seen_tees);
2679 }
2680
2681 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2682 transform(pos.as_mut(), seen_tees);
2683 transform(neg.as_mut(), seen_tees);
2684 }
2685
2686 HydroNode::Map { f, input, .. } => {
2687 f.transform_children(&mut transform, seen_tees);
2688 transform(input.as_mut(), seen_tees);
2689 }
2690 HydroNode::FlatMap { f, input, .. }
2691 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2692 | HydroNode::Filter { f, input, .. }
2693 | HydroNode::FilterMap { f, input, .. }
2694 | HydroNode::Inspect { f, input, .. }
2695 | HydroNode::Reduce { f, input, .. }
2696 | HydroNode::ReduceKeyed { f, input, .. } => {
2697 f.transform_children(&mut transform, seen_tees);
2698 transform(input.as_mut(), seen_tees);
2699 }
2700 HydroNode::ReduceKeyedWatermark {
2701 f,
2702 input,
2703 watermark,
2704 ..
2705 } => {
2706 f.transform_children(&mut transform, seen_tees);
2707 transform(input.as_mut(), seen_tees);
2708 transform(watermark.as_mut(), seen_tees);
2709 }
2710 HydroNode::Fold {
2711 init, acc, input, ..
2712 }
2713 | HydroNode::Scan {
2714 init, acc, input, ..
2715 }
2716 | HydroNode::ScanAsyncBlocking {
2717 init, acc, input, ..
2718 }
2719 | HydroNode::FoldKeyed {
2720 init, acc, input, ..
2721 } => {
2722 init.transform_children(&mut transform, seen_tees);
2723 acc.transform_children(&mut transform, seen_tees);
2724 transform(input.as_mut(), seen_tees);
2725 }
2726 HydroNode::ResolveFutures { input, .. }
2727 | HydroNode::ResolveFuturesBlocking { input, .. }
2728 | HydroNode::ResolveFuturesOrdered { input, .. }
2729 | HydroNode::Sort { input, .. }
2730 | HydroNode::DeferTick { input, .. }
2731 | HydroNode::Enumerate { input, .. }
2732 | HydroNode::Unique { input, .. }
2733 | HydroNode::Network { input, .. }
2734 | HydroNode::Counter { input, .. } => {
2735 transform(input.as_mut(), seen_tees);
2736 }
2737 }
2738 }
2739
2740 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2741 match self {
2742 HydroNode::Placeholder => HydroNode::Placeholder,
2743 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2744 inner: Box::new(inner.deep_clone(seen_tees)),
2745 metadata: metadata.clone(),
2746 },
2747 HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2748 inner: Box::new(inner.deep_clone(seen_tees)),
2749 metadata: metadata.clone(),
2750 },
2751 HydroNode::ObserveNonDet {
2752 inner,
2753 trusted,
2754 metadata,
2755 } => HydroNode::ObserveNonDet {
2756 inner: Box::new(inner.deep_clone(seen_tees)),
2757 trusted: *trusted,
2758 metadata: metadata.clone(),
2759 },
2760 HydroNode::AssertIsConsistent {
2761 inner,
2762 trusted,
2763 metadata,
2764 } => HydroNode::AssertIsConsistent {
2765 inner: Box::new(inner.deep_clone(seen_tees)),
2766 trusted: *trusted,
2767 metadata: metadata.clone(),
2768 },
2769 HydroNode::Source { source, metadata } => HydroNode::Source {
2770 source: source.clone(),
2771 metadata: metadata.clone(),
2772 },
2773 HydroNode::SingletonSource {
2774 value,
2775 first_tick_only,
2776 metadata,
2777 } => HydroNode::SingletonSource {
2778 value: value.clone(),
2779 first_tick_only: *first_tick_only,
2780 metadata: metadata.clone(),
2781 },
2782 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2783 cycle_id: *cycle_id,
2784 metadata: metadata.clone(),
2785 },
2786 HydroNode::Tee { inner, metadata } | HydroNode::Singleton { inner, metadata } => {
2787 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2788 SharedNode(transformed.clone())
2789 } else {
2790 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2791 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2792 let cloned = inner.0.borrow().deep_clone(seen_tees);
2793 *new_rc.borrow_mut() = cloned;
2794 SharedNode(new_rc)
2795 };
2796 if matches!(self, HydroNode::Singleton { .. }) {
2797 HydroNode::Singleton {
2798 inner: cloned_inner,
2799 metadata: metadata.clone(),
2800 }
2801 } else {
2802 HydroNode::Tee {
2803 inner: cloned_inner,
2804 metadata: metadata.clone(),
2805 }
2806 }
2807 }
2808 HydroNode::Partition {
2809 inner,
2810 f,
2811 is_true,
2812 metadata,
2813 } => {
2814 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2815 HydroNode::Partition {
2816 inner: SharedNode(transformed.clone()),
2817 f: f.deep_clone(seen_tees),
2818 is_true: *is_true,
2819 metadata: metadata.clone(),
2820 }
2821 } else {
2822 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2823 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2824 let cloned = inner.0.borrow().deep_clone(seen_tees);
2825 *new_rc.borrow_mut() = cloned;
2826 HydroNode::Partition {
2827 inner: SharedNode(new_rc),
2828 f: f.deep_clone(seen_tees),
2829 is_true: *is_true,
2830 metadata: metadata.clone(),
2831 }
2832 }
2833 }
2834 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2835 inner: Box::new(inner.deep_clone(seen_tees)),
2836 metadata: metadata.clone(),
2837 },
2838 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2839 inner: Box::new(inner.deep_clone(seen_tees)),
2840 metadata: metadata.clone(),
2841 },
2842 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2843 inner: Box::new(inner.deep_clone(seen_tees)),
2844 metadata: metadata.clone(),
2845 },
2846 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2847 inner: Box::new(inner.deep_clone(seen_tees)),
2848 metadata: metadata.clone(),
2849 },
2850 HydroNode::Chain {
2851 first,
2852 second,
2853 metadata,
2854 } => HydroNode::Chain {
2855 first: Box::new(first.deep_clone(seen_tees)),
2856 second: Box::new(second.deep_clone(seen_tees)),
2857 metadata: metadata.clone(),
2858 },
2859 HydroNode::MergeOrdered {
2860 first,
2861 second,
2862 metadata,
2863 } => HydroNode::MergeOrdered {
2864 first: Box::new(first.deep_clone(seen_tees)),
2865 second: Box::new(second.deep_clone(seen_tees)),
2866 metadata: metadata.clone(),
2867 },
2868 HydroNode::ChainFirst {
2869 first,
2870 second,
2871 metadata,
2872 } => HydroNode::ChainFirst {
2873 first: Box::new(first.deep_clone(seen_tees)),
2874 second: Box::new(second.deep_clone(seen_tees)),
2875 metadata: metadata.clone(),
2876 },
2877 HydroNode::CrossProduct {
2878 left,
2879 right,
2880 metadata,
2881 } => HydroNode::CrossProduct {
2882 left: Box::new(left.deep_clone(seen_tees)),
2883 right: Box::new(right.deep_clone(seen_tees)),
2884 metadata: metadata.clone(),
2885 },
2886 HydroNode::CrossSingleton {
2887 left,
2888 right,
2889 metadata,
2890 } => HydroNode::CrossSingleton {
2891 left: Box::new(left.deep_clone(seen_tees)),
2892 right: Box::new(right.deep_clone(seen_tees)),
2893 metadata: metadata.clone(),
2894 },
2895 HydroNode::Join {
2896 left,
2897 right,
2898 metadata,
2899 } => HydroNode::Join {
2900 left: Box::new(left.deep_clone(seen_tees)),
2901 right: Box::new(right.deep_clone(seen_tees)),
2902 metadata: metadata.clone(),
2903 },
2904 HydroNode::JoinHalf {
2905 left,
2906 right,
2907 metadata,
2908 } => HydroNode::JoinHalf {
2909 left: Box::new(left.deep_clone(seen_tees)),
2910 right: Box::new(right.deep_clone(seen_tees)),
2911 metadata: metadata.clone(),
2912 },
2913 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2914 pos: Box::new(pos.deep_clone(seen_tees)),
2915 neg: Box::new(neg.deep_clone(seen_tees)),
2916 metadata: metadata.clone(),
2917 },
2918 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2919 pos: Box::new(pos.deep_clone(seen_tees)),
2920 neg: Box::new(neg.deep_clone(seen_tees)),
2921 metadata: metadata.clone(),
2922 },
2923 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2924 input: Box::new(input.deep_clone(seen_tees)),
2925 metadata: metadata.clone(),
2926 },
2927 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2928 HydroNode::ResolveFuturesBlocking {
2929 input: Box::new(input.deep_clone(seen_tees)),
2930 metadata: metadata.clone(),
2931 }
2932 }
2933 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2934 HydroNode::ResolveFuturesOrdered {
2935 input: Box::new(input.deep_clone(seen_tees)),
2936 metadata: metadata.clone(),
2937 }
2938 }
2939 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2940 f: f.deep_clone(seen_tees),
2941 input: Box::new(input.deep_clone(seen_tees)),
2942 metadata: metadata.clone(),
2943 },
2944 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2945 f: f.deep_clone(seen_tees),
2946 input: Box::new(input.deep_clone(seen_tees)),
2947 metadata: metadata.clone(),
2948 },
2949 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2950 HydroNode::FlatMapStreamBlocking {
2951 f: f.deep_clone(seen_tees),
2952 input: Box::new(input.deep_clone(seen_tees)),
2953 metadata: metadata.clone(),
2954 }
2955 }
2956 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2957 f: f.deep_clone(seen_tees),
2958 input: Box::new(input.deep_clone(seen_tees)),
2959 metadata: metadata.clone(),
2960 },
2961 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2962 f: f.deep_clone(seen_tees),
2963 input: Box::new(input.deep_clone(seen_tees)),
2964 metadata: metadata.clone(),
2965 },
2966 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2967 input: Box::new(input.deep_clone(seen_tees)),
2968 metadata: metadata.clone(),
2969 },
2970 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2971 input: Box::new(input.deep_clone(seen_tees)),
2972 metadata: metadata.clone(),
2973 },
2974 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2975 f: f.deep_clone(seen_tees),
2976 input: Box::new(input.deep_clone(seen_tees)),
2977 metadata: metadata.clone(),
2978 },
2979 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2980 input: Box::new(input.deep_clone(seen_tees)),
2981 metadata: metadata.clone(),
2982 },
2983 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2984 input: Box::new(input.deep_clone(seen_tees)),
2985 metadata: metadata.clone(),
2986 },
2987 HydroNode::Fold {
2988 init,
2989 acc,
2990 input,
2991 metadata,
2992 } => HydroNode::Fold {
2993 init: init.deep_clone(seen_tees),
2994 acc: acc.deep_clone(seen_tees),
2995 input: Box::new(input.deep_clone(seen_tees)),
2996 metadata: metadata.clone(),
2997 },
2998 HydroNode::Scan {
2999 init,
3000 acc,
3001 input,
3002 metadata,
3003 } => HydroNode::Scan {
3004 init: init.deep_clone(seen_tees),
3005 acc: acc.deep_clone(seen_tees),
3006 input: Box::new(input.deep_clone(seen_tees)),
3007 metadata: metadata.clone(),
3008 },
3009 HydroNode::ScanAsyncBlocking {
3010 init,
3011 acc,
3012 input,
3013 metadata,
3014 } => HydroNode::ScanAsyncBlocking {
3015 init: init.deep_clone(seen_tees),
3016 acc: acc.deep_clone(seen_tees),
3017 input: Box::new(input.deep_clone(seen_tees)),
3018 metadata: metadata.clone(),
3019 },
3020 HydroNode::FoldKeyed {
3021 init,
3022 acc,
3023 input,
3024 metadata,
3025 } => HydroNode::FoldKeyed {
3026 init: init.deep_clone(seen_tees),
3027 acc: acc.deep_clone(seen_tees),
3028 input: Box::new(input.deep_clone(seen_tees)),
3029 metadata: metadata.clone(),
3030 },
3031 HydroNode::ReduceKeyedWatermark {
3032 f,
3033 input,
3034 watermark,
3035 metadata,
3036 } => HydroNode::ReduceKeyedWatermark {
3037 f: f.deep_clone(seen_tees),
3038 input: Box::new(input.deep_clone(seen_tees)),
3039 watermark: Box::new(watermark.deep_clone(seen_tees)),
3040 metadata: metadata.clone(),
3041 },
3042 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3043 f: f.deep_clone(seen_tees),
3044 input: Box::new(input.deep_clone(seen_tees)),
3045 metadata: metadata.clone(),
3046 },
3047 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3048 f: f.deep_clone(seen_tees),
3049 input: Box::new(input.deep_clone(seen_tees)),
3050 metadata: metadata.clone(),
3051 },
3052 HydroNode::Network {
3053 name,
3054 networking_info,
3055 serialize_fn,
3056 instantiate_fn,
3057 deserialize_fn,
3058 input,
3059 metadata,
3060 } => HydroNode::Network {
3061 name: name.clone(),
3062 networking_info: networking_info.clone(),
3063 serialize_fn: serialize_fn.clone(),
3064 instantiate_fn: instantiate_fn.clone(),
3065 deserialize_fn: deserialize_fn.clone(),
3066 input: Box::new(input.deep_clone(seen_tees)),
3067 metadata: metadata.clone(),
3068 },
3069 HydroNode::ExternalInput {
3070 from_external_key,
3071 from_port_id,
3072 from_many,
3073 codec_type,
3074 port_hint,
3075 instantiate_fn,
3076 deserialize_fn,
3077 metadata,
3078 } => HydroNode::ExternalInput {
3079 from_external_key: *from_external_key,
3080 from_port_id: *from_port_id,
3081 from_many: *from_many,
3082 codec_type: codec_type.clone(),
3083 port_hint: *port_hint,
3084 instantiate_fn: instantiate_fn.clone(),
3085 deserialize_fn: deserialize_fn.clone(),
3086 metadata: metadata.clone(),
3087 },
3088 HydroNode::Counter {
3089 tag,
3090 duration,
3091 prefix,
3092 input,
3093 metadata,
3094 } => HydroNode::Counter {
3095 tag: tag.clone(),
3096 duration: duration.clone(),
3097 prefix: prefix.clone(),
3098 input: Box::new(input.deep_clone(seen_tees)),
3099 metadata: metadata.clone(),
3100 },
3101 }
3102 }
3103
3104 #[cfg(feature = "build")]
3105 pub fn emit_core(
3106 &mut self,
3107 builders_or_callback: &mut BuildersOrCallback<
3108 impl FnMut(&mut HydroRoot, &mut StmtId),
3109 impl FnMut(&mut HydroNode, &mut StmtId),
3110 >,
3111 seen_tees: &mut SeenSharedNodes,
3112 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3113 next_stmt_id: &mut StmtId,
3114 fold_hooked_idents: &mut HashSet<String>,
3115 ) -> syn::Ident {
3116 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3117
3118 self.transform_bottom_up(
3119 &mut |node: &mut HydroNode| {
3120 let out_location = node.metadata().location_id.clone();
3121 match node {
3122 HydroNode::Placeholder => {
3123 panic!()
3124 }
3125
3126 HydroNode::Cast { .. } => {
3127 match builders_or_callback {
3130 BuildersOrCallback::Builders(_) => {}
3131 BuildersOrCallback::Callback(_, node_callback) => {
3132 node_callback(node, next_stmt_id);
3133 }
3134 }
3135
3136 let _ = next_stmt_id.get_and_increment();
3137 }
3139
3140 HydroNode::UnboundSingleton { .. } => {
3141 let inner_ident = ident_stack.pop().unwrap();
3142
3143 let out_ident =
3144 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3145
3146 match builders_or_callback {
3147 BuildersOrCallback::Builders(graph_builders) => {
3148 if graph_builders.singleton_intermediates() {
3149 let builder = graph_builders.get_dfir_mut(&out_location);
3150 builder.add_dfir(
3151 parse_quote! {
3152 #out_ident = #inner_ident;
3153 },
3154 None,
3155 None,
3156 );
3157 } else {
3158 let builder = graph_builders.get_dfir_mut(&out_location);
3159 builder.add_dfir(
3160 parse_quote! {
3161 #out_ident = #inner_ident -> persist::<'static>();
3162 },
3163 None,
3164 None,
3165 );
3166 }
3167 }
3168 BuildersOrCallback::Callback(_, node_callback) => {
3169 node_callback(node, next_stmt_id);
3170 }
3171 }
3172
3173 let _ = next_stmt_id.get_and_increment();
3174
3175 ident_stack.push(out_ident);
3176 }
3177
3178 HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3179 let inner_ident = ident_stack.pop().unwrap();
3180
3181 let out_ident =
3182 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3183
3184 match builders_or_callback {
3185 BuildersOrCallback::Builders(graph_builders) => {
3186 graph_builders.assert_is_consistent(
3187 *trusted,
3188 &inner.metadata().location_id,
3189 inner_ident,
3190 &out_ident,
3191 );
3192 }
3193 BuildersOrCallback::Callback(_, node_callback) => {
3194 node_callback(node, next_stmt_id);
3195 }
3196 }
3197
3198 let _ = next_stmt_id.get_and_increment();
3199
3200 ident_stack.push(out_ident);
3201 }
3202
3203 HydroNode::ObserveNonDet {
3204 inner,
3205 trusted,
3206 metadata,
3207 ..
3208 } => {
3209 let inner_ident = ident_stack.pop().unwrap();
3210
3211 let observe_ident =
3212 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3213
3214 match builders_or_callback {
3215 BuildersOrCallback::Builders(graph_builders) => {
3216 graph_builders.observe_nondet(
3217 *trusted,
3218 &inner.metadata().location_id,
3219 inner_ident,
3220 &inner.metadata().collection_kind,
3221 &observe_ident,
3222 &metadata.collection_kind,
3223 &metadata.op,
3224 );
3225 }
3226 BuildersOrCallback::Callback(_, node_callback) => {
3227 node_callback(node, next_stmt_id);
3228 }
3229 }
3230
3231 let _ = next_stmt_id.get_and_increment();
3232
3233 ident_stack.push(observe_ident);
3234 }
3235
3236 HydroNode::Batch {
3237 inner, metadata, ..
3238 } => {
3239 let inner_ident = ident_stack.pop().unwrap();
3240
3241 let batch_ident =
3242 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3243
3244 match builders_or_callback {
3245 BuildersOrCallback::Builders(graph_builders) => {
3246 graph_builders.batch(
3247 inner_ident,
3248 &inner.metadata().location_id,
3249 &inner.metadata().collection_kind,
3250 &batch_ident,
3251 &out_location,
3252 &metadata.op,
3253 fold_hooked_idents,
3254 );
3255 }
3256 BuildersOrCallback::Callback(_, node_callback) => {
3257 node_callback(node, next_stmt_id);
3258 }
3259 }
3260
3261 let _ = next_stmt_id.get_and_increment();
3262
3263 ident_stack.push(batch_ident);
3264 }
3265
3266 HydroNode::YieldConcat { inner, .. } => {
3267 let inner_ident = ident_stack.pop().unwrap();
3268
3269 let yield_ident =
3270 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3271
3272 match builders_or_callback {
3273 BuildersOrCallback::Builders(graph_builders) => {
3274 graph_builders.yield_from_tick(
3275 inner_ident,
3276 &inner.metadata().location_id,
3277 &inner.metadata().collection_kind,
3278 &yield_ident,
3279 &out_location,
3280 );
3281 }
3282 BuildersOrCallback::Callback(_, node_callback) => {
3283 node_callback(node, next_stmt_id);
3284 }
3285 }
3286
3287 let _ = next_stmt_id.get_and_increment();
3288
3289 ident_stack.push(yield_ident);
3290 }
3291
3292 HydroNode::BeginAtomic { inner, metadata } => {
3293 let inner_ident = ident_stack.pop().unwrap();
3294
3295 let begin_ident =
3296 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3297
3298 match builders_or_callback {
3299 BuildersOrCallback::Builders(graph_builders) => {
3300 graph_builders.begin_atomic(
3301 inner_ident,
3302 &inner.metadata().location_id,
3303 &inner.metadata().collection_kind,
3304 &begin_ident,
3305 &out_location,
3306 &metadata.op,
3307 );
3308 }
3309 BuildersOrCallback::Callback(_, node_callback) => {
3310 node_callback(node, next_stmt_id);
3311 }
3312 }
3313
3314 let _ = next_stmt_id.get_and_increment();
3315
3316 ident_stack.push(begin_ident);
3317 }
3318
3319 HydroNode::EndAtomic { inner, .. } => {
3320 let inner_ident = ident_stack.pop().unwrap();
3321
3322 let end_ident =
3323 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3324
3325 match builders_or_callback {
3326 BuildersOrCallback::Builders(graph_builders) => {
3327 graph_builders.end_atomic(
3328 inner_ident,
3329 &inner.metadata().location_id,
3330 &inner.metadata().collection_kind,
3331 &end_ident,
3332 );
3333 }
3334 BuildersOrCallback::Callback(_, node_callback) => {
3335 node_callback(node, next_stmt_id);
3336 }
3337 }
3338
3339 let _ = next_stmt_id.get_and_increment();
3340
3341 ident_stack.push(end_ident);
3342 }
3343
3344 HydroNode::Source {
3345 source, metadata, ..
3346 } => {
3347 if let HydroSource::ExternalNetwork() = source {
3348 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3349 } else {
3350 let source_ident =
3351 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3352
3353 let source_stmt = match source {
3354 HydroSource::Stream(expr) => {
3355 debug_assert!(metadata.location_id.is_top_level());
3356 parse_quote! {
3357 #source_ident = source_stream(#expr);
3358 }
3359 }
3360
3361 HydroSource::ExternalNetwork() => {
3362 unreachable!()
3363 }
3364
3365 HydroSource::Iter(expr) => {
3366 if metadata.location_id.is_top_level() {
3367 parse_quote! {
3368 #source_ident = source_iter(#expr);
3369 }
3370 } else {
3371 parse_quote! {
3373 #source_ident = source_iter(#expr) -> persist::<'static>();
3374 }
3375 }
3376 }
3377
3378 HydroSource::Spin() => {
3379 debug_assert!(metadata.location_id.is_top_level());
3380 parse_quote! {
3381 #source_ident = spin();
3382 }
3383 }
3384
3385 HydroSource::ClusterMembers(target_loc, state) => {
3386 debug_assert!(metadata.location_id.is_top_level());
3387
3388 let members_tee_ident = syn::Ident::new(
3389 &format!(
3390 "__cluster_members_tee_{}_{}",
3391 metadata.location_id.root().key(),
3392 target_loc.key(),
3393 ),
3394 Span::call_site(),
3395 );
3396
3397 match state {
3398 ClusterMembersState::Stream(d) => {
3399 parse_quote! {
3400 #members_tee_ident = source_stream(#d) -> tee();
3401 #source_ident = #members_tee_ident;
3402 }
3403 },
3404 ClusterMembersState::Uninit => syn::parse_quote! {
3405 #source_ident = source_stream(DUMMY);
3406 },
3407 ClusterMembersState::Tee(..) => parse_quote! {
3408 #source_ident = #members_tee_ident;
3409 },
3410 }
3411 }
3412
3413 HydroSource::Embedded(ident) => {
3414 parse_quote! {
3415 #source_ident = source_stream(#ident);
3416 }
3417 }
3418
3419 HydroSource::EmbeddedSingleton(ident) => {
3420 parse_quote! {
3421 #source_ident = source_iter([#ident]);
3422 }
3423 }
3424 };
3425
3426 match builders_or_callback {
3427 BuildersOrCallback::Builders(graph_builders) => {
3428 let builder = graph_builders.get_dfir_mut(&out_location);
3429 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3430 }
3431 BuildersOrCallback::Callback(_, node_callback) => {
3432 node_callback(node, next_stmt_id);
3433 }
3434 }
3435
3436 let _ = next_stmt_id.get_and_increment();
3437
3438 ident_stack.push(source_ident);
3439 }
3440 }
3441
3442 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3443 let source_ident =
3444 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3445
3446 match builders_or_callback {
3447 BuildersOrCallback::Builders(graph_builders) => {
3448 let builder = graph_builders.get_dfir_mut(&out_location);
3449
3450 if *first_tick_only {
3451 assert!(
3452 !metadata.location_id.is_top_level(),
3453 "first_tick_only SingletonSource must be inside a tick"
3454 );
3455 }
3456
3457 if *first_tick_only
3458 || (metadata.location_id.is_top_level()
3459 && metadata.collection_kind.is_bounded())
3460 {
3461 builder.add_dfir(
3462 parse_quote! {
3463 #source_ident = source_iter([#value]);
3464 },
3465 None,
3466 Some(&next_stmt_id.to_string()),
3467 );
3468 } else {
3469 builder.add_dfir(
3470 parse_quote! {
3471 #source_ident = source_iter([#value]) -> persist::<'static>();
3472 },
3473 None,
3474 Some(&next_stmt_id.to_string()),
3475 );
3476 }
3477 }
3478 BuildersOrCallback::Callback(_, node_callback) => {
3479 node_callback(node, next_stmt_id);
3480 }
3481 }
3482
3483 let _ = next_stmt_id.get_and_increment();
3484
3485 ident_stack.push(source_ident);
3486 }
3487
3488 HydroNode::CycleSource { cycle_id, .. } => {
3489 let ident = cycle_id.as_ident();
3490
3491 match builders_or_callback {
3492 BuildersOrCallback::Builders(_) => {}
3493 BuildersOrCallback::Callback(_, node_callback) => {
3494 node_callback(node, next_stmt_id);
3495 }
3496 }
3497
3498 let _ = next_stmt_id.get_and_increment();
3500
3501 ident_stack.push(ident);
3502 }
3503
3504 HydroNode::Tee { inner, .. } => {
3505 let ret_ident = if let Some(built_idents) =
3506 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3507 {
3508 match builders_or_callback {
3509 BuildersOrCallback::Builders(_) => {}
3510 BuildersOrCallback::Callback(_, node_callback) => {
3511 node_callback(node, next_stmt_id);
3512 }
3513 }
3514
3515 built_idents[0].clone()
3516 } else {
3517 let inner_ident = ident_stack.pop().unwrap();
3520
3521 let tee_ident =
3522 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3523
3524 built_tees.insert(
3525 inner.0.as_ref() as *const RefCell<HydroNode>,
3526 vec![tee_ident.clone()],
3527 );
3528
3529 match builders_or_callback {
3530 BuildersOrCallback::Builders(graph_builders) => {
3531 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3543 fold_hooked_idents.insert(tee_ident.to_string());
3544 }
3545 let builder = graph_builders.get_dfir_mut(&out_location);
3546 builder.add_dfir(
3547 parse_quote! {
3548 #tee_ident = #inner_ident -> tee();
3549 },
3550 None,
3551 Some(&next_stmt_id.to_string()),
3552 );
3553 }
3554 BuildersOrCallback::Callback(_, node_callback) => {
3555 node_callback(node, next_stmt_id);
3556 }
3557 }
3558
3559 tee_ident
3560 };
3561
3562 let _ = next_stmt_id.get_and_increment();
3566 ident_stack.push(ret_ident);
3567 }
3568
3569 HydroNode::Singleton { inner, .. } => {
3570 let ret_ident = if let Some(built_idents) =
3571 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3572 {
3573 built_idents[0].clone()
3574 } else {
3575 let inner_ident = ident_stack.pop().unwrap();
3576
3577 let singleton_ident =
3578 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3579
3580 built_tees.insert(
3581 inner.0.as_ref() as *const RefCell<HydroNode>,
3582 vec![singleton_ident.clone()],
3583 );
3584
3585 match builders_or_callback {
3586 BuildersOrCallback::Builders(graph_builders) => {
3587 let builder = graph_builders.get_dfir_mut(&out_location);
3588 builder.add_dfir(
3589 parse_quote! {
3590 #singleton_ident = #inner_ident -> singleton();
3591 },
3592 None,
3593 Some(&next_stmt_id.to_string()),
3594 );
3595 }
3596 BuildersOrCallback::Callback(_, node_callback) => {
3597 node_callback(node, next_stmt_id);
3598 }
3599 }
3600
3601 singleton_ident
3602 };
3603
3604 let _ = next_stmt_id.get_and_increment();
3607 ident_stack.push(ret_ident);
3608 }
3609
3610 HydroNode::Partition {
3611 inner, f, is_true, ..
3612 } => {
3613 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3615 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3616 match builders_or_callback {
3617 BuildersOrCallback::Builders(_) => {}
3618 BuildersOrCallback::Callback(_, node_callback) => {
3619 node_callback(node, next_stmt_id);
3620 }
3621 }
3622
3623 let idx = if is_true { 0 } else { 1 };
3624 built_idents[idx].clone()
3625 } else {
3626 let inner_ident = ident_stack.pop().unwrap();
3629 let f_tokens = f.emit_tokens(&mut ident_stack);
3630
3631 let partition_ident = syn::Ident::new(
3632 &format!("stream_{}_partition", *next_stmt_id),
3633 Span::call_site(),
3634 );
3635 let true_ident = syn::Ident::new(
3636 &format!("stream_{}_true", *next_stmt_id),
3637 Span::call_site(),
3638 );
3639 let false_ident = syn::Ident::new(
3640 &format!("stream_{}_false", *next_stmt_id),
3641 Span::call_site(),
3642 );
3643
3644 built_tees.insert(
3645 ptr,
3646 vec![true_ident.clone(), false_ident.clone()],
3647 );
3648
3649 match builders_or_callback {
3650 BuildersOrCallback::Builders(graph_builders) => {
3651 let builder = graph_builders.get_dfir_mut(&out_location);
3652 builder.add_dfir(
3653 parse_quote! {
3654 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3655 #true_ident = #partition_ident[0];
3656 #false_ident = #partition_ident[1];
3657 },
3658 None,
3659 Some(&next_stmt_id.to_string()),
3660 );
3661 }
3662 BuildersOrCallback::Callback(_, node_callback) => {
3663 node_callback(node, next_stmt_id);
3664 }
3665 }
3666
3667 if is_true { true_ident } else { false_ident }
3668 };
3669
3670 let _ = next_stmt_id.get_and_increment();
3671 ident_stack.push(ret_ident);
3672 }
3673
3674 HydroNode::Chain { .. } => {
3675 let second_ident = ident_stack.pop().unwrap();
3677 let first_ident = ident_stack.pop().unwrap();
3678
3679 let chain_ident =
3680 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3681
3682 match builders_or_callback {
3683 BuildersOrCallback::Builders(graph_builders) => {
3684 let builder = graph_builders.get_dfir_mut(&out_location);
3685 builder.add_dfir(
3686 parse_quote! {
3687 #chain_ident = chain();
3688 #first_ident -> [0]#chain_ident;
3689 #second_ident -> [1]#chain_ident;
3690 },
3691 None,
3692 Some(&next_stmt_id.to_string()),
3693 );
3694 }
3695 BuildersOrCallback::Callback(_, node_callback) => {
3696 node_callback(node, next_stmt_id);
3697 }
3698 }
3699
3700 let _ = next_stmt_id.get_and_increment();
3701
3702 ident_stack.push(chain_ident);
3703 }
3704
3705 HydroNode::MergeOrdered { first, metadata, .. } => {
3706 let second_ident = ident_stack.pop().unwrap();
3707 let first_ident = ident_stack.pop().unwrap();
3708
3709 let merge_ident =
3710 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3711
3712 match builders_or_callback {
3713 BuildersOrCallback::Builders(graph_builders) => {
3714 graph_builders.merge_ordered(
3715 &first.metadata().location_id,
3716 first_ident,
3717 second_ident,
3718 &merge_ident,
3719 &first.metadata().collection_kind,
3720 &metadata.op,
3721 Some(&next_stmt_id.to_string()),
3722 );
3723 }
3724 BuildersOrCallback::Callback(_, node_callback) => {
3725 node_callback(node, next_stmt_id);
3726 }
3727 }
3728
3729 let _ = next_stmt_id.get_and_increment();
3730
3731 ident_stack.push(merge_ident);
3732 }
3733
3734 HydroNode::ChainFirst { .. } => {
3735 let second_ident = ident_stack.pop().unwrap();
3736 let first_ident = ident_stack.pop().unwrap();
3737
3738 let chain_ident =
3739 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3740
3741 match builders_or_callback {
3742 BuildersOrCallback::Builders(graph_builders) => {
3743 let builder = graph_builders.get_dfir_mut(&out_location);
3744 builder.add_dfir(
3745 parse_quote! {
3746 #chain_ident = chain_first_n(1);
3747 #first_ident -> [0]#chain_ident;
3748 #second_ident -> [1]#chain_ident;
3749 },
3750 None,
3751 Some(&next_stmt_id.to_string()),
3752 );
3753 }
3754 BuildersOrCallback::Callback(_, node_callback) => {
3755 node_callback(node, next_stmt_id);
3756 }
3757 }
3758
3759 let _ = next_stmt_id.get_and_increment();
3760
3761 ident_stack.push(chain_ident);
3762 }
3763
3764 HydroNode::CrossSingleton { right, .. } => {
3765 let right_ident = ident_stack.pop().unwrap();
3766 let left_ident = ident_stack.pop().unwrap();
3767
3768 let cross_ident =
3769 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3770
3771 match builders_or_callback {
3772 BuildersOrCallback::Builders(graph_builders) => {
3773 let builder = graph_builders.get_dfir_mut(&out_location);
3774
3775 if right.metadata().location_id.is_top_level()
3776 && right.metadata().collection_kind.is_bounded()
3777 {
3778 builder.add_dfir(
3779 parse_quote! {
3780 #cross_ident = cross_singleton::<'static>();
3781 #left_ident -> [input]#cross_ident;
3782 #right_ident -> [single]#cross_ident;
3783 },
3784 None,
3785 Some(&next_stmt_id.to_string()),
3786 );
3787 } else {
3788 builder.add_dfir(
3789 parse_quote! {
3790 #cross_ident = cross_singleton();
3791 #left_ident -> [input]#cross_ident;
3792 #right_ident -> [single]#cross_ident;
3793 },
3794 None,
3795 Some(&next_stmt_id.to_string()),
3796 );
3797 }
3798 }
3799 BuildersOrCallback::Callback(_, node_callback) => {
3800 node_callback(node, next_stmt_id);
3801 }
3802 }
3803
3804 let _ = next_stmt_id.get_and_increment();
3805
3806 ident_stack.push(cross_ident);
3807 }
3808
3809 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3810 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3811 parse_quote!(cross_join_multiset)
3812 } else {
3813 parse_quote!(join_multiset)
3814 };
3815
3816 let (HydroNode::CrossProduct { left, right, .. }
3817 | HydroNode::Join { left, right, .. }) = node
3818 else {
3819 unreachable!()
3820 };
3821
3822 let is_top_level = left.metadata().location_id.is_top_level()
3823 && right.metadata().location_id.is_top_level();
3824 let left_lifetime = if left.metadata().location_id.is_top_level() {
3825 quote!('static)
3826 } else {
3827 quote!('tick)
3828 };
3829
3830 let right_lifetime = if right.metadata().location_id.is_top_level() {
3831 quote!('static)
3832 } else {
3833 quote!('tick)
3834 };
3835
3836 let right_ident = ident_stack.pop().unwrap();
3837 let left_ident = ident_stack.pop().unwrap();
3838
3839 let stream_ident =
3840 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3841
3842 match builders_or_callback {
3843 BuildersOrCallback::Builders(graph_builders) => {
3844 let builder = graph_builders.get_dfir_mut(&out_location);
3845 builder.add_dfir(
3846 if is_top_level {
3847 parse_quote! {
3850 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3851 #left_ident -> [0]#stream_ident;
3852 #right_ident -> [1]#stream_ident;
3853 }
3854 } else {
3855 parse_quote! {
3856 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3857 #left_ident -> [0]#stream_ident;
3858 #right_ident -> [1]#stream_ident;
3859 }
3860 }
3861 ,
3862 None,
3863 Some(&next_stmt_id.to_string()),
3864 );
3865 }
3866 BuildersOrCallback::Callback(_, node_callback) => {
3867 node_callback(node, next_stmt_id);
3868 }
3869 }
3870
3871 let _ = next_stmt_id.get_and_increment();
3872
3873 ident_stack.push(stream_ident);
3874 }
3875
3876 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3877 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3878 parse_quote!(difference)
3879 } else {
3880 parse_quote!(anti_join)
3881 };
3882
3883 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3884 node
3885 else {
3886 unreachable!()
3887 };
3888
3889 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3890 quote!('static)
3891 } else {
3892 quote!('tick)
3893 };
3894
3895 let neg_ident = ident_stack.pop().unwrap();
3896 let pos_ident = ident_stack.pop().unwrap();
3897
3898 let stream_ident =
3899 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3900
3901 match builders_or_callback {
3902 BuildersOrCallback::Builders(graph_builders) => {
3903 let builder = graph_builders.get_dfir_mut(&out_location);
3904 builder.add_dfir(
3905 parse_quote! {
3906 #stream_ident = #operator::<'tick, #neg_lifetime>();
3907 #pos_ident -> [pos]#stream_ident;
3908 #neg_ident -> [neg]#stream_ident;
3909 },
3910 None,
3911 Some(&next_stmt_id.to_string()),
3912 );
3913 }
3914 BuildersOrCallback::Callback(_, node_callback) => {
3915 node_callback(node, next_stmt_id);
3916 }
3917 }
3918
3919 let _ = next_stmt_id.get_and_increment();
3920
3921 ident_stack.push(stream_ident);
3922 }
3923
3924 HydroNode::JoinHalf { .. } => {
3925 let HydroNode::JoinHalf { right, .. } = node else {
3926 unreachable!()
3927 };
3928
3929 assert!(
3930 right.metadata().collection_kind.is_bounded(),
3931 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3932 right.metadata().collection_kind
3933 );
3934
3935 let build_lifetime = if right.metadata().location_id.is_top_level() {
3936 quote!('static)
3937 } else {
3938 quote!('tick)
3939 };
3940
3941 let build_ident = ident_stack.pop().unwrap();
3942 let probe_ident = ident_stack.pop().unwrap();
3943
3944 let stream_ident =
3945 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3946
3947 match builders_or_callback {
3948 BuildersOrCallback::Builders(graph_builders) => {
3949 let builder = graph_builders.get_dfir_mut(&out_location);
3950 builder.add_dfir(
3951 parse_quote! {
3952 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3953 #probe_ident -> [probe]#stream_ident;
3954 #build_ident -> [build]#stream_ident;
3955 },
3956 None,
3957 Some(&next_stmt_id.to_string()),
3958 );
3959 }
3960 BuildersOrCallback::Callback(_, node_callback) => {
3961 node_callback(node, next_stmt_id);
3962 }
3963 }
3964
3965 let _ = next_stmt_id.get_and_increment();
3966
3967 ident_stack.push(stream_ident);
3968 }
3969
3970 HydroNode::ResolveFutures { .. } => {
3971 let input_ident = ident_stack.pop().unwrap();
3972
3973 let futures_ident =
3974 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3975
3976 match builders_or_callback {
3977 BuildersOrCallback::Builders(graph_builders) => {
3978 let builder = graph_builders.get_dfir_mut(&out_location);
3979 builder.add_dfir(
3980 parse_quote! {
3981 #futures_ident = #input_ident -> resolve_futures();
3982 },
3983 None,
3984 Some(&next_stmt_id.to_string()),
3985 );
3986 }
3987 BuildersOrCallback::Callback(_, node_callback) => {
3988 node_callback(node, next_stmt_id);
3989 }
3990 }
3991
3992 let _ = next_stmt_id.get_and_increment();
3993
3994 ident_stack.push(futures_ident);
3995 }
3996
3997 HydroNode::ResolveFuturesBlocking { .. } => {
3998 let input_ident = ident_stack.pop().unwrap();
3999
4000 let futures_ident =
4001 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4002
4003 match builders_or_callback {
4004 BuildersOrCallback::Builders(graph_builders) => {
4005 let builder = graph_builders.get_dfir_mut(&out_location);
4006 builder.add_dfir(
4007 parse_quote! {
4008 #futures_ident = #input_ident -> resolve_futures_blocking();
4009 },
4010 None,
4011 Some(&next_stmt_id.to_string()),
4012 );
4013 }
4014 BuildersOrCallback::Callback(_, node_callback) => {
4015 node_callback(node, next_stmt_id);
4016 }
4017 }
4018
4019 let _ = next_stmt_id.get_and_increment();
4020
4021 ident_stack.push(futures_ident);
4022 }
4023
4024 HydroNode::ResolveFuturesOrdered { .. } => {
4025 let input_ident = ident_stack.pop().unwrap();
4026
4027 let futures_ident =
4028 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4029
4030 match builders_or_callback {
4031 BuildersOrCallback::Builders(graph_builders) => {
4032 let builder = graph_builders.get_dfir_mut(&out_location);
4033 builder.add_dfir(
4034 parse_quote! {
4035 #futures_ident = #input_ident -> resolve_futures_ordered();
4036 },
4037 None,
4038 Some(&next_stmt_id.to_string()),
4039 );
4040 }
4041 BuildersOrCallback::Callback(_, node_callback) => {
4042 node_callback(node, next_stmt_id);
4043 }
4044 }
4045
4046 let _ = next_stmt_id.get_and_increment();
4047
4048 ident_stack.push(futures_ident);
4049 }
4050
4051 HydroNode::Map { f, .. } => {
4052 let input_ident = ident_stack.pop().unwrap();
4054 let f_tokens = f.emit_tokens(&mut ident_stack);
4055
4056 let map_ident =
4057 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4058
4059 match builders_or_callback {
4060 BuildersOrCallback::Builders(graph_builders) => {
4061 let builder = graph_builders.get_dfir_mut(&out_location);
4062 builder.add_dfir(
4063 parse_quote! {
4064 #map_ident = #input_ident -> map(#f_tokens);
4065 },
4066 None,
4067 Some(&next_stmt_id.to_string()),
4068 );
4069 }
4070 BuildersOrCallback::Callback(_, node_callback) => {
4071 node_callback(node, next_stmt_id);
4072 }
4073 }
4074
4075 let _ = next_stmt_id.get_and_increment();
4076
4077 ident_stack.push(map_ident);
4078 }
4079
4080 HydroNode::FlatMap { f, .. } => {
4081 let input_ident = ident_stack.pop().unwrap();
4082 let f_tokens = f.emit_tokens(&mut ident_stack);
4083
4084 let flat_map_ident =
4085 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4086
4087 match builders_or_callback {
4088 BuildersOrCallback::Builders(graph_builders) => {
4089 let builder = graph_builders.get_dfir_mut(&out_location);
4090 builder.add_dfir(
4091 parse_quote! {
4092 #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4093 },
4094 None,
4095 Some(&next_stmt_id.to_string()),
4096 );
4097 }
4098 BuildersOrCallback::Callback(_, node_callback) => {
4099 node_callback(node, next_stmt_id);
4100 }
4101 }
4102
4103 let _ = next_stmt_id.get_and_increment();
4104
4105 ident_stack.push(flat_map_ident);
4106 }
4107
4108 HydroNode::FlatMapStreamBlocking { f, .. } => {
4109 let input_ident = ident_stack.pop().unwrap();
4110 let f_tokens = f.emit_tokens(&mut ident_stack);
4111
4112 let flat_map_stream_blocking_ident =
4113 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4114
4115 match builders_or_callback {
4116 BuildersOrCallback::Builders(graph_builders) => {
4117 let builder = graph_builders.get_dfir_mut(&out_location);
4118 builder.add_dfir(
4119 parse_quote! {
4120 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4121 },
4122 None,
4123 Some(&next_stmt_id.to_string()),
4124 );
4125 }
4126 BuildersOrCallback::Callback(_, node_callback) => {
4127 node_callback(node, next_stmt_id);
4128 }
4129 }
4130
4131 let _ = next_stmt_id.get_and_increment();
4132
4133 ident_stack.push(flat_map_stream_blocking_ident);
4134 }
4135
4136 HydroNode::Filter { f, .. } => {
4137 let input_ident = ident_stack.pop().unwrap();
4138 let f_tokens = f.emit_tokens(&mut ident_stack);
4139
4140 let filter_ident =
4141 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4142
4143 match builders_or_callback {
4144 BuildersOrCallback::Builders(graph_builders) => {
4145 let builder = graph_builders.get_dfir_mut(&out_location);
4146 builder.add_dfir(
4147 parse_quote! {
4148 #filter_ident = #input_ident -> filter(#f_tokens);
4149 },
4150 None,
4151 Some(&next_stmt_id.to_string()),
4152 );
4153 }
4154 BuildersOrCallback::Callback(_, node_callback) => {
4155 node_callback(node, next_stmt_id);
4156 }
4157 }
4158
4159 let _ = next_stmt_id.get_and_increment();
4160
4161 ident_stack.push(filter_ident);
4162 }
4163
4164 HydroNode::FilterMap { f, .. } => {
4165 let input_ident = ident_stack.pop().unwrap();
4166 let f_tokens = f.emit_tokens(&mut ident_stack);
4167
4168 let filter_map_ident =
4169 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4170
4171 match builders_or_callback {
4172 BuildersOrCallback::Builders(graph_builders) => {
4173 let builder = graph_builders.get_dfir_mut(&out_location);
4174 builder.add_dfir(
4175 parse_quote! {
4176 #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4177 },
4178 None,
4179 Some(&next_stmt_id.to_string()),
4180 );
4181 }
4182 BuildersOrCallback::Callback(_, node_callback) => {
4183 node_callback(node, next_stmt_id);
4184 }
4185 }
4186
4187 let _ = next_stmt_id.get_and_increment();
4188
4189 ident_stack.push(filter_map_ident);
4190 }
4191
4192 HydroNode::Sort { .. } => {
4193 let input_ident = ident_stack.pop().unwrap();
4194
4195 let sort_ident =
4196 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4197
4198 match builders_or_callback {
4199 BuildersOrCallback::Builders(graph_builders) => {
4200 let builder = graph_builders.get_dfir_mut(&out_location);
4201 builder.add_dfir(
4202 parse_quote! {
4203 #sort_ident = #input_ident -> sort();
4204 },
4205 None,
4206 Some(&next_stmt_id.to_string()),
4207 );
4208 }
4209 BuildersOrCallback::Callback(_, node_callback) => {
4210 node_callback(node, next_stmt_id);
4211 }
4212 }
4213
4214 let _ = next_stmt_id.get_and_increment();
4215
4216 ident_stack.push(sort_ident);
4217 }
4218
4219 HydroNode::DeferTick { .. } => {
4220 let input_ident = ident_stack.pop().unwrap();
4221
4222 let defer_tick_ident =
4223 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4224
4225 match builders_or_callback {
4226 BuildersOrCallback::Builders(graph_builders) => {
4227 let builder = graph_builders.get_dfir_mut(&out_location);
4228 builder.add_dfir(
4229 parse_quote! {
4230 #defer_tick_ident = #input_ident -> defer_tick_lazy();
4231 },
4232 None,
4233 Some(&next_stmt_id.to_string()),
4234 );
4235 }
4236 BuildersOrCallback::Callback(_, node_callback) => {
4237 node_callback(node, next_stmt_id);
4238 }
4239 }
4240
4241 let _ = next_stmt_id.get_and_increment();
4242
4243 ident_stack.push(defer_tick_ident);
4244 }
4245
4246 HydroNode::Enumerate { input, .. } => {
4247 let input_ident = ident_stack.pop().unwrap();
4248
4249 let enumerate_ident =
4250 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4251
4252 match builders_or_callback {
4253 BuildersOrCallback::Builders(graph_builders) => {
4254 let builder = graph_builders.get_dfir_mut(&out_location);
4255 let lifetime = if input.metadata().location_id.is_top_level() {
4256 quote!('static)
4257 } else {
4258 quote!('tick)
4259 };
4260 builder.add_dfir(
4261 parse_quote! {
4262 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4263 },
4264 None,
4265 Some(&next_stmt_id.to_string()),
4266 );
4267 }
4268 BuildersOrCallback::Callback(_, node_callback) => {
4269 node_callback(node, next_stmt_id);
4270 }
4271 }
4272
4273 let _ = next_stmt_id.get_and_increment();
4274
4275 ident_stack.push(enumerate_ident);
4276 }
4277
4278 HydroNode::Inspect { f, .. } => {
4279 let input_ident = ident_stack.pop().unwrap();
4280 let f_tokens = f.emit_tokens(&mut ident_stack);
4281
4282 let inspect_ident =
4283 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4284
4285 match builders_or_callback {
4286 BuildersOrCallback::Builders(graph_builders) => {
4287 let builder = graph_builders.get_dfir_mut(&out_location);
4288 builder.add_dfir(
4289 parse_quote! {
4290 #inspect_ident = #input_ident -> inspect(#f_tokens);
4291 },
4292 None,
4293 Some(&next_stmt_id.to_string()),
4294 );
4295 }
4296 BuildersOrCallback::Callback(_, node_callback) => {
4297 node_callback(node, next_stmt_id);
4298 }
4299 }
4300
4301 let _ = next_stmt_id.get_and_increment();
4302
4303 ident_stack.push(inspect_ident);
4304 }
4305
4306 HydroNode::Unique { input, .. } => {
4307 let input_ident = ident_stack.pop().unwrap();
4308
4309 let unique_ident =
4310 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4311
4312 match builders_or_callback {
4313 BuildersOrCallback::Builders(graph_builders) => {
4314 let builder = graph_builders.get_dfir_mut(&out_location);
4315 let lifetime = if input.metadata().location_id.is_top_level() {
4316 quote!('static)
4317 } else {
4318 quote!('tick)
4319 };
4320
4321 builder.add_dfir(
4322 parse_quote! {
4323 #unique_ident = #input_ident -> unique::<#lifetime>();
4324 },
4325 None,
4326 Some(&next_stmt_id.to_string()),
4327 );
4328 }
4329 BuildersOrCallback::Callback(_, node_callback) => {
4330 node_callback(node, next_stmt_id);
4331 }
4332 }
4333
4334 let _ = next_stmt_id.get_and_increment();
4335
4336 ident_stack.push(unique_ident);
4337 }
4338
4339 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4340 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4341 if input.metadata().location_id.is_top_level()
4342 && input.metadata().collection_kind.is_bounded()
4343 {
4344 parse_quote!(fold_no_replay)
4345 } else {
4346 parse_quote!(fold)
4347 }
4348 } else if matches!(node, HydroNode::Scan { .. }) {
4349 parse_quote!(scan)
4350 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4351 parse_quote!(scan_async_blocking)
4352 } else if let HydroNode::FoldKeyed { input, .. } = node {
4353 if input.metadata().location_id.is_top_level()
4354 && input.metadata().collection_kind.is_bounded()
4355 {
4356 todo!("Fold keyed on a top-level bounded collection is not yet supported")
4357 } else {
4358 parse_quote!(fold_keyed)
4359 }
4360 } else {
4361 unreachable!()
4362 };
4363
4364 let (HydroNode::Fold { input, .. }
4365 | HydroNode::FoldKeyed { input, .. }
4366 | HydroNode::Scan { input, .. }
4367 | HydroNode::ScanAsyncBlocking { input, .. }) = node
4368 else {
4369 unreachable!()
4370 };
4371
4372 let lifetime = if input.metadata().location_id.is_top_level() {
4373 quote!('static)
4374 } else {
4375 quote!('tick)
4376 };
4377
4378 let input_ident = ident_stack.pop().unwrap();
4379
4380 let (HydroNode::Fold { init, acc, .. }
4381 | HydroNode::FoldKeyed { init, acc, .. }
4382 | HydroNode::Scan { init, acc, .. }
4383 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4384 else {
4385 unreachable!()
4386 };
4387
4388 let acc_tokens = acc.emit_tokens(&mut ident_stack);
4389 let init_tokens = init.emit_tokens(&mut ident_stack);
4390
4391 let fold_ident =
4392 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4393
4394 match builders_or_callback {
4395 BuildersOrCallback::Builders(graph_builders) => {
4396 if matches!(node, HydroNode::Fold { .. })
4397 && node.metadata().location_id.is_top_level()
4398 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4399 && graph_builders.singleton_intermediates()
4400 && !node.metadata().collection_kind.is_bounded()
4401 {
4402 let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4403 let hooked_input_ident = graph_builders.emit_fold_hook(
4404 &input.metadata().location_id,
4405 &input_ident,
4406 &input.metadata().collection_kind,
4407 &node.metadata().op,
4408 );
4409
4410 let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4411 let acc: syn::Expr = parse_quote!({
4412 let mut __inner = #acc_tokens;
4413 move |__state, __batch: Vec<_>| {
4414 if __batch.is_empty() {
4415 return None;
4416 }
4417 for __value in __batch {
4418 __inner(__state, __value);
4419 }
4420 Some(__state.clone())
4421 }
4422 });
4423 (hooked, acc)
4424 } else {
4425 let acc: syn::Expr = parse_quote!({
4426 let mut __inner = #acc_tokens;
4427 move |__state, __value| {
4428 __inner(__state, __value);
4429 Some(__state.clone())
4430 }
4431 });
4432 (&input_ident, acc)
4433 };
4434
4435 let builder = graph_builders.get_dfir_mut(&out_location);
4436 builder.add_dfir(
4437 parse_quote! {
4438 source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4439 #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4440 #fold_ident = chain();
4441 },
4442 None,
4443 Some(&next_stmt_id.to_string()),
4444 );
4445
4446 if hooked_input_ident.is_some() {
4447 fold_hooked_idents.insert(fold_ident.to_string());
4448 }
4449 } else if matches!(node, HydroNode::FoldKeyed { .. })
4450 && node.metadata().location_id.is_top_level()
4451 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4452 && graph_builders.singleton_intermediates()
4453 && !node.metadata().collection_kind.is_bounded()
4454 {
4455 let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4456 let hooked_input_ident = graph_builders.emit_fold_hook(
4457 &input.metadata().location_id,
4458 &input_ident,
4459 &input.metadata().collection_kind,
4460 &node.metadata().op,
4461 );
4462 let builder = graph_builders.get_dfir_mut(&out_location);
4463
4464 let wrapped_acc: syn::Expr = parse_quote!({
4465 let mut __init = #init_tokens;
4466 let mut __inner = #acc_tokens;
4467 move |__state, __kv: (_, _)| {
4468 let __state = __state
4470 .entry(::std::clone::Clone::clone(&__kv.0))
4471 .or_insert_with(|| (__init)());
4472 __inner(__state, __kv.1);
4473 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4474 }
4475 });
4476
4477 if let Some(hooked_input_ident) = hooked_input_ident {
4478 builder.add_dfir(
4479 parse_quote! {
4480 #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4481 },
4482 None,
4483 Some(&next_stmt_id.to_string()),
4484 );
4485
4486 fold_hooked_idents.insert(fold_ident.to_string());
4487 } else {
4488 builder.add_dfir(
4489 parse_quote! {
4490 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4491 },
4492 None,
4493 Some(&next_stmt_id.to_string()),
4494 );
4495 }
4496 } else if (matches!(node, HydroNode::Fold { .. })
4497 || matches!(node, HydroNode::FoldKeyed { .. }))
4498 && !node.metadata().location_id.is_top_level()
4499 && graph_builders.singleton_intermediates()
4500 {
4501 let input_ref = match &*node {
4502 HydroNode::Fold { input, .. } => input,
4503 HydroNode::FoldKeyed { input, .. } => input,
4504 _ => unreachable!(),
4505 };
4506 let hooked_input_ident = graph_builders.emit_fold_hook(
4507 &input_ref.metadata().location_id,
4508 &input_ident,
4509 &input_ref.metadata().collection_kind,
4510 &node.metadata().op,
4511 );
4512
4513 let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4514 let builder = graph_builders.get_dfir_mut(&out_location);
4515 builder.add_dfir(
4516 parse_quote! {
4517 #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4518 },
4519 None,
4520 Some(&next_stmt_id.to_string()),
4521 );
4522 } else {
4523 let builder = graph_builders.get_dfir_mut(&out_location);
4524 builder.add_dfir(
4525 parse_quote! {
4526 #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4527 },
4528 None,
4529 Some(&next_stmt_id.to_string()),
4530 );
4531 }
4532 }
4533 BuildersOrCallback::Callback(_, node_callback) => {
4534 node_callback(node, next_stmt_id);
4535 }
4536 }
4537
4538 let _ = next_stmt_id.get_and_increment();
4539
4540 ident_stack.push(fold_ident);
4541 }
4542
4543 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4544 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4545 if input.metadata().location_id.is_top_level()
4546 && input.metadata().collection_kind.is_bounded()
4547 {
4548 parse_quote!(reduce_no_replay)
4549 } else {
4550 parse_quote!(reduce)
4551 }
4552 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4553 if input.metadata().location_id.is_top_level()
4554 && input.metadata().collection_kind.is_bounded()
4555 {
4556 todo!(
4557 "Calling keyed reduce on a top-level bounded collection is not supported"
4558 )
4559 } else {
4560 parse_quote!(reduce_keyed)
4561 }
4562 } else {
4563 unreachable!()
4564 };
4565
4566 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4567 else {
4568 unreachable!()
4569 };
4570
4571 let lifetime = if input.metadata().location_id.is_top_level() {
4572 quote!('static)
4573 } else {
4574 quote!('tick)
4575 };
4576
4577 let input_ident = ident_stack.pop().unwrap();
4578
4579 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4580 else {
4581 unreachable!()
4582 };
4583
4584 let f_tokens = f.emit_tokens(&mut ident_stack);
4585
4586 let reduce_ident =
4587 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4588
4589 match builders_or_callback {
4590 BuildersOrCallback::Builders(graph_builders) => {
4591 if matches!(node, HydroNode::Reduce { .. })
4592 && node.metadata().location_id.is_top_level()
4593 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4594 && graph_builders.singleton_intermediates()
4595 && !node.metadata().collection_kind.is_bounded()
4596 {
4597 todo!(
4598 "Reduce with optional intermediates is not yet supported in simulator"
4599 );
4600 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4601 && node.metadata().location_id.is_top_level()
4602 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4603 && graph_builders.singleton_intermediates()
4604 && !node.metadata().collection_kind.is_bounded()
4605 {
4606 todo!(
4607 "Reduce keyed with optional intermediates is not yet supported in simulator"
4608 );
4609 } else {
4610 let builder = graph_builders.get_dfir_mut(&out_location);
4611 builder.add_dfir(
4612 parse_quote! {
4613 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4614 },
4615 None,
4616 Some(&next_stmt_id.to_string()),
4617 );
4618 }
4619 }
4620 BuildersOrCallback::Callback(_, node_callback) => {
4621 node_callback(node, next_stmt_id);
4622 }
4623 }
4624
4625 let _ = next_stmt_id.get_and_increment();
4626
4627 ident_stack.push(reduce_ident);
4628 }
4629
4630 HydroNode::ReduceKeyedWatermark {
4631 f,
4632 input,
4633 metadata,
4634 ..
4635 } => {
4636 let lifetime = if input.metadata().location_id.is_top_level() {
4637 quote!('static)
4638 } else {
4639 quote!('tick)
4640 };
4641
4642 let watermark_ident = ident_stack.pop().unwrap();
4644 let input_ident = ident_stack.pop().unwrap();
4645 let f_tokens = f.emit_tokens(&mut ident_stack);
4646
4647 let chain_ident = syn::Ident::new(
4648 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4649 Span::call_site(),
4650 );
4651
4652 let fold_ident =
4653 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4654
4655 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4656 && input.metadata().collection_kind.is_bounded()
4657 {
4658 parse_quote!(fold_no_replay)
4659 } else {
4660 parse_quote!(fold)
4661 };
4662
4663 match builders_or_callback {
4664 BuildersOrCallback::Builders(graph_builders) => {
4665 if metadata.location_id.is_top_level()
4666 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4667 && graph_builders.singleton_intermediates()
4668 && !metadata.collection_kind.is_bounded()
4669 {
4670 todo!(
4671 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4672 )
4673 } else {
4674 let builder = graph_builders.get_dfir_mut(&out_location);
4675 builder.add_dfir(
4676 parse_quote! {
4677 #chain_ident = chain();
4678 #input_ident
4679 -> map(|x| (Some(x), None))
4680 -> [0]#chain_ident;
4681 #watermark_ident
4682 -> map(|watermark| (None, Some(watermark)))
4683 -> [1]#chain_ident;
4684
4685 #fold_ident = #chain_ident
4686 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4687 let __reduce_keyed_fn = #f_tokens;
4688 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4689 if let Some((k, v)) = opt_payload {
4690 if let Some(curr_watermark) = *opt_curr_watermark {
4691 if k < curr_watermark {
4692 return;
4693 }
4694 }
4695 match map.entry(k) {
4696 ::std::collections::hash_map::Entry::Vacant(e) => {
4697 e.insert(v);
4698 }
4699 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4700 __reduce_keyed_fn(e.get_mut(), v);
4701 }
4702 }
4703 } else {
4704 let watermark = opt_watermark.unwrap();
4705 if let Some(curr_watermark) = *opt_curr_watermark {
4706 if watermark <= curr_watermark {
4707 return;
4708 }
4709 }
4710 map.retain(|k, _| *k >= watermark);
4711 *opt_curr_watermark = Some(watermark);
4712 }
4713 }
4714 })
4715 -> flat_map(|(map, _curr_watermark)| map);
4716 },
4717 None,
4718 Some(&next_stmt_id.to_string()),
4719 );
4720 }
4721 }
4722 BuildersOrCallback::Callback(_, node_callback) => {
4723 node_callback(node, next_stmt_id);
4724 }
4725 }
4726
4727 let _ = next_stmt_id.get_and_increment();
4728
4729 ident_stack.push(fold_ident);
4730 }
4731
4732 HydroNode::Network {
4733 networking_info,
4734 serialize_fn: serialize_pipeline,
4735 instantiate_fn,
4736 deserialize_fn: deserialize_pipeline,
4737 input,
4738 ..
4739 } => {
4740 let input_ident = ident_stack.pop().unwrap();
4741
4742 let receiver_stream_ident =
4743 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4744
4745 match builders_or_callback {
4746 BuildersOrCallback::Builders(graph_builders) => {
4747 let (sink_expr, source_expr) = match instantiate_fn {
4748 DebugInstantiate::Building => (
4749 syn::parse_quote!(DUMMY_SINK),
4750 syn::parse_quote!(DUMMY_SOURCE),
4751 ),
4752
4753 DebugInstantiate::Finalized(finalized) => {
4754 (finalized.sink.clone(), finalized.source.clone())
4755 }
4756 };
4757
4758 graph_builders.create_network(
4759 &input.metadata().location_id,
4760 &out_location,
4761 input_ident,
4762 &receiver_stream_ident,
4763 serialize_pipeline.as_ref(),
4764 sink_expr,
4765 source_expr,
4766 deserialize_pipeline.as_ref(),
4767 *next_stmt_id,
4768 networking_info,
4769 );
4770 }
4771 BuildersOrCallback::Callback(_, node_callback) => {
4772 node_callback(node, next_stmt_id);
4773 }
4774 }
4775
4776 let _ = next_stmt_id.get_and_increment();
4777
4778 ident_stack.push(receiver_stream_ident);
4779 }
4780
4781 HydroNode::ExternalInput {
4782 instantiate_fn,
4783 deserialize_fn: deserialize_pipeline,
4784 ..
4785 } => {
4786 let receiver_stream_ident =
4787 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4788
4789 match builders_or_callback {
4790 BuildersOrCallback::Builders(graph_builders) => {
4791 let (_, source_expr) = match instantiate_fn {
4792 DebugInstantiate::Building => (
4793 syn::parse_quote!(DUMMY_SINK),
4794 syn::parse_quote!(DUMMY_SOURCE),
4795 ),
4796
4797 DebugInstantiate::Finalized(finalized) => {
4798 (finalized.sink.clone(), finalized.source.clone())
4799 }
4800 };
4801
4802 graph_builders.create_external_source(
4803 &out_location,
4804 source_expr,
4805 &receiver_stream_ident,
4806 deserialize_pipeline.as_ref(),
4807 *next_stmt_id,
4808 );
4809 }
4810 BuildersOrCallback::Callback(_, node_callback) => {
4811 node_callback(node, next_stmt_id);
4812 }
4813 }
4814
4815 let _ = next_stmt_id.get_and_increment();
4816
4817 ident_stack.push(receiver_stream_ident);
4818 }
4819
4820 HydroNode::Counter {
4821 tag,
4822 duration,
4823 prefix,
4824 ..
4825 } => {
4826 let input_ident = ident_stack.pop().unwrap();
4827
4828 let counter_ident =
4829 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4830
4831 match builders_or_callback {
4832 BuildersOrCallback::Builders(graph_builders) => {
4833 let arg = format!("{}({})", prefix, tag);
4834 let builder = graph_builders.get_dfir_mut(&out_location);
4835 builder.add_dfir(
4836 parse_quote! {
4837 #counter_ident = #input_ident -> _counter(#arg, #duration);
4838 },
4839 None,
4840 Some(&next_stmt_id.to_string()),
4841 );
4842 }
4843 BuildersOrCallback::Callback(_, node_callback) => {
4844 node_callback(node, next_stmt_id);
4845 }
4846 }
4847
4848 let _ = next_stmt_id.get_and_increment();
4849
4850 ident_stack.push(counter_ident);
4851 }
4852 }
4853 },
4854 seen_tees,
4855 false,
4856 );
4857
4858 let ret = ident_stack
4859 .pop()
4860 .expect("ident_stack should have exactly one element after traversal");
4861 assert!(
4862 ident_stack.is_empty(),
4863 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4864 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4865 ident_stack.len()
4866 );
4867 ret
4868 }
4869
4870 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4871 match self {
4872 HydroNode::Placeholder => {
4873 panic!()
4874 }
4875 HydroNode::Cast { .. }
4876 | HydroNode::ObserveNonDet { .. }
4877 | HydroNode::UnboundSingleton { .. }
4878 | HydroNode::AssertIsConsistent { .. } => {}
4879 HydroNode::Source { source, .. } => match source {
4880 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4881 HydroSource::ExternalNetwork()
4882 | HydroSource::Spin()
4883 | HydroSource::ClusterMembers(_, _)
4884 | HydroSource::Embedded(_)
4885 | HydroSource::EmbeddedSingleton(_) => {} },
4887 HydroNode::SingletonSource { value, .. } => {
4888 transform(value);
4889 }
4890 HydroNode::CycleSource { .. }
4891 | HydroNode::Tee { .. }
4892 | HydroNode::Singleton { .. }
4893 | HydroNode::YieldConcat { .. }
4894 | HydroNode::BeginAtomic { .. }
4895 | HydroNode::EndAtomic { .. }
4896 | HydroNode::Batch { .. }
4897 | HydroNode::Chain { .. }
4898 | HydroNode::MergeOrdered { .. }
4899 | HydroNode::ChainFirst { .. }
4900 | HydroNode::CrossProduct { .. }
4901 | HydroNode::CrossSingleton { .. }
4902 | HydroNode::ResolveFutures { .. }
4903 | HydroNode::ResolveFuturesBlocking { .. }
4904 | HydroNode::ResolveFuturesOrdered { .. }
4905 | HydroNode::Join { .. }
4906 | HydroNode::JoinHalf { .. }
4907 | HydroNode::Difference { .. }
4908 | HydroNode::AntiJoin { .. }
4909 | HydroNode::DeferTick { .. }
4910 | HydroNode::Enumerate { .. }
4911 | HydroNode::Unique { .. }
4912 | HydroNode::Sort { .. } => {}
4913 HydroNode::Map { f, .. }
4914 | HydroNode::FlatMap { f, .. }
4915 | HydroNode::FlatMapStreamBlocking { f, .. }
4916 | HydroNode::Filter { f, .. }
4917 | HydroNode::FilterMap { f, .. }
4918 | HydroNode::Inspect { f, .. }
4919 | HydroNode::Partition { f, .. }
4920 | HydroNode::Reduce { f, .. }
4921 | HydroNode::ReduceKeyed { f, .. }
4922 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4923 transform(&mut f.expr);
4924 }
4925 HydroNode::Fold { init, acc, .. }
4926 | HydroNode::Scan { init, acc, .. }
4927 | HydroNode::ScanAsyncBlocking { init, acc, .. }
4928 | HydroNode::FoldKeyed { init, acc, .. } => {
4929 transform(&mut init.expr);
4930 transform(&mut acc.expr);
4931 }
4932 HydroNode::Network {
4933 serialize_fn,
4934 deserialize_fn,
4935 ..
4936 } => {
4937 if let Some(serialize_fn) = serialize_fn {
4938 transform(serialize_fn);
4939 }
4940 if let Some(deserialize_fn) = deserialize_fn {
4941 transform(deserialize_fn);
4942 }
4943 }
4944 HydroNode::ExternalInput { deserialize_fn, .. } => {
4945 if let Some(deserialize_fn) = deserialize_fn {
4946 transform(deserialize_fn);
4947 }
4948 }
4949 HydroNode::Counter { duration, .. } => {
4950 transform(duration);
4951 }
4952 }
4953 }
4954
4955 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4956 &self.metadata().op
4957 }
4958
4959 pub fn metadata(&self) -> &HydroIrMetadata {
4960 match self {
4961 HydroNode::Placeholder => {
4962 panic!()
4963 }
4964 HydroNode::Cast { metadata, .. }
4965 | HydroNode::ObserveNonDet { metadata, .. }
4966 | HydroNode::AssertIsConsistent { metadata, .. }
4967 | HydroNode::UnboundSingleton { metadata, .. }
4968 | HydroNode::Source { metadata, .. }
4969 | HydroNode::SingletonSource { metadata, .. }
4970 | HydroNode::CycleSource { metadata, .. }
4971 | HydroNode::Tee { metadata, .. }
4972 | HydroNode::Singleton { metadata, .. }
4973 | HydroNode::Partition { metadata, .. }
4974 | HydroNode::YieldConcat { metadata, .. }
4975 | HydroNode::BeginAtomic { metadata, .. }
4976 | HydroNode::EndAtomic { metadata, .. }
4977 | HydroNode::Batch { metadata, .. }
4978 | HydroNode::Chain { metadata, .. }
4979 | HydroNode::MergeOrdered { metadata, .. }
4980 | HydroNode::ChainFirst { metadata, .. }
4981 | HydroNode::CrossProduct { metadata, .. }
4982 | HydroNode::CrossSingleton { metadata, .. }
4983 | HydroNode::Join { metadata, .. }
4984 | HydroNode::JoinHalf { metadata, .. }
4985 | HydroNode::Difference { metadata, .. }
4986 | HydroNode::AntiJoin { metadata, .. }
4987 | HydroNode::ResolveFutures { metadata, .. }
4988 | HydroNode::ResolveFuturesBlocking { metadata, .. }
4989 | HydroNode::ResolveFuturesOrdered { metadata, .. }
4990 | HydroNode::Map { metadata, .. }
4991 | HydroNode::FlatMap { metadata, .. }
4992 | HydroNode::FlatMapStreamBlocking { metadata, .. }
4993 | HydroNode::Filter { metadata, .. }
4994 | HydroNode::FilterMap { metadata, .. }
4995 | HydroNode::DeferTick { metadata, .. }
4996 | HydroNode::Enumerate { metadata, .. }
4997 | HydroNode::Inspect { metadata, .. }
4998 | HydroNode::Unique { metadata, .. }
4999 | HydroNode::Sort { metadata, .. }
5000 | HydroNode::Scan { metadata, .. }
5001 | HydroNode::ScanAsyncBlocking { metadata, .. }
5002 | HydroNode::Fold { metadata, .. }
5003 | HydroNode::FoldKeyed { metadata, .. }
5004 | HydroNode::Reduce { metadata, .. }
5005 | HydroNode::ReduceKeyed { metadata, .. }
5006 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5007 | HydroNode::ExternalInput { metadata, .. }
5008 | HydroNode::Network { metadata, .. }
5009 | HydroNode::Counter { metadata, .. } => metadata,
5010 }
5011 }
5012
5013 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5014 &mut self.metadata_mut().op
5015 }
5016
5017 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5018 match self {
5019 HydroNode::Placeholder => {
5020 panic!()
5021 }
5022 HydroNode::Cast { metadata, .. }
5023 | HydroNode::ObserveNonDet { metadata, .. }
5024 | HydroNode::AssertIsConsistent { metadata, .. }
5025 | HydroNode::UnboundSingleton { metadata, .. }
5026 | HydroNode::Source { metadata, .. }
5027 | HydroNode::SingletonSource { metadata, .. }
5028 | HydroNode::CycleSource { metadata, .. }
5029 | HydroNode::Tee { metadata, .. }
5030 | HydroNode::Singleton { metadata, .. }
5031 | HydroNode::Partition { metadata, .. }
5032 | HydroNode::YieldConcat { metadata, .. }
5033 | HydroNode::BeginAtomic { metadata, .. }
5034 | HydroNode::EndAtomic { metadata, .. }
5035 | HydroNode::Batch { metadata, .. }
5036 | HydroNode::Chain { metadata, .. }
5037 | HydroNode::MergeOrdered { metadata, .. }
5038 | HydroNode::ChainFirst { metadata, .. }
5039 | HydroNode::CrossProduct { metadata, .. }
5040 | HydroNode::CrossSingleton { metadata, .. }
5041 | HydroNode::Join { metadata, .. }
5042 | HydroNode::JoinHalf { metadata, .. }
5043 | HydroNode::Difference { metadata, .. }
5044 | HydroNode::AntiJoin { metadata, .. }
5045 | HydroNode::ResolveFutures { metadata, .. }
5046 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5047 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5048 | HydroNode::Map { metadata, .. }
5049 | HydroNode::FlatMap { metadata, .. }
5050 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5051 | HydroNode::Filter { metadata, .. }
5052 | HydroNode::FilterMap { metadata, .. }
5053 | HydroNode::DeferTick { metadata, .. }
5054 | HydroNode::Enumerate { metadata, .. }
5055 | HydroNode::Inspect { metadata, .. }
5056 | HydroNode::Unique { metadata, .. }
5057 | HydroNode::Sort { metadata, .. }
5058 | HydroNode::Scan { metadata, .. }
5059 | HydroNode::ScanAsyncBlocking { metadata, .. }
5060 | HydroNode::Fold { metadata, .. }
5061 | HydroNode::FoldKeyed { metadata, .. }
5062 | HydroNode::Reduce { metadata, .. }
5063 | HydroNode::ReduceKeyed { metadata, .. }
5064 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5065 | HydroNode::ExternalInput { metadata, .. }
5066 | HydroNode::Network { metadata, .. }
5067 | HydroNode::Counter { metadata, .. } => metadata,
5068 }
5069 }
5070
5071 pub fn input(&self) -> Vec<&HydroNode> {
5072 match self {
5073 HydroNode::Placeholder => {
5074 panic!()
5075 }
5076 HydroNode::Source { .. }
5077 | HydroNode::SingletonSource { .. }
5078 | HydroNode::ExternalInput { .. }
5079 | HydroNode::CycleSource { .. }
5080 | HydroNode::Tee { .. }
5081 | HydroNode::Singleton { .. }
5082 | HydroNode::Partition { .. } => {
5083 vec![]
5085 }
5086 HydroNode::Cast { inner, .. }
5087 | HydroNode::ObserveNonDet { inner, .. }
5088 | HydroNode::YieldConcat { inner, .. }
5089 | HydroNode::BeginAtomic { inner, .. }
5090 | HydroNode::EndAtomic { inner, .. }
5091 | HydroNode::Batch { inner, .. }
5092 | HydroNode::UnboundSingleton { inner, .. }
5093 | HydroNode::AssertIsConsistent { inner, .. } => {
5094 vec![inner]
5095 }
5096 HydroNode::Chain { first, second, .. } => {
5097 vec![first, second]
5098 }
5099 HydroNode::MergeOrdered { first, second, .. } => {
5100 vec![first, second]
5101 }
5102 HydroNode::ChainFirst { first, second, .. } => {
5103 vec![first, second]
5104 }
5105 HydroNode::CrossProduct { left, right, .. }
5106 | HydroNode::CrossSingleton { left, right, .. }
5107 | HydroNode::Join { left, right, .. }
5108 | HydroNode::JoinHalf { left, right, .. } => {
5109 vec![left, right]
5110 }
5111 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5112 vec![pos, neg]
5113 }
5114 HydroNode::Map { input, .. }
5115 | HydroNode::FlatMap { input, .. }
5116 | HydroNode::FlatMapStreamBlocking { input, .. }
5117 | HydroNode::Filter { input, .. }
5118 | HydroNode::FilterMap { input, .. }
5119 | HydroNode::Sort { input, .. }
5120 | HydroNode::DeferTick { input, .. }
5121 | HydroNode::Enumerate { input, .. }
5122 | HydroNode::Inspect { input, .. }
5123 | HydroNode::Unique { input, .. }
5124 | HydroNode::Network { input, .. }
5125 | HydroNode::Counter { input, .. }
5126 | HydroNode::ResolveFutures { input, .. }
5127 | HydroNode::ResolveFuturesBlocking { input, .. }
5128 | HydroNode::ResolveFuturesOrdered { input, .. }
5129 | HydroNode::Fold { input, .. }
5130 | HydroNode::FoldKeyed { input, .. }
5131 | HydroNode::Reduce { input, .. }
5132 | HydroNode::ReduceKeyed { input, .. }
5133 | HydroNode::Scan { input, .. }
5134 | HydroNode::ScanAsyncBlocking { input, .. } => {
5135 vec![input]
5136 }
5137 HydroNode::ReduceKeyedWatermark {
5138 input, watermark, ..
5139 } => {
5140 vec![input, watermark]
5141 }
5142 }
5143 }
5144
5145 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5146 self.input()
5147 .iter()
5148 .map(|input_node| input_node.metadata())
5149 .collect()
5150 }
5151
5152 pub fn is_shared_with_others(&self) -> bool {
5156 match self {
5157 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5158 Rc::strong_count(&inner.0) > 1
5159 }
5160 HydroNode::Singleton { .. } => false,
5163 _ => false,
5164 }
5165 }
5166
5167 pub fn print_root(&self) -> String {
5168 match self {
5169 HydroNode::Placeholder => {
5170 panic!()
5171 }
5172 HydroNode::Cast { .. } => "Cast()".to_owned(),
5173 HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5174 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5175 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5176 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5177 HydroNode::SingletonSource {
5178 value,
5179 first_tick_only,
5180 ..
5181 } => format!(
5182 "SingletonSource({:?}, first_tick_only={})",
5183 value, first_tick_only
5184 ),
5185 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5186 HydroNode::Tee { inner, .. } => {
5187 format!("Tee({})", inner.0.borrow().print_root())
5188 }
5189 HydroNode::Singleton { inner, .. } => {
5190 format!("Singleton({})", inner.0.borrow().print_root())
5191 }
5192 HydroNode::Partition { f, is_true, .. } => {
5193 format!("Partition({:?}, is_true={})", f, is_true)
5194 }
5195 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5196 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5197 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5198 HydroNode::Batch { .. } => "Batch()".to_owned(),
5199 HydroNode::Chain { first, second, .. } => {
5200 format!("Chain({}, {})", first.print_root(), second.print_root())
5201 }
5202 HydroNode::MergeOrdered { first, second, .. } => {
5203 format!(
5204 "MergeOrdered({}, {})",
5205 first.print_root(),
5206 second.print_root()
5207 )
5208 }
5209 HydroNode::ChainFirst { first, second, .. } => {
5210 format!(
5211 "ChainFirst({}, {})",
5212 first.print_root(),
5213 second.print_root()
5214 )
5215 }
5216 HydroNode::CrossProduct { left, right, .. } => {
5217 format!(
5218 "CrossProduct({}, {})",
5219 left.print_root(),
5220 right.print_root()
5221 )
5222 }
5223 HydroNode::CrossSingleton { left, right, .. } => {
5224 format!(
5225 "CrossSingleton({}, {})",
5226 left.print_root(),
5227 right.print_root()
5228 )
5229 }
5230 HydroNode::Join { left, right, .. } => {
5231 format!("Join({}, {})", left.print_root(), right.print_root())
5232 }
5233 HydroNode::JoinHalf { left, right, .. } => {
5234 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5235 }
5236 HydroNode::Difference { pos, neg, .. } => {
5237 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5238 }
5239 HydroNode::AntiJoin { pos, neg, .. } => {
5240 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5241 }
5242 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5243 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5244 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5245 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5246 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5247 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5248 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5249 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5250 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5251 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5252 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5253 HydroNode::Unique { .. } => "Unique()".to_owned(),
5254 HydroNode::Sort { .. } => "Sort()".to_owned(),
5255 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5256 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5257 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5258 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5259 }
5260 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5261 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5262 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5263 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5264 HydroNode::Network { .. } => "Network()".to_owned(),
5265 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5266 HydroNode::Counter { tag, duration, .. } => {
5267 format!("Counter({:?}, {:?})", tag, duration)
5268 }
5269 }
5270 }
5271}
5272
/// Instantiates one network channel between two deployed locations.
///
/// The deployed node for each endpoint is looked up in `processes` or `clusters`
/// (depending on whether the endpoint is a `Process` or a `Cluster`), a fresh port is
/// requested from each node via `next_port`, and the deploy backend `D` is asked for the
/// channel using the variant that matches the endpoint kinds:
///
/// | from      | to        | backend functions                    |
/// |-----------|-----------|--------------------------------------|
/// | `Process` | `Process` | `o2o_sink_source` / `o2o_connect`    |
/// | `Process` | `Cluster` | `o2m_sink_source` / `o2m_connect`    |
/// | `Cluster` | `Process` | `m2o_sink_source` / `m2o_connect`    |
/// | `Cluster` | `Cluster` | `m2m_sink_source` / `m2m_connect`    |
///
/// # Returns
///
/// `(sink, source, connect_fn)`: the two expressions produced by the backend's
/// `*_sink_source` function, followed by the closure produced by its `*_connect`
/// function.
///
/// # Panics
///
/// * If an endpoint's key is missing from `processes` / `clusters`.
/// * If either endpoint is a `Tick` or `Atomic` location; only `Process` and `Cluster`
///   endpoints are handled here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetches (a clone of) the deployed process for `key`, panicking if it is absent.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    // Fetches (a clone of) the deployed cluster for `key`, panicking if it is absent.
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    // In every arm the sender is resolved before the receiver and the sink port is
    // allocated before the source port.
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);
            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);
            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);
            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);
            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(
                    env,
                    &from_node,
                    &sink_port,
                    &to_node,
                    &source_port,
                    name,
                    networking_info,
                ),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Previously a bare `panic!()`; state what went wrong so the failure is
        // diagnosable from the panic output alone.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(
            "instantiate_network only supports Process and Cluster endpoints, \
             but a Tick or Atomic location was supplied"
        ),
    };

    (sink, source, connect_fn)
}
5414
5415#[cfg(test)]
5416mod serde_test;
5417
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the size of `HydroNode` to 264 bytes. Ignored without the `build` feature,
    /// because the expected size includes feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let node_size = size_of::<HydroNode>();
        assert_eq!(node_size, 264);
    }

    /// Pins the size of `HydroRoot` to 136 bytes. Ignored without the `build` feature,
    /// because the expected size includes feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let root_size = size_of::<HydroRoot>();
        assert_eq!(root_size, 136);
    }

    /// An ordinary expression must come back from `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let plain_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let expected = plain_expr.clone();
        let result = simplify_q_macro(plain_expr);
        assert_eq!(result, expected);
    }

    /// Snapshot of `simplify_q_macro` applied to a spliced `q!` closure.
    ///
    /// The test name and the snapshot expression are kept stable because the snapshot
    /// is keyed on them.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshot of `simplify_q_macro` applied to a `q!` body that is a block ending in a
    /// closure, rather than a closure literal directly.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}