Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, NoAtomic};
25use crate::location::{Location, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28    ApplyMonotoneStream, ApplyOrderPreservingSingleton, MapFuncAlgebra, Proved,
29};
30
/// A marker trait indicating which components of a [`Singleton`] may change.
///
/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
/// includes an additional variant [`Monotonic`], which means that the value will only grow.
pub trait SingletonBound {
    /// The [`Boundedness`] that this [`Singleton`] would be erased to, for example when the
    /// monotonicity property is dropped with [`Singleton::ignore_monotonic`].
    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;

    /// The [`SingletonBound`] of a [`Singleton`] that is produced from a [`Stream`] with
    /// [`Self`] boundedness. It always shares this bound's [`Self::UnderlyingBound`].
    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;

    /// Returns the [`SingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> SingletonBoundKind;
}
45
46impl SingletonBound for Unbounded {
47    type UnderlyingBound = Unbounded;
48
49    type StreamToMonotone = Monotonic;
50
51    fn bound_kind() -> SingletonBoundKind {
52        SingletonBoundKind::Unbounded
53    }
54}
55
56impl SingletonBound for Bounded {
57    type UnderlyingBound = Bounded;
58
59    type StreamToMonotone = Bounded;
60
61    fn bound_kind() -> SingletonBoundKind {
62        SingletonBoundKind::Bounded
63    }
64}
65
/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
///
/// This is a type-level marker only; it is used as the `Bound` parameter of a [`Singleton`]
/// and erases to [`Unbounded`] through [`SingletonBound::UnderlyingBound`].
pub struct Monotonic;
68
69impl SingletonBound for Monotonic {
70    type UnderlyingBound = Unbounded;
71
72    type StreamToMonotone = Monotonic;
73
74    fn bound_kind() -> SingletonBoundKind {
75        SingletonBoundKind::Monotonic
76    }
77}
78
#[sealed]
#[diagnostic::on_unimplemented(
    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait for [`SingletonBound`]s that provide the monotonicity guarantee: it is
/// implemented for [`Monotonic`] and for every bound that satisfies [`IsBounded`].
///
/// The trait is sealed, so no further implementations can be added outside this module.
pub trait IsMonotonic: SingletonBound {}
87
#[sealed]
#[diagnostic::do_not_recommend]
// `Monotonic` is the bound that directly states the "value only grows" guarantee.
impl IsMonotonic for Monotonic {}
91
#[sealed]
#[diagnostic::do_not_recommend]
// A bounded singleton is frozen and never changes, so APIs that require a
// monotonic input can accept it as well.
impl<B: IsBounded> IsMonotonic for B {}
95
/// A single Rust value that can asynchronously change over time.
///
/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
/// [`Unbounded`], the value will asynchronously change over time. A [`Monotonic`] singleton
/// also changes over time, but its value will only grow.
///
/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
/// a single number that will asynchronously change as events are processed. Singletons also appear
/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
/// such as getting the length of a batch of requests.
///
/// Type Parameters:
/// - `Type`: the type of the value in this singleton
/// - `Loc`: the [`Location`] where the singleton is materialized
/// - `Bound`: a [`SingletonBound`] that tracks whether the value is [`Bounded`] (fixed),
///   [`Unbounded`] (changing asynchronously) or [`Monotonic`] (changing, but only growing)
pub struct Singleton<Type, Loc, Bound: SingletonBound> {
    /// The location where this singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this singleton. Operators take the node out and leave a
    /// [`HydroNode::Placeholder`] behind, which is why it lives in a [`RefCell`].
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow state that the [`Drop`] implementation pushes a still-present node into.
    pub(crate) flow_state: FlowState,

    _phantom: PhantomData<(Type, Loc, Bound)>,
}
117
118impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
119    fn drop(&mut self) {
120        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
121        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
122            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
123                input: Box::new(ir_node),
124                op_metadata: HydroIrOpMetadata::new(),
125            });
126        }
127    }
128}
129
130impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
131where
132    T: Clone,
133    L: Location<'a>,
134{
135    fn from(value: Singleton<T, L, Bounded>) -> Self {
136        let tick = value.location().tick();
137        value.clone_into_tick(&tick).latest()
138    }
139}
140
141impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
142where
143    L: Location<'a>,
144{
145    type Location = Tick<L>;
146
147    fn location(&self) -> &Self::Location {
148        self.location()
149    }
150
151    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
152        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
153            location.clone(),
154            HydroNode::DeferTick {
155                input: Box::new(HydroNode::CycleSource {
156                    cycle_id,
157                    metadata: location.new_node_metadata(Self::collection_kind()),
158                }),
159                metadata: location
160                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
161            },
162        );
163
164        from_previous_tick.unwrap_or(initial)
165    }
166}
167
168impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
169where
170    L: Location<'a>,
171{
172    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
173        assert_eq!(
174            Location::id(&self.location),
175            expected_location,
176            "locations do not match"
177        );
178        self.location
179            .flow_state()
180            .borrow_mut()
181            .push_root(HydroRoot::CycleSink {
182                cycle_id,
183                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
184                op_metadata: HydroIrOpMetadata::new(),
185            });
186    }
187}
188
189impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
190where
191    L: Location<'a>,
192{
193    type Location = L;
194
195    fn create_source(cycle_id: CycleId, location: L) -> Self {
196        Singleton::new(
197            location.clone(),
198            HydroNode::CycleSource {
199                cycle_id,
200                metadata: location.new_node_metadata(Self::collection_kind()),
201            },
202        )
203    }
204}
205
206impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
207where
208    L: Location<'a>,
209{
210    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
211        assert_eq!(
212            Location::id(&self.location),
213            expected_location,
214            "locations do not match"
215        );
216        self.location
217            .flow_state()
218            .borrow_mut()
219            .push_root(HydroRoot::CycleSink {
220                cycle_id,
221                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
222                op_metadata: HydroIrOpMetadata::new(),
223            });
224    }
225}
226
227impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
228where
229    T: Clone,
230    L: Location<'a>,
231{
232    fn clone(&self) -> Self {
233        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
234            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
235            *self.ir_node.borrow_mut() = HydroNode::Tee {
236                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
237                metadata: self.location.new_node_metadata(Self::collection_kind()),
238            };
239        }
240
241        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
242            Singleton {
243                location: self.location.clone(),
244                flow_state: self.flow_state.clone(),
245                ir_node: HydroNode::Tee {
246                    inner: SharedNode(inner.0.clone()),
247                    metadata: metadata.clone(),
248                }
249                .into(),
250                _phantom: PhantomData,
251            }
252        } else {
253            unreachable!()
254        }
255    }
256}
257
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
    me: Singleton<T, Tick<L>, B>,
    other: Optional<O, Tick<L>, B::UnderlyingBound>,
) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
    // Convert the singleton into an `Optional` with the erased bound and delegate to
    // the optional-level zip, which then sees the same type on both sides.
    super::optional::zip_inside_tick(
        Into::<Optional<T, Tick<L>, B::UnderlyingBound>>::into(me),
        other,
    )
}
266
267impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
268where
269    L: Location<'a>,
270{
271    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
272        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
273        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
274        let flow_state = location.flow_state().clone();
275        Singleton {
276            location,
277            flow_state,
278            ir_node: RefCell::new(ir_node),
279            _phantom: PhantomData,
280        }
281    }
282
283    pub(crate) fn collection_kind() -> CollectionKind {
284        CollectionKind::Singleton {
285            bound: B::bound_kind(),
286            element_type: stageleft::quote_type::<T>().into(),
287        }
288    }
289
290    /// Returns the [`Location`] where this singleton is being materialized.
    pub fn location(&self) -> &L {
        // Borrowed accessor; callers clone the location when they need an owned copy.
        &self.location
    }
