1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11 BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12 RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::builder::ExternalPortId;
33use crate::compile::deploy::DeployResult;
34use crate::compile::deploy_provider::{
35 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
36};
37use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
38use crate::location::dynamic::LocationId;
39use crate::location::member_id::TaglessMemberId;
40use crate::location::{LocationKey, MembershipEvent, NetworkHint};
41
/// Handle describing the Docker network that deployed containers are attached to.
#[derive(Clone, Debug)]
pub struct DockerNetwork {
    /// Network name handed to the Docker daemon when the network is created,
    /// referenced by containers, and removed.
    name: String,
}
47
48impl DockerNetwork {
49 pub fn new(name: String) -> Self {
51 Self {
52 name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
53 }
54 }
55}
56
/// A single process deployed as one Docker container.
///
/// Clones share the `Rc`-wrapped state (port counter, pending crate, exposed
/// ports, container name).
#[derive(Clone)]
pub struct DockerDeployProcess {
    /// Location key of this process; recorded in tracing spans.
    key: LocationKey,
    /// Name of the process; used as the trybuild crate name, the Docker image
    /// tag, and the base of the container name.
    name: String,
    /// Next port number to hand out from `next_port`.
    next_port: Rc<RefCell<u16>>,
    /// Crate prepared by `instantiate`, consumed when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Ports that must be `EXPOSE`d by the image so external clients can reach them.
    exposed_ports: Rc<RefCell<Vec<u16>>>,

    /// Name of the running container; `None` until the deployment is started.
    docker_container_name: Rc<RefCell<Option<String>>>,

    /// Optional rustflags applied when building the crate.
    compilation_options: Option<String>,

    /// Extra cargo configuration entries applied when building the crate.
    config: Vec<String>,

    /// Docker network this process is associated with.
    network: DockerNetwork,
}
75
76impl Node for DockerDeployProcess {
77 type Port = u16;
78 type Meta = ();
79 type InstantiateEnv = DockerDeploy;
80
81 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
82 fn next_port(&self) -> Self::Port {
83 let port = {
84 let mut borrow = self.next_port.borrow_mut();
85 let port = *borrow;
86 *borrow += 1;
87 port
88 };
89
90 port
91 }
92
93 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
94 fn update_meta(&self, _meta: &Self::Meta) {}
95
96 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
97 fn instantiate(
98 &self,
99 _env: &mut Self::InstantiateEnv,
100 meta: &mut Self::Meta,
101 graph: DfirGraph,
102 extra_stmts: &[syn::Stmt],
103 sidecars: &[syn::Expr],
104 ) {
105 let (bin_name, config) = create_graph_trybuild(
106 graph,
107 extra_stmts,
108 sidecars,
109 Some(&self.name),
110 crate::compile::trybuild::generate::DeployMode::Containerized,
111 LinkingMode::Static,
112 );
113
114 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
115 .target_dir(config.target_dir)
116 .example(bin_name)
117 .no_default_features();
118
119 ret = ret.display_name("test_display_name");
120
121 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
122
123 if let Some(features) = config.features {
124 ret = ret.features(features);
125 }
126
127 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
128 ret = ret.config("build.incremental = false");
129
130 *self.rust_crate.borrow_mut() = Some(ret);
131 }
132}
133
/// A cluster deployed as `count` Docker containers built from one image.
///
/// Clones share the `Rc`-wrapped state (port counter, pending crate,
/// container names).
#[derive(Clone)]
pub struct DockerDeployCluster {
    /// Location key of this cluster; recorded in tracing spans.
    key: LocationKey,
    /// Name of the cluster; used as the trybuild crate name, the Docker image
    /// tag, and the base of each member's container name.
    name: String,
    /// Next port number to hand out from `next_port`.
    next_port: Rc<RefCell<u16>>,
    /// Crate prepared by `instantiate`, consumed when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Names of the member containers, appended as each one is started.
    docker_container_name: Rc<RefCell<Vec<String>>>,

    /// Optional rustflags applied when building the crate.
    compilation_options: Option<String>,

    /// Extra cargo configuration entries applied when building the crate.
    config: Vec<String>,

