1use std::collections::{HashMap, HashSet};
2use std::io::Error;
3use std::marker::PhantomData;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Sink, Stream};
8use proc_macro2::Span;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
12use stageleft::QuotedWithContext;
13
14use super::built::build_inner;
15use super::compiled::CompiledFlow;
16use super::deploy_provider::{
17 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
18};
19use super::ir::HydroRoot;
20use crate::live_collections::stream::{Ordering, Retries};
21use crate::location::dynamic::LocationId;
22use crate::location::external_process::{
23 ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
24};
25use crate::location::{Cluster, External, Location, LocationKey, LocationType, Process};
26use crate::staging_util::Invariant;
27use crate::telemetry::Sidecar;
28
29pub struct DeployFlow<'a, D>
30where
31 D: Deploy<'a>,
32{
33 pub(super) ir: Vec<HydroRoot>,
34
35 pub(super) locations: SlotMap<LocationKey, LocationType>,
36 pub(super) location_names: SecondaryMap<LocationKey, String>,
37
38 pub(super) processes: SparseSecondaryMap<LocationKey, D::Process>,
40 pub(super) clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
41 pub(super) externals: SparseSecondaryMap<LocationKey, D::External>,
42
43 pub(super) sidecars: SparseSecondaryMap<LocationKey, Vec<syn::Expr>>,
46
47 pub(super) flow_name: String,
49
50 pub(super) _phantom: Invariant<'a, D>,
51}
52
53impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
54 pub fn ir(&self) -> &Vec<HydroRoot> {
55 &self.ir
56 }
57
58 pub fn flow_name(&self) -> &str {
60 &self.flow_name
61 }
62
63 pub fn with_process<P>(
64 mut self,
65 process: &Process<P>,
66 spec: impl IntoProcessSpec<'a, D>,
67 ) -> Self {
68 self.processes.insert(
69 process.key,
70 spec.into_process_spec()
71 .build(process.key, &self.location_names[process.key]),
72 );
73 self
74 }
75
76 #[doc(hidden)]
78 pub fn with_process_erased(
79 mut self,
80 process_loc_key: LocationKey,
81 spec: impl IntoProcessSpec<'a, D>,
82 ) -> Self {
83 assert_eq!(
84 Some(&LocationType::Process),
85 self.locations.get(process_loc_key),
86 "No process with the given `LocationKey` was found."
87 );
88 self.processes.insert(
89 process_loc_key,
90 spec.into_process_spec()
91 .build(process_loc_key, &self.location_names[process_loc_key]),
92 );
93 self
94 }
95
96 pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
97 mut self,
98 spec: impl Fn() -> S,
99 ) -> Self {
100 for (location_key, &location_type) in self.locations.iter() {
101 if LocationType::Process == location_type {
102 self.processes
103 .entry(location_key)
104 .expect("location was removed")
105 .or_insert_with(|| {
106 spec()
107 .into_process_spec()
108 .build(location_key, &self.location_names[location_key])
109 });
110 }
111 }
112 self
113 }
114
115 pub fn with_cluster<C>(mut self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
116 self.clusters.insert(
117 cluster.key,
118 spec.build(cluster.key, &self.location_names[cluster.key]),
119 );
120 self
121 }
122
123 #[doc(hidden)]
125 pub fn with_cluster_erased(
126 mut self,
127 cluster_loc_key: LocationKey,
128 spec: impl ClusterSpec<'a, D>,
129 ) -> Self {
130 assert_eq!(
131 Some(&LocationType::Cluster),
132 self.locations.get(cluster_loc_key),
133 "No cluster with the given `LocationKey` was found."
134 );
135 self.clusters.insert(
136 cluster_loc_key,
137 spec.build(cluster_loc_key, &self.location_names[cluster_loc_key]),
138 );
139 self
140 }
141
142 pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
143 mut self,
144 spec: impl Fn() -> S,
145 ) -> Self {
146 for (location_key, &location_type) in self.locations.iter() {
147 if LocationType::Cluster == location_type {
148 self.clusters
149 .entry(location_key)
150 .expect("location was removed")
151 .or_insert_with(|| {
152 spec().build(location_key, &self.location_names[location_key])
153 });
154 }
155 }
156 self
157 }
158
159 pub fn with_external<P>(
160 mut self,
161 external: &External<P>,
162 spec: impl ExternalSpec<'a, D>,
163 ) -> Self {
164 self.externals.insert(
165 external.key,
166 spec.build(external.key, &self.location_names[external.key]),
167 );
168 self
169 }
170
171 pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
172 mut self,
173 spec: impl Fn() -> S,
174 ) -> Self {
175 for (location_key, &location_type) in self.locations.iter() {
176 if LocationType::External == location_type {
177 self.externals
178 .entry(location_key)
179 .expect("location was removed")
180 .or_insert_with(|| {
181 spec().build(location_key, &self.location_names[location_key])
182 });
183 }
184 }
185 self
186 }
187
188 pub fn with_sidecar_all(mut self, sidecar: &impl Sidecar) -> Self {
190 for (location_key, &location_type) in self.locations.iter() {
191 if !matches!(location_type, LocationType::Process | LocationType::Cluster) {
192 continue;
193 }
194
195 let location_name = &self.location_names[location_key];
196
197 let sidecar = sidecar.to_expr(
198 self.flow_name(),
199 location_key,
200 location_type,
201 location_name,
202 "e::format_ident!("{}", super::DFIR_IDENT),
203 );
204 self.sidecars
205 .entry(location_key)
206 .expect("location was removed")
207 .or_default()
208 .push(sidecar);
209 }
210
211 self
212 }
213
214 pub fn with_sidecar_internal(
216 mut self,
217 location_key: LocationKey,
218 sidecar: &impl Sidecar,
219 ) -> Self {
220 let location_type = self.locations[location_key];
221 let location_name = &self.location_names[location_key];
222 let sidecar = sidecar.to_expr(
223 self.flow_name(),
224 location_key,
225 location_type,
226 location_name,
227 "e::format_ident!("{}", super::DFIR_IDENT),
228 );
229 self.sidecars
230 .entry(location_key)
231 .expect("location was removed")
232 .or_default()
233 .push(sidecar);
234 self
235 }
236
237 pub fn with_sidecar_process(self, process: &Process<()>, sidecar: &impl Sidecar) -> Self {
239 self.with_sidecar_internal(process.key, sidecar)
240 }
241
242 pub fn with_sidecar_cluster(self, cluster: &Cluster<()>, sidecar: &impl Sidecar) -> Self {
244 self.with_sidecar_internal(cluster.key, sidecar)
245 }
246
247 pub fn preview_compile(&mut self) -> CompiledFlow<'a> {
252 CompiledFlow {
255 dfir: build_inner(&mut self.ir),
256 extra_stmts: SparseSecondaryMap::new(),
257 sidecars: SparseSecondaryMap::new(),
258 _phantom: PhantomData,
259 }
260 }
261
262 pub fn compile(mut self) -> CompiledFlow<'a>
266 where
267 D: Deploy<'a, InstantiateEnv = ()>,
268 {
269 self.compile_internal(&mut ())
270 }
271
272 pub(super) fn compile_internal(&mut self, env: &mut D::InstantiateEnv) -> CompiledFlow<'a> {
276 let mut seen_tees: HashMap<_, _> = HashMap::new();
277 let mut seen_cluster_members = HashSet::new();
278 let mut extra_stmts = SparseSecondaryMap::new();
279 for leaf in self.ir.iter_mut() {
280 leaf.compile_network::<D>(
281 &mut extra_stmts,
282 &mut seen_tees,
283 &mut seen_cluster_members,
284 &self.processes,
285 &self.clusters,
286 &self.externals,
287 env,
288 );
289 }
290
291 CompiledFlow {
292 dfir: build_inner(&mut self.ir),
293 extra_stmts,
294 sidecars: std::mem::take(&mut self.sidecars),
295 _phantom: PhantomData,
296 }
297 }
298
299 fn cluster_id_stmts(&self, extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>) {
301 #[expect(
302 clippy::disallowed_methods,
303 reason = "nondeterministic iteration order, will be sorted"
304 )]
305 let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
306 all_clusters_sorted.sort();
307
308 for cluster_key in all_clusters_sorted {
309 let self_id_ident = syn::Ident::new(
310 &format!("__hydro_lang_cluster_self_id_{}", cluster_key),
311 Span::call_site(),
312 );
313 let self_id_expr = D::cluster_self_id().splice_untyped();
314 extra_stmts
315 .entry(cluster_key)
316 .expect("location was removed")
317 .or_default()
318 .push(syn::parse_quote! {
319 let #self_id_ident = &*Box::leak(Box::new(#self_id_expr));
320 });
321
322 let process_cluster_locations = self.location_names.keys().filter(|&location_key| {
323 self.processes.contains_key(location_key)
324 || self.clusters.contains_key(location_key)
325 });
326 for other_location in process_cluster_locations {
327 let other_id_ident = syn::Ident::new(
328 &format!("__hydro_lang_cluster_ids_{}", cluster_key),
329 Span::call_site(),
330 );
331 let other_id_expr = D::cluster_ids(cluster_key).splice_untyped();
332 extra_stmts
333 .entry(other_location)
334 .expect("location was removed")
335 .or_default()
336 .push(syn::parse_quote! {
337 let #other_id_ident = #other_id_expr;
338 });
339 }
340 }
341 }
342
343 #[must_use]
351 pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
352 let CompiledFlow {
353 dfir,
354 mut extra_stmts,
355 mut sidecars,
356 _phantom,
357 } = self.compile_internal(env);
358
359 let mut compiled = dfir;
360 self.cluster_id_stmts(&mut extra_stmts);
361 let mut meta = D::Meta::default();
362
363 let (processes, clusters, externals) = (
364 self.processes
365 .into_iter()
366 .filter(|&(node_key, ref node)| {
367 if let Some(ir) = compiled.remove(node_key) {
368 node.instantiate(
369 env,
370 &mut meta,
371 ir,
372 extra_stmts.remove(node_key).as_deref().unwrap_or_default(),
373 sidecars.remove(node_key).as_deref().unwrap_or_default(),
374 );
375 true
376 } else {
377 false
378 }
379 })
380 .collect::<SparseSecondaryMap<_, _>>(),
381 self.clusters
382 .into_iter()
383 .filter(|&(cluster_key, ref cluster)| {
384 if let Some(ir) = compiled.remove(cluster_key) {
385 cluster.instantiate(
386 env,
387 &mut meta,
388 ir,
389 extra_stmts
390 .remove(cluster_key)
391 .as_deref()
392 .unwrap_or_default(),
393 sidecars.remove(cluster_key).as_deref().unwrap_or_default(),
394 );
395 true
396 } else {
397 false
398 }
399 })
400 .collect::<SparseSecondaryMap<_, _>>(),
401 self.externals
402 .into_iter()
403 .inspect(|&(external_key, ref external)| {
404 assert!(!extra_stmts.contains_key(external_key));
405 assert!(!sidecars.contains_key(external_key));
406 external.instantiate(env, &mut meta, Default::default(), &[], &[]);
407 })
408 .collect::<SparseSecondaryMap<_, _>>(),
409 );
410
411 for location_key in self.locations.keys() {
412 if let Some(node) = processes.get(location_key) {
413 node.update_meta(&meta);
414 } else if let Some(cluster) = clusters.get(location_key) {
415 cluster.update_meta(&meta);
416 } else if let Some(external) = externals.get(location_key) {
417 external.update_meta(&meta);
418 }
419 }
420
421 let mut seen_tees_connect = HashMap::new();
422 for leaf in self.ir.iter_mut() {
423 leaf.connect_network(&mut seen_tees_connect);
424 }
425
426 DeployResult {
427 location_names: self.location_names,
428 processes,
429 clusters,
430 externals,
431 }
432 }
433}
434
435pub struct DeployResult<'a, D: Deploy<'a>> {
436 location_names: SecondaryMap<LocationKey, String>,
437 processes: SparseSecondaryMap<LocationKey, D::Process>,
438 clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
439 externals: SparseSecondaryMap<LocationKey, D::External>,
440}
441
442impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
443 pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
444 let LocationId::Process(location_key) = p.id() else {
445 panic!("Process ID expected")
446 };
447 self.processes.get(location_key).unwrap()
448 }
449
450 pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
451 let LocationId::Cluster(location_key) = c.id() else {
452 panic!("Cluster ID expected")
453 };
454 self.clusters.get(location_key).unwrap()
455 }
456
457 pub fn get_external<P>(&self, e: &External<P>) -> &D::External {
458 self.externals.get(e.key).unwrap()
459 }
460
461 pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, &str, &D::Process)> {
462 self.location_names
463 .iter()
464 .filter_map(|(location_key, location_name)| {
465 self.processes
466 .get(location_key)
467 .map(|process| (LocationId::Process(location_key), &**location_name, process))
468 })
469 }
470
471 pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, &str, &D::Cluster)> {
472 self.location_names
473 .iter()
474 .filter_map(|(location_key, location_name)| {
475 self.clusters
476 .get(location_key)
477 .map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster))
478 })
479 }
480
481 #[deprecated(note = "use `connect` instead")]
482 pub async fn connect_bytes<M>(
483 &self,
484 port: ExternalBytesPort<M>,
485 ) -> (
486 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
487 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
488 ) {
489 self.connect(port).await
490 }
491
492 #[deprecated(note = "use `connect` instead")]
493 pub async fn connect_sink_bytes<M>(
494 &self,
495 port: ExternalBytesPort<M>,
496 ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
497 self.connect(port).await.1
498 }
499
500 pub async fn connect_bincode<
501 InT: Serialize + 'static,
502 OutT: DeserializeOwned + 'static,
503 Many,
504 >(
505 &self,
506 port: ExternalBincodeBidi<InT, OutT, Many>,
507 ) -> (
508 Pin<Box<dyn Stream<Item = OutT>>>,
509 Pin<Box<dyn Sink<InT, Error = Error>>>,
510 ) {
511 self.externals
512 .get(port.process_key)
513 .unwrap()
514 .as_bincode_bidi(port.port_id)
515 .await
516 }
517
518 #[deprecated(note = "use `connect` instead")]
519 pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
520 &self,
521 port: ExternalBincodeSink<T, Many>,
522 ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
523 self.connect(port).await
524 }
525
526 #[deprecated(note = "use `connect` instead")]
527 pub async fn connect_source_bytes(
528 &self,
529 port: ExternalBytesPort,
530 ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
531 self.connect(port).await.0
532 }
533
534 #[deprecated(note = "use `connect` instead")]
535 pub async fn connect_source_bincode<
536 T: Serialize + DeserializeOwned + 'static,
537 O: Ordering,
538 R: Retries,
539 >(
540 &self,
541 port: ExternalBincodeStream<T, O, R>,
542 ) -> Pin<Box<dyn Stream<Item = T>>> {
543 self.connect(port).await
544 }
545
546 pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
547 &'b self,
548 port: P,
549 ) -> <P as ConnectableAsync<&'b Self>>::Output {
550 port.connect(self).await
551 }
552}
553
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
impl DeployResult<'_, crate::deploy::HydroDeploy> {
    /// Returns the underlying `CustomClientPort` for a raw-bytes `port`.
    ///
    /// # Panics
    /// Panics if the external process owning `port` was not deployed.
    pub fn raw_port<M>(
        &self,
        port: ExternalBytesPort<M>,
    ) -> hydro_deploy::custom_service::CustomClientPort {
        let owner = self.externals.get(port.process_key).unwrap();
        owner.raw_port(port.port_id)
    }
}
569
/// A port that can be connected asynchronously using a context of type `Ctx`.
///
/// In this module the context is a borrowed [`DeployResult`].
pub trait ConnectableAsync<Ctx> {
    /// The handle (or handles) produced once the port is connected.
    type Output;

    /// Consumes the port and resolves to its connected handle(s), using `context`.
    fn connect(self, context: Ctx) -> impl Future<Output = Self::Output>;
}
575
576impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
577 type Output = (
578 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
579 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
580 );
581
582 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
583 ctx.externals
584 .get(self.process_key)
585 .unwrap()
586 .as_bytes_bidi(self.port_id)
587 .await
588 }
589}
590
591impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
592 ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
593{
594 type Output = Pin<Box<dyn Stream<Item = T>>>;
595
596 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
597 ctx.externals
598 .get(self.process_key)
599 .unwrap()
600 .as_bincode_source(self.port_id)
601 .await
602 }
603}
604
605impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
606 for ExternalBincodeSink<T, Many>
607{
608 type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
609
610 async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
611 ctx.externals
612 .get(self.process_key)
613 .unwrap()
614 .as_bincode_sink(self.port_id)
615 .await
616 }
617}