Skip to main content

hydro_lang/location/
tick.rs

//! Clock domains for batching streaming data into discrete time steps.
//!
//! In Hydro, a [`Tick`] represents a logical clock that can be used to batch
//! unbounded streaming data into discrete, bounded time steps. This is essential
//! for implementing iterative algorithms, synchronizing data across multiple
//! streams, and performing aggregations over windows of data.
//!
//! A tick is created from a top-level location (such as [`Process`] or [`Cluster`])
//! using [`Location::tick`]. Once inside a tick, bounded live collections can be
//! manipulated with operations like fold, reduce, and cross-product, and the
//! results can be emitted back to the unbounded stream using methods like
//! `all_ticks()`.
//!
//! The [`Atomic`] wrapper provides atomicity guarantees within a tick, ensuring
//! that reads and writes within a tick are serialized.
//!
//! The [`NoAtomic`] marker trait constrains APIs that cannot be called inside an atomic context.
18
19use sealed::sealed;
20use stageleft::{QuotedWithContext, q};
21
22#[cfg(stageleft_runtime)]
23use super::dynamic::DynLocation;
24use super::{Cluster, Location, LocationId, Process};
25use crate::compile::builder::{ClockId, FlowState};
26use crate::compile::ir::{HydroNode, HydroSource};
27#[cfg(stageleft_runtime)]
28use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
29use crate::forward_handle::{TickCycle, TickCycleHandle};
30use crate::live_collections::boundedness::Bounded;
31use crate::live_collections::optional::Optional;
32use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
33use crate::location::cluster::Consistency;
34use crate::nondet::nondet;
35
36/// Marker trait for locations that are **not** inside an [`Atomic`] context.
37///
38/// This trait is implemented by top-level locations ([`Process`], [`Cluster`]) and
39/// by [`Tick`]. It is used to constrain APIs that should not be called from within
40/// an atomic block.
41#[sealed]
42pub trait NoAtomic {}
43#[sealed]
44impl<T> NoAtomic for Process<'_, T> {}
45#[sealed]
46impl<T, Con: Consistency> NoAtomic for Cluster<'_, T, Con> {}
47#[sealed]
48impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
49
50/// A location wrapper that provides atomicity guarantees within a [`Tick`].
51///
52/// An `Atomic` context establishes a happens-before relationship between operations:
53/// - Downstream computations from `atomic()` are associated with an internal tick
54/// - Outputs from `end_atomic()` are held until all computations in the tick complete
55/// - Snapshots via `use::atomic` are guaranteed to reflect all updates from associated `end_atomic()`
56///
57/// This ensures read-after-write consistency: if a client receives an acknowledgement
58/// from `end_atomic()`, any subsequent `use::atomic` snapshot will include the effects
59/// of that acknowledged operation.
60#[derive(Clone)]
61pub struct Atomic<Loc> {
62    pub(crate) tick: Tick<Loc>,
63}
64
65impl<L: DynLocation> DynLocation for Atomic<L> {
66    fn dyn_id(&self) -> LocationId {
67        LocationId::Atomic(Box::new(self.tick.dyn_id()))
68    }
69
70    fn flow_state(&self) -> &FlowState {
71        self.tick.flow_state()
72    }
73
74    fn is_top_level() -> bool {
75        L::is_top_level()
76    }
77
78    fn multiversioned(&self) -> bool {
79        self.tick.multiversioned()
80    }
81}
82
83impl<'a, L> Location<'a> for Atomic<L>
84where
85    L: Location<'a>,
86{
87    type Root = L::Root;
88
89    type NoConsistency = Atomic<L::NoConsistency>;
90
91    fn consistency() -> Option<super::dynamic::ClusterConsistency> {
92        L::consistency()
93    }
94
95    fn root(&self) -> Self::Root {
96        self.tick.root()
97    }
98
99    fn drop_consistency(&self) -> Self::NoConsistency {
100        Atomic {
101            tick: self.tick.drop_consistency(),
102        }
103    }
104
105    fn make_from_nondet(l2: Self::NoConsistency) -> Self {
106        Atomic {
107            tick: Tick::make_from_nondet(l2.tick),
108        }
109    }
110}
111
/// Live collections whose contents can be pushed back by one tick.
///
/// Calling `defer_tick` on an implementor delays its values by a single clock
/// cycle. The main (internal) use is building tick-based cycles
/// ([`Tick::cycle`]): a feedback loop has to advance by one tick, otherwise it
/// would recurse forever inside a single tick.
pub trait DeferTick {
    /// Consumes the collection and returns one whose values arrive one tick later.
    fn defer_tick(self) -> Self;
}
122
123/// Marks the stream as being inside the single global clock domain.
124#[derive(Clone)]
125pub struct Tick<L> {
126    pub(crate) id: ClockId,
127    /// Location.
128    pub(crate) l: L,
129}
130
131impl<L: DynLocation> DynLocation for Tick<L> {
132    fn dyn_id(&self) -> LocationId {
133        LocationId::Tick(self.id, Box::new(self.l.dyn_id()))
134    }
135
136    fn flow_state(&self) -> &FlowState {
137        self.l.flow_state()
138    }
139
140    fn is_top_level() -> bool {
141        false
142    }
143
144    fn multiversioned(&self) -> bool {
145        self.l.multiversioned()
146    }
147}
148
149impl<'a, L> Location<'a> for Tick<L>
150where
151    L: Location<'a>,
152{
153    type Root = L::Root;
154
155    type NoConsistency = Tick<L::NoConsistency>;
156
157    fn consistency() -> Option<super::dynamic::ClusterConsistency> {
158        L::consistency()
159    }
160
161    fn root(&self) -> Self::Root {
162        self.l.root()
163    }
164
165    fn drop_consistency(&self) -> Self::NoConsistency {
166        Tick {
167            id: self.id,
168            l: self.l.drop_consistency(),
169        }
170    }
171
172    fn make_from_nondet(l2: Self::NoConsistency) -> Self {
173        Tick {
174            id: l2.id,
175            l: L::make_from_nondet(l2.l),
176        }
177    }
178}
179
180impl<'a, L> Tick<L>
181where
182    L: Location<'a>,
183{
184    /// Returns a reference to the outer (parent) location that this tick is nested within.
185    ///
186    /// For example, if a `Tick` was created from a `Process`, this returns a reference
187    /// to that `Process`.
188    pub fn outer(&self) -> &L {
189        &self.l
190    }
191
192    /// Creates a bounded stream of `()` values inside this tick, with a fixed batch size.
193    ///
194    /// This is useful for driving computations inside a tick that need to process
195    /// a specific number of elements per tick. Each tick will produce exactly
196    /// `batch_size` unit values.
197    pub fn spin_batch(
198        &self,
199        batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
200    ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce> {
201        let out = self
202            .l
203            .spin()
204            .flat_map_ordered(q!(move |_| 0..batch_size))
205            .map(q!(|_| ()));
206
207        let inner = out.batch(self, nondet!(/** at runtime, `spin` produces a single value per tick, so each batch is guaranteed to be the same size. */));
208        Stream::new(self.clone(), inner.ir_node.replace(HydroNode::Placeholder))
209    }
210
211    /// Creates an [`Optional`] which has a null value on every tick.
212    ///
213    /// # Example
214    /// ```rust
215    /// # #[cfg(feature = "deploy")] {
216    /// # use hydro_lang::prelude::*;
217    /// # use futures::StreamExt;
218    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
219    /// let tick = process.tick();
220    /// let optional = tick.none::<i32>();
221    /// optional.unwrap_or(tick.singleton(q!(123)))
222    /// # .all_ticks()
223    /// # }, |mut stream| async move {
224    /// // 123
225    /// # assert_eq!(stream.next().await.unwrap(), 123);
226    /// # }));
227    /// # }
228    /// ```
229    pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
230        let e = q!([]);
231        let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
232
233        let unit_optional: Optional<(), Self, Bounded> = Optional::new(
234            self.clone(),
235            HydroNode::Source {
236                source: HydroSource::Iter(e.into()),
237                metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
238            },
239        );
240
241        unit_optional.map(q!(|_| unreachable!())) // always empty
242    }
243
244    /// Creates an [`Optional`] which will have the provided static value on the first tick, and be
245    /// null on all subsequent ticks.
246    ///
247    /// This is useful for bootstrapping stateful computations which need an initial value.
248    ///
249    /// # Example
250    /// ```rust
251    /// # #[cfg(feature = "deploy")] {
252    /// # use hydro_lang::prelude::*;
253    /// # use futures::StreamExt;
254    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
255    /// let tick = process.tick();
256    /// // ticks are lazy by default, forces the second tick to run
257    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
258    /// let optional = tick.optional_first_tick(q!(5));
259    /// optional.unwrap_or(tick.singleton(q!(123))).all_ticks()
260    /// # }, |mut stream| async move {
261    /// // 5, 123, 123, 123, ...
262    /// # assert_eq!(stream.next().await.unwrap(), 5);
263    /// # assert_eq!(stream.next().await.unwrap(), 123);
264    /// # assert_eq!(stream.next().await.unwrap(), 123);
265    /// # assert_eq!(stream.next().await.unwrap(), 123);
266    /// # }));
267    /// # }
268    /// ```
269    pub fn optional_first_tick<T: Clone>(
270        &self,
271        e: impl QuotedWithContext<'a, T, Tick<L>>,
272    ) -> Optional<T, Self, Bounded> {
273        let e = e.splice_untyped_ctx(self);
274
275        Optional::new(
276            self.clone(),
277            HydroNode::SingletonSource {
278                value: e.into(),
279                first_tick_only: true,
280                metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
281            },
282        )
283    }
284
285    /// Creates a feedback cycle within this tick for implementing iterative computations.
286    ///
287    /// Returns a handle that must be completed with the actual collection, and a placeholder
288    /// collection that represents the output of the previous tick (deferred by one tick).
289    /// This is useful for implementing fixed-point computations where the output of one
290    /// tick feeds into the input of the next.
291    ///
292    /// The cycle automatically defers values by one tick to prevent infinite recursion.
293    #[expect(
294        private_bounds,
295        reason = "only Hydro collections can implement ReceiverComplete"
296    )]
297    pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
298    where
299        S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
300    {
301        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
302        (
303            TickCycleHandle::new(cycle_id, Location::id(self)),
304            S::create_source(cycle_id, self.clone()).defer_tick(),
305        )
306    }
307
308    /// Creates a feedback cycle with an initial value for the first tick.
309    ///
310    /// Similar to [`Tick::cycle`], but allows providing an initial collection
311    /// that will be used as the value on the first tick before any feedback
312    /// is available. This is useful for bootstrapping iterative computations
313    /// that need a starting state.
314    #[expect(
315        private_bounds,
316        reason = "only Hydro collections can implement ReceiverComplete"
317    )]
318    pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
319    where
320        S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
321    {
322        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
323        (
324            TickCycleHandle::new(cycle_id, Location::id(self)),
325            // no need to defer_tick, create_source_with_initial does it for us
326            S::create_source_with_initial(cycle_id, initial, self.clone()),
327        )
328    }
329}
330
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use crate::live_collections::sliced::sliced;
    #[cfg(feature = "sim")]
    use crate::location::Location;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// With `atomic()` / `end_atomic()`, a read issued after a write has been
    /// acknowledged must observe that write.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_stream() {
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let (write_send, writes) = process.sim_input();
        let (read_send, reads) = process.sim_input::<(), _, _>();

        // Running sum of all writes, maintained inside the atomic context.
        let atomic_writes = writes.atomic();
        let running_sum = atomic_writes.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        // Acknowledgements are released by `end_atomic()`.
        let write_ack_recv = atomic_writes.end_atomic().sim_output();
        // Every read is answered with an atomic snapshot of the running sum.
        let read_response_recv = sliced! {
            let read_batch = use(reads, nondet!(/** test */));
            let sum_snapshot = use::atomic(running_sum, nondet!(/** test */));
            read_batch.cross_singleton(sum_snapshot)
        }
        .sim_output();

        let sim_compiled = flow.sim().compiled();

        // The read is only sent once the write has been acknowledged.
        let instances = sim_compiled.exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());
            assert!(matches!(
                read_response_recv.next().await,
                Some((_, v)) if v >= 1
            ));
        });

        assert_eq!(instances, 1);

        // The read is sent without waiting for the acknowledgement.
        let instances_read_before_write = sim_compiled.exhaustive(async || {
            write_send.send(1);
            read_send.send(());
            write_ack_recv.assert_yields([1]).await;
            let _ = read_response_recv.next().await;
        });

        assert_eq!(instances_read_before_write, 3); // read before write, write before read, both in same tick
    }

    /// Same pipeline as `sim_atomic_stream` but without the atomic context; the
    /// test is expected to panic.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_non_atomic_stream() {
        // shows that atomic is necessary
        let mut flow = FlowBuilder::new();
        let process = flow.process::<()>();

        let (write_send, writes) = process.sim_input();
        let (read_send, reads) = process.sim_input::<(), _, _>();

        let running_sum = writes.clone().fold(
            q!(|| 0),
            q!(|state: &mut i32, v: i32| {
                *state += v;
            }),
        );

        // Acknowledgements come straight from the input stream, outside any atomic context.
        let write_ack_recv = writes.sim_output();

        let read_response_recv = sliced! {
            let read_batch = use(reads, nondet!(/** test */));
            let sum_snapshot = use(running_sum, nondet!(/** test */));
            read_batch.cross_singleton(sum_snapshot)
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            write_send.send(1);
            write_ack_recv.assert_yields([1]).await;
            read_send.send(());

            if let Some((_, v)) = read_response_recv.next().await {
                assert_eq!(v, 1);
            }
        });
    }
}