Skip to main content

hydro_lang/compile/
deploy_provider.rs

1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17    type Meta: Default;
18    type InstantiateEnv;
19
20    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23        + RegisterPort<'a, Self>;
24
25    /// Generates the source and sink expressions when connecting a [`Self::Process`] to another
26    /// [`Self::Process`].
27    ///
28    /// The [`Self::InstantiateEnv`] can be used to record metadata about the created channel. The
29    /// provided `name` is the user-configured channel name from the network IR node.
30    fn o2o_sink_source(
31        env: &mut Self::InstantiateEnv,
32        p1: &Self::Process,
33        p1_port: &<Self::Process as Node>::Port,
34        p2: &Self::Process,
35        p2_port: &<Self::Process as Node>::Port,
36        name: Option<&str>,
37    ) -> (syn::Expr, syn::Expr);
38
39    /// Performs any runtime wiring needed after code generation for a
40    /// [`Self::Process`]-to-[`Self::Process`] channel.
41    ///
42    /// The returned closure is executed once all locations have been instantiated.
43    fn o2o_connect(
44        p1: &Self::Process,
45        p1_port: &<Self::Process as Node>::Port,
46        p2: &Self::Process,
47        p2_port: &<Self::Process as Node>::Port,
48    ) -> Box<dyn FnOnce()>;
49
50    /// Generates the source and sink expressions when connecting a [`Self::Process`] to a
51    /// [`Self::Cluster`] (one-to-many).
52    ///
53    /// The sink expression is used on the sending process and the source expression on each
54    /// receiving cluster member. The [`Self::InstantiateEnv`] can be used to record metadata
55    /// about the created channel. The provided `name` is the user-configured channel name
56    /// from the network IR node.
57    fn o2m_sink_source(
58        env: &mut Self::InstantiateEnv,
59        p1: &Self::Process,
60        p1_port: &<Self::Process as Node>::Port,
61        c2: &Self::Cluster,
62        c2_port: &<Self::Cluster as Node>::Port,
63        name: Option<&str>,
64    ) -> (syn::Expr, syn::Expr);
65
66    /// Performs any runtime wiring needed after code generation for a
67    /// [`Self::Process`]-to-[`Self::Cluster`] channel.
68    ///
69    /// The returned closure is executed once all locations have been instantiated.
70    fn o2m_connect(
71        p1: &Self::Process,
72        p1_port: &<Self::Process as Node>::Port,
73        c2: &Self::Cluster,
74        c2_port: &<Self::Cluster as Node>::Port,
75    ) -> Box<dyn FnOnce()>;
76
77    /// Generates the source and sink expressions when connecting a [`Self::Cluster`] to a
78    /// [`Self::Process`] (many-to-one).
79    ///
80    /// The sink expression is used on each sending cluster member and the source expression
81    /// on the receiving process. The [`Self::InstantiateEnv`] can be used to record metadata
82    /// about the created channel. The provided `name` is the user-configured channel name
83    /// from the network IR node.
84    fn m2o_sink_source(
85        env: &mut Self::InstantiateEnv,
86        c1: &Self::Cluster,
87        c1_port: &<Self::Cluster as Node>::Port,
88        p2: &Self::Process,
89        p2_port: &<Self::Process as Node>::Port,
90        name: Option<&str>,
91    ) -> (syn::Expr, syn::Expr);
92
93    /// Performs any runtime wiring needed after code generation for a
94    /// [`Self::Cluster`]-to-[`Self::Process`] channel.
95    ///
96    /// The returned closure is executed once all locations have been instantiated.
97    fn m2o_connect(
98        c1: &Self::Cluster,
99        c1_port: &<Self::Cluster as Node>::Port,
100        p2: &Self::Process,
101        p2_port: &<Self::Process as Node>::Port,
102    ) -> Box<dyn FnOnce()>;
103
104    /// Generates the source and sink expressions when connecting a [`Self::Cluster`] to another
105    /// [`Self::Cluster`] (many-to-many).
106    ///
107    /// The sink expression is used on each sending cluster member and the source expression
108    /// on each receiving cluster member. The [`Self::InstantiateEnv`] can be used to record
109    /// metadata about the created channel. The provided `name` is the user-configured channel
110    /// name from the network IR node.
111    fn m2m_sink_source(
112        env: &mut Self::InstantiateEnv,
113        c1: &Self::Cluster,
114        c1_port: &<Self::Cluster as Node>::Port,
115        c2: &Self::Cluster,
116        c2_port: &<Self::Cluster as Node>::Port,
117        name: Option<&str>,
118    ) -> (syn::Expr, syn::Expr);
119
120    /// Performs any runtime wiring needed after code generation for a
121    /// [`Self::Cluster`]-to-[`Self::Cluster`] channel.
122    ///
123    /// The returned closure is executed once all locations have been instantiated.
124    fn m2m_connect(
125        c1: &Self::Cluster,
126        c1_port: &<Self::Cluster as Node>::Port,
127        c2: &Self::Cluster,
128        c2_port: &<Self::Cluster as Node>::Port,
129    ) -> Box<dyn FnOnce()>;
130
131    fn e2o_many_source(
132        extra_stmts: &mut Vec<syn::Stmt>,
133        p2: &Self::Process,
134        p2_port: &<Self::Process as Node>::Port,
135        codec_type: &syn::Type,
136        shared_handle: String,
137    ) -> syn::Expr;
138    fn e2o_many_sink(shared_handle: String) -> syn::Expr;
139
140    fn e2o_source(
141        extra_stmts: &mut Vec<syn::Stmt>,
142        p1: &Self::External,
143        p1_port: &<Self::External as Node>::Port,
144        p2: &Self::Process,
145        p2_port: &<Self::Process as Node>::Port,
146        codec_type: &syn::Type,
147        shared_handle: String,
148    ) -> syn::Expr;
149    fn e2o_connect(
150        p1: &Self::External,
151        p1_port: &<Self::External as Node>::Port,
152        p2: &Self::Process,
153        p2_port: &<Self::Process as Node>::Port,
154        many: bool,
155        server_hint: NetworkHint,
156    ) -> Box<dyn FnOnce()>;
157
158    fn o2e_sink(
159        p1: &Self::Process,
160        p1_port: &<Self::Process as Node>::Port,
161        p2: &Self::External,
162        p2_port: &<Self::External as Node>::Port,
163        shared_handle: String,
164    ) -> syn::Expr;
165
166    fn cluster_ids(
167        of_cluster: LocationKey,
168    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
169
170    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
171
172    fn cluster_membership_stream(
173        env: &mut Self::InstantiateEnv,
174        at_location: &LocationId,
175        location_id: &LocationId,
176    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
177
178    /// Registers an embedded input for the given ident and element type.
179    ///
180    /// Only meaningful for the embedded deployment backend. The default
181    /// implementation panics.
182    fn register_embedded_input(
183        _env: &mut Self::InstantiateEnv,
184        _location_key: LocationKey,
185        _ident: &syn::Ident,
186        _element_type: &syn::Type,
187    ) {
188        panic!("register_embedded_input is only supported by EmbeddedDeploy");
189    }
190
191    /// Registers an embedded output for the given ident and element type.
192    ///
193    /// Only meaningful for the embedded deployment backend. The default
194    /// implementation panics.
195    fn register_embedded_output(
196        _env: &mut Self::InstantiateEnv,
197        _location_key: LocationKey,
198        _ident: &syn::Ident,
199        _element_type: &syn::Type,
200    ) {
201        panic!("register_embedded_output is only supported by EmbeddedDeploy");
202    }
203}
204
205pub trait ProcessSpec<'a, D>
206where
207    D: Deploy<'a> + ?Sized,
208{
209    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
210}
211
212pub trait IntoProcessSpec<'a, D>
213where
214    D: Deploy<'a> + ?Sized,
215{
216    type ProcessSpec: ProcessSpec<'a, D>;
217    fn into_process_spec(self) -> Self::ProcessSpec;
218}
219
220impl<'a, D, T> IntoProcessSpec<'a, D> for T
221where
222    D: Deploy<'a> + ?Sized,
223    T: ProcessSpec<'a, D>,
224{
225    type ProcessSpec = T;
226    fn into_process_spec(self) -> Self::ProcessSpec {
227        self
228    }
229}
230
231pub trait ClusterSpec<'a, D>
232where
233    D: Deploy<'a> + ?Sized,
234{
235    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
236}
237
238pub trait ExternalSpec<'a, D>
239where
240    D: Deploy<'a> + ?Sized,
241{
242    fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
243}
244
245pub trait Node {
246    /// A logical communication endpoint for this node.
247    ///
248    /// Implementors are free to choose the concrete representation (for example,
249    /// a handle or identifier), but it must be `Clone` so that a single logical
250    /// port can be duplicated and passed to multiple consumers. New ports are
251    /// allocated via [`Self::next_port`].
252    type Port: Clone;
253    type Meta: Default;
254    type InstantiateEnv;
255
256    /// Allocates and returns a new port.
257    fn next_port(&self) -> Self::Port;
258
259    fn update_meta(&self, meta: &Self::Meta);
260
261    fn instantiate(
262        &self,
263        env: &mut Self::InstantiateEnv,
264        meta: &mut Self::Meta,
265        graph: DfirGraph,
266        extra_stmts: &[syn::Stmt],
267        sidecars: &[syn::Expr],
268    );
269}
270
271pub type DynSourceSink<Out, In, InErr> = (
272    Pin<Box<dyn Stream<Item = Out>>>,
273    Pin<Box<dyn Sink<In, Error = InErr>>>,
274);
275
276pub trait RegisterPort<'a, D>: Node + Clone
277where
278    D: Deploy<'a> + ?Sized,
279{
280    fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
281
282    fn as_bytes_bidi(
283        &self,
284        external_port_id: ExternalPortId,
285    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
286
287    fn as_bincode_bidi<InT, OutT>(
288        &self,
289        external_port_id: ExternalPortId,
290    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
291    where
292        InT: Serialize + 'static,
293        OutT: DeserializeOwned + 'static;
294
295    fn as_bincode_sink<T>(
296        &self,
297        external_port_id: ExternalPortId,
298    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
299    where
300        T: Serialize + 'static;
301
302    fn as_bincode_source<T>(
303        &self,
304        external_port_id: ExternalPortId,
305    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
306    where
307        T: DeserializeOwned + 'static;
308}