hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(stageleft_runtime)]
18use crate::location::dynamic::DynLocation;
19use crate::location::external_process::ExternalBincodeStream;
20use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21use crate::networking::{NetworkFor, TCP};
22use crate::nondet::NonDet;
23#[cfg(feature = "sim")]
24use crate::sim::SimReceiver;
25use crate::staging_util::get_this_crate;
26
27// same as the one in `hydro_std`, but internal use only
28fn track_membership<'a, C, L: Location<'a> + NoTick>(
29 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
30) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
31 membership.fold(
32 q!(|| false),
33 q!(|present, event| {
34 match event {
35 MembershipEvent::Joined => *present = true,
36 MembershipEvent::Left => *present = false,
37 }
38 }),
39 )
40}
41
42fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
43 let root = get_this_crate();
44
45 if is_demux {
46 parse_quote! {
47 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
48 |(id, data)| {
49 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
50 }
51 )
52 }
53 } else {
54 parse_quote! {
55 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
56 |data| {
57 #root::runtime_support::bincode::serialize(&data).unwrap().into()
58 }
59 )
60 }
61 }
62}
63
64pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
65 serialize_bincode_with_type(is_demux, "e_type::<T>())
66}
67
68fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
69 let root = get_this_crate();
70
71 if let Some(c_type) = tagged {
72 parse_quote! {
73 |res| {
74 let (id, b) = res.unwrap();
75 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
76 }
77 }
78 } else {
79 parse_quote! {
80 |res| {
81 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
82 }
83 }
84 }
85}
86
87pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
88 deserialize_bincode_with_type(tagged, "e_type::<T>())
89}
90
91impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
92 #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
93 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
94 /// using [`bincode`] to serialize/deserialize messages.
95 ///
96 /// The returned stream captures the elements received at the destination, where values will
97 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
98 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
/// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
100 /// dropped no further messages will be sent.
101 ///
102 /// # Example
103 /// ```rust
104 /// # #[cfg(feature = "deploy")] {
105 /// # use hydro_lang::prelude::*;
106 /// # use futures::StreamExt;
107 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
108 /// let p1 = flow.process::<()>();
109 /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
110 /// let p2 = flow.process::<()>();
111 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
112 /// // 1, 2, 3
113 /// # on_p2.send_bincode(&p_out)
114 /// # }, |mut stream| async move {
115 /// # for w in 1..=3 {
116 /// # assert_eq!(stream.next().await, Some(w));
117 /// # }
118 /// # }));
119 /// # }
120 /// ```
121 pub fn send_bincode<L2>(
122 self,
123 other: &Process<'a, L2>,
124 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
125 where
126 T: Serialize + DeserializeOwned,
127 {
128 self.send(other, TCP.bincode())
129 }
130
131 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
132 /// using the configuration in `via` to set up the message transport.
133 ///
134 /// The returned stream captures the elements received at the destination, where values will
135 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
136 /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
/// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
138 /// dropped no further messages will be sent.
139 ///
140 /// # Example
141 /// ```rust
142 /// # #[cfg(feature = "deploy")] {
143 /// # use hydro_lang::prelude::*;
144 /// # use futures::StreamExt;
145 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
146 /// let p1 = flow.process::<()>();
147 /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
148 /// let p2 = flow.process::<()>();
149 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.bincode());
150 /// // 1, 2, 3
151 /// # on_p2.send(&p_out, TCP.bincode())
152 /// # }, |mut stream| async move {
153 /// # for w in 1..=3 {
154 /// # assert_eq!(stream.next().await, Some(w));
155 /// # }
156 /// # }));
157 /// # }
158 /// ```
159 pub fn send<L2, N: NetworkFor<T>>(
160 self,
161 to: &Process<'a, L2>,
162 via: N,
163 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
164 where
165 T: Serialize + DeserializeOwned,
166 {
167 let _ = via;
168 let serialize_pipeline = Some(N::serialize_thunk(false));
169 let deserialize_pipeline = Some(N::deserialize_thunk(None));
170
171 Stream::new(
172 to.clone(),
173 HydroNode::Network {
174 serialize_fn: serialize_pipeline.map(|e| e.into()),
175 instantiate_fn: DebugInstantiate::Building,
176 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
177 input: Box::new(self.ir_node.into_inner()),
178 metadata: to.new_node_metadata(
179 Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
180 ),
181 },
182 )
183 }
184
185 #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
186 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
187 /// using [`bincode`] to serialize/deserialize messages.
188 ///
189 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
190 /// membership information. This is a common pattern in distributed systems for broadcasting data to
191 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
192 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
193 /// each element to all cluster members.
194 ///
195 /// # Non-Determinism
196 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
197 /// to the current cluster members _at that point in time_. Depending on when we are notified of
198 /// membership changes, we will broadcast each element to different members.
199 ///
200 /// # Example
201 /// ```rust
202 /// # #[cfg(feature = "deploy")] {
203 /// # use hydro_lang::prelude::*;
204 /// # use futures::StreamExt;
205 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
206 /// let p1 = flow.process::<()>();
207 /// let workers: Cluster<()> = flow.cluster::<()>();
208 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
209 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
210 /// # on_worker.send_bincode(&p2).entries()
211 /// // if there are 4 members in the cluster, each receives one element
212 /// // - MemberId::<()>(0): [123]
213 /// // - MemberId::<()>(1): [123]
214 /// // - MemberId::<()>(2): [123]
215 /// // - MemberId::<()>(3): [123]
216 /// # }, |mut stream| async move {
217 /// # let mut results = Vec::new();
218 /// # for w in 0..4 {
219 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
220 /// # }
221 /// # results.sort();
222 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
223 /// # }));
224 /// # }
225 /// ```
226 pub fn broadcast_bincode<L2: 'a>(
227 self,
228 other: &Cluster<'a, L2>,
229 nondet_membership: NonDet,
230 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
231 where
232 T: Clone + Serialize + DeserializeOwned,
233 {
234 self.broadcast(other, TCP.bincode(), nondet_membership)
235 }
236
237 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
238 /// using the configuration in `via` to set up the message transport.
239 ///
240 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
241 /// membership information. This is a common pattern in distributed systems for broadcasting data to
242 /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
243 /// target specific members, `broadcast` takes a stream of **only data elements** and sends
244 /// each element to all cluster members.
245 ///
246 /// # Non-Determinism
247 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
248 /// to the current cluster members _at that point in time_. Depending on when we are notified of
249 /// membership changes, we will broadcast each element to different members.
250 ///
251 /// # Example
252 /// ```rust
253 /// # #[cfg(feature = "deploy")] {
254 /// # use hydro_lang::prelude::*;
255 /// # use futures::StreamExt;
256 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
257 /// let p1 = flow.process::<()>();
258 /// let workers: Cluster<()> = flow.cluster::<()>();
259 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
260 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
261 /// # on_worker.send(&p2, TCP.bincode()).entries()
262 /// // if there are 4 members in the cluster, each receives one element
263 /// // - MemberId::<()>(0): [123]
264 /// // - MemberId::<()>(1): [123]
265 /// // - MemberId::<()>(2): [123]
266 /// // - MemberId::<()>(3): [123]
267 /// # }, |mut stream| async move {
268 /// # let mut results = Vec::new();
269 /// # for w in 0..4 {
270 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
271 /// # }
272 /// # results.sort();
273 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
274 /// # }));
275 /// # }
276 /// ```
277 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
278 self,
279 to: &Cluster<'a, L2>,
280 via: N,
281 nondet_membership: NonDet,
282 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
283 where
284 T: Clone + Serialize + DeserializeOwned,
285 {
286 let ids = track_membership(self.location.source_cluster_members(to));
287 sliced! {
288 let members_snapshot = use(ids, nondet_membership);
289 let elements = use(self, nondet_membership);
290
291 let current_members = members_snapshot.filter(q!(|b| *b));
292 elements.repeat_with_keys(current_members)
293 }
294 .demux(to, via)
295 }
296
297 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
298 /// serialization. The external process can receive these elements by establishing a TCP
299 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
300 ///
301 /// # Example
302 /// ```rust
303 /// # #[cfg(feature = "deploy")] {
304 /// # use hydro_lang::prelude::*;
305 /// # use futures::StreamExt;
306 /// # tokio_test::block_on(async move {
307 /// let flow = FlowBuilder::new();
308 /// let process = flow.process::<()>();
309 /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
310 /// let external = flow.external::<()>();
311 /// let external_handle = numbers.send_bincode_external(&external);
312 ///
313 /// let mut deployment = hydro_deploy::Deployment::new();
314 /// let nodes = flow
315 /// .with_process(&process, deployment.Localhost())
316 /// .with_external(&external, deployment.Localhost())
317 /// .deploy(&mut deployment);
318 ///
319 /// deployment.deploy().await.unwrap();
320 /// // establish the TCP connection
321 /// let mut external_recv_stream = nodes.connect(external_handle).await;
322 /// deployment.start().await.unwrap();
323 ///
324 /// for w in 1..=3 {
325 /// assert_eq!(external_recv_stream.next().await, Some(w));
326 /// }
327 /// # });
328 /// # }
329 /// ```
330 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
331 where
332 T: Serialize + DeserializeOwned,
333 {
334 let serialize_pipeline = Some(serialize_bincode::<T>(false));
335
336 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
337
338 let external_key = flow_state_borrow.next_external_out;
339 flow_state_borrow.next_external_out += 1;
340
341 flow_state_borrow.push_root(HydroRoot::SendExternal {
342 to_external_id: other.id,
343 to_key: external_key,
344 to_many: false,
345 unpaired: true,
346 serialize_fn: serialize_pipeline.map(|e| e.into()),
347 instantiate_fn: DebugInstantiate::Building,
348 input: Box::new(self.ir_node.into_inner()),
349 op_metadata: HydroIrOpMetadata::new(),
350 });
351
352 ExternalBincodeStream {
353 process_id: other.id,
354 port_id: external_key,
355 _phantom: PhantomData,
356 }
357 }
358
359 #[cfg(feature = "sim")]
360 /// Sets up a simulation output port for this stream, allowing test code to receive elements
361 /// sent to this stream during simulation.
362 pub fn sim_output(self) -> SimReceiver<T, O, R>
363 where
364 T: Serialize + DeserializeOwned,
365 {
366 let external_location: External<'a, ()> = External {
367 id: 0,
368 flow_state: self.location.flow_state().clone(),
369 _phantom: PhantomData,
370 };
371
372 let external = self.send_bincode_external(&external_location);
373
374 SimReceiver(external.port_id, PhantomData)
375 }
376}
377
378impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
379 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
380{
381 #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
382 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
383 /// using [`bincode`] to serialize/deserialize messages.
384 ///
385 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
386 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
387 /// this API allows precise targeting of specific cluster members rather than broadcasting to
388 /// all members.
389 ///
390 /// # Example
391 /// ```rust
392 /// # #[cfg(feature = "deploy")] {
393 /// # use hydro_lang::prelude::*;
394 /// # use futures::StreamExt;
395 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
396 /// let p1 = flow.process::<()>();
397 /// let workers: Cluster<()> = flow.cluster::<()>();
398 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
399 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
400 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
401 /// .demux_bincode(&workers);
402 /// # on_worker.send_bincode(&p2).entries()
403 /// // if there are 4 members in the cluster, each receives one element
404 /// // - MemberId::<()>(0): [0]
405 /// // - MemberId::<()>(1): [1]
406 /// // - MemberId::<()>(2): [2]
407 /// // - MemberId::<()>(3): [3]
408 /// # }, |mut stream| async move {
409 /// # let mut results = Vec::new();
410 /// # for w in 0..4 {
411 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
412 /// # }
413 /// # results.sort();
414 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
415 /// # }));
416 /// # }
417 /// ```
418 pub fn demux_bincode(
419 self,
420 other: &Cluster<'a, L2>,
421 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
422 where
423 T: Serialize + DeserializeOwned,
424 {
425 self.demux(other, TCP.bincode())
426 }
427
428 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
429 /// using the configuration in `via` to set up the message transport.
430 ///
431 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
432 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
433 /// this API allows precise targeting of specific cluster members rather than broadcasting to
434 /// all members.
435 ///
436 /// # Example
437 /// ```rust
438 /// # #[cfg(feature = "deploy")] {
439 /// # use hydro_lang::prelude::*;
440 /// # use futures::StreamExt;
441 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
442 /// let p1 = flow.process::<()>();
443 /// let workers: Cluster<()> = flow.cluster::<()>();
444 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
445 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
446 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
447 /// .demux(&workers, TCP.bincode());
448 /// # on_worker.send(&p2, TCP.bincode()).entries()
449 /// // if there are 4 members in the cluster, each receives one element
450 /// // - MemberId::<()>(0): [0]
451 /// // - MemberId::<()>(1): [1]
452 /// // - MemberId::<()>(2): [2]
453 /// // - MemberId::<()>(3): [3]
454 /// # }, |mut stream| async move {
455 /// # let mut results = Vec::new();
456 /// # for w in 0..4 {
457 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
458 /// # }
459 /// # results.sort();
460 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
461 /// # }));
462 /// # }
463 /// ```
464 pub fn demux<N: NetworkFor<T>>(
465 self,
466 to: &Cluster<'a, L2>,
467 via: N,
468 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
469 where
470 T: Serialize + DeserializeOwned,
471 {
472 self.into_keyed().demux(to, via)
473 }
474}
475
476impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
477 #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
478 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
479 /// [`bincode`] to serialize/deserialize messages.
480 ///
481 /// This provides load balancing by evenly distributing work across cluster members. The
482 /// distribution is deterministic based on element order - the first element goes to member 0,
483 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
484 ///
485 /// # Non-Determinism
486 /// The set of cluster members may asynchronously change over time. Each element is distributed
487 /// based on the current cluster membership _at that point in time_. Depending on when cluster
488 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
489 /// membership is stable, the order of members in the round-robin pattern may change across runs.
490 ///
491 /// # Ordering Requirements
492 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
493 /// order of messages and retries affects the round-robin pattern.
494 ///
495 /// # Example
496 /// ```rust
497 /// # #[cfg(feature = "deploy")] {
498 /// # use hydro_lang::prelude::*;
499 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
500 /// # use futures::StreamExt;
501 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
502 /// let p1 = flow.process::<()>();
503 /// let workers: Cluster<()> = flow.cluster::<()>();
504 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
505 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
506 /// on_worker.send_bincode(&p2)
507 /// # .first().values() // we use first to assert that each member gets one element
508 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
509 /// // - MemberId::<()>(?): [1]
510 /// // - MemberId::<()>(?): [2]
511 /// // - MemberId::<()>(?): [3]
512 /// // - MemberId::<()>(?): [4]
513 /// # }, |mut stream| async move {
514 /// # let mut results = Vec::new();
515 /// # for w in 0..4 {
516 /// # results.push(stream.next().await.unwrap());
517 /// # }
518 /// # results.sort();
519 /// # assert_eq!(results, vec![1, 2, 3, 4]);
520 /// # }));
521 /// # }
522 /// ```
523 pub fn round_robin_bincode<L2: 'a>(
524 self,
525 other: &Cluster<'a, L2>,
526 nondet_membership: NonDet,
527 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
528 where
529 T: Serialize + DeserializeOwned,
530 {
531 self.round_robin(other, TCP.bincode(), nondet_membership)
532 }
533
534 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
535 /// the configuration in `via` to set up the message transport.
536 ///
537 /// This provides load balancing by evenly distributing work across cluster members. The
538 /// distribution is deterministic based on element order - the first element goes to member 0,
539 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
540 ///
541 /// # Non-Determinism
542 /// The set of cluster members may asynchronously change over time. Each element is distributed
543 /// based on the current cluster membership _at that point in time_. Depending on when cluster
544 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
545 /// membership is stable, the order of members in the round-robin pattern may change across runs.
546 ///
547 /// # Ordering Requirements
548 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
549 /// order of messages and retries affects the round-robin pattern.
550 ///
551 /// # Example
552 /// ```rust
553 /// # #[cfg(feature = "deploy")] {
554 /// # use hydro_lang::prelude::*;
555 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
556 /// # use futures::StreamExt;
557 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
558 /// let p1 = flow.process::<()>();
559 /// let workers: Cluster<()> = flow.cluster::<()>();
560 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
561 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.bincode(), nondet!(/** assuming stable membership */));
562 /// on_worker.send(&p2, TCP.bincode())
563 /// # .first().values() // we use first to assert that each member gets one element
564 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
565 /// // - MemberId::<()>(?): [1]
566 /// // - MemberId::<()>(?): [2]
567 /// // - MemberId::<()>(?): [3]
568 /// // - MemberId::<()>(?): [4]
569 /// # }, |mut stream| async move {
570 /// # let mut results = Vec::new();
571 /// # for w in 0..4 {
572 /// # results.push(stream.next().await.unwrap());
573 /// # }
574 /// # results.sort();
575 /// # assert_eq!(results, vec![1, 2, 3, 4]);
576 /// # }));
577 /// # }
578 /// ```
579 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
580 self,
581 to: &Cluster<'a, L2>,
582 via: N,
583 nondet_membership: NonDet,
584 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
585 where
586 T: Serialize + DeserializeOwned,
587 {
588 let ids = track_membership(self.location.source_cluster_members(to));
589 sliced! {
590 let members_snapshot = use(ids, nondet_membership);
591 let elements = use(self.enumerate(), nondet_membership);
592
593 let current_members = members_snapshot
594 .filter(q!(|b| *b))
595 .keys()
596 .assume_ordering(nondet_membership)
597 .collect_vec();
598
599 elements
600 .cross_singleton(current_members)
601 .map(q!(|(data, members)| (
602 members[data.0 % members.len()].clone(),
603 data.1
604 )))
605 }
606 .demux(to, via)
607 }
608}
609
610impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
611 #[deprecated = "use Stream::round_robin(..., TCP.bincode()) instead"]
612 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
613 /// [`bincode`] to serialize/deserialize messages.
614 ///
615 /// This provides load balancing by evenly distributing work across cluster members. The
616 /// distribution is deterministic based on element order - the first element goes to member 0,
617 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
618 ///
619 /// # Non-Determinism
620 /// The set of cluster members may asynchronously change over time. Each element is distributed
621 /// based on the current cluster membership _at that point in time_. Depending on when cluster
622 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
623 /// membership is stable, the order of members in the round-robin pattern may change across runs.
624 ///
625 /// # Ordering Requirements
626 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
627 /// order of messages and retries affects the round-robin pattern.
628 ///
629 /// # Example
630 /// ```rust
631 /// # #[cfg(feature = "deploy")] {
632 /// # use hydro_lang::prelude::*;
633 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
634 /// # use hydro_lang::location::MemberId;
635 /// # use futures::StreamExt;
636 /// # std::thread::spawn(|| {
637 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
638 /// let p1 = flow.process::<()>();
639 /// let workers1: Cluster<()> = flow.cluster::<()>();
640 /// let workers2: Cluster<()> = flow.cluster::<()>();
641 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
642 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
643 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
644 /// on_worker2.send_bincode(&p2)
645 /// # .entries()
646 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
647 /// # }, |mut stream| async move {
648 /// # let mut results = Vec::new();
649 /// # let mut locations = std::collections::HashSet::new();
650 /// # for w in 0..=16 {
651 /// # let (location, v) = stream.next().await.unwrap();
652 /// # locations.insert(location);
653 /// # results.push(v);
654 /// # }
655 /// # results.sort();
656 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
657 /// # assert_eq!(locations.len(), 16);
658 /// # }));
659 /// # }).join().unwrap();
660 /// # }
661 /// ```
662 pub fn round_robin_bincode<L2: 'a>(
663 self,
664 other: &Cluster<'a, L2>,
665 nondet_membership: NonDet,
666 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
667 where
668 T: Serialize + DeserializeOwned,
669 {
670 self.round_robin(other, TCP.bincode(), nondet_membership)
671 }
672
673 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
674 /// the configuration in `via` to set up the message transport.
675 ///
676 /// This provides load balancing by evenly distributing work across cluster members. The
677 /// distribution is deterministic based on element order - the first element goes to member 0,
678 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
679 ///
680 /// # Non-Determinism
681 /// The set of cluster members may asynchronously change over time. Each element is distributed
682 /// based on the current cluster membership _at that point in time_. Depending on when cluster
683 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
684 /// membership is stable, the order of members in the round-robin pattern may change across runs.
685 ///
686 /// # Ordering Requirements
687 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
688 /// order of messages and retries affects the round-robin pattern.
689 ///
690 /// # Example
691 /// ```rust
692 /// # #[cfg(feature = "deploy")] {
693 /// # use hydro_lang::prelude::*;
694 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
695 /// # use hydro_lang::location::MemberId;
696 /// # use futures::StreamExt;
697 /// # std::thread::spawn(|| {
698 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
699 /// let p1 = flow.process::<()>();
700 /// let workers1: Cluster<()> = flow.cluster::<()>();
701 /// let workers2: Cluster<()> = flow.cluster::<()>();
702 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
703 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.bincode(), nondet!(/** assuming stable membership */));
704 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
705 /// on_worker2.send(&p2, TCP.bincode())
706 /// # .entries()
707 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
708 /// # }, |mut stream| async move {
709 /// # let mut results = Vec::new();
710 /// # let mut locations = std::collections::HashSet::new();
711 /// # for w in 0..=16 {
712 /// # let (location, v) = stream.next().await.unwrap();
713 /// # locations.insert(location);
714 /// # results.push(v);
715 /// # }
716 /// # results.sort();
717 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
718 /// # assert_eq!(locations.len(), 16);
719 /// # }));
720 /// # }).join().unwrap();
721 /// # }
722 /// ```
723 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
724 self,
725 to: &Cluster<'a, L2>,
726 via: N,
727 nondet_membership: NonDet,
728 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
729 where
730 T: Serialize + DeserializeOwned,
731 {
732 let ids = track_membership(self.location.source_cluster_members(to));
733 sliced! {
734 let members_snapshot = use(ids, nondet_membership);
735 let elements = use(self.enumerate(), nondet_membership);
736
737 let current_members = members_snapshot
738 .filter(q!(|b| *b))
739 .keys()
740 .assume_ordering(nondet_membership)
741 .collect_vec();
742
743 elements
744 .cross_singleton(current_members)
745 .map(q!(|(data, members)| (
746 members[data.0 % members.len()].clone(),
747 data.1
748 )))
749 }
750 .demux(to, via)
751 }
752}
753
754impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
755 #[deprecated = "use Stream::send(..., TCP.bincode()) instead"]
756 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
757 /// using [`bincode`] to serialize/deserialize messages.
758 ///
759 /// Each cluster member sends its local stream elements, and they are collected at the destination
760 /// as a [`KeyedStream`] where keys identify the source cluster member.
761 ///
762 /// # Example
763 /// ```rust
764 /// # #[cfg(feature = "deploy")] {
765 /// # use hydro_lang::prelude::*;
766 /// # use futures::StreamExt;
767 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
768 /// let workers: Cluster<()> = flow.cluster::<()>();
769 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
770 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
771 /// # all_received.entries()
772 /// # }, |mut stream| async move {
773 /// // if there are 4 members in the cluster, we should receive 4 elements
774 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
775 /// # let mut results = Vec::new();
776 /// # for w in 0..4 {
777 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
778 /// # }
779 /// # results.sort();
780 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
781 /// # }));
782 /// # }
783 /// ```
784 ///
785 /// If you don't need to know the source for each element, you can use `.values()`
786 /// to get just the data:
787 /// ```rust
788 /// # #[cfg(feature = "deploy")] {
789 /// # use hydro_lang::prelude::*;
790 /// # use hydro_lang::live_collections::stream::NoOrder;
791 /// # use futures::StreamExt;
792 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
793 /// # let workers: Cluster<()> = flow.cluster::<()>();
794 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
795 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
796 /// # values
797 /// # }, |mut stream| async move {
798 /// # let mut results = Vec::new();
799 /// # for w in 0..4 {
800 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
801 /// # }
802 /// # results.sort();
803 /// // if there are 4 members in the cluster, we should receive 4 elements
804 /// // 1, 1, 1, 1
805 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
806 /// # }));
807 /// # }
808 /// ```
809 pub fn send_bincode<L2>(
810 self,
811 other: &Process<'a, L2>,
812 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
813 where
814 T: Serialize + DeserializeOwned,
815 {
816 self.send(other, TCP.bincode())
817 }
818
819 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
820 /// using the configuration in `via` to set up the message transport.
821 ///
822 /// Each cluster member sends its local stream elements, and they are collected at the destination
823 /// as a [`KeyedStream`] where keys identify the source cluster member.
824 ///
825 /// # Example
826 /// ```rust
827 /// # #[cfg(feature = "deploy")] {
828 /// # use hydro_lang::prelude::*;
829 /// # use futures::StreamExt;
830 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
831 /// let workers: Cluster<()> = flow.cluster::<()>();
832 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
833 /// let all_received = numbers.send(&process, TCP.bincode()); // KeyedStream<MemberId<()>, i32, ...>
834 /// # all_received.entries()
835 /// # }, |mut stream| async move {
836 /// // if there are 4 members in the cluster, we should receive 4 elements
837 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
838 /// # let mut results = Vec::new();
839 /// # for w in 0..4 {
840 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
841 /// # }
842 /// # results.sort();
843 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
844 /// # }));
845 /// # }
846 /// ```
847 ///
848 /// If you don't need to know the source for each element, you can use `.values()`
849 /// to get just the data:
850 /// ```rust
851 /// # #[cfg(feature = "deploy")] {
852 /// # use hydro_lang::prelude::*;
853 /// # use hydro_lang::live_collections::stream::NoOrder;
854 /// # use futures::StreamExt;
855 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
856 /// # let workers: Cluster<()> = flow.cluster::<()>();
857 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
858 /// let values: Stream<i32, _, _, NoOrder> = numbers.send(&process, TCP.bincode()).values();
859 /// # values
860 /// # }, |mut stream| async move {
861 /// # let mut results = Vec::new();
862 /// # for w in 0..4 {
863 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
864 /// # }
865 /// # results.sort();
866 /// // if there are 4 members in the cluster, we should receive 4 elements
867 /// // 1, 1, 1, 1
868 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
869 /// # }));
870 /// # }
871 /// ```
872 pub fn send<L2, N: NetworkFor<T>>(
873 self,
874 to: &Process<'a, L2>,
875 via: N,
876 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
877 where
878 T: Serialize + DeserializeOwned,
879 {
880 let _ = via;
881 let serialize_pipeline = Some(N::serialize_thunk(false));
882
883 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
884
885 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
886 to.clone(),
887 HydroNode::Network {
888 serialize_fn: serialize_pipeline.map(|e| e.into()),
889 instantiate_fn: DebugInstantiate::Building,
890 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
891 input: Box::new(self.ir_node.into_inner()),
892 metadata: to.new_node_metadata(Stream::<
893 (MemberId<L>, T),
894 Process<'a, L2>,
895 Unbounded,
896 O,
897 R,
898 >::collection_kind()),
899 },
900 );
901
902 raw_stream.into_keyed()
903 }
904
905 #[deprecated = "use Stream::broadcast(..., TCP.bincode()) instead"]
906 /// Broadcasts elements of this stream at each source member to all members of a destination
907 /// cluster, using [`bincode`] to serialize/deserialize messages.
908 ///
909 /// Each source member sends each of its stream elements to **every** member of the cluster
910 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
911 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
912 /// **only data elements** and sends each element to all cluster members.
913 ///
914 /// # Non-Determinism
915 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
916 /// to the current cluster members known _at that point in time_ at the source member. Depending
917 /// on when each source member is notified of membership changes, it will broadcast each element
918 /// to different members.
919 ///
920 /// # Example
921 /// ```rust
922 /// # #[cfg(feature = "deploy")] {
923 /// # use hydro_lang::prelude::*;
924 /// # use hydro_lang::location::MemberId;
925 /// # use futures::StreamExt;
926 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
927 /// # type Source = ();
928 /// # type Destination = ();
929 /// let source: Cluster<Source> = flow.cluster::<Source>();
930 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
931 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
932 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
933 /// # on_destination.entries().send_bincode(&p2).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
935 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
936 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
937 /// // - ...
938 /// # }, |mut stream| async move {
939 /// # let mut results = Vec::new();
940 /// # for w in 0..16 {
941 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
942 /// # }
943 /// # results.sort();
944 /// # assert_eq!(results, vec![
945 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
946 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
947 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
948 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
949 /// # ]);
950 /// # }));
951 /// # }
952 /// ```
953 pub fn broadcast_bincode<L2: 'a>(
954 self,
955 other: &Cluster<'a, L2>,
956 nondet_membership: NonDet,
957 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
958 where
959 T: Clone + Serialize + DeserializeOwned,
960 {
961 self.broadcast(other, TCP.bincode(), nondet_membership)
962 }
963
964 /// Broadcasts elements of this stream at each source member to all members of a destination
965 /// cluster, using the configuration in `via` to set up the message transport.
966 ///
967 /// Each source member sends each of its stream elements to **every** member of the cluster
968 /// based on its latest membership information. Unlike [`Stream::demux`], which requires
969 /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
970 /// **only data elements** and sends each element to all cluster members.
971 ///
972 /// # Non-Determinism
973 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
974 /// to the current cluster members known _at that point in time_ at the source member. Depending
975 /// on when each source member is notified of membership changes, it will broadcast each element
976 /// to different members.
977 ///
978 /// # Example
979 /// ```rust
980 /// # #[cfg(feature = "deploy")] {
981 /// # use hydro_lang::prelude::*;
982 /// # use hydro_lang::location::MemberId;
983 /// # use futures::StreamExt;
984 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
985 /// # type Source = ();
986 /// # type Destination = ();
987 /// let source: Cluster<Source> = flow.cluster::<Source>();
988 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
989 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
990 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.bincode(), nondet!(/** assuming stable membership */));
991 /// # on_destination.entries().send(&p2, TCP.bincode()).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
993 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
994 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
995 /// // - ...
996 /// # }, |mut stream| async move {
997 /// # let mut results = Vec::new();
998 /// # for w in 0..16 {
999 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1000 /// # }
1001 /// # results.sort();
1002 /// # assert_eq!(results, vec![
1003 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1004 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1005 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1006 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1007 /// # ]);
1008 /// # }));
1009 /// # }
1010 /// ```
1011 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1012 self,
1013 to: &Cluster<'a, L2>,
1014 via: N,
1015 nondet_membership: NonDet,
1016 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1017 where
1018 T: Clone + Serialize + DeserializeOwned,
1019 {
1020 let ids = track_membership(self.location.source_cluster_members(to));
1021 sliced! {
1022 let members_snapshot = use(ids, nondet_membership);
1023 let elements = use(self, nondet_membership);
1024
1025 let current_members = members_snapshot.filter(q!(|b| *b));
1026 elements.repeat_with_keys(current_members)
1027 }
1028 .demux(to, via)
1029 }
1030}
1031
1032impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1033 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1034{
1035 #[deprecated = "use Stream::demux(..., TCP.bincode()) instead"]
1036 /// Sends elements of this stream at each source member to specific members of a destination
1037 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1038 ///
1039 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1040 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1041 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1042 /// all members.
1043 ///
1044 /// Each cluster member sends its local stream elements, and they are collected at each
1045 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1046 ///
1047 /// # Example
1048 /// ```rust
1049 /// # #[cfg(feature = "deploy")] {
1050 /// # use hydro_lang::prelude::*;
1051 /// # use futures::StreamExt;
1052 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1053 /// # type Source = ();
1054 /// # type Destination = ();
1055 /// let source: Cluster<Source> = flow.cluster::<Source>();
1056 /// let to_send: Stream<_, Cluster<_>, _> = source
1057 /// .source_iter(q!(vec![0, 1, 2, 3]))
1058 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1059 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1060 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1061 /// # all_received.entries().send_bincode(&p2).entries()
1062 /// # }, |mut stream| async move {
1063 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1064 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1065 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1066 /// // - ...
1067 /// # let mut results = Vec::new();
1068 /// # for w in 0..16 {
1069 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1070 /// # }
1071 /// # results.sort();
1072 /// # assert_eq!(results, vec![
1073 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1074 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1075 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1076 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1077 /// # ]);
1078 /// # }));
1079 /// # }
1080 /// ```
1081 pub fn demux_bincode(
1082 self,
1083 other: &Cluster<'a, L2>,
1084 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1085 where
1086 T: Serialize + DeserializeOwned,
1087 {
1088 self.demux(other, TCP.bincode())
1089 }
1090
1091 /// Sends elements of this stream at each source member to specific members of a destination
1092 /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1093 /// message transport.
1094 ///
1095 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1096 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1097 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1098 /// all members.
1099 ///
1100 /// Each cluster member sends its local stream elements, and they are collected at each
1101 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1102 ///
1103 /// # Example
1104 /// ```rust
1105 /// # #[cfg(feature = "deploy")] {
1106 /// # use hydro_lang::prelude::*;
1107 /// # use futures::StreamExt;
1108 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1109 /// # type Source = ();
1110 /// # type Destination = ();
1111 /// let source: Cluster<Source> = flow.cluster::<Source>();
1112 /// let to_send: Stream<_, Cluster<_>, _> = source
1113 /// .source_iter(q!(vec![0, 1, 2, 3]))
1114 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1115 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1116 /// let all_received = to_send.demux(&destination, TCP.bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1117 /// # all_received.entries().send(&p2, TCP.bincode()).entries()
1118 /// # }, |mut stream| async move {
1119 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1120 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1121 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1122 /// // - ...
1123 /// # let mut results = Vec::new();
1124 /// # for w in 0..16 {
1125 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1126 /// # }
1127 /// # results.sort();
1128 /// # assert_eq!(results, vec![
1129 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1130 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1131 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1132 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1133 /// # ]);
1134 /// # }));
1135 /// # }
1136 /// ```
1137 pub fn demux<N: NetworkFor<T>>(
1138 self,
1139 to: &Cluster<'a, L2>,
1140 via: N,
1141 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1142 where
1143 T: Serialize + DeserializeOwned,
1144 {
1145 self.into_keyed().demux(to, via)
1146 }
1147}
1148
1149#[cfg(test)]
1150mod tests {
1151 #[cfg(feature = "sim")]
1152 use stageleft::q;
1153
1154 #[cfg(feature = "sim")]
1155 use crate::location::{Location, MemberId};
1156 #[cfg(feature = "sim")]
1157 use crate::networking::TCP;
1158 #[cfg(feature = "sim")]
1159 use crate::nondet::nondet;
1160 #[cfg(feature = "sim")]
1161 use crate::prelude::FlowBuilder;
1162
1163 #[cfg(feature = "sim")]
1164 #[test]
1165 fn sim_send_bincode_o2o() {
1166 use crate::networking::TCP;
1167
1168 let flow = FlowBuilder::new();
1169 let node = flow.process::<()>();
1170 let node2 = flow.process::<()>();
1171
1172 let (in_send, input) = node.sim_input();
1173
1174 let out_recv = input
1175 .send(&node2, TCP.bincode())
1176 .batch(&node2.tick(), nondet!(/** test */))
1177 .count()
1178 .all_ticks()
1179 .sim_output();
1180
1181 let instances = flow.sim().exhaustive(async || {
1182 in_send.send(());
1183 in_send.send(());
1184 in_send.send(());
1185
1186 let received = out_recv.collect::<Vec<_>>().await;
1187 assert!(received.into_iter().sum::<usize>() == 3);
1188 });
1189
1190 assert_eq!(instances, 4); // 2^{3 - 1}
1191 }
1192
1193 #[cfg(feature = "sim")]
1194 #[test]
1195 fn sim_send_bincode_m2o() {
1196 let flow = FlowBuilder::new();
1197 let cluster = flow.cluster::<()>();
1198 let node = flow.process::<()>();
1199
1200 let input = cluster.source_iter(q!(vec![1]));
1201
1202 let out_recv = input
1203 .send(&node, TCP.bincode())
1204 .entries()
1205 .batch(&node.tick(), nondet!(/** test */))
1206 .all_ticks()
1207 .sim_output();
1208
1209 let instances = flow
1210 .sim()
1211 .with_cluster_size(&cluster, 4)
1212 .exhaustive(async || {
1213 out_recv
1214 .assert_yields_only_unordered(vec![
1215 (MemberId::from_raw_id(0), 1),
1216 (MemberId::from_raw_id(1), 1),
1217 (MemberId::from_raw_id(2), 1),
1218 (MemberId::from_raw_id(3), 1),
1219 ])
1220 .await
1221 });
1222
1223 assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1224 }
1225
1226 #[cfg(feature = "sim")]
1227 #[test]
1228 fn sim_send_bincode_multiple_m2o() {
1229 let flow = FlowBuilder::new();
1230 let cluster1 = flow.cluster::<()>();
1231 let cluster2 = flow.cluster::<()>();
1232 let node = flow.process::<()>();
1233
1234 let out_recv_1 = cluster1
1235 .source_iter(q!(vec![1]))
1236 .send(&node, TCP.bincode())
1237 .entries()
1238 .sim_output();
1239
1240 let out_recv_2 = cluster2
1241 .source_iter(q!(vec![2]))
1242 .send(&node, TCP.bincode())
1243 .entries()
1244 .sim_output();
1245
1246 let instances = flow
1247 .sim()
1248 .with_cluster_size(&cluster1, 3)
1249 .with_cluster_size(&cluster2, 4)
1250 .exhaustive(async || {
1251 out_recv_1
1252 .assert_yields_only_unordered(vec![
1253 (MemberId::from_raw_id(0), 1),
1254 (MemberId::from_raw_id(1), 1),
1255 (MemberId::from_raw_id(2), 1),
1256 ])
1257 .await;
1258
1259 out_recv_2
1260 .assert_yields_only_unordered(vec![
1261 (MemberId::from_raw_id(0), 2),
1262 (MemberId::from_raw_id(1), 2),
1263 (MemberId::from_raw_id(2), 2),
1264 (MemberId::from_raw_id(3), 2),
1265 ])
1266 .await;
1267 });
1268
1269 assert_eq!(instances, 1);
1270 }
1271
1272 #[cfg(feature = "sim")]
1273 #[test]
1274 fn sim_send_bincode_o2m() {
1275 let flow = FlowBuilder::new();
1276 let cluster = flow.cluster::<()>();
1277 let node = flow.process::<()>();
1278
1279 let input = node.source_iter(q!(vec![
1280 (MemberId::from_raw_id(0), 123),
1281 (MemberId::from_raw_id(1), 456),
1282 ]));
1283
1284 let out_recv = input
1285 .demux(&cluster, TCP.bincode())
1286 .map(q!(|x| x + 1))
1287 .send(&node, TCP.bincode())
1288 .entries()
1289 .sim_output();
1290
1291 flow.sim()
1292 .with_cluster_size(&cluster, 4)
1293 .exhaustive(async || {
1294 out_recv
1295 .assert_yields_only_unordered(vec![
1296 (MemberId::from_raw_id(0), 124),
1297 (MemberId::from_raw_id(1), 457),
1298 ])
1299 .await
1300 });
1301 }
1302
1303 #[cfg(feature = "sim")]
1304 #[test]
1305 fn sim_broadcast_bincode_o2m() {
1306 let flow = FlowBuilder::new();
1307 let cluster = flow.cluster::<()>();
1308 let node = flow.process::<()>();
1309
1310 let input = node.source_iter(q!(vec![123, 456]));
1311
1312 let out_recv = input
1313 .broadcast(&cluster, TCP.bincode(), nondet!(/** test */))
1314 .map(q!(|x| x + 1))
1315 .send(&node, TCP.bincode())
1316 .entries()
1317 .sim_output();
1318
1319 let mut c_1_produced = false;
1320 let mut c_2_produced = false;
1321
1322 flow.sim()
1323 .with_cluster_size(&cluster, 2)
1324 .exhaustive(async || {
1325 let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1326
1327 // check that order is preserved
1328 if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1329 assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1330 c_1_produced = true;
1331 }
1332
1333 if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1334 assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1335 c_2_produced = true;
1336 }
1337 });
1338
1339 assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1340 }
1341
1342 #[cfg(feature = "sim")]
1343 #[test]
1344 fn sim_send_bincode_m2m() {
1345 let flow = FlowBuilder::new();
1346 let cluster = flow.cluster::<()>();
1347 let node = flow.process::<()>();
1348
1349 let input = node.source_iter(q!(vec![
1350 (MemberId::from_raw_id(0), 123),
1351 (MemberId::from_raw_id(1), 456),
1352 ]));
1353
1354 let out_recv = input
1355 .demux(&cluster, TCP.bincode())
1356 .map(q!(|x| x + 1))
1357 .flat_map_ordered(q!(|x| vec![
1358 (MemberId::from_raw_id(0), x),
1359 (MemberId::from_raw_id(1), x),
1360 ]))
1361 .demux(&cluster, TCP.bincode())
1362 .entries()
1363 .send(&node, TCP.bincode())
1364 .entries()
1365 .sim_output();
1366
1367 flow.sim()
1368 .with_cluster_size(&cluster, 4)
1369 .exhaustive(async || {
1370 out_recv
1371 .assert_yields_only_unordered(vec![
1372 (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1373 (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1374 (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1375 (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1376 ])
1377 .await
1378 });
1379 }
1380}