
Embedded Mode

Hydro's standard deployment model compiles each location into a standalone binary and manages networking automatically. But sometimes you need more control — maybe you're integrating Hydro into an existing Rust application, or you need to wire up custom I/O like DPDK, a game engine's event loop, or a hardware interface. Embedded mode is designed for these cases.

Instead of producing self-contained binaries, embedded mode generates a Rust source file containing one plain function per location. Each function returns a Dfir dataflow graph that you drive manually. You decide when to tick it, how to feed it data, and where the outputs go.

When to Use Embedded Mode

  • Incremental adoption: You have an existing Rust codebase and want to introduce Hydro for a specific computation without restructuring your application around Hydro Deploy.
  • Custom I/O: You need to connect Hydro to a transport or runtime that Hydro Deploy doesn't support (DPDK, shared memory, a game loop, etc.).
  • Library use: You want to ship a crate that uses Hydro internally but exposes a normal Rust API to consumers.

When your Hydro program sends data between processes, the generated functions gain additional network_out and network_in parameters. You wire these up yourself — for example with in-memory channels, Unix sockets, or any transport you like. See Networking below for details.

How It Works

Embedded mode uses a build.rs script to compile your Hydro program at build time. The generated code is then include!-ed into your crate. The workflow has three parts:

  1. Define your Hydro logic using embedded_input and embedded_output to mark where data enters and leaves the dataflow.
  2. Generate code in build.rs using generate_embedded.
  3. Call the generated function from your application, passing in streams and output callbacks.

Let's walk through a complete example that capitalizes strings using Hydro in embedded mode.

1. Define the Hydro Logic

Write your Hydro function as usual, taking the input stream as a parameter (it will be wired to embedded_input in build.rs) and using embedded_output to mark where results should be emitted:

my_hydro_crate/src/lib.rs
use hydro_lang::prelude::*;

pub fn capitalize<'a>(input: Stream<String, Process<'a, ()>>) {
    input
        .map(q!(|s| s.to_uppercase()))
        .embedded_output("output");
}

embedded_input creates a stream parameter on the generated function — the name you pass ("input") becomes the parameter name. Similarly, embedded_output creates a field on a generated EmbeddedOutputs struct — the name ("output") becomes the field name. The output field accepts an impl FnMut(T) closure that will be called for each emitted element.

Your base crate also needs to declare an empty stageleft_macro_entrypoint feature. This is required by the Stageleft code generation machinery to set up the correct re-exports:

my_hydro_crate/Cargo.toml
[features]
stageleft_macro_entrypoint = []

2. Generate Code in build.rs

Create a wrapper crate that depends on your Hydro crate. The wrapper crate needs the following dependencies:

my_wrapper_crate/Cargo.toml
[dependencies]
hydro_lang = { version = "...", features = ["runtime_support"] }
my_hydro_crate = { path = "../my_hydro_crate", features = ["stageleft_macro_entrypoint"] }

[build-dependencies]
hydro_lang = { version = "...", features = ["build"] }
my_hydro_crate = { path = "../my_hydro_crate" }
prettyplease = { version = "0.2.0", features = ["verbatim"] }

[dev-dependencies]
dfir_rs = { version = "..." }
tokio = { version = "1", features = ["full"] }

The runtime_support feature is needed at runtime to provide the DFIR runtime. The build feature is needed in build-dependencies for the code generation APIs.

In its build.rs, construct a FlowBuilder, wire up the logic, and call generate_embedded:

my_wrapper_crate/build.rs
use hydro_lang::location::Location;

fn main() {
    println!("cargo::rerun-if-changed=build.rs");

    let mut flow = hydro_lang::compile::builder::FlowBuilder::new();
    let process = flow.process::<()>();

    // Wire up the Hydro logic with an embedded input.
    my_hydro_crate::capitalize(process.embedded_input("input"));

    // Compile and generate the embedded code.
    let code = flow
        .with_process(&process, "capitalize")
        .generate_embedded("my_hydro_crate");

    let out_dir = std::env::var("OUT_DIR").unwrap();
    std::fs::write(
        format!("{out_dir}/embedded.rs"),
        prettyplease::unparse(&code),
    )
    .unwrap();
}