294
295    /// Weakens the consistency of this live collection to not guarantee any consistency across
296    /// cluster members (if this collection is on a cluster).
297    pub fn weaken_consistency(self) -> Singleton<T, L::NoConsistency, B>
298    where
299        L: Location<'a>,
300    {
301        if L::consistency()
302            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
303        {
304            // already no consistency
305            Singleton::new(
306                self.location.drop_consistency(),
307                self.ir_node.replace(HydroNode::Placeholder),
308            )
309        } else {
310            Singleton::new(
311                self.location.drop_consistency(),
312                HydroNode::Cast {
313                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
314                    metadata: self
315                        .location
316                        .clone()
317                        .drop_consistency()
318                        .new_node_metadata(Singleton::<T, L::NoConsistency, B>::collection_kind()),
319                },
320            )
321        }
322    }
323
324    /// Casts this live collection to have the consistency guarantees specified in the given
325    /// location type parameter. The developer must ensure that the strengthened consistency
326    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
327    pub fn assert_has_consistency_of<L2: Location<'a, NoConsistency = L::NoConsistency>>(
328        self,
329        _proof: impl crate::properties::ConsistencyProof,
330    ) -> Singleton<T, L2, B>
331    where
332        L: Location<'a>,
333    {
334        if L::consistency() == L2::consistency() {
335            Singleton::new(
336                self.location.with_consistency_of(),
337                self.ir_node.replace(HydroNode::Placeholder),
338            )
339        } else {
340            Singleton::new(
341                self.location.with_consistency_of(),
342                HydroNode::AssertIsConsistent {
343                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
344                    metadata: self
345                        .location
346                        .clone()
347                        .with_consistency_of::<L2>()
348                        .new_node_metadata(Singleton::<T, L2, B>::collection_kind()),
349                },
350            )
351        }
352    }
353
354    /// Drops the monotonicity property of the [`Singleton`].
355    pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
356        if B::bound_kind() == B::UnderlyingBound::bound_kind() {
357            Singleton::new(
358                self.location.clone(),
359                self.ir_node.replace(HydroNode::Placeholder),
360            )
361        } else {
362            Singleton::new(
363                self.location.clone(),
364                HydroNode::Cast {
365                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
366                    metadata:
367                        self.location.new_node_metadata(
368                            Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
369                        ),
370                },
371            )
372        }
373    }
374
375    /// Transforms the singleton value by applying a function `f` to it,
376    /// continuously as the input is updated.
377    ///
378    /// # Example
379    /// ```rust
380    /// # #[cfg(feature = "deploy")] {
381    /// # use hydro_lang::prelude::*;
382    /// # use futures::StreamExt;
383    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
384    /// let tick = process.tick();
385    /// let singleton = tick.singleton(q!(5));
386    /// singleton.map(q!(|v| v * 2)).all_ticks()
387    /// # }, |mut stream| async move {
388    /// // 10
389    /// # assert_eq!(stream.next().await.unwrap(), 10);
390    /// # }));
391    /// # }
392    /// ```
393    pub fn map<U, F, OP, B2: SingletonBound>(
394        self,
395        f: impl IntoQuotedMut<'a, F, L, MapFuncAlgebra<OP>>,
396    ) -> Singleton<U, L, B2>
397    where
398        F: Fn(T) -> U + 'a,
399        B: ApplyOrderPreservingSingleton<OP, B2>,
400    {
401        let (f, proof) = f.splice_fn1_ctx_props(&self.location);
402        proof.register_proof(&f);
403        let f = f.into();
404        Singleton::new(
405            self.location.clone(),
406            HydroNode::Map {
407                f,
408                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
409                metadata: self
410                    .location
411                    .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
412            },
413        )
414    }
415
416    /// Transforms the singleton value by applying a function `f` to it and then flattening
417    /// the result into a stream, preserving the order of elements.
418    ///
419    /// The function `f` is applied to the singleton value to produce an iterator, and all items
420    /// from that iterator are emitted in the output stream in deterministic order.
421    ///
422    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
423    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
424    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
425    ///
426    /// # Example
427    /// ```rust
428    /// # #[cfg(feature = "deploy")] {
429    /// # use hydro_lang::prelude::*;
430    /// # use futures::StreamExt;
431    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
432    /// let tick = process.tick();
433    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
434    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
435    /// # }, |mut stream| async move {
436    /// // 1, 2, 3
437    /// # for w in vec![1, 2, 3] {
438    /// #     assert_eq!(stream.next().await.unwrap(), w);
439    /// # }
440    /// # }));
441    /// # }
442    /// ```
443    pub fn flat_map_ordered<U, I, F>(
444        self,
445        f: impl IntoQuotedMut<'a, F, L>,
446    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
447    where
448        B: IsBounded,
449        I: IntoIterator<Item = U>,
450        F: Fn(T) -> I + 'a,
451    {
452        self.into_stream().flat_map_ordered(f)
453    }
454
455    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
456    /// for the output type `I` to produce items in any order.
457    ///
458    /// The function `f` is applied to the singleton value to produce an iterator, and all items
459    /// from that iterator are emitted in the output stream in non-deterministic order.
460    ///
461    /// # Example
462    /// ```rust
463    /// # #[cfg(feature = "deploy")] {
464    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
465    /// # use futures::StreamExt;
466    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
467    /// let tick = process.tick();
468    /// let singleton = tick.singleton(q!(
469    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
470    /// ));
471    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
472    /// # }, |mut stream| async move {
473    /// // 1, 2, 3, but in no particular order
474    /// # let mut results = Vec::new();
475    /// # for _ in 0..3 {
476    /// #     results.push(stream.next().await.unwrap());
477    /// # }
478    /// # results.sort();
479    /// # assert_eq!(results, vec![1, 2, 3]);
480    /// # }));
481    /// # }
482    /// ```
483    pub fn flat_map_unordered<U, I, F>(
484        self,
485        f: impl IntoQuotedMut<'a, F, L>,
486    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
487    where
488        B: IsBounded,
489        I: IntoIterator<Item = U>,
490        F: Fn(T) -> I + 'a,
491    {
492        self.into_stream().flat_map_unordered(f)
493    }
494
495    /// Flattens the singleton value into a stream, preserving the order of elements.
496    ///
497    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
498    /// are emitted in the output stream in deterministic order.
499    ///
500    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
501    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
502    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
503    ///
504    /// # Example
505    /// ```rust
506    /// # #[cfg(feature = "deploy")] {
507    /// # use hydro_lang::prelude::*;
508    /// # use futures::StreamExt;
509    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
510    /// let tick = process.tick();
511    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
512    /// singleton.flatten_ordered().all_ticks()
513    /// # }, |mut stream| async move {
514    /// // 1, 2, 3
515    /// # for w in vec![1, 2, 3] {
516    /// #     assert_eq!(stream.next().await.unwrap(), w);
517    /// # }
518    /// # }));
519    /// # }
520    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure: the value itself is
        // the iterable whose items are emitted.
        self.flat_map_ordered(q!(|x| x))
    }
528
529    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
530    /// for the element type `T` to produce items in any order.
531    ///
532    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
533    /// are emitted in the output stream in non-deterministic order.
534    ///
535    /// # Example
536    /// ```rust
537    /// # #[cfg(feature = "deploy")] {
538    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
539    /// # use futures::StreamExt;
540    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
541    /// let tick = process.tick();
542    /// let singleton = tick.singleton(q!(
543    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
544    /// ));
545    /// singleton.flatten_unordered().all_ticks()
546    /// # }, |mut stream| async move {
547    /// // 1, 2, 3, but in no particular order
548    /// # let mut results = Vec::new();
549    /// # for _ in 0..3 {
550    /// #     results.push(stream.next().await.unwrap());
551    /// # }
552    /// # results.sort();
553    /// # assert_eq!(results, vec![1, 2, 3]);
554    /// # }));
555    /// # }
556    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // Identity flat-map, using the variant that does not promise an output order.
        self.flat_map_unordered(q!(|x| x))
    }
