Skip to main content

hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33    membership.fold(
34        q!(|| false),
35        q!(|present, event| {
36            match event {
37                MembershipEvent::Joined => *present = true,
38                MembershipEvent::Left => *present = false,
39            }
40        }),
41    )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45    let root = get_this_crate();
46
47    if is_demux {
48        parse_quote! {
49            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50                |(id, data)| {
51                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52                }
53            )
54        }
55    } else {
56        parse_quote! {
57            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58                |data| {
59                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
60                }
61            )
62        }
63    }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67    serialize_bincode_with_type(is_demux, &quote_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71    let root = get_this_crate();
72    if let Some(c_type) = tagged {
73        parse_quote! {
74            |res| {
75                let (id, b) = res.unwrap();
76                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77            }
78        }
79    } else {
80        parse_quote! {
81            |res| {
82                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83            }
84        }
85    }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89    deserialize_bincode_with_type(tagged, &quote_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
94    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95    /// using [`bincode`] to serialize/deserialize messages.
96    ///
97    /// The returned stream captures the elements received at the destination, where values will
98    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
100    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
101    /// dropped no further messages will be sent.
102    ///
103    /// # Example
104    /// ```rust
105    /// # #[cfg(feature = "deploy")] {
106    /// # use hydro_lang::prelude::*;
107    /// # use futures::StreamExt;
108    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109    /// let p1 = flow.process::<()>();
110    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111    /// let p2 = flow.process::<()>();
112    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113    /// // 1, 2, 3
114    /// # on_p2.send_bincode(&p_out)
115    /// # }, |mut stream| async move {
116    /// # for w in 1..=3 {
117    /// #     assert_eq!(stream.next().await, Some(w));
118    /// # }
119    /// # }));
120    /// # }
121    /// ```
122    pub fn send_bincode<L2>(
123        self,
124        other: &Process<'a, L2>,
125    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126    where
127        T: Serialize + DeserializeOwned,
128    {
129        self.send(other, TCP.fail_stop().bincode())
130    }
131
132    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133    /// using the configuration in `via` to set up the message transport.
134    ///
135    /// The returned stream captures the elements received at the destination, where values will
136    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
138    /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
139    /// dropped no further messages will be sent.
140    ///
141    /// # Example
142    /// ```rust
143    /// # #[cfg(feature = "deploy")] {
144    /// # use hydro_lang::prelude::*;
145    /// # use futures::StreamExt;
146    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147    /// let p1 = flow.process::<()>();
148    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149    /// let p2 = flow.process::<()>();
150    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
151    /// // 1, 2, 3
152    /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
153    /// # }, |mut stream| async move {
154    /// # for w in 1..=3 {
155    /// #     assert_eq!(stream.next().await, Some(w));
156    /// # }
157    /// # }));
158    /// # }
159    /// ```
160    pub fn send<L2, N: NetworkFor<T>>(
161        self,
162        to: &Process<'a, L2>,
163        via: N,
164    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
165    where
166        T: Serialize + DeserializeOwned,
167    {
168        let serialize_pipeline = Some(N::serialize_thunk(false));
169        let deserialize_pipeline = Some(N::deserialize_thunk(None));
170
171        let name = via.name();
172        if to.multiversioned() && name.is_none() {
173            panic!(
174                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
175            );
176        }
177
178        Stream::new(
179            to.clone(),
180            HydroNode::Network {
181                name: name.map(ToOwned::to_owned),
182                serialize_fn: serialize_pipeline.map(|e| e.into()),
183                instantiate_fn: DebugInstantiate::Building,
184                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
185                input: Box::new(self.ir_node.into_inner()),
186                metadata: to.new_node_metadata(
187                    Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
188                ),
189            },
190        )
191    }
192
193    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
194    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
195    /// using [`bincode`] to serialize/deserialize messages.
196    ///
197    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
198    /// membership information. This is a common pattern in distributed systems for broadcasting data to
199    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
200    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
201    /// each element to all cluster members.
202    ///
203    /// # Non-Determinism
204    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
205    /// to the current cluster members _at that point in time_. Depending on when we are notified of
206    /// membership changes, we will broadcast each element to different members.
207    ///
208    /// # Example
209    /// ```rust
210    /// # #[cfg(feature = "deploy")] {
211    /// # use hydro_lang::prelude::*;
212    /// # use futures::StreamExt;
213    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
214    /// let p1 = flow.process::<()>();
215    /// let workers: Cluster<()> = flow.cluster::<()>();
216    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
217    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
218    /// # on_worker.send_bincode(&p2).entries()
219    /// // if there are 4 members in the cluster, each receives one element
220    /// // - MemberId::<()>(0): [123]
221    /// // - MemberId::<()>(1): [123]
222    /// // - MemberId::<()>(2): [123]
223    /// // - MemberId::<()>(3): [123]
224    /// # }, |mut stream| async move {
225    /// # let mut results = Vec::new();
226    /// # for w in 0..4 {
227    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
228    /// # }
229    /// # results.sort();
230    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
231    /// # }));
232    /// # }
233    /// ```
234    pub fn broadcast_bincode<L2: 'a>(
235        self,
236        other: &Cluster<'a, L2>,
237        nondet_membership: NonDet,
238    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
239    where
240        T: Clone + Serialize + DeserializeOwned,
241    {
242        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
243    }
244
245    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
246    /// using the configuration in `via` to set up the message transport.
247    ///
248    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
249    /// membership information. This is a common pattern in distributed systems for broadcasting data to
250    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
251    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
252    /// each element to all cluster members.
253    ///
254    /// # Non-Determinism
255    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
256    /// to the current cluster members _at that point in time_. Depending on when we are notified of
257    /// membership changes, we will broadcast each element to different members.
258    ///
259    /// # Example
260    /// ```rust
261    /// # #[cfg(feature = "deploy")] {
262    /// # use hydro_lang::prelude::*;
263    /// # use futures::StreamExt;
264    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
265    /// let p1 = flow.process::<()>();
266    /// let workers: Cluster<()> = flow.cluster::<()>();
267    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
268    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
269    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
270    /// // if there are 4 members in the cluster, each receives one element
271    /// // - MemberId::<()>(0): [123]
272    /// // - MemberId::<()>(1): [123]
273    /// // - MemberId::<()>(2): [123]
274    /// // - MemberId::<()>(3): [123]
275    /// # }, |mut stream| async move {
276    /// # let mut results = Vec::new();
277    /// # for w in 0..4 {
278    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
279    /// # }
280    /// # results.sort();
281    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
282    /// # }));
283    /// # }
284    /// ```
285    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
286        self,
287        to: &Cluster<'a, L2>,
288        via: N,
289        nondet_membership: NonDet,
290    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
291    where
292        T: Clone + Serialize + DeserializeOwned,
293    {
294        let ids = track_membership(self.location.source_cluster_members(to));
295        sliced! {
296            let members_snapshot = use(ids, nondet_membership);
297            let elements = use(self, nondet_membership);
298
299            let current_members = members_snapshot.filter(q!(|b| *b));
300            elements.repeat_with_keys(current_members)
301        }
302        .demux(to, via)
303    }
304
305    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
306    /// serialization. The external process can receive these elements by establishing a TCP
307    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
308    ///
309    /// # Example
310    /// ```rust
311    /// # #[cfg(feature = "deploy")] {
312    /// # use hydro_lang::prelude::*;
313    /// # use futures::StreamExt;
314    /// # tokio_test::block_on(async move {
315    /// let mut flow = FlowBuilder::new();
316    /// let process = flow.process::<()>();
317    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
318    /// let external = flow.external::<()>();
319    /// let external_handle = numbers.send_bincode_external(&external);
320    ///
321    /// let mut deployment = hydro_deploy::Deployment::new();
322    /// let nodes = flow
323    ///     .with_process(&process, deployment.Localhost())
324    ///     .with_external(&external, deployment.Localhost())
325    ///     .deploy(&mut deployment);
326    ///
327    /// deployment.deploy().await.unwrap();
328    /// // establish the TCP connection
329    /// let mut external_recv_stream = nodes.connect(external_handle).await;
330    /// deployment.start().await.unwrap();
331    ///
332    /// for w in 1..=3 {
333    ///     assert_eq!(external_recv_stream.next().await, Some(w));
334    /// }
335    /// # });
336    /// # }
337    /// ```
338    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
339    where
340        T: Serialize + DeserializeOwned,
341    {
342        let serialize_pipeline = Some(serialize_bincode::<T>(false));
343
344        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
345
346        let external_port_id = flow_state_borrow.next_external_port.get_and_increment();
347
348        flow_state_borrow.push_root(HydroRoot::SendExternal {
349            to_external_key: other.key,
350            to_port_id: external_port_id,
351            to_many: false,
352            unpaired: true,
353            serialize_fn: serialize_pipeline.map(|e| e.into()),
354            instantiate_fn: DebugInstantiate::Building,
355            input: Box::new(self.ir_node.into_inner()),
356            op_metadata: HydroIrOpMetadata::new(),
357        });
358
359        ExternalBincodeStream {
360            process_key: other.key,
361            port_id: external_port_id,
362            _phantom: PhantomData,
363        }
364    }
365
366    #[cfg(feature = "sim")]
367    /// Sets up a simulation output port for this stream, allowing test code to receive elements
368    /// sent to this stream during simulation.
369    pub fn sim_output(self) -> SimReceiver<T, O, R>
370    where
371        T: Serialize + DeserializeOwned,
372    {
373        let external_location: External<'a, ()> = External {
374            key: LocationKey::FIRST,
375            flow_state: self.location.flow_state().clone(),
376            _phantom: PhantomData,
377        };
378
379        let external = self.send_bincode_external(&external_location);
380
381        SimReceiver(external.port_id, PhantomData)
382    }
383}
384
385impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
386    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
387{
388    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
389    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
390    /// using [`bincode`] to serialize/deserialize messages.
391    ///
392    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
393    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
394    /// this API allows precise targeting of specific cluster members rather than broadcasting to
395    /// all members.
396    ///
397    /// # Example
398    /// ```rust
399    /// # #[cfg(feature = "deploy")] {
400    /// # use hydro_lang::prelude::*;
401    /// # use futures::StreamExt;
402    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
403    /// let p1 = flow.process::<()>();
404    /// let workers: Cluster<()> = flow.cluster::<()>();
405    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
406    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
407    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
408    ///     .demux_bincode(&workers);
409    /// # on_worker.send_bincode(&p2).entries()
410    /// // if there are 4 members in the cluster, each receives one element
411    /// // - MemberId::<()>(0): [0]
412    /// // - MemberId::<()>(1): [1]
413    /// // - MemberId::<()>(2): [2]
414    /// // - MemberId::<()>(3): [3]
415    /// # }, |mut stream| async move {
416    /// # let mut results = Vec::new();
417    /// # for w in 0..4 {
418    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
419    /// # }
420    /// # results.sort();
421    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
422    /// # }));
423    /// # }
424    /// ```
425    pub fn demux_bincode(
426        self,
427        other: &Cluster<'a, L2>,
428    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
429    where
430        T: Serialize + DeserializeOwned,
431    {
432        self.demux(other, TCP.fail_stop().bincode())
433    }
434
435    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
436    /// using the configuration in `via` to set up the message transport.
437    ///
438    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
439    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
440    /// this API allows precise targeting of specific cluster members rather than broadcasting to
441    /// all members.
442    ///
443    /// # Example
444    /// ```rust
445    /// # #[cfg(feature = "deploy")] {
446    /// # use hydro_lang::prelude::*;
447    /// # use futures::StreamExt;
448    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
449    /// let p1 = flow.process::<()>();
450    /// let workers: Cluster<()> = flow.cluster::<()>();
451    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
452    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
453    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
454    ///     .demux(&workers, TCP.fail_stop().bincode());
455    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
456    /// // if there are 4 members in the cluster, each receives one element
457    /// // - MemberId::<()>(0): [0]
458    /// // - MemberId::<()>(1): [1]
459    /// // - MemberId::<()>(2): [2]
460    /// // - MemberId::<()>(3): [3]
461    /// # }, |mut stream| async move {
462    /// # let mut results = Vec::new();
463    /// # for w in 0..4 {
464    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
465    /// # }
466    /// # results.sort();
467    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
468    /// # }));
469    /// # }
470    /// ```
471    pub fn demux<N: NetworkFor<T>>(
472        self,
473        to: &Cluster<'a, L2>,
474        via: N,
475    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
476    where
477        T: Serialize + DeserializeOwned,
478    {
479        self.into_keyed().demux(to, via)
480    }
481}
482
483impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
484    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
485    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
486    /// [`bincode`] to serialize/deserialize messages.
487    ///
488    /// This provides load balancing by evenly distributing work across cluster members. The
489    /// distribution is deterministic based on element order - the first element goes to member 0,
490    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
491    ///
492    /// # Non-Determinism
493    /// The set of cluster members may asynchronously change over time. Each element is distributed
494    /// based on the current cluster membership _at that point in time_. Depending on when cluster
495    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
496    /// membership is stable, the order of members in the round-robin pattern may change across runs.
497    ///
498    /// # Ordering Requirements
499    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
500    /// order of messages and retries affects the round-robin pattern.
501    ///
502    /// # Example
503    /// ```rust
504    /// # #[cfg(feature = "deploy")] {
505    /// # use hydro_lang::prelude::*;
506    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
507    /// # use futures::StreamExt;
508    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
509    /// let p1 = flow.process::<()>();
510    /// let workers: Cluster<()> = flow.cluster::<()>();
511    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
512    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
513    /// on_worker.send_bincode(&p2)
514    /// # .first().values() // we use first to assert that each member gets one element
515    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
516    /// // - MemberId::<()>(?): [1]
517    /// // - MemberId::<()>(?): [2]
518    /// // - MemberId::<()>(?): [3]
519    /// // - MemberId::<()>(?): [4]
520    /// # }, |mut stream| async move {
521    /// # let mut results = Vec::new();
522    /// # for w in 0..4 {
523    /// #     results.push(stream.next().await.unwrap());
524    /// # }
525    /// # results.sort();
526    /// # assert_eq!(results, vec![1, 2, 3, 4]);
527    /// # }));
528    /// # }
529    /// ```
530    pub fn round_robin_bincode<L2: 'a>(
531        self,
532        other: &Cluster<'a, L2>,
533        nondet_membership: NonDet,
534    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
535    where
536        T: Serialize + DeserializeOwned,
537    {
538        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
539    }
540
541    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
542    /// the configuration in `via` to set up the message transport.
543    ///
544    /// This provides load balancing by evenly distributing work across cluster members. The
545    /// distribution is deterministic based on element order - the first element goes to member 0,
546    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
547    ///
548    /// # Non-Determinism
549    /// The set of cluster members may asynchronously change over time. Each element is distributed
550    /// based on the current cluster membership _at that point in time_. Depending on when cluster
551    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
552    /// membership is stable, the order of members in the round-robin pattern may change across runs.
553    ///
554    /// # Ordering Requirements
555    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
556    /// order of messages and retries affects the round-robin pattern.
557    ///
558    /// # Example
559    /// ```rust
560    /// # #[cfg(feature = "deploy")] {
561    /// # use hydro_lang::prelude::*;
562    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
563    /// # use futures::StreamExt;
564    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
565    /// let p1 = flow.process::<()>();
566    /// let workers: Cluster<()> = flow.cluster::<()>();
567    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
568    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
569    /// on_worker.send(&p2, TCP.fail_stop().bincode())
570    /// # .first().values() // we use first to assert that each member gets one element
571    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
572    /// // - MemberId::<()>(?): [1]
573    /// // - MemberId::<()>(?): [2]
574    /// // - MemberId::<()>(?): [3]
575    /// // - MemberId::<()>(?): [4]
576    /// # }, |mut stream| async move {
577    /// # let mut results = Vec::new();
578    /// # for w in 0..4 {
579    /// #     results.push(stream.next().await.unwrap());
580    /// # }
581    /// # results.sort();
582    /// # assert_eq!(results, vec![1, 2, 3, 4]);
583    /// # }));
584    /// # }
585    /// ```
586    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
587        self,
588        to: &Cluster<'a, L2>,
589        via: N,
590        nondet_membership: NonDet,
591    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
592    where
593        T: Serialize + DeserializeOwned,
594    {
595        let ids = track_membership(self.location.source_cluster_members(to));
596        sliced! {
597            let members_snapshot = use(ids, nondet_membership);
598            let elements = use(self.enumerate(), nondet_membership);
599
600            let current_members = members_snapshot
601                .filter(q!(|b| *b))
602                .keys()
603                .assume_ordering(nondet_membership)
604                .collect_vec();
605
606            elements
607                .cross_singleton(current_members)
608                .map(q!(|(data, members)| (
609                    members[data.0 % members.len()].clone(),
610                    data.1
611                )))
612        }
613        .demux(to, via)
614    }
615}
616
617impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
618    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
619    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
620    /// [`bincode`] to serialize/deserialize messages.
621    ///
622    /// This provides load balancing by evenly distributing work across cluster members. The
623    /// distribution is deterministic based on element order - the first element goes to member 0,
624    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
625    ///
626    /// # Non-Determinism
627    /// The set of cluster members may asynchronously change over time. Each element is distributed
628    /// based on the current cluster membership _at that point in time_. Depending on when cluster
629    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
630    /// membership is stable, the order of members in the round-robin pattern may change across runs.
631    ///
632    /// # Ordering Requirements
633    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
634    /// order of messages and retries affects the round-robin pattern.
635    ///
636    /// # Example
637    /// ```rust
638    /// # #[cfg(feature = "deploy")] {
639    /// # use hydro_lang::prelude::*;
640    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
641    /// # use hydro_lang::location::MemberId;
642    /// # use futures::StreamExt;
643    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
644    /// let p1 = flow.process::<()>();
645    /// let workers1: Cluster<()> = flow.cluster::<()>();
646    /// let workers2: Cluster<()> = flow.cluster::<()>();
647    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
648    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
649    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
650    /// on_worker2.send_bincode(&p2)
651    /// # .entries()
652    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
653    /// # }, |mut stream| async move {
654    /// # let mut results = Vec::new();
655    /// # let mut locations = std::collections::HashSet::new();
656    /// # for w in 0..=16 {
657    /// #     let (location, v) = stream.next().await.unwrap();
658    /// #     locations.insert(location);
659    /// #     results.push(v);
660    /// # }
661    /// # results.sort();
662    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
663    /// # assert_eq!(locations.len(), 16);
664    /// # }));
665    /// # }
666    /// ```
667    pub fn round_robin_bincode<L2: 'a>(
668        self,
669        other: &Cluster<'a, L2>,
670        nondet_membership: NonDet,
671    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
672    where
673        T: Serialize + DeserializeOwned,
674    {
675        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
676    }
677
678    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
679    /// the configuration in `via` to set up the message transport.
680    ///
681    /// This provides load balancing by evenly distributing work across cluster members. The
682    /// distribution is deterministic based on element order - the first element goes to member 0,
683    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
684    ///
685    /// # Non-Determinism
686    /// The set of cluster members may asynchronously change over time. Each element is distributed
687    /// based on the current cluster membership _at that point in time_. Depending on when cluster
688    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
689    /// membership is stable, the order of members in the round-robin pattern may change across runs.
690    ///
691    /// # Ordering Requirements
692    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
693    /// order of messages and retries affects the round-robin pattern.
694    ///
695    /// # Example
696    /// ```rust
697    /// # #[cfg(feature = "deploy")] {
698    /// # use hydro_lang::prelude::*;
699    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
700    /// # use hydro_lang::location::MemberId;
701    /// # use futures::StreamExt;
702    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
703    /// let p1 = flow.process::<()>();
704    /// let workers1: Cluster<()> = flow.cluster::<()>();
705    /// let workers2: Cluster<()> = flow.cluster::<()>();
706    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
707    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
708    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
709    /// on_worker2.send(&p2, TCP.fail_stop().bincode())
710    /// # .entries()
711    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
712    /// # }, |mut stream| async move {
713    /// # let mut results = Vec::new();
714    /// # let mut locations = std::collections::HashSet::new();
715    /// # for w in 0..=16 {
716    /// #     let (location, v) = stream.next().await.unwrap();
717    /// #     locations.insert(location);
718    /// #     results.push(v);
719    /// # }
720    /// # results.sort();
721    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
722    /// # assert_eq!(locations.len(), 16);
723    /// # }));
724    /// # }
725    /// ```
726    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
727        self,
728        to: &Cluster<'a, L2>,
729        via: N,
730        nondet_membership: NonDet,
731    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
732    where
733        T: Serialize + DeserializeOwned,
734    {
735        let ids = track_membership(self.location.source_cluster_members(to));
736        sliced! {
737            let members_snapshot = use(ids, nondet_membership);
738            let elements = use(self.enumerate(), nondet_membership);
739
740            let current_members = members_snapshot
741                .filter(q!(|b| *b))
742                .keys()
743                .assume_ordering(nondet_membership)
744                .collect_vec();
745
746            elements
747                .cross_singleton(current_members)
748                .map(q!(|(data, members)| (
749                    members[data.0 % members.len()].clone(),
750                    data.1
751                )))
752        }
753        .demux(to, via)
754    }
755}
756
757impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
758    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
759    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
760    /// using [`bincode`] to serialize/deserialize messages.
761    ///
762    /// Each cluster member sends its local stream elements, and they are collected at the destination
763    /// as a [`KeyedStream`] where keys identify the source cluster member.
764    ///
765    /// # Example
766    /// ```rust
767    /// # #[cfg(feature = "deploy")] {
768    /// # use hydro_lang::prelude::*;
769    /// # use futures::StreamExt;
770    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
771    /// let workers: Cluster<()> = flow.cluster::<()>();
772    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
773    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
774    /// # all_received.entries()
775    /// # }, |mut stream| async move {
776    /// // if there are 4 members in the cluster, we should receive 4 elements
777    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
778    /// # let mut results = Vec::new();
779    /// # for w in 0..4 {
780    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
781    /// # }
782    /// # results.sort();
783    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
784    /// # }));
785    /// # }
786    /// ```
787    ///
788    /// If you don't need to know the source for each element, you can use `.values()`
789    /// to get just the data:
790    /// ```rust
791    /// # #[cfg(feature = "deploy")] {
792    /// # use hydro_lang::prelude::*;
793    /// # use hydro_lang::live_collections::stream::NoOrder;
794    /// # use futures::StreamExt;
795    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
796    /// # let workers: Cluster<()> = flow.cluster::<()>();
797    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
798    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
799    /// # values
800    /// # }, |mut stream| async move {
801    /// # let mut results = Vec::new();
802    /// # for w in 0..4 {
803    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
804    /// # }
805    /// # results.sort();
806    /// // if there are 4 members in the cluster, we should receive 4 elements
807    /// // 1, 1, 1, 1
808    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
809    /// # }));
810    /// # }
811    /// ```
812    pub fn send_bincode<L2>(
813        self,
814        other: &Process<'a, L2>,
815    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
816    where
817        T: Serialize + DeserializeOwned,
818    {
819        self.send(other, TCP.fail_stop().bincode())
820    }
821
822    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
823    /// using the configuration in `via` to set up the message transport.
824    ///
825    /// Each cluster member sends its local stream elements, and they are collected at the destination
826    /// as a [`KeyedStream`] where keys identify the source cluster member.
827    ///
828    /// # Example
829    /// ```rust
830    /// # #[cfg(feature = "deploy")] {
831    /// # use hydro_lang::prelude::*;
832    /// # use futures::StreamExt;
833    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
834    /// let workers: Cluster<()> = flow.cluster::<()>();
835    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
836    /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
837    /// # all_received.entries()
838    /// # }, |mut stream| async move {
839    /// // if there are 4 members in the cluster, we should receive 4 elements
840    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
841    /// # let mut results = Vec::new();
842    /// # for w in 0..4 {
843    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
844    /// # }
845    /// # results.sort();
846    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
847    /// # }));
848    /// # }
849    /// ```
850    ///
851    /// If you don't need to know the source for each element, you can use `.values()`
852    /// to get just the data:
853    /// ```rust
854    /// # #[cfg(feature = "deploy")] {
855    /// # use hydro_lang::prelude::*;
856    /// # use hydro_lang::live_collections::stream::NoOrder;
857    /// # use futures::StreamExt;
858    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
859    /// # let workers: Cluster<()> = flow.cluster::<()>();
860    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
861    /// let values: Stream<i32, _, _, NoOrder> =
862    ///     numbers.send(&process, TCP.fail_stop().bincode()).values();
863    /// # values
864    /// # }, |mut stream| async move {
865    /// # let mut results = Vec::new();
866    /// # for w in 0..4 {
867    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
868    /// # }
869    /// # results.sort();
870    /// // if there are 4 members in the cluster, we should receive 4 elements
871    /// // 1, 1, 1, 1
872    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
873    /// # }));
874    /// # }
875    /// ```
876    pub fn send<L2, N: NetworkFor<T>>(
877        self,
878        to: &Process<'a, L2>,
879        via: N,
880    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
881    where
882        T: Serialize + DeserializeOwned,
883    {
884        let serialize_pipeline = Some(N::serialize_thunk(false));
885
886        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
887
888        let name = via.name();
889        if to.multiversioned() && name.is_none() {
890            panic!(
891                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
892            );
893        }
894
895        let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
896            to.clone(),
897            HydroNode::Network {
898                name: name.map(ToOwned::to_owned),
899                serialize_fn: serialize_pipeline.map(|e| e.into()),
900                instantiate_fn: DebugInstantiate::Building,
901                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
902                input: Box::new(self.ir_node.into_inner()),
903                metadata: to.new_node_metadata(Stream::<
904                    (MemberId<L>, T),
905                    Process<'a, L2>,
906                    Unbounded,
907                    O,
908                    R,
909                >::collection_kind()),
910            },
911        );
912
913        raw_stream.into_keyed()
914    }
915
916    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
917    /// Broadcasts elements of this stream at each source member to all members of a destination
918    /// cluster, using [`bincode`] to serialize/deserialize messages.
919    ///
920    /// Each source member sends each of its stream elements to **every** member of the cluster
921    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
922    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
923    /// **only data elements** and sends each element to all cluster members.
924    ///
925    /// # Non-Determinism
926    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
927    /// to the current cluster members known _at that point in time_ at the source member. Depending
928    /// on when each source member is notified of membership changes, it will broadcast each element
929    /// to different members.
930    ///
931    /// # Example
932    /// ```rust
933    /// # #[cfg(feature = "deploy")] {
934    /// # use hydro_lang::prelude::*;
935    /// # use hydro_lang::location::MemberId;
936    /// # use futures::StreamExt;
937    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
938    /// # type Source = ();
939    /// # type Destination = ();
940    /// let source: Cluster<Source> = flow.cluster::<Source>();
941    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
942    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
943    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
944    /// # on_destination.entries().send_bincode(&p2).entries()
945    /// // if there are 4 members in the desination, each receives one element from each source member
946    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
947    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
948    /// // - ...
949    /// # }, |mut stream| async move {
950    /// # let mut results = Vec::new();
951    /// # for w in 0..16 {
952    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
953    /// # }
954    /// # results.sort();
955    /// # assert_eq!(results, vec![
956    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
957    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
958    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
959    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
960    /// # ]);
961    /// # }));
962    /// # }
963    /// ```
964    pub fn broadcast_bincode<L2: 'a>(
965        self,
966        other: &Cluster<'a, L2>,
967        nondet_membership: NonDet,
968    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
969    where
970        T: Clone + Serialize + DeserializeOwned,
971    {
972        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
973    }
974
975    /// Broadcasts elements of this stream at each source member to all members of a destination
976    /// cluster, using the configuration in `via` to set up the message transport.
977    ///
978    /// Each source member sends each of its stream elements to **every** member of the cluster
979    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
980    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
981    /// **only data elements** and sends each element to all cluster members.
982    ///
983    /// # Non-Determinism
984    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
985    /// to the current cluster members known _at that point in time_ at the source member. Depending
986    /// on when each source member is notified of membership changes, it will broadcast each element
987    /// to different members.
988    ///
989    /// # Example
990    /// ```rust
991    /// # #[cfg(feature = "deploy")] {
992    /// # use hydro_lang::prelude::*;
993    /// # use hydro_lang::location::MemberId;
994    /// # use futures::StreamExt;
995    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
996    /// # type Source = ();
997    /// # type Destination = ();
998    /// let source: Cluster<Source> = flow.cluster::<Source>();
999    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1000    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1001    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1002    /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1003    /// // if there are 4 members in the desination, each receives one element from each source member
1004    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1005    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1006    /// // - ...
1007    /// # }, |mut stream| async move {
1008    /// # let mut results = Vec::new();
1009    /// # for w in 0..16 {
1010    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1011    /// # }
1012    /// # results.sort();
1013    /// # assert_eq!(results, vec![
1014    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1015    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1016    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1017    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1018    /// # ]);
1019    /// # }));
1020    /// # }
1021    /// ```
1022    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1023        self,
1024        to: &Cluster<'a, L2>,
1025        via: N,
1026        nondet_membership: NonDet,
1027    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1028    where
1029        T: Clone + Serialize + DeserializeOwned,
1030    {
1031        let ids = track_membership(self.location.source_cluster_members(to));
1032        sliced! {
1033            let members_snapshot = use(ids, nondet_membership);
1034            let elements = use(self, nondet_membership);
1035
1036            let current_members = members_snapshot.filter(q!(|b| *b));
1037            elements.repeat_with_keys(current_members)
1038        }
1039        .demux(to, via)
1040    }
1041}
1042
1043impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1044    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1045{
1046    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1047    /// Sends elements of this stream at each source member to specific members of a destination
1048    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1049    ///
1050    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1051    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1052    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1053    /// all members.
1054    ///
1055    /// Each cluster member sends its local stream elements, and they are collected at each
1056    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1057    ///
1058    /// # Example
1059    /// ```rust
1060    /// # #[cfg(feature = "deploy")] {
1061    /// # use hydro_lang::prelude::*;
1062    /// # use futures::StreamExt;
1063    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1064    /// # type Source = ();
1065    /// # type Destination = ();
1066    /// let source: Cluster<Source> = flow.cluster::<Source>();
1067    /// let to_send: Stream<_, Cluster<_>, _> = source
1068    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1069    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1070    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1071    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1072    /// # all_received.entries().send_bincode(&p2).entries()
1073    /// # }, |mut stream| async move {
1074    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1075    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1076    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1077    /// // - ...
1078    /// # let mut results = Vec::new();
1079    /// # for w in 0..16 {
1080    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1081    /// # }
1082    /// # results.sort();
1083    /// # assert_eq!(results, vec![
1084    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1085    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1086    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1087    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1088    /// # ]);
1089    /// # }));
1090    /// # }
1091    /// ```
1092    pub fn demux_bincode(
1093        self,
1094        other: &Cluster<'a, L2>,
1095    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1096    where
1097        T: Serialize + DeserializeOwned,
1098    {
1099        self.demux(other, TCP.fail_stop().bincode())
1100    }
1101
1102    /// Sends elements of this stream at each source member to specific members of a destination
1103    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1104    /// message transport.
1105    ///
1106    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1107    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1108    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1109    /// all members.
1110    ///
1111    /// Each cluster member sends its local stream elements, and they are collected at each
1112    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1113    ///
1114    /// # Example
1115    /// ```rust
1116    /// # #[cfg(feature = "deploy")] {
1117    /// # use hydro_lang::prelude::*;
1118    /// # use futures::StreamExt;
1119    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1120    /// # type Source = ();
1121    /// # type Destination = ();
1122    /// let source: Cluster<Source> = flow.cluster::<Source>();
1123    /// let to_send: Stream<_, Cluster<_>, _> = source
1124    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1125    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1126    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1127    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1128    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1129    /// # }, |mut stream| async move {
1130    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1131    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1132    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1133    /// // - ...
1134    /// # let mut results = Vec::new();
1135    /// # for w in 0..16 {
1136    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1137    /// # }
1138    /// # results.sort();
1139    /// # assert_eq!(results, vec![
1140    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1141    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1142    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1143    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1144    /// # ]);
1145    /// # }));
1146    /// # }
1147    /// ```
1148    pub fn demux<N: NetworkFor<T>>(
1149        self,
1150        to: &Cluster<'a, L2>,
1151        via: N,
1152    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1153    where
1154        T: Serialize + DeserializeOwned,
1155    {
1156        self.into_keyed().demux(to, via)
1157    }
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162    #[cfg(feature = "sim")]
1163    use stageleft::q;
1164
1165    #[cfg(feature = "sim")]
1166    use crate::location::{Location, MemberId};
1167    #[cfg(feature = "sim")]
1168    use crate::networking::TCP;
1169    #[cfg(feature = "sim")]
1170    use crate::nondet::nondet;
1171    #[cfg(feature = "sim")]
1172    use crate::prelude::FlowBuilder;
1173
1174    #[cfg(feature = "sim")]
1175    #[test]
1176    fn sim_send_bincode_o2o() {
1177        use crate::networking::TCP;
1178
1179        let mut flow = FlowBuilder::new();
1180        let node = flow.process::<()>();
1181        let node2 = flow.process::<()>();
1182
1183        let (in_send, input) = node.sim_input();
1184
1185        let out_recv = input
1186            .send(&node2, TCP.fail_stop().bincode())
1187            .batch(&node2.tick(), nondet!(/** test */))
1188            .count()
1189            .all_ticks()
1190            .sim_output();
1191
1192        let instances = flow.sim().exhaustive(async || {
1193            in_send.send(());
1194            in_send.send(());
1195            in_send.send(());
1196
1197            let received = out_recv.collect::<Vec<_>>().await;
1198            assert!(received.into_iter().sum::<usize>() == 3);
1199        });
1200
1201        assert_eq!(instances, 4); // 2^{3 - 1}
1202    }
1203
1204    #[cfg(feature = "sim")]
1205    #[test]
1206    fn sim_send_bincode_m2o() {
1207        let mut flow = FlowBuilder::new();
1208        let cluster = flow.cluster::<()>();
1209        let node = flow.process::<()>();
1210
1211        let input = cluster.source_iter(q!(vec![1]));
1212
1213        let out_recv = input
1214            .send(&node, TCP.fail_stop().bincode())
1215            .entries()
1216            .batch(&node.tick(), nondet!(/** test */))
1217            .all_ticks()
1218            .sim_output();
1219
1220        let instances = flow
1221            .sim()
1222            .with_cluster_size(&cluster, 4)
1223            .exhaustive(async || {
1224                out_recv
1225                    .assert_yields_only_unordered(vec![
1226                        (MemberId::from_raw_id(0), 1),
1227                        (MemberId::from_raw_id(1), 1),
1228                        (MemberId::from_raw_id(2), 1),
1229                        (MemberId::from_raw_id(3), 1),
1230                    ])
1231                    .await
1232            });
1233
1234        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1235    }
1236
1237    #[cfg(feature = "sim")]
1238    #[test]
1239    fn sim_send_bincode_multiple_m2o() {
1240        let mut flow = FlowBuilder::new();
1241        let cluster1 = flow.cluster::<()>();
1242        let cluster2 = flow.cluster::<()>();
1243        let node = flow.process::<()>();
1244
1245        let out_recv_1 = cluster1
1246            .source_iter(q!(vec![1]))
1247            .send(&node, TCP.fail_stop().bincode())
1248            .entries()
1249            .sim_output();
1250
1251        let out_recv_2 = cluster2
1252            .source_iter(q!(vec![2]))
1253            .send(&node, TCP.fail_stop().bincode())
1254            .entries()
1255            .sim_output();
1256
1257        let instances = flow
1258            .sim()
1259            .with_cluster_size(&cluster1, 3)
1260            .with_cluster_size(&cluster2, 4)
1261            .exhaustive(async || {
1262                out_recv_1
1263                    .assert_yields_only_unordered(vec![
1264                        (MemberId::from_raw_id(0), 1),
1265                        (MemberId::from_raw_id(1), 1),
1266                        (MemberId::from_raw_id(2), 1),
1267                    ])
1268                    .await;
1269
1270                out_recv_2
1271                    .assert_yields_only_unordered(vec![
1272                        (MemberId::from_raw_id(0), 2),
1273                        (MemberId::from_raw_id(1), 2),
1274                        (MemberId::from_raw_id(2), 2),
1275                        (MemberId::from_raw_id(3), 2),
1276                    ])
1277                    .await;
1278            });
1279
1280        assert_eq!(instances, 1);
1281    }
1282
1283    #[cfg(feature = "sim")]
1284    #[test]
1285    fn sim_send_bincode_o2m() {
1286        let mut flow = FlowBuilder::new();
1287        let cluster = flow.cluster::<()>();
1288        let node = flow.process::<()>();
1289
1290        let input = node.source_iter(q!(vec![
1291            (MemberId::from_raw_id(0), 123),
1292            (MemberId::from_raw_id(1), 456),
1293        ]));
1294
1295        let out_recv = input
1296            .demux(&cluster, TCP.fail_stop().bincode())
1297            .map(q!(|x| x + 1))
1298            .send(&node, TCP.fail_stop().bincode())
1299            .entries()
1300            .sim_output();
1301
1302        flow.sim()
1303            .with_cluster_size(&cluster, 4)
1304            .exhaustive(async || {
1305                out_recv
1306                    .assert_yields_only_unordered(vec![
1307                        (MemberId::from_raw_id(0), 124),
1308                        (MemberId::from_raw_id(1), 457),
1309                    ])
1310                    .await
1311            });
1312    }
1313
1314    #[cfg(feature = "sim")]
1315    #[test]
1316    fn sim_broadcast_bincode_o2m() {
1317        let mut flow = FlowBuilder::new();
1318        let cluster = flow.cluster::<()>();
1319        let node = flow.process::<()>();
1320
1321        let input = node.source_iter(q!(vec![123, 456]));
1322
1323        let out_recv = input
1324            .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1325            .map(q!(|x| x + 1))
1326            .send(&node, TCP.fail_stop().bincode())
1327            .entries()
1328            .sim_output();
1329
1330        let mut c_1_produced = false;
1331        let mut c_2_produced = false;
1332
1333        flow.sim()
1334            .with_cluster_size(&cluster, 2)
1335            .exhaustive(async || {
1336                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1337
1338                // check that order is preserved
1339                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1340                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1341                    c_1_produced = true;
1342                }
1343
1344                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1345                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1346                    c_2_produced = true;
1347                }
1348            });
1349
1350        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1351    }
1352
1353    #[cfg(feature = "sim")]
1354    #[test]
1355    fn sim_send_bincode_m2m() {
1356        let mut flow = FlowBuilder::new();
1357        let cluster = flow.cluster::<()>();
1358        let node = flow.process::<()>();
1359
1360        let input = node.source_iter(q!(vec![
1361            (MemberId::from_raw_id(0), 123),
1362            (MemberId::from_raw_id(1), 456),
1363        ]));
1364
1365        let out_recv = input
1366            .demux(&cluster, TCP.fail_stop().bincode())
1367            .map(q!(|x| x + 1))
1368            .flat_map_ordered(q!(|x| vec![
1369                (MemberId::from_raw_id(0), x),
1370                (MemberId::from_raw_id(1), x),
1371            ]))
1372            .demux(&cluster, TCP.fail_stop().bincode())
1373            .entries()
1374            .send(&node, TCP.fail_stop().bincode())
1375            .entries()
1376            .sim_output();
1377
1378        flow.sim()
1379            .with_cluster_size(&cluster, 4)
1380            .exhaustive(async || {
1381                out_recv
1382                    .assert_yields_only_unordered(vec![
1383                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1384                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1385                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1386                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1387                    ])
1388                    .await
1389            });
1390    }
1391}