The string "capitalize" passed to with_process becomes the name of the generated function. The argument to generate_embedded is the name of the crate containing your Hydro logic (hyphens are automatically replaced with underscores).

3. Include and Use the Generated Code

In your wrapper crate's lib.rs, include the generated file:

my_wrapper_crate/src/lib.rs
#[allow(unused_imports, missing_docs)]
pub mod embedded {
    include!(concat!(env!("OUT_DIR"), "/embedded.rs"));
}

Now you can call the generated function from anywhere in your application:

use dfir_rs::futures;

async fn run() {
    // Create an input stream from any source you like.
    let input = futures::stream::iter(vec![
        "hello".to_owned(),
        "world".to_owned(),
    ]);

    // Collect outputs via a closure.
    let mut results = vec![];
    let mut outputs = embedded::capitalize::EmbeddedOutputs {
        output: |s: String| {
            results.push(s);
        },
    };

    // Build and run the dataflow.
    let mut flow = embedded::capitalize(input, &mut outputs);
    tokio::task::LocalSet::new()
        .run_until(flow.run_available())
        .await;
    drop(flow);

    assert_eq!(results, vec!["HELLO", "WORLD"]);
}

The generated function accepts your input streams as impl Stream<Item = T> + Unpin parameters and a mutable reference to the EmbeddedOutputs struct. It returns a Dfir graph that you run with run_available() (or tick manually).

Clusters

When a location is a cluster (registered with with_cluster in build.rs), the generated function receives additional parameters for the cluster's identity and membership information.

Self ID

A function generated for a cluster location receives a __cluster_self_id: &TaglessMemberId parameter. This tells the running instance which member of the cluster it is. You provide this when you call the generated function:

let member_id = TaglessMemberId::from_raw_id(0);
let mut flow = my_cluster_fn(&member_id, input, &mut outputs);

Membership Streams

When a location (process or cluster) needs to know the members of another cluster — for example, to broadcast to it — the generated function receives an EmbeddedMembershipStreams struct parameter. This struct has one field per cluster whose membership is needed, named after that cluster's function name. Each field is a Stream<Item = (TaglessMemberId, MembershipEvent)>:

pub mod my_sender {
    pub struct EmbeddedMembershipStreams<S: Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin> {
        pub my_receiver: S,
    }
}

You provide the membership stream when calling the function. For example, a process that sends to a cluster:

use hydro_lang::location::MembershipEvent;
use hydro_lang::location::member_id::TaglessMemberId;

let member_id = TaglessMemberId::from_raw_id(0);
let membership = my_sender::EmbeddedMembershipStreams {
    my_receiver: futures::stream::iter(vec![(member_id.clone(), MembershipEvent::Joined)]),
};
let mut flow = my_sender(membership, input, &mut net_out);

A cluster that sends to another cluster receives both __cluster_self_id and the membership struct:

let mut flow = m2m_sender(&src_id, membership, input, &mut net_out);

In build.rs, use with_cluster instead of with_process to register cluster locations:

let cluster = flow.cluster::<MyClusterTag>();
// ...
let code = flow
    .with_cluster(&cluster, "my_cluster_fn")
    .generate_embedded("my_crate");

Networking

When your Hydro program uses .send() between locations, the generated functions get extra parameters so you can wire up the transport yourself. Hydro handles serialization automatically; you just shuttle the raw bytes between sender and receiver. Your transport must preserve whatever guarantees the network config in the Hydro program specifies: for example, if the channel uses TCP, your mechanism must preserve ordering and exactly-once delivery.

Network channels in embedded mode must be named. Use .name() on the networking config:

input.send(receiver, TCP.fail_stop().bincode().name("messages"))

The name becomes a field name in the generated structs, so it must be a valid Rust identifier.

Process-to-Process (o2o)