    /// Number of member containers to start.
    count: usize,
}
150
151impl Node for DockerDeployCluster {
152 type Port = u16;
153 type Meta = ();
154 type InstantiateEnv = DockerDeploy;
155
156 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
157 fn next_port(&self) -> Self::Port {
158 let port = {
159 let mut borrow = self.next_port.borrow_mut();
160 let port = *borrow;
161 *borrow += 1;
162 port
163 };
164
165 port
166 }
167
168 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
169 fn update_meta(&self, _meta: &Self::Meta) {}
170
171 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
172 fn instantiate(
173 &self,
174 _env: &mut Self::InstantiateEnv,
175 _meta: &mut Self::Meta,
176 graph: DfirGraph,
177 extra_stmts: &[syn::Stmt],
178 sidecars: &[syn::Expr],
179 ) {
180 let (bin_name, config) = create_graph_trybuild(
181 graph,
182 extra_stmts,
183 sidecars,
184 Some(&self.name),
185 crate::compile::trybuild::generate::DeployMode::Containerized,
186 LinkingMode::Static,
187 );
188
189 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
190 .target_dir(config.target_dir)
191 .example(bin_name)
192 .no_default_features();
193
194 ret = ret.display_name("test_display_name");
195
196 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
197
198 if let Some(features) = config.features {
199 ret = ret.features(features);
200 }
201
202 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
203 ret = ret.config("build.incremental = false");
204
205 *self.rust_crate.borrow_mut() = Some(ret);
206 }
207}
208
/// An external (host-side) endpoint that connects to ports published by
/// deployed containers.
#[derive(Clone, Debug)]
pub struct DockerDeployExternal {
    /// Name of the external location; recorded in tracing spans.
    name: String,
    /// Next port number to hand out from `next_port`.
    next_port: Rc<RefCell<u16>>,

    /// Maps each registered external port id to the port allocated on this external.
    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,