564
565    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
566    ///
567    /// If the predicate returns `true`, the output optional contains the same value.
568    /// If the predicate returns `false`, the output optional is empty.
569    ///
570    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
571    /// not modify or take ownership of the value. If you need to modify the value while filtering
572    /// use [`Singleton::filter_map`] instead.
573    ///
574    /// # Example
575    /// ```rust
576    /// # #[cfg(feature = "deploy")] {
577    /// # use hydro_lang::prelude::*;
578    /// # use futures::StreamExt;
579    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
580    /// let tick = process.tick();
581    /// let singleton = tick.singleton(q!(5));
582    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
583    /// # }, |mut stream| async move {
584    /// // 5
585    /// # assert_eq!(stream.next().await.unwrap(), 5);
586    /// # }));
587    /// # }
588    /// ```
589    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
590    where
591        F: Fn(&T) -> bool + 'a,
592    {
593        let f = f.splice_fn1_borrow_ctx(&self.location).into();
594        Optional::new(
595            self.location.clone(),
596            HydroNode::Filter {
597                f,
598                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
599                metadata: self
600                    .location
601                    .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
602            },
603        )
604    }
605
606    /// An operator that both filters and maps. It yields the value only if the supplied
607    /// closure `f` returns `Some(value)`.
608    ///
609    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
610    /// If the closure returns `None`, the output optional is empty.
611    ///
612    /// # Example
613    /// ```rust
614    /// # #[cfg(feature = "deploy")] {
615    /// # use hydro_lang::prelude::*;
616    /// # use futures::StreamExt;
617    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
618    /// let tick = process.tick();
619    /// let singleton = tick.singleton(q!("42"));
620    /// singleton
621    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
622    ///     .all_ticks()
623    /// # }, |mut stream| async move {
624    /// // 42
625    /// # assert_eq!(stream.next().await.unwrap(), 42);
626    /// # }));
627    /// # }
628    /// ```
629    pub fn filter_map<U, F>(
630        self,
631        f: impl IntoQuotedMut<'a, F, L>,
632    ) -> Optional<U, L, B::UnderlyingBound>
633    where
634        F: Fn(T) -> Option<U> + 'a,
635    {
636        let f = f.splice_fn1_ctx(&self.location).into();
637        Optional::new(
638            self.location.clone(),
639            HydroNode::FilterMap {
640                f,
641                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
642                metadata: self
643                    .location
644                    .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
645            },
646        )
647    }
648
649    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
650    ///
651    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
652    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
653    /// non-null. This is useful for combining several pieces of state together.
654    ///
655    /// # Example
656    /// ```rust
657    /// # #[cfg(feature = "deploy")] {
658    /// # use hydro_lang::prelude::*;
659    /// # use futures::StreamExt;
660    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
661    /// let tick = process.tick();
662    /// let numbers = process
663    ///   .source_iter(q!(vec![123, 456]))
664    ///   .batch(&tick, nondet!(/** test */));
665    /// let count = numbers.clone().count(); // Singleton
666    /// let max = numbers.max(); // Optional
667    /// count.zip(max).all_ticks()
668    /// # }, |mut stream| async move {
669    /// // [(2, 456)]
670    /// # for w in vec![(2, 456)] {
671    /// #     assert_eq!(stream.next().await.unwrap(), w);
672    /// # }
673    /// # }));
674    /// # }
675    /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
        B: IsBounded,
    {
        // Both inputs have to be at matching locations before they can be combined.
        check_matching_location(&self.location, &Self::other_location(&other));

        // At a top-level location that can hand out a tick, the zip is carried out
        // inside that tick: both sides are snapshotted into it, zipped there, and the
        // result is brought back out with `latest()`.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            // Captured up front because `self` is consumed by `snapshot` below.
            let self_location = self.location().clone();
            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                // Re-wrap `other` as an `Optional` (via a `Cast` node) so that it can be
                // snapshotted regardless of whether it was a singleton or an optional.
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    other_location.clone(),
                    HydroNode::Cast {
                        inner: Box::new(Self::other_ir_node(other)),
                        // NOTE(review): the kind is spelled with `Tick<L>`/`Bounded` although the
                        // node is created at `other_location`; presumably the kind does not
                        // depend on the location type and `B` is `Bounded` here - confirm.
                        metadata: other_location.new_node_metadata(Optional::<
                            <Self as ZipResult<'a, O>>::OtherType,
                            Tick<L>,
                            Bounded,
                        >::collection_kind(
                        )),
                    },
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            // Hand the zipped node to `ZipResult::make`, which builds the output type.
            Self::make(self_location, out.ir_node.replace(HydroNode::Placeholder))
        } else {
            // Otherwise combine the two nodes directly with a `CrossSingleton` node whose
            // metadata describes an `Optional` over the zipped element type.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    right: Box::new(Self::other_ir_node(other)),
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
724
725    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
726    /// boolean signal is `true`, otherwise the output is null.
727    ///
728    /// # Example
729    /// ```rust
730    /// # #[cfg(feature = "deploy")] {
731    /// # use hydro_lang::prelude::*;
732    /// # use futures::StreamExt;
733    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
734    /// let tick = process.tick();
735    /// // ticks are lazy by default, forces the second tick to run
736    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
737    ///
738    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
739    /// let batch_first_tick = process
740    ///   .source_iter(q!(vec![1]))
741    ///   .batch(&tick, nondet!(/** test */));
742    /// let batch_second_tick = process
743    ///   .source_iter(q!(vec![1, 2, 3]))
744    ///   .batch(&tick, nondet!(/** test */))
745    ///   .defer_tick();
746    /// batch_first_tick.chain(batch_second_tick).count()
747    ///   .filter_if(signal)
748    ///   .all_ticks()
749    /// # }, |mut stream| async move {
750    /// // [1]
751    /// # for w in vec![1] {
752    /// #     assert_eq!(stream.next().await.unwrap(), w);
753    /// # }
754    /// # }));
755    /// # }
756    /// ```
757    pub fn filter_if(
758        self,
759        signal: Singleton<bool, L, B>,
760    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
761    where
762        B: IsBounded,
763    {
764        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
765    }
766
767    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
768    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
769    ///
770    /// Useful for conditionally processing, such as only emitting a singleton's value outside
771    /// a tick if some other condition is satisfied.
772    ///
773    /// # Example
774    /// ```rust
775    /// # #[cfg(feature = "deploy")] {
776    /// # use hydro_lang::prelude::*;
777    /// # use futures::StreamExt;
778    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
779    /// let tick = process.tick();
780    /// // ticks are lazy by default, forces the second tick to run
781    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
782    ///
783    /// let batch_first_tick = process
784    ///   .source_iter(q!(vec![1]))
785    ///   .batch(&tick, nondet!(/** test */));
786    /// let batch_second_tick = process
787    ///   .source_iter(q!(vec![1, 2, 3]))
788    ///   .batch(&tick, nondet!(/** test */))
789    ///   .defer_tick(); // appears on the second tick
790    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
791    /// batch_first_tick.chain(batch_second_tick).count()
792    ///   .filter_if_some(some_on_first_tick)
793    ///   .all_ticks()
794    /// # }, |mut stream| async move {
795    /// // [1]
796    /// # for w in vec![1] {
797    /// #     assert_eq!(stream.next().await.unwrap(), w);
798    /// # }
799    /// # }));
800    /// # }
801    /// ```
802    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
803    pub fn filter_if_some<U>(
804        self,
805        signal: Optional<U, L, B>,
806    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
807    where
808        B: IsBounded,
809    {
810        self.filter_if(signal.is_some())
811    }
812
813    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is null.
815    ///
816    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
817    /// the condition.
818    ///
819    /// # Example
820    /// ```rust
821    /// # #[cfg(feature = "deploy")] {
822    /// # use hydro_lang::prelude::*;
823    /// # use futures::StreamExt;
824    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
825    /// let tick = process.tick();
826    /// // ticks are lazy by default, forces the second tick to run
827    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
828    ///
829    /// let batch_first_tick = process
830    ///   .source_iter(q!(vec![1]))
831    ///   .batch(&tick, nondet!(/** test */));
832    /// let batch_second_tick = process
833    ///   .source_iter(q!(vec![1, 2, 3]))
834    ///   .batch(&tick, nondet!(/** test */))
835    ///   .defer_tick(); // appears on the second tick
836    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
837    /// batch_first_tick.chain(batch_second_tick).count()
838    ///   .filter_if_none(some_on_first_tick)
839    ///   .all_ticks()
840    /// # }, |mut stream| async move {
841    /// // [3]
842    /// # for w in vec![3] {
843    /// #     assert_eq!(stream.next().await.unwrap(), w);
844    /// # }
845    /// # }));
846    /// # }
847    /// ```
848    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
849    pub fn filter_if_none<U>(
850        self,
851        other: Optional<U, L, B>,
852    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
853    where
854        B: IsBounded,
855    {
856        self.filter_if(other.is_none())
857    }
858
859    /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
860    ///
861    /// # Example
862    /// ```rust
863    /// # #[cfg(feature = "deploy")] {
864    /// # use hydro_lang::prelude::*;
865    /// # use futures::StreamExt;
866    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
867    /// let tick = process.tick();
868    /// let a = tick.singleton(q!(5));
869    /// let b = tick.singleton(q!(5));
870    /// a.equals(b).all_ticks()
871    /// # }, |mut stream| async move {
872    /// // [true]
873    /// # assert_eq!(stream.next().await.unwrap(), true);
874    /// # }));
875    /// # }
876    /// ```
877    pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
878    where
879        T: PartialEq,
880        B: IsBounded,
881    {
882        self.zip(other).map(q!(|(a, b)| a == b))
883    }
884
885    /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
886    /// greater than or equal to the provided threshold. The event will have the value of the
887    /// given threshold.
888    ///
889    /// This requires the incoming singleton to be monotonic, because otherwise the detection of
890    /// the threshold would be non-deterministic.
891    ///
892    /// # Example
893    /// ```rust
894    /// # #[cfg(feature = "deploy")] {
895    /// # use hydro_lang::prelude::*;
896    /// # use futures::StreamExt;
897    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
898    /// let a = // singleton 1 ~> 5 ~> 10
899    /// # process.singleton(q!(5));
900    /// let b = process.singleton(q!(4));
901    /// a.threshold_greater_or_equal(b)
902    /// # }, |mut stream| async move {
903    /// // [4]
904    /// # assert_eq!(stream.next().await.unwrap(), 4);
905    /// # }));
906    /// # }
907    /// ```
    pub fn threshold_greater_or_equal<B2: IsBounded>(
        self,
        threshold: Singleton<T, L, B2>,
    ) -> Stream<T, L, B::UnderlyingBound>
    where
        T: Clone + PartialOrd,
        B: IsMonotonic,
    {
        // `B2: IsBounded`, so the threshold can be re-typed as a `Bounded` singleton.
        let threshold = threshold.make_bounded();
        // Captured up front because `self` is consumed by `try_make_bounded` below.
        let self_location = self.location().clone();
        match self.try_make_bounded() {
            // The input's underlying bound is bounded: a single comparison suffices.
            Ok(bounded) => {
                // Emits the threshold unless the value is strictly below it (`m < t`).
                let uncasted = threshold
                    .zip(bounded)
                    .into_stream()
                    .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));

                // Re-wrap the IR node so the result carries the declared output stream type.
                Stream::new(
                    uncasted.location.clone(),
                    uncasted.ir_node.replace(HydroNode::Placeholder),
                )
            }
            // The input is not statically bounded: watch successive snapshots of the value.
            Err(me) => {
                let uncasted = sliced! {
                    let me = use(me, nondet!(/** thresholds are deterministic */));
                    // State seeded with the threshold, held as an `Optional`.
                    let mut remaining_threshold = use::state(|l| {
                        let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
                        as_option
                    });

                    // Split on whether the current value is still strictly below the threshold.
                    let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
                    // Only a threshold that has not been passed is carried forward, so once it
                    // has been emitted the state becomes empty.
                    remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
                    passed.map(q!(|(t, _)| t))
                };

                // Re-wrap the IR node at the original location with the declared output type.
                Stream::new(
                    self_location,
                    uncasted.ir_node.replace(HydroNode::Placeholder),
                )
            }
        }
    }
950
951    /// An operator which allows you to "name" a `HydroNode`.
952    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
953    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
954        {
955            let mut node = self.ir_node.borrow_mut();
956            let metadata = node.metadata_mut();
957            metadata.tag = Some(name.to_owned());
958        }
959        self
960    }
961}
962
/// Logical negation for boolean singletons, which enables the `!` operator on them.
impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
    /// The negated singleton, typed with the underlying boundedness of `B`.
    type Output = Singleton<bool, L, B::UnderlyingBound>;

    /// Maps the contained boolean to its negation.
    fn not(self) -> Self::Output {
        self.map(q!(|b| !b))
    }
}
970
971impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
972where
973    L: Location<'a>,
974{
    /// Converts a `Singleton<Option<T>, L, B>` into an `Optional<T, L, B::UnderlyingBound>` by
    /// unwrapping the inner `Option`.
    ///
    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
    /// `Option<T>` directly. If the singleton's value is `Some(v)`, the resulting
    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
981    ///
982    /// # Example
983    /// ```rust
984    /// # #[cfg(feature = "deploy")] {
985    /// # use hydro_lang::prelude::*;
986    /// # use futures::StreamExt;
987    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
988    /// let tick = process.tick();
989    /// let singleton = tick.singleton(q!(Some(42)));
990    /// singleton.into_optional().all_ticks()
991    /// # }, |mut stream| async move {
992    /// // 42
993    /// # assert_eq!(stream.next().await.unwrap(), 42);
994    /// # }));
995    /// # }
996    /// ```
    pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
        // The value is already an `Option<T>`, so the identity closure is the whole filter-map.
        self.filter_map(q!(|v| v))
    }
