Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{
34    ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
35};
36use crate::forward_handle::ForwardRef;
37#[cfg(stageleft_runtime)]
38use crate::forward_handle::{CycleCollection, ForwardHandle};
39use crate::live_collections::boundedness::{Bounded, Unbounded};
40use crate::live_collections::keyed_stream::KeyedStream;
41use crate::live_collections::singleton::Singleton;
42use crate::live_collections::stream::{
43    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
44};
45use crate::location::dynamic::LocationId;
46use crate::location::external_process::{
47    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
48};
49use crate::nondet::NonDet;
50#[cfg(feature = "sim")]
51use crate::sim::SimSender;
52use crate::staging_util::get_this_crate;
53
54pub mod dynamic;
55
56#[expect(missing_docs, reason = "TODO")]
57pub mod external_process;
58pub use external_process::External;
59
60#[expect(missing_docs, reason = "TODO")]
61pub mod process;
62pub use process::Process;
63
64#[expect(missing_docs, reason = "TODO")]
65pub mod cluster;
66pub use cluster::Cluster;
67
68#[expect(missing_docs, reason = "TODO")]
69pub mod member_id;
70pub use member_id::{MemberId, TaglessMemberId};
71
72#[expect(missing_docs, reason = "TODO")]
73pub mod tick;
74pub use tick::{Atomic, NoTick, Tick};
75
76#[expect(missing_docs, reason = "TODO")]
77#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
78pub enum MembershipEvent {
79    Joined,
80    Left,
81}
82
/// A hint describing how the network port of an external connection should be set up.
///
/// It is passed as the `port_hint` argument of [`Location::bind_single_client`] and
/// [`Location::bidi_external_many_bytes`]; the helpers in this module that do not take a hint
/// use [`NetworkHint::Auto`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No specific requirement is expressed.
    // NOTE(review): presumably the deployment backend then picks the transport and port itself
    // — confirm against the deploy code.
    Auto,
    /// A TCP port, optionally with a specific port number.
    // NOTE(review): `None` presumably means "any free TCP port" — confirm against the deploy
    // code.
    TcpPort(Option<u16>),
}
89
90pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
91    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
92}
93
#[stageleft::export(LocationKey)]
new_key_type! {
    /// A unique identifier (slotmap key) for a location.
    pub struct LocationKey;
}
99
100impl std::fmt::Display for LocationKey {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
103    }
104}
105
106/// This is used for the ECS membership stream.
107/// TODO(mingwei): Make this more robust?
108impl std::str::FromStr for LocationKey {
109    type Err = Option<ParseIntError>;
110
111    fn from_str(s: &str) -> Result<Self, Self::Err> {
112        let nvn = s.strip_prefix("loc").ok_or(None)?;
113        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
114        let idx: u64 = idx.parse()?;
115        let ver: u64 = ver.parse()?;
116        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
117    }
118}
119
// The raw `from_ffi` values below hold the version in the upper 32 bits and the index in the
// lower 32 bits; the trailing comments give the resulting `{index}v{version}` form.
impl LocationKey {
    /// The first location key, used by the simulator as the default external location.
    ///
    /// TODO(mingwei): Remove this and avoid magic key for simulator external.
    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`

    /// A key for testing with index 1.
    #[cfg(test)]
    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); // `1v255`

    /// A key for testing with index 2.
    #[cfg(test)]
    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); // `2v255`
}
133
134/// This is used within `q!` code in docker and ECS.
135impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
136    type O = LocationKey;
137
138    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
139    where
140        Self: Sized,
141    {
142        let root = get_this_crate();
143        let n = Key::data(&self).as_ffi();
144        (
145            QuoteTokens {
146                prelude: None,
147                expr: Some(quote! {
148                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
149                }),
150            },
151            (),
152        )
153    }
154}
155
/// A simple enum for the type of a root location.
///
/// There is one variant per kind of root location handle re-exported from this module:
/// [`Process`], [`Cluster`] and [`External`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub enum LocationType {
    /// A process (single node).
    Process,
    /// A cluster (multiple nodes).
    Cluster,
    /// An external client.
    External,
}
166
167/// A location where data can be materialized and computation can be executed.
168///
169/// Hydro is a **global**, **distributed** programming model. This means that the data
170/// and computation in a Hydro program can be spread across multiple machines, data
171/// centers, and even continents. To achieve this, Hydro uses the concept of
172/// **locations** to keep track of _where_ data is located and computation is executed.
173///
174/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
175/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
176/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
177/// to allow live collections to be _moved_ between locations via network send/receive.
178///
179/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
180#[expect(
181    private_bounds,
182    reason = "only internal Hydro code can define location types"
183)]
184pub trait Location<'a>: dynamic::DynLocation {
185    /// The root location type for this location.
186    ///
187    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
188    /// For nested locations like [`Tick`], this is the root location that contains it.
189    type Root: Location<'a>;
190
191    /// Returns the root location for this location.
192    ///
193    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
194    /// For nested locations like [`Tick`], this returns the root location that contains it.
195    fn root(&self) -> Self::Root;
196
197    /// Attempts to create a new [`Tick`] clock domain at this location.
198    ///
199    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
200    /// or `None` if this location is already inside a tick (nested ticks are not supported).
201    ///
202    /// Prefer using [`Location::tick`] when you know the location is top-level.
203    fn try_tick(&self) -> Option<Tick<Self>> {
204        if Self::is_top_level() {
205            let id = self.flow_state().borrow_mut().next_clock_id();
206            Some(Tick {
207                id,
208                l: self.clone(),
209            })
210        } else {
211            None
212        }
213    }
214
215    /// Returns the unique identifier for this location.
216    fn id(&self) -> LocationId {
217        dynamic::DynLocation::id(self)
218    }
219
220    /// Creates a new [`Tick`] clock domain at this location.
221    ///
222    /// A tick represents a logical clock that can be used to batch streaming data
223    /// into discrete time steps. This is useful for implementing iterative algorithms
224    /// or for synchronizing data across multiple streams.
225    ///
226    /// # Example
227    /// ```rust
228    /// # #[cfg(feature = "deploy")] {
229    /// # use hydro_lang::prelude::*;
230    /// # use futures::StreamExt;
231    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
232    /// let tick = process.tick();
233    /// let inside_tick = process
234    ///     .source_iter(q!(vec![1, 2, 3, 4]))
235    ///     .batch(&tick, nondet!(/** test */));
236    /// inside_tick.all_ticks()
237    /// # }, |mut stream| async move {
238    /// // 1, 2, 3, 4
239    /// # for w in vec![1, 2, 3, 4] {
240    /// #     assert_eq!(stream.next().await.unwrap(), w);
241    /// # }
242    /// # }));
243    /// # }
244    /// ```
245    fn tick(&self) -> Tick<Self>
246    where
247        Self: NoTick,
248    {
249        let id = self.flow_state().borrow_mut().next_clock_id();
250        Tick {
251            id,
252            l: self.clone(),
253        }
254    }
255
256    /// Creates an unbounded stream that continuously emits unit values `()`.
257    ///
258    /// This is useful for driving computations that need to run continuously,
259    /// such as polling or heartbeat mechanisms.
260    ///
261    /// # Example
262    /// ```rust
263    /// # #[cfg(feature = "deploy")] {
264    /// # use hydro_lang::prelude::*;
265    /// # use futures::StreamExt;
266    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
267    /// let tick = process.tick();
268    /// process.spin()
269    ///     .batch(&tick, nondet!(/** test */))
270    ///     .map(q!(|_| 42))
271    ///     .all_ticks()
272    /// # }, |mut stream| async move {
273    /// // 42, 42, 42, ...
274    /// # assert_eq!(stream.next().await.unwrap(), 42);
275    /// # assert_eq!(stream.next().await.unwrap(), 42);
276    /// # assert_eq!(stream.next().await.unwrap(), 42);
277    /// # }));
278    /// # }
279    /// ```
280    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
281    where
282        Self: Sized + NoTick,
283    {
284        Stream::new(
285            self.clone(),
286            HydroNode::Source {
287                source: HydroSource::Spin(),
288                metadata: self.new_node_metadata(Stream::<
289                    (),
290                    Self,
291                    Unbounded,
292                    TotalOrder,
293                    ExactlyOnce,
294                >::collection_kind()),
295            },
296        )
297    }
298
299    /// Creates a stream from an async [`FuturesStream`].
300    ///
301    /// This is useful for integrating with external async data sources,
302    /// such as network connections or file readers.
303    ///
304    /// # Example
305    /// ```rust
306    /// # #[cfg(feature = "deploy")] {
307    /// # use hydro_lang::prelude::*;
308    /// # use futures::StreamExt;
309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
310    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
311    /// # }, |mut stream| async move {
312    /// // 1, 2, 3
313    /// # for w in vec![1, 2, 3] {
314    /// #     assert_eq!(stream.next().await.unwrap(), w);
315    /// # }
316    /// # }));
317    /// # }
318    /// ```
319    fn source_stream<T, E>(
320        &self,
321        e: impl QuotedWithContext<'a, E, Self>,
322    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
323    where
324        E: FuturesStream<Item = T> + Unpin,
325        Self: Sized + NoTick,
326    {
327        let e = e.splice_untyped_ctx(self);
328
329        Stream::new(
330            self.clone(),
331            HydroNode::Source {
332                source: HydroSource::Stream(e.into()),
333                metadata: self.new_node_metadata(Stream::<
334                    T,
335                    Self,
336                    Unbounded,
337                    TotalOrder,
338                    ExactlyOnce,
339                >::collection_kind()),
340            },
341        )
342    }
343
344    /// Creates a bounded stream from an iterator.
345    ///
346    /// The iterator is evaluated once at runtime, and all elements are emitted
347    /// in order. This is useful for creating streams from static data or
348    /// for testing.
349    ///
350    /// # Example
351    /// ```rust
352    /// # #[cfg(feature = "deploy")] {
353    /// # use hydro_lang::prelude::*;
354    /// # use futures::StreamExt;
355    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
356    /// process.source_iter(q!(vec![1, 2, 3, 4]))
357    /// # }, |mut stream| async move {
358    /// // 1, 2, 3, 4
359    /// # for w in vec![1, 2, 3, 4] {
360    /// #     assert_eq!(stream.next().await.unwrap(), w);
361    /// # }
362    /// # }));
363    /// # }
364    /// ```
365    fn source_iter<T, E>(
366        &self,
367        e: impl QuotedWithContext<'a, E, Self>,
368    ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
369    where
370        E: IntoIterator<Item = T>,
371        Self: Sized + NoTick,
372    {
373        let e = e.splice_typed_ctx(self);
374
375        Stream::new(
376            self.clone(),
377            HydroNode::Source {
378                source: HydroSource::Iter(e.into()),
379                metadata: self.new_node_metadata(
380                    Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
381                ),
382            },
383        )
384    }
385
386    /// Creates a stream of membership events for a cluster.
387    ///
388    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
389    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
390    /// keyed by the [`MemberId`] of the cluster member.
391    ///
392    /// This is useful for implementing protocols that need to track cluster membership,
393    /// such as broadcasting to all members or detecting failures.
394    ///
395    /// # Example
396    /// ```rust
397    /// # #[cfg(feature = "deploy")] {
398    /// # use hydro_lang::prelude::*;
399    /// # use futures::StreamExt;
400    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
401    /// let p1 = flow.process::<()>();
402    /// let workers: Cluster<()> = flow.cluster::<()>();
403    /// # // do nothing on each worker
404    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
405    /// let cluster_members = p1.source_cluster_members(&workers);
406    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
407    /// // if there are 4 members in the cluster, we would see a join event for each
    /// // { MemberId::<()>(0): [MembershipEvent::Joined], MemberId::<()>(2): [MembershipEvent::Joined], ... }
409    /// # }, |mut stream| async move {
410    /// # let mut results = Vec::new();
411    /// # for w in 0..4 {
412    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
413    /// # }
414    /// # results.sort();
415    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
416    /// # }));
417    /// # }
418    /// ```
419    fn source_cluster_members<C: 'a>(
420        &self,
421        cluster: &Cluster<'a, C>,
422    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
423    where
424        Self: Sized + NoTick,
425    {
426        Stream::new(
427            self.clone(),
428            HydroNode::Source {
429                source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
430                metadata: self.new_node_metadata(Stream::<
431                    (TaglessMemberId, MembershipEvent),
432                    Self,
433                    Unbounded,
434                    TotalOrder,
435                    ExactlyOnce,
436                >::collection_kind()),
437            },
438        )
439        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
440        .into_keyed()
441    }
442
443    /// Creates a one-way connection from an external process to receive raw bytes.
444    ///
445    /// Returns a port handle for the external process to connect to, and a stream
446    /// of received byte buffers.
447    ///
448    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
449    /// or [`Location::source_external_bincode`].
450    fn source_external_bytes<L>(
451        &self,
452        from: &External<L>,
453    ) -> (
454        ExternalBytesPort,
455        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
456    )
457    where
458        Self: Sized + NoTick,
459    {
460        let (port, stream, sink) =
461            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
462
463        sink.complete(self.source_iter(q!([])));
464
465        (port, stream)
466    }
467
468    /// Creates a one-way connection from an external process to receive bincode-serialized data.
469    ///
470    /// Returns a sink handle for the external process to send data to, and a stream
471    /// of received values.
472    ///
473    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
474    #[expect(clippy::type_complexity, reason = "stream markers")]
475    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
476        &self,
477        from: &External<L>,
478    ) -> (
479        ExternalBincodeSink<T, NotMany, O, R>,
480        Stream<T, Self, Unbounded, O, R>,
481    )
482    where
483        Self: Sized + NoTick,
484        T: Serialize + DeserializeOwned,
485    {
486        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
487        sink.complete(self.source_iter(q!([])));
488
489        (
490            ExternalBincodeSink {
491                process_key: from.key,
492                port_id: port.port_id,
493                _phantom: PhantomData,
494            },
495            stream.weaken_ordering().weaken_retries(),
496        )
497    }
498
499    /// Sets up a simulated input port on this location for testing.
500    ///
501    /// Returns a handle to send messages to the location as well as a stream
502    /// of received messages. This is only available when the `sim` feature is enabled.
503    #[cfg(feature = "sim")]
504    #[expect(clippy::type_complexity, reason = "stream markers")]
505    fn sim_input<T, O: Ordering, R: Retries>(
506        &self,
507    ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
508    where
509        Self: Sized + NoTick,
510        T: Serialize + DeserializeOwned,
511    {
512        let external_location: External<'a, ()> = External {
513            key: LocationKey::FIRST,
514            flow_state: self.flow_state().clone(),
515            _phantom: PhantomData,
516        };
517
518        let (external, stream) = self.source_external_bincode(&external_location);
519
520        (SimSender(external.port_id, PhantomData), stream)
521    }
522
523    /// Creates an external input stream for embedded deployment mode.
524    ///
525    /// The `name` parameter specifies the name of the generated function parameter
526    /// that will supply data to this stream at runtime. The generated function will
527    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
528    fn embedded_input<T>(
529        &self,
530        name: impl Into<String>,
531    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
532    where
533        Self: Sized + NoTick,
534    {
535        let ident = syn::Ident::new(&name.into(), Span::call_site());
536
537        Stream::new(
538            self.clone(),
539            HydroNode::Source {
540                source: HydroSource::Embedded(ident),
541                metadata: self.new_node_metadata(Stream::<
542                    T,
543                    Self,
544                    Unbounded,
545                    TotalOrder,
546                    ExactlyOnce,
547                >::collection_kind()),
548            },
549        )
550    }
551
552    /// Establishes a server on this location to receive a bidirectional connection from a single
553    /// client, identified by the given `External` handle. Returns a port handle for the external
554    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
555    /// messages.
556    ///
557    /// # Example
558    /// ```rust
559    /// # #[cfg(feature = "deploy")] {
560    /// # use hydro_lang::prelude::*;
561    /// # use hydro_deploy::Deployment;
562    /// # use futures::{SinkExt, StreamExt};
563    /// # tokio_test::block_on(async {
564    /// # use bytes::Bytes;
565    /// # use hydro_lang::location::NetworkHint;
566    /// # use tokio_util::codec::LengthDelimitedCodec;
567    /// # let mut flow = FlowBuilder::new();
568    /// let node = flow.process::<()>();
569    /// let external = flow.external::<()>();
570    /// let (port, incoming, outgoing) =
571    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
572    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
573    ///     let mut resp: Vec<u8> = data.into();
574    ///     resp.push(42);
575    ///     resp.into() // : Bytes
576    /// })));
577    ///
578    /// # let mut deployment = Deployment::new();
579    /// let nodes = flow // ... with_process and with_external
580    /// #     .with_process(&node, deployment.Localhost())
581    /// #     .with_external(&external, deployment.Localhost())
582    /// #     .deploy(&mut deployment);
583    ///
584    /// deployment.deploy().await.unwrap();
585    /// deployment.start().await.unwrap();
586    ///
587    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
588    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
589    /// assert_eq!(
590    ///     external_out.next().await.unwrap().unwrap(),
591    ///     vec![1, 2, 3, 42]
592    /// );
593    /// # });
594    /// # }
595    /// ```
596    #[expect(clippy::type_complexity, reason = "stream markers")]
597    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
598        &self,
599        from: &External<L>,
600        port_hint: NetworkHint,
601    ) -> (
602        ExternalBytesPort<NotMany>,
603        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
604        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
605    )
606    where
607        Self: Sized + NoTick,
608    {
609        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
610
611        let (fwd_ref, to_sink) =
612            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
613        let mut flow_state_borrow = self.flow_state().borrow_mut();
614
615        flow_state_borrow.push_root(HydroRoot::SendExternal {
616            to_external_key: from.key,
617            to_port_id: next_external_port_id,
618            to_many: false,
619            unpaired: false,
620            serialize_fn: None,
621            instantiate_fn: DebugInstantiate::Building,
622            input: Box::new(to_sink.ir_node.into_inner()),
623            op_metadata: HydroIrOpMetadata::new(),
624        });
625
626        let raw_stream: Stream<
627            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
628            Self,
629            Unbounded,
630            TotalOrder,
631            ExactlyOnce,
632        > = Stream::new(
633            self.clone(),
634            HydroNode::ExternalInput {
635                from_external_key: from.key,
636                from_port_id: next_external_port_id,
637                from_many: false,
638                codec_type: quote_type::<Codec>().into(),
639                port_hint,
640                instantiate_fn: DebugInstantiate::Building,
641                deserialize_fn: None,
642                metadata: self.new_node_metadata(Stream::<
643                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
644                    Self,
645                    Unbounded,
646                    TotalOrder,
647                    ExactlyOnce,
648                >::collection_kind()),
649            },
650        );
651
652        (
653            ExternalBytesPort {
654                process_key: from.key,
655                port_id: next_external_port_id,
656                _phantom: PhantomData,
657            },
658            raw_stream.flatten_ordered(),
659            fwd_ref,
660        )
661    }
662
663    /// Establishes a bidirectional connection from a single external client using bincode serialization.
664    ///
665    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
666    /// and a handle to send outgoing messages. This is a convenience wrapper around
667    /// [`Location::bind_single_client`] that uses bincode for serialization.
668    ///
669    /// # Type Parameters
670    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
671    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
672    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
        &self,
        from: &External<L>,
    ) -> (
        ExternalBincodeBidi<InT, OutT, NotMany>,
        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
    )
    where
        Self: Sized + NoTick,
    {
        // One port id, allocated by the external's flow state, is shared by both directions.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();

        // Outgoing half: a forward reference that the caller completes later.
        let (fwd_ref, to_sink) =
            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
        // This mutable borrow stays alive until the end of the function.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        let root = get_this_crate();

        // Generated serializer for outgoing values. The `unwrap` in the generated closure
        // means a bincode serialization failure panics at runtime.
        let out_t_type = quote_type::<OutT>();
        let ser_fn: syn::Expr = syn::parse_quote! {
            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
            )
        };

        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: false,
            unpaired: false,
            serialize_fn: Some(ser_fn.into()),
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.ir_node.into_inner()),
            op_metadata: HydroIrOpMetadata::new(),
        });

        let in_t_type = quote_type::<InT>();

        // Generated deserializer for incoming frames. Both `unwrap`s are in the generated
        // closure, so a failed frame (`res` is an error) or a payload that does not
        // deserialize as `InT` panics at runtime rather than being skipped.
        let deser_fn: syn::Expr = syn::parse_quote! {
            |res| {
                let b = res.unwrap();
                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
            }
        };

        // Incoming half: frames use `LengthDelimitedCodec` and the port hint is always
        // `NetworkHint::Auto`; the stream yields `InT` directly via `deser_fn`.
        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
            self.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: false,
                codec_type: quote_type::<LengthDelimitedCodec>().into(),
                port_hint: NetworkHint::Auto,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: Some(deser_fn.into()),
                metadata: self.new_node_metadata(Stream::<
                    InT,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        // Port handle for the external side, the incoming stream, and the handle used to
        // supply the outgoing stream.
        (
            ExternalBincodeBidi {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream,
            fwd_ref,
        )
    }
749
750    /// Establishes a server on this location to receive bidirectional connections from multiple
751    /// external clients using raw bytes.
752    ///
753    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
754    /// connections. Each client is assigned a unique `u64` identifier.
755    ///
756    /// Returns:
757    /// - A port handle for external processes to connect to
758    /// - A keyed stream of incoming messages, keyed by client ID
759    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
760    /// - A handle to send outgoing messages, keyed by client ID
761    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
        &self,
        from: &External<L>,
        port_hint: NetworkHint,
    ) -> (
        ExternalBytesPort<Many>,
        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
    )
    where
        Self: Sized + NoTick,
    {
        // One port id, allocated by the external's flow state, is shared by both directions.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();

        // Outgoing half: a keyed forward reference that the caller completes later.
        let (fwd_ref, to_sink) =
            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
        // This mutable borrow stays alive until the end of the function.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        // `to_many: true` marks the multi-client variant; the keyed stream is turned into
        // its entries before being used as the root's input.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: true,
            unpaired: false,
            serialize_fn: None,
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.entries().ir_node.into_inner()),
            op_metadata: HydroIrOpMetadata::new(),
        });

        // Incoming half: each item is the decode result for a `(client id, message)` pair.
        let raw_stream: Stream<
            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
            Self,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = Stream::new(
            self.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: true,
                codec_type: quote_type::<Codec>().into(),
                port_hint,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                metadata: self.new_node_metadata(Stream::<
                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        // The membership source is a bare identifier,
        // `__hydro_deploy_many_{key}_{port}_membership`, built from the external's key (via
        // its `Display` form) and the port id.
        // NOTE(review): presumably the deployment code defines a variable with this name
        // where the generated code runs — confirm.
        let membership_stream_ident = syn::Ident::new(
            &format!(
                "__hydro_deploy_many_{}_{}_membership",
                from.key, next_external_port_id
            ),
            Span::call_site(),
        );
        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
        // Raw membership events are `(client id, bool)` entries.
        let raw_membership_stream: KeyedStream<
            u64,
            bool,
            Self,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = KeyedStream::new(
            self.clone(),
            HydroNode::Source {
                source: HydroSource::Stream(membership_stream_expr.into()),
                metadata: self.new_node_metadata(KeyedStream::<
                    u64,
                    bool,
                    Self,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        // Port handle, incoming messages keyed by client id, membership events keyed by
        // client id (`true` maps to `Joined`, `false` to `Left`), and the handle used to
        // supply the outgoing stream.
        (
            ExternalBytesPort {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream
                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
                .into_keyed(),
            raw_membership_stream.map(q!(|join| {
                if join {
                    MembershipEvent::Joined
                } else {
                    MembershipEvent::Left
                }
            })),
            fwd_ref,
        )
    }
867
868    /// Establishes a server on this location to receive bidirectional connections from multiple
869    /// external clients using bincode serialization.
870    ///
871    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
872    /// client connections. Each client is assigned a unique `u64` identifier.
873    ///
874    /// Returns:
875    /// - A port handle for external processes to connect to
876    /// - A keyed stream of incoming messages, keyed by client ID
877    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
878    /// - A handle to send outgoing messages, keyed by client ID
879    ///
    /// # Panics
    /// The serialization and deserialization expressions spliced by this method call
    /// `unwrap()`: a framing error on an incoming connection, a payload that does not decode
    /// as `InT`, or an outgoing value that fails to serialize will panic wherever that
    /// generated code runs.
    ///
880    /// # Type Parameters
881    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
882    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
883    #[expect(clippy::type_complexity, reason = "stream markers")]
884    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
885        &self,
886        from: &External<L>,
887    ) -> (
888        ExternalBincodeBidi<InT, OutT, Many>,
889        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
890        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
891        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
892    )
893    where
894        Self: Sized + NoTick,
895    {
        // Allocate the port id from the external location's flow state; the same id is used
        // for both the outgoing root and the incoming node below.
896        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
897
        // `fwd_ref` is handed back to the caller; `to_sink` is the placeholder stream that
        // stands in for the outgoing messages until the caller completes the handle.
898        let (fwd_ref, to_sink) =
899            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
        // The guard is bound to a local, so this mutable borrow is held until the function
        // returns. NOTE(review): nothing below may re-borrow the flow state — confirm before
        // adding calls after this point.
900        let mut flow_state_borrow = self.flow_state().borrow_mut();
901
902        let root = get_this_crate();
903
904        let out_t_type = quote_type::<OutT>();
        // Expression used as `serialize_fn`: keeps the client id and bincode-encodes the
        // payload of each `(id, value)` pair. The `unwrap()` panics if encoding fails.
905        let ser_fn: syn::Expr = syn::parse_quote! {
906            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
907                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
908            )
909        };
910
        // Outgoing half: register a `SendExternal` root fed by the entries of the placeholder
        // stream, addressed to many clients (`to_many: true`).
911        flow_state_borrow.push_root(HydroRoot::SendExternal {
912            to_external_key: from.key,
913            to_port_id: next_external_port_id,
914            to_many: true,
915            unpaired: false,
916            serialize_fn: Some(ser_fn.into()),
917            instantiate_fn: DebugInstantiate::Building,
918            input: Box::new(to_sink.entries().ir_node.into_inner()),
919            op_metadata: HydroIrOpMetadata::new(),
920        });
921
922        let in_t_type = quote_type::<InT>();
923
        // Expression used as `deserialize_fn`: unwraps the framing result, then bincode-decodes
        // the bytes as `InT`. Either failure panics.
924        let deser_fn: syn::Expr = syn::parse_quote! {
925            |res| {
926                let (id, b) = res.unwrap();
927                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
928            }
929        };
930
        // Incoming half: an `ExternalInput` node on the same external key and port id, framed
        // with `LengthDelimitedCodec` and decoded by `deser_fn`.
931        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
932            KeyedStream::new(
933                self.clone(),
934                HydroNode::ExternalInput {
935                    from_external_key: from.key,
936                    from_port_id: next_external_port_id,
937                    from_many: true,
938                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
939                    port_hint: NetworkHint::Auto,
940                    instantiate_fn: DebugInstantiate::Building,
941                    deserialize_fn: Some(deser_fn.into()),
942                    metadata: self.new_node_metadata(KeyedStream::<
943                        u64,
944                        InT,
945                        Self,
946                        Unbounded,
947                        TotalOrder,
948                        ExactlyOnce,
949                    >::collection_kind()),
950                },
951            );
952
        // Name of the membership stream, derived from the external key and the port id.
        // NOTE(review): presumably this identifier is defined by deployment-generated code —
        // confirm where it is declared.
953        let membership_stream_ident = syn::Ident::new(
954            &format!(
955                "__hydro_deploy_many_{}_{}_membership",
956                from.key, next_external_port_id
957            ),
958            Span::call_site(),
959        );
960        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
        // Raw membership source: a keyed stream of `bool` per client id, read from the
        // identifier built above.
961        let raw_membership_stream: KeyedStream<
962            u64,
963            bool,
964            Self,
965            Unbounded,
966            TotalOrder,
967            ExactlyOnce,
968        > = KeyedStream::new(
969            self.clone(),
970            HydroNode::Source {
971                source: HydroSource::Stream(membership_stream_expr.into()),
972                metadata: self.new_node_metadata(KeyedStream::<
973                    u64,
974                    bool,
975                    Self,
976                    Unbounded,
977                    TotalOrder,
978                    ExactlyOnce,
979                >::collection_kind()),
980            },
981        );
982
983        (
984            ExternalBincodeBidi {
985                process_key: from.key,
986                port_id: next_external_port_id,
987                _phantom: PhantomData,
988            },
989            raw_stream,
            // `true` is reported as `Joined`, `false` as `Left`.
990            raw_membership_stream.map(q!(|join| {
991                if join {
992                    MembershipEvent::Joined
993                } else {
994                    MembershipEvent::Left
995                }
996            })),
997            fwd_ref,
998        )
999    }
1000
1001    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1002    ///
1003    /// # Example
1004    /// ```rust
1005    /// # #[cfg(feature = "deploy")] {
1006    /// # use hydro_lang::prelude::*;
1007    /// # use futures::StreamExt;
1008    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1009    /// let tick = process.tick();
1010    /// let singleton = tick.singleton(q!(5));
1011    /// # singleton.all_ticks()
1012    /// # }, |mut stream| async move {
1013    /// // 5
1014    /// # assert_eq!(stream.next().await.unwrap(), 5);
1015    /// # }));
1016    /// # }
1017    /// ```
1018    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1019    where
1020        T: Clone,
1021        Self: Sized,
1022    {
1023        let e = e.splice_untyped_ctx(self);
1024
1025        Singleton::new(
1026            self.clone(),
1027            HydroNode::SingletonSource {
1028                value: e.into(),
1029                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1030            },
1031        )
1032    }
1033
1034    /// Generates a stream with values emitted at a fixed interval, with
1035    /// each value being the current time (as a [`tokio::time::Instant`]).
1036    ///
1037    /// The clock source used is monotonic, so elements will be emitted in
1038    /// increasing order.
1039    ///
    /// The stream is the quoted expression `tokio::time::interval(interval)` wrapped in a
    /// `tokio_stream::wrappers::IntervalStream` and passed to `source_stream`.
    ///
1040    /// # Non-Determinism
1041    /// Because this stream is generated by an OS timer, it will be
1042    /// non-deterministic because each timestamp will be arbitrary.
    /// The `_nondet` argument is not read by the body; presumably it exists so callers must
    /// explicitly acknowledge this non-determinism — confirm.
    ///
    /// NOTE(review): `tokio::time::interval` is documented to panic when given a zero
    /// period — confirm callers never pass a zero `interval`.
1043    fn source_interval(
1044        &self,
1045        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1046        _nondet: NonDet,
1047    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1048    where
1049        Self: Sized + NoTick,
1050    {
        // `interval` is referenced inside the quoted expression rather than evaluated here.
1051        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1052            tokio::time::interval(interval)
1053        )))
1054    }
1055
1056    /// Generates a stream with values emitted at a fixed interval (with an
1057    /// initial delay), with each value being the current time
1058    /// (as a [`tokio::time::Instant`]).
1059    ///
1060    /// The clock source used is monotonic, so elements will be emitted in
1061    /// increasing order.
1062    ///
    /// The stream is the quoted expression
    /// `tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)` wrapped in a
    /// `tokio_stream::wrappers::IntervalStream`, so the start time is `delay` after the moment
    /// that quoted expression is evaluated.
    ///
1063    /// # Non-Determinism
1064    /// Because this stream is generated by an OS timer, it will be
1065    /// non-deterministic because each timestamp will be arbitrary.
    /// The `_nondet` argument is not read by the body; presumably it exists so callers must
    /// explicitly acknowledge this non-determinism — confirm.
    ///
    /// NOTE(review): `tokio::time::interval_at` is documented to panic when given a zero
    /// period — confirm callers never pass a zero `interval`.
1066    fn source_interval_delayed(
1067        &self,
1068        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1069        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1070        _nondet: NonDet,
1071    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1072    where
1073        Self: Sized + NoTick,
1074    {
        // `delay` and `interval` are referenced inside the quoted expression rather than
        // evaluated here.
1075        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1076            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1077        )))
1078    }
1079
1080    /// Creates a forward reference for defining recursive or mutually-dependent dataflows.
1081    ///
1082    /// Returns a handle that must be completed with the actual stream, and a placeholder
1083    /// stream that can be used in the dataflow graph before the actual stream is defined.
1084    ///
1085    /// This is useful for implementing feedback loops or recursive computations where
1086    /// a stream depends on its own output.
1087    ///
1088    /// # Example
1089    /// ```rust
1090    /// # #[cfg(feature = "deploy")] {
1091    /// # use hydro_lang::prelude::*;
1092    /// # use hydro_lang::live_collections::stream::NoOrder;
1093    /// # use futures::StreamExt;
1094    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1095    /// // Create a forward reference for the feedback stream
1096    /// let (complete, feedback) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1097    ///
1098    /// // Combine initial input with feedback, then increment
1099    /// let input: Stream<_, _, Unbounded> = process.source_iter(q!([1])).into();
1100    /// let output: Stream<_, _, _, NoOrder> = input.interleave(feedback).map(q!(|x| x + 1));
1101    ///
1102    /// // Complete the forward reference with the output
1103    /// complete.complete(output.clone());
1104    /// output
1105    /// # }, |mut stream| async move {
1106    /// // 2, 3, 4, 5, ...
1107    /// # assert_eq!(stream.next().await.unwrap(), 2);
1108    /// # assert_eq!(stream.next().await.unwrap(), 3);
1109    /// # assert_eq!(stream.next().await.unwrap(), 4);
1110    /// # }));
1111    /// # }
1112    /// ```
1113    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1114    where
1115        S: CycleCollection<'a, ForwardRef, Location = Self>,
1116    {
1117        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1118        (
1119            ForwardHandle::new(cycle_id, Location::id(self)),
1120            S::create_source(cycle_id, self.clone()),
1121        )
1122    }
1123}
1124
#[cfg(feature = "deploy")]
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;
    use tokio_util::codec::LengthDelimitedCodec;

    use crate::compile::builder::FlowBuilder;
    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
    use crate::location::{Location, NetworkHint};
    use crate::nondet::nondet;

    /// A singleton created on the process and snapshotted into a tick is paired with every
    /// input, and counting it yields 1 for each of the two inputs sent.
    #[tokio::test]
    async fn top_level_singleton_replay_cardinality() {
        let mut deploy_env = Deployment::new();

        let mut builder = FlowBuilder::new();
        let server = builder.process::<()>();
        let client = builder.external::<()>();

        let (request_port, requests) =
            server.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);
        let constant = server.singleton(q!(123));
        let tick = server.tick();
        let response_port = requests
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(constant.clone().snapshot(&tick, nondet!(/** test */)))
            .cross_singleton(
                constant
                    .snapshot(&tick, nondet!(/** test */))
                    .into_stream()
                    .count(),
            )
            .all_ticks()
            .send_bincode_external(&client);

        let running = builder
            .with_process(&server, deploy_env.Localhost())
            .with_external(&client, deploy_env.Localhost())
            .deploy(&mut deploy_env);

        deploy_env.deploy().await.unwrap();

        let mut request_tx = running.connect(request_port).await;
        let mut response_rx = running.connect(response_port).await;

        deploy_env.start().await.unwrap();

        // Each input comes back as ((input, singleton value), singleton count).
        request_tx.send(1).await.unwrap();
        assert_eq!(response_rx.next().await.unwrap(), ((1, 123), 1));

        request_tx.send(2).await.unwrap();
        assert_eq!(response_rx.next().await.unwrap(), ((2, 123), 1));
    }

    /// Same check as above, but with the singleton created directly on the tick.
    #[tokio::test]
    async fn tick_singleton_replay_cardinality() {
        let mut deploy_env = Deployment::new();

        let mut builder = FlowBuilder::new();
        let server = builder.process::<()>();
        let client = builder.external::<()>();

        let (request_port, requests) =
            server.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&client);
        let tick = server.tick();
        let constant = tick.singleton(q!(123));
        let response_port = requests
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(constant.clone())
            .cross_singleton(constant.into_stream().count())
            .all_ticks()
            .send_bincode_external(&client);

        let running = builder
            .with_process(&server, deploy_env.Localhost())
            .with_external(&client, deploy_env.Localhost())
            .deploy(&mut deploy_env);

        deploy_env.deploy().await.unwrap();

        let mut request_tx = running.connect(request_port).await;
        let mut response_rx = running.connect(response_port).await;

        deploy_env.start().await.unwrap();

        // Each input comes back as ((input, singleton value), singleton count).
        request_tx.send(1).await.unwrap();
        assert_eq!(response_rx.next().await.unwrap(), ((1, 123), 1));

        request_tx.send(2).await.unwrap();
        assert_eq!(response_rx.next().await.unwrap(), ((2, 123), 1));
    }

    /// Raw bytes sent into the process come back unchanged over a bincode output port.
    #[tokio::test]
    async fn external_bytes() {
        let mut deploy_env = Deployment::new();

        let mut builder = FlowBuilder::new();
        let server = builder.process::<()>();
        let client = builder.external::<()>();

        let (bytes_port, payloads) = server.source_external_bytes(&client);
        let echo_port = payloads.send_bincode_external(&client);

        let running = builder
            .with_process(&server, deploy_env.Localhost())
            .with_external(&client, deploy_env.Localhost())
            .deploy(&mut deploy_env);

        deploy_env.deploy().await.unwrap();

        // Only the second element of the connection pair is used for sending.
        let mut bytes_tx = running.connect(bytes_port).await.1;
        let mut echo_rx = running.connect(echo_port).await;

        deploy_env.start().await.unwrap();

        bytes_tx.send(vec![1, 2, 3].into()).await.unwrap();

        assert_eq!(echo_rx.next().await.unwrap(), vec![1, 2, 3]);
    }

    /// Two clients on one many-client bincode port: their messages arrive keyed 0 and 1.
    #[tokio::test]
    async fn multi_external_source() {
        let mut deploy_env = Deployment::new();

        let mut builder = FlowBuilder::new();
        let server = builder.process::<()>();
        let client = builder.external::<()>();

        let (bidi_port, incoming, _membership_events, reply_handle) =
            server.bidi_external_many_bincode(&client);
        let tagged_port = incoming.entries().send_bincode_external(&client);
        // Nothing is sent back to clients: complete the reply side with an empty stream.
        reply_handle.complete(
            server
                .source_iter::<(u64, ()), _>(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let running = builder
            .with_process(&server, deploy_env.Localhost())
            .with_external(&client, deploy_env.Localhost())
            .deploy(&mut deploy_env);

        deploy_env.deploy().await.unwrap();

        let (_, mut first_tx) = running.connect_bincode(bidi_port.clone()).await;
        let (_, mut second_tx) = running.connect_bincode(bidi_port).await;
        let tagged_rx = running.connect(tagged_port).await;

        deploy_env.start().await.unwrap();

        first_tx.send(123).await.unwrap();
        second_tx.send(456).await.unwrap();

        // Compared as a set, so the arrival order of the two messages does not matter.
        assert_eq!(
            tagged_rx.take(2).collect::<HashSet<_>>().await,
            vec![(0, 123), (1, 456)].into_iter().collect()
        );
    }

    /// Only the second of two connected clients sends; its message still arrives, keyed 1.
    #[tokio::test]
    async fn second_connection_only_multi_source() {
        let mut deploy_env = Deployment::new();

        let mut builder = FlowBuilder::new();
        let server = builder.process::<()>();
        let client = builder.external::<()>();

        let (bidi_port, incoming, _membership_events, reply_handle) =
            server.bidi_external_many_bincode(&client);
        let tagged_port = incoming.entries().send_bincode_external(&client);
        // Nothing is sent back to clients: complete the reply side with an empty stream.
        reply_handle.complete(
            server
                .source_iter::<(u64, ()), _>(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let running = builder
            .with_process(&server, deploy_env.Localhost())
            .with_external(&client, deploy_env.Localhost())
            .deploy(&mut deploy_env);

        deploy_env.deploy().await.unwrap();

        // intentionally skipped to test stream waking logic
        let (_, mut _silent_tx) = running.connect_bincode(bidi_port.clone()).await;
        let (_, mut second_tx) = running.connect_bincode(bidi_port).await;
        let mut tagged_rx = running.connect(tagged_port).await;

        deploy_env.start().await.unwrap();

        second_tx.send(456).await.unwrap();

        assert_eq!(tagged_rx.next().await.unwrap(), (1, 456));
    }

    /// Two clients on one many-client bytes port: their frames arrive keyed 0 and 1.
    #[tokio::test]
    async fn multi_external_bytes() {
        let mut deploy_env = Deployment::new();

        let mut builder = FlowBuilder::new();
        let server = builder.process::<()>();
        let client = builder.external::<()>();

        let (bidi_port, incoming, _membership_events, reply_handle) = server
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&client, NetworkHint::Auto);
        let tagged_port = incoming.entries().send_bincode_external(&client);
        // Nothing is sent back to clients: complete the reply side with an empty stream.
        reply_handle.complete(
            server
                .source_iter(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let running = builder
            .with_process(&server, deploy_env.Localhost())
            .with_external(&client, deploy_env.Localhost())
            .deploy(&mut deploy_env);

        deploy_env.deploy().await.unwrap();

        let mut first_tx = running.connect(bidi_port.clone()).await.1;
        let mut second_tx = running.connect(bidi_port).await.1;
        let tagged_rx = running.connect(tagged_port).await;

        deploy_env.start().await.unwrap();

        first_tx.send(vec![1, 2, 3].into()).await.unwrap();
        second_tx.send(vec![4, 5].into()).await.unwrap();

        // Compared as a set, so the arrival order of the two frames does not matter.
        assert_eq!(
            tagged_rx.take(2).collect::<HashSet<_>>().await,
            vec![
                (0, (&[1u8, 2, 3] as &[u8]).into()),
                (1, (&[4u8, 5] as &[u8]).into())
            ]
            .into_iter()
            .collect()
        );
    }

    /// A single-client bytes binding answers each frame with the same bytes plus a trailing 42.
    #[tokio::test]
    async fn single_client_external_bytes() {
        let mut deploy_env = Deployment::new();
        let mut builder = FlowBuilder::new();
        let server = builder.process::<()>();
        let client = builder.external::<()>();
        let (bidi_port, incoming, reply_handle) = server
            .bind_single_client::<_, _, LengthDelimitedCodec>(&client, NetworkHint::Auto);
        reply_handle.complete(incoming.map(q!(|data| {
            let mut resp: Vec<u8> = data.into();
            resp.push(42);
            resp.into() // : Bytes
        })));

        let running = builder
            .with_process(&server, deploy_env.Localhost())
            .with_external(&client, deploy_env.Localhost())
            .deploy(&mut deploy_env);

        // Here the deployment is started before the client connects.
        deploy_env.deploy().await.unwrap();
        deploy_env.start().await.unwrap();

        let (mut reply_rx, mut request_tx) = running.connect(bidi_port).await;

        request_tx.send(vec![1, 2, 3].into()).await.unwrap();
        assert_eq!(
            reply_rx.next().await.unwrap().unwrap(),
            vec![1, 2, 3, 42]
        );
    }

    /// Each of two byte clients gets back its own frame with every byte incremented by one.
    #[tokio::test]
    async fn echo_external_bytes() {
        let mut deploy_env = Deployment::new();

        let mut builder = FlowBuilder::new();
        let server = builder.process::<()>();
        let client = builder.external::<()>();

        let (bidi_port, incoming, _membership_events, reply_handle) = server
            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&client, NetworkHint::Auto);
        reply_handle
            .complete(incoming.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));

        let running = builder
            .with_process(&server, deploy_env.Localhost())
            .with_external(&client, deploy_env.Localhost())
            .deploy(&mut deploy_env);

        deploy_env.deploy().await.unwrap();

        let (mut first_rx, mut first_tx) = running.connect(bidi_port.clone()).await;
        let (mut second_rx, mut second_tx) = running.connect(bidi_port).await;

        deploy_env.start().await.unwrap();

        first_tx.send(vec![1, 2, 3].into()).await.unwrap();
        second_tx.send(vec![4, 5].into()).await.unwrap();

        assert_eq!(first_rx.next().await.unwrap().unwrap(), vec![2, 3, 4]);
        assert_eq!(second_rx.next().await.unwrap().unwrap(), vec![5, 6]);
    }

    /// Each of two bincode clients gets back its own string, upper-cased.
    #[tokio::test]
    async fn echo_external_bincode() {
        let mut deploy_env = Deployment::new();

        let mut builder = FlowBuilder::new();
        let server = builder.process::<()>();
        let client = builder.external::<()>();

        let (bidi_port, incoming, _membership_events, reply_handle) =
            server.bidi_external_many_bincode(&client);
        reply_handle.complete(incoming.map(q!(|text: String| { text.to_uppercase() })));

        let running = builder
            .with_process(&server, deploy_env.Localhost())
            .with_external(&client, deploy_env.Localhost())
            .deploy(&mut deploy_env);

        deploy_env.deploy().await.unwrap();

        let (mut first_rx, mut first_tx) = running.connect_bincode(bidi_port.clone()).await;
        let (mut second_rx, mut second_tx) = running.connect_bincode(bidi_port).await;

        deploy_env.start().await.unwrap();

        first_tx.send("hi".to_owned()).await.unwrap();
        second_tx.send("hello".to_owned()).await.unwrap();

        assert_eq!(first_rx.next().await.unwrap(), "HI");
        assert_eq!(second_rx.next().await.unwrap(), "HELLO");
    }
}