hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::{CycleId, FlowState};
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
///
/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
/// removed / changed), this also includes an additional variant [`BoundedValue`], which
/// indicates that entries may be added over time, but once an entry is added it will never be
/// removed and its value will never change.
pub trait KeyedSingletonBound {
    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
    type UnderlyingBound: Boundedness;
    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
    type ValueBound: Boundedness;

    /// The type of the keyed singleton if the value for each key is immutable.
    ///
    /// The underlying bound is preserved, and the resulting bound is its own
    /// [`KeyedSingletonBound::EraseMonotonic`].
    type WithBoundedValue: KeyedSingletonBound<
            UnderlyingBound = Self::UnderlyingBound,
            ValueBound = Bounded,
            EraseMonotonic = Self::WithBoundedValue,
        >;

    /// The bound of a [`KeyedSingleton`] if it is produced from a [`KeyedStream`] with [`Self`] boundedness.
    type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;

    /// The type of the keyed singleton if the value for each key is no longer monotonic.
    type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;

    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> KeyedSingletonBoundKind;
}
61
// `Unbounded` keyed singletons: neither the underlying stream nor the per-key values are bounded.
impl KeyedSingletonBound for Unbounded {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    // Freezing each value still leaves the set of entries unbounded.
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = MonotonicValue;
    // There is no monotonicity to erase, so the bound maps to itself.
    type EraseMonotonic = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Unbounded
    }
}
73
// `Bounded` keyed singletons: every associated bound is `Bounded`, so all conversions are
// the identity.
impl KeyedSingletonBound for Bounded {
    type UnderlyingBound = Bounded;
    type ValueBound = Bounded;
    type WithBoundedValue = Bounded;
    type KeyedStreamToMonotone = Bounded;
    type EraseMonotonic = Bounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::Bounded
    }
}
85
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
/// its value is bounded and will never change, but new entries may appear asynchronously.
pub struct BoundedValue;

// `BoundedValue`: the underlying stream is unbounded (entries keep arriving) while each
// entry's value is bounded.
impl KeyedSingletonBound for BoundedValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Bounded;
    // Values are already immutable, so these conversions all map back to `BoundedValue`.
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = BoundedValue;
    type EraseMonotonic = BoundedValue;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::BoundedValue
    }
}
101
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
/// it will never be removed, and the corresponding value will only increase monotonically.
pub struct MonotonicValue;

// `MonotonicValue`: both the underlying stream and the per-key values are unbounded; the
// monotonicity guarantee is carried only by the marker type itself.
impl KeyedSingletonBound for MonotonicValue {
    type UnderlyingBound = Unbounded;
    type ValueBound = Unbounded;
    type WithBoundedValue = BoundedValue;
    type KeyedStreamToMonotone = MonotonicValue;
    // Dropping the monotonicity guarantee leaves a plain `Unbounded` keyed singleton.
    type EraseMonotonic = Unbounded;

    fn bound_kind() -> KeyedSingletonBoundKind {
        KeyedSingletonBoundKind::MonotonicValue
    }
}
117
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
///     - [`MonotonicValue`] (asynchronous with no removals and values that only increase
///       monotonically)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // The location where this keyed singleton is materialized.
    pub(crate) location: Loc,
    // The IR node producing this collection; operators that consume the collection swap it
    // out for `HydroNode::Placeholder`.
    pub(crate) ir_node: RefCell<HydroNode>,
    // Handle to the flow state; the `Drop` impl pushes a root into it for nodes that were
    // never consumed.
    pub(crate) flow_state: FlowState,

    // Ties the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