1000}
1001
1002impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1003where
1004    L: Location<'a>,
1005{
1006    /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1007    ///
1008    /// # Example
1009    /// ```rust
1010    /// # #[cfg(feature = "deploy")] {
1011    /// # use hydro_lang::prelude::*;
1012    /// # use futures::StreamExt;
1013    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1014    /// let tick = process.tick();
1015    /// // ticks are lazy by default, forces the second tick to run
1016    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1017    ///
1018    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1019    /// let b = tick.singleton(q!(true)); // true, true
1020    /// a.and(b).all_ticks()
1021    /// # }, |mut stream| async move {
1022    /// // [true, false]
1023    /// # for w in vec![true, false] {
1024    /// #     assert_eq!(stream.next().await.unwrap(), w);
1025    /// # }
1026    /// # }));
1027    /// # }
1028    /// ```
1029    pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1030    where
1031        B: IsBounded,
1032    {
1033        self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1034    }
1035
1036    /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1037    ///
1038    /// # Example
1039    /// ```rust
1040    /// # #[cfg(feature = "deploy")] {
1041    /// # use hydro_lang::prelude::*;
1042    /// # use futures::StreamExt;
1043    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1044    /// let tick = process.tick();
1045    /// // ticks are lazy by default, forces the second tick to run
1046    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1047    ///
1048    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1049    /// let b = tick.singleton(q!(false)); // false, false
1050    /// a.or(b).all_ticks()
1051    /// # }, |mut stream| async move {
1052    /// // [true, false]
1053    /// # for w in vec![true, false] {
1054    /// #     assert_eq!(stream.next().await.unwrap(), w);
1055    /// # }
1056    /// # }));
1057    /// # }
1058    /// ```
1059    pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1060    where
1061        B: IsBounded,
1062    {
1063        self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1064    }
1065}
1066
1067impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1068where
1069    L: Location<'a>,
1070{
1071    /// Returns a singleton value corresponding to the latest snapshot of the singleton
1072    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1073    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1074    /// all snapshots of this singleton into the atomic-associated tick will observe the
1075    /// same value each tick.
1076    ///
1077    /// # Non-Determinism
1078    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1079    /// the output singleton has a non-deterministic value since the snapshot can be at an
1080    /// arbitrary point in time.
1081    pub fn snapshot_atomic<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1082        self,
1083        tick: &Tick<L2>,
1084        _nondet: NonDet,
1085    ) -> Singleton<T, Tick<L::NoConsistency>, Bounded> {
1086        Singleton::new(
1087            tick.drop_consistency(),
1088            HydroNode::Batch {
1089                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1090                metadata: tick
1091                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1092            },
1093        )
1094    }
1095
1096    /// Returns this singleton back into a top-level, asynchronous execution context where updates
1097    /// to the value will be asynchronously propagated.
1098    pub fn end_atomic(self) -> Singleton<T, L, B> {
1099        Singleton::new(
1100            self.location.tick.l.clone(),
1101            HydroNode::EndAtomic {
1102                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1103                metadata: self
1104                    .location
1105                    .tick
1106                    .l
1107                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1108            },
1109        )
1110    }
1111}
1112
1113impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1114where
1115    L: Location<'a>,
1116{
1117    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1118    /// will observe the same version of the value and will be executed synchronously before any
    /// outputs are yielded (in [`Singleton::end_atomic`]).
1120    ///
1121    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1122    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1123    /// a different version).
1124    pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1125        let id = self.location.flow_state().borrow_mut().next_clock_id();
1126        let out_location = Atomic {
1127            tick: Tick {
1128                id,
1129                l: self.location.clone(),
1130            },
1131        };
1132        Singleton::new(
1133            out_location.clone(),
1134            HydroNode::BeginAtomic {
1135                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1136                metadata: out_location
1137                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1138            },
1139        )
1140    }
1141
1142    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1143    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1144    /// relevant data that contributed to the snapshot at tick `t`.
1145    ///
1146    /// # Non-Determinism
1147    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1148    /// the output singleton has a non-deterministic value since the snapshot can be at an
1149    /// arbitrary point in time.
1150    pub fn snapshot<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1151        self,
1152        tick: &Tick<L2>,
1153        _nondet: NonDet,
1154    ) -> Singleton<T, Tick<L::NoConsistency>, Bounded> {
1155        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1156        Singleton::new(
1157            tick.drop_consistency(),
1158            HydroNode::Batch {
1159                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1160                metadata: tick
1161                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1162            },
1163        )
1164    }
1165
1166    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1167    /// with order corresponding to increasing prefixes of data contributing to the singleton.
1168    ///
1169    /// # Non-Determinism
1170    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1171    /// to non-deterministic batching and arrival of inputs, the output stream is
1172    /// non-deterministic.
    pub fn sample_eager(
        self,
        nondet: NonDet,
    ) -> Stream<T, L::NoConsistency, Unbounded, TotalOrder, AtLeastOnce> {
        sliced! {
            // Each slice observes one snapshot of the singleton and emits it as an element.
            let snapshot = use(self, nondet);
            snapshot.into_stream()
        }
        // The declared return type is `AtLeastOnce`, hence the weakened retry guarantee.
        .weaken_retries()
    }
1183
1184    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1185    /// value taken at various points in time. Because the input singleton may be
1186    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1187    /// represent the value of the singleton given some prefix of the streams leading up to
1188    /// it.
1189    ///
1190    /// # Non-Determinism
1191    /// The output stream is non-deterministic in which elements are sampled, since this
1192    /// is controlled by a clock.
    pub fn sample_every(
        self,
        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
        nondet: NonDet,
    ) -> Stream<T, L::NoConsistency, Unbounded, TotalOrder, AtLeastOnce>
    where
        L: NoAtomic,
    {
        // Interval-driven source on the same location; it acts as the sampling clock.
        let samples = self.location.source_interval(interval, nondet);
        sliced! {
            let snapshot = use(self, nondet);
            let sample_batch = use(samples, nondet);

            // Emit the snapshot only in slices where at least one interval event arrived.
            snapshot.filter_if(sample_batch.first().is_some()).into_stream()
        }
        // The declared return type is `AtLeastOnce`, hence the weakened retry guarantee.
        .weaken_retries()
    }
1210
1211    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1212    /// implies that `B == Bounded`.
1213    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1214    where
1215        B: IsBounded,
1216    {
1217        Singleton::new(
1218            self.location.clone(),
1219            self.ir_node.replace(HydroNode::Placeholder),
1220        )
1221    }
1222
1223    #[expect(clippy::result_large_err, reason = "internal use only")]
1224    fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1225        if B::UnderlyingBound::BOUNDED {
1226            Ok(Singleton::new(
1227                self.location.clone(),
1228                self.ir_node.replace(HydroNode::Placeholder),
1229            ))
1230        } else {
1231            Err(self)
1232        }
1233    }
1234
1235    /// Clones this bounded singleton into a tick, returning a singleton that has the
1236    /// same value as the outer singleton. Because the outer singleton is bounded, this
1237    /// is deterministic because there is only a single immutable version.
1238    pub fn clone_into_tick<L2: Location<'a, NoConsistency = L::NoConsistency>>(
1239        self,
1240        tick: &Tick<L2>,
1241    ) -> Singleton<T, Tick<L2>, Bounded>
1242    where
1243        B: IsBounded,
1244        T: Clone,
1245    {
1246        // TODO(shadaj): avoid printing simulator logs for this snapshot
1247        let inner = self.snapshot(
1248            tick,
1249            nondet!(/** bounded top-level singleton so deterministic */),
1250        );
1251        Singleton::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1252    }
1253
1254    /// Converts this singleton into a [`Stream`] containing a single element, the value.
1255    ///
1256    /// # Example
1257    /// ```rust
1258    /// # #[cfg(feature = "deploy")] {
1259    /// # use hydro_lang::prelude::*;
1260    /// # use futures::StreamExt;
1261    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1262    /// let tick = process.tick();
1263    /// let batch_input = process
1264    ///   .source_iter(q!(vec![123, 456]))
1265    ///   .batch(&tick, nondet!(/** test */));
1266    /// batch_input.clone().chain(
1267    ///   batch_input.count().into_stream()
1268    /// ).all_ticks()
1269    /// # }, |mut stream| async move {
1270    /// // [123, 456, 2]
1271    /// # for w in vec![123, 456, 2] {
1272    /// #     assert_eq!(stream.next().await.unwrap(), w);
1273    /// # }
1274    /// # }));
1275    /// # }
1276    /// ```
1277    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1278    where
1279        B: IsBounded,
1280    {
1281        Stream::new(
1282            self.location.clone(),
1283            HydroNode::Cast {
1284                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1285                metadata: self.location.new_node_metadata(Stream::<
1286                    T,
1287                    Tick<L>,
1288                    Bounded,
1289                    TotalOrder,
1290                    ExactlyOnce,
1291                >::collection_kind()),
1292            },
1293        )
1294    }
1295
1296    /// Resolves the singleton's [`Future`] value by blocking until it completes,
1297    /// producing a singleton of the resolved output.
1298    ///
1299    /// This is useful when the singleton contains an async computation that must
1300    /// be awaited before further processing. The future is polled to completion
1301    /// before the output value is emitted.
1302    ///
1303    /// # Example
1304    /// ```rust
1305    /// # #[cfg(feature = "deploy")] {
1306    /// # use hydro_lang::prelude::*;
1307    /// # use futures::StreamExt;
1308    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1309    /// let tick = process.tick();
1310    /// let singleton = tick.singleton(q!(5));
1311    /// singleton
1312    ///     .map(q!(|v| async move { v * 2 }))
1313    ///     .resolve_future_blocking()
1314    ///     .all_ticks()
1315    /// # }, |mut stream| async move {
1316    /// // 10
1317    /// # assert_eq!(stream.next().await.unwrap(), 10);
1318    /// # }));
1319    /// # }
1320    /// ```
1321    pub fn resolve_future_blocking(
1322        self,
1323    ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1324    where
1325        T: Future,
1326        B: IsBounded,
1327    {
1328        Singleton::new(
1329            self.location.clone(),
1330            HydroNode::ResolveFuturesBlocking {
1331                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1332                metadata: self
1333                    .location
1334                    .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1335            },
1336        )
1337    }
1338}
1339
1340impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1341where
1342    L: Location<'a>,
1343{
1344    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1345    /// which will stream the value computed in _each_ tick as a separate stream element.
1346    ///
1347    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1348    /// producing one element in the output for each tick. This is useful for batched computations,
1349    /// where the results from each tick must be combined together.
1350    ///
1351    /// # Example
1352    /// ```rust
1353    /// # #[cfg(feature = "deploy")] {
1354    /// # use hydro_lang::prelude::*;
1355    /// # use futures::StreamExt;
1356    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1357    /// let tick = process.tick();
1358    /// # // ticks are lazy by default, forces the second tick to run
1359    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1360    /// # let batch_first_tick = process
1361    /// #   .source_iter(q!(vec![1]))
1362    /// #   .batch(&tick, nondet!(/** test */));
1363    /// # let batch_second_tick = process
1364    /// #   .source_iter(q!(vec![1, 2, 3]))
1365    /// #   .batch(&tick, nondet!(/** test */))
1366    /// #   .defer_tick(); // appears on the second tick
1367    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1368    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1369    ///     .count()
1370    ///     .all_ticks()
1371    /// # }, |mut stream| async move {
1372    /// // [1, 3]
1373    /// # for w in vec![1, 3] {
1374    /// #     assert_eq!(stream.next().await.unwrap(), w);
1375    /// # }
1376    /// # }));
1377    /// # }
1378    /// ```
1379    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1380        self.into_stream().all_ticks()
1381    }
1382
1383    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1384    /// which will stream the value computed in _each_ tick as a separate stream element.
1385    ///
1386    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1387    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1388    /// singleton's [`Tick`] context.
1389    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1390        self.into_stream().all_ticks_atomic()
1391    }
1392
1393    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1394    /// be asynchronously updated with the latest value of the singleton inside the tick.
1395    ///
1396    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1397    /// tick that tracks the inner value. This is useful for getting the value as of the
1398    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1399    ///
1400    /// # Example
1401    /// ```rust
1402    /// # #[cfg(feature = "deploy")] {
1403    /// # use hydro_lang::prelude::*;
1404    /// # use futures::StreamExt;
1405    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1406    /// let tick = process.tick();
1407    /// # // ticks are lazy by default, forces the second tick to run
1408    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1409    /// # let batch_first_tick = process
1410    /// #   .source_iter(q!(vec![1]))
1411    /// #   .batch(&tick, nondet!(/** test */));
1412    /// # let batch_second_tick = process
1413    /// #   .source_iter(q!(vec![1, 2, 3]))
1414    /// #   .batch(&tick, nondet!(/** test */))
1415    /// #   .defer_tick(); // appears on the second tick
1416    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1417    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1418    ///     .count()
1419    ///     .latest()
1420    /// # .sample_eager(nondet!(/** test */))
1421    /// # }, |mut stream| async move {
1422    /// // asynchronously changes from 1 ~> 3
1423    /// # for w in vec![1, 3] {
1424    /// #     assert_eq!(stream.next().await.unwrap(), w);
1425    /// # }
1426    /// # }));
1427    /// # }
1428    /// ```
1429    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1430        Singleton::new(
1431            self.location.outer().clone(),
1432            HydroNode::YieldConcat {
1433                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1434                metadata: self
1435                    .location
1436                    .outer()
1437                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1438            },
1439        )
1440    }
1441
1442    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1443    /// be updated with the latest value of the singleton inside the tick.
1444    ///
1445    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1446    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1447    /// singleton's [`Tick`] context.
1448    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1449        let out_location = Atomic {
1450            tick: self.location.clone(),
1451        };
1452        Singleton::new(
1453            out_location.clone(),
1454            HydroNode::YieldConcat {
1455                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1456                metadata: out_location
1457                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1458            },
1459        )
1460    }
1461}
1462
#[doc(hidden)]
/// Helper trait that determines the output collection type for [`Singleton::zip`].
///
/// It is implemented for [`Singleton`] once per accepted kind of second input (`Other`).
/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it
/// is a [`Singleton`].
#[sealed::sealed]
pub trait ZipResult<'a, Other> {
    /// The output collection type.
    type Out;
    /// The type of the tupled output value.
    type ElementType;
    /// The type of the other collection's value.
    type OtherType;
    /// The location where the tupled result will be materialized.
    type Location: Location<'a>;

    /// The location of the second input to the `zip`.
    fn other_location(other: &Other) -> Self::Location;
    /// The IR node of the second input to the `zip`; consumes `other`.
    fn other_ir_node(other: Other) -> HydroNode;

    /// Constructs the output live collection given an IR node containing the zip result.
    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
}
1487
1488#[sealed::sealed]
1489impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1490where
1491    L: Location<'a>,
1492{
1493    type Out = Singleton<(T, U), L, B>;
1494    type ElementType = (T, U);
1495    type OtherType = U;
1496    type Location = L;
1497
1498    fn other_location(other: &Singleton<U, L, B>) -> L {
1499        other.location.clone()
1500    }
1501
1502    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1503        other.ir_node.replace(HydroNode::Placeholder)
1504    }
1505
1506    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1507        Singleton::new(
1508            location.clone(),
1509            HydroNode::Cast {
1510                inner: Box::new(ir_node),
1511                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1512            },
1513        )
1514    }
1515}
1516
/// Zipping a [`Singleton`] with an [`Optional`] (typed with the singleton's underlying bound)
/// yields an [`Optional`] of the tupled values.
#[sealed::sealed]
impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
    for Singleton<T, L, B>
where
    L: Location<'a>,
{
    type Out = Optional<(T, U), L, B::UnderlyingBound>;
    type ElementType = (T, U);
    type OtherType = U;
    type Location = L;

    fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
        other.location.clone()
    }

    fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
        // Take the node out of `other`, leaving a placeholder behind.
        other.ir_node.replace(HydroNode::Placeholder)
    }

    fn make(location: L, ir_node: HydroNode) -> Self::Out {
        // Unlike the singleton case, the zipped node is used directly with no `Cast` wrapper.
        Optional::new(location, ir_node)
    }
}
1540
1541#[cfg(test)]
1542mod tests {
1543    #[cfg(feature = "deploy")]
1544    use futures::{SinkExt, StreamExt};
1545    #[cfg(feature = "deploy")]
1546    use hydro_deploy::Deployment;
1547    #[cfg(any(feature = "deploy", feature = "sim"))]
1548    use stageleft::q;
1549
1550    #[cfg(any(feature = "deploy", feature = "sim"))]
1551    use crate::compile::builder::FlowBuilder;
1552    #[cfg(feature = "deploy")]
1553    use crate::live_collections::stream::ExactlyOnce;
1554    #[cfg(any(feature = "deploy", feature = "sim"))]
1555    use crate::location::Location;
1556    #[cfg(any(feature = "deploy", feature = "sim"))]
1557    use crate::nondet::nondet;
1558
    // Checks that a singleton cycled across ticks always holds exactly one element: its
    // element count is reported as 1 on each of two externally triggered ticks.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        // External input channel; each message sent on it is used below to gate an output.
        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        // A tick-level cycle seeded with the singleton `0`.
        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
        // Count the elements of the cycled singleton, emitting the count only on ticks where
        // the input batch is non-empty.
        let counts = singleton
            .clone()
            .into_stream()
            .count()
            .filter_if(
                input
                    .batch(&node_tick, nondet!(/** testing */))
                    .first()
                    .is_some(),
            )
            .all_ticks()
            .send_bincode_external(&external);
        // Close the cycle: the singleton is fed back into itself for the next tick.
        complete_cycle.complete_next_tick(singleton);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_trigger = nodes.connect(input_send).await;
        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        // First trigger: the count of the cycled singleton must be 1.
        tick_trigger.send(()).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);

        // Second trigger: the count must still be 1 after the value went through the cycle.
        tick_trigger.send(()).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
1606
1607    #[cfg(feature = "sim")]
1608    #[test]
1609    #[should_panic]
1610    fn sim_fold_intermediate_states() {
1611        let mut flow = FlowBuilder::new();
1612        let node = flow.process::<()>();
1613
1614        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1615        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1616
1617        let tick = node.tick();
1618        let batch = folded.snapshot(&tick, nondet!(/** test */));
1619        let out_recv = batch.all_ticks().sim_output();
1620
1621        flow.sim().exhaustive(async || {
1622            assert_eq!(out_recv.next().await.unwrap(), 10);
1623        });
1624    }
1625
1626    #[cfg(feature = "sim")]
1627    #[test]
1628    fn sim_fold_intermediate_state_count() {
1629        let mut flow = FlowBuilder::new();
1630        let node = flow.process::<()>();
1631
1632        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1633        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1634
1635        let tick = node.tick();
1636        let batch = folded.snapshot(&tick, nondet!(/** test */));
1637        let out_recv = batch.all_ticks().sim_output();
1638
1639        let instance_count = flow.sim().exhaustive(async || {
1640            let out = out_recv.collect::<Vec<_>>().await;
1641            assert_eq!(out.last(), Some(&10));
1642        });
1643
1644        assert_eq!(
1645            instance_count,
1646            16 // 2^4 possible subsets of intermediates (including initial state)
1647        )
1648    }
1649
1650    #[cfg(feature = "sim")]
1651    #[test]
1652    fn sim_fold_no_repeat_initial() {
1653        // check that we don't repeat the initial state of the fold in autonomous decisions
1654
1655        let mut flow = FlowBuilder::new();
1656        let node = flow.process::<()>();
1657
1658        let (in_port, input) = node.sim_input();
1659        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1660
1661        let tick = node.tick();
1662        let batch = folded.snapshot(&tick, nondet!(/** test */));
1663        let out_recv = batch.all_ticks().sim_output();
1664
1665        flow.sim().exhaustive(async || {
1666            assert_eq!(out_recv.next().await.unwrap(), 0);
1667
1668            in_port.send(123);
1669
1670            assert_eq!(out_recv.next().await.unwrap(), 123);
1671        });
1672    }
1673
1674    #[cfg(feature = "sim")]
1675    #[test]
1676    #[should_panic]
1677    fn sim_fold_repeats_snapshots() {
1678        // when the tick is driven by a snapshot AND something else, the snapshot can
1679        // "stutter" and repeat the same state multiple times
1680
1681        let mut flow = FlowBuilder::new();
1682        let node = flow.process::<()>();
1683
1684        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1685        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1686
1687        let tick = node.tick();
1688        let batch = source
1689            .batch(&tick, nondet!(/** test */))
1690            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1691        let out_recv = batch.all_ticks().sim_output();
1692
1693        flow.sim().exhaustive(async || {
1694            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1695            {
1696                panic!("repeated snapshot");
1697            }
1698        });
1699    }
1700
1701    #[cfg(feature = "sim")]
1702    #[test]
1703    fn sim_fold_repeats_snapshots_count() {
1704        // check the number of instances
1705        let mut flow = FlowBuilder::new();
1706        let node = flow.process::<()>();
1707
1708        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1709        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1710
1711        let tick = node.tick();
1712        let batch = source
1713            .batch(&tick, nondet!(/** test */))
1714            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1715        let out_recv = batch.all_ticks().sim_output();
1716
1717        let count = flow.sim().exhaustive(async || {
1718            let _ = out_recv.collect::<Vec<_>>().await;
1719        });
1720
1721        assert_eq!(count, 52);
1722        // don't have a combinatorial explanation for this number yet, but checked via logs
1723    }
1724
1725    #[cfg(feature = "sim")]
1726    #[test]
1727    fn sim_top_level_singleton_exhaustive() {
1728        // ensures that top-level singletons have only one snapshot
1729        let mut flow = FlowBuilder::new();
1730        let node = flow.process::<()>();
1731
1732        let singleton = node.singleton(q!(1));
1733        let tick = node.tick();
1734        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1735        let out_recv = batch.all_ticks().sim_output();
1736
1737        let count = flow.sim().exhaustive(async || {
1738            let _ = out_recv.collect::<Vec<_>>().await;
1739        });
1740
1741        assert_eq!(count, 1);
1742    }
1743
1744    #[cfg(feature = "sim")]
1745    #[test]
1746    fn sim_top_level_singleton_join_count() {
1747        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1748        // exploration
1749
1750        let mut flow = FlowBuilder::new();
1751        let node = flow.process::<()>();
1752
1753        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1754        let tick = node.tick();
1755        let batch = source_iter
1756            .batch(&tick, nondet!(/** test */))
1757            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1758        let out_recv = batch.all_ticks().sim_output();
1759
1760        let instance_count = flow.sim().exhaustive(async || {
1761            let _ = out_recv.collect::<Vec<_>>().await;
1762        });
1763
1764        assert_eq!(
1765            instance_count,
1766            16 // 2^4 ways to split up (including a possibly empty first batch)
1767        )
1768    }
1769
1770    #[cfg(feature = "sim")]
1771    #[test]
1772    fn top_level_singleton_into_stream_no_replay() {
1773        let mut flow = FlowBuilder::new();
1774        let node = flow.process::<()>();
1775
1776        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1777        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1778
1779        let out_recv = folded.into_stream().sim_output();
1780
1781        flow.sim().exhaustive(async || {
1782            out_recv.assert_yields_only([10]).await;
1783        });
1784    }
1785
1786    #[cfg(feature = "sim")]
1787    #[test]
1788    fn inside_tick_singleton_zip() {
1789        use crate::live_collections::Stream;
1790        use crate::live_collections::sliced::sliced;
1791
1792        let mut flow = FlowBuilder::new();
1793        let node = flow.process::<()>();
1794
1795        let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1796        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1797
1798        let out_recv = sliced! {
1799            let v = use(folded, nondet!(/** test */));
1800            v.clone().zip(v).into_stream()
1801        }
1802        .sim_output();
1803
1804        let count = flow.sim().exhaustive(async || {
1805            let out = out_recv.collect::<Vec<_>>().await;
1806            assert_eq!(out.last(), Some(&(3, 3)));
1807        });
1808
1809        assert_eq!(count, 4);
1810    }
1811}