// hydro_lang/deploy/maelstrom/deploy_maelstrom.rs

//! Deployment backend for Hydro that targets Maelstrom for distributed systems testing.
//!
//! Maelstrom is a workbench for learning distributed systems by writing your own.
//! This backend compiles Hydro programs to binaries that communicate via Maelstrom's
//! stdin/stdout JSON protocol.

7use std::cell::RefCell;
8use std::future::Future;
9use std::io::{BufRead, BufReader, Error};
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::process::Stdio;
13use std::rc::Rc;
14
15use bytes::{Bytes, BytesMut};
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, Stream};
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use stageleft::{QuotedWithContext, RuntimeData};
21
22use super::deploy_runtime_maelstrom::*;
23use crate::compile::builder::ExternalPortId;
24use crate::compile::deploy_provider::{ClusterSpec, Deploy, Node, RegisterPort};
25use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
26use crate::location::dynamic::LocationId;
27use crate::location::member_id::TaglessMemberId;
28use crate::location::{LocationKey, MembershipEvent, NetworkHint};

/// Hydro deployment backend for the Maelstrom distributed-systems workbench.
///
/// Programs deployed through this backend are compiled into a single binary that
/// exchanges messages with Maelstrom over its stdin/stdout JSON protocol. Only a
/// restricted program shape is supported:
/// - exactly one cluster and no processes (every process-related hook panics);
/// - a single external input channel for client communication.
///
/// The type has no values; it only selects this backend at the type level.
pub enum MaelstromDeploy {}

