1use std::fmt::{Debug, Formatter};
13use std::marker::PhantomData;
14
15use proc_macro2::Span;
16use quote::quote;
17use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
18use stageleft::{QuotedWithContextWithProps, quote_type};
19
20use super::dynamic::LocationId;
21use super::{Location, MemberId};
22use crate::compile::builder::FlowState;
23use crate::location::LocationKey;
24use crate::location::dynamic::ClusterConsistency;
25use crate::location::member_id::TaglessMemberId;
26use crate::staging_util::{Invariant, get_this_crate};
27
/// Type-level marker for the consistency level of a [`Cluster`].
///
/// Implementors are used as the `Con` type parameter of [`Cluster`]; the ones defined in this
/// file are uninhabited enums, so they exist only at the type level.
pub trait Consistency {
    /// Returns the runtime [`ClusterConsistency`] value that corresponds to this marker type.
    fn consistency() -> ClusterConsistency;
}
34
/// [`Consistency`] marker that maps to [`ClusterConsistency::NoConsistency`].
///
/// This is the default `Con` parameter of [`Cluster`]. The enum has no variants, so it can
/// never be instantiated and is only usable as a type argument.
pub enum NoConsistency {}
impl Consistency for NoConsistency {
    /// Always returns [`ClusterConsistency::NoConsistency`].
    fn consistency() -> ClusterConsistency {
        ClusterConsistency::NoConsistency
    }
}
43
/// [`Consistency`] marker that maps to [`ClusterConsistency::EventuallyConsistent`].
///
/// The enum has no variants, so it can never be instantiated and is only usable as a type
/// argument.
pub enum EventualConsistency {}
impl Consistency for EventualConsistency {
    /// Always returns [`ClusterConsistency::EventuallyConsistent`].
    fn consistency() -> ClusterConsistency {
        ClusterConsistency::EventuallyConsistent
    }
}
52
/// A cluster location.
///
/// `ClusterTag` is a type-level tag for the cluster; it is surfaced as [`IsCluster::Tag`] and
/// used to type the [`MemberId`] that [`CLUSTER_SELF_ID`] expands to. `Con` is a
/// [`Consistency`] marker (defaulting to [`NoConsistency`]) that is reported at runtime as the
/// second field of `LocationId::Cluster`.
///
/// Two clusters compare equal when their keys are equal and their flow states are the same
/// pointer (see the `PartialEq` impl).
pub struct Cluster<'a, ClusterTag, Con: Consistency = NoConsistency> {
    /// Key identifying this cluster; used for `LocationId::Cluster` and in generated identifiers.
    pub(crate) key: LocationKey,
    /// Flow state this location was created with.
    // NOTE(review): presumably shared by every location of one flow builder — confirm.
    pub(crate) flow_state: FlowState,
    /// Marker tying the struct to `'a`, `ClusterTag` and `Con` without storing them.
    // NOTE(review): the `Invariant` alias name suggests invariance in `'a` — confirm in `staging_util`.
    pub(crate) _phantom: Invariant<'a, (ClusterTag, Con)>,
}
67
68impl<C, Con: Consistency> Debug for Cluster<'_, C, Con> {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 write!(f, "Cluster({})", self.key)
71 }
72}
73
74impl<C, Con: Consistency> Eq for Cluster<'_, C, Con> {}
75impl<C, Con: Consistency> PartialEq for Cluster<'_, C, Con> {
76 fn eq(&self, other: &Self) -> bool {
77 self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
78 }
79}
80
81impl<C, Con: Consistency> Clone for Cluster<'_, C, Con> {
82 fn clone(&self) -> Self {
83 Cluster {
84 key: self.key,
85 flow_state: self.flow_state.clone(),
86 _phantom: PhantomData,
87 }
88 }
89}
90
91impl<'a, C, Con: Consistency> super::dynamic::DynLocation for Cluster<'a, C, Con> {
92 fn dyn_id(&self) -> LocationId {
93 LocationId::Cluster(self.key, Con::consistency())
94 }
95
96 fn flow_state(&self) -> &FlowState {
97 &self.flow_state
98 }
99
100 fn is_top_level() -> bool {
101 true
102 }
103
104 fn multiversioned(&self) -> bool {
105 false }
107}
108
109impl<'a, C, Con: Consistency> Location<'a> for Cluster<'a, C, Con> {
110 type Root = Cluster<'a, C, Con>;
111
112 type NoConsistency = Cluster<'a, C, NoConsistency>;
113
114 fn consistency() -> Option<ClusterConsistency> {
115 Some(Con::consistency())
116 }
117
118 fn root(&self) -> Self::Root {
119 self.clone()
120 }
121
122 fn drop_consistency(&self) -> Self::NoConsistency {
123 Cluster {
124 key: self.key,
125 flow_state: self.flow_state.clone(),
126 _phantom: PhantomData,
127 }
128 }
129
130 fn make_from_nondet(l2: Self::NoConsistency) -> Self {
131 Cluster {
132 key: l2.key,
133 flow_state: l2.flow_state,
134 _phantom: PhantomData,
135 }
136 }
137}
138
#[cfg(feature = "sim")]
impl<'a, C> Cluster<'a, C> {
    /// Creates an input into this cluster for use under the `sim` feature.
    ///
    /// Returns a [`crate::sim::SimClusterSender`] wrapping the port id of the new input,
    /// paired with the unbounded, totally ordered, exactly-once stream located on this
    /// cluster that `source_external_bincode` produces for it.
    ///
    /// Only available on clusters using the default [`NoConsistency`] marker.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_input<T>(
        &self,
    ) -> (
        crate::sim::SimClusterSender<
            T,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
        crate::live_collections::Stream<
            T,
            Self,
            crate::live_collections::boundedness::Unbounded,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
    )
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        use crate::location::Location;

        // The external endpoint is built directly with `LocationKey::FIRST` and this
        // cluster's flow state.
        // NOTE(review): presumably a placeholder external used only by the simulator — confirm.
        let sim_external: crate::location::External<'a, ()> = crate::location::External {
            key: LocationKey::FIRST,
            flow_state: self.flow_state.clone(),
            _phantom: PhantomData,
        };

        let (port, input) = self.source_external_bincode(&sim_external);
        let sender = crate::sim::SimClusterSender(port.port_id, PhantomData);

        (sender, input)
    }
}
181
/// Free variable that stands for the member ids of the cluster identified by `key`.
///
/// When quoted it expands to the identifier `__hydro_lang_cluster_ids_<key>` and is typed as
/// `&'a [TaglessMemberId]` (see the `FreeVariableWithContextWithProps` impl).
pub struct ClusterIds<'a> {
    /// Key of the cluster whose member ids are referenced.
    pub key: LocationKey,
    /// Ties the value to the lifetime `'a` of the slice it stands for.
    pub _phantom: PhantomData<&'a ()>,
}
192
193impl<'a> Clone for ClusterIds<'a> {
194 fn clone(&self) -> Self {
195 Self {
196 key: self.key,
197 _phantom: Default::default(),
198 }
199 }
200}
201
202impl<'a, Ctx> FreeVariableWithContextWithProps<Ctx, ()> for ClusterIds<'a> {
203 type O = &'a [TaglessMemberId];
204
205 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
206 where
207 Self: Sized,
208 {
209 let ident = syn::Ident::new(
210 &format!("__hydro_lang_cluster_ids_{}", self.key),
211 Span::call_site(),
212 );
213
214 (
215 QuoteTokens {
216 prelude: None,
217 expr: Some(quote! { #ident }),
218 },
219 (),
220 )
221 }
222}
223
// Empty impl: `ClusterIds` is quoted as a `&'a [TaglessMemberId]` for any context type `Ctx`.
// NOTE(review): presumably this is what allows the value to be spliced into staged code —
// confirm against the stageleft trait documentation.
impl<'a, Ctx> QuotedWithContextWithProps<'a, &'a [TaglessMemberId], Ctx, ()> for ClusterIds<'a> {}
225
/// Implemented by location types that are clusters, exposing their type-level tag.
///
/// Used as a bound on `Location::Root` so that [`CLUSTER_SELF_ID`] can only be quoted at
/// locations whose root is a cluster.
pub trait IsCluster {
    /// The tag type of the cluster; it parameterizes the [`MemberId`] of its members.
    type Tag;
}
231
// `Cluster<'_, C>` uses the default consistency parameter, so this impl covers only
// `Cluster<'_, C, NoConsistency>`.
// NOTE(review): presumably intentional, restricting `CLUSTER_SELF_ID` to clusters without a
// consistency guarantee — confirm before widening the impl.
impl<C> IsCluster for Cluster<'_, C> {
    type Tag = C;
}
235
/// Placeholder to be used inside quoted code that runs on a cluster.
///
/// When quoted at a location whose root is a cluster with tag `C`, it expands to an expression
/// of type `MemberId<C>` built from the identifier `__hydro_lang_cluster_self_id_<cluster key>`.
pub static CLUSTER_SELF_ID: ClusterSelfId = ClusterSelfId { _private: &() };
239
/// Type of [`CLUSTER_SELF_ID`].
///
/// The only field is private, so values cannot be constructed outside this module; use the
/// [`CLUSTER_SELF_ID`] static instead.
#[derive(Clone, Copy)]
pub struct ClusterSelfId<'a> {
    // Carries no data; exists to hold the lifetime and to keep construction private.
    _private: &'a (),
}
248
249impl<'a, L> FreeVariableWithContextWithProps<L, ()> for ClusterSelfId<'a>
250where
251 L: Location<'a>,
252 <L as Location<'a>>::Root: IsCluster,
253{
254 type O = MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>;
255
256 fn to_tokens(self, ctx: &L) -> (QuoteTokens, ())
257 where
258 Self: Sized,
259 {
260 let LocationId::Cluster(cluster_id, _) = ctx.root().id() else {
261 unreachable!()
262 };
263
264 let ident = syn::Ident::new(
265 &format!("__hydro_lang_cluster_self_id_{}", cluster_id),
266 Span::call_site(),
267 );
268 let root = get_this_crate();
269 let c_type: syn::Type = quote_type::<<<L as Location<'a>>::Root as IsCluster>::Tag>();
270
271 (
272 QuoteTokens {
273 prelude: None,
274 expr: Some(
275 quote! { #root::__staged::location::MemberId::<#c_type>::from_tagless((#ident).clone()) },
276 ),
277 },
278 (),
279 )
280 }
281}
282
// Empty impl: `ClusterSelfId` is quoted as a `MemberId` tagged with the tag of the cluster at
// the root of `L`, under the same bounds as its `FreeVariableWithContextWithProps` impl.
// NOTE(review): presumably this is what allows `CLUSTER_SELF_ID` to be used inside `q!(...)` —
// confirm against the stageleft trait documentation.
impl<'a, L>
    QuotedWithContextWithProps<'a, MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>, L, ()>
    for ClusterSelfId<'a>
where
    L: Location<'a>,
    <L as Location<'a>>::Root: IsCluster,
{
}
291
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use super::CLUSTER_SELF_ID;
    #[cfg(feature = "sim")]
    use crate::location::{Location, MemberId, MembershipEvent};
    #[cfg(feature = "sim")]
    use crate::networking::TCP;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// Every member of two clusters (sizes 3 and 4) sends its own id to a process; the
    /// process must observe ids `0..3` from the first cluster and `0..4` from the second.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_self_id() {
        let mut flow = FlowBuilder::new();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();

        let node = flow.process::<()>();

        let out_recv = cluster1
            .source_iter(q!(vec![CLUSTER_SELF_ID]))
            .send(&node, TCP.fail_stop().bincode())
            .values()
            .merge_unordered(
                cluster2
                    .source_iter(q!(vec![CLUSTER_SELF_ID]))
                    .send(&node, TCP.fail_stop().bincode())
                    .values(),
            )
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered([0, 1, 2, 0, 1, 2, 3].map(MemberId::from_raw_id))
                    .await
            });
    }

    /// Each of two cluster members batches the items `1, 2, 3` per tick, counts every batch
    /// and sends the counts to a process; the counts received from each member must sum to 3.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_with_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = cluster
            .source_iter(q!(vec![1, 2, 3]))
            .batch(&cluster.tick(), nondet!())
            .count()
            .all_ticks()
            .send(&node, TCP.fail_stop().bincode())
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        let count = flow
            .sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                // Sum the per-batch counts by sending member.
                let grouped = out_recv.collect_sorted::<Vec<_>>().await.into_iter().fold(
                    HashMap::new(),
                    |mut acc: HashMap<MemberId<()>, usize>, (id, v)| {
                        *acc.entry(id).or_default() += v;
                        acc
                    },
                );

                // `assert_eq!` reports both values on failure, unlike `assert!(a == b)`.
                assert_eq!(grouped.len(), 2);
                for (_id, v) in grouped {
                    assert_eq!(v, 3);
                }
            });

        // NOTE(review): `count` is the value returned by `exhaustive`, presumably the number
        // of explored executions; 106 is the pinned expectation — confirm its derivation.
        assert_eq!(count, 106);
    }

    /// A process subscribed to the membership stream of a two-member cluster must observe
    /// exactly one `Joined` event for member 0 and one for member 1.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_membership() {
        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = node
            .source_cluster_membership_stream(&cluster, nondet!())
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), MembershipEvent::Joined),
                        (MemberId::from_raw_id(1), MembershipEvent::Joined),
                    ])
                    .await;
            });
    }
}