Consider a two-process program where Sender sends to Receiver over a channel named "messages". Sender gets an EmbeddedNetworkOut struct parameter with an FnMut(Bytes) field per outgoing channel:

pub mod echo_sender {
    pub struct EmbeddedNetworkOut<F: FnMut(Bytes)> {
        pub messages: F,
    }
}

pub fn echo_sender<'a, F: FnMut(Bytes) + 'a>(
    input: impl Stream<Item = String> + Unpin + 'a,
    __network_out: &'a mut echo_sender::EmbeddedNetworkOut<F>,
) -> Dfir<'a> { ... }

On the other side, Receiver gets an EmbeddedNetworkIn struct parameter with a Stream<Item = Result<BytesMut, io::Error>> field per incoming channel:

pub mod echo_receiver {
    pub struct EmbeddedNetworkIn<S: Stream<Item = Result<BytesMut, io::Error>> + Unpin> {
        pub messages: S,
    }
}

pub fn echo_receiver<'a, S: Stream<Item = Result<BytesMut, io::Error>> + Unpin + 'a>(
    __outputs: &'a mut echo_receiver::EmbeddedOutputs<...>,
    __network_in: echo_receiver::EmbeddedNetworkIn<S>,
) -> Dfir<'a> { ... }

Cluster Networking

When a cluster is involved in networking, the generated structs become tagged with TaglessMemberId so you can route messages to and from individual cluster members.

Process → Cluster (o2m)

When a process sends to a cluster, the sender's EmbeddedNetworkOut fields become FnMut((TaglessMemberId, Bytes)) — each outgoing message is paired with the target member ID:

pub mod o2m_sender {
    pub struct EmbeddedNetworkOut<F: FnMut((TaglessMemberId, Bytes))> {
        pub data: F,
    }
}

The receiving cluster's EmbeddedNetworkIn fields use untagged Stream<Item = Result<BytesMut, io::Error>> since each cluster member only receives its own messages:

pub mod o2m_receiver {
    pub struct EmbeddedNetworkIn<S: Stream<Item = Result<BytesMut, io::Error>> + Unpin> {
        pub data: S,
    }
}

Your transport layer is responsible for routing: when the sender calls the FnMut with (member_id, bytes), you deliver bytes to the instance running with that __cluster_self_id.

Cluster → Process (m2o)

When a cluster sends to a process, the sender's EmbeddedNetworkOut fields are untagged FnMut(Bytes) since each cluster member just sends its own data:

pub mod m2o_sender {
    pub struct EmbeddedNetworkOut<F: FnMut(Bytes)> {
        pub data: F,
    }
}

The receiving process's EmbeddedNetworkIn fields become Stream<Item = Result<(TaglessMemberId, BytesMut), io::Error>> — each incoming message is tagged with the sender's member ID:

pub mod m2o_receiver {
    pub struct EmbeddedNetworkIn<S: Stream<Item = Result<(TaglessMemberId, BytesMut), io::Error>> + Unpin> {
        pub data: S,
    }
}

Your transport layer must tag each incoming message with the TaglessMemberId of the cluster member that sent it.

Cluster → Cluster (m2m)

When a cluster sends to another cluster, both sides are tagged. The sender's EmbeddedNetworkOut fields are FnMut((TaglessMemberId, Bytes)) (tagged with the target member ID):

pub mod m2m_sender {
    pub struct EmbeddedNetworkOut<F: FnMut((TaglessMemberId, Bytes))> {
        pub data: F,
    }
}

The receiver's EmbeddedNetworkIn fields are Stream<Item = Result<(TaglessMemberId, BytesMut), io::Error>> (tagged with the source member ID):

pub mod m2m_receiver {
    pub struct EmbeddedNetworkIn<S: Stream<Item = Result<(TaglessMemberId, BytesMut), io::Error>> + Unpin> {
        pub data: S,
    }
}

Your transport layer must handle both directions: route outgoing (target_id, bytes) to the correct destination member, and tag incoming messages with the source_id of the sender.