Skip to main content

hydro_lang/
forward_handle.rs

1//! Mechanisms for introducing forward references and cycles in Hydro.
2
3use sealed::sealed;
4
5use crate::compile::builder::CycleId;
6use crate::location::Location;
7use crate::location::dynamic::LocationId;
8use crate::staging_util::Invariant;
9
10#[sealed]
11pub(crate) trait ReceiverKind {}
12
13/// Marks that the [`ForwardHandle`] is for a "forward reference" to a later-defined collection.
14///
15/// When the handle is completed, the provided collection must not depend _synchronously_
16/// (in the same tick) on the forward reference that was created earlier.
17pub enum ForwardRef {}
18
19#[sealed]
20impl ReceiverKind for ForwardRef {}
21
22/// Marks that the [`ForwardHandle`] will send a live collection to the next tick.
23///
24/// Dependency cycles are permitted for this handle type, because the collection used
25/// to complete this handle will appear on the source-side on the _next_ tick.
26pub enum TickCycle {}
27
28#[sealed]
29impl ReceiverKind for TickCycle {}
30
31pub(crate) trait ReceiverComplete<'a, Marker>
32where
33    Marker: ReceiverKind,
34{
35    fn complete(self, cycle_id: CycleId, expected_location: LocationId);
36}
37
38pub(crate) trait CycleCollection<'a, Kind>: ReceiverComplete<'a, Kind>
39where
40    Kind: ReceiverKind,
41{
42    type Location: Location<'a>;
43
44    fn create_source(id: CycleId, location: Self::Location) -> Self;
45}
46
47pub(crate) trait CycleCollectionWithInitial<'a, Kind>: ReceiverComplete<'a, Kind>
48where
49    Kind: ReceiverKind,
50{
51    type Location: Location<'a>;
52
53    fn location(&self) -> &Self::Location;
54
55    fn create_source_with_initial(
56        cycle_id: CycleId,
57        initial: Self,
58        location: Self::Location,
59    ) -> Self;
60}
61
62/// A handle that can be used to fulfill a forward reference.
63///
64/// The `C` type parameter specifies the collection type that can be used to complete the handle.
65#[expect(
66    private_bounds,
67    reason = "only Hydro collections can implement ReceiverComplete"
68)]
69pub struct ForwardHandle<'a, C: ReceiverComplete<'a, ForwardRef>> {
70    completed: bool,
71    cycle_id: CycleId,
72    expected_location: LocationId,
73    _phantom: Invariant<'a, C>,
74}
75
76#[expect(
77    private_bounds,
78    reason = "only Hydro collections can implement ReceiverComplete"
79)]
80impl<'a, C: ReceiverComplete<'a, ForwardRef>> ForwardHandle<'a, C> {
81    pub(crate) fn new(cycle_id: CycleId, expected_location: LocationId) -> Self {
82        Self {
83            completed: false,
84            cycle_id,
85            expected_location,
86            _phantom: std::marker::PhantomData,
87        }
88    }
89}
90
91impl<'a, C: ReceiverComplete<'a, ForwardRef>> Drop for ForwardHandle<'a, C> {
92    fn drop(&mut self) {
93        if !self.completed && !std::thread::panicking() {
94            panic!("ForwardHandle dropped without being completed");
95        }
96    }
97}
98
99#[expect(
100    private_bounds,
101    reason = "only Hydro collections can implement ReceiverComplete"
102)]
103impl<'a, C: ReceiverComplete<'a, ForwardRef>> ForwardHandle<'a, C> {
104    /// Completes the forward reference with the given live collection. The initial forward reference
105    /// collection created in [`Location::forward_ref`] will resolve to this value.
106    ///
107    /// The provided value **must not** depend _synchronously_ (in the same tick) on the forward reference
108    /// collection, as doing so would create a dependency cycle. Asynchronous cycles (outside a tick) are
109    /// allowed, since the program can continue running while the cycle is processed.
110    pub fn complete(mut self, stream: impl Into<C>) {
111        self.completed = true;
112        C::complete(stream.into(), self.cycle_id, self.expected_location.clone())
113    }
114}
115
116/// A handle that can be used to complete a tick cycle by sending a collection to the next tick.
117///
118/// The `C` type parameter specifies the collection type that can be used to complete the handle.
119#[expect(
120    private_bounds,
121    reason = "only Hydro collections can implement ReceiverComplete"
122)]
123pub struct TickCycleHandle<'a, C: ReceiverComplete<'a, TickCycle>> {
124    completed: bool,
125    cycle_id: CycleId,
126    expected_location: LocationId,
127    _phantom: Invariant<'a, C>,
128}
129
130#[expect(
131    private_bounds,
132    reason = "only Hydro collections can implement ReceiverComplete"
133)]
134impl<'a, C: ReceiverComplete<'a, TickCycle>> TickCycleHandle<'a, C> {
135    pub(crate) fn new(cycle_id: CycleId, expected_location: LocationId) -> Self {
136        Self {
137            completed: false,
138            cycle_id,
139            expected_location,
140            _phantom: std::marker::PhantomData,
141        }
142    }
143}
144
145impl<'a, C: ReceiverComplete<'a, TickCycle>> Drop for TickCycleHandle<'a, C> {
146    fn drop(&mut self) {
147        if !self.completed && !std::thread::panicking() {
148            panic!("TickCycleHandle dropped without being completed");
149        }
150    }
151}
152
153#[expect(
154    private_bounds,
155    reason = "only Hydro collections can implement ReceiverComplete"
156)]
157impl<'a, C: ReceiverComplete<'a, TickCycle>> TickCycleHandle<'a, C> {
158    /// Sends the provided collection to the next tick, where it will be materialized
159    /// in the collection returned by [`crate::location::Tick::cycle`] or
160    /// [`crate::location::Tick::cycle_with_initial`].
161    pub fn complete_next_tick(mut self, stream: impl Into<C>) {
162        self.completed = true;
163        C::complete(stream.into(), self.cycle_id, self.expected_location.clone())
164    }
165}
166
167/// A trait for completing a cycle handle with a state value.
168/// Used internally by the `sliced!` macro for state management.
169#[doc(hidden)]
170pub trait CompleteCycle<S> {
171    /// Completes the cycle with the given state value.
172    fn complete_next_tick(self, state: S);
173}
174
175impl<'a, C: ReceiverComplete<'a, TickCycle>> CompleteCycle<C> for TickCycleHandle<'a, C> {
176    fn complete_next_tick(self, state: C) {
177        TickCycleHandle::complete_next_tick(self, state)
178    }
179}