Skip to main content

hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19use crate::location::cluster::NoConsistency;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::DynLocation;
22use crate::location::external_process::ExternalBincodeStream;
23use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, Process};
24use crate::networking::{NetworkFor, TCP};
25use crate::nondet::{NonDet, nondet};
26#[cfg(feature = "sim")]
27use crate::sim::SimReceiver;
28use crate::staging_util::get_this_crate;
29
30// same as the one in `hydro_std`, but internal use only
31fn track_membership<'a, C, L: Location<'a>>(
32    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
33) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
34    membership.fold(
35        q!(|| false),
36        q!(|present, event| {
37            match event {
38                MembershipEvent::Joined => *present = true,
39                MembershipEvent::Left => *present = false,
40            }
41        }),
42    )
43}
44
45fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
46    let root = get_this_crate();
47
48    if is_demux {
49        parse_quote! {
50            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
51                |(id, data)| {
52                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
53                }
54            )
55        }
56    } else {
57        parse_quote! {
58            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
59                |data| {
60                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
61                }
62            )
63        }
64    }
65}
66
67pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
68    serialize_bincode_with_type(is_demux, &quote_type::<T>())
69}
70
71fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
72    let root = get_this_crate();
73    if let Some(c_type) = tagged {
74        parse_quote! {
75            |res| {
76                let (id, b) = res.unwrap();
77                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
78            }
79        }
80    } else {
81        parse_quote! {
82            |res| {
83                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
84            }
85        }
86    }
87}
88
89pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
90    deserialize_bincode_with_type(tagged, &quote_type::<T>())
91}
92
93impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
94    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
95    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
96    /// using [`bincode`] to serialize/deserialize messages.
97    ///
98    /// The returned stream captures the elements received at the destination, where values will
99    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
100    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
101    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
102    /// dropped no further messages will be sent.
103    ///
104    /// # Example
105    /// ```rust
106    /// # #[cfg(feature = "deploy")] {
107    /// # use hydro_lang::prelude::*;
108    /// # use futures::StreamExt;
109    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
110    /// let p1 = flow.process::<()>();
111    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
112    /// let p2 = flow.process::<()>();
113    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
114    /// // 1, 2, 3
115    /// # on_p2.send_bincode(&p_out)
116    /// # }, |mut stream| async move {
117    /// # for w in 1..=3 {
118    /// #     assert_eq!(stream.next().await, Some(w));
119    /// # }
120    /// # }));
121    /// # }
122    /// ```
123    pub fn send_bincode<L2>(
124        self,
125        other: &Process<'a, L2>,
126    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
127    where
128        T: Serialize + DeserializeOwned,
129    {
130        self.send(other, TCP.fail_stop().bincode())
131    }
132
133    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
134    /// using the configuration in `via` to set up the message transport.
135    ///
136    /// The returned stream captures the elements received at the destination, where values will
137    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
138    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
139    /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
140    /// dropped no further messages will be sent.
141    ///
142    /// # Example
143    /// ```rust
144    /// # #[cfg(feature = "deploy")] {
145    /// # use hydro_lang::prelude::*;
146    /// # use futures::StreamExt;
147    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
148    /// let p1 = flow.process::<()>();
149    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
150    /// let p2 = flow.process::<()>();
151    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
152    /// // 1, 2, 3
153    /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
154    /// # }, |mut stream| async move {
155    /// # for w in 1..=3 {
156    /// #     assert_eq!(stream.next().await, Some(w));
157    /// # }
158    /// # }));
159    /// # }
160    /// ```
161    pub fn send<L2, N: NetworkFor<T>>(
162        self,
163        to: &Process<'a, L2>,
164        via: N,
165    ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
166    where
167        T: Serialize + DeserializeOwned,
168        O: MinOrder<N::OrderingGuarantee>,
169    {
170        let serialize_pipeline = Some(N::serialize_thunk(false));
171        let deserialize_pipeline = Some(N::deserialize_thunk(None));
172
173        let name = via.name();
174        if to.multiversioned() && name.is_none() {
175            panic!(
176                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
177            );
178        }
179
180        Stream::new(
181            to.clone(),
182            HydroNode::Network {
183                name: name.map(ToOwned::to_owned),
184                networking_info: N::networking_info(),
185                serialize_fn: serialize_pipeline.map(|e| e.into()),
186                instantiate_fn: DebugInstantiate::Building,
187                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
188                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
189                metadata: to.new_node_metadata(Stream::<
190                    T,
191                    Process<'a, L2>,
192                    Unbounded,
193                    <O as MinOrder<N::OrderingGuarantee>>::Min,
194                    R,
195                >::collection_kind()),
196            },
197        )
198    }
199
200    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
201    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
202    /// using [`bincode`] to serialize/deserialize messages.
203    ///
204    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
205    /// membership information. This is a common pattern in distributed systems for broadcasting data to
206    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
207    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
208    /// each element to all cluster members.
209    ///
210    /// # Non-Determinism
211    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
212    /// to the current cluster members _at that point in time_. Depending on when we are notified of
213    /// membership changes, we will broadcast each element to different members.
214    ///
215    /// # Example
216    /// ```rust
217    /// # #[cfg(feature = "deploy")] {
218    /// # use hydro_lang::prelude::*;
219    /// # use futures::StreamExt;
220    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
221    /// let p1 = flow.process::<()>();
222    /// let workers: Cluster<()> = flow.cluster::<()>();
223    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
224    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
225    /// # on_worker.send_bincode(&p2).entries()
226    /// // if there are 4 members in the cluster, each receives one element
227    /// // - MemberId::<()>(0): [123]
228    /// // - MemberId::<()>(1): [123]
229    /// // - MemberId::<()>(2): [123]
230    /// // - MemberId::<()>(3): [123]
231    /// # }, |mut stream| async move {
232    /// # let mut results = Vec::new();
233    /// # for w in 0..4 {
234    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
235    /// # }
236    /// # results.sort();
237    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
238    /// # }));
239    /// # }
240    /// ```
241    pub fn broadcast_bincode<L2: 'a>(
242        self,
243        other: &Cluster<'a, L2>,
244        nondet_membership: NonDet,
245    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
246    where
247        T: Clone + Serialize + DeserializeOwned,
248    {
249        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
250    }
251
252    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
253    /// using the configuration in `via` to set up the message transport.
254    ///
255    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
256    /// membership information. This is a common pattern in distributed systems for broadcasting data to
257    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
258    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
259    /// each element to all cluster members.
260    ///
261    /// # Non-Determinism
262    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
263    /// to the current cluster members _at that point in time_. Depending on when we are notified of
264    /// membership changes, we will broadcast each element to different members.
265    ///
266    /// # Example
267    /// ```rust
268    /// # #[cfg(feature = "deploy")] {
269    /// # use hydro_lang::prelude::*;
270    /// # use futures::StreamExt;
271    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
272    /// let p1 = flow.process::<()>();
273    /// let workers: Cluster<()> = flow.cluster::<()>();
274    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
275    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
276    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
277    /// // if there are 4 members in the cluster, each receives one element
278    /// // - MemberId::<()>(0): [123]
279    /// // - MemberId::<()>(1): [123]
280    /// // - MemberId::<()>(2): [123]
281    /// // - MemberId::<()>(3): [123]
282    /// # }, |mut stream| async move {
283    /// # let mut results = Vec::new();
284    /// # for w in 0..4 {
285    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
286    /// # }
287    /// # results.sort();
288    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
289    /// # }));
290    /// # }
291    /// ```
292    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
293        self,
294        to: &Cluster<'a, L2>,
295        via: N,
296        nondet_membership: NonDet,
297    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
298    where
299        T: Clone + Serialize + DeserializeOwned,
300        O: MinOrder<N::OrderingGuarantee>,
301    {
302        let ids = track_membership(self.location.source_cluster_membership_stream(
303            to,
304            nondet!(/** droppped prefixes don't affect broadcast */),
305        ));
306        sliced! {
307            let members_snapshot = use(ids, nondet_membership);
308            let elements = use(self, nondet_membership);
309
310            let current_members = members_snapshot.filter(q!(|b| *b));
311            elements.repeat_with_keys(current_members)
312        }
313        .demux(to, via)
314    }
315
316    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
317    /// serialization. The external process can receive these elements by establishing a TCP
318    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
319    ///
320    /// # Example
321    /// ```rust
322    /// # #[cfg(feature = "deploy")] {
323    /// # use hydro_lang::prelude::*;
324    /// # use futures::StreamExt;
325    /// # tokio_test::block_on(async move {
326    /// let mut flow = FlowBuilder::new();
327    /// let process = flow.process::<()>();
328    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
329    /// let external = flow.external::<()>();
330    /// let external_handle = numbers.send_bincode_external(&external);
331    ///
332    /// let mut deployment = hydro_deploy::Deployment::new();
333    /// let nodes = flow
334    ///     .with_process(&process, deployment.Localhost())
335    ///     .with_external(&external, deployment.Localhost())
336    ///     .deploy(&mut deployment);
337    ///
338    /// deployment.deploy().await.unwrap();
339    /// // establish the TCP connection
340    /// let mut external_recv_stream = nodes.connect(external_handle).await;
341    /// deployment.start().await.unwrap();
342    ///
343    /// for w in 1..=3 {
344    ///     assert_eq!(external_recv_stream.next().await, Some(w));
345    /// }
346    /// # });
347    /// # }
348    /// ```
349    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
350    where
351        T: Serialize + DeserializeOwned,
352    {
353        let serialize_pipeline = Some(serialize_bincode::<T>(false));
354
355        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
356
357        let external_port_id = flow_state_borrow.next_external_port();
358
359        flow_state_borrow.push_root(HydroRoot::SendExternal {
360            to_external_key: other.key,
361            to_port_id: external_port_id,
362            to_many: false,
363            unpaired: true,
364            serialize_fn: serialize_pipeline.map(|e| e.into()),
365            instantiate_fn: DebugInstantiate::Building,
366            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
367            op_metadata: HydroIrOpMetadata::new(),
368        });
369
370        ExternalBincodeStream {
371            process_key: other.key,
372            port_id: external_port_id,
373            _phantom: PhantomData,
374        }
375    }
376
377    #[cfg(feature = "sim")]
378    /// Sets up a simulation output port for this stream, allowing test code to receive elements
379    /// sent to this stream during simulation.
380    pub fn sim_output(self) -> SimReceiver<T, O, R>
381    where
382        T: Serialize + DeserializeOwned,
383    {
384        let external_location: External<'a, ()> = External {
385            key: LocationKey::FIRST,
386            flow_state: self.location.flow_state().clone(),
387            _phantom: PhantomData,
388        };
389
390        let external = self.send_bincode_external(&external_location);
391
392        SimReceiver(external.port_id, PhantomData)
393    }
394}
395
396impl<'a, T, L: Location<'a>, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
397    /// Creates an external output for embedded deployment mode.
398    ///
399    /// The `name` parameter specifies the name of the field in the generated
400    /// `EmbeddedOutputs` struct that will receive elements from this stream.
401    /// The generated function will accept an `EmbeddedOutputs` struct with an
402    /// `impl FnMut(T)` field with this name.
403    pub fn embedded_output(self, name: impl Into<String>) {
404        let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
405
406        self.location
407            .flow_state()
408            .borrow_mut()
409            .push_root(HydroRoot::EmbeddedOutput {
410                ident,
411                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
412                op_metadata: HydroIrOpMetadata::new(),
413            });
414    }
415}
416
417impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
418    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
419{
420    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
421    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
422    /// using [`bincode`] to serialize/deserialize messages.
423    ///
424    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
425    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
426    /// this API allows precise targeting of specific cluster members rather than broadcasting to
427    /// all members.
428    ///
429    /// # Example
430    /// ```rust
431    /// # #[cfg(feature = "deploy")] {
432    /// # use hydro_lang::prelude::*;
433    /// # use futures::StreamExt;
434    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
435    /// let p1 = flow.process::<()>();
436    /// let workers: Cluster<()> = flow.cluster::<()>();
437    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
438    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
439    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
440    ///     .demux_bincode(&workers);
441    /// # on_worker.send_bincode(&p2).entries()
442    /// // if there are 4 members in the cluster, each receives one element
443    /// // - MemberId::<()>(0): [0]
444    /// // - MemberId::<()>(1): [1]
445    /// // - MemberId::<()>(2): [2]
446    /// // - MemberId::<()>(3): [3]
447    /// # }, |mut stream| async move {
448    /// # let mut results = Vec::new();
449    /// # for w in 0..4 {
450    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
451    /// # }
452    /// # results.sort();
453    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
454    /// # }));
455    /// # }
456    /// ```
457    pub fn demux_bincode(
458        self,
459        other: &Cluster<'a, L2>,
460    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
461    where
462        T: Serialize + DeserializeOwned,
463    {
464        self.demux(other, TCP.fail_stop().bincode())
465    }
466
467    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
468    /// using the configuration in `via` to set up the message transport.
469    ///
470    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
471    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
472    /// this API allows precise targeting of specific cluster members rather than broadcasting to
473    /// all members.
474    ///
475    /// # Example
476    /// ```rust
477    /// # #[cfg(feature = "deploy")] {
478    /// # use hydro_lang::prelude::*;
479    /// # use futures::StreamExt;
480    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
481    /// let p1 = flow.process::<()>();
482    /// let workers: Cluster<()> = flow.cluster::<()>();
483    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
484    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
485    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
486    ///     .demux(&workers, TCP.fail_stop().bincode());
487    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
488    /// // if there are 4 members in the cluster, each receives one element
489    /// // - MemberId::<()>(0): [0]
490    /// // - MemberId::<()>(1): [1]
491    /// // - MemberId::<()>(2): [2]
492    /// // - MemberId::<()>(3): [3]
493    /// # }, |mut stream| async move {
494    /// # let mut results = Vec::new();
495    /// # for w in 0..4 {
496    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
497    /// # }
498    /// # results.sort();
499    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
500    /// # }));
501    /// # }
502    /// ```
503    #[expect(clippy::type_complexity, reason = "NoConsistency type")]
504    pub fn demux<N: NetworkFor<T>>(
505        self,
506        to: &Cluster<'a, L2>,
507        via: N,
508    ) -> Stream<
509        T,
510        Cluster<'a, L2, NoConsistency>,
511        Unbounded,
512        <O as MinOrder<N::OrderingGuarantee>>::Min,
513        R,
514    >
515    where
516        T: Serialize + DeserializeOwned,
517        O: MinOrder<N::OrderingGuarantee>,
518    {
519        self.into_keyed().demux(to, via)
520    }
521}
522
523impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
524    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
525    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
526    /// [`bincode`] to serialize/deserialize messages.
527    ///
528    /// This provides load balancing by evenly distributing work across cluster members. The
529    /// distribution is deterministic based on element order - the first element goes to member 0,
530    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
531    ///
532    /// # Non-Determinism
533    /// The set of cluster members may asynchronously change over time. Each element is distributed
534    /// based on the current cluster membership _at that point in time_. Depending on when cluster
535    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
536    /// membership is stable, the order of members in the round-robin pattern may change across runs.
537    ///
538    /// # Ordering Requirements
539    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
540    /// order of messages and retries affects the round-robin pattern.
541    ///
542    /// # Example
543    /// ```rust
544    /// # #[cfg(feature = "deploy")] {
545    /// # use hydro_lang::prelude::*;
546    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
547    /// # use futures::StreamExt;
548    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
549    /// let p1 = flow.process::<()>();
550    /// let workers: Cluster<()> = flow.cluster::<()>();
551    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
552    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
553    /// on_worker.send_bincode(&p2)
554    /// # .first().values() // we use first to assert that each member gets one element
555    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
556    /// // - MemberId::<()>(?): [1]
557    /// // - MemberId::<()>(?): [2]
558    /// // - MemberId::<()>(?): [3]
559    /// // - MemberId::<()>(?): [4]
560    /// # }, |mut stream| async move {
561    /// # let mut results = Vec::new();
562    /// # for w in 0..4 {
563    /// #     results.push(stream.next().await.unwrap());
564    /// # }
565    /// # results.sort();
566    /// # assert_eq!(results, vec![1, 2, 3, 4]);
567    /// # }));
568    /// # }
569    /// ```
570    pub fn round_robin_bincode<L2: 'a>(
571        self,
572        other: &Cluster<'a, L2>,
573        nondet_membership: NonDet,
574    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
575    where
576        T: Serialize + DeserializeOwned,
577    {
578        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
579    }
580
581    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
582    /// the configuration in `via` to set up the message transport.
583    ///
584    /// This provides load balancing by evenly distributing work across cluster members. The
585    /// distribution is deterministic based on element order - the first element goes to member 0,
586    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
587    ///
588    /// # Non-Determinism
589    /// The set of cluster members may asynchronously change over time. Each element is distributed
590    /// based on the current cluster membership _at that point in time_. Depending on when cluster
591    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
592    /// membership is stable, the order of members in the round-robin pattern may change across runs.
593    ///
594    /// # Ordering Requirements
595    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
596    /// order of messages and retries affects the round-robin pattern.
597    ///
598    /// # Example
599    /// ```rust
600    /// # #[cfg(feature = "deploy")] {
601    /// # use hydro_lang::prelude::*;
602    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
603    /// # use futures::StreamExt;
604    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
605    /// let p1 = flow.process::<()>();
606    /// let workers: Cluster<()> = flow.cluster::<()>();
607    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
608    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
609    /// on_worker.send(&p2, TCP.fail_stop().bincode())
610    /// # .first().values() // we use first to assert that each member gets one element
611    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
612    /// // - MemberId::<()>(?): [1]
613    /// // - MemberId::<()>(?): [2]
614    /// // - MemberId::<()>(?): [3]
615    /// // - MemberId::<()>(?): [4]
616    /// # }, |mut stream| async move {
617    /// # let mut results = Vec::new();
618    /// # for w in 0..4 {
619    /// #     results.push(stream.next().await.unwrap());
620    /// # }
621    /// # results.sort();
622    /// # assert_eq!(results, vec![1, 2, 3, 4]);
623    /// # }));
624    /// # }
625    /// ```
626    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
627        self,
628        to: &Cluster<'a, L2>,
629        via: N,
630        nondet_membership: NonDet,
631    ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
632    where
633        T: Serialize + DeserializeOwned,
634    {
635        let ids = track_membership(self.location.source_cluster_membership_stream(
636            to,
637            nondet!(/** droppped prefixes don't affect broadcast */),
638        ));
639        sliced! {
640            let members_snapshot = use(ids, nondet_membership);
641            let elements = use(self.enumerate(), nondet_membership);
642
643            let current_members = members_snapshot
644                .filter(q!(|b| *b))
645                .keys()
646                .assume_ordering::<TotalOrder>(nondet_membership)
647                .collect_vec();
648
649            elements
650                .cross_singleton(current_members)
651                .filter_map(q!(|(data, members)| {
652                    if members.is_empty() {
653                        None
654                    } else {
655                        Some((members[data.0 % members.len()].clone(), data.1))
656                    }
657                }))
658        }
659        .demux(to, via)
660    }
661}
662
663impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
664    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
665    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
666    /// [`bincode`] to serialize/deserialize messages.
667    ///
668    /// This provides load balancing by evenly distributing work across cluster members. The
669    /// distribution is deterministic based on element order - the first element goes to member 0,
670    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
671    ///
672    /// # Non-Determinism
673    /// The set of cluster members may asynchronously change over time. Each element is distributed
674    /// based on the current cluster membership _at that point in time_. Depending on when cluster
675    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
676    /// membership is stable, the order of members in the round-robin pattern may change across runs.
677    ///
678    /// # Ordering Requirements
679    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
680    /// order of messages and retries affects the round-robin pattern.
681    ///
682    /// # Example
683    /// ```rust
684    /// # #[cfg(feature = "deploy")] {
685    /// # use hydro_lang::prelude::*;
686    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
687    /// # use hydro_lang::location::MemberId;
688    /// # use futures::StreamExt;
689    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
690    /// let p1 = flow.process::<()>();
691    /// let workers1: Cluster<()> = flow.cluster::<()>();
692    /// let workers2: Cluster<()> = flow.cluster::<()>();
693    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
694    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
695    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
696    /// on_worker2.send_bincode(&p2)
697    /// # .entries()
698    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
699    /// # }, |mut stream| async move {
700    /// # let mut results = Vec::new();
701    /// # let mut locations = std::collections::HashSet::new();
702    /// # for w in 0..=16 {
703    /// #     let (location, v) = stream.next().await.unwrap();
704    /// #     locations.insert(location);
705    /// #     results.push(v);
706    /// # }
707    /// # results.sort();
708    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
709    /// # assert_eq!(locations.len(), 16);
710    /// # }));
711    /// # }
712    /// ```
713    pub fn round_robin_bincode<L2: 'a>(
714        self,
715        other: &Cluster<'a, L2>,
716        nondet_membership: NonDet,
717    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
718    where
719        T: Serialize + DeserializeOwned,
720    {
721        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
722    }
723
724    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
725    /// the configuration in `via` to set up the message transport.
726    ///
727    /// This provides load balancing by evenly distributing work across cluster members. The
728    /// distribution is deterministic based on element order - the first element goes to member 0,
729    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
730    ///
731    /// # Non-Determinism
732    /// The set of cluster members may asynchronously change over time. Each element is distributed
733    /// based on the current cluster membership _at that point in time_. Depending on when cluster
734    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
735    /// membership is stable, the order of members in the round-robin pattern may change across runs.
736    ///
737    /// # Ordering Requirements
738    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
739    /// order of messages and retries affects the round-robin pattern.
740    ///
741    /// # Example
742    /// ```rust
743    /// # #[cfg(feature = "deploy")] {
744    /// # use hydro_lang::prelude::*;
745    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
746    /// # use hydro_lang::location::MemberId;
747    /// # use futures::StreamExt;
748    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
749    /// let p1 = flow.process::<()>();
750    /// let workers1: Cluster<()> = flow.cluster::<()>();
751    /// let workers2: Cluster<()> = flow.cluster::<()>();
752    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
753    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
754    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
755    /// on_worker2.send(&p2, TCP.fail_stop().bincode())
756    /// # .entries()
757    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
758    /// # }, |mut stream| async move {
759    /// # let mut results = Vec::new();
760    /// # let mut locations = std::collections::HashSet::new();
761    /// # for w in 0..=16 {
762    /// #     let (location, v) = stream.next().await.unwrap();
763    /// #     locations.insert(location);
764    /// #     results.push(v);
765    /// # }
766    /// # results.sort();
767    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
768    /// # assert_eq!(locations.len(), 16);
769    /// # }));
770    /// # }
771    /// ```
772    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
773        self,
774        to: &Cluster<'a, L2>,
775        via: N,
776        nondet_membership: NonDet,
777    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
778    where
779        T: Serialize + DeserializeOwned,
780    {
781        let ids = track_membership(self.location.source_cluster_membership_stream(
782            to,
783            nondet!(/** droppped prefixes don't affect broadcast */),
784        ));
785        sliced! {
786            let members_snapshot = use(ids, nondet_membership);
787            let elements = use(self.enumerate(), nondet_membership);
788
789            let current_members = members_snapshot
790                .filter(q!(|b| *b))
791                .keys()
792                .assume_ordering::<TotalOrder>(nondet_membership)
793                .collect_vec();
794
795            elements
796                .cross_singleton(current_members)
797                .filter_map(q!(|(data, members)| {
798                    if members.is_empty() {
799                        None
800                    } else {
801                        Some((members[data.0 % members.len()].clone(), data.1))
802                    }
803                }))
804        }
805        .demux(to, via)
806    }
807}
808
809impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
810    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
811    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
812    /// using [`bincode`] to serialize/deserialize messages.
813    ///
814    /// Each cluster member sends its local stream elements, and they are collected at the destination
815    /// as a [`KeyedStream`] where keys identify the source cluster member.
816    ///
817    /// # Example
818    /// ```rust
819    /// # #[cfg(feature = "deploy")] {
820    /// # use hydro_lang::prelude::*;
821    /// # use futures::StreamExt;
822    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
823    /// let workers: Cluster<()> = flow.cluster::<()>();
824    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
825    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
826    /// # all_received.entries()
827    /// # }, |mut stream| async move {
828    /// // if there are 4 members in the cluster, we should receive 4 elements
829    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
830    /// # let mut results = Vec::new();
831    /// # for w in 0..4 {
832    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
833    /// # }
834    /// # results.sort();
835    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
836    /// # }));
837    /// # }
838    /// ```
839    ///
840    /// If you don't need to know the source for each element, you can use `.values()`
841    /// to get just the data:
842    /// ```rust
843    /// # #[cfg(feature = "deploy")] {
844    /// # use hydro_lang::prelude::*;
845    /// # use hydro_lang::live_collections::stream::NoOrder;
846    /// # use futures::StreamExt;
847    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
848    /// # let workers: Cluster<()> = flow.cluster::<()>();
849    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
850    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
851    /// # values
852    /// # }, |mut stream| async move {
853    /// # let mut results = Vec::new();
854    /// # for w in 0..4 {
855    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
856    /// # }
857    /// # results.sort();
858    /// // if there are 4 members in the cluster, we should receive 4 elements
859    /// // 1, 1, 1, 1
860    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
861    /// # }));
862    /// # }
863    /// ```
864    pub fn send_bincode<L2>(
865        self,
866        other: &Process<'a, L2>,
867    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
868    where
869        T: Serialize + DeserializeOwned,
870    {
871        self.send(other, TCP.fail_stop().bincode())
872    }
873
874    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
875    /// using the configuration in `via` to set up the message transport.
876    ///
877    /// Each cluster member sends its local stream elements, and they are collected at the destination
878    /// as a [`KeyedStream`] where keys identify the source cluster member.
879    ///
880    /// # Example
881    /// ```rust
882    /// # #[cfg(feature = "deploy")] {
883    /// # use hydro_lang::prelude::*;
884    /// # use futures::StreamExt;
885    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
886    /// let workers: Cluster<()> = flow.cluster::<()>();
887    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
888    /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
889    /// # all_received.entries()
890    /// # }, |mut stream| async move {
891    /// // if there are 4 members in the cluster, we should receive 4 elements
892    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
893    /// # let mut results = Vec::new();
894    /// # for w in 0..4 {
895    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
896    /// # }
897    /// # results.sort();
898    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
899    /// # }));
900    /// # }
901    /// ```
902    ///
903    /// If you don't need to know the source for each element, you can use `.values()`
904    /// to get just the data:
905    /// ```rust
906    /// # #[cfg(feature = "deploy")] {
907    /// # use hydro_lang::prelude::*;
908    /// # use hydro_lang::live_collections::stream::NoOrder;
909    /// # use futures::StreamExt;
910    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
911    /// # let workers: Cluster<()> = flow.cluster::<()>();
912    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
913    /// let values: Stream<i32, _, _, NoOrder> =
914    ///     numbers.send(&process, TCP.fail_stop().bincode()).values();
915    /// # values
916    /// # }, |mut stream| async move {
917    /// # let mut results = Vec::new();
918    /// # for w in 0..4 {
919    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
920    /// # }
921    /// # results.sort();
922    /// // if there are 4 members in the cluster, we should receive 4 elements
923    /// // 1, 1, 1, 1
924    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
925    /// # }));
926    /// # }
927    /// ```
928    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
929    pub fn send<L2, N: NetworkFor<T>>(
930        self,
931        to: &Process<'a, L2>,
932        via: N,
933    ) -> KeyedStream<
934        MemberId<L>,
935        T,
936        Process<'a, L2>,
937        Unbounded,
938        <O as MinOrder<N::OrderingGuarantee>>::Min,
939        R,
940    >
941    where
942        T: Serialize + DeserializeOwned,
943        O: MinOrder<N::OrderingGuarantee>,
944    {
945        let serialize_pipeline = Some(N::serialize_thunk(false));
946
947        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
948
949        let name = via.name();
950        if to.multiversioned() && name.is_none() {
951            panic!(
952                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
953            );
954        }
955
956        let raw_stream: Stream<
957            (MemberId<L>, T),
958            Process<'a, L2>,
959            Unbounded,
960            <O as MinOrder<N::OrderingGuarantee>>::Min,
961            R,
962        > = Stream::new(
963            to.clone(),
964            HydroNode::Network {
965                name: name.map(ToOwned::to_owned),
966                networking_info: N::networking_info(),
967                serialize_fn: serialize_pipeline.map(|e| e.into()),
968                instantiate_fn: DebugInstantiate::Building,
969                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
970                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
971                metadata: to.new_node_metadata(Stream::<
972                    (MemberId<L>, T),
973                    Process<'a, L2>,
974                    Unbounded,
975                    <O as MinOrder<N::OrderingGuarantee>>::Min,
976                    R,
977                >::collection_kind()),
978            },
979        );
980
981        raw_stream.into_keyed()
982    }
983
984    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
985    /// Broadcasts elements of this stream at each source member to all members of a destination
986    /// cluster, using [`bincode`] to serialize/deserialize messages.
987    ///
988    /// Each source member sends each of its stream elements to **every** member of the cluster
989    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
990    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
991    /// **only data elements** and sends each element to all cluster members.
992    ///
993    /// # Non-Determinism
994    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
995    /// to the current cluster members known _at that point in time_ at the source member. Depending
996    /// on when each source member is notified of membership changes, it will broadcast each element
997    /// to different members.
998    ///
999    /// # Example
1000    /// ```rust
1001    /// # #[cfg(feature = "deploy")] {
1002    /// # use hydro_lang::prelude::*;
1003    /// # use hydro_lang::location::MemberId;
1004    /// # use futures::StreamExt;
1005    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1006    /// # type Source = ();
1007    /// # type Destination = ();
1008    /// let source: Cluster<Source> = flow.cluster::<Source>();
1009    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1010    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1011    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
1012    /// # on_destination.entries().send_bincode(&p2).entries()
1013    /// // if there are 4 members in the desination, each receives one element from each source member
1014    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1015    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1016    /// // - ...
1017    /// # }, |mut stream| async move {
1018    /// # let mut results = Vec::new();
1019    /// # for w in 0..16 {
1020    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1021    /// # }
1022    /// # results.sort();
1023    /// # assert_eq!(results, vec![
1024    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1025    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1026    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1027    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1028    /// # ]);
1029    /// # }));
1030    /// # }
1031    /// ```
1032    pub fn broadcast_bincode<L2: 'a>(
1033        self,
1034        other: &Cluster<'a, L2>,
1035        nondet_membership: NonDet,
1036    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1037    where
1038        T: Clone + Serialize + DeserializeOwned,
1039    {
1040        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1041    }
1042
1043    /// Broadcasts elements of this stream at each source member to all members of a destination
1044    /// cluster, using the configuration in `via` to set up the message transport.
1045    ///
1046    /// Each source member sends each of its stream elements to **every** member of the cluster
1047    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1048    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1049    /// **only data elements** and sends each element to all cluster members.
1050    ///
1051    /// # Non-Determinism
1052    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1053    /// to the current cluster members known _at that point in time_ at the source member. Depending
1054    /// on when each source member is notified of membership changes, it will broadcast each element
1055    /// to different members.
1056    ///
1057    /// # Example
1058    /// ```rust
1059    /// # #[cfg(feature = "deploy")] {
1060    /// # use hydro_lang::prelude::*;
1061    /// # use hydro_lang::location::MemberId;
1062    /// # use futures::StreamExt;
1063    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1064    /// # type Source = ();
1065    /// # type Destination = ();
1066    /// let source: Cluster<Source> = flow.cluster::<Source>();
1067    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1068    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1069    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1070    /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1071    /// // if there are 4 members in the desination, each receives one element from each source member
1072    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1073    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1074    /// // - ...
1075    /// # }, |mut stream| async move {
1076    /// # let mut results = Vec::new();
1077    /// # for w in 0..16 {
1078    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1079    /// # }
1080    /// # results.sort();
1081    /// # assert_eq!(results, vec![
1082    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1083    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1084    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1085    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1086    /// # ]);
1087    /// # }));
1088    /// # }
1089    /// ```
1090    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1091    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1092        self,
1093        to: &Cluster<'a, L2>,
1094        via: N,
1095        nondet_membership: NonDet,
1096    ) -> KeyedStream<
1097        MemberId<L>,
1098        T,
1099        Cluster<'a, L2>,
1100        Unbounded,
1101        <O as MinOrder<N::OrderingGuarantee>>::Min,
1102        R,
1103    >
1104    where
1105        T: Clone + Serialize + DeserializeOwned,
1106        O: MinOrder<N::OrderingGuarantee>,
1107    {
1108        let ids = track_membership(self.location.source_cluster_membership_stream(
1109            to,
1110            nondet!(/** droppped prefixes don't affect broadcast */),
1111        ));
1112        sliced! {
1113            let members_snapshot = use(ids, nondet_membership);
1114            let elements = use(self, nondet_membership);
1115
1116            let current_members = members_snapshot.filter(q!(|b| *b));
1117            elements.repeat_with_keys(current_members)
1118        }
1119        .demux(to, via)
1120    }
1121
1122    #[cfg(feature = "sim")]
1123    /// Sends elements of this cluster stream to an external location using bincode serialization.
1124    fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
1125    where
1126        T: Serialize + DeserializeOwned,
1127    {
1128        let serialize_pipeline = Some(serialize_bincode::<T>(false));
1129
1130        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1131
1132        let external_port_id = flow_state_borrow.next_external_port();
1133
1134        flow_state_borrow.push_root(HydroRoot::SendExternal {
1135            to_external_key: other.key,
1136            to_port_id: external_port_id,
1137            to_many: false,
1138            unpaired: true,
1139            serialize_fn: serialize_pipeline.map(|e| e.into()),
1140            instantiate_fn: DebugInstantiate::Building,
1141            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1142            op_metadata: HydroIrOpMetadata::new(),
1143        });
1144
1145        ExternalBincodeStream {
1146            process_key: other.key,
1147            port_id: external_port_id,
1148            _phantom: PhantomData,
1149        }
1150    }
1151
1152    #[cfg(feature = "sim")]
1153    /// Sets up a simulation output port for this cluster stream, allowing test code
1154    /// to receive `(member_id, T)` pairs during simulation.
1155    pub fn sim_cluster_output(self) -> crate::sim::SimClusterReceiver<T, O, R>
1156    where
1157        T: Serialize + DeserializeOwned,
1158    {
1159        let external_location: External<'a, ()> = External {
1160            key: LocationKey::FIRST,
1161            flow_state: self.location.flow_state().clone(),
1162            _phantom: PhantomData,
1163        };
1164
1165        let external = self.send_bincode_external(&external_location);
1166
1167        crate::sim::SimClusterReceiver(external.port_id, PhantomData)
1168    }
1169}
1170
1171impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1172    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1173{
1174    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1175    /// Sends elements of this stream at each source member to specific members of a destination
1176    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1177    ///
1178    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1179    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1180    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1181    /// all members.
1182    ///
1183    /// Each cluster member sends its local stream elements, and they are collected at each
1184    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1185    ///
1186    /// # Example
1187    /// ```rust
1188    /// # #[cfg(feature = "deploy")] {
1189    /// # use hydro_lang::prelude::*;
1190    /// # use futures::StreamExt;
1191    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1192    /// # type Source = ();
1193    /// # type Destination = ();
1194    /// let source: Cluster<Source> = flow.cluster::<Source>();
1195    /// let to_send: Stream<_, Cluster<_>, _> = source
1196    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1197    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1198    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1199    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1200    /// # all_received.entries().send_bincode(&p2).entries()
1201    /// # }, |mut stream| async move {
1202    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1203    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1204    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1205    /// // - ...
1206    /// # let mut results = Vec::new();
1207    /// # for w in 0..16 {
1208    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1209    /// # }
1210    /// # results.sort();
1211    /// # assert_eq!(results, vec![
1212    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1213    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1214    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1215    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1216    /// # ]);
1217    /// # }));
1218    /// # }
1219    /// ```
1220    pub fn demux_bincode(
1221        self,
1222        other: &Cluster<'a, L2>,
1223    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1224    where
1225        T: Serialize + DeserializeOwned,
1226    {
1227        self.demux(other, TCP.fail_stop().bincode())
1228    }
1229
1230    /// Sends elements of this stream at each source member to specific members of a destination
1231    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1232    /// message transport.
1233    ///
1234    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1235    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1236    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1237    /// all members.
1238    ///
1239    /// Each cluster member sends its local stream elements, and they are collected at each
1240    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1241    ///
1242    /// # Example
1243    /// ```rust
1244    /// # #[cfg(feature = "deploy")] {
1245    /// # use hydro_lang::prelude::*;
1246    /// # use futures::StreamExt;
1247    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1248    /// # type Source = ();
1249    /// # type Destination = ();
1250    /// let source: Cluster<Source> = flow.cluster::<Source>();
1251    /// let to_send: Stream<_, Cluster<_>, _> = source
1252    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1253    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1254    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1255    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1256    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1257    /// # }, |mut stream| async move {
1258    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1259    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1260    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1261    /// // - ...
1262    /// # let mut results = Vec::new();
1263    /// # for w in 0..16 {
1264    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1265    /// # }
1266    /// # results.sort();
1267    /// # assert_eq!(results, vec![
1268    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1269    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1270    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1271    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1272    /// # ]);
1273    /// # }));
1274    /// # }
1275    /// ```
1276    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1277    pub fn demux<N: NetworkFor<T>>(
1278        self,
1279        to: &Cluster<'a, L2>,
1280        via: N,
1281    ) -> KeyedStream<
1282        MemberId<L>,
1283        T,
1284        Cluster<'a, L2, NoConsistency>,
1285        Unbounded,
1286        <O as MinOrder<N::OrderingGuarantee>>::Min,
1287        R,
1288    >
1289    where
1290        T: Serialize + DeserializeOwned,
1291        O: MinOrder<N::OrderingGuarantee>,
1292    {
1293        self.into_keyed().demux(to, via)
1294    }
1295}
1296
1297#[cfg(test)]
1298mod tests {
1299    #[cfg(feature = "sim")]
1300    use stageleft::q;
1301
1302    #[cfg(feature = "sim")]
1303    use crate::live_collections::sliced::sliced;
1304    #[cfg(feature = "sim")]
1305    use crate::location::{Location, MemberId};
1306    #[cfg(feature = "sim")]
1307    use crate::networking::TCP;
1308    #[cfg(feature = "sim")]
1309    use crate::nondet::nondet;
1310    #[cfg(feature = "sim")]
1311    use crate::prelude::FlowBuilder;
1312
1313    #[cfg(feature = "sim")]
1314    #[test]
1315    fn sim_send_bincode_o2o() {
1316        use crate::networking::TCP;
1317
1318        let mut flow = FlowBuilder::new();
1319        let node = flow.process::<()>();
1320        let node2 = flow.process::<()>();
1321
1322        let (in_send, input) = node.sim_input();
1323
1324        let out_recv = input
1325            .send(&node2, TCP.fail_stop().bincode())
1326            .batch(&node2.tick(), nondet!(/** test */))
1327            .count()
1328            .all_ticks()
1329            .sim_output();
1330
1331        let instances = flow.sim().exhaustive(async || {
1332            in_send.send(());
1333            in_send.send(());
1334            in_send.send(());
1335
1336            let received = out_recv.collect::<Vec<_>>().await;
1337            assert!(received.into_iter().sum::<usize>() == 3);
1338        });
1339
1340        assert_eq!(instances, 4); // 2^{3 - 1}
1341    }
1342
1343    #[cfg(feature = "sim")]
1344    #[test]
1345    fn sim_send_bincode_m2o() {
1346        let mut flow = FlowBuilder::new();
1347        let cluster = flow.cluster::<()>();
1348        let node = flow.process::<()>();
1349
1350        let input = cluster.source_iter(q!(vec![1]));
1351
1352        let out_recv = input
1353            .send(&node, TCP.fail_stop().bincode())
1354            .entries()
1355            .batch(&node.tick(), nondet!(/** test */))
1356            .all_ticks()
1357            .sim_output();
1358
1359        let instances = flow
1360            .sim()
1361            .with_cluster_size(&cluster, 4)
1362            .exhaustive(async || {
1363                out_recv
1364                    .assert_yields_only_unordered(vec![
1365                        (MemberId::from_raw_id(0), 1),
1366                        (MemberId::from_raw_id(1), 1),
1367                        (MemberId::from_raw_id(2), 1),
1368                        (MemberId::from_raw_id(3), 1),
1369                    ])
1370                    .await
1371            });
1372
1373        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1374    }
1375
1376    #[cfg(feature = "sim")]
1377    #[test]
1378    fn sim_send_bincode_multiple_m2o() {
1379        let mut flow = FlowBuilder::new();
1380        let cluster1 = flow.cluster::<()>();
1381        let cluster2 = flow.cluster::<()>();
1382        let node = flow.process::<()>();
1383
1384        let out_recv_1 = cluster1
1385            .source_iter(q!(vec![1]))
1386            .send(&node, TCP.fail_stop().bincode())
1387            .entries()
1388            .sim_output();
1389
1390        let out_recv_2 = cluster2
1391            .source_iter(q!(vec![2]))
1392            .send(&node, TCP.fail_stop().bincode())
1393            .entries()
1394            .sim_output();
1395
1396        let instances = flow
1397            .sim()
1398            .with_cluster_size(&cluster1, 3)
1399            .with_cluster_size(&cluster2, 4)
1400            .exhaustive(async || {
1401                out_recv_1
1402                    .assert_yields_only_unordered(vec![
1403                        (MemberId::from_raw_id(0), 1),
1404                        (MemberId::from_raw_id(1), 1),
1405                        (MemberId::from_raw_id(2), 1),
1406                    ])
1407                    .await;
1408
1409                out_recv_2
1410                    .assert_yields_only_unordered(vec![
1411                        (MemberId::from_raw_id(0), 2),
1412                        (MemberId::from_raw_id(1), 2),
1413                        (MemberId::from_raw_id(2), 2),
1414                        (MemberId::from_raw_id(3), 2),
1415                    ])
1416                    .await;
1417            });
1418
1419        assert_eq!(instances, 1);
1420    }
1421
1422    #[cfg(feature = "sim")]
1423    #[test]
1424    fn sim_send_bincode_o2m() {
1425        let mut flow = FlowBuilder::new();
1426        let cluster = flow.cluster::<()>();
1427        let node = flow.process::<()>();
1428
1429        let input = node.source_iter(q!(vec![
1430            (MemberId::from_raw_id(0), 123),
1431            (MemberId::from_raw_id(1), 456),
1432        ]));
1433
1434        let out_recv = input
1435            .demux(&cluster, TCP.fail_stop().bincode())
1436            .map(q!(|x| x + 1))
1437            .send(&node, TCP.fail_stop().bincode())
1438            .entries()
1439            .sim_output();
1440
1441        flow.sim()
1442            .with_cluster_size(&cluster, 4)
1443            .exhaustive(async || {
1444                out_recv
1445                    .assert_yields_only_unordered(vec![
1446                        (MemberId::from_raw_id(0), 124),
1447                        (MemberId::from_raw_id(1), 457),
1448                    ])
1449                    .await
1450            });
1451    }
1452
1453    #[cfg(feature = "sim")]
1454    #[test]
1455    fn sim_broadcast_bincode_o2m() {
1456        let mut flow = FlowBuilder::new();
1457        let cluster = flow.cluster::<()>();
1458        let node = flow.process::<()>();
1459
1460        let input = node.source_iter(q!(vec![123, 456]));
1461
1462        let out_recv = input
1463            .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1464            .map(q!(|x| x + 1))
1465            .send(&node, TCP.fail_stop().bincode())
1466            .entries()
1467            .sim_output();
1468
1469        let mut c_1_produced = false;
1470        let mut c_2_produced = false;
1471
1472        flow.sim()
1473            .with_cluster_size(&cluster, 2)
1474            .exhaustive(async || {
1475                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1476
1477                // check that order is preserved
1478                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1479                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1480                    c_1_produced = true;
1481                }
1482
1483                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1484                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1485                    c_2_produced = true;
1486                }
1487            });
1488
1489        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1490    }
1491
1492    #[cfg(feature = "sim")]
1493    #[test]
1494    fn sim_send_bincode_m2m() {
1495        let mut flow = FlowBuilder::new();
1496        let cluster = flow.cluster::<()>();
1497        let node = flow.process::<()>();
1498
1499        let input = node.source_iter(q!(vec![
1500            (MemberId::from_raw_id(0), 123),
1501            (MemberId::from_raw_id(1), 456),
1502        ]));
1503
1504        let out_recv = input
1505            .demux(&cluster, TCP.fail_stop().bincode())
1506            .map(q!(|x| x + 1))
1507            .flat_map_ordered(q!(|x| vec![
1508                (MemberId::from_raw_id(0), x),
1509                (MemberId::from_raw_id(1), x),
1510            ]))
1511            .demux(&cluster, TCP.fail_stop().bincode())
1512            .entries()
1513            .send(&node, TCP.fail_stop().bincode())
1514            .entries()
1515            .sim_output();
1516
1517        flow.sim()
1518            .with_cluster_size(&cluster, 4)
1519            .exhaustive(async || {
1520                out_recv
1521                    .assert_yields_only_unordered(vec![
1522                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1523                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1524                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1525                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1526                    ])
1527                    .await
1528            });
1529    }
1530
1531    #[cfg(feature = "sim")]
1532    #[test]
1533    fn sim_lossy_delayed_forever_o2o() {
1534        use std::collections::HashSet;
1535
1536        use crate::properties::manual_proof;
1537
1538        let mut flow = FlowBuilder::new();
1539        let node = flow.process::<()>();
1540        let node2 = flow.process::<()>();
1541
1542        let received = node
1543            .source_iter(q!(0..3_u32))
1544            .send(&node2, TCP.lossy_delayed_forever().bincode())
1545            .fold(
1546                q!(|| std::collections::HashSet::<u32>::new()),
1547                q!(
1548                    |set, v| {
1549                        set.insert(v);
1550                    },
1551                    commutative = manual_proof!(/** set insert is commutative */)
1552                ),
1553            );
1554
1555        let out_recv = sliced! {
1556            let snapshot = use(received, nondet!(/** test */));
1557            snapshot.into_stream()
1558        }
1559        .sim_output();
1560
1561        let mut saw_non_contiguous = false;
1562
1563        flow.sim().test_safety_only().exhaustive(async || {
1564            let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;
1565
1566            // Check each individual snapshot for a non-contiguous subset.
1567            for set in &snapshots {
1568                #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
1569                if set.len() >= 2 && set.len() < 3 {
1570                    let min = *set.iter().min().unwrap();
1571                    let max = *set.iter().max().unwrap();
1572                    if set.len() < (max - min + 1) as usize {
1573                        saw_non_contiguous = true;
1574                    }
1575                }
1576            }
1577        });
1578
1579        assert!(
1580            saw_non_contiguous,
1581            "Expected at least one execution with a non-contiguous subset of inputs"
1582        );
1583    }
1584}