Skip to main content

hydro_lang/deploy/maelstrom/
mod.rs

1//! Deployment backend for running correctness tests against Jepsen Maelstrom (<https://github.com/jepsen-io/maelstrom>)
2
3use serde::Serialize;
4use serde::de::DeserializeOwned;
5
6use crate::forward_handle::ForwardHandle;
7use crate::live_collections::KeyedStream;
8use crate::live_collections::stream::TotalOrder;
9use crate::location::{Cluster, NoTick};
10use crate::nondet::nondet;
11
12#[cfg(stageleft_runtime)]
13#[cfg(feature = "maelstrom")]
14#[cfg_attr(docsrs, doc(cfg(feature = "maelstrom")))]
15pub mod deploy_maelstrom;
16
17pub mod deploy_runtime_maelstrom;
18
19/// Sets up bidirectional communication with Maelstrom clients on a cluster.
20///
21/// This function provides a similar API to `bidi_external_many_bytes` but for Maelstrom
22/// client communication. It returns a keyed input stream of client messages and accepts
23/// a keyed output stream of responses.
24///
25/// The key type is `String` (the client ID like "c1", "c2").
26/// The value type is `serde_json::Value` (the message body).
27///
28/// # Example
29/// ```ignore
30/// let (input, output_handle) = maelstrom_bidi_clients(&cluster);
31/// output_handle.complete(input.map(q!(|(client_id, body)| {
32///     // Process and return response
33///     (client_id, response_body)
34/// })));
35/// ```
36#[expect(clippy::type_complexity, reason = "stream markers")]
37pub fn maelstrom_bidi_clients<'a, C, In: DeserializeOwned, Out: Serialize>(
38    cluster: &Cluster<'a, C>,
39) -> (
40    KeyedStream<String, In, Cluster<'a, C>>,
41    ForwardHandle<'a, KeyedStream<String, Out, Cluster<'a, C>>>,
42)
43where
44    Cluster<'a, C>: NoTick,
45{
46    use stageleft::q;
47
48    use crate::location::Location;
49
50    let meta: stageleft::RuntimeData<&deploy_runtime_maelstrom::MaelstromMeta> =
51        stageleft::RuntimeData::new("__hydro_lang_maelstrom_meta");
52
53    // Create the input stream from Maelstrom clients
54    let input: KeyedStream<String, In, Cluster<'a, C>> = cluster
55        .source_stream(q!(deploy_runtime_maelstrom::maelstrom_client_source(meta)))
56        .into_keyed()
57        .map(q!(|b| serde_json::from_value(b).unwrap()));
58
59    // Create a forward reference for the output stream
60    let (fwd_handle, output_stream) =
61        cluster.forward_ref::<KeyedStream<String, Out, Cluster<'a, C>>>();
62
63    // Set up the output sink to send responses back to clients
64    output_stream
65        .entries()
66        .assume_ordering::<TotalOrder>(nondet!(/** maelstrom responses can be sent in any order */))
67        .for_each(q!(|(client_id, body)| {
68            deploy_runtime_maelstrom::maelstrom_send_response(
69                &meta.node_id,
70                &client_id,
71                serde_json::to_value(body).unwrap(),
72            );
73        }));
74
75    (input, fwd_handle)
76}