38impl<'a> Deploy<'a> for MaelstromDeploy {
39    type Meta = ();
40    type InstantiateEnv = MaelstromDeployment;
41
42    type Process = MaelstromProcess;
43    type Cluster = MaelstromCluster;
44    type External = MaelstromExternal;
45
46    fn o2o_sink_source(
47        _env: &mut Self::InstantiateEnv,
48        _p1: &Self::Process,
49        _p1_port: &<Self::Process as Node>::Port,
50        _p2: &Self::Process,
51        _p2_port: &<Self::Process as Node>::Port,
52        _name: Option<&str>,
53    ) -> (syn::Expr, syn::Expr) {
54        panic!("Maelstrom deployment does not support processes, only clusters")
55    }
56
57    fn o2o_connect(
58        _p1: &Self::Process,
59        _p1_port: &<Self::Process as Node>::Port,
60        _p2: &Self::Process,
61        _p2_port: &<Self::Process as Node>::Port,
62    ) -> Box<dyn FnOnce()> {
63        panic!("Maelstrom deployment does not support processes, only clusters")
64    }
65
66    fn o2m_sink_source(
67        _env: &mut Self::InstantiateEnv,
68        _p1: &Self::Process,
69        _p1_port: &<Self::Process as Node>::Port,
70        _c2: &Self::Cluster,
71        _c2_port: &<Self::Cluster as Node>::Port,
72        _name: Option<&str>,
73    ) -> (syn::Expr, syn::Expr) {
74        panic!("Maelstrom deployment does not support processes, only clusters")
75    }
76
77    fn o2m_connect(
78        _p1: &Self::Process,
79        _p1_port: &<Self::Process as Node>::Port,
80        _c2: &Self::Cluster,
81        _c2_port: &<Self::Cluster as Node>::Port,
82    ) -> Box<dyn FnOnce()> {
83        panic!("Maelstrom deployment does not support processes, only clusters")
84    }
85
86    fn m2o_sink_source(
87        _env: &mut Self::InstantiateEnv,
88        _c1: &Self::Cluster,
89        _c1_port: &<Self::Cluster as Node>::Port,
90        _p2: &Self::Process,
91        _p2_port: &<Self::Process as Node>::Port,
92        _name: Option<&str>,
93    ) -> (syn::Expr, syn::Expr) {
94        panic!("Maelstrom deployment does not support processes, only clusters")
95    }
96
97    fn m2o_connect(
98        _c1: &Self::Cluster,
99        _c1_port: &<Self::Cluster as Node>::Port,
100        _p2: &Self::Process,
101        _p2_port: &<Self::Process as Node>::Port,
102    ) -> Box<dyn FnOnce()> {
103        panic!("Maelstrom deployment does not support processes, only clusters")
104    }
105
106    fn m2m_sink_source(
107        _env: &mut Self::InstantiateEnv,
108        _c1: &Self::Cluster,
109        _c1_port: &<Self::Cluster as Node>::Port,
110        _c2: &Self::Cluster,
111        _c2_port: &<Self::Cluster as Node>::Port,
112        _name: Option<&str>,
113    ) -> (syn::Expr, syn::Expr) {
114        deploy_maelstrom_m2m(RuntimeData::new("__hydro_lang_maelstrom_meta"))
115    }
116
117    fn m2m_connect(
118        _c1: &Self::Cluster,
119        _c1_port: &<Self::Cluster as Node>::Port,
120        _c2: &Self::Cluster,
121        _c2_port: &<Self::Cluster as Node>::Port,
122    ) -> Box<dyn FnOnce()> {
123        // No runtime connection needed for Maelstrom - all routing is via stdin/stdout
124        Box::new(|| {})
125    }
126
127    fn e2o_many_source(
128        _extra_stmts: &mut Vec<syn::Stmt>,
129        _p2: &Self::Process,
130        _p2_port: &<Self::Process as Node>::Port,
131        _codec_type: &syn::Type,
132        _shared_handle: String,
133    ) -> syn::Expr {
134        panic!("Maelstrom deployment does not support processes, only clusters")
135    }
136
137    fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
138        panic!("Maelstrom deployment does not support processes, only clusters")
139    }
140
141    fn e2o_source(
142        _extra_stmts: &mut Vec<syn::Stmt>,
143        _p1: &Self::External,
144        _p1_port: &<Self::External as Node>::Port,
145        _p2: &Self::Process,
146        _p2_port: &<Self::Process as Node>::Port,
147        _codec_type: &syn::Type,
148        _shared_handle: String,
149    ) -> syn::Expr {
150        panic!("Maelstrom deployment does not support processes, only clusters")
151    }
152
153    fn e2o_connect(
154        _p1: &Self::External,
155        _p1_port: &<Self::External as Node>::Port,
156        _p2: &Self::Process,
157        _p2_port: &<Self::Process as Node>::Port,
158        _many: bool,
159        _server_hint: NetworkHint,
160    ) -> Box<dyn FnOnce()> {
161        panic!("Maelstrom deployment does not support processes, only clusters")
162    }
163
164    fn o2e_sink(
165        _p1: &Self::Process,
166        _p1_port: &<Self::Process as Node>::Port,
167        _p2: &Self::External,
168        _p2_port: &<Self::External as Node>::Port,
169        _shared_handle: String,
170    ) -> syn::Expr {
171        panic!("Maelstrom deployment does not support processes, only clusters")
172    }
173
174    fn cluster_ids(
175        _of_cluster: LocationKey,
176    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
177        cluster_members(RuntimeData::new("__hydro_lang_maelstrom_meta"), _of_cluster)
178    }
179
180    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
181        cluster_self_id(RuntimeData::new("__hydro_lang_maelstrom_meta"))
182    }
183
184    fn cluster_membership_stream(
185        _env: &mut Self::InstantiateEnv,
186        _at_location: &LocationId,
187        location_id: &LocationId,
188    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
189    {
190        cluster_membership_stream(location_id)
191    }
192}
193
194/// A dummy process type for Maelstrom (processes are not supported).
195#[derive(Clone)]
196pub struct MaelstromProcess {
197    _private: (),
198}
199
200impl Node for MaelstromProcess {
201    type Port = String;
202    type Meta = ();
203    type InstantiateEnv = MaelstromDeployment;
204
205    fn next_port(&self) -> Self::Port {
206        panic!("Maelstrom deployment does not support processes")
207    }
208
209    fn update_meta(&self, _meta: &Self::Meta) {}
210
211    fn instantiate(
212        &self,
213        _env: &mut Self::InstantiateEnv,
214        _meta: &mut Self::Meta,
215        _graph: DfirGraph,
216        _extra_stmts: &[syn::Stmt],
217        _sidecars: &[syn::Expr],
218    ) {
219        panic!("Maelstrom deployment does not support processes")
220    }
221}
222
223/// Represents a cluster in Maelstrom deployment.
224#[derive(Clone)]
225pub struct MaelstromCluster {
226    next_port: Rc<RefCell<usize>>,
227    name_hint: Option<String>,
228}
229
230impl Node for MaelstromCluster {
231    type Port = String;
232    type Meta = ();
233    type InstantiateEnv = MaelstromDeployment;
234
235    fn next_port(&self) -> Self::Port {
236        let next_port = *self.next_port.borrow();
237        *self.next_port.borrow_mut() += 1;
238        format!("port_{}", next_port)
239    }
240
241    fn update_meta(&self, _meta: &Self::Meta) {}
242
243    fn instantiate(
244        &self,
245        env: &mut Self::InstantiateEnv,
246        _meta: &mut Self::Meta,
247        graph: DfirGraph,
248        extra_stmts: &[syn::Stmt],
249        sidecars: &[syn::Expr],
250    ) {
251        let (bin_name, config) = create_graph_trybuild(
252            graph,
253            extra_stmts,
254            sidecars,
255            self.name_hint.as_deref(),
256            crate::compile::trybuild::generate::DeployMode::Maelstrom,
257            LinkingMode::Static,
258        );
259
260        env.bin_name = Some(bin_name);
261        env.project_dir = Some(config.project_dir);
262        env.target_dir = Some(config.target_dir);
263        env.features = config.features;
264    }
265}
266
267/// Represents an external client in Maelstrom deployment.
268#[derive(Clone)]
269pub enum MaelstromExternal {}
270
271impl Node for MaelstromExternal {
272    type Port = String;
273    type Meta = ();
274    type InstantiateEnv = MaelstromDeployment;
275
276    fn next_port(&self) -> Self::Port {
277        unreachable!()
278    }
279
280    fn update_meta(&self, _meta: &Self::Meta) {}
281
282    fn instantiate(
283        &self,
284        _env: &mut Self::InstantiateEnv,
285        _meta: &mut Self::Meta,
286        _graph: DfirGraph,
287        _extra_stmts: &[syn::Stmt],
288        _sidecars: &[syn::Expr],
289    ) {
290        unreachable!()
291    }
292}
293
294impl<'a> RegisterPort<'a, MaelstromDeploy> for MaelstromExternal {
295    fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
296        unreachable!()
297    }
298
299    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
300    fn as_bytes_bidi(
301        &self,
302        _external_port_id: ExternalPortId,
303    ) -> impl Future<
304        Output = (
305            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
306            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
307        ),
308    > + 'a {
309        async move { unreachable!() }
310    }
311
312    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
313    fn as_bincode_bidi<InT, OutT>(
314        &self,
315        _external_port_id: ExternalPortId,
316    ) -> impl Future<
317        Output = (
318            Pin<Box<dyn Stream<Item = OutT>>>,
319            Pin<Box<dyn Sink<InT, Error = Error>>>,
320        ),
321    > + 'a
322    where
323        InT: Serialize + 'static,
324        OutT: DeserializeOwned + 'static,
325    {
326        async move { unreachable!() }
327    }
328
329    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
330    fn as_bincode_sink<T: Serialize + 'static>(
331        &self,
332        _external_port_id: ExternalPortId,
333    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
334        async move { unreachable!() }
335    }
336
337    #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
338    fn as_bincode_source<T: DeserializeOwned + 'static>(
339        &self,
340        _external_port_id: ExternalPortId,
341    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
342        async move { unreachable!() }
343    }
344}
345
346/// Specification for building a Maelstrom cluster.
347#[derive(Clone)]
348pub struct MaelstromClusterSpec;
349
350impl<'a> ClusterSpec<'a, MaelstromDeploy> for MaelstromClusterSpec {
351    fn build(self, key: LocationKey, name_hint: &str) -> MaelstromCluster {
352        assert_eq!(
353            key,
354            LocationKey::FIRST,
355            "there should only be one location for a Maelstrom deployment"
356        );
357        MaelstromCluster {
358            next_port: Rc::new(RefCell::new(0)),
359            name_hint: Some(name_hint.to_owned()),
360        }
361    }
362}
363
/// The Maelstrom deployment environment.
///
/// Holds the configuration for a Maelstrom run (workload, node count, test
/// parameters). It also accumulates the compilation artifacts recorded when the
/// program's cluster is instantiated during deployment.
pub struct MaelstromDeployment {
    /// Number of nodes in the cluster (passed as `--node-count`).
    pub node_count: usize,
    /// Path to the maelstrom binary.
    pub maelstrom_path: PathBuf,
    /// Workload to run (e.g., "echo", "broadcast", "g-counter"), passed as `-w`.
    pub workload: String,
    /// Time limit in seconds (`--time-limit`), if any.
    pub time_limit: Option<u64>,
    /// Rate of requests per second (`--rate`), if any.
    pub rate: Option<u64>,
    /// The availability of nodes (`--availability`), if any.
    pub availability: Option<String>,
    /// Nemesis to run during tests (`--nemesis`), if any.
    pub nemesis: Option<String>,
    /// Additional arguments appended verbatim to the maelstrom command line.
    pub extra_args: Vec<String>,

    // Populated by `MaelstromCluster::instantiate` during deployment.
    pub(crate) bin_name: Option<String>,
    pub(crate) project_dir: Option<PathBuf>,
    pub(crate) target_dir: Option<PathBuf>,
    pub(crate) features: Option<Vec<String>>,
}

/// Outcome of a Maelstrom run, as announced on Maelstrom's stdout.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MaelstromVerdict {
    /// Maelstrom found no anomalies.
    Valid,
    /// Maelstrom printed `Analysis invalid!`.
    Invalid,
}

/// Echoes every line of Maelstrom's output to stderr. Returns the verdict from
/// the first recognized summary line, or `None` if no such line appears.
///
/// The stream is read to the end instead of stopping at the verdict. That way
/// the child never blocks on a full pipe and can be reaped afterwards.
fn scan_maelstrom_output(output: impl BufRead) -> Result<Option<MaelstromVerdict>, Error> {
    let mut verdict = None;
    for line in output.lines() {
        let line = line?;
        eprintln!("{}", &line);

        if verdict.is_some() {
            continue;
        }
        if line.starts_with("Analysis invalid!") {
            verdict = Some(MaelstromVerdict::Invalid);
        } else if line.starts_with("Errors occurred during analysis, but no anomalies found.")
            || line.starts_with("Everything looks good!")
        {
            verdict = Some(MaelstromVerdict::Valid);
        }
    }
    Ok(verdict)
}

impl MaelstromDeployment {
    /// Create a new Maelstrom deployment for the given workload.
    ///
    /// Defaults: a single node, the `maelstrom` binary looked up on `PATH`, and
    /// no time limit, rate, availability, nemesis, or extra arguments.
    pub fn new(workload: impl Into<String>) -> Self {
        Self {
            node_count: 1,
            maelstrom_path: PathBuf::from("maelstrom"),
            workload: workload.into(),
            time_limit: None,
            rate: None,
            availability: None,
            nemesis: None,
            extra_args: vec![],
            bin_name: None,
            project_dir: None,
            target_dir: None,
            features: None,
        }
    }

    /// Set the node count.
    pub fn node_count(mut self, count: usize) -> Self {
        self.node_count = count;
        self
    }

    /// Set the path to the maelstrom binary.
    pub fn maelstrom_path(mut self, path: impl Into<PathBuf>) -> Self {
        self.maelstrom_path = path.into();
        self
    }

    /// Set the time limit in seconds.
    pub fn time_limit(mut self, seconds: u64) -> Self {
        self.time_limit = Some(seconds);
        self
    }

    /// Set the request rate per second.
    pub fn rate(mut self, rate: u64) -> Self {
        self.rate = Some(rate);
        self
    }

    /// Set the availability for the test.
    pub fn availability(mut self, availability: impl Into<String>) -> Self {
        self.availability = Some(availability.into());
        self
    }

    /// Set the nemesis for the test.
    pub fn nemesis(mut self, nemesis: impl Into<String>) -> Self {
        self.nemesis = Some(nemesis.into());
        self
    }

    /// Add extra arguments to pass to maelstrom (appended after earlier ones).
    pub fn extra_args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
        self.extra_args.extend(args.into_iter().map(Into::into));
        self
    }

    /// Location of the dev-profile example binary `bin_name` under `target_dir`.
    fn dev_example_path(target_dir: &std::path::Path, bin_name: &str) -> PathBuf {
        target_dir.join("debug").join("examples").join(bin_name)
    }

    /// Build the compiled binary in dev mode with `cargo build --example`.
    ///
    /// The `hydro___feature_maelstrom_runtime` feature is always enabled, in
    /// addition to any features recorded during deployment.
    ///
    /// Returns the path to the compiled binary.
    ///
    /// # Errors
    /// Returns an error if `cargo` cannot be spawned or the build fails.
    ///
    /// # Panics
    /// Panics if no binary name, project dir, or target dir has been recorded,
    /// i.e. if the program has not been deployed yet.
    pub fn build(&self) -> Result<PathBuf, Error> {
        let bin_name = self
            .bin_name
            .as_ref()
            .expect("No binary name set - did you call deploy?");
        let project_dir = self.project_dir.as_ref().expect("No project dir set");
        let target_dir = self.target_dir.as_ref().expect("No target dir set");

        // The maelstrom runtime feature is always required, so the list is
        // never empty and `--features` is always passed.
        let mut all_features = vec!["hydro___feature_maelstrom_runtime".to_owned()];
        all_features.extend(self.features.iter().flatten().cloned());

        let status = std::process::Command::new("cargo")
            .arg("build")
            .arg("--example")
            .arg(bin_name)
            .arg("--no-default-features")
            .arg("--features")
            .arg(all_features.join(","))
            .current_dir(project_dir)
            .env("CARGO_TARGET_DIR", target_dir)
            .env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1")
            .status()?;

        if !status.success() {
            return Err(Error::other(format!(
                "cargo build failed with status: {}",
                status
            )));
        }

        Ok(Self::dev_example_path(target_dir, bin_name))
    }

    /// Run Maelstrom with the compiled binary, return Ok(()) if all checks pass.
    ///
    /// Builds the binary first, then runs `maelstrom test` against it and blocks
    /// until Maelstrom exits. Maelstrom's stdout is echoed to stderr. The
    /// verdict comes from the first summary line:
    /// - `Analysis invalid!` is a failure;
    /// - `Everything looks good!` and `Errors occurred during analysis, but no
    ///   anomalies found.` are successes.
    ///
    /// The child process is always reaped before returning.
    ///
    /// # Errors
    /// Returns an error in any of these cases:
    /// - the build fails;
    /// - Maelstrom cannot be spawned, read from, or waited on;
    /// - the analysis is invalid;
    /// - Maelstrom exits without printing a verdict.
    pub fn run(self) -> Result<(), Error> {
        let binary_path = self.build()?;

        let mut cmd = std::process::Command::new(&self.maelstrom_path);
        cmd.arg("test")
            .arg("-w")
            .arg(&self.workload)
            .arg("--bin")
            .arg(&binary_path)
            .arg("--node-count")
            .arg(self.node_count.to_string())
            .stdout(Stdio::piped());

        if let Some(time_limit) = self.time_limit {
            cmd.arg("--time-limit").arg(time_limit.to_string());
        }
        if let Some(rate) = self.rate {
            cmd.arg("--rate").arg(rate.to_string());
        }
        if let Some(availability) = &self.availability {
            cmd.arg("--availability").arg(availability);
        }
        if let Some(nemesis) = &self.nemesis {
            cmd.arg("--nemesis").arg(nemesis);
        }
        cmd.args(&self.extra_args);

        let mut child = cmd.spawn()?;
        let stdout = child
            .stdout
            .take()
            .expect("maelstrom stdout is configured as piped");

        // Reading consumes (and finally closes) the pipe.
        let scanned = scan_maelstrom_output(BufReader::new(stdout));
        if scanned.is_err() {
            // Best effort: output can no longer be read, so stop maelstrom rather
            // than wait for it indefinitely. It may already have exited.
            let _ = child.kill();
        }
        // Always reap the child so it does not linger as a zombie process.
        let waited = child.wait();

        let verdict = scanned?;
        waited?;

        match verdict {
            Some(MaelstromVerdict::Valid) => Ok(()),
            Some(MaelstromVerdict::Invalid) => Err(Error::other("Analysis was invalid")),
            None => Err(Error::other("Maelstrom produced an unexpected result")),
        }
    }

    /// Get the path to the compiled binary (after building).
    ///
    /// Returns `None` if the program has not been deployed yet. The path is
    /// where [`build`](Self::build) places the binary; its existence is not
    /// checked.
    pub fn binary_path(&self) -> Option<PathBuf> {
        let bin_name = self.bin_name.as_ref()?;
        let target_dir = self.target_dir.as_ref()?;
        Some(Self::dev_example_path(target_dir, bin_name))
    }
}
549        let bin_name = self.bin_name.as_ref()?;
550        let target_dir = self.target_dir.as_ref()?;
551        Some(target_dir.join("debug").join("examples").join(bin_name))
552    }
553}