141
142impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
143 fn drop(&mut self) {
144 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
145 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
146 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
147 input: Box::new(ir_node),
148 op_metadata: HydroIrOpMetadata::new(),
149 });
150 }
151 }
152}
153
154impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
155 for KeyedSingleton<K, V, Loc, Bound>
156{
157 fn clone(&self) -> Self {
158 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
159 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
160 *self.ir_node.borrow_mut() = HydroNode::Tee {
161 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
162 metadata: self.location.new_node_metadata(Self::collection_kind()),
163 };
164 }
165
166 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
167 KeyedSingleton {
168 location: self.location.clone(),
169 flow_state: self.flow_state.clone(),
170 ir_node: HydroNode::Tee {
171 inner: SharedNode(inner.0.clone()),
172 metadata: metadata.clone(),
173 }
174 .into(),
175 _phantom: PhantomData,
176 }
177 } else {
178 unreachable!()
179 }
180 }
181}
182
183impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
184 for KeyedSingleton<K, V, L, B>
185where
186 L: Location<'a>,
187{
188 type Location = L;
189
190 fn create_source(cycle_id: CycleId, location: L) -> Self {
191 KeyedSingleton {
192 flow_state: location.flow_state().clone(),
193 location: location.clone(),
194 ir_node: RefCell::new(HydroNode::CycleSource {
195 cycle_id,
196 metadata: location.new_node_metadata(Self::collection_kind()),
197 }),
198 _phantom: PhantomData,
199 }
200 }
201}
202
203impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
204where
205 L: Location<'a>,
206{
207 type Location = Tick<L>;
208
209 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
210 KeyedSingleton::new(
211 location.clone(),
212 HydroNode::CycleSource {
213 cycle_id,
214 metadata: location.new_node_metadata(Self::collection_kind()),
215 },
216 )
217 }
218}
219
// `DeferTick` for bounded keyed singletons inside a tick.
impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): this is expected to resolve to an inherent `KeyedSingleton::defer_tick`
        // defined elsewhere in the file; if no such method exists it would call itself. Confirm.
        KeyedSingleton::defer_tick(self)
    }
}
228
229impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
230 for KeyedSingleton<K, V, L, B>
231where
232 L: Location<'a>,
233{
234 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
235 assert_eq!(
236 Location::id(&self.location),
237 expected_location,
238 "locations do not match"
239 );
240 self.location
241 .flow_state()
242 .borrow_mut()
243 .push_root(HydroRoot::CycleSink {
244 cycle_id,
245 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
246 op_metadata: HydroIrOpMetadata::new(),
247 });
248 }
249}
250
251impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
252where
253 L: Location<'a>,
254{
255 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
256 assert_eq!(
257 Location::id(&self.location),
258 expected_location,
259 "locations do not match"
260 );
261 self.location
262 .flow_state()
263 .borrow_mut()
264 .push_root(HydroRoot::CycleSink {
265 cycle_id,
266 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
267 op_metadata: HydroIrOpMetadata::new(),
268 });
269 }
270}
271
272impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
273 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
274 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
275 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
276
277 let flow_state = location.flow_state().clone();
278 KeyedSingleton {
279 location,
280 flow_state,
281 ir_node: RefCell::new(ir_node),
282 _phantom: PhantomData,
283 }
284 }
285
286 /// Returns the [`Location`] where this keyed singleton is being materialized.
287 pub fn location(&self) -> &L {
288 &self.location
289 }
290
291 /// Weakens the consistency of this live collection to not guarantee any consistency across
292 /// cluster members (if this collection is on a cluster).
293 pub fn weaken_consistency(self) -> KeyedSingleton<K, V, L::NoConsistency, B>
294 where
295 L: Location<'a>,
296 {
297 if L::consistency()
298 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
299 {
300 // already no consistency
301 KeyedSingleton::new(
302 self.location.drop_consistency(),
303 self.ir_node.replace(HydroNode::Placeholder),
304 )
305 } else {
306 KeyedSingleton::new(
307 self.location.drop_consistency(),
308 HydroNode::Cast {
309 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
310 metadata: self
311 .location
312 .drop_consistency()
313 .new_node_metadata(
314 KeyedSingleton::<K, V, L::NoConsistency, B>::collection_kind(),
315 ),
316 },
317 )
318 }
319 }
320
321 /// Casts this live collection to have the consistency guarantees specified in the given
322 /// location type parameter. The developer must ensure that the strengthened consistency
323 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
324 pub fn assert_has_consistency_of<L2: Location<'a, NoConsistency = L::NoConsistency>>(
325 self,
326 _proof: impl crate::properties::ConsistencyProof,
327 ) -> KeyedSingleton<K, V, L2, B>
328 where
329 L: Location<'a>,
330 {
331 if L::consistency() == L2::consistency() {
332 // already consistent
333 KeyedSingleton::new(
334 self.location.with_consistency_of(),
335 self.ir_node.replace(HydroNode::Placeholder),
336 )
337 } else {
338 KeyedSingleton::new(
339 self.location.with_consistency_of(),
340 HydroNode::AssertIsConsistent {
341 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
342 metadata: self
343 .location
344 .clone()
345 .with_consistency_of::<L2>()
346 .new_node_metadata(KeyedSingleton::<K, V, L2, B>::collection_kind()),
347 },
348 )
349 }
350 }
351}
352
// Counts the entries of a bounded keyed singleton; called by `key_count` on a per-tick snapshot.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let all_entries = me.entries();
    all_entries.count()
}
359
// Collects a bounded keyed singleton into a single `HashMap`; called by `into_singleton`
// on a per-tick snapshot.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let all_entries = me.entries();

    all_entries
        .assume_ordering_trusted(nondet!(
            /// There is only one element associated with each key. The closure technically
            /// isn't commutative in the case where both passed entries have the same key
            /// but different values.
            ///
            /// In the future, we may want to have an `assume!(...)` statement in the UDF that
            /// the key is never already present in the map.
        ))
        .fold(
            q!(|| HashMap::new()),
            q!(|map, (k, v)| {
                map.insert(k, v);
            }),
        )
}
383
384impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
385 pub(crate) fn collection_kind() -> CollectionKind {
386 CollectionKind::KeyedSingleton {
387 bound: B::bound_kind(),
388 key_type: stageleft::quote_type::<K>().into(),
389 value_type: stageleft::quote_type::<V>().into(),
390 }
391 }
392
393 /// Transforms each value by invoking `f` on each element, with keys staying the same
394 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
395 ///
396 /// If you do not want to modify the stream and instead only want to view
397 /// each item use [`KeyedSingleton::inspect`] instead.
398 ///
399 /// # Example
400 /// ```rust
401 /// # #[cfg(feature = "deploy")] {
402 /// # use hydro_lang::prelude::*;
403 /// # use futures::StreamExt;
404 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
405 /// let keyed_singleton = // { 1: 2, 2: 4 }
406 /// # process
407 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
408 /// # .into_keyed()
409 /// # .first();
410 /// keyed_singleton.map(q!(|v| v + 1))
411 /// # .entries()
412 /// # }, |mut stream| async move {
413 /// // { 1: 3, 2: 5 }
414 /// # let mut results = Vec::new();
415 /// # for _ in 0..2 {
416 /// # results.push(stream.next().await.unwrap());
417 /// # }
418 /// # results.sort();
419 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
420 /// # }));
421 /// # }
422 /// ```
423 pub fn map<U, F>(
424 self,
425 f: impl IntoQuotedMut<'a, F, L> + Copy,
426 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
427 where
428 F: Fn(V) -> U + 'a,
429 {
430 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
431 let map_f = q!({
432 let orig = f;
433 move |(k, v)| (k, orig(v))
434 })
435 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
436 .into();
437
438 KeyedSingleton::new(
439 self.location.clone(),
440 HydroNode::Map {
441 f: map_f,
442 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
443 metadata: self.location.new_node_metadata(KeyedSingleton::<
444 K,
445 U,
446 L,
447 B::EraseMonotonic,
448 >::collection_kind()),
449 },
450 )
451 }
452
453 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
454 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
455 ///
456 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
457 /// the new value `U`. The key remains unchanged in the output.
458 ///
459 /// # Example
460 /// ```rust
461 /// # #[cfg(feature = "deploy")] {
462 /// # use hydro_lang::prelude::*;
463 /// # use futures::StreamExt;
464 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465 /// let keyed_singleton = // { 1: 2, 2: 4 }
466 /// # process
467 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
468 /// # .into_keyed()
469 /// # .first();
470 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
471 /// # .entries()
472 /// # }, |mut stream| async move {
473 /// // { 1: 3, 2: 6 }
474 /// # let mut results = Vec::new();
475 /// # for _ in 0..2 {
476 /// # results.push(stream.next().await.unwrap());
477 /// # }
478 /// # results.sort();
479 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
480 /// # }));
481 /// # }
482 /// ```
483 pub fn map_with_key<U, F>(
484 self,
485 f: impl IntoQuotedMut<'a, F, L> + Copy,
486 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
487 where
488 F: Fn((K, V)) -> U + 'a,
489 K: Clone,
490 {
491 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
492 let map_f = q!({
493 let orig = f;
494 move |(k, v)| {
495 let out = orig((Clone::clone(&k), v));
496 (k, out)
497 }
498 })
499 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
500 .into();
501
502 KeyedSingleton::new(
503 self.location.clone(),
504 HydroNode::Map {
505 f: map_f,
506 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
507 metadata: self.location.new_node_metadata(KeyedSingleton::<
508 K,
509 U,
510 L,
511 B::EraseMonotonic,
512 >::collection_kind()),
513 },
514 )
515 }
516
517 /// Gets the number of keys in the keyed singleton.
518 ///
519 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
520 /// since keys may be added / removed over time. When the set of keys changes, the count will
521 /// be asynchronously updated.
522 ///
523 /// # Example
524 /// ```rust
525 /// # #[cfg(feature = "deploy")] {
526 /// # use hydro_lang::prelude::*;
527 /// # use futures::StreamExt;
528 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
529 /// # let tick = process.tick();
530 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
531 /// # process
532 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
533 /// # .into_keyed()
534 /// # .batch(&tick, nondet!(/** test */))
535 /// # .first();
536 /// keyed_singleton.key_count()
537 /// # .all_ticks()
538 /// # }, |mut stream| async move {
539 /// // 3
540 /// # assert_eq!(stream.next().await.unwrap(), 3);
541 /// # }));
542 /// # }
543 /// ```
    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
        if B::ValueBound::BOUNDED {
            // Values never change, so the entries can be streamed out and counted directly.
            // The handle is re-typed with a struct literal rather than `KeyedSingleton::new`,
            // so that constructor's debug assertions are not run here.
            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
                _phantom: PhantomData,
            };

            me.entries().count().ignore_monotonic()
        } else if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
        {
            // Unbounded values at the top level: snapshot into a tick, count there, and
            // bring the result back out with `latest()`.
            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
                self.location.clone(),
                self.ir_node.replace(HydroNode::Placeholder),
            );

            let out =
                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
                    .latest();
            // Re-wrap the IR node so the result carries the `B::UnderlyingBound` return type.
            Singleton::new(
                self.location.clone(),
                out.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            // Reached whenever the value bound is not bounded and the branch above did not
            // apply (not top-level, no tick available, or a bound kind other than `Unbounded`).
            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
        }
    }
574
575 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
576 ///
577 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
578 /// asynchronously as well.
579 ///
580 /// # Example
581 /// ```rust
582 /// # #[cfg(feature = "deploy")] {
583 /// # use hydro_lang::prelude::*;
584 /// # use futures::StreamExt;
585 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
586 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
587 /// # process
588 /// # .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
589 /// # .into_keyed()
590 /// # .batch(&process.tick(), nondet!(/** test */))
591 /// # .first();
592 /// keyed_singleton.into_singleton()
593 /// # .all_ticks()
594 /// # }, |mut stream| async move {
595 /// // { 1: "a", 2: "b", 3: "c" }
596 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
597 /// # }));
598 /// # }
599 /// ```
    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
    where
        K: Eq + Hash,
    {
        if B::ValueBound::BOUNDED {
            // Values never change, so the entries can be folded into a map directly.
            // The handle is re-typed with a struct literal rather than `KeyedSingleton::new`,
            // so that constructor's debug assertions are not run here.
            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
                _phantom: PhantomData,
            };

            me.entries()
                .assume_ordering_trusted(nondet!(
                    /// There is only one element associated with each key. The closure technically
                    /// isn't commutative in the case where both passed entries have the same key
                    /// but different values.
                    ///
                    /// In the future, we may want to have an `assume!(...)` statement in the UDF that
                    /// the key is never already present in the map.
                ))
                .fold(
                    q!(|| HashMap::new()),
                    q!(|map, (k, v)| {
                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
                        map.insert(k, v);
                    }),
                )
        } else if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
            && B::bound_kind() == KeyedSingletonBoundKind::Unbounded
        {
            // Unbounded values at the top level: snapshot into a tick, build the map there,
            // and bring the result back out with `latest()`.
            let me: KeyedSingleton<K, V, L, Unbounded> = KeyedSingleton::new(
                self.location.clone(),
                self.ir_node.replace(HydroNode::Placeholder),
            );

            let out = into_singleton_inside_tick(
                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();
            // Re-wrap the IR node so the result carries the `B::UnderlyingBound` return type.
            Singleton::new(
                self.location.clone(),
                out.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            // Reached whenever the value bound is not bounded and the branch above did not
            // apply (not top-level, no tick available, or a bound kind other than `Unbounded`).
            panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
        }
    }
649
650 /// An operator which allows you to "name" a `HydroNode`.
651 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
652 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
653 {
654 let mut node = self.ir_node.borrow_mut();
655 let metadata = node.metadata_mut();
656 metadata.tag = Some(name.to_owned());
657 }
658 self
659 }
660
661 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
662 /// implies that `B == Bounded`.
663 pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
664 where
665 B: IsBounded,
666 {
667 KeyedSingleton::new(
668 self.location.clone(),
669 self.ir_node.replace(HydroNode::Placeholder),
670 )
671 }
672
673 /// Gets the value associated with a specific key from the keyed singleton.
674 /// Returns `None` if the key is `None` or there is no associated value.
675 ///
676 /// # Example
677 /// ```rust
678 /// # #[cfg(feature = "deploy")] {
679 /// # use hydro_lang::prelude::*;
680 /// # use futures::StreamExt;
681 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
682 /// let tick = process.tick();
683 /// let keyed_data = process
684 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
685 /// .into_keyed()
686 /// .batch(&tick, nondet!(/** test */))
687 /// .first();
688 /// let key = tick.singleton(q!(1));
689 /// keyed_data.get(key).all_ticks()
690 /// # }, |mut stream| async move {
691 /// // 2
692 /// # assert_eq!(stream.next().await.unwrap(), 2);
693 /// # }));
694 /// # }
695 /// ```
696 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
697 where
698 B: IsBounded,
699 K: Hash + Eq + Clone,
700 V: Clone,
701 {
702 self.make_bounded()
703 .into_keyed_stream()
704 .get(key)
705 .cast_at_most_one_element()
706 }
707
708 /// Emit a keyed stream containing keys shared between the keyed singleton and the
709 /// keyed stream, where each value in the output keyed stream is a tuple of
710 /// (the keyed singleton's value, the keyed stream's value).
711 ///
712 /// # Example
713 /// ```rust
714 /// # #[cfg(feature = "deploy")] {
715 /// # use hydro_lang::prelude::*;
716 /// # use futures::StreamExt;
717 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
718 /// let tick = process.tick();
719 /// let keyed_data = process
720 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
721 /// .into_keyed()
722 /// .batch(&tick, nondet!(/** test */))
723 /// .first();
724 /// let other_data = process
725 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
726 /// .into_keyed()
727 /// .batch(&tick, nondet!(/** test */));
728 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
729 /// # }, |mut stream| async move {
730 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
731 /// # let mut results = vec![];
732 /// # for _ in 0..3 {
733 /// # results.push(stream.next().await.unwrap());
734 /// # }
735 /// # results.sort();
736 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
737 /// # }));
738 /// # }
739 /// ```
740 pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
741 self,
742 other: KeyedStream<K, V2, L, B2, O2, R2>,
743 ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
744 where
745 B: IsBounded,
746 K: Eq + Hash + Clone,
747 V: Clone,
748 V2: Clone,
749 {
750 // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
751 // always produces deterministic order per key (nested loop join), this could just use
752 // `join_keyed_stream` without constructing IRs manually
753 KeyedStream::new(
754 self.location.clone(),
755 HydroNode::Join {
756 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
757 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
758 metadata: self
759 .location
760 .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
761 },
762 )
763 }
764
765 /// Emit a keyed singleton containing all keys shared between two keyed singletons,
766 /// where each value in the output keyed singleton is a tuple of
767 /// (self.value, other.value).
768 ///
769 /// # Example
770 /// ```rust
771 /// # #[cfg(feature = "deploy")] {
772 /// # use hydro_lang::prelude::*;
773 /// # use futures::StreamExt;
774 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
775 /// # let tick = process.tick();
776 /// let requests = // { 1: 10, 2: 20, 3: 30 }
777 /// # process
778 /// # .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
779 /// # .into_keyed()
780 /// # .batch(&tick, nondet!(/** test */))
781 /// # .first();
782 /// let other = // { 1: 100, 2: 200, 4: 400 }
783 /// # process
784 /// # .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
785 /// # .into_keyed()
786 /// # .batch(&tick, nondet!(/** test */))
787 /// # .first();
788 /// requests.join_keyed_singleton(other)
789 /// # .entries().all_ticks()
790 /// # }, |mut stream| async move {
791 /// // { 1: (10, 100), 2: (20, 200) }
792 /// # let mut results = vec![];
793 /// # for _ in 0..2 {
794 /// # results.push(stream.next().await.unwrap());
795 /// # }
796 /// # results.sort();
797 /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
798 /// # }));
799 /// # }
800 /// ```
801 pub fn join_keyed_singleton<V2: Clone>(
802 self,
803 other: KeyedSingleton<K, V2, L, Bounded>,
804 ) -> KeyedSingleton<K, (V, V2), L, Bounded>
805 where
806 B: IsBounded,
807 K: Eq + Hash + Clone,
808 V: Clone,
809 {
810 let result_stream = self
811 .make_bounded()
812 .entries()
813 .join(other.entries())
814 .into_keyed();
815
816 // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
817 result_stream.cast_at_most_one_entry_per_key()
818 }
819
820 /// For each value in `self`, find the matching key in `lookup`.
821 /// The output is a keyed singleton with the key from `self`, and a value
822 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
823 /// If the key is not present in `lookup`, the option will be [`None`].
824 ///
825 /// # Example
826 /// ```rust
827 /// # #[cfg(feature = "deploy")] {
828 /// # use hydro_lang::prelude::*;
829 /// # use futures::StreamExt;
830 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
831 /// # let tick = process.tick();
832 /// let requests = // { 1: 10, 2: 20 }
833 /// # process
834 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
835 /// # .into_keyed()
836 /// # .batch(&tick, nondet!(/** test */))
837 /// # .first();
838 /// let other_data = // { 10: 100, 11: 110 }
839 /// # process
840 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
841 /// # .into_keyed()
842 /// # .batch(&tick, nondet!(/** test */))
843 /// # .first();
844 /// requests.lookup_keyed_singleton(other_data)
845 /// # .entries().all_ticks()
846 /// # }, |mut stream| async move {
847 /// // { 1: (10, Some(100)), 2: (20, None) }
848 /// # let mut results = vec![];
849 /// # for _ in 0..2 {
850 /// # results.push(stream.next().await.unwrap());
851 /// # }
852 /// # results.sort();
853 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
854 /// # }));
855 /// # }
856 /// ```
857 pub fn lookup_keyed_singleton<V2>(
858 self,
859 lookup: KeyedSingleton<V, V2, L, Bounded>,
860 ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
861 where
862 B: IsBounded,
863 K: Eq + Hash + Clone,
864 V: Eq + Hash + Clone,
865 V2: Clone,
866 {
867 let result_stream = self
868 .make_bounded()
869 .into_keyed_stream()
870 .lookup_keyed_stream(lookup.into_keyed_stream());
871
872 // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
873 result_stream.cast_at_most_one_entry_per_key()
874 }
875
876 /// For each value in `self`, find the matching key in `lookup`.
877 /// The output is a keyed stream with the key from `self`, and a value
878 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
879 /// If the key is not present in `lookup`, the option will be [`None`].
880 ///
881 /// # Example
882 /// ```rust
883 /// # #[cfg(feature = "deploy")] {
884 /// # use hydro_lang::prelude::*;
885 /// # use futures::StreamExt;
886 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
887 /// # let tick = process.tick();
888 /// let requests = // { 1: 10, 2: 20 }
889 /// # process
890 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
891 /// # .into_keyed()
892 /// # .batch(&tick, nondet!(/** test */))
893 /// # .first();
894 /// let other_data = // { 10: 100, 10: 110 }
895 /// # process
896 /// # .source_iter(q!(vec![(10, 100), (10, 110)]))
897 /// # .into_keyed()
898 /// # .batch(&tick, nondet!(/** test */));
899 /// requests.lookup_keyed_stream(other_data)
900 /// # .entries().all_ticks()
901 /// # }, |mut stream| async move {
902 /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
903 /// # let mut results = vec![];
904 /// # for _ in 0..3 {
905 /// # results.push(stream.next().await.unwrap());
906 /// # }
907 /// # results.sort();
908 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
909 /// # }));
910 /// # }
911 /// ```
912 pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
913 self,
914 lookup: KeyedStream<V, V2, L, Bounded, O, R>,
915 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
916 where
917 B: IsBounded,
918 K: Eq + Hash + Clone,
919 V: Eq + Hash + Clone,
920 V2: Clone,
921 {
922 self.make_bounded()
923 .entries()
924 .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
925 .into_keyed()
926 .lookup_keyed_stream(lookup)
927 }
928}
929
930impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
931 KeyedSingleton<K, V, L, B>
932{
933 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
934 ///
935 /// The value for each key must be bounded, otherwise the resulting stream elements would be
936 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
937 /// into the output.
938 ///
939 /// # Example
940 /// ```rust
941 /// # #[cfg(feature = "deploy")] {
942 /// # use hydro_lang::prelude::*;
943 /// # use futures::StreamExt;
944 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
945 /// let keyed_singleton = // { 1: 2, 2: 4 }
946 /// # process
947 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
948 /// # .into_keyed()
949 /// # .first();
950 /// keyed_singleton.entries()
951 /// # }, |mut stream| async move {
952 /// // (1, 2), (2, 4) in any order
953 /// # let mut results = Vec::new();
954 /// # for _ in 0..2 {
955 /// # results.push(stream.next().await.unwrap());
956 /// # }
957 /// # results.sort();
958 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
959 /// # }));
960 /// # }
961 /// ```
962 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
963 self.into_keyed_stream().entries()
964 }
965
966 /// Flattens the keyed singleton into an unordered stream of just the values.
967 ///
968 /// The value for each key must be bounded, otherwise the resulting stream elements would be
969 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
970 /// into the output.
971 ///
972 /// # Example
973 /// ```rust
974 /// # #[cfg(feature = "deploy")] {
975 /// # use hydro_lang::prelude::*;
976 /// # use futures::StreamExt;
977 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
978 /// let keyed_singleton = // { 1: 2, 2: 4 }
979 /// # process
980 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
981 /// # .into_keyed()
982 /// # .first();
983 /// keyed_singleton.values()
984 /// # }, |mut stream| async move {
985 /// // 2, 4 in any order
986 /// # let mut results = Vec::new();
987 /// # for _ in 0..2 {
988 /// # results.push(stream.next().await.unwrap());
989 /// # }
990 /// # results.sort();
991 /// # assert_eq!(results, vec![2, 4]);
992 /// # }));
993 /// # }
994 /// ```
995 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
996 let map_f = q!(|(_, v)| v)
997 .splice_fn1_ctx::<(K, V), V>(&self.location)
998 .into();
999
1000 Stream::new(
1001 self.location.clone(),
1002 HydroNode::Map {
1003 f: map_f,
1004 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1005 metadata: self.location.new_node_metadata(Stream::<
1006 V,
1007 L,
1008 B::UnderlyingBound,
1009 NoOrder,
1010 ExactlyOnce,
1011 >::collection_kind()),
1012 },
1013 )
1014 }
1015
1016 /// Flattens the keyed singleton into an unordered stream of just the keys.
1017 ///
1018 /// The value for each key must be bounded, otherwise the removal of keys would result in
1019 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
1020 /// into the output.
1021 ///
1022 /// # Example
1023 /// ```rust
1024 /// # #[cfg(feature = "deploy")] {
1025 /// # use hydro_lang::prelude::*;
1026 /// # use futures::StreamExt;
1027 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1028 /// let keyed_singleton = // { 1: 2, 2: 4 }
1029 /// # process
1030 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1031 /// # .into_keyed()
1032 /// # .first();
1033 /// keyed_singleton.keys()
1034 /// # }, |mut stream| async move {
1035 /// // 1, 2 in any order
1036 /// # let mut results = Vec::new();
1037 /// # for _ in 0..2 {
1038 /// # results.push(stream.next().await.unwrap());
1039 /// # }
1040 /// # results.sort();
1041 /// # assert_eq!(results, vec![1, 2]);
1042 /// # }));
1043 /// # }
1044 /// ```
1045 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1046 self.entries().map(q!(|(k, _)| k))
1047 }
1048
1049 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
1050 /// entries whose keys are not in the provided stream.
1051 ///
1052 /// # Example
1053 /// ```rust
1054 /// # #[cfg(feature = "deploy")] {
1055 /// # use hydro_lang::prelude::*;
1056 /// # use futures::StreamExt;
1057 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1058 /// let tick = process.tick();
1059 /// let keyed_singleton = // { 1: 2, 2: 4 }
1060 /// # process
1061 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1062 /// # .into_keyed()
1063 /// # .first()
1064 /// # .batch(&tick, nondet!(/** test */));
1065 /// let keys_to_remove = process
1066 /// .source_iter(q!(vec![1]))
1067 /// .batch(&tick, nondet!(/** test */));
1068 /// keyed_singleton.filter_key_not_in(keys_to_remove)
1069 /// # .entries().all_ticks()
1070 /// # }, |mut stream| async move {
1071 /// // { 2: 4 }
1072 /// # for w in vec![(2, 4)] {
1073 /// # assert_eq!(stream.next().await.unwrap(), w);
1074 /// # }
1075 /// # }));
1076 /// # }
1077 /// ```
1078 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1079 self,
1080 other: Stream<K, L, Bounded, O2, R2>,
1081 ) -> Self
1082 where
1083 K: Hash + Eq,
1084 {
1085 check_matching_location(&self.location, &other.location);
1086
1087 KeyedSingleton::new(
1088 self.location.clone(),
1089 HydroNode::AntiJoin {
1090 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1091 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1092 metadata: self.location.new_node_metadata(Self::collection_kind()),
1093 },
1094 )
1095 }
1096
1097 /// An operator which allows you to "inspect" each value of a keyed singleton without
1098 /// modifying it. The closure `f` is called on a reference to each value. This is
1099 /// mainly useful for debugging, and should not be used to generate side-effects.
1100 ///
1101 /// # Example
1102 /// ```rust
1103 /// # #[cfg(feature = "deploy")] {
1104 /// # use hydro_lang::prelude::*;
1105 /// # use futures::StreamExt;
1106 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1107 /// let keyed_singleton = // { 1: 2, 2: 4 }
1108 /// # process
1109 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1110 /// # .into_keyed()
1111 /// # .first();
1112 /// keyed_singleton
1113 /// .inspect(q!(|v| println!("{}", v)))
1114 /// # .entries()
1115 /// # }, |mut stream| async move {
1116 /// // { 1: 2, 2: 4 }
1117 /// # for w in vec![(1, 2), (2, 4)] {
1118 /// # assert_eq!(stream.next().await.unwrap(), w);
1119 /// # }
1120 /// # }));
1121 /// # }
1122 /// ```
1123 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1124 where
1125 F: Fn(&V) + 'a,
1126 {
1127 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1128 let inspect_f = q!({
1129 let orig = f;
1130 move |t: &(_, _)| orig(&t.1)
1131 })
1132 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1133 .into();
1134
1135 KeyedSingleton::new(
1136 self.location.clone(),
1137 HydroNode::Inspect {
1138 f: inspect_f,
1139 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1140 metadata: self.location.new_node_metadata(Self::collection_kind()),
1141 },
1142 )
1143 }
1144
1145 /// An operator which allows you to "inspect" each entry of a keyed singleton without
1146 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1147 /// mainly useful for debugging, and should not be used to generate side-effects.
1148 ///
1149 /// # Example
1150 /// ```rust
1151 /// # #[cfg(feature = "deploy")] {
1152 /// # use hydro_lang::prelude::*;
1153 /// # use futures::StreamExt;
1154 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1155 /// let keyed_singleton = // { 1: 2, 2: 4 }
1156 /// # process
1157 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1158 /// # .into_keyed()
1159 /// # .first();
1160 /// keyed_singleton
1161 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1162 /// # .entries()
1163 /// # }, |mut stream| async move {
1164 /// // { 1: 2, 2: 4 }
1165 /// # for w in vec![(1, 2), (2, 4)] {
1166 /// # assert_eq!(stream.next().await.unwrap(), w);
1167 /// # }
1168 /// # }));
1169 /// # }
1170 /// ```
1171 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1172 where
1173 F: Fn(&(K, V)) + 'a,
1174 {
1175 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1176
1177 KeyedSingleton::new(
1178 self.location.clone(),
1179 HydroNode::Inspect {
1180 f: inspect_f,
1181 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1182 metadata: self.location.new_node_metadata(Self::collection_kind()),
1183 },
1184 )
1185 }
1186
1187 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1188 ///
1189 /// Because this method requires values to be bounded, the output [`Optional`] will only be
1190 /// asynchronously updated if a new key is added that is higher than the previous max key.
1191 ///
1192 /// # Example
1193 /// ```rust
1194 /// # #[cfg(feature = "deploy")] {
1195 /// # use hydro_lang::prelude::*;
1196 /// # use futures::StreamExt;
1197 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1198 /// let tick = process.tick();
1199 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1200 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1201 /// # .into_keyed()
1202 /// # .first();
1203 /// keyed_singleton.get_max_key()
1204 /// # .sample_eager(nondet!(/** test */))
1205 /// # }, |mut stream| async move {
1206 /// // (2, 456)
1207 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1208 /// # }));
1209 /// # }
1210 /// ```
    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
    where
        K: Ord,
    {
        // Flatten to `(key, value)` pairs and reduce them to the pair with the largest key.
        self.entries()
            .assume_ordering_trusted(nondet!(
                /// There is only one element associated with each key, and the keys are totally
                /// ordered so we will produce a deterministic value. The closure technically
                /// isn't commutative in the case where both passed entries have the same key
                /// but different values.
                ///
                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
                /// the two inputs do not have the same key.
            ))
            // Running maximum: the accumulator is replaced only when a strictly larger key arrives.
            .reduce(q!(
                move |curr, new| {
                    if new.0 > curr.0 {
                        *curr = new;
                    }
                },
                idempotent = manual_proof!(/** repeated elements are ignored */)
            ))
    }
1234
1235 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1236 /// element, the value.
1237 ///
1238 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1239 ///
1240 /// # Example
1241 /// ```rust
1242 /// # #[cfg(feature = "deploy")] {
1243 /// # use hydro_lang::prelude::*;
1244 /// # use futures::StreamExt;
1245 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1246 /// let keyed_singleton = // { 1: 2, 2: 4 }
1247 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1248 /// # .into_keyed()
1249 /// # .first();
1250 /// keyed_singleton
1251 /// .clone()
1252 /// .into_keyed_stream()
1253 /// .merge_unordered(
1254 /// keyed_singleton.into_keyed_stream()
1255 /// )
1256 /// # .entries()
1257 /// # }, |mut stream| async move {
    /// // { 1: [2, 2], 2: [4, 4] }
1259 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1260 /// # assert_eq!(stream.next().await.unwrap(), w);
1261 /// # }
1262 /// # }));
1263 /// # }
1264 /// ```
1265 pub fn into_keyed_stream(
1266 self,
1267 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1268 KeyedStream::new(
1269 self.location.clone(),
1270 HydroNode::Cast {
1271 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1272 metadata: self.location.new_node_metadata(KeyedStream::<
1273 K,
1274 V,
1275 L,
1276 B::UnderlyingBound,
1277 TotalOrder,
1278 ExactlyOnce,
1279 >::collection_kind()),
1280 },
1281 )
1282 }
1283}
1284
1285impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1286where
1287 L: Location<'a>,
1288{
1289 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1290 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1291 ///
1292 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1293 /// processed before an acknowledgement is emitted.
1294 pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1295 let id = self.location.flow_state().borrow_mut().next_clock_id();
1296 let out_location = Atomic {
1297 tick: Tick {
1298 id,
1299 l: self.location.clone(),
1300 },
1301 };
1302 KeyedSingleton::new(
1303 out_location.clone(),
1304 HydroNode::BeginAtomic {
1305 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1306 metadata: out_location
1307 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1308 },
1309 )
1310 }
1311}
1312
1313impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1314where
1315 L: Location<'a>,
1316{
1317 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1318 /// See [`KeyedSingleton::atomic`] for more details.
1319 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1320 KeyedSingleton::new(
1321 self.location.tick.l.clone(),
1322 HydroNode::EndAtomic {
1323 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1324 metadata: self
1325 .location
1326 .tick
1327 .l
1328 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1329 },
1330 )
1331 }
1332}
1333
1334impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1335 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1336 /// tick `T` always has the entries of `self` at tick `T - 1`.
1337 ///
1338 /// At tick `0`, the output has no entries, since there is no previous tick.
1339 ///
1340 /// This operator enables stateful iterative processing with ticks, by sending data from one
1341 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1342 ///
1343 /// # Example
1344 /// ```rust
1345 /// # #[cfg(feature = "deploy")] {
1346 /// # use hydro_lang::prelude::*;
1347 /// # use futures::StreamExt;
1348 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1349 /// let tick = process.tick();
1350 /// # // ticks are lazy by default, forces the second tick to run
1351 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1352 /// # let batch_first_tick = process
1353 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1354 /// # .batch(&tick, nondet!(/** test */))
1355 /// # .into_keyed();
1356 /// # let batch_second_tick = process
1357 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1358 /// # .batch(&tick, nondet!(/** test */))
1359 /// # .into_keyed()
1360 /// # .defer_tick(); // appears on the second tick
1361 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1362 /// # batch_first_tick.chain(batch_second_tick).first();
1363 /// input_batch.clone().filter_key_not_in(
1364 /// input_batch.defer_tick().keys() // keys present in the previous tick
1365 /// )
1366 /// # .entries().all_ticks()
1367 /// # }, |mut stream| async move {
1368 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1369 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1370 /// # assert_eq!(stream.next().await.unwrap(), w);
1371 /// # }
1372 /// # }));
1373 /// # }
1374 /// ```
1375 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1376 KeyedSingleton::new(
1377 self.location.clone(),
1378 HydroNode::DeferTick {
1379 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1380 metadata: self
1381 .location
1382 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1383 },
1384 )
1385 }
1386}
1387
1388impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1389where
1390 L: Location<'a>,
1391{
1392 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1393 /// point in time.
1394 ///
1395 /// # Non-Determinism
1396 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1397 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1398 pub fn snapshot<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1399 self,
1400 tick: &Tick<L2>,
1401 _nondet: NonDet,
1402 ) -> KeyedSingleton<K, V, Tick<L::NoConsistency>, Bounded> {
1403 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1404 KeyedSingleton::new(
1405 tick.drop_consistency(),
1406 HydroNode::Batch {
1407 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1408 metadata: tick
1409 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1410 },
1411 )
1412 }
1413}
1414
1415impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1416where
1417 L: Location<'a>,
1418{
1419 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1420 /// state of the keyed singleton being atomically processed.
1421 ///
1422 /// # Non-Determinism
1423 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1424 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1425 pub fn snapshot_atomic<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1426 self,
1427 tick: &Tick<L2>,
1428 _nondet: NonDet,
1429 ) -> KeyedSingleton<K, V, Tick<L::NoConsistency>, Bounded> {
1430 KeyedSingleton::new(
1431 tick.drop_consistency(),
1432 HydroNode::Batch {
1433 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1434 metadata: tick
1435 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1436 },
1437 )
1438 }
1439}
1440
1441impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1442where
1443 L: Location<'a>,
1444{
1445 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1446 ///
1447 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1448 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1449 /// is filtered out.
1450 ///
1451 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering,
    /// use [`KeyedSingleton::filter_map`] instead.
1454 ///
1455 /// # Example
1456 /// ```rust
1457 /// # #[cfg(feature = "deploy")] {
1458 /// # use hydro_lang::prelude::*;
1459 /// # use futures::StreamExt;
1460 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1461 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1462 /// # process
1463 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1464 /// # .into_keyed()
1465 /// # .first();
1466 /// keyed_singleton.filter(q!(|&v| v > 1))
1467 /// # .entries()
1468 /// # }, |mut stream| async move {
1469 /// // { 1: 2, 2: 4 }
1470 /// # let mut results = Vec::new();
1471 /// # for _ in 0..2 {
1472 /// # results.push(stream.next().await.unwrap());
1473 /// # }
1474 /// # results.sort();
1475 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1476 /// # }));
1477 /// # }
1478 /// ```
1479 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1480 where
1481 F: Fn(&V) -> bool + 'a,
1482 {
1483 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1484 let filter_f = q!({
1485 let orig = f;
1486 move |t: &(_, _)| orig(&t.1)
1487 })
1488 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1489 .into();
1490
1491 KeyedSingleton::new(
1492 self.location.clone(),
1493 HydroNode::Filter {
1494 f: filter_f,
1495 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1496 metadata: self
1497 .location
1498 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1499 },
1500 )
1501 }
1502
1503 /// An operator that both filters and maps values. It yields only the key-value pairs where
1504 /// the supplied closure `f` returns `Some(value)`.
1505 ///
1506 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1507 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1508 /// If it returns `None`, the key-value pair is filtered out.
1509 ///
1510 /// # Example
1511 /// ```rust
1512 /// # #[cfg(feature = "deploy")] {
1513 /// # use hydro_lang::prelude::*;
1514 /// # use futures::StreamExt;
1515 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1516 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1517 /// # process
1518 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1519 /// # .into_keyed()
1520 /// # .first();
1521 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1522 /// # .entries()
1523 /// # }, |mut stream| async move {
1524 /// // { 1: 42, 3: 100 }
1525 /// # let mut results = Vec::new();
1526 /// # for _ in 0..2 {
1527 /// # results.push(stream.next().await.unwrap());
1528 /// # }
1529 /// # results.sort();
1530 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1531 /// # }));
1532 /// # }
1533 /// ```
1534 pub fn filter_map<F, U>(
1535 self,
1536 f: impl IntoQuotedMut<'a, F, L> + Copy,
1537 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1538 where
1539 F: Fn(V) -> Option<U> + 'a,
1540 {
1541 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1542 let filter_map_f = q!({
1543 let orig = f;
1544 move |(k, v)| orig(v).map(|o| (k, o))
1545 })
1546 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1547 .into();
1548
1549 KeyedSingleton::new(
1550 self.location.clone(),
1551 HydroNode::FilterMap {
1552 f: filter_map_f,
1553 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1554 metadata: self.location.new_node_metadata(KeyedSingleton::<
1555 K,
1556 U,
1557 L,
1558 B::EraseMonotonic,
1559 >::collection_kind()),
1560 },
1561 )
1562 }
1563
1564 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1565 /// arrived since the previous batch was released.
1566 ///
1567 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1568 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1569 ///
1570 /// # Non-Determinism
1571 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1572 /// has a non-deterministic set of key-value pairs.
1573 pub fn batch<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1574 self,
1575 tick: &Tick<L2>,
1576 _nondet: NonDet,
1577 ) -> KeyedSingleton<K, V, Tick<L::NoConsistency>, Bounded> {
1578 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1579 KeyedSingleton::new(
1580 tick.drop_consistency(),
1581 HydroNode::Batch {
1582 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1583 metadata: tick
1584 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1585 },
1586 )
1587 }
1588}
1589
1590impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1591where
1592 L: Location<'a>,
1593{
1594 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1595 /// atomically processed.
1596 ///
1597 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1598 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1599 ///
1600 /// # Non-Determinism
1601 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1602 /// has a non-deterministic set of key-value pairs.
1603 pub fn batch_atomic<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1604 self,
1605 tick: &Tick<L2>,
1606 nondet: NonDet,
1607 ) -> KeyedSingleton<K, V, Tick<L::NoConsistency>, Bounded> {
1608 let _ = nondet;
1609 KeyedSingleton::new(
1610 tick.drop_consistency(),
1611 HydroNode::Batch {
1612 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1613 metadata: tick
1614 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1615 },
1616 )
1617 }
1618}
1619
1620#[cfg(test)]
1621mod tests {
1622 #[cfg(feature = "deploy")]
1623 use futures::{SinkExt, StreamExt};
1624 #[cfg(feature = "deploy")]
1625 use hydro_deploy::Deployment;
1626 #[cfg(any(feature = "deploy", feature = "sim"))]
1627 use stageleft::q;
1628
1629 #[cfg(any(feature = "deploy", feature = "sim"))]
1630 use crate::compile::builder::FlowBuilder;
1631 #[cfg(any(feature = "deploy", feature = "sim"))]
1632 use crate::location::Location;
1633 #[cfg(any(feature = "deploy", feature = "sim"))]
1634 use crate::nondet::nondet;
1635
1636 #[cfg(feature = "deploy")]
1637 #[tokio::test]
1638 async fn key_count_bounded_value() {
1639 let mut deployment = Deployment::new();
1640
1641 let mut flow = FlowBuilder::new();
1642 let node = flow.process::<()>();
1643 let external = flow.external::<()>();
1644
1645 let (input_port, input) = node.source_external_bincode(&external);
1646 let out = input
1647 .into_keyed()
1648 .first()
1649 .key_count()
1650 .sample_eager(nondet!(/** test */))
1651 .send_bincode_external(&external);
1652
1653 let nodes = flow
1654 .with_process(&node, deployment.Localhost())
1655 .with_external(&external, deployment.Localhost())
1656 .deploy(&mut deployment);
1657
1658 deployment.deploy().await.unwrap();
1659
1660 let mut external_in = nodes.connect(input_port).await;
1661 let mut external_out = nodes.connect(out).await;
1662
1663 deployment.start().await.unwrap();
1664
1665 assert_eq!(external_out.next().await.unwrap(), 0);
1666
1667 external_in.send((1, 1)).await.unwrap();
1668 assert_eq!(external_out.next().await.unwrap(), 1);
1669
1670 external_in.send((2, 2)).await.unwrap();
1671 assert_eq!(external_out.next().await.unwrap(), 2);
1672 }
1673
1674 #[cfg(feature = "deploy")]
1675 #[tokio::test]
1676 async fn key_count_unbounded_value() {
1677 let mut deployment = Deployment::new();
1678
1679 let mut flow = FlowBuilder::new();
1680 let node = flow.process::<()>();
1681 let external = flow.external::<()>();
1682
1683 let (input_port, input) = node.source_external_bincode(&external);
1684 let out = input
1685 .into_keyed()
1686 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1687 .key_count()
1688 .sample_eager(nondet!(/** test */))
1689 .send_bincode_external(&external);
1690
1691 let nodes = flow
1692 .with_process(&node, deployment.Localhost())
1693 .with_external(&external, deployment.Localhost())
1694 .deploy(&mut deployment);
1695
1696 deployment.deploy().await.unwrap();
1697
1698 let mut external_in = nodes.connect(input_port).await;
1699 let mut external_out = nodes.connect(out).await;
1700
1701 deployment.start().await.unwrap();
1702
1703 assert_eq!(external_out.next().await.unwrap(), 0);
1704
1705 external_in.send((1, 1)).await.unwrap();
1706 assert_eq!(external_out.next().await.unwrap(), 1);
1707
1708 external_in.send((1, 2)).await.unwrap();
1709 assert_eq!(external_out.next().await.unwrap(), 1);
1710
1711 external_in.send((2, 2)).await.unwrap();
1712 assert_eq!(external_out.next().await.unwrap(), 2);
1713
1714 external_in.send((1, 1)).await.unwrap();
1715 assert_eq!(external_out.next().await.unwrap(), 2);
1716
1717 external_in.send((3, 1)).await.unwrap();
1718 assert_eq!(external_out.next().await.unwrap(), 3);
1719 }
1720
1721 #[cfg(feature = "deploy")]
1722 #[tokio::test]
1723 async fn into_singleton_bounded_value() {
1724 let mut deployment = Deployment::new();
1725
1726 let mut flow = FlowBuilder::new();
1727 let node = flow.process::<()>();
1728 let external = flow.external::<()>();
1729
1730 let (input_port, input) = node.source_external_bincode(&external);
1731 let out = input
1732 .into_keyed()
1733 .first()
1734 .into_singleton()
1735 .sample_eager(nondet!(/** test */))
1736 .send_bincode_external(&external);
1737
1738 let nodes = flow
1739 .with_process(&node, deployment.Localhost())
1740 .with_external(&external, deployment.Localhost())
1741 .deploy(&mut deployment);
1742
1743 deployment.deploy().await.unwrap();
1744
1745 let mut external_in = nodes.connect(input_port).await;
1746 let mut external_out = nodes.connect(out).await;
1747
1748 deployment.start().await.unwrap();
1749
1750 assert_eq!(
1751 external_out.next().await.unwrap(),
1752 std::collections::HashMap::new()
1753 );
1754
1755 external_in.send((1, 1)).await.unwrap();
1756 assert_eq!(
1757 external_out.next().await.unwrap(),
1758 vec![(1, 1)].into_iter().collect()
1759 );
1760
1761 external_in.send((2, 2)).await.unwrap();
1762 assert_eq!(
1763 external_out.next().await.unwrap(),
1764 vec![(1, 1), (2, 2)].into_iter().collect()
1765 );
1766 }
1767
1768 #[cfg(feature = "deploy")]
1769 #[tokio::test]
1770 async fn into_singleton_unbounded_value() {
1771 let mut deployment = Deployment::new();
1772
1773 let mut flow = FlowBuilder::new();
1774 let node = flow.process::<()>();
1775 let external = flow.external::<()>();
1776
1777 let (input_port, input) = node.source_external_bincode(&external);
1778 let out = input
1779 .into_keyed()
1780 .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1781 .into_singleton()
1782 .sample_eager(nondet!(/** test */))
1783 .send_bincode_external(&external);
1784
1785 let nodes = flow
1786 .with_process(&node, deployment.Localhost())
1787 .with_external(&external, deployment.Localhost())
1788 .deploy(&mut deployment);
1789
1790 deployment.deploy().await.unwrap();
1791
1792 let mut external_in = nodes.connect(input_port).await;
1793 let mut external_out = nodes.connect(out).await;
1794
1795 deployment.start().await.unwrap();
1796
1797 assert_eq!(
1798 external_out.next().await.unwrap(),
1799 std::collections::HashMap::new()
1800 );
1801
1802 external_in.send((1, 1)).await.unwrap();
1803 assert_eq!(
1804 external_out.next().await.unwrap(),
1805 vec![(1, 1)].into_iter().collect()
1806 );
1807
1808 external_in.send((1, 2)).await.unwrap();
1809 assert_eq!(
1810 external_out.next().await.unwrap(),
1811 vec![(1, 2)].into_iter().collect()
1812 );
1813
1814 external_in.send((2, 2)).await.unwrap();
1815 assert_eq!(
1816 external_out.next().await.unwrap(),
1817 vec![(1, 2), (2, 1)].into_iter().collect()
1818 );
1819
1820 external_in.send((1, 1)).await.unwrap();
1821 assert_eq!(
1822 external_out.next().await.unwrap(),
1823 vec![(1, 3), (2, 1)].into_iter().collect()
1824 );
1825
1826 external_in.send((3, 1)).await.unwrap();
1827 assert_eq!(
1828 external_out.next().await.unwrap(),
1829 vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1830 );
1831 }
1832
    // Simulation test: snapshots a per-key message counter and checks the last sorted output
    // entry across the simulated executions.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unbounded_singleton_snapshot() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input();
        // Count messages per key, snapshot the counts inside a tick, and emit the entries of
        // every tick as the simulation output.
        let output = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .snapshot(&node.tick(), nondet!(/** test */))
            .entries()
            .all_ticks()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            // Two messages for key 1 and one message for key 2.
            input_port.send((1, 123));
            input_port.send((1, 456));
            input_port.send((2, 123));

            // In sorted order the last entry must be key 2 with a count of 1.
            let all = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(all.last().unwrap(), &(2, 1));
        });

        // NOTE(review): presumably `exhaustive` returns the number of executions it explored,
        // pinned here to 8 — confirm against the sim API.
        assert_eq!(count, 8);
    }
1859
1860 #[cfg(feature = "deploy")]
1861 #[tokio::test]
1862 async fn join_keyed_stream() {
1863 let mut deployment = Deployment::new();
1864
1865 let mut flow = FlowBuilder::new();
1866 let node = flow.process::<()>();
1867 let external = flow.external::<()>();
1868
1869 let tick = node.tick();
1870 let keyed_data = node
1871 .source_iter(q!(vec![(1, 10), (2, 20)]))
1872 .into_keyed()
1873 .batch(&tick, nondet!(/** test */))
1874 .first();
1875 let requests = node
1876 .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1877 .into_keyed()
1878 .batch(&tick, nondet!(/** test */));
1879
1880 let out = keyed_data
1881 .join_keyed_stream(requests)
1882 .entries()
1883 .all_ticks()
1884 .send_bincode_external(&external);
1885
1886 let nodes = flow
1887 .with_process(&node, deployment.Localhost())
1888 .with_external(&external, deployment.Localhost())
1889 .deploy(&mut deployment);
1890
1891 deployment.deploy().await.unwrap();
1892
1893 let mut external_out = nodes.connect(out).await;
1894
1895 deployment.start().await.unwrap();
1896
1897 let mut results = vec![];
1898 for _ in 0..2 {
1899 results.push(external_out.next().await.unwrap());
1900 }
1901 results.sort();
1902
1903 assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1904 }
1905}