Skip to main content

hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_singleton::KeyedSingleton;
14use super::optional::Optional;
15use super::stream::{
16    ExactlyOnce, IsExactlyOnce, IsOrdered, MinOrder, MinRetries, NoOrder, Stream, TotalOrder,
17};
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::keyed_singleton::KeyedSingletonBound;
27use crate::live_collections::stream::{
28    AtLeastOnce, Ordering, Retries, WeakerOrderingThan, WeakerRetryThan,
29};
30#[cfg(stageleft_runtime)]
31use crate::location::dynamic::{DynLocation, LocationId};
32use crate::location::tick::DeferTick;
33use crate::location::{Atomic, Location, Tick, check_matching_location};
34use crate::manual_expr::ManualExpr;
35use crate::nondet::{NonDet, nondet};
36use crate::properties::{
37    AggFuncAlgebra, ApplyMonotoneKeyedStream, ValidCommutativityFor, ValidIdempotenceFor,
38    manual_proof,
39};
40
41pub mod networking;
42
43/// Streaming elements of type `V` grouped by a key of type `K`.
44///
45/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
46/// order of keys is non-deterministic but the order *within* each group may be deterministic.
47///
48/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
49/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
50/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
51///
52/// Type Parameters:
53/// - `K`: the type of the key for each group
54/// - `V`: the type of the elements inside each group
55/// - `Loc`: the [`Location`] where the keyed stream is materialized
56/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
57/// - `Order`: tracks whether the elements within each group have deterministic order
58///   ([`TotalOrder`]) or not ([`NoOrder`])
59/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
60///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
61pub struct KeyedStream<
62    K,
63    V,
64    Loc,
65    Bound: Boundedness = Unbounded,
66    Order: Ordering = TotalOrder,
67    Retry: Retries = ExactlyOnce,
68> {
69    pub(crate) location: Loc,
70    pub(crate) ir_node: RefCell<HydroNode>,
71    pub(crate) flow_state: FlowState,
72
73    _phantom: PhantomData<(K, V, Loc, Bound, Order, Retry)>,
74}
75
76impl<K, V, L, B: Boundedness, O: Ordering, R: Retries> Drop for KeyedStream<K, V, L, B, O, R> {
77    fn drop(&mut self) {
78        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
79        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
80            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
81                input: Box::new(ir_node),
82                op_metadata: HydroIrOpMetadata::new(),
83            });
84        }
85    }
86}
87
88impl<'a, K, V, L, O: Ordering, R: Retries> From<KeyedStream<K, V, L, Bounded, O, R>>
89    for KeyedStream<K, V, L, Unbounded, O, R>
90where
91    L: Location<'a>,
92{
93    fn from(stream: KeyedStream<K, V, L, Bounded, O, R>) -> KeyedStream<K, V, L, Unbounded, O, R> {
94        let new_meta = stream
95            .location
96            .new_node_metadata(KeyedStream::<K, V, L, Unbounded, O, R>::collection_kind());
97
98        KeyedStream {
99            location: stream.location.clone(),
100            flow_state: stream.flow_state.clone(),
101            ir_node: RefCell::new(HydroNode::Cast {
102                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
103                metadata: new_meta,
104            }),
105            _phantom: PhantomData,
106        }
107    }
108}
109
110impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
111    for KeyedStream<K, V, L, B, NoOrder, R>
112where
113    L: Location<'a>,
114{
115    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
116        stream.weaken_ordering()
117    }
118}
119
120impl<'a, K, V, L, O: Ordering, R: Retries> DeferTick for KeyedStream<K, V, Tick<L>, Bounded, O, R>
121where
122    L: Location<'a>,
123{
124    fn defer_tick(self) -> Self {
125        KeyedStream::defer_tick(self)
126    }
127}
128
129impl<'a, K, V, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
130    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
131where
132    L: Location<'a>,
133{
134    type Location = Tick<L>;
135
136    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
137        KeyedStream {
138            flow_state: location.flow_state().clone(),
139            location: location.clone(),
140            ir_node: RefCell::new(HydroNode::CycleSource {
141                cycle_id,
142                metadata: location.new_node_metadata(
143                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
144                ),
145            }),
146            _phantom: PhantomData,
147        }
148    }
149}
150
151impl<'a, K, V, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
152    for KeyedStream<K, V, Tick<L>, Bounded, O, R>
153where
154    L: Location<'a>,
155{
156    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
157        assert_eq!(
158            Location::id(&self.location),
159            expected_location,
160            "locations do not match"
161        );
162
163        self.location
164            .flow_state()
165            .borrow_mut()
166            .push_root(HydroRoot::CycleSink {
167                cycle_id,
168                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
169                op_metadata: HydroIrOpMetadata::new(),
170            });
171    }
172}
173
174impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
175    for KeyedStream<K, V, L, B, O, R>
176where
177    L: Location<'a>,
178{
179    type Location = L;
180
181    fn create_source(cycle_id: CycleId, location: L) -> Self {
182        KeyedStream {
183            flow_state: location.flow_state().clone(),
184            location: location.clone(),
185            ir_node: RefCell::new(HydroNode::CycleSource {
186                cycle_id,
187                metadata: location
188                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
189            }),
190            _phantom: PhantomData,
191        }
192    }
193}
194
195impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
196    for KeyedStream<K, V, L, B, O, R>
197where
198    L: Location<'a>,
199{
200    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201        assert_eq!(
202            Location::id(&self.location),
203            expected_location,
204            "locations do not match"
205        );
206        self.location
207            .flow_state()
208            .borrow_mut()
209            .push_root(HydroRoot::CycleSink {
210                cycle_id,
211                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212                op_metadata: HydroIrOpMetadata::new(),
213            });
214    }
215}
216
217impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
218    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
219{
220    fn clone(&self) -> Self {
221        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
222            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
223            *self.ir_node.borrow_mut() = HydroNode::Tee {
224                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
225                metadata: self.location.new_node_metadata(Self::collection_kind()),
226            };
227        }
228
229        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
230            KeyedStream {
231                location: self.location.clone(),
232                flow_state: self.flow_state.clone(),
233                ir_node: HydroNode::Tee {
234                    inner: SharedNode(inner.0.clone()),
235                    metadata: metadata.clone(),
236                }
237                .into(),
238                _phantom: PhantomData,
239            }
240        } else {
241            unreachable!()
242        }
243    }
244}
245
246/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
247/// control the processing of future elements.
248pub enum Generate<T> {
249    /// Emit the provided element, and keep processing future inputs.
250    Yield(T),
251    /// Emit the provided element as the _final_ element, do not process future inputs.
252    Return(T),
253    /// Do not emit anything, but continue processing future inputs.
254    Continue,
255    /// Do not emit anything, and do not process further inputs.
256    Break,
257}
258
259impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
260    KeyedStream<K, V, L, B, O, R>
261{
262    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
263        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
264        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
265
266        let flow_state = location.flow_state().clone();
267        KeyedStream {
268            location,
269            flow_state,
270            ir_node: RefCell::new(ir_node),
271            _phantom: PhantomData,
272        }
273    }
274
275    /// Returns the [`CollectionKind`] corresponding to this type.
276    pub fn collection_kind() -> CollectionKind {
277        CollectionKind::KeyedStream {
278            bound: B::BOUND_KIND,
279            value_order: O::ORDERING_KIND,
280            value_retry: R::RETRIES_KIND,
281            key_type: stageleft::quote_type::<K>().into(),
282            value_type: stageleft::quote_type::<V>().into(),
283        }
284    }
285
286    /// Returns the [`Location`] where this keyed stream is being materialized.
287    pub fn location(&self) -> &L {
288        &self.location
289    }
290
291    /// Weakens the consistency of this live collection to not guarantee any consistency across
292    /// cluster members (if this collection is on a cluster).
293    pub fn weaken_consistency(self) -> KeyedStream<K, V, L::NoConsistency, B, O, R>
294    where
295        L: Location<'a>,
296    {
297        if L::consistency()
298            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
299        {
300            // already no consistency
301            KeyedStream::new(
302                self.location.drop_consistency(),
303                self.ir_node.replace(HydroNode::Placeholder),
304            )
305        } else {
306            KeyedStream::new(
307                self.location.drop_consistency(),
308                HydroNode::Cast {
309                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
310                    metadata: self
311                        .location
312                        .drop_consistency()
313                        .new_node_metadata(
314                            KeyedStream::<K, V, L::NoConsistency, B>::collection_kind(),
315                        ),
316                },
317            )
318        }
319    }
320
321    /// Casts this live collection to have the consistency guarantees specified in the given
322    /// location type parameter. The developer must ensure that the strengthened consistency
323    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
324    pub fn assert_has_consistency_of<L2: Location<'a, NoConsistency = L::NoConsistency>>(
325        self,
326        _proof: impl crate::properties::ConsistencyProof,
327    ) -> KeyedStream<K, V, L2, B, O, R>
328    where
329        L: Location<'a>,
330    {
331        if L::consistency() == L2::consistency() {
332            KeyedStream::new(
333                self.location.with_consistency_of(),
334                self.ir_node.replace(HydroNode::Placeholder),
335            )
336        } else {
337            KeyedStream::new(
338                self.location.with_consistency_of(),
339                HydroNode::AssertIsConsistent {
340                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
341                    metadata: self
342                        .location
343                        .clone()
344                        .with_consistency_of::<L2>()
345                        .new_node_metadata(KeyedStream::<K, V, L2, B, O, R>::collection_kind()),
346                },
347            )
348        }
349    }
350
351    /// Turns this [`KeyedStream`] into a [`Stream`] preserving ordering, under the invariant
352    /// assumption that there is at most one key. If this invariant is broken, the program
353    /// may exhibit undefined behavior, so uses must be carefully vetted.
354    pub(crate) fn cast_at_most_one_key(self) -> Stream<(K, V), L, B, O, R> {
355        Stream::new(
356            self.location.clone(),
357            HydroNode::Cast {
358                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
359                metadata: self
360                    .location
361                    .new_node_metadata(Stream::<(K, V), L, B, O, R>::collection_kind()),
362            },
363        )
364    }
365
366    /// Turns this [`KeyedStream`] into a [`KeyedSingleton`], under the invariant assumption that
367    /// there is at most one entry per key. If this invariant is broken, the program may exhibit
368    /// undefined behavior, so uses must be carefully vetted.
369    pub(crate) fn cast_at_most_one_entry_per_key(
370        self,
371    ) -> KeyedSingleton<K, V, L, B::WithBoundedValue> {
372        KeyedSingleton::new(
373            self.location.clone(),
374            HydroNode::Cast {
375                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
376                metadata: self.location.new_node_metadata(KeyedSingleton::<
377                    K,
378                    V,
379                    L,
380                    B::WithBoundedValue,
381                >::collection_kind()),
382            },
383        )
384    }
385
386    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> KeyedStream<K, V, L, B, O2, R> {
387        if O::ORDERING_KIND == O2::ORDERING_KIND {
388            KeyedStream::new(
389                self.location.clone(),
390                self.ir_node.replace(HydroNode::Placeholder),
391            )
392        } else {
393            panic!(
394                "Runtime ordering {:?} did not match requested cast {:?}.",
395                O::ORDERING_KIND,
396                O2::ORDERING_KIND
397            )
398        }
399    }
400
401    /// Explicitly "casts" the keyed stream to a type with a different ordering
402    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
403    /// by the type-system.
404    ///
405    /// # Non-Determinism
406    /// This function is used as an escape hatch, and any mistakes in the
407    /// provided ordering guarantee will propagate into the guarantees
408    /// for the rest of the program.
409    pub fn assume_ordering<O2: Ordering>(
410        self,
411        _nondet: NonDet,
412    ) -> KeyedStream<K, V, L::NoConsistency, B, O2, R> {
413        if O::ORDERING_KIND == O2::ORDERING_KIND {
414            self.use_ordering_type().weaken_consistency()
415        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
416            // We can always weaken the ordering guarantee
417            let target_location = self.location.drop_consistency();
418            KeyedStream::new(
419                target_location.clone(),
420                HydroNode::Cast {
421                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
422                    metadata: target_location
423                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
424                },
425            )
426        } else {
427            let target_location = self.location.drop_consistency();
428            KeyedStream::new(
429                target_location.clone(),
430                HydroNode::ObserveNonDet {
431                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
432                    trusted: false,
433                    metadata: target_location
434                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
435                },
436            )
437        }
438    }
439
440    fn assume_ordering_trusted<O2: Ordering>(
441        self,
442        _nondet: NonDet,
443    ) -> KeyedStream<K, V, L, B, O2, R> {
444        if O::ORDERING_KIND == O2::ORDERING_KIND {
445            KeyedStream::new(
446                self.location.clone(),
447                self.ir_node.replace(HydroNode::Placeholder),
448            )
449        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
450            // We can always weaken the ordering guarantee
451            KeyedStream::new(
452                self.location.clone(),
453                HydroNode::Cast {
454                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
455                    metadata: self
456                        .location
457                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
458                },
459            )
460        } else {
461            KeyedStream::new(
462                self.location.clone(),
463                HydroNode::ObserveNonDet {
464                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
465                    trusted: true,
466                    metadata: self
467                        .location
468                        .new_node_metadata(KeyedStream::<K, V, L, B, O2, R>::collection_kind()),
469                },
470            )
471        }
472    }
473
474    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
475    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
476    /// which is always safe because that is the weakest possible guarantee.
477    pub fn weakest_ordering(self) -> KeyedStream<K, V, L, B, NoOrder, R> {
478        self.weaken_ordering::<NoOrder>()
479    }
480
481    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
482    /// enforcing that `O2` is weaker than the input ordering guarantee.
483    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> KeyedStream<K, V, L, B, O2, R> {
484        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
485        self.assume_ordering_trusted::<O2>(nondet)
486    }
487
488    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
489    /// implies that `O == TotalOrder`.
490    pub fn make_totally_ordered(self) -> KeyedStream<K, V, L, B, TotalOrder, R>
491    where
492        O: IsOrdered,
493    {
494        self.assume_ordering_trusted(nondet!(/** no-op */))
495    }
496
497    /// Explicitly "casts" the keyed stream to a type with a different retries
498    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
499    /// be proven by the type-system.
500    ///
501    /// # Non-Determinism
502    /// This function is used as an escape hatch, and any mistakes in the
503    /// provided retries guarantee will propagate into the guarantees
504    /// for the rest of the program.
505    pub fn assume_retries<R2: Retries>(
506        self,
507        _nondet: NonDet,
508    ) -> KeyedStream<K, V, L::NoConsistency, B, O, R2> {
509        if R::RETRIES_KIND == R2::RETRIES_KIND {
510            KeyedStream::new(
511                self.location.drop_consistency(),
512                self.ir_node.replace(HydroNode::Placeholder),
513            )
514        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
515            // We can always weaken the retries guarantee
516            let target_location = self.location.drop_consistency();
517            KeyedStream::new(
518                target_location.clone(),
519                HydroNode::Cast {
520                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
521                    metadata: target_location
522                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
523                },
524            )
525        } else {
526            let target_location = self.location.drop_consistency();
527            KeyedStream::new(
528                target_location.clone(),
529                HydroNode::ObserveNonDet {
530                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
531                    trusted: false,
532                    metadata: target_location
533                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
534                },
535            )
536        }
537    }
538
539    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
540    // is not observable
541    fn assume_retries_trusted<R2: Retries>(
542        self,
543        _nondet: NonDet,
544    ) -> KeyedStream<K, V, L, B, O, R2> {
545        if R::RETRIES_KIND == R2::RETRIES_KIND {
546            KeyedStream::new(
547                self.location.clone(),
548                self.ir_node.replace(HydroNode::Placeholder),
549            )
550        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
551            // We can always weaken the retries guarantee
552            KeyedStream::new(
553                self.location.clone(),
554                HydroNode::Cast {
555                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
556                    metadata: self
557                        .location
558                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
559                },
560            )
561        } else {
562            KeyedStream::new(
563                self.location.clone(),
564                HydroNode::ObserveNonDet {
565                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
566                    trusted: true,
567                    metadata: self
568                        .location
569                        .new_node_metadata(KeyedStream::<K, V, L, B, O, R2>::collection_kind()),
570                },
571            )
572        }
573    }
574
575    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
576    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
577    /// which is always safe because that is the weakest possible guarantee.
578    pub fn weakest_retries(self) -> KeyedStream<K, V, L, B, O, AtLeastOnce> {
579        self.weaken_retries::<AtLeastOnce>()
580    }
581
582    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
583    /// enforcing that `R2` is weaker than the input retries guarantee.
584    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> KeyedStream<K, V, L, B, O, R2> {
585        let nondet = nondet!(/** this is a weaker retries guarantee, so it is safe to assume */);
586        self.assume_retries_trusted::<R2>(nondet)
587    }
588
589    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
590    /// implies that `R == ExactlyOnce`.
591    pub fn make_exactly_once(self) -> KeyedStream<K, V, L, B, O, ExactlyOnce>
592    where
593        R: IsExactlyOnce,
594    {
595        self.assume_retries_trusted(nondet!(/** no-op */))
596    }
597
598    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
599    /// implies that `B == Bounded`.
600    pub fn make_bounded(self) -> KeyedStream<K, V, L, Bounded, O, R>
601    where
602        B: IsBounded,
603    {
604        self.weaken_boundedness()
605    }
606
607    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
608    /// which implies that `B == Bounded`.
609    pub fn weaken_boundedness<B2: Boundedness>(self) -> KeyedStream<K, V, L, B2, O, R> {
610        if B::BOUNDED == B2::BOUNDED {
611            KeyedStream::new(
612                self.location.clone(),
613                self.ir_node.replace(HydroNode::Placeholder),
614            )
615        } else {
616            // We can always weaken the boundedness
617            KeyedStream::new(
618                self.location.clone(),
619                HydroNode::Cast {
620                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
621                    metadata: self
622                        .location
623                        .new_node_metadata(KeyedStream::<K, V, L, B2, O, R>::collection_kind()),
624                },
625            )
626        }
627    }
628
629    /// Flattens the keyed stream into an unordered stream of key-value pairs.
630    ///
631    /// # Example
632    /// ```rust
633    /// # #[cfg(feature = "deploy")] {
634    /// # use hydro_lang::prelude::*;
635    /// # use futures::StreamExt;
636    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
637    /// process
638    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
639    ///     .into_keyed()
640    ///     .entries()
641    /// # }, |mut stream| async move {
642    /// // (1, 2), (1, 3), (2, 4) in any order
643    /// # let mut results = Vec::new();
644    /// # for _ in 0..3 {
645    /// #     results.push(stream.next().await.unwrap());
646    /// # }
647    /// # results.sort();
648    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
649    /// # }));
650    /// # }
651    /// ```
652    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
653        Stream::new(
654            self.location.clone(),
655            HydroNode::Cast {
656                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
657                metadata: self
658                    .location
659                    .new_node_metadata(Stream::<(K, V), L, B, NoOrder, R>::collection_kind()),
660            },
661        )
662    }
663
664    /// Flattens the keyed stream into a totally ordered stream of key-value pairs,
665    /// preserving the order of values within each key group but non-deterministically
666    /// interleaving across keys.
667    ///
668    /// Requires the keyed stream to be totally ordered within each group (`O: IsOrdered`).
669    ///
670    /// # Non-Determinism
671    /// The interleaving of entries across different keys is non-deterministic.
672    /// Within each key, the original order is preserved.
673    pub fn entries_partially_ordered(
674        self,
675        _nondet: NonDet,
676    ) -> Stream<(K, V), L::NoConsistency, B, TotalOrder, R>
677    where
678        O: IsOrdered,
679    {
680        let target_location = self.location.drop_consistency();
681        Stream::new(
682            target_location.clone(),
683            HydroNode::ObserveNonDet {
684                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
685                trusted: false,
686                metadata: target_location
687                    .new_node_metadata(Stream::<(K, V), L, B, TotalOrder, R>::collection_kind()),
688            },
689        )
690    }
691
692    /// Flattens the keyed stream into an unordered stream of only the values.
693    ///
694    /// # Example
695    /// ```rust
696    /// # #[cfg(feature = "deploy")] {
697    /// # use hydro_lang::prelude::*;
698    /// # use futures::StreamExt;
699    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
700    /// process
701    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
702    ///     .into_keyed()
703    ///     .values()
704    /// # }, |mut stream| async move {
705    /// // 2, 3, 4 in any order
706    /// # let mut results = Vec::new();
707    /// # for _ in 0..3 {
708    /// #     results.push(stream.next().await.unwrap());
709    /// # }
710    /// # results.sort();
711    /// # assert_eq!(results, vec![2, 3, 4]);
712    /// # }));
713    /// # }
714    /// ```
715    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
716        self.entries().map(q!(|(_, v)| v))
717    }
718
719    /// Flattens the keyed stream into an unordered stream of just the keys.
720    ///
721    /// # Example
722    /// ```rust
723    /// # #[cfg(feature = "deploy")] {
724    /// # use hydro_lang::prelude::*;
725    /// # use futures::StreamExt;
726    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
727    /// # process
728    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (1, 5)]))
729    /// #     .into_keyed()
730    /// #     .keys()
731    /// # }, |mut stream| async move {
732    /// // 1, 2 in any order
733    /// # let mut results = Vec::new();
734    /// # for _ in 0..2 {
735    /// #     results.push(stream.next().await.unwrap());
736    /// # }
737    /// # results.sort();
738    /// # assert_eq!(results, vec![1, 2]);
739    /// # }));
740    /// # }
741    /// ```
742    pub fn keys(self) -> Stream<K, L, B, NoOrder, ExactlyOnce>
743    where
744        K: Eq + Hash,
745    {
746        self.entries().map(q!(|(k, _)| k)).unique()
747    }
748
749    /// Transforms each value by invoking `f` on each element, with keys staying the same
750    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
751    ///
752    /// If you do not want to modify the stream and instead only want to view
753    /// each item use [`KeyedStream::inspect`] instead.
754    ///
755    /// # Example
756    /// ```rust
757    /// # #[cfg(feature = "deploy")] {
758    /// # use hydro_lang::prelude::*;
759    /// # use futures::StreamExt;
760    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
761    /// process
762    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
763    ///     .into_keyed()
764    ///     .map(q!(|v| v + 1))
765    /// #   .entries()
766    /// # }, |mut stream| async move {
767    /// // { 1: [3, 4], 2: [5] }
768    /// # let mut results = Vec::new();
769    /// # for _ in 0..3 {
770    /// #     results.push(stream.next().await.unwrap());
771    /// # }
772    /// # results.sort();
773    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 5)]);
774    /// # }));
775    /// # }
776    /// ```
777    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
778    where
779        F: Fn(V) -> U + 'a,
780    {
781        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
782        let map_f = q!({
783            let orig = f;
784            move |(k, v)| (k, orig(v))
785        })
786        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
787        .into();
788
789        KeyedStream::new(
790            self.location.clone(),
791            HydroNode::Map {
792                f: map_f,
793                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
794                metadata: self
795                    .location
796                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
797            },
798        )
799    }
800
801    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
802    /// re-grouped even they are tuples; instead they will be grouped under the original key.
803    ///
804    /// If you do not want to modify the stream and instead only want to view
805    /// each item use [`KeyedStream::inspect_with_key`] instead.
806    ///
807    /// # Example
808    /// ```rust
809    /// # #[cfg(feature = "deploy")] {
810    /// # use hydro_lang::prelude::*;
811    /// # use futures::StreamExt;
812    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
813    /// process
814    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
815    ///     .into_keyed()
816    ///     .map_with_key(q!(|(k, v)| k + v))
817    /// #   .entries()
818    /// # }, |mut stream| async move {
819    /// // { 1: [3, 4], 2: [6] }
820    /// # let mut results = Vec::new();
821    /// # for _ in 0..3 {
822    /// #     results.push(stream.next().await.unwrap());
823    /// # }
824    /// # results.sort();
825    /// # assert_eq!(results, vec![(1, 3), (1, 4), (2, 6)]);
826    /// # }));
827    /// # }
828    /// ```
829    pub fn map_with_key<U, F>(
830        self,
831        f: impl IntoQuotedMut<'a, F, L> + Copy,
832    ) -> KeyedStream<K, U, L, B, O, R>
833    where
834        F: Fn((K, V)) -> U + 'a,
835        K: Clone,
836    {
837        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
838        let map_f = q!({
839            let orig = f;
840            move |(k, v)| {
841                let out = orig((Clone::clone(&k), v));
842                (k, out)
843            }
844        })
845        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
846        .into();
847
848        KeyedStream::new(
849            self.location.clone(),
850            HydroNode::Map {
851                f: map_f,
852                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
853                metadata: self
854                    .location
855                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
856            },
857        )
858    }
859
860    /// Prepends a new value to the key of each element in the stream, producing a new
861    /// keyed stream with compound keys. Because the original key is preserved, no re-grouping
862    /// occurs and the elements in each group preserve their original order.
863    ///
864    /// # Example
865    /// ```rust
866    /// # #[cfg(feature = "deploy")] {
867    /// # use hydro_lang::prelude::*;
868    /// # use futures::StreamExt;
869    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
870    /// process
871    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
872    ///     .into_keyed()
873    ///     .prefix_key(q!(|&(k, _)| k % 2))
874    /// #   .entries()
875    /// # }, |mut stream| async move {
876    /// // { (1, 1): [2, 3], (0, 2): [4] }
877    /// # let mut results = Vec::new();
878    /// # for _ in 0..3 {
879    /// #     results.push(stream.next().await.unwrap());
880    /// # }
881    /// # results.sort();
882    /// # assert_eq!(results, vec![((0, 2), 4), ((1, 1), 2), ((1, 1), 3)]);
883    /// # }));
884    /// # }
885    /// ```
886    pub fn prefix_key<K2, F>(
887        self,
888        f: impl IntoQuotedMut<'a, F, L> + Copy,
889    ) -> KeyedStream<(K2, K), V, L, B, O, R>
890    where
891        F: Fn(&(K, V)) -> K2 + 'a,
892    {
893        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
894        let map_f = q!({
895            let orig = f;
896            move |kv| {
897                let out = orig(&kv);
898                ((out, kv.0), kv.1)
899            }
900        })
901        .splice_fn1_ctx::<(K, V), ((K2, K), V)>(&self.location)
902        .into();
903
904        KeyedStream::new(
905            self.location.clone(),
906            HydroNode::Map {
907                f: map_f,
908                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
909                metadata: self
910                    .location
911                    .new_node_metadata(KeyedStream::<(K2, K), V, L, B, O, R>::collection_kind()),
912            },
913        )
914    }
915
916    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
917    /// `f`, preserving the order of the elements within the group.
918    ///
919    /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
920    /// not modify or take ownership of the values. If you need to modify the values while filtering
921    /// use [`KeyedStream::filter_map`] instead.
922    ///
923    /// # Example
924    /// ```rust
925    /// # #[cfg(feature = "deploy")] {
926    /// # use hydro_lang::prelude::*;
927    /// # use futures::StreamExt;
928    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
929    /// process
930    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
931    ///     .into_keyed()
932    ///     .filter(q!(|&x| x > 2))
933    /// #   .entries()
934    /// # }, |mut stream| async move {
935    /// // { 1: [3], 2: [4] }
936    /// # let mut results = Vec::new();
937    /// # for _ in 0..2 {
938    /// #     results.push(stream.next().await.unwrap());
939    /// # }
940    /// # results.sort();
941    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
942    /// # }));
943    /// # }
944    /// ```
945    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
946    where
947        F: Fn(&V) -> bool + 'a,
948    {
949        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
950        let filter_f = q!({
951            let orig = f;
952            move |t: &(_, _)| orig(&t.1)
953        })
954        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
955        .into();
956
957        KeyedStream::new(
958            self.location.clone(),
959            HydroNode::Filter {
960                f: filter_f,
961                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
962                metadata: self.location.new_node_metadata(Self::collection_kind()),
963            },
964        )
965    }
966
967    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
968    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
969    ///
970    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
971    /// not modify or take ownership of the values. If you need to modify the values while filtering
972    /// use [`KeyedStream::filter_map_with_key`] instead.
973    ///
974    /// # Example
975    /// ```rust
976    /// # #[cfg(feature = "deploy")] {
977    /// # use hydro_lang::prelude::*;
978    /// # use futures::StreamExt;
979    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
980    /// process
981    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
982    ///     .into_keyed()
983    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
984    /// #   .entries()
985    /// # }, |mut stream| async move {
986    /// // { 1: [3], 2: [4] }
987    /// # let mut results = Vec::new();
988    /// # for _ in 0..2 {
989    /// #     results.push(stream.next().await.unwrap());
990    /// # }
991    /// # results.sort();
992    /// # assert_eq!(results, vec![(1, 3), (2, 4)]);
993    /// # }));
994    /// # }
995    /// ```
996    pub fn filter_with_key<F>(
997        self,
998        f: impl IntoQuotedMut<'a, F, L> + Copy,
999    ) -> KeyedStream<K, V, L, B, O, R>
1000    where
1001        F: Fn(&(K, V)) -> bool + 'a,
1002    {
1003        let filter_f = f
1004            .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1005            .into();
1006
1007        KeyedStream::new(
1008            self.location.clone(),
1009            HydroNode::Filter {
1010                f: filter_f,
1011                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1012                metadata: self.location.new_node_metadata(Self::collection_kind()),
1013            },
1014        )
1015    }
1016
1017    /// An operator that both filters and maps each value, with keys staying the same.
1018    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
1019    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
1020    ///
1021    /// # Example
1022    /// ```rust
1023    /// # #[cfg(feature = "deploy")] {
1024    /// # use hydro_lang::prelude::*;
1025    /// # use futures::StreamExt;
1026    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1027    /// process
1028    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
1029    ///     .into_keyed()
1030    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
1031    /// #   .entries()
1032    /// # }, |mut stream| async move {
1033    /// // { 1: [2], 2: [4] }
1034    /// # let mut results = Vec::new();
1035    /// # for _ in 0..2 {
1036    /// #     results.push(stream.next().await.unwrap());
1037    /// # }
1038    /// # results.sort();
1039    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1040    /// # }));
1041    /// # }
1042    /// ```
1043    pub fn filter_map<U, F>(
1044        self,
1045        f: impl IntoQuotedMut<'a, F, L> + Copy,
1046    ) -> KeyedStream<K, U, L, B, O, R>
1047    where
1048        F: Fn(V) -> Option<U> + 'a,
1049    {
1050        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1051        let filter_map_f = q!({
1052            let orig = f;
1053            move |(k, v)| orig(v).map(|o| (k, o))
1054        })
1055        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1056        .into();
1057
1058        KeyedStream::new(
1059            self.location.clone(),
1060            HydroNode::FilterMap {
1061                f: filter_map_f,
1062                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1063                metadata: self
1064                    .location
1065                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1066            },
1067        )
1068    }
1069
1070    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
1071    /// re-grouped even they are tuples; instead they will be grouped under the original key.
1072    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
1073    ///
1074    /// # Example
1075    /// ```rust
1076    /// # #[cfg(feature = "deploy")] {
1077    /// # use hydro_lang::prelude::*;
1078    /// # use futures::StreamExt;
1079    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1080    /// process
1081    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
1082    ///     .into_keyed()
1083    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
1084    /// #   .entries()
1085    /// # }, |mut stream| async move {
1086    /// // { 2: [2] }
1087    /// # let mut results = Vec::new();
1088    /// # for _ in 0..1 {
1089    /// #     results.push(stream.next().await.unwrap());
1090    /// # }
1091    /// # results.sort();
1092    /// # assert_eq!(results, vec![(2, 2)]);
1093    /// # }));
1094    /// # }
1095    /// ```
1096    pub fn filter_map_with_key<U, F>(
1097        self,
1098        f: impl IntoQuotedMut<'a, F, L> + Copy,
1099    ) -> KeyedStream<K, U, L, B, O, R>
1100    where
1101        F: Fn((K, V)) -> Option<U> + 'a,
1102        K: Clone,
1103    {
1104        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1105        let filter_map_f = q!({
1106            let orig = f;
1107            move |(k, v)| {
1108                let out = orig((Clone::clone(&k), v));
1109                out.map(|o| (k, o))
1110            }
1111        })
1112        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1113        .into();
1114
1115        KeyedStream::new(
1116            self.location.clone(),
1117            HydroNode::FilterMap {
1118                f: filter_map_f,
1119                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1120                metadata: self
1121                    .location
1122                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1123            },
1124        )
1125    }
1126
1127    /// Generates a keyed stream that maps each value `v` to a tuple `(v, x)`,
1128    /// where `v` is the value of `other`, a bounded [`super::singleton::Singleton`] or
1129    /// [`Optional`]. If `other` is an empty [`Optional`], no values will be produced.
1130    ///
1131    /// # Example
1132    /// ```rust
1133    /// # #[cfg(feature = "deploy")] {
1134    /// # use hydro_lang::prelude::*;
1135    /// # use futures::StreamExt;
1136    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1137    /// let tick = process.tick();
1138    /// let batch = process
1139    ///   .source_iter(q!(vec![(1, 123), (1, 456), (2, 123)]))
1140    ///   .into_keyed()
1141    ///   .batch(&tick, nondet!(/** test */));
1142    /// let count = batch.clone().entries().count(); // `count()` returns a singleton
1143    /// batch.cross_singleton(count).all_ticks().entries()
1144    /// # }, |mut stream| async move {
1145    /// // { 1: [(123, 3), (456, 3)], 2: [(123, 3)] }
1146    /// # let mut results = Vec::new();
1147    /// # for _ in 0..3 {
1148    /// #     results.push(stream.next().await.unwrap());
1149    /// # }
1150    /// # results.sort();
1151    /// # assert_eq!(results, vec![(1, (123, 3)), (1, (456, 3)), (2, (123, 3))]);
1152    /// # }));
1153    /// # }
1154    /// ```
1155    pub fn cross_singleton<O2>(
1156        self,
1157        other: impl Into<Optional<O2, L, Bounded>>,
1158    ) -> KeyedStream<K, (V, O2), L, B, O, R>
1159    where
1160        O2: Clone,
1161    {
1162        let other: Optional<O2, L, Bounded> = other.into();
1163        check_matching_location(&self.location, &other.location);
1164
1165        Stream::new(
1166            self.location.clone(),
1167            HydroNode::CrossSingleton {
1168                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1169                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1170                metadata: self
1171                    .location
1172                    .new_node_metadata(Stream::<((K, V), O2), L, B, O, R>::collection_kind()),
1173            },
1174        )
1175        .map(q!(|((k, v), o2)| (k, (v, o2))))
1176        .into_keyed()
1177    }
1178
1179    /// For each value `v` in each group, transform `v` using `f` and then treat the
1180    /// result as an [`Iterator`] to produce values one by one within the same group.
1181    /// The implementation for [`Iterator`] for the output type `I` must produce items
1182    /// in a **deterministic** order.
1183    ///
1184    /// For example, `I` could be a `Vec`, but not a `HashSet`. If the order of the items in `I` is
1185    /// not deterministic, use [`KeyedStream::flat_map_unordered`] instead.
1186    ///
1187    /// # Example
1188    /// ```rust
1189    /// # #[cfg(feature = "deploy")] {
1190    /// # use hydro_lang::prelude::*;
1191    /// # use futures::StreamExt;
1192    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1193    /// process
1194    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1195    ///     .into_keyed()
1196    ///     .flat_map_ordered(q!(|x| x))
1197    /// #   .entries()
1198    /// # }, |mut stream| async move {
1199    /// // { 1: [2, 3, 4], 2: [5, 6] }
1200    /// # let mut results = Vec::new();
1201    /// # for _ in 0..5 {
1202    /// #     results.push(stream.next().await.unwrap());
1203    /// # }
1204    /// # results.sort();
1205    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1206    /// # }));
1207    /// # }
1208    /// ```
1209    pub fn flat_map_ordered<U, I, F>(
1210        self,
1211        f: impl IntoQuotedMut<'a, F, L> + Copy,
1212    ) -> KeyedStream<K, U, L, B, O, R>
1213    where
1214        I: IntoIterator<Item = U>,
1215        F: Fn(V) -> I + 'a,
1216        K: Clone,
1217    {
1218        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1219        let flat_map_f = q!({
1220            let orig = f;
1221            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1222        })
1223        .splice_fn1_ctx::<(K, V), _>(&self.location)
1224        .into();
1225
1226        KeyedStream::new(
1227            self.location.clone(),
1228            HydroNode::FlatMap {
1229                f: flat_map_f,
1230                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1231                metadata: self
1232                    .location
1233                    .new_node_metadata(KeyedStream::<K, U, L, B, O, R>::collection_kind()),
1234            },
1235        )
1236    }
1237
1238    /// Like [`KeyedStream::flat_map_ordered`], but allows the implementation of [`Iterator`]
1239    /// for the output type `I` to produce items in any order.
1240    ///
1241    /// # Example
1242    /// ```rust
1243    /// # #[cfg(feature = "deploy")] {
1244    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1245    /// # use futures::StreamExt;
1246    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1247    /// process
1248    ///     .source_iter(q!(vec![
1249    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1250    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1251    ///     ]))
1252    ///     .into_keyed()
1253    ///     .flat_map_unordered(q!(|x| x))
1254    /// #   .entries()
1255    /// # }, |mut stream| async move {
1256    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1257    /// # let mut results = Vec::new();
1258    /// # for _ in 0..4 {
1259    /// #     results.push(stream.next().await.unwrap());
1260    /// # }
1261    /// # results.sort();
1262    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1263    /// # }));
1264    /// # }
1265    /// ```
1266    pub fn flat_map_unordered<U, I, F>(
1267        self,
1268        f: impl IntoQuotedMut<'a, F, L> + Copy,
1269    ) -> KeyedStream<K, U, L, B, NoOrder, R>
1270    where
1271        I: IntoIterator<Item = U>,
1272        F: Fn(V) -> I + 'a,
1273        K: Clone,
1274    {
1275        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1276        let flat_map_f = q!({
1277            let orig = f;
1278            move |(k, v)| orig(v).into_iter().map(move |u| (Clone::clone(&k), u))
1279        })
1280        .splice_fn1_ctx::<(K, V), _>(&self.location)
1281        .into();
1282
1283        KeyedStream::new(
1284            self.location.clone(),
1285            HydroNode::FlatMap {
1286                f: flat_map_f,
1287                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1288                metadata: self
1289                    .location
1290                    .new_node_metadata(KeyedStream::<K, U, L, B, NoOrder, R>::collection_kind()),
1291            },
1292        )
1293    }
1294
1295    /// For each value `v` in each group, treat `v` as an [`Iterator`] and produce its items one by one
1296    /// within the same group. The implementation for [`Iterator`] for the value type `V` must produce
1297    /// items in a **deterministic** order.
1298    ///
1299    /// For example, `V` could be a `Vec`, but not a `HashSet`. If the order of the items in `V` is
1300    /// not deterministic, use [`KeyedStream::flatten_unordered`] instead.
1301    ///
1302    /// # Example
1303    /// ```rust
1304    /// # #[cfg(feature = "deploy")] {
1305    /// # use hydro_lang::prelude::*;
1306    /// # use futures::StreamExt;
1307    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1308    /// process
1309    ///     .source_iter(q!(vec![(1, vec![2, 3]), (1, vec![4]), (2, vec![5, 6])]))
1310    ///     .into_keyed()
1311    ///     .flatten_ordered()
1312    /// #   .entries()
1313    /// # }, |mut stream| async move {
1314    /// // { 1: [2, 3, 4], 2: [5, 6] }
1315    /// # let mut results = Vec::new();
1316    /// # for _ in 0..5 {
1317    /// #     results.push(stream.next().await.unwrap());
1318    /// # }
1319    /// # results.sort();
1320    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5), (2, 6)]);
1321    /// # }));
1322    /// # }
1323    /// ```
1324    pub fn flatten_ordered<U>(self) -> KeyedStream<K, U, L, B, O, R>
1325    where
1326        V: IntoIterator<Item = U>,
1327        K: Clone,
1328    {
1329        self.flat_map_ordered(q!(|d| d))
1330    }
1331
1332    /// Like [`KeyedStream::flatten_ordered`], but allows the implementation of [`Iterator`]
1333    /// for the value type `V` to produce items in any order.
1334    ///
1335    /// # Example
1336    /// ```rust
1337    /// # #[cfg(feature = "deploy")] {
1338    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
1339    /// # use futures::StreamExt;
1340    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
1341    /// process
1342    ///     .source_iter(q!(vec![
1343    ///         (1, std::collections::HashSet::<i32>::from_iter(vec![2, 3])),
1344    ///         (2, std::collections::HashSet::from_iter(vec![4, 5]))
1345    ///     ]))
1346    ///     .into_keyed()
1347    ///     .flatten_unordered()
1348    /// #   .entries()
1349    /// # }, |mut stream| async move {
1350    /// // { 1: [2, 3], 2: [4, 5] } with values in each group in unknown order
1351    /// # let mut results = Vec::new();
1352    /// # for _ in 0..4 {
1353    /// #     results.push(stream.next().await.unwrap());
1354    /// # }
1355    /// # results.sort();
1356    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4), (2, 5)]);
1357    /// # }));
1358    /// # }
1359    /// ```
1360    pub fn flatten_unordered<U>(self) -> KeyedStream<K, U, L, B, NoOrder, R>
1361    where
1362        V: IntoIterator<Item = U>,
1363        K: Clone,
1364    {
1365        self.flat_map_unordered(q!(|d| d))
1366    }
1367
1368    /// An operator which allows you to "inspect" each element of a stream without
1369    /// modifying it. The closure `f` is called on a reference to each value. This is
1370    /// mainly useful for debugging, and should not be used to generate side-effects.
1371    ///
1372    /// # Example
1373    /// ```rust
1374    /// # #[cfg(feature = "deploy")] {
1375    /// # use hydro_lang::prelude::*;
1376    /// # use futures::StreamExt;
1377    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1378    /// process
1379    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1380    ///     .into_keyed()
1381    ///     .inspect(q!(|v| println!("{}", v)))
1382    /// #   .entries()
1383    /// # }, |mut stream| async move {
1384    /// # let mut results = Vec::new();
1385    /// # for _ in 0..3 {
1386    /// #     results.push(stream.next().await.unwrap());
1387    /// # }
1388    /// # results.sort();
1389    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1390    /// # }));
1391    /// # }
1392    /// ```
1393    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1394    where
1395        F: Fn(&V) + 'a,
1396    {
1397        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1398        let inspect_f = q!({
1399            let orig = f;
1400            move |t: &(_, _)| orig(&t.1)
1401        })
1402        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1403        .into();
1404
1405        KeyedStream::new(
1406            self.location.clone(),
1407            HydroNode::Inspect {
1408                f: inspect_f,
1409                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1410                metadata: self.location.new_node_metadata(Self::collection_kind()),
1411            },
1412        )
1413    }
1414
1415    /// An operator which allows you to "inspect" each element of a stream without
1416    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1417    /// mainly useful for debugging, and should not be used to generate side-effects.
1418    ///
1419    /// # Example
1420    /// ```rust
1421    /// # #[cfg(feature = "deploy")] {
1422    /// # use hydro_lang::prelude::*;
1423    /// # use futures::StreamExt;
1424    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1425    /// process
1426    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
1427    ///     .into_keyed()
1428    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1429    /// #   .entries()
1430    /// # }, |mut stream| async move {
1431    /// # let mut results = Vec::new();
1432    /// # for _ in 0..3 {
1433    /// #     results.push(stream.next().await.unwrap());
1434    /// # }
1435    /// # results.sort();
1436    /// # assert_eq!(results, vec![(1, 2), (1, 3), (2, 4)]);
1437    /// # }));
1438    /// # }
1439    /// ```
1440    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1441    where
1442        F: Fn(&(K, V)) + 'a,
1443    {
1444        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1445
1446        KeyedStream::new(
1447            self.location.clone(),
1448            HydroNode::Inspect {
1449                f: inspect_f,
1450                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1451                metadata: self.location.new_node_metadata(Self::collection_kind()),
1452            },
1453        )
1454    }
1455
1456    /// An operator which allows you to "name" a `HydroNode`.
1457    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1458    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
1459        {
1460            let mut node = self.ir_node.borrow_mut();
1461            let metadata = node.metadata_mut();
1462            metadata.tag = Some(name.to_owned());
1463        }
1464        self
1465    }
1466
1467    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
1468    ///
1469    /// Unlike [`KeyedStream::fold`] which only returns the final accumulated value, `scan` produces a new stream
1470    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
1471    /// early by returning `None`.
1472    ///
1473    /// The function takes a mutable reference to the accumulator and the current element, and returns
1474    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1475    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1476    ///
1477    /// # Example
1478    /// ```rust
1479    /// # #[cfg(feature = "deploy")] {
1480    /// # use hydro_lang::prelude::*;
1481    /// # use futures::StreamExt;
1482    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1483    /// process
1484    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
1485    ///     .into_keyed()
1486    ///     .scan(
1487    ///         q!(|| 0),
1488    ///         q!(|acc, x| {
1489    ///             *acc += x;
1490    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
1491    ///         }),
1492    ///     )
1493    /// #   .entries()
1494    /// # }, |mut stream| async move {
1495    /// // Output: { 0: [1], 1: [3, 7] }
1496    /// # let mut results = Vec::new();
1497    /// # for _ in 0..3 {
1498    /// #     results.push(stream.next().await.unwrap());
1499    /// # }
1500    /// # results.sort();
1501    /// # assert_eq!(results, vec![(0, 1), (1, 3), (1, 7)]);
1502    /// # }));
1503    /// # }
1504    /// ```
1505    pub fn scan<A, U, I, F>(
1506        self,
1507        init: impl IntoQuotedMut<'a, I, L> + Copy,
1508        f: impl IntoQuotedMut<'a, F, L> + Copy,
1509    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1510    where
1511        O: IsOrdered,
1512        R: IsExactlyOnce,
1513        K: Clone + Eq + Hash,
1514        I: Fn() -> A + 'a,
1515        F: Fn(&mut A, V) -> Option<U> + 'a,
1516    {
1517        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1518        self.make_totally_ordered().make_exactly_once().generator(
1519            init,
1520            q!({
1521                let orig = f;
1522                move |state, v| {
1523                    if let Some(out) = orig(state, v) {
1524                        Generate::Yield(out)
1525                    } else {
1526                        Generate::Break
1527                    }
1528                }
1529            }),
1530        )
1531    }
1532
1533    /// Iteratively processes the elements in each group using a state machine that can yield
1534    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1535    /// syntax in Rust, without requiring special syntax.
1536    ///
1537    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
1538    /// state for each group. The second argument defines the processing logic, taking in a
1539    /// mutable reference to the group's state and the value to be processed. It emits a
1540    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
1541    /// should be processed.
1542    ///
1543    /// # Example
1544    /// ```rust
1545    /// # #[cfg(feature = "deploy")] {
1546    /// # use hydro_lang::prelude::*;
1547    /// # use futures::StreamExt;
1548    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1549    /// process
1550    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
1551    ///     .into_keyed()
1552    ///     .generator(
1553    ///         q!(|| 0),
1554    ///         q!(|acc, x| {
1555    ///             *acc += x;
1556    ///             if *acc > 100 {
1557    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
1558    ///                     "done!".to_owned()
1559    ///                 )
1560    ///             } else if *acc % 2 == 0 {
1561    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
1562    ///                     "even".to_owned()
1563    ///                 )
1564    ///             } else {
1565    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
1566    ///             }
1567    ///         }),
1568    ///     )
1569    /// #   .entries()
1570    /// # }, |mut stream| async move {
1571    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
1572    /// # let mut results = Vec::new();
1573    /// # for _ in 0..3 {
1574    /// #     results.push(stream.next().await.unwrap());
1575    /// # }
1576    /// # results.sort();
1577    /// # assert_eq!(results, vec![(0, "done!".to_owned()), (0, "even".to_owned()), (1, "even".to_owned())]);
1578    /// # }));
1579    /// # }
1580    /// ```
1581    pub fn generator<A, U, I, F>(
1582        self,
1583        init: impl IntoQuotedMut<'a, I, L> + Copy,
1584        f: impl IntoQuotedMut<'a, F, L> + Copy,
1585    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
1586    where
1587        O: IsOrdered,
1588        R: IsExactlyOnce,
1589        K: Clone + Eq + Hash,
1590        I: Fn() -> A + 'a,
1591        F: Fn(&mut A, V) -> Generate<U> + 'a,
1592    {
1593        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1594        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1595
1596        let this = self.make_totally_ordered().make_exactly_once();
1597
1598        let scan_init = q!(|| HashMap::new())
1599            .splice_fn0_ctx::<HashMap<K, Option<A>>>(&this.location)
1600            .into();
1601        let scan_f = q!(move |acc: &mut HashMap<_, _>, (k, v)| {
1602            let existing_state = acc.entry(Clone::clone(&k)).or_insert_with(|| Some(init()));
1603            if let Some(existing_state_value) = existing_state {
1604                match f(existing_state_value, v) {
1605                    Generate::Yield(out) => Some(Some((k, out))),
1606                    Generate::Return(out) => {
1607                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1608                        Some(Some((k, out)))
1609                    }
1610                    Generate::Break => {
1611                        let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
1612                        Some(None)
1613                    }
1614                    Generate::Continue => Some(None),
1615                }
1616            } else {
1617                Some(None)
1618            }
1619        })
1620        .splice_fn2_borrow_mut_ctx::<HashMap<K, Option<A>>, (K, V), _>(&this.location)
1621        .into();
1622
1623        let scan_node = HydroNode::Scan {
1624            init: scan_init,
1625            acc: scan_f,
1626            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1627            metadata: this.location.new_node_metadata(Stream::<
1628                Option<(K, U)>,
1629                L,
1630                B,
1631                TotalOrder,
1632                ExactlyOnce,
1633            >::collection_kind()),
1634        };
1635
1636        let flatten_f = q!(|d| d)
1637            .splice_fn1_ctx::<Option<(K, U)>, _>(&this.location)
1638            .into();
1639        let flatten_node = HydroNode::FlatMap {
1640            f: flatten_f,
1641            input: Box::new(scan_node),
1642            metadata: this.location.new_node_metadata(KeyedStream::<
1643                K,
1644                U,
1645                L,
1646                B,
1647                TotalOrder,
1648                ExactlyOnce,
1649            >::collection_kind()),
1650        };
1651
1652        KeyedStream::new(this.location.clone(), flatten_node)
1653    }
1654
1655    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
1656    /// in-order across the values in each group. But the aggregation function returns a boolean,
1657    /// which when true indicates that the aggregated result is complete and can be released to
1658    /// downstream computation. Unlike [`KeyedStream::fold`], this means that even if the input
1659    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
1660    /// normal stream elements.
1661    ///
1662    /// # Example
1663    /// ```rust
1664    /// # #[cfg(feature = "deploy")] {
1665    /// # use hydro_lang::prelude::*;
1666    /// # use futures::StreamExt;
1667    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1668    /// process
1669    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1670    ///     .into_keyed()
1671    ///     .fold_early_stop(
1672    ///         q!(|| 0),
1673    ///         q!(|acc, x| {
1674    ///             *acc += x;
1675    ///             x % 2 == 0
1676    ///         }),
1677    ///     )
1678    /// #   .entries()
1679    /// # }, |mut stream| async move {
1680    /// // Output: { 0: 2, 1: 9 }
1681    /// # let mut results = Vec::new();
1682    /// # for _ in 0..2 {
1683    /// #     results.push(stream.next().await.unwrap());
1684    /// # }
1685    /// # results.sort();
1686    /// # assert_eq!(results, vec![(0, 2), (1, 9)]);
1687    /// # }));
1688    /// # }
1689    /// ```
1690    pub fn fold_early_stop<A, I, F>(
1691        self,
1692        init: impl IntoQuotedMut<'a, I, L> + Copy,
1693        f: impl IntoQuotedMut<'a, F, L> + Copy,
1694    ) -> KeyedSingleton<K, A, L, B::WithBoundedValue>
1695    where
1696        O: IsOrdered,
1697        R: IsExactlyOnce,
1698        K: Clone + Eq + Hash,
1699        I: Fn() -> A + 'a,
1700        F: Fn(&mut A, V) -> bool + 'a,
1701    {
1702        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1703        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1704        let out_without_bound_cast = self.generator(
1705            q!(move || Some(init())),
1706            q!(move |key_state, v| {
1707                if let Some(key_state_value) = key_state.as_mut() {
1708                    if f(key_state_value, v) {
1709                        Generate::Return(key_state.take().unwrap())
1710                    } else {
1711                        Generate::Continue
1712                    }
1713                } else {
1714                    unreachable!()
1715                }
1716            }),
1717        );
1718
1719        // SAFETY: The generator will only ever return at most one value per key, since once it
1720        // returns a value for a key it will never process any more values for that key.
1721        out_without_bound_cast.cast_at_most_one_entry_per_key()
1722    }
1723
1724    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
1725    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
1726    /// otherwise the first element would be non-deterministic.
1727    ///
1728    /// # Example
1729    /// ```rust
1730    /// # #[cfg(feature = "deploy")] {
1731    /// # use hydro_lang::prelude::*;
1732    /// # use futures::StreamExt;
1733    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1734    /// process
1735    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
1736    ///     .into_keyed()
1737    ///     .first()
1738    /// #   .entries()
1739    /// # }, |mut stream| async move {
1740    /// // Output: { 0: 2, 1: 3 }
1741    /// # let mut results = Vec::new();
1742    /// # for _ in 0..2 {
1743    /// #     results.push(stream.next().await.unwrap());
1744    /// # }
1745    /// # results.sort();
1746    /// # assert_eq!(results, vec![(0, 2), (1, 3)]);
1747    /// # }));
1748    /// # }
1749    /// ```
1750    pub fn first(self) -> KeyedSingleton<K, V, L, B::WithBoundedValue>
1751    where
1752        O: IsOrdered,
1753        R: IsExactlyOnce,
1754        K: Clone + Eq + Hash,
1755    {
1756        self.fold_early_stop(
1757            q!(|| None),
1758            q!(|acc, v| {
1759                *acc = Some(v);
1760                true
1761            }),
1762        )
1763        .map(q!(|v| v.unwrap()))
1764    }
1765
1766    /// Returns a keyed stream containing at most the first `n` values per key,
1767    /// preserving the original order within each group. Similar to SQL `LIMIT`
1768    /// applied per group.
1769    ///
1770    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1771    /// retries, since the result depends on the order and cardinality of elements
1772    /// within each group.
1773    ///
1774    /// # Example
1775    /// ```rust
1776    /// # #[cfg(feature = "deploy")] {
1777    /// # use hydro_lang::prelude::*;
1778    /// # use futures::StreamExt;
1779    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1780    /// process
1781    ///     .source_iter(q!(vec![(1, 10), (1, 20), (1, 30), (2, 40), (2, 50)]))
1782    ///     .into_keyed()
1783    ///     .limit(q!(2))
1784    /// #   .entries()
1785    /// # }, |mut stream| async move {
1786    /// // { 1: [10, 20], 2: [40, 50] }
1787    /// # let mut results = Vec::new();
1788    /// # for _ in 0..4 {
1789    /// #     results.push(stream.next().await.unwrap());
1790    /// # }
1791    /// # results.sort();
1792    /// # assert_eq!(results, vec![(1, 10), (1, 20), (2, 40), (2, 50)]);
1793    /// # }));
1794    /// # }
1795    /// ```
1796    pub fn limit(
1797        self,
1798        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1799    ) -> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
1800    where
1801        O: IsOrdered,
1802        R: IsExactlyOnce,
1803        K: Clone + Eq + Hash,
1804    {
1805        self.generator(
1806            q!(|| 0usize),
1807            q!(move |count, item| {
1808                if *count == n {
1809                    Generate::Break
1810                } else {
1811                    *count += 1;
1812                    if *count == n {
1813                        Generate::Return(item)
1814                    } else {
1815                        Generate::Yield(item)
1816                    }
1817                }
1818            }),
1819        )
1820    }
1821
1822    /// Assigns a zero-based index to each value within each key group, emitting
1823    /// `(K, (index, V))` tuples with per-key sequential indices.
1824    ///
1825    /// The output keyed stream has [`TotalOrder`] and [`ExactlyOnce`] guarantees.
1826    /// This is a streaming operator that processes elements as they arrive.
1827    ///
1828    /// # Example
1829    /// ```rust
1830    /// # #[cfg(feature = "deploy")] {
1831    /// # use hydro_lang::prelude::*;
1832    /// # use futures::StreamExt;
1833    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1834    /// process
1835    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 30)]))
1836    ///     .into_keyed()
1837    ///     .enumerate()
1838    /// # .entries()
1839    /// # }, |mut stream| async move {
1840    /// // per-key indices: { 1: [(0, 10), (1, 30)], 2: [(0, 20)] }
1841    /// # let mut results = Vec::new();
1842    /// # for _ in 0..3 {
1843    /// #     results.push(stream.next().await.unwrap());
1844    /// # }
1845    /// # let key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
1846    /// # let key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
1847    /// # assert_eq!(key1, vec![(0, 10), (1, 30)]);
1848    /// # assert_eq!(key2, vec![(0, 20)]);
1849    /// # }));
1850    /// # }
1851    /// ```
1852    pub fn enumerate(self) -> KeyedStream<K, (usize, V), L, B, TotalOrder, ExactlyOnce>
1853    where
1854        O: IsOrdered,
1855        R: IsExactlyOnce,
1856        K: Eq + Hash + Clone,
1857    {
1858        self.scan(
1859            q!(|| 0),
1860            q!(|acc, next| {
1861                let curr = *acc;
1862                *acc += 1;
1863                Some((curr, next))
1864            }),
1865        )
1866    }
1867
1868    /// Counts the number of elements in each group, producing a [`KeyedSingleton`] with the counts.
1869    ///
1870    /// # Example
1871    /// ```rust
1872    /// # #[cfg(feature = "deploy")] {
1873    /// # use hydro_lang::prelude::*;
1874    /// # use futures::StreamExt;
1875    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1876    /// let tick = process.tick();
1877    /// let numbers = process
1878    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4), (1, 5)]))
1879    ///     .into_keyed();
1880    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1881    /// batch
1882    ///     .value_counts()
1883    ///     .entries()
1884    ///     .all_ticks()
1885    /// # }, |mut stream| async move {
1886    /// // (1, 3), (2, 2)
1887    /// # let mut results = Vec::new();
1888    /// # for _ in 0..2 {
1889    /// #     results.push(stream.next().await.unwrap());
1890    /// # }
1891    /// # results.sort();
1892    /// # assert_eq!(results, vec![(1, 3), (2, 2)]);
1893    /// # }));
1894    /// # }
1895    /// ```
1896    pub fn value_counts(
1897        self,
1898    ) -> KeyedSingleton<K, usize, L, <B as KeyedSingletonBound>::KeyedStreamToMonotone>
1899    where
1900        R: IsExactlyOnce,
1901        K: Eq + Hash,
1902    {
1903        self.make_exactly_once()
1904            .assume_ordering_trusted(
1905                nondet!(/** ordering within each group affects neither result nor intermediates */),
1906            )
1907            .fold(
1908                q!(|| 0),
1909                q!(
1910                    |acc, _| *acc += 1,
1911                    monotone = manual_proof!(/** += 1 is monotonic */)
1912                ),
1913            )
1914    }
1915
1916    /// Like [`Stream::fold`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1917    /// group via the `comb` closure.
1918    ///
1919    /// Depending on the input stream guarantees, the closure may need to be commutative
1920    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1921    ///
1922    /// If the input and output value types are the same and do not require initialization then use
1923    /// [`KeyedStream::reduce`].
1924    ///
1925    /// # Example
1926    /// ```rust
1927    /// # #[cfg(feature = "deploy")] {
1928    /// # use hydro_lang::prelude::*;
1929    /// # use futures::StreamExt;
1930    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1931    /// let tick = process.tick();
1932    /// let numbers = process
1933    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1934    ///     .into_keyed();
1935    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1936    /// batch
1937    ///     .fold(q!(|| false), q!(|acc, x| *acc |= x))
1938    ///     .entries()
1939    ///     .all_ticks()
1940    /// # }, |mut stream| async move {
1941    /// // (1, false), (2, true)
1942    /// # let mut results = Vec::new();
1943    /// # for _ in 0..2 {
1944    /// #     results.push(stream.next().await.unwrap());
1945    /// # }
1946    /// # results.sort();
1947    /// # assert_eq!(results, vec![(1, false), (2, true)]);
1948    /// # }));
1949    /// # }
1950    /// ```
1951    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V), C, Idemp, M, B2: KeyedSingletonBound>(
1952        self,
1953        init: impl IntoQuotedMut<'a, I, L>,
1954        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1955    ) -> KeyedSingleton<K, A, L, B2>
1956    where
1957        K: Eq + Hash,
1958        C: ValidCommutativityFor<O>,
1959        Idemp: ValidIdempotenceFor<R>,
1960        B: ApplyMonotoneKeyedStream<M, B2>,
1961    {
1962        let init = init.splice_fn0_ctx(&self.location).into();
1963        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1964        proof.register_proof(&comb);
1965
1966        let ordered = self
1967            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1968            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
1969
1970        KeyedSingleton::new(
1971            ordered.location.clone(),
1972            HydroNode::FoldKeyed {
1973                init,
1974                acc: comb.into(),
1975                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
1976                metadata: ordered
1977                    .location
1978                    .new_node_metadata(KeyedSingleton::<K, A, L, B2>::collection_kind()),
1979            },
1980        )
1981        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1982    }
1983
1984    /// Like [`Stream::reduce`] but in the spirit of SQL `GROUP BY`, aggregates the values in each
1985    /// group via the `comb` closure.
1986    ///
1987    /// Depending on the input stream guarantees, the closure may need to be commutative
1988    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1989    ///
1990    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
1991    ///
1992    /// # Example
1993    /// ```rust
1994    /// # #[cfg(feature = "deploy")] {
1995    /// # use hydro_lang::prelude::*;
1996    /// # use futures::StreamExt;
1997    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1998    /// let tick = process.tick();
1999    /// let numbers = process
2000    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
2001    ///     .into_keyed();
2002    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2003    /// batch
2004    ///     .reduce(q!(|acc, x| *acc |= x))
2005    ///     .entries()
2006    ///     .all_ticks()
2007    /// # }, |mut stream| async move {
2008    /// // (1, false), (2, true)
2009    /// # let mut results = Vec::new();
2010    /// # for _ in 0..2 {
2011    /// #     results.push(stream.next().await.unwrap());
2012    /// # }
2013    /// # results.sort();
2014    /// # assert_eq!(results, vec![(1, false), (2, true)]);
2015    /// # }));
2016    /// # }
2017    /// ```
2018    pub fn reduce<F: Fn(&mut V, V) + 'a, C, Idemp>(
2019        self,
2020        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
2021    ) -> KeyedSingleton<K, V, L, B>
2022    where
2023        K: Eq + Hash,
2024        C: ValidCommutativityFor<O>,
2025        Idemp: ValidIdempotenceFor<R>,
2026    {
2027        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
2028        proof.register_proof(&f);
2029
2030        let ordered = self
2031            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
2032            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
2033
2034        KeyedSingleton::new(
2035            ordered.location.clone(),
2036            HydroNode::ReduceKeyed {
2037                f: f.into(),
2038                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
2039                metadata: ordered
2040                    .location
2041                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
2042            },
2043        )
2044        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
2045    }
2046
2047    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark
2048    /// are automatically deleted.
2049    ///
2050    /// Depending on the input stream guarantees, the closure may need to be commutative
2051    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
2052    ///
2053    /// # Example
2054    /// ```rust
2055    /// # #[cfg(feature = "deploy")] {
2056    /// # use hydro_lang::prelude::*;
2057    /// # use futures::StreamExt;
2058    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2059    /// let tick = process.tick();
2060    /// let watermark = tick.singleton(q!(2));
2061    /// let numbers = process
2062    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
2063    ///     .into_keyed();
2064    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2065    /// batch
2066    ///     .reduce_watermark(watermark, q!(|acc, x| *acc |= x))
2067    ///     .entries()
2068    ///     .all_ticks()
2069    /// # }, |mut stream| async move {
2070    /// // (2, true)
2071    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
2072    /// # }));
2073    /// # }
2074    /// ```
2075    pub fn reduce_watermark<O2, F, C, Idemp>(
2076        self,
2077        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
2078        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
2079    ) -> KeyedSingleton<K, V, L, B>
2080    where
2081        K: Eq + Hash,
2082        O2: Clone,
2083        F: Fn(&mut V, V) + 'a,
2084        C: ValidCommutativityFor<O>,
2085        Idemp: ValidIdempotenceFor<R>,
2086    {
2087        let other: Optional<O2, Tick<L::Root>, Bounded> = other.into();
2088        check_matching_location(&self.location.root(), other.location.outer());
2089        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
2090        proof.register_proof(&f);
2091
2092        let ordered = self
2093            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
2094            .assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */));
2095
2096        KeyedSingleton::new(
2097            ordered.location.clone(),
2098            HydroNode::ReduceKeyedWatermark {
2099                f: f.into(),
2100                input: Box::new(ordered.ir_node.replace(HydroNode::Placeholder)),
2101                watermark: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2102                metadata: ordered
2103                    .location
2104                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
2105            },
2106        )
2107        .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
2108    }
2109
2110    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
2111    /// whose keys are not in the bounded stream.
2112    ///
2113    /// # Example
2114    /// ```rust
2115    /// # #[cfg(feature = "deploy")] {
2116    /// # use hydro_lang::prelude::*;
2117    /// # use futures::StreamExt;
2118    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2119    /// let tick = process.tick();
2120    /// let keyed_stream = process
2121    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2122    ///     .batch(&tick, nondet!(/** test */))
2123    ///     .into_keyed();
2124    /// let keys_to_remove = process
2125    ///     .source_iter(q!(vec![1, 2]))
2126    ///     .batch(&tick, nondet!(/** test */));
2127    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
2128    /// #   .entries()
2129    /// # }, |mut stream| async move {
2130    /// // { 3: ['c'], 4: ['d'] }
2131    /// # let mut results = Vec::new();
2132    /// # for _ in 0..2 {
2133    /// #     results.push(stream.next().await.unwrap());
2134    /// # }
2135    /// # results.sort();
2136    /// # assert_eq!(results, vec![(3, 'c'), (4, 'd')]);
2137    /// # }));
2138    /// # }
2139    /// ```
2140    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
2141        self,
2142        other: Stream<K, L, Bounded, O2, R2>,
2143    ) -> Self
2144    where
2145        K: Eq + Hash,
2146    {
2147        check_matching_location(&self.location, &other.location);
2148
2149        KeyedStream::new(
2150            self.location.clone(),
2151            HydroNode::AntiJoin {
2152                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2153                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2154                metadata: self.location.new_node_metadata(Self::collection_kind()),
2155            },
2156        )
2157    }
2158
2159    /// Emit a keyed stream containing keys shared between two keyed streams,
2160    /// where each value in the output keyed stream is a tuple of
2161    /// (self's value, other's value).
2162    /// If there are multiple values for the same key, this performs a cross product
2163    /// for each matching key.
2164    ///
2165    /// # Example
2166    /// ```rust
2167    /// # #[cfg(feature = "deploy")] {
2168    /// # use hydro_lang::prelude::*;
2169    /// # use futures::StreamExt;
2170    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2171    /// let tick = process.tick();
2172    /// let keyed_data = process
2173    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2174    ///     .into_keyed()
2175    ///     .batch(&tick, nondet!(/** test */));
2176    /// let other_data = process
2177    ///     .source_iter(q!(vec![(1, 100), (2, 200), (2, 201)]))
2178    ///     .into_keyed()
2179    ///     .batch(&tick, nondet!(/** test */));
2180    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
2181    /// # }, |mut stream| async move {
2182    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200), (20, 201)] } in any order
2183    /// # let mut results = vec![];
2184    /// # for _ in 0..4 {
2185    /// #     results.push(stream.next().await.unwrap());
2186    /// # }
2187    /// # results.sort();
2188    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200)), (2, (20, 201))]);
2189    /// # }));
2190    /// # }
2191    /// ```
2192    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2193    pub fn join_keyed_stream<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2194        self,
2195        other: KeyedStream<K, V2, L, B2, O2, R2>,
2196    ) -> KeyedStream<
2197        K,
2198        (V, V2),
2199        L,
2200        B,
2201        B2::PreserveOrderIfBounded<NoOrder>,
2202        <R as MinRetries<R2>>::Min,
2203    >
2204    where
2205        K: Eq + Hash + Clone,
2206        R: MinRetries<R2>,
2207        V: Clone,
2208        V2: Clone,
2209    {
2210        self.entries().join(other.entries()).into_keyed()
2211    }
2212
2213    /// Deduplicates values within each key group, emitting each unique value per key
2214    /// exactly once.
2215    ///
2216    /// # Example
2217    /// ```rust
2218    /// # #[cfg(feature = "deploy")] {
2219    /// # use hydro_lang::prelude::*;
2220    /// # use futures::StreamExt;
2221    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2222    /// process
2223    ///     .source_iter(q!(vec![(1, 10), (2, 20), (1, 10), (2, 30), (1, 20)]))
2224    ///     .into_keyed()
2225    ///     .unique()
2226    /// # .entries()
2227    /// # }, |mut stream| async move {
2228    /// // unique values per key: { 1: [10, 20], 2: [20, 30] }
2229    /// # let mut results = Vec::new();
2230    /// # for _ in 0..4 {
2231    /// #     results.push(stream.next().await.unwrap());
2232    /// # }
2233    /// # let mut key1: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2234    /// # let mut key2: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2235    /// # key1.sort();
2236    /// # key2.sort();
2237    /// # assert_eq!(key1, vec![10, 20]);
2238    /// # assert_eq!(key2, vec![20, 30]);
2239    /// # }));
2240    /// # }
2241    /// ```
2242    pub fn unique(self) -> KeyedStream<K, V, L, B, NoOrder, ExactlyOnce>
2243    where
2244        K: Eq + Hash + Clone,
2245        V: Eq + Hash + Clone,
2246    {
2247        self.entries().unique().into_keyed()
2248    }
2249
2250    /// Sorts the values within each key group in ascending order.
2251    ///
2252    /// The output keyed stream has a [`TotalOrder`] guarantee on the values within
2253    /// each group. This operator will block until all elements in the input stream
2254    /// are available, so it requires the input stream to be [`Bounded`].
2255    ///
2256    /// # Example
2257    /// ```rust
2258    /// # #[cfg(feature = "deploy")] {
2259    /// # use hydro_lang::prelude::*;
2260    /// # use futures::StreamExt;
2261    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2262    /// let tick = process.tick();
2263    /// let numbers = process
2264    ///     .source_iter(q!(vec![(1, 3), (2, 1), (1, 1), (2, 2)]))
2265    ///     .into_keyed();
2266    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2267    /// batch.sort().all_ticks()
2268    /// # .entries()
2269    /// # }, |mut stream| async move {
2270    /// // values sorted within each key: { 1: [1, 3], 2: [1, 2] }
2271    /// # let mut results = Vec::new();
2272    /// # for _ in 0..4 {
2273    /// #     results.push(stream.next().await.unwrap());
2274    /// # }
2275    /// # let key1_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 1).map(|(_, v)| *v).collect();
2276    /// # let key2_vals: Vec<_> = results.iter().filter(|(k, _)| *k == 2).map(|(_, v)| *v).collect();
2277    /// # assert_eq!(key1_vals, vec![1, 3]);
2278    /// # assert_eq!(key2_vals, vec![1, 2]);
2279    /// # }));
2280    /// # }
2281    /// ```
2282    pub fn sort(self) -> KeyedStream<K, V, L, Bounded, TotalOrder, R>
2283    where
2284        B: IsBounded,
2285        K: Ord,
2286        V: Ord,
2287    {
2288        self.entries().sort().into_keyed()
2289    }
2290
2291    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
2292    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
2293    /// is only present in one of the inputs, its values are passed through as-is). The output has
2294    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
2295    ///
2296    /// Currently, both input streams must be [`Bounded`]. This operator will block
2297    /// on the first stream until all its elements are available. In a future version,
2298    /// we will relax the requirement on the `other` stream.
2299    ///
2300    /// # Example
2301    /// ```rust
2302    /// # #[cfg(feature = "deploy")] {
2303    /// # use hydro_lang::prelude::*;
2304    /// # use futures::StreamExt;
2305    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2306    /// let tick = process.tick();
2307    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
2308    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2309    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2310    /// # .entries()
2311    /// # }, |mut stream| async move {
2312    /// // { 0: [2, 1], 1: [4, 3] }
2313    /// # let mut results = Vec::new();
2314    /// # for _ in 0..4 {
2315    /// #     results.push(stream.next().await.unwrap());
2316    /// # }
2317    /// # results.sort();
2318    /// # assert_eq!(results, vec![(0, 1), (0, 2), (1, 3), (1, 4)]);
2319    /// # }));
2320    /// # }
2321    /// ```
2322    pub fn chain<O2: Ordering, R2: Retries>(
2323        self,
2324        other: KeyedStream<K, V, L, Bounded, O2, R2>,
2325    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2326    where
2327        B: IsBounded,
2328        O: MinOrder<O2>,
2329        R: MinRetries<R2>,
2330    {
2331        let this = self.make_bounded();
2332        check_matching_location(&this.location, &other.location);
2333
2334        KeyedStream::new(
2335            this.location.clone(),
2336            HydroNode::Chain {
2337                first: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2338                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2339                metadata: this.location.new_node_metadata(KeyedStream::<
2340                    K,
2341                    V,
2342                    L,
2343                    Bounded,
2344                    <O as MinOrder<O2>>::Min,
2345                    <R as MinRetries<R2>>::Min,
2346                >::collection_kind()),
2347            },
2348        )
2349    }
2350
2351    /// Emit a keyed stream containing keys shared between the keyed stream and the
2352    /// keyed singleton, where each value in the output keyed stream is a tuple of
2353    /// (the keyed stream's value, the keyed singleton's value).
2354    ///
2355    /// # Example
2356    /// ```rust
2357    /// # #[cfg(feature = "deploy")] {
2358    /// # use hydro_lang::prelude::*;
2359    /// # use futures::StreamExt;
2360    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2361    /// let tick = process.tick();
2362    /// let keyed_data = process
2363    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2364    ///     .into_keyed()
2365    ///     .batch(&tick, nondet!(/** test */));
2366    /// let singleton_data = process
2367    ///     .source_iter(q!(vec![(1, 100), (2, 200)]))
2368    ///     .into_keyed()
2369    ///     .batch(&tick, nondet!(/** test */))
2370    ///     .first();
2371    /// keyed_data.join_keyed_singleton(singleton_data).entries().all_ticks()
2372    /// # }, |mut stream| async move {
2373    /// // { 1: [(10, 100), (11, 100)], 2: [(20, 200)] } in any order
2374    /// # let mut results = vec![];
2375    /// # for _ in 0..3 {
2376    /// #     results.push(stream.next().await.unwrap());
2377    /// # }
2378    /// # results.sort();
2379    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (11, 100)), (2, (20, 200))]);
2380    /// # }));
2381    /// # }
2382    /// ```
2383    pub fn join_keyed_singleton<V2: Clone, B2: IsBounded>(
2384        self,
2385        other: KeyedSingleton<K, V2, L, B2>,
2386    ) -> KeyedStream<K, (V, V2), L, B, O, R>
2387    where
2388        K: Eq + Hash + Clone,
2389        V: Clone,
2390    {
2391        let ir_node = if B2::BOUNDED {
2392            HydroNode::JoinHalf {
2393                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2394                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2395                metadata: self
2396                    .location
2397                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2398            }
2399        } else {
2400            HydroNode::Join {
2401                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2402                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2403                metadata: self
2404                    .location
2405                    .new_node_metadata(KeyedStream::<K, (V, V2), L, B, O, R>::collection_kind()),
2406            }
2407        };
2408
2409        KeyedStream::new(self.location.clone(), ir_node)
2410    }
2411
2412    /// Gets the values associated with a specific key from the keyed stream.
2413    /// Returns an empty stream if the key is `None` or there are no associated values.
2414    ///
2415    /// # Example
2416    /// ```rust
2417    /// # #[cfg(feature = "deploy")] {
2418    /// # use hydro_lang::prelude::*;
2419    /// # use futures::StreamExt;
2420    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2421    /// let tick = process.tick();
2422    /// let keyed_data = process
2423    ///     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2424    ///     .into_keyed()
2425    ///     .batch(&tick, nondet!(/** test */));
2426    /// let key = tick.singleton(q!(1));
2427    /// keyed_data.get(key).all_ticks()
2428    /// # }, |mut stream| async move {
2429    /// // 10, 11
2430    /// # let mut results = vec![];
2431    /// # for _ in 0..2 {
2432    /// #     results.push(stream.next().await.unwrap());
2433    /// # }
2434    /// # results.sort();
2435    /// # assert_eq!(results, vec![10, 11]);
2436    /// # }));
2437    /// # }
2438    /// ```
2439    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Stream<V, L, B, O, R>
2440    where
2441        K: Eq + Hash + Clone,
2442        V: Clone,
2443    {
2444        let joined =
2445            self.join_keyed_singleton(key.into().map(q!(|k| (k, ()))).into_keyed_singleton());
2446
2447        if O::ORDERING_KIND == StreamOrder::TotalOrder {
2448            joined
2449                .use_ordering_type::<TotalOrder>()
2450                .cast_at_most_one_key()
2451                .map(q!(|(_, (v, _))| v))
2452                .weaken_ordering()
2453        } else {
2454            joined.values().map(q!(|(v, _)| v)).use_ordering_type()
2455        }
2456    }
2457
2458    /// For each value in `self`, find the matching key in `lookup`.
2459    /// The output is a keyed stream with the key from `self`, and a value
2460    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2461    /// If the key is not present in `lookup`, the option will be [`None`].
2462    ///
2463    /// # Example
2464    /// ```rust
2465    /// # #[cfg(feature = "deploy")] {
2466    /// # use hydro_lang::prelude::*;
2467    /// # use futures::StreamExt;
2468    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2469    /// # let tick = process.tick();
2470    /// let requests = // { 1: [10, 11], 2: 20 }
2471    /// # process
2472    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2473    /// #     .into_keyed()
2474    /// #     .batch(&tick, nondet!(/** test */));
2475    /// let other_data = // { 10: 100, 11: 110 }
2476    /// # process
2477    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
2478    /// #     .into_keyed()
2479    /// #     .batch(&tick, nondet!(/** test */))
2480    /// #     .first();
2481    /// requests.lookup_keyed_singleton(other_data)
2482    /// # .entries().all_ticks()
2483    /// # }, |mut stream| async move {
2484    /// // { 1: [(10, Some(100)), (11, Some(110))], 2: (20, None) }
2485    /// # let mut results = vec![];
2486    /// # for _ in 0..3 {
2487    /// #     results.push(stream.next().await.unwrap());
2488    /// # }
2489    /// # results.sort();
2490    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (11, Some(110))), (2, (20, None))]);
2491    /// # }));
2492    /// # }
2493    /// ```
2494    pub fn lookup_keyed_singleton<V2>(
2495        self,
2496        lookup: KeyedSingleton<V, V2, L, Bounded>,
2497    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
2498    where
2499        B: IsBounded,
2500        K: Eq + Hash + Clone,
2501        V: Eq + Hash + Clone,
2502        V2: Clone,
2503    {
2504        self.lookup_keyed_stream(lookup.into_keyed_stream().weaken_retries::<R>())
2505    }
2506
2507    /// For each value in `self`, find the matching key in `lookup`.
2508    /// The output is a keyed stream with the key from `self`, and a value
2509    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
2510    /// If the key is not present in `lookup`, the option will be [`None`].
2511    ///
2512    /// # Example
2513    /// ```rust
2514    /// # #[cfg(feature = "deploy")] {
2515    /// # use hydro_lang::prelude::*;
2516    /// # use futures::StreamExt;
2517    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2518    /// # let tick = process.tick();
2519    /// let requests = // { 1: [10, 11], 2: 20 }
2520    /// # process
2521    /// #     .source_iter(q!(vec![(1, 10), (1, 11), (2, 20)]))
2522    /// #     .into_keyed()
2523    /// #     .batch(&tick, nondet!(/** test */));
2524    /// let other_data = // { 10: [100, 101], 11: 110 }
2525    /// # process
2526    /// #     .source_iter(q!(vec![(10, 100), (10, 101), (11, 110)]))
2527    /// #     .into_keyed()
2528    /// #     .batch(&tick, nondet!(/** test */));
2529    /// requests.lookup_keyed_stream(other_data)
2530    /// # .entries().all_ticks()
2531    /// # }, |mut stream| async move {
2532    /// // { 1: [(10, Some(100)), (10, Some(101)), (11, Some(110))], 2: (20, None) }
2533    /// # let mut results = vec![];
2534    /// # for _ in 0..4 {
2535    /// #     results.push(stream.next().await.unwrap());
2536    /// # }
2537    /// # results.sort();
2538    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(101))), (1, (11, Some(110))), (2, (20, None))]);
2539    /// # }));
2540    /// # }
2541    /// ```
2542    #[expect(clippy::type_complexity, reason = "retries propagation")]
2543    pub fn lookup_keyed_stream<V2, O2: Ordering, R2: Retries>(
2544        self,
2545        lookup: KeyedStream<V, V2, L, Bounded, O2, R2>,
2546    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, <R as MinRetries<R2>>::Min>
2547    where
2548        B: IsBounded,
2549        K: Eq + Hash + Clone,
2550        V: Eq + Hash + Clone,
2551        V2: Clone,
2552        R: MinRetries<R2>,
2553    {
2554        let inverted = self
2555            .make_bounded()
2556            .entries()
2557            .map(q!(|(key, lookup_value)| (lookup_value, key)))
2558            .into_keyed();
2559        let found = inverted
2560            .clone()
2561            .join_keyed_stream(lookup.clone())
2562            .entries()
2563            .map(q!(|(lookup_value, (key, value))| (
2564                key,
2565                (lookup_value, Some(value))
2566            )))
2567            .into_keyed();
2568        let not_found = inverted
2569            .filter_key_not_in(lookup.keys())
2570            .entries()
2571            .map(q!(|(lookup_value, key)| (key, (lookup_value, None))))
2572            .into_keyed();
2573
2574        found.chain(not_found.weaken_retries::<<R as MinRetries<R2>>::Min>())
2575    }
2576
2577    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
2578    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
2579    ///
2580    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2581    /// processed before an acknowledgement is emitted.
2582    pub fn atomic(self) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
2583        let id = self.location.flow_state().borrow_mut().next_clock_id();
2584        let out_location = Atomic {
2585            tick: Tick {
2586                id,
2587                l: self.location.clone(),
2588            },
2589        };
2590        KeyedStream::new(
2591            out_location.clone(),
2592            HydroNode::BeginAtomic {
2593                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2594                metadata: out_location
2595                    .new_node_metadata(KeyedStream::<K, V, Atomic<L>, B, O, R>::collection_kind()),
2596            },
2597        )
2598    }
2599
2600    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
2601    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2602    /// the order of the input.
2603    ///
2604    /// # Non-Determinism
2605    /// The batch boundaries are non-deterministic and may change across executions.
2606    pub fn batch<L2: Location<'a, NoConsistency = L::NoConsistency>>(
2607        self,
2608        tick: &Tick<L2>,
2609        nondet: NonDet,
2610    ) -> KeyedStream<K, V, Tick<L::NoConsistency>, Bounded, O, R> {
2611        let _ = nondet;
2612        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2613        KeyedStream::new(
2614            tick.drop_consistency(),
2615            HydroNode::Batch {
2616                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2617                metadata: tick.new_node_metadata(
2618                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2619                ),
2620            },
2621        )
2622    }
2623}
2624
2625impl<'a, K1, K2, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2626    KeyedStream<(K1, K2), V, L, B, O, R>
2627{
2628    /// Produces a new keyed stream by dropping the first element of the compound key.
2629    ///
2630    /// Because multiple keys may share the same suffix, this operation results in re-grouping
2631    /// of the values under the new keys. The values across groups with the same new key
2632    /// will be interleaved, so the resulting stream has [`NoOrder`] within each group.
2633    ///
2634    /// # Example
2635    /// ```rust
2636    /// # #[cfg(feature = "deploy")] {
2637    /// # use hydro_lang::prelude::*;
2638    /// # use futures::StreamExt;
2639    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2640    /// process
2641    ///     .source_iter(q!(vec![((1, 10), 2), ((1, 10), 3), ((2, 20), 4)]))
2642    ///     .into_keyed()
2643    ///     .drop_key_prefix()
2644    /// #   .entries()
2645    /// # }, |mut stream| async move {
2646    /// // { 10: [2, 3], 20: [4] }
2647    /// # let mut results = Vec::new();
2648    /// # for _ in 0..3 {
2649    /// #     results.push(stream.next().await.unwrap());
2650    /// # }
2651    /// # results.sort();
2652    /// # assert_eq!(results, vec![(10, 2), (10, 3), (20, 4)]);
2653    /// # }));
2654    /// # }
2655    /// ```
2656    pub fn drop_key_prefix(self) -> KeyedStream<K2, V, L, B, NoOrder, R> {
2657        self.entries()
2658            .map(q!(|((_k1, k2), v)| (k2, v)))
2659            .into_keyed()
2660    }
2661}
2662
2663impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> KeyedStream<K, V, L, Unbounded, O, R> {
2664    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
2665    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
2666    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
2667    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
2668    ///
2669    /// Currently, both input streams must be [`Unbounded`].
2670    ///
2671    /// # Example
2672    /// ```rust
2673    /// # #[cfg(feature = "deploy")] {
2674    /// # use hydro_lang::prelude::*;
2675    /// # use futures::StreamExt;
2676    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2677    /// let numbers1: KeyedStream<i32, i32, _> = // { 1: [2], 3: [4] }
2678    /// # process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed().into();
2679    /// let numbers2: KeyedStream<i32, i32, _> = // { 1: [3], 3: [5] }
2680    /// # process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed().into();
2681    /// numbers1.merge_unordered(numbers2)
2682    /// #   .entries()
2683    /// # }, |mut stream| async move {
2684    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
2685    /// # let mut results = Vec::new();
2686    /// # for _ in 0..4 {
2687    /// #     results.push(stream.next().await.unwrap());
2688    /// # }
2689    /// # results.sort();
2690    /// # assert_eq!(results, vec![(1, 2), (1, 3), (3, 4), (3, 5)]);
2691    /// # }));
2692    /// # }
2693    /// ```
2694    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2695        self,
2696        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2697    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2698    where
2699        R: MinRetries<R2>,
2700    {
2701        KeyedStream::new(
2702            self.location.clone(),
2703            HydroNode::Chain {
2704                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2705                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2706                metadata: self.location.new_node_metadata(KeyedStream::<
2707                    K,
2708                    V,
2709                    L,
2710                    Unbounded,
2711                    NoOrder,
2712                    <R as MinRetries<R2>>::Min,
2713                >::collection_kind()),
2714            },
2715        )
2716    }
2717
2718    /// Deprecated: use [`KeyedStream::merge_unordered`] instead.
2719    #[deprecated(note = "use `merge_unordered` instead")]
2720    pub fn interleave<O2: Ordering, R2: Retries>(
2721        self,
2722        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
2723    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2724    where
2725        R: MinRetries<R2>,
2726    {
2727        self.merge_unordered(other)
2728    }
2729}
2730
2731impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
2732where
2733    L: Location<'a>,
2734{
2735    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
2736    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2737    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
2738    /// used to create the atomic section.
2739    ///
2740    /// # Non-Determinism
2741    /// The batch boundaries are non-deterministic and may change across executions.
2742    pub fn batch_atomic<L2: Location<'a, NoConsistency = L::NoConsistency>>(
2743        self,
2744        tick: &Tick<L2>,
2745        nondet: NonDet,
2746    ) -> KeyedStream<K, V, Tick<L::NoConsistency>, Bounded, O, R> {
2747        let _ = nondet;
2748        KeyedStream::new(
2749            tick.drop_consistency(),
2750            HydroNode::Batch {
2751                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2752                metadata: tick.new_node_metadata(
2753                    KeyedStream::<K, V, Tick<L>, Bounded, O, R>::collection_kind(),
2754                ),
2755            },
2756        )
2757    }
2758
2759    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
2760    /// See [`KeyedStream::atomic`] for more details.
2761    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
2762        KeyedStream::new(
2763            self.location.tick.l.clone(),
2764            HydroNode::EndAtomic {
2765                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2766                metadata: self
2767                    .location
2768                    .tick
2769                    .l
2770                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2771            },
2772        )
2773    }
2774}
2775
2776impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
2777where
2778    L: Location<'a>,
2779{
2780    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2781    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2782    /// each key.
2783    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
2784        KeyedStream::new(
2785            self.location.outer().clone(),
2786            HydroNode::YieldConcat {
2787                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2788                metadata: self.location.outer().new_node_metadata(KeyedStream::<
2789                    K,
2790                    V,
2791                    L,
2792                    Unbounded,
2793                    O,
2794                    R,
2795                >::collection_kind(
2796                )),
2797            },
2798        )
2799    }
2800
2801    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
2802    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
2803    /// each key.
2804    ///
2805    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
2806    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
2807    /// stream's [`Tick`] context.
2808    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, Atomic<L>, Unbounded, O, R> {
2809        let out_location = Atomic {
2810            tick: self.location.clone(),
2811        };
2812
2813        KeyedStream::new(
2814            out_location.clone(),
2815            HydroNode::YieldConcat {
2816                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2817                metadata: out_location.new_node_metadata(KeyedStream::<
2818                    K,
2819                    V,
2820                    Atomic<L>,
2821                    Unbounded,
2822                    O,
2823                    R,
2824                >::collection_kind()),
2825            },
2826        )
2827    }
2828
2829    /// Transforms the keyed stream using the given closure in "stateful" mode, where stateful operators
2830    /// such as `fold` retrain their memory for each key across ticks rather than resetting across batches of each key.
2831    ///
2832    /// This API is particularly useful for stateful computation on batches of data, such as
2833    /// maintaining an accumulated state that is up to date with the current batch.
2834    ///
2835    /// # Example
2836    /// ```rust
2837    /// # #[cfg(feature = "deploy")] {
2838    /// # use hydro_lang::prelude::*;
2839    /// # use futures::StreamExt;
2840    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2841    /// let tick = process.tick();
2842    /// # // ticks are lazy by default, forces the second tick to run
2843    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2844    /// # let batch_first_tick = process
2845    /// #   .source_iter(q!(vec![(0, 1), (1, 2), (2, 3), (3, 4)]))
2846    /// #   .into_keyed()
2847    /// #   .batch(&tick, nondet!(/** test */));
2848    /// # let batch_second_tick = process
2849    /// #   .source_iter(q!(vec![(0, 5), (1, 6), (2, 7)]))
2850    /// #   .into_keyed()
2851    /// #   .batch(&tick, nondet!(/** test */))
2852    /// #   .defer_tick(); // appears on the second tick
2853    /// let input = batch_first_tick.chain(batch_second_tick).all_ticks();
2854    ///
2855    /// input.batch(&tick, nondet!(/** test */))
2856    ///     .across_ticks(|s| s.reduce(q!(|sum, new| {
2857    ///         *sum += new;
2858    ///     }))).entries().all_ticks()
2859    /// # }, |mut stream| async move {
2860    /// // First tick: [(0, 1), (1, 2), (2, 3), (3, 4)]
2861    /// # let mut results = Vec::new();
2862    /// # for _ in 0..4 {
2863    /// #     results.push(stream.next().await.unwrap());
2864    /// # }
2865    /// # results.sort();
2866    /// # assert_eq!(results, vec![(0, 1), (1, 2), (2, 3), (3, 4)]);
2867    /// // Second tick: [(0, 6), (1, 8), (2, 10), (3, 4)]
2868    /// # results.clear();
2869    /// # for _ in 0..4 {
2870    /// #     results.push(stream.next().await.unwrap());
2871    /// # }
2872    /// # results.sort();
2873    /// # assert_eq!(results, vec![(0, 6), (1, 8), (2, 10), (3, 4)]);
2874    /// # }));
2875    /// # }
2876    /// ```
2877    pub fn across_ticks<Out: BatchAtomic<'a>>(
2878        self,
2879        thunk: impl FnOnce(KeyedStream<K, V, Atomic<L>, Unbounded, O, R>) -> Out,
2880    ) -> Out::Batched {
2881        thunk(self.all_ticks_atomic()).batched_atomic()
2882    }
2883
2884    /// Shifts the entries in `self` to the **next tick**, so that the returned keyed stream at
2885    /// tick `T` always has the entries of `self` at tick `T - 1`.
2886    ///
2887    /// At tick `0`, the output keyed stream is empty, since there is no previous tick.
2888    ///
2889    /// This operator enables stateful iterative processing with ticks, by sending data from one
2890    /// tick to the next. For example, you can use it to combine inputs across consecutive batches.
2891    ///
2892    /// # Example
2893    /// ```rust
2894    /// # #[cfg(feature = "deploy")] {
2895    /// # use hydro_lang::prelude::*;
2896    /// # use futures::StreamExt;
2897    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2898    /// let tick = process.tick();
2899    /// # // ticks are lazy by default, forces the second tick to run
2900    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
2901    /// # let batch_first_tick = process
2902    /// #   .source_iter(q!(vec![(1, 2), (1, 3)]))
2903    /// #   .batch(&tick, nondet!(/** test */))
2904    /// #   .into_keyed();
2905    /// # let batch_second_tick = process
2906    /// #   .source_iter(q!(vec![(1, 4), (2, 5)]))
2907    /// #   .batch(&tick, nondet!(/** test */))
2908    /// #   .defer_tick()
2909    /// #   .into_keyed(); // appears on the second tick
2910    /// let changes_across_ticks = // { 1: [2, 3] } (first tick), { 1: [4], 2: [5] } (second tick)
2911    /// # batch_first_tick.chain(batch_second_tick);
2912    /// changes_across_ticks.clone().defer_tick().chain( // from the previous tick
2913    ///     changes_across_ticks // from the current tick
2914    /// )
2915    /// # .entries().all_ticks()
2916    /// # }, |mut stream| async move {
2917    /// // First tick: { 1: [2, 3] }
2918    /// # let mut results = Vec::new();
2919    /// # for _ in 0..2 {
2920    /// #     results.push(stream.next().await.unwrap());
2921    /// # }
2922    /// # results.sort();
2923    /// # assert_eq!(results, vec![(1, 2), (1, 3)]);
2924    /// // Second tick: { 1: [2, 3, 4], 2: [5] }
2925    /// # results.clear();
2926    /// # for _ in 0..4 {
2927    /// #     results.push(stream.next().await.unwrap());
2928    /// # }
2929    /// # results.sort();
2930    /// # assert_eq!(results, vec![(1, 2), (1, 3), (1, 4), (2, 5)]);
2931    /// // Third tick: { 1: [4], 2: [5] }
2932    /// # results.clear();
2933    /// # for _ in 0..2 {
2934    /// #     results.push(stream.next().await.unwrap());
2935    /// # }
2936    /// # results.sort();
2937    /// # assert_eq!(results, vec![(1, 4), (2, 5)]);
2938    /// # }));
2939    /// # }
2940    /// ```
2941    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
2942        KeyedStream::new(
2943            self.location.clone(),
2944            HydroNode::DeferTick {
2945                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2946                metadata: self.location.new_node_metadata(KeyedStream::<
2947                    K,
2948                    V,
2949                    Tick<L>,
2950                    Bounded,
2951                    O,
2952                    R,
2953                >::collection_kind()),
2954            },
2955        )
2956    }
2957}
2958
2959#[cfg(test)]
2960mod tests {
2961    #[cfg(feature = "deploy")]
2962    use futures::{SinkExt, StreamExt};
2963    #[cfg(feature = "deploy")]
2964    use hydro_deploy::Deployment;
2965    #[cfg(any(feature = "deploy", feature = "sim"))]
2966    use stageleft::q;
2967
2968    #[cfg(any(feature = "deploy", feature = "sim"))]
2969    use crate::compile::builder::FlowBuilder;
2970    #[cfg(feature = "deploy")]
2971    use crate::live_collections::stream::ExactlyOnce;
2972    #[cfg(feature = "sim")]
2973    use crate::live_collections::stream::{NoOrder, TotalOrder};
2974    #[cfg(any(feature = "deploy", feature = "sim"))]
2975    use crate::location::Location;
2976    #[cfg(feature = "sim")]
2977    use crate::networking::TCP;
2978    #[cfg(any(feature = "deploy", feature = "sim"))]
2979    use crate::nondet::nondet;
2980    #[cfg(feature = "deploy")]
2981    use crate::properties::manual_proof;
2982
2983    #[cfg(feature = "deploy")]
2984    #[tokio::test]
2985    async fn get_unbounded_keyed_stream_bounded_singleton() {
2986        let mut deployment = Deployment::new();
2987
2988        let mut flow = FlowBuilder::new();
2989        let node = flow.process::<()>();
2990        let external = flow.external::<()>();
2991
2992        let (input_send, input_stream) =
2993            node.source_external_bincode::<_, (i32, i32), _, ExactlyOnce>(&external);
2994
2995        let key = node.singleton(q!(1));
2996
2997        let out = input_stream
2998            .into_keyed()
2999            .get(key)
3000            .send_bincode_external(&external);
3001
3002        let nodes = flow
3003            .with_process(&node, deployment.Localhost())
3004            .with_external(&external, deployment.Localhost())
3005            .deploy(&mut deployment);
3006
3007        deployment.deploy().await.unwrap();
3008
3009        let mut input_send = nodes.connect(input_send).await;
3010        let mut out = nodes.connect(out).await;
3011
3012        deployment.start().await.unwrap();
3013
3014        // First batch
3015        input_send.send((1, 10)).await.unwrap();
3016        input_send.send((2, 20)).await.unwrap();
3017        assert_eq!(out.next().await.unwrap(), 10);
3018
3019        // Second batch
3020        input_send.send((1, 11)).await.unwrap();
3021        input_send.send((2, 21)).await.unwrap();
3022        assert_eq!(out.next().await.unwrap(), 11);
3023    }
3024
3025    #[cfg(feature = "deploy")]
3026    #[tokio::test]
3027    async fn reduce_watermark_filter() {
3028        let mut deployment = Deployment::new();
3029
3030        let mut flow = FlowBuilder::new();
3031        let node = flow.process::<()>();
3032        let external = flow.external::<()>();
3033
3034        let node_tick = node.tick();
3035        let watermark = node_tick.singleton(q!(2));
3036
3037        let sum = node
3038            .source_stream(q!(tokio_stream::iter([
3039                (0, 100),
3040                (1, 101),
3041                (2, 102),
3042                (2, 102)
3043            ])))
3044            .into_keyed()
3045            .reduce_watermark(
3046                watermark,
3047                q!(|acc, v| {
3048                    *acc += v;
3049                }),
3050            )
3051            .snapshot(&node_tick, nondet!(/** test */))
3052            .entries()
3053            .all_ticks()
3054            .send_bincode_external(&external);
3055
3056        let nodes = flow
3057            .with_process(&node, deployment.Localhost())
3058            .with_external(&external, deployment.Localhost())
3059            .deploy(&mut deployment);
3060
3061        deployment.deploy().await.unwrap();
3062
3063        let mut out = nodes.connect(sum).await;
3064
3065        deployment.start().await.unwrap();
3066
3067        assert_eq!(out.next().await.unwrap(), (2, 204));
3068    }
3069
3070    #[cfg(feature = "deploy")]
3071    #[tokio::test]
3072    async fn reduce_watermark_bounded() {
3073        let mut deployment = Deployment::new();
3074
3075        let mut flow = FlowBuilder::new();
3076        let node = flow.process::<()>();
3077        let external = flow.external::<()>();
3078
3079        let node_tick = node.tick();
3080        let watermark = node_tick.singleton(q!(2));
3081
3082        let sum = node
3083            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
3084            .into_keyed()
3085            .reduce_watermark(
3086                watermark,
3087                q!(|acc, v| {
3088                    *acc += v;
3089                }),
3090            )
3091            .entries()
3092            .send_bincode_external(&external);
3093
3094        let nodes = flow
3095            .with_process(&node, deployment.Localhost())
3096            .with_external(&external, deployment.Localhost())
3097            .deploy(&mut deployment);
3098
3099        deployment.deploy().await.unwrap();
3100
3101        let mut out = nodes.connect(sum).await;
3102
3103        deployment.start().await.unwrap();
3104
3105        assert_eq!(out.next().await.unwrap(), (2, 204));
3106    }
3107
3108    #[cfg(feature = "deploy")]
3109    #[tokio::test]
3110    async fn reduce_watermark_garbage_collect() {
3111        let mut deployment = Deployment::new();
3112
3113        let mut flow = FlowBuilder::new();
3114        let node = flow.process::<()>();
3115        let external = flow.external::<()>();
3116        let (tick_send, tick_trigger) =
3117            node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
3118
3119        let node_tick = node.tick();
3120        let (watermark_complete_cycle, watermark) =
3121            node_tick.cycle_with_initial(node_tick.singleton(q!(2)));
3122        let next_watermark = watermark.clone().map(q!(|v| v + 1));
3123        watermark_complete_cycle.complete_next_tick(next_watermark);
3124
3125        let tick_triggered_input = node_tick
3126            .singleton(q!((3, 103)))
3127            .into_stream()
3128            .filter_if(
3129                tick_trigger
3130                    .clone()
3131                    .batch(&node_tick, nondet!(/** test */))
3132                    .first()
3133                    .is_some(),
3134            )
3135            .all_ticks();
3136
3137        let sum = node
3138            .source_stream(q!(tokio_stream::iter([
3139                (0, 100),
3140                (1, 101),
3141                (2, 102),
3142                (2, 102)
3143            ])))
3144            .merge_unordered(tick_triggered_input)
3145            .into_keyed()
3146            .reduce_watermark(
3147                watermark,
3148                q!(
3149                    |acc, v| {
3150                        *acc += v;
3151                    },
3152                    commutative = manual_proof!(/** integer addition is commutative */)
3153                ),
3154            )
3155            .snapshot(&node_tick, nondet!(/** test */))
3156            .entries()
3157            .all_ticks()
3158            .send_bincode_external(&external);
3159
3160        let nodes = flow
3161            .with_default_optimize()
3162            .with_process(&node, deployment.Localhost())
3163            .with_external(&external, deployment.Localhost())
3164            .deploy(&mut deployment);
3165
3166        deployment.deploy().await.unwrap();
3167
3168        let mut tick_send = nodes.connect(tick_send).await;
3169        let mut out_recv = nodes.connect(sum).await;
3170
3171        deployment.start().await.unwrap();
3172
3173        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
3174
3175        tick_send.send(()).await.unwrap();
3176
3177        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
3178    }
3179
3180    #[cfg(feature = "sim")]
3181    #[test]
3182    #[should_panic]
3183    fn sim_batch_nondet_size() {
3184        let mut flow = FlowBuilder::new();
3185        let node = flow.process::<()>();
3186
3187        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3188
3189        let tick = node.tick();
3190        let out_recv = input
3191            .batch(&tick, nondet!(/** test */))
3192            .fold(q!(|| vec![]), q!(|acc, v| acc.push(v)))
3193            .entries()
3194            .all_ticks()
3195            .sim_output();
3196
3197        flow.sim().exhaustive(async || {
3198            out_recv
3199                .assert_yields_only_unordered([(1, vec![1, 2])])
3200                .await;
3201        });
3202    }
3203
3204    #[cfg(feature = "sim")]
3205    #[test]
3206    fn sim_batch_preserves_group_order() {
3207        let mut flow = FlowBuilder::new();
3208        let node = flow.process::<()>();
3209
3210        let input = node.source_iter(q!([(1, 1), (1, 2), (2, 3)])).into_keyed();
3211
3212        let tick = node.tick();
3213        let out_recv = input
3214            .batch(&tick, nondet!(/** test */))
3215            .all_ticks()
3216            .fold_early_stop(
3217                q!(|| 0),
3218                q!(|acc, v| {
3219                    *acc = std::cmp::max(v, *acc);
3220                    *acc >= 2
3221                }),
3222            )
3223            .entries()
3224            .sim_output();
3225
3226        let instances = flow.sim().exhaustive(async || {
3227            out_recv
3228                .assert_yields_only_unordered([(1, 2), (2, 3)])
3229                .await;
3230        });
3231
3232        assert_eq!(instances, 8);
3233        // - three cases: all three in a separate tick (pick where (2, 3) is)
3234        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after
3235        // - two cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them
3236        // - one case: all three together
3237    }
3238
3239    #[cfg(feature = "sim")]
3240    #[test]
3241    fn sim_batch_unordered_shuffles() {
3242        let mut flow = FlowBuilder::new();
3243        let node = flow.process::<()>();
3244
3245        let input = node
3246            .source_iter(q!([(1, 1), (1, 2), (2, 3)]))
3247            .into_keyed()
3248            .weaken_ordering::<NoOrder>();
3249
3250        let tick = node.tick();
3251        let out_recv = input
3252            .batch(&tick, nondet!(/** test */))
3253            .all_ticks()
3254            .entries()
3255            .sim_output();
3256
3257        let instances = flow.sim().exhaustive(async || {
3258            out_recv
3259                .assert_yields_only_unordered([(1, 1), (1, 2), (2, 3)])
3260                .await;
3261        });
3262
3263        assert_eq!(instances, 13);
3264        // - 6 (3 * 2) cases: all three in a separate tick (pick where (2, 3) is), and order of (1, 1), (1, 2)
3265        // - two cases: (1, 1) and (1, 2) together, (2, 3) before or after (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
3266        // - 4 (2 * 2) cases: (1, 1) and (1, 2) separate, (2, 3) grouped with one of them, and order of (1, 1), (1, 2)
3267        // - one case: all three together (order of (1, 1), (1, 2) doesn't matter because batched is still unordered)
3268    }
3269
3270    #[cfg(feature = "sim")]
3271    #[test]
3272    #[should_panic]
3273    fn sim_observe_order_batched() {
3274        let mut flow = FlowBuilder::new();
3275        let node = flow.process::<()>();
3276
3277        let (port, input) = node.sim_input::<_, NoOrder, _>();
3278
3279        let tick = node.tick();
3280        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3281        let out_recv = batch
3282            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3283            .all_ticks()
3284            .first()
3285            .entries()
3286            .sim_output();
3287
3288        flow.sim().exhaustive(async || {
3289            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
3290            out_recv
3291                .assert_yields_only_unordered([(1, 1), (2, 1)])
3292                .await; // fails with assume_ordering
3293        });
3294    }
3295
3296    #[cfg(feature = "sim")]
3297    #[test]
3298    fn sim_observe_order_batched_count() {
3299        let mut flow = FlowBuilder::new();
3300        let node = flow.process::<()>();
3301
3302        let (port, input) = node.sim_input::<_, NoOrder, _>();
3303
3304        let tick = node.tick();
3305        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3306        let out_recv = batch
3307            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3308            .all_ticks()
3309            .entries()
3310            .sim_output();
3311
3312        let instance_count = flow.sim().exhaustive(async || {
3313            port.send_many_unordered([(1, 1), (1, 2), (2, 1), (2, 2)]);
3314            let _ = out_recv.collect_sorted::<Vec<_>>().await;
3315        });
3316
3317        assert_eq!(instance_count, 104); // too complicated to enumerate here, but less than stream equivalent
3318    }
3319
3320    #[cfg(feature = "sim")]
3321    #[test]
3322    fn sim_top_level_assume_ordering() {
3323        use std::collections::HashMap;
3324
3325        let mut flow = FlowBuilder::new();
3326        let node = flow.process::<()>();
3327
3328        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3329
3330        let out_recv = input
3331            .into_keyed()
3332            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3333            .fold_early_stop(
3334                q!(|| Vec::new()),
3335                q!(|acc, v| {
3336                    acc.push(v);
3337                    acc.len() >= 2
3338                }),
3339            )
3340            .entries()
3341            .sim_output();
3342
3343        let instance_count = flow.sim().exhaustive(async || {
3344            in_send.send_many_unordered([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]);
3345            let out: HashMap<_, _> = out_recv
3346                .collect_sorted::<Vec<_>>()
3347                .await
3348                .into_iter()
3349                .collect();
3350            // Each key accumulates its values; we get one entry per key
3351            assert_eq!(out.len(), 2);
3352        });
3353
3354        assert_eq!(instance_count, 24)
3355    }
3356
3357    #[cfg(feature = "sim")]
3358    #[test]
3359    fn sim_top_level_assume_ordering_cycle_back() {
3360        use std::collections::HashMap;
3361
3362        let mut flow = FlowBuilder::new();
3363        let node = flow.process::<()>();
3364        let node2 = flow.process::<()>();
3365
3366        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3367
3368        let (complete_cycle_back, cycle_back) =
3369            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3370        let ordered = input
3371            .into_keyed()
3372            .merge_unordered(cycle_back)
3373            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3374        complete_cycle_back.complete(
3375            ordered
3376                .clone()
3377                .map(q!(|v| v + 1))
3378                .filter(q!(|v| v % 2 == 1))
3379                .entries()
3380                .send(&node2, TCP.fail_stop().bincode())
3381                .send(&node, TCP.fail_stop().bincode())
3382                .into_keyed(),
3383        );
3384
3385        let out_recv = ordered
3386            .fold_early_stop(
3387                q!(|| Vec::new()),
3388                q!(|acc, v| {
3389                    acc.push(v);
3390                    acc.len() >= 2
3391                }),
3392            )
3393            .entries()
3394            .sim_output();
3395
3396        let mut saw = false;
3397        let instance_count = flow.sim().exhaustive(async || {
3398            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back.
3399            // We want to see [0, 1] - the cycled back value interleaved
3400            in_send.send_many_unordered([(1, 0), (1, 2)]);
3401            let out: HashMap<_, _> = out_recv
3402                .collect_sorted::<Vec<_>>()
3403                .await
3404                .into_iter()
3405                .collect();
3406
3407            // We want to see an instance where key 1 gets: 0, then 1 (cycled back from 0+1)
3408            if let Some(values) = out.get(&1)
3409                && *values == vec![0, 1]
3410            {
3411                saw = true;
3412            }
3413        });
3414
3415        assert!(
3416            saw,
3417            "did not see an instance with key 1 having [0, 1] in order"
3418        );
3419        assert_eq!(instance_count, 6);
3420    }
3421
3422    #[cfg(feature = "sim")]
3423    #[test]
3424    fn sim_top_level_assume_ordering_cross_key_cycle() {
3425        use std::collections::HashMap;
3426
3427        // This test demonstrates why releasing one entry at a time is important:
3428        // When one key's observed order cycles back into a different key, we need
3429        // to be able to interleave the cycled-back entry with pending items for
3430        // that other key.
3431        let mut flow = FlowBuilder::new();
3432        let node = flow.process::<()>();
3433        let node2 = flow.process::<()>();
3434
3435        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3436
3437        let (complete_cycle_back, cycle_back) =
3438            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3439        let ordered = input
3440            .into_keyed()
3441            .merge_unordered(cycle_back)
3442            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3443
3444        // Cycle back: when we see (1, 10), emit (2, 100) to key 2
3445        complete_cycle_back.complete(
3446            ordered
3447                .clone()
3448                .filter(q!(|v| *v == 10))
3449                .map(q!(|_| 100))
3450                .entries()
3451                .map(q!(|(_, v)| (2, v))) // Change key from 1 to 2
3452                .send(&node2, TCP.fail_stop().bincode())
3453                .send(&node, TCP.fail_stop().bincode())
3454                .into_keyed(),
3455        );
3456
3457        let out_recv = ordered
3458            .fold_early_stop(
3459                q!(|| Vec::new()),
3460                q!(|acc, v| {
3461                    acc.push(v);
3462                    acc.len() >= 2
3463                }),
3464            )
3465            .entries()
3466            .sim_output();
3467
3468        // We want to see an instance where:
3469        // - (1, 10) is released first
3470        // - This causes (2, 100) to be cycled back
3471        // - (2, 100) is released BEFORE (2, 20) which was already pending
3472        let mut saw_cross_key_interleave = false;
3473        let instance_count = flow.sim().exhaustive(async || {
3474            // Send (1, 10), (1, 11) for key 1, and (2, 20), (2, 21) for key 2
3475            in_send.send_many_unordered([(1, 10), (1, 11), (2, 20), (2, 21)]);
3476            let out: HashMap<_, _> = out_recv
3477                .collect_sorted::<Vec<_>>()
3478                .await
3479                .into_iter()
3480                .collect();
3481
3482            // Check if we see the cross-key interleaving:
3483            // key 2 should have [100, 20] or [100, 21] - cycled back 100 before a pending item
3484            if let Some(values) = out.get(&2)
3485                && values.len() >= 2
3486                && values[0] == 100
3487            {
3488                saw_cross_key_interleave = true;
3489            }
3490        });
3491
3492        assert!(
3493            saw_cross_key_interleave,
3494            "did not see an instance where cycled-back 100 was released before pending items for key 2"
3495        );
3496        assert_eq!(instance_count, 60);
3497    }
3498
3499    #[cfg(feature = "sim")]
3500    #[test]
3501    fn sim_top_level_assume_ordering_cycle_back_tick() {
3502        use std::collections::HashMap;
3503
3504        let mut flow = FlowBuilder::new();
3505        let node = flow.process::<()>();
3506        let node2 = flow.process::<()>();
3507
3508        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3509
3510        let (complete_cycle_back, cycle_back) =
3511            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3512        let ordered = input
3513            .into_keyed()
3514            .merge_unordered(cycle_back)
3515            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3516        complete_cycle_back.complete(
3517            ordered
3518                .clone()
3519                .batch(&node.tick(), nondet!(/** test */))
3520                .all_ticks()
3521                .map(q!(|v| v + 1))
3522                .filter(q!(|v| v % 2 == 1))
3523                .entries()
3524                .send(&node2, TCP.fail_stop().bincode())
3525                .send(&node, TCP.fail_stop().bincode())
3526                .into_keyed(),
3527        );
3528
3529        let out_recv = ordered
3530            .fold_early_stop(
3531                q!(|| Vec::new()),
3532                q!(|acc, v| {
3533                    acc.push(v);
3534                    acc.len() >= 2
3535                }),
3536            )
3537            .entries()
3538            .sim_output();
3539
3540        let mut saw = false;
3541        let instance_count = flow.sim().exhaustive(async || {
3542            in_send.send_many_unordered([(1, 0), (1, 2)]);
3543            let out: HashMap<_, _> = out_recv
3544                .collect_sorted::<Vec<_>>()
3545                .await
3546                .into_iter()
3547                .collect();
3548
3549            if let Some(values) = out.get(&1)
3550                && *values == vec![0, 1]
3551            {
3552                saw = true;
3553            }
3554        });
3555
3556        assert!(
3557            saw,
3558            "did not see an instance with key 1 having [0, 1] in order"
3559        );
3560        assert_eq!(instance_count, 58);
3561    }
3562
3563    #[cfg(feature = "sim")]
3564    #[test]
3565    fn sim_entries_partially_ordered_bounded() {
3566        let mut flow = FlowBuilder::new();
3567        let node = flow.process::<()>();
3568
3569        let (port, input) = node.sim_input::<_, TotalOrder, _>();
3570
3571        let tick = node.tick();
3572        let batch = input.into_keyed().batch(&tick, nondet!(/** test */));
3573        let out_recv = batch
3574            .entries_partially_ordered(nondet!(/** test */))
3575            .all_ticks()
3576            .sim_output();
3577
3578        let instance_count = flow.sim().exhaustive(async || {
3579            port.send((1, 'a'));
3580            port.send((1, 'b'));
3581            port.send((2, 'c'));
3582            let _: Vec<(i32, char)> = out_recv.collect().await;
3583        });
3584
3585        assert_eq!(instance_count, 12);
3586    }
3587
3588    #[cfg(feature = "sim")]
3589    #[test]
3590    fn sim_entries_partially_ordered_top_level() {
3591        let mut flow = FlowBuilder::new();
3592        let node = flow.process::<()>();
3593
3594        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3595
3596        let out_recv = input
3597            .into_keyed()
3598            .entries_partially_ordered(nondet!(/** test */))
3599            .sim_output();
3600
3601        let instance_count = flow.sim().exhaustive(async || {
3602            in_send.send((1, 'a'));
3603            in_send.send((1, 'b'));
3604            in_send.send((2, 'c'));
3605            let _: Vec<(i32, char)> = out_recv.collect().await;
3606        });
3607
3608        assert_eq!(instance_count, 3);
3609    }
3610
3611    #[cfg(feature = "sim")]
3612    #[test]
3613    fn sim_entries_partially_ordered_cycle_back() {
3614        let mut flow = FlowBuilder::new();
3615        let node = flow.process::<()>();
3616        let node2 = flow.process::<()>();
3617
3618        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3619
3620        let (complete_cycle_back, cycle_back) =
3621            node.forward_ref::<super::KeyedStream<_, _, _, _, NoOrder>>();
3622        let ordered = input
3623            .into_keyed()
3624            .merge_unordered(cycle_back)
3625            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3626
3627        let flat = ordered
3628            .clone()
3629            .entries_partially_ordered(nondet!(/** test */));
3630
3631        complete_cycle_back.complete(
3632            flat.clone()
3633                .map(q!(|(k, v): (i32, i32)| (k, v + 1)))
3634                .filter(q!(|(_, v)| *v % 2 == 1))
3635                .send(&node2, TCP.fail_stop().bincode())
3636                .send(&node, TCP.fail_stop().bincode())
3637                .into_keyed(),
3638        );
3639
3640        let out_recv = flat.sim_output();
3641
3642        let mut saw = false;
3643        let instance_count = flow.sim().exhaustive(async || {
3644            // Send (1, 0) and (1, 2). 0+1=1 is odd so cycles back as (1, 1).
3645            // We want to see (1, 1) before (1, 2) - the cycled back value beats the pending one
3646            in_send.send_many_unordered([(1, 0), (1, 2)]);
3647            let results: Vec<(i32, i32)> = out_recv.collect().await;
3648
3649            let pos_1 = results.iter().position(|v| *v == (1, 1));
3650            let pos_2 = results.iter().position(|v| *v == (1, 2));
3651            if let (Some(p1), Some(p2)) = (pos_1, pos_2)
3652                && p1 < p2
3653            {
3654                saw = true;
3655            }
3656        });
3657
3658        assert!(saw, "did not see an instance with (1, 1) before (1, 2)");
3659        assert_eq!(instance_count, 78);
3660    }
3661}