    /// Maps a port of this external to the peer it connects to: the shared cell
    /// holding the peer's container name, the peer's in-container port, and the
    /// peer's Docker network.
    #[expect(clippy::type_complexity, reason = "internal code")]
    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
}
220
221impl Node for DockerDeployExternal {
222 type Port = u16;
223 type Meta = ();
224 type InstantiateEnv = DockerDeploy;
225
226 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
227 fn next_port(&self) -> Self::Port {
228 let port = {
229 let mut borrow = self.next_port.borrow_mut();
230 let port = *borrow;
231 *borrow += 1;
232 port
233 };
234
235 port
236 }
237
238 #[instrument(level = "trace", skip_all, fields(name = self.name))]
239 fn update_meta(&self, _meta: &Self::Meta) {}
240
241 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
242 fn instantiate(
243 &self,
244 _env: &mut Self::InstantiateEnv,
245 meta: &mut Self::Meta,
246 graph: DfirGraph,
247 extra_stmts: &[syn::Stmt],
248 sidecars: &[syn::Expr],
249 ) {
250 trace!(name: "surface", surface = graph.surface_syntax_string());
251 }
252}
253
/// A boxed `(source, sink)` pair: a stream yielding `Out` items and a sink
/// accepting `In` items whose error type is `InErr`.
type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
258
259impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
260 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
261 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
262 self.ports.borrow_mut().insert(external_port_id, port);
263 }
264
265 fn as_bytes_bidi(
266 &self,
267 external_port_id: ExternalPortId,
268 ) -> impl Future<
269 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
270 > + 'a {
271 let guard =
272 tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
273
274 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
275 let (docker_container_name, remote_port, _) = self
276 .connection_info
277 .borrow()
278 .get(&local_port)
279 .unwrap()
280 .clone();
281
282 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
283
284 async move {
285 let local_port =
286 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
287 let remote_ip_address = "localhost";
288
289 trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
290
291 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
292 .await
293 .unwrap();
294
295 trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
296
297 let (rx, tx) = stream.into_split();
298
299 let source = Box::pin(
300 FramedRead::new(rx, LengthDelimitedCodec::new()),
301 ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
302
303 let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
304 as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
305
306 (source, sink)
307 }
308 .instrument(guard.exit())
309 }
310
311 fn as_bincode_bidi<InT, OutT>(
312 &self,
313 external_port_id: ExternalPortId,
314 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
315 where
316 InT: serde::Serialize + 'static,
317 OutT: serde::de::DeserializeOwned + 'static,
318 {
319 let guard =
320 tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
321
322 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
323 let (docker_container_name, remote_port, _) = self
324 .connection_info
325 .borrow()
326 .get(&local_port)
327 .unwrap()
328 .clone();
329
330 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
331
332 async move {
333 let local_port =
334 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
335 let remote_ip_address = "localhost";
336
337 trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
338
339 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
340 .await
341 .unwrap();
342
343 trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
344
345 let (rx, tx) = stream.into_split();
346
347 let source = Box::pin(
348 FramedRead::new(rx, LengthDelimitedCodec::new())
349 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
350 ) as Pin<Box<dyn Stream<Item = OutT>>>;
351
352 let sink = Box::pin(
353 FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
354 Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
355 }),
356 ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
357
358 (source, sink)
359 }
360 .instrument(guard.exit())
361 }
362
363 fn as_bincode_sink<T>(
364 &self,
365 external_port_id: ExternalPortId,
366 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
367 where
368 T: serde::Serialize + 'static,
369 {
370 let guard =
371 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
372
373 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
374 let (docker_container_name, remote_port, _) = self
375 .connection_info
376 .borrow()
377 .get(&local_port)
378 .unwrap()
379 .clone();
380
381 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
382
383 async move {
384 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
385 let remote_ip_address = "localhost";
386
387 Box::pin(
388 LazySink::new(move || {
389 Box::pin(async move {
390 trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
391
392 let stream =
393 TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
394 .await?;
395
396 trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
397
398 Result::<_, std::io::Error>::Ok(FramedWrite::new(
399 stream,
400 LengthDelimitedCodec::new(),
401 ))
402 })
403 })
404 .with(move |v| async move {
405 Ok(Bytes::from(bincode::serialize(&v).unwrap()))
406 }),
407 ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
408 }
409 .instrument(guard.exit())
410 }
411
412 fn as_bincode_source<T>(
413 &self,
414 external_port_id: ExternalPortId,
415 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
416 where
417 T: serde::de::DeserializeOwned + 'static,
418 {
419 let guard =
420 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
421
422 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
423 let (docker_container_name, remote_port, _) = self
424 .connection_info
425 .borrow()
426 .get(&local_port)
427 .unwrap()
428 .clone();
429
430 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
431
432 async move {
433
434 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
435 let remote_ip_address = "localhost";
436
437 trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
438
439 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
440 .await
441 .unwrap();
442
443 trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
444
445 Box::pin(
446 FramedRead::new(stream, LengthDelimitedCodec::new())
447 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
448 ) as Pin<Box<dyn Stream<Item = T>>>
449 }
450 .instrument(guard.exit())
451 }
452}
453
454#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
455async fn find_dynamically_allocated_docker_port(
456 docker_container_name: &str,
457 destination_port: u16,
458) -> u16 {
459 let docker = Docker::connect_with_local_defaults().unwrap();
460
461 let container_info = docker
462 .inspect_container(docker_container_name, None::<InspectContainerOptions>)
463 .await
464 .unwrap();
465
466 trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
467
468 let remote_port = container_info
470 .network_settings
471 .as_ref()
472 .unwrap()
473 .ports
474 .as_ref()
475 .unwrap()
476 .get(&format!("{destination_port}/tcp"))
477 .unwrap()
478 .as_ref()
479 .unwrap()
480 .iter()
481 .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
482 .unwrap()
483 .host_port
484 .as_ref()
485 .unwrap()
486 .parse()
487 .unwrap();
488
489 remote_port
490}
491
/// Deployment backend that runs processes and clusters as local Docker containers.
pub struct DockerDeploy {
    /// Specs of every process added through `add_localhost_docker`.
    docker_processes: Vec<DockerDeployProcessSpec>,
    /// Specs of every cluster added through `add_localhost_docker_cluster`.
    docker_clusters: Vec<DockerDeployClusterSpec>,
    /// Docker network all containers of this deployment are attached to.
    network: DockerNetwork,
    /// Random identifier of this deployment, passed to containers via the
    /// `DEPLOYMENT_INSTANCE` environment variable.
    deployment_instance: String,
}
499
500#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
501async fn create_and_start_container(
502 docker: &Docker,
503 container_name: &str,
504 image_name: &str,
505 network_name: &str,
506 deployment_instance: &str,
507) -> Result<(), anyhow::Error> {
508 let config = ContainerCreateBody {
509 image: Some(image_name.to_owned()),
510 hostname: Some(container_name.to_owned()),
511 host_config: Some(HostConfig {
512 binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
513 publish_all_ports: Some(true),
514 port_bindings: Some(HashMap::new()), ..Default::default()
516 }),
517 env: Some(vec![
518 format!("CONTAINER_NAME={container_name}"),
519 format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
520 format!("RUST_LOG=trace"),
521 ]),
522 networking_config: Some(NetworkingConfig {
523 endpoints_config: Some(HashMap::from([(
524 network_name.to_owned(),
525 EndpointSettings {
526 ..Default::default()
527 },
528 )])),
529 }),
530 tty: Some(true),
531 ..Default::default()
532 };
533
534 let options = CreateContainerOptions {
535 name: Some(container_name.to_owned()),
536 ..Default::default()
537 };
538
539 tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
540 docker.create_container(Some(options), config).await?;
541 docker
542 .start_container(container_name, None::<StartContainerOptions>)
543 .await?;
544
545 Ok(())
546}
547
548#[instrument(level = "trace", skip_all, fields(%image_name))]
549async fn build_and_create_image(
550 rust_crate: &Rc<RefCell<Option<RustCrate>>>,
551 compilation_options: Option<&str>,
552 config: &[String],
553 exposed_ports: &[u16],
554 image_name: &str,
555) -> Result<(), anyhow::Error> {
556 let mut rust_crate = rust_crate
557 .borrow_mut()
558 .take()
559 .unwrap()
560 .rustflags(compilation_options.unwrap_or_default());
561
562 for cfg in config {
563 rust_crate = rust_crate.config(cfg);
564 }
565
566 let build_output = match build_crate_memoized(
567 rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
568 )
569 .await
570 {
571 Ok(build_output) => build_output,
572 Err(BuildError::FailedToBuildCrate {
573 exit_status,
574 diagnostics,
575 text_lines,
576 stderr_lines,
577 }) => {
578 let diagnostics = diagnostics
579 .into_iter()
580 .map(|d| d.rendered.unwrap())
581 .collect::<Vec<_>>()
582 .join("\n");
583 let text_lines = text_lines.join("\n");
584 let stderr_lines = stderr_lines.join("\n");
585
586 anyhow::bail!(
587 r#"
588Failed to build crate {exit_status:?}
589--- diagnostics
590---
591{diagnostics}
592---
593---
594---
595
596--- text_lines
597---
598---
599{text_lines}
600---
601---
602---
603
604--- stderr_lines
605---
606---
607{stderr_lines}
608---
609---
610---"#
611 );
612 }
613 Err(err) => {
614 anyhow::bail!("Failed to build crate {err:?}");
615 }
616 };
617
618 let docker = Docker::connect_with_local_defaults()?;
619
620 let mut tar_data = Vec::new();
621 {
622 let mut tar = Builder::new(&mut tar_data);
623
624 let exposed_ports = exposed_ports
625 .iter()
626 .map(|port| format!("EXPOSE {port}/tcp"))
627 .collect::<Vec<_>>()
628 .join("\n");
629
630 let dockerfile_content = format!(
631 r#"
632 FROM scratch
633 {exposed_ports}
634 COPY app /app
635 CMD ["/app"]
636 "#,
637 );
638
639 trace!(name: "dockerfile", %dockerfile_content);
640
641 let mut header = Header::new_gnu();
642 header.set_path("Dockerfile")?;
643 header.set_size(dockerfile_content.len() as u64);
644 header.set_cksum();
645 tar.append(&header, dockerfile_content.as_bytes())?;
646
647 let mut header = Header::new_gnu();
648 header.set_path("app")?;
649 header.set_size(build_output.bin_data.len() as u64);
650 header.set_mode(0o755);
651 header.set_cksum();
652 tar.append(&header, &build_output.bin_data[..])?;
653
654 tar.finish()?;
655 }
656
657 let build_options = BuildImageOptions {
658 dockerfile: "Dockerfile".to_owned(),
659 t: Some(image_name.to_owned()),
660 rm: true,
661 ..Default::default()
662 };
663
664 use bollard::errors::Error;
665
666 let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
667 let mut build_stream = docker.build_image(build_options, None, Some(body));
668 while let Some(msg) = build_stream.next().await {
669 match msg {
670 Ok(_) => {}
671 Err(e) => match e {
672 Error::DockerStreamError { error } => {
673 return Err(anyhow::anyhow!(
674 "Docker build failed: DockerStreamError: {{ error: {error} }}"
675 ));
676 }
677 _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
678 },
679 }
680 }
681
682 Ok(())
683}
684
685impl DockerDeploy {
686 pub fn new(network: DockerNetwork) -> Self {
688 Self {
689 docker_processes: Vec::new(),
690 docker_clusters: Vec::new(),
691 network,
692 deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
693 }
694 }
695
696 pub fn add_localhost_docker(
698 &mut self,
699 compilation_options: Option<String>,
700 config: Vec<String>,
701 ) -> DockerDeployProcessSpec {
702 let process = DockerDeployProcessSpec {
703 compilation_options,
704 config,
705 network: self.network.clone(),
706 deployment_instance: self.deployment_instance.clone(),
707 };
708
709 self.docker_processes.push(process.clone());
710
711 process
712 }
713
714 pub fn add_localhost_docker_cluster(
716 &mut self,
717 compilation_options: Option<String>,
718 config: Vec<String>,
719 count: usize,
720 ) -> DockerDeployClusterSpec {
721 let cluster = DockerDeployClusterSpec {
722 compilation_options,
723 config,
724 count,
725 deployment_instance: self.deployment_instance.clone(),
726 };
727
728 self.docker_clusters.push(cluster.clone());
729
730 cluster
731 }
732
733 pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
735 DockerDeployExternalSpec { name }
736 }
737
738 pub fn get_deployment_instance(&self) -> String {
740 self.deployment_instance.clone()
741 }
742
743 #[instrument(level = "trace", skip_all)]
745 pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
746 for (_, _, process) in nodes.get_all_processes() {
747 let exposed_ports = process.exposed_ports.borrow().clone();
748
749 build_and_create_image(
750 &process.rust_crate,
751 process.compilation_options.as_deref(),
752 &process.config,
753 &exposed_ports,
754 &process.name,
755 )
756 .await?;
757 }
758
759 for (_, _, cluster) in nodes.get_all_clusters() {
760 build_and_create_image(
761 &cluster.rust_crate,
762 cluster.compilation_options.as_deref(),
763 &cluster.config,
764 &[], &cluster.name,
766 )
767 .await?;
768 }
769
770 Ok(())
771 }
772
773 #[instrument(level = "trace", skip_all)]
775 pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
776 let docker = Docker::connect_with_local_defaults()?;
777
778 match docker
779 .create_network(NetworkCreateRequest {
780 name: self.network.name.clone(),
781 driver: Some("bridge".to_owned()),
782 ..Default::default()
783 })
784 .await
785 {
786 Ok(v) => v.id,
787 Err(e) => {
788 panic!("Failed to create docker network: {e:?}");
789 }
790 };
791
792 for (_, _, process) in nodes.get_all_processes() {
793 let docker_container_name: String = get_docker_container_name(&process.name, None);
794 *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
795
796 create_and_start_container(
797 &docker,
798 &docker_container_name,
799 &process.name,
800 &self.network.name,
801 &self.deployment_instance,
802 )
803 .await?;
804 }
805
806 for (_, _, cluster) in nodes.get_all_clusters() {
807 for num in 0..cluster.count {
808 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
809 cluster
810 .docker_container_name
811 .borrow_mut()
812 .push(docker_container_name.clone());
813
814 create_and_start_container(
815 &docker,
816 &docker_container_name,
817 &cluster.name,
818 &self.network.name,
819 &self.deployment_instance,
820 )
821 .await?;
822 }
823 }
824
825 Ok(())
826 }
827
828 #[instrument(level = "trace", skip_all)]
830 pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
831 let docker = Docker::connect_with_local_defaults()?;
832
833 for (_, _, process) in nodes.get_all_processes() {
834 let docker_container_name: String = get_docker_container_name(&process.name, None);
835
836 docker
837 .kill_container(&docker_container_name, None::<KillContainerOptions>)
838 .await?;
839 }
840
841 for (_, _, cluster) in nodes.get_all_clusters() {
842 for num in 0..cluster.count {
843 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
844
845 docker
846 .kill_container(&docker_container_name, None::<KillContainerOptions>)
847 .await?;
848 }
849 }
850
851 Ok(())
852 }
853
854 #[instrument(level = "trace", skip_all)]
856 pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
857 let docker = Docker::connect_with_local_defaults()?;
858
859 for (_, _, process) in nodes.get_all_processes() {
860 let docker_container_name: String = get_docker_container_name(&process.name, None);
861
862 docker
863 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
864 .await?;
865 }
866
867 for (_, _, cluster) in nodes.get_all_clusters() {
868 for num in 0..cluster.count {
869 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
870
871 docker
872 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
873 .await?;
874 }
875 }
876
877 docker
878 .remove_network(&self.network.name)
879 .await
880 .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
881
882 use bollard::query_parameters::RemoveImageOptions;
883
884 for (_, _, process) in nodes.get_all_processes() {
885 docker
886 .remove_image(&process.name, None::<RemoveImageOptions>, None)
887 .await?;
888 }
889
890 for (_, _, cluster) in nodes.get_all_clusters() {
891 docker
892 .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
893 .await?;
894 }
895
896 Ok(())
897 }
898}
899
900impl<'a> Deploy<'a> for DockerDeploy {
901 type Meta = ();
902 type InstantiateEnv = Self;
903
904 type Process = DockerDeployProcess;
905 type Cluster = DockerDeployCluster;
906 type External = DockerDeployExternal;
907
908 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
909 fn o2o_sink_source(
910 _env: &mut Self::InstantiateEnv,
911 p1: &Self::Process,
912 p1_port: &<Self::Process as Node>::Port,
913 p2: &Self::Process,
914 p2_port: &<Self::Process as Node>::Port,
915 _name: Option<&str>,
916 ) -> (syn::Expr, syn::Expr) {
917 let bind_addr = format!("0.0.0.0:{}", p2_port);
918 let target = format!("{}:{p2_port}", p2.name);
919
920 deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
921 }
922
923 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
924 fn o2o_connect(
925 p1: &Self::Process,
926 p1_port: &<Self::Process as Node>::Port,
927 p2: &Self::Process,
928 p2_port: &<Self::Process as Node>::Port,
929 ) -> Box<dyn FnOnce()> {
930 let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
931
932 Box::new(move || {
933 trace!(name: "o2o_connect thunk", %serialized);
934 })
935 }
936
937 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
938 fn o2m_sink_source(
939 _env: &mut Self::InstantiateEnv,
940 p1: &Self::Process,
941 p1_port: &<Self::Process as Node>::Port,
942 c2: &Self::Cluster,
943 c2_port: &<Self::Cluster as Node>::Port,
944 _name: Option<&str>,
945 ) -> (syn::Expr, syn::Expr) {
946 deploy_containerized_o2m(*c2_port)
947 }
948
949 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
950 fn o2m_connect(
951 p1: &Self::Process,
952 p1_port: &<Self::Process as Node>::Port,
953 c2: &Self::Cluster,
954 c2_port: &<Self::Cluster as Node>::Port,
955 ) -> Box<dyn FnOnce()> {
956 let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
957
958 Box::new(move || {
959 trace!(name: "o2m_connect thunk", %serialized);
960 })
961 }
962
963 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
964 fn m2o_sink_source(
965 _env: &mut Self::InstantiateEnv,
966 c1: &Self::Cluster,
967 c1_port: &<Self::Cluster as Node>::Port,
968 p2: &Self::Process,
969 p2_port: &<Self::Process as Node>::Port,
970 _name: Option<&str>,
971 ) -> (syn::Expr, syn::Expr) {
972 deploy_containerized_m2o(*p2_port, &p2.name)
973 }
974
975 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
976 fn m2o_connect(
977 c1: &Self::Cluster,
978 c1_port: &<Self::Cluster as Node>::Port,
979 p2: &Self::Process,
980 p2_port: &<Self::Process as Node>::Port,
981 ) -> Box<dyn FnOnce()> {
982 let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
983
984 Box::new(move || {
985 trace!(name: "m2o_connect thunk", %serialized);
986 })
987 }
988
989 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
990 fn m2m_sink_source(
991 _env: &mut Self::InstantiateEnv,
992 c1: &Self::Cluster,
993 c1_port: &<Self::Cluster as Node>::Port,
994 c2: &Self::Cluster,
995 c2_port: &<Self::Cluster as Node>::Port,
996 _name: Option<&str>,
997 ) -> (syn::Expr, syn::Expr) {
998 deploy_containerized_m2m(*c2_port)
999 }
1000
1001 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1002 fn m2m_connect(
1003 c1: &Self::Cluster,
1004 c1_port: &<Self::Cluster as Node>::Port,
1005 c2: &Self::Cluster,
1006 c2_port: &<Self::Cluster as Node>::Port,
1007 ) -> Box<dyn FnOnce()> {
1008 let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1009
1010 Box::new(move || {
1011 trace!(name: "m2m_connect thunk", %serialized);
1012 })
1013 }
1014
1015 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1016 fn e2o_many_source(
1017 extra_stmts: &mut Vec<syn::Stmt>,
1018 p2: &Self::Process,
1019 p2_port: &<Self::Process as Node>::Port,
1020 codec_type: &syn::Type,
1021 shared_handle: String,
1022 ) -> syn::Expr {
1023 p2.exposed_ports.borrow_mut().push(*p2_port);
1024
1025 let socket_ident = syn::Ident::new(
1026 &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1027 Span::call_site(),
1028 );
1029
1030 let source_ident = syn::Ident::new(
1031 &format!("__hydro_deploy_many_{}_source", &shared_handle),
1032 Span::call_site(),
1033 );
1034
1035 let sink_ident = syn::Ident::new(
1036 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1037 Span::call_site(),
1038 );
1039
1040 let membership_ident = syn::Ident::new(
1041 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1042 Span::call_site(),
1043 );
1044
1045 let bind_addr = format!("0.0.0.0:{}", p2_port);
1046
1047 extra_stmts.push(syn::parse_quote! {
1048 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1049 });
1050
1051 let root = crate::staging_util::get_this_crate();
1052
1053 extra_stmts.push(syn::parse_quote! {
1054 let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1055 });
1056
1057 parse_quote!(#source_ident)
1058 }
1059
1060 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1061 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1062 let sink_ident = syn::Ident::new(
1063 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1064 Span::call_site(),
1065 );
1066 parse_quote!(#sink_ident)
1067 }
1068
1069 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1070 fn e2o_source(
1071 extra_stmts: &mut Vec<syn::Stmt>,
1072 p1: &Self::External,
1073 p1_port: &<Self::External as Node>::Port,
1074 p2: &Self::Process,
1075 p2_port: &<Self::Process as Node>::Port,
1076 _codec_type: &syn::Type,
1077 shared_handle: String,
1078 ) -> syn::Expr {
1079 p1.connection_info.borrow_mut().insert(
1080 *p1_port,
1081 (
1082 p2.docker_container_name.clone(),
1083 *p2_port,
1084 p2.network.clone(),
1085 ),
1086 );
1087
1088 p2.exposed_ports.borrow_mut().push(*p2_port);
1089
1090 let socket_ident = syn::Ident::new(
1091 &format!("__hydro_deploy_{}_socket", &shared_handle),
1092 Span::call_site(),
1093 );
1094
1095 let source_ident = syn::Ident::new(
1096 &format!("__hydro_deploy_{}_source", &shared_handle),
1097 Span::call_site(),
1098 );
1099
1100 let sink_ident = syn::Ident::new(
1101 &format!("__hydro_deploy_{}_sink", &shared_handle),
1102 Span::call_site(),
1103 );
1104
1105 let bind_addr = format!("0.0.0.0:{}", p2_port);
1106
1107 extra_stmts.push(syn::parse_quote! {
1108 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1109 });
1110
1111 let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1112
1113 extra_stmts.push(syn::parse_quote! {
1114 let (#sink_ident, #source_ident) = (#create_expr).split();
1115 });
1116
1117 parse_quote!(#source_ident)
1118 }
1119
1120 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1121 fn e2o_connect(
1122 p1: &Self::External,
1123 p1_port: &<Self::External as Node>::Port,
1124 p2: &Self::Process,
1125 p2_port: &<Self::Process as Node>::Port,
1126 many: bool,
1127 server_hint: NetworkHint,
1128 ) -> Box<dyn FnOnce()> {
1129 if server_hint != NetworkHint::Auto {
1130 panic!(
1131 "Docker deployment only supports NetworkHint::Auto, got {:?}",
1132 server_hint
1133 );
1134 }
1135
1136 if many {
1138 p1.connection_info.borrow_mut().insert(
1139 *p1_port,
1140 (
1141 p2.docker_container_name.clone(),
1142 *p2_port,
1143 p2.network.clone(),
1144 ),
1145 );
1146 }
1147
1148 let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1149
1150 Box::new(move || {
1151 trace!(name: "e2o_connect thunk", %serialized);
1152 })
1153 }
1154
1155 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1156 fn o2e_sink(
1157 p1: &Self::Process,
1158 p1_port: &<Self::Process as Node>::Port,
1159 p2: &Self::External,
1160 p2_port: &<Self::External as Node>::Port,
1161 shared_handle: String,
1162 ) -> syn::Expr {
1163 let sink_ident = syn::Ident::new(
1164 &format!("__hydro_deploy_{}_sink", &shared_handle),
1165 Span::call_site(),
1166 );
1167 parse_quote!(#sink_ident)
1168 }
1169
1170 #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1171 fn cluster_ids(
1172 of_cluster: LocationKey,
1173 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1174 cluster_ids()
1175 }
1176
1177 #[instrument(level = "trace", skip_all)]
1178 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1179 cluster_self_id()
1180 }
1181
1182 #[instrument(level = "trace", skip_all, fields(?location_id))]
1183 fn cluster_membership_stream(
1184 _env: &mut Self::InstantiateEnv,
1185 _at_location: &LocationId,
1186 location_id: &LocationId,
1187 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1188 {
1189 cluster_membership_stream(location_id)
1190 }
1191}
1192
/// Alphabet handed to `nanoid!` when generating random name suffixes:
/// the ASCII digits followed by the ASCII lowercase letters.
const CONTAINER_ALPHABET: [char; 36] = {
    // The byte-string literal has type `&[u8; 36]`, so a wrong length is a
    // compile-time error rather than a silently short alphabet.
    let symbols: &[u8; 36] = b"0123456789abcdefghijklmnopqrstuvwxyz";

    let mut alphabet = ['0'; 36];
    let mut i = 0;
    while i < alphabet.len() {
        alphabet[i] = symbols[i] as char;
        i += 1;
    }
    alphabet
};
1197
1198#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1199fn get_docker_image_name(
1200 name_hint: &str,
1201 location_key: LocationKey,
1202 deployment_instance: &str,
1203) -> String {
1204 let name_hint = name_hint
1205 .split("::")
1206 .last()
1207 .unwrap()
1208 .to_ascii_lowercase()
1209 .replace(".", "-")
1210 .replace("_", "-")
1211 .replace("::", "-");
1212
1213 let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1214
1215 format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location_key}")
1216}
1217
1218#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1219fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1220 if let Some(instance) = instance {
1221 format!("{image_name}-{instance}")
1222 } else {
1223 image_name.to_owned()
1224 }
1225}
1226#[derive(Clone)]
1228pub struct DockerDeployProcessSpec {
1229 compilation_options: Option<String>,
1230 config: Vec<String>,
1231 network: DockerNetwork,
1232 deployment_instance: String,
1233}
1234
1235impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1236 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1237 fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1238 DockerDeployProcess {
1239 key,
1240 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1241
1242 next_port: Rc::new(RefCell::new(1000)),
1243 rust_crate: Rc::new(RefCell::new(None)),
1244
1245 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1246
1247 docker_container_name: Rc::new(RefCell::new(None)),
1248
1249 compilation_options: self.compilation_options,
1250 config: self.config,
1251
1252 network: self.network.clone(),
1253 }
1254 }
1255}
1256
1257#[derive(Clone)]
1259pub struct DockerDeployClusterSpec {
1260 compilation_options: Option<String>,
1261 config: Vec<String>,
1262 count: usize,
1263 deployment_instance: String,
1264}
1265
1266impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1267 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1268 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1269 DockerDeployCluster {
1270 key,
1271 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1272
1273 next_port: Rc::new(RefCell::new(1000)),
1274 rust_crate: Rc::new(RefCell::new(None)),
1275
1276 docker_container_name: Rc::new(RefCell::new(Vec::new())),
1277
1278 compilation_options: self.compilation_options,
1279 config: self.config,
1280
1281 count: self.count,
1282 }
1283 }
1284}
1285
1286pub struct DockerDeployExternalSpec {
1288 name: String,
1289}
1290
1291impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1292 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1293 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1294 DockerDeployExternal {
1295 name: self.name,
1296 next_port: Rc::new(RefCell::new(10000)),
1297 ports: Rc::new(RefCell::new(HashMap::new())),
1298 connection_info: Rc::new(RefCell::new(HashMap::new())),
1299 }
1300 }
1301}