1use sealed::sealed;
20use stageleft::{QuotedWithContext, q};
21
22#[cfg(stageleft_runtime)]
23use super::dynamic::DynLocation;
24use super::{Cluster, Location, LocationId, Process};
25use crate::compile::builder::{ClockId, FlowState};
26use crate::compile::ir::{HydroNode, HydroSource};
27#[cfg(stageleft_runtime)]
28use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial};
29use crate::forward_handle::{TickCycle, TickCycleHandle};
30use crate::live_collections::boundedness::Bounded;
31use crate::live_collections::optional::Optional;
32use crate::live_collections::stream::{ExactlyOnce, Stream, TotalOrder};
33use crate::location::cluster::Consistency;
34use crate::nondet::nondet;
35
36#[sealed]
42pub trait NoAtomic {}
43#[sealed]
44impl<T> NoAtomic for Process<'_, T> {}
45#[sealed]
46impl<T, Con: Consistency> NoAtomic for Cluster<'_, T, Con> {}
47#[sealed]
48impl<'a, L> NoAtomic for Tick<L> where L: Location<'a> {}
49
50#[derive(Clone)]
61pub struct Atomic<Loc> {
62 pub(crate) tick: Tick<Loc>,
63}
64
65impl<L: DynLocation> DynLocation for Atomic<L> {
66 fn dyn_id(&self) -> LocationId {
67 LocationId::Atomic(Box::new(self.tick.dyn_id()))
68 }
69
70 fn flow_state(&self) -> &FlowState {
71 self.tick.flow_state()
72 }
73
74 fn is_top_level() -> bool {
75 L::is_top_level()
76 }
77
78 fn multiversioned(&self) -> bool {
79 self.tick.multiversioned()
80 }
81}
82
83impl<'a, L> Location<'a> for Atomic<L>
84where
85 L: Location<'a>,
86{
87 type Root = L::Root;
88
89 type NoConsistency = Atomic<L::NoConsistency>;
90
91 fn consistency() -> Option<super::dynamic::ClusterConsistency> {
92 L::consistency()
93 }
94
95 fn root(&self) -> Self::Root {
96 self.tick.root()
97 }
98
99 fn drop_consistency(&self) -> Self::NoConsistency {
100 Atomic {
101 tick: self.tick.drop_consistency(),
102 }
103 }
104
105 fn make_from_nondet(l2: Self::NoConsistency) -> Self {
106 Atomic {
107 tick: Tick::make_from_nondet(l2.tick),
108 }
109 }
110}
111
112pub trait DeferTick {
119 fn defer_tick(self) -> Self;
121}
122
123#[derive(Clone)]
125pub struct Tick<L> {
126 pub(crate) id: ClockId,
127 pub(crate) l: L,
129}
130
131impl<L: DynLocation> DynLocation for Tick<L> {
132 fn dyn_id(&self) -> LocationId {
133 LocationId::Tick(self.id, Box::new(self.l.dyn_id()))
134 }
135
136 fn flow_state(&self) -> &FlowState {
137 self.l.flow_state()
138 }
139
140 fn is_top_level() -> bool {
141 false
142 }
143
144 fn multiversioned(&self) -> bool {
145 self.l.multiversioned()
146 }
147}
148
149impl<'a, L> Location<'a> for Tick<L>
150where
151 L: Location<'a>,
152{
153 type Root = L::Root;
154
155 type NoConsistency = Tick<L::NoConsistency>;
156
157 fn consistency() -> Option<super::dynamic::ClusterConsistency> {
158 L::consistency()
159 }
160
161 fn root(&self) -> Self::Root {
162 self.l.root()
163 }
164
165 fn drop_consistency(&self) -> Self::NoConsistency {
166 Tick {
167 id: self.id,
168 l: self.l.drop_consistency(),
169 }
170 }
171
172 fn make_from_nondet(l2: Self::NoConsistency) -> Self {
173 Tick {
174 id: l2.id,
175 l: L::make_from_nondet(l2.l),
176 }
177 }
178}
179
180impl<'a, L> Tick<L>
181where
182 L: Location<'a>,
183{
184 pub fn outer(&self) -> &L {
189 &self.l
190 }
191
192 pub fn spin_batch(
198 &self,
199 batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
200 ) -> Stream<(), Self, Bounded, TotalOrder, ExactlyOnce> {
201 let out = self
202 .l
203 .spin()
204 .flat_map_ordered(q!(move |_| 0..batch_size))
205 .map(q!(|_| ()));
206
207 let inner = out.batch(self, nondet!());
208 Stream::new(self.clone(), inner.ir_node.replace(HydroNode::Placeholder))
209 }
210
211 pub fn none<T>(&self) -> Optional<T, Self, Bounded> {
230 let e = q!([]);
231 let e = QuotedWithContext::<'a, [(); 0], Self>::splice_typed_ctx(e, self);
232
233 let unit_optional: Optional<(), Self, Bounded> = Optional::new(
234 self.clone(),
235 HydroNode::Source {
236 source: HydroSource::Iter(e.into()),
237 metadata: self.new_node_metadata(Optional::<(), Self, Bounded>::collection_kind()),
238 },
239 );
240
241 unit_optional.map(q!(|_| unreachable!())) }
243
244 pub fn optional_first_tick<T: Clone>(
270 &self,
271 e: impl QuotedWithContext<'a, T, Tick<L>>,
272 ) -> Optional<T, Self, Bounded> {
273 let e = e.splice_untyped_ctx(self);
274
275 Optional::new(
276 self.clone(),
277 HydroNode::SingletonSource {
278 value: e.into(),
279 first_tick_only: true,
280 metadata: self.new_node_metadata(Optional::<T, Self, Bounded>::collection_kind()),
281 },
282 )
283 }
284
285 #[expect(
294 private_bounds,
295 reason = "only Hydro collections can implement ReceiverComplete"
296 )]
297 pub fn cycle<S>(&self) -> (TickCycleHandle<'a, S>, S)
298 where
299 S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick,
300 {
301 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
302 (
303 TickCycleHandle::new(cycle_id, Location::id(self)),
304 S::create_source(cycle_id, self.clone()).defer_tick(),
305 )
306 }
307
308 #[expect(
315 private_bounds,
316 reason = "only Hydro collections can implement ReceiverComplete"
317 )]
318 pub fn cycle_with_initial<S>(&self, initial: S) -> (TickCycleHandle<'a, S>, S)
319 where
320 S: CycleCollectionWithInitial<'a, TickCycle, Location = Self>,
321 {
322 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
323 (
324 TickCycleHandle::new(cycle_id, Location::id(self)),
325 S::create_source_with_initial(cycle_id, initial, self.clone()),
327 )
328 }
329}
330
331#[cfg(test)]
332mod tests {
333 #[cfg(feature = "sim")]
334 use stageleft::q;
335
336 #[cfg(feature = "sim")]
337 use crate::live_collections::sliced::sliced;
338 #[cfg(feature = "sim")]
339 use crate::location::Location;
340 #[cfg(feature = "sim")]
341 use crate::nondet::nondet;
342 #[cfg(feature = "sim")]
343 use crate::prelude::FlowBuilder;
344
345 #[cfg(feature = "sim")]
346 #[test]
347 fn sim_atomic_stream() {
348 let mut flow = FlowBuilder::new();
349 let node = flow.process::<()>();
350
351 let (write_send, write_req) = node.sim_input();
352 let (read_send, read_req) = node.sim_input::<(), _, _>();
353
354 let atomic_write = write_req.atomic();
355 let current_state = atomic_write.clone().fold(
356 q!(|| 0),
357 q!(|state: &mut i32, v: i32| {
358 *state += v;
359 }),
360 );
361
362 let write_ack_recv = atomic_write.end_atomic().sim_output();
363 let read_response_recv = sliced! {
364 let batch_of_req = use(read_req, nondet!());
365 let latest_singleton = use::atomic(current_state, nondet!());
366 batch_of_req.cross_singleton(latest_singleton)
367 }
368 .sim_output();
369
370 let sim_compiled = flow.sim().compiled();
371 let instances = sim_compiled.exhaustive(async || {
372 write_send.send(1);
373 write_ack_recv.assert_yields([1]).await;
374 read_send.send(());
375 assert!(read_response_recv.next().await.is_some_and(|(_, v)| v >= 1));
376 });
377
378 assert_eq!(instances, 1);
379
380 let instances_read_before_write = sim_compiled.exhaustive(async || {
381 write_send.send(1);
382 read_send.send(());
383 write_ack_recv.assert_yields([1]).await;
384 let _ = read_response_recv.next().await;
385 });
386
387 assert_eq!(instances_read_before_write, 3); }
389
390 #[cfg(feature = "sim")]
391 #[test]
392 #[should_panic]
393 fn sim_non_atomic_stream() {
394 let mut flow = FlowBuilder::new();
396 let node = flow.process::<()>();
397
398 let (write_send, write_req) = node.sim_input();
399 let (read_send, read_req) = node.sim_input::<(), _, _>();
400
401 let current_state = write_req.clone().fold(
402 q!(|| 0),
403 q!(|state: &mut i32, v: i32| {
404 *state += v;
405 }),
406 );
407
408 let write_ack_recv = write_req.sim_output();
409
410 let read_response_recv = sliced! {
411 let batch_of_req = use(read_req, nondet!());
412 let latest_singleton = use(current_state, nondet!());
413 batch_of_req.cross_singleton(latest_singleton)
414 }
415 .sim_output();
416
417 flow.sim().exhaustive(async || {
418 write_send.send(1);
419 write_ack_recv.assert_yields([1]).await;
420 read_send.send(());
421
422 if let Some((_, v)) = read_response_recv.next().await {
423 assert_eq!(v, 1);
424 }
425 });
426 }
427}