hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9
10use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
11use super::optional::Optional;
12use super::sliced::sliced;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::CycleId;
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A single Rust value that can asynchronously change over time.
26///
27/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
28/// [`Unbounded`], the value will asynchronously change over time.
29///
30/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
31/// a single number that will asynchronously change as events are processed. Singletons also appear
32/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
33/// such as getting the length of a batch of requests.
34///
35/// Type Parameters:
36/// - `Type`: the type of the value in this singleton
37/// - `Loc`: the [`Location`] where the singleton is materialized
38/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
39pub struct Singleton<Type, Loc, Bound: Boundedness> {
40 pub(crate) location: Loc,
41 pub(crate) ir_node: RefCell<HydroNode>,
42
43 _phantom: PhantomData<(Type, Loc, Bound)>,
44}
45
46impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
47where
48 T: Clone,
49 L: Location<'a> + NoTick,
50{
51 fn from(value: Singleton<T, L, Bounded>) -> Self {
52 let tick = value.location().tick();
53 value.clone_into_tick(&tick).latest()
54 }
55}
56
57impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
58where
59 L: Location<'a>,
60{
61 type Location = Tick<L>;
62
63 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
64 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
65 location.clone(),
66 HydroNode::DeferTick {
67 input: Box::new(HydroNode::CycleSource {
68 cycle_id,
69 metadata: location.new_node_metadata(Self::collection_kind()),
70 }),
71 metadata: location
72 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
73 },
74 );
75
76 from_previous_tick.unwrap_or(initial)
77 }
78}
79
80impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
81where
82 L: Location<'a>,
83{
84 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
85 assert_eq!(
86 Location::id(&self.location),
87 expected_location,
88 "locations do not match"
89 );
90 self.location
91 .flow_state()
92 .borrow_mut()
93 .push_root(HydroRoot::CycleSink {
94 cycle_id,
95 input: Box::new(self.ir_node.into_inner()),
96 op_metadata: HydroIrOpMetadata::new(),
97 });
98 }
99}
100
101impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
102where
103 L: Location<'a>,
104{
105 type Location = Tick<L>;
106
107 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
108 Singleton::new(
109 location.clone(),
110 HydroNode::CycleSource {
111 cycle_id,
112 metadata: location.new_node_metadata(Self::collection_kind()),
113 },
114 )
115 }
116}
117
118impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
119where
120 L: Location<'a>,
121{
122 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
123 assert_eq!(
124 Location::id(&self.location),
125 expected_location,
126 "locations do not match"
127 );
128 self.location
129 .flow_state()
130 .borrow_mut()
131 .push_root(HydroRoot::CycleSink {
132 cycle_id,
133 input: Box::new(self.ir_node.into_inner()),
134 op_metadata: HydroIrOpMetadata::new(),
135 });
136 }
137}
138
139impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
140where
141 L: Location<'a> + NoTick,
142{
143 type Location = L;
144
145 fn create_source(cycle_id: CycleId, location: L) -> Self {
146 Singleton::new(
147 location.clone(),
148 HydroNode::CycleSource {
149 cycle_id,
150 metadata: location.new_node_metadata(Self::collection_kind()),
151 },
152 )
153 }
154}
155
156impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
157where
158 L: Location<'a> + NoTick,
159{
160 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
161 assert_eq!(
162 Location::id(&self.location),
163 expected_location,
164 "locations do not match"
165 );
166 self.location
167 .flow_state()
168 .borrow_mut()
169 .push_root(HydroRoot::CycleSink {
170 cycle_id,
171 input: Box::new(self.ir_node.into_inner()),
172 op_metadata: HydroIrOpMetadata::new(),
173 });
174 }
175}
176
177impl<'a, T, L, B: Boundedness> Clone for Singleton<T, L, B>
178where
179 T: Clone,
180 L: Location<'a>,
181{
182 fn clone(&self) -> Self {
183 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
184 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
185 *self.ir_node.borrow_mut() = HydroNode::Tee {
186 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
187 metadata: self.location.new_node_metadata(Self::collection_kind()),
188 };
189 }
190
191 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
192 Singleton {
193 location: self.location.clone(),
194 ir_node: HydroNode::Tee {
195 inner: TeeNode(inner.0.clone()),
196 metadata: metadata.clone(),
197 }
198 .into(),
199 _phantom: PhantomData,
200 }
201 } else {
202 unreachable!()
203 }
204 }
205}
206
207#[cfg(stageleft_runtime)]
208fn zip_inside_tick<'a, T, L: Location<'a>, B: Boundedness, O>(
209 me: Singleton<T, Tick<L>, B>,
210 other: O,
211) -> <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::Out
212where
213 Singleton<T, Tick<L>, B>: ZipResult<'a, O, Location = Tick<L>>,
214{
215 check_matching_location(
216 &me.location,
217 &Singleton::<T, Tick<L>, B>::other_location(&other),
218 );
219
220 Singleton::<T, Tick<L>, B>::make(
221 me.location.clone(),
222 HydroNode::CrossSingleton {
223 left: Box::new(me.ir_node.into_inner()),
224 right: Box::new(Singleton::<T, Tick<L>, B>::other_ir_node(other)),
225 metadata: me.location.new_node_metadata(CollectionKind::Singleton {
226 bound: B::BOUND_KIND,
227 element_type: stageleft::quote_type::<
228 <Singleton<T, Tick<L>, B> as ZipResult<'a, O>>::ElementType,
229 >()
230 .into(),
231 }),
232 },
233 )
234}
235
236impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
237where
238 L: Location<'a>,
239{
240 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
241 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
242 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
243 Singleton {
244 location,
245 ir_node: RefCell::new(ir_node),
246 _phantom: PhantomData,
247 }
248 }
249
250 pub(crate) fn collection_kind() -> CollectionKind {
251 CollectionKind::Singleton {
252 bound: B::BOUND_KIND,
253 element_type: stageleft::quote_type::<T>().into(),
254 }
255 }
256
257 /// Returns the [`Location`] where this singleton is being materialized.
258 pub fn location(&self) -> &L {
259 &self.location
260 }
261
262 /// Transforms the singleton value by applying a function `f` to it,
263 /// continuously as the input is updated.
264 ///
265 /// # Example
266 /// ```rust
267 /// # #[cfg(feature = "deploy")] {
268 /// # use hydro_lang::prelude::*;
269 /// # use futures::StreamExt;
270 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
271 /// let tick = process.tick();
272 /// let singleton = tick.singleton(q!(5));
273 /// singleton.map(q!(|v| v * 2)).all_ticks()
274 /// # }, |mut stream| async move {
275 /// // 10
276 /// # assert_eq!(stream.next().await.unwrap(), 10);
277 /// # }));
278 /// # }
279 /// ```
280 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton<U, L, B>
281 where
282 F: Fn(T) -> U + 'a,
283 {
284 let f = f.splice_fn1_ctx(&self.location).into();
285 Singleton::new(
286 self.location.clone(),
287 HydroNode::Map {
288 f,
289 input: Box::new(self.ir_node.into_inner()),
290 metadata: self
291 .location
292 .new_node_metadata(Singleton::<U, L, B>::collection_kind()),
293 },
294 )
295 }
296
297 /// Transforms the singleton value by applying a function `f` to it and then flattening
298 /// the result into a stream, preserving the order of elements.
299 ///
300 /// The function `f` is applied to the singleton value to produce an iterator, and all items
301 /// from that iterator are emitted in the output stream in deterministic order.
302 ///
303 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
304 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
305 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
306 ///
307 /// # Example
308 /// ```rust
309 /// # #[cfg(feature = "deploy")] {
310 /// # use hydro_lang::prelude::*;
311 /// # use futures::StreamExt;
312 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
313 /// let tick = process.tick();
314 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
315 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
316 /// # }, |mut stream| async move {
317 /// // 1, 2, 3
318 /// # for w in vec![1, 2, 3] {
319 /// # assert_eq!(stream.next().await.unwrap(), w);
320 /// # }
321 /// # }));
322 /// # }
323 /// ```
324 pub fn flat_map_ordered<U, I, F>(
325 self,
326 f: impl IntoQuotedMut<'a, F, L>,
327 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
328 where
329 I: IntoIterator<Item = U>,
330 F: Fn(T) -> I + 'a,
331 {
332 let f = f.splice_fn1_ctx(&self.location).into();
333 Stream::new(
334 self.location.clone(),
335 HydroNode::FlatMap {
336 f,
337 input: Box::new(self.ir_node.into_inner()),
338 metadata: self.location.new_node_metadata(
339 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
340 ),
341 },
342 )
343 }
344
345 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
346 /// for the output type `I` to produce items in any order.
347 ///
348 /// The function `f` is applied to the singleton value to produce an iterator, and all items
349 /// from that iterator are emitted in the output stream in non-deterministic order.
350 ///
351 /// # Example
352 /// ```rust
353 /// # #[cfg(feature = "deploy")] {
354 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
355 /// # use futures::StreamExt;
356 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
357 /// let tick = process.tick();
358 /// let singleton = tick.singleton(q!(
359 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
360 /// ));
361 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
362 /// # }, |mut stream| async move {
363 /// // 1, 2, 3, but in no particular order
364 /// # let mut results = Vec::new();
365 /// # for _ in 0..3 {
366 /// # results.push(stream.next().await.unwrap());
367 /// # }
368 /// # results.sort();
369 /// # assert_eq!(results, vec![1, 2, 3]);
370 /// # }));
371 /// # }
372 /// ```
373 pub fn flat_map_unordered<U, I, F>(
374 self,
375 f: impl IntoQuotedMut<'a, F, L>,
376 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
377 where
378 I: IntoIterator<Item = U>,
379 F: Fn(T) -> I + 'a,
380 {
381 let f = f.splice_fn1_ctx(&self.location).into();
382 Stream::new(
383 self.location.clone(),
384 HydroNode::FlatMap {
385 f,
386 input: Box::new(self.ir_node.into_inner()),
387 metadata: self
388 .location
389 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
390 },
391 )
392 }
393
394 /// Flattens the singleton value into a stream, preserving the order of elements.
395 ///
396 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
397 /// are emitted in the output stream in deterministic order.
398 ///
399 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
400 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
401 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
402 ///
403 /// # Example
404 /// ```rust
405 /// # #[cfg(feature = "deploy")] {
406 /// # use hydro_lang::prelude::*;
407 /// # use futures::StreamExt;
408 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
409 /// let tick = process.tick();
410 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
411 /// singleton.flatten_ordered().all_ticks()
412 /// # }, |mut stream| async move {
413 /// // 1, 2, 3
414 /// # for w in vec![1, 2, 3] {
415 /// # assert_eq!(stream.next().await.unwrap(), w);
416 /// # }
417 /// # }));
418 /// # }
419 /// ```
420 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
421 where
422 T: IntoIterator<Item = U>,
423 {
424 self.flat_map_ordered(q!(|x| x))
425 }
426
427 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
428 /// for the element type `T` to produce items in any order.
429 ///
430 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
431 /// are emitted in the output stream in non-deterministic order.
432 ///
433 /// # Example
434 /// ```rust
435 /// # #[cfg(feature = "deploy")] {
436 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
437 /// # use futures::StreamExt;
438 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
439 /// let tick = process.tick();
440 /// let singleton = tick.singleton(q!(
441 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
442 /// ));
443 /// singleton.flatten_unordered().all_ticks()
444 /// # }, |mut stream| async move {
445 /// // 1, 2, 3, but in no particular order
446 /// # let mut results = Vec::new();
447 /// # for _ in 0..3 {
448 /// # results.push(stream.next().await.unwrap());
449 /// # }
450 /// # results.sort();
451 /// # assert_eq!(results, vec![1, 2, 3]);
452 /// # }));
453 /// # }
454 /// ```
455 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
456 where
457 T: IntoIterator<Item = U>,
458 {
459 self.flat_map_unordered(q!(|x| x))
460 }
461
462 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
463 ///
464 /// If the predicate returns `true`, the output optional contains the same value.
465 /// If the predicate returns `false`, the output optional is empty.
466 ///
467 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
468 /// not modify or take ownership of the value. If you need to modify the value while filtering
469 /// use [`Singleton::filter_map`] instead.
470 ///
471 /// # Example
472 /// ```rust
473 /// # #[cfg(feature = "deploy")] {
474 /// # use hydro_lang::prelude::*;
475 /// # use futures::StreamExt;
476 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
477 /// let tick = process.tick();
478 /// let singleton = tick.singleton(q!(5));
479 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
480 /// # }, |mut stream| async move {
481 /// // 5
482 /// # assert_eq!(stream.next().await.unwrap(), 5);
483 /// # }));
484 /// # }
485 /// ```
486 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
487 where
488 F: Fn(&T) -> bool + 'a,
489 {
490 let f = f.splice_fn1_borrow_ctx(&self.location).into();
491 Optional::new(
492 self.location.clone(),
493 HydroNode::Filter {
494 f,
495 input: Box::new(self.ir_node.into_inner()),
496 metadata: self
497 .location
498 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
499 },
500 )
501 }
502
503 /// An operator that both filters and maps. It yields the value only if the supplied
504 /// closure `f` returns `Some(value)`.
505 ///
506 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
507 /// If the closure returns `None`, the output optional is empty.
508 ///
509 /// # Example
510 /// ```rust
511 /// # #[cfg(feature = "deploy")] {
512 /// # use hydro_lang::prelude::*;
513 /// # use futures::StreamExt;
514 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
515 /// let tick = process.tick();
516 /// let singleton = tick.singleton(q!("42"));
517 /// singleton
518 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
519 /// .all_ticks()
520 /// # }, |mut stream| async move {
521 /// // 42
522 /// # assert_eq!(stream.next().await.unwrap(), 42);
523 /// # }));
524 /// # }
525 /// ```
526 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
527 where
528 F: Fn(T) -> Option<U> + 'a,
529 {
530 let f = f.splice_fn1_ctx(&self.location).into();
531 Optional::new(
532 self.location.clone(),
533 HydroNode::FilterMap {
534 f,
535 input: Box::new(self.ir_node.into_inner()),
536 metadata: self
537 .location
538 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
539 },
540 )
541 }
542
543 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
544 ///
545 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
546 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
547 /// non-null. This is useful for combining several pieces of state together.
548 ///
549 /// # Example
550 /// ```rust
551 /// # #[cfg(feature = "deploy")] {
552 /// # use hydro_lang::prelude::*;
553 /// # use futures::StreamExt;
554 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
555 /// let tick = process.tick();
556 /// let numbers = process
557 /// .source_iter(q!(vec![123, 456]))
558 /// .batch(&tick, nondet!(/** test */));
559 /// let count = numbers.clone().count(); // Singleton
560 /// let max = numbers.max(); // Optional
561 /// count.zip(max).all_ticks()
562 /// # }, |mut stream| async move {
563 /// // [(2, 456)]
564 /// # for w in vec![(2, 456)] {
565 /// # assert_eq!(stream.next().await.unwrap(), w);
566 /// # }
567 /// # }));
568 /// # }
569 /// ```
570 pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
571 where
572 Self: ZipResult<'a, O, Location = L>,
573 {
574 check_matching_location(&self.location, &Self::other_location(&other));
575
576 if L::is_top_level()
577 && let Some(tick) = self.location.try_tick()
578 {
579 let out = zip_inside_tick(
580 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
581 Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
582 Self::other_location(&other),
583 Self::other_ir_node(other),
584 )
585 .snapshot(&tick, nondet!(/** eventually stabilizes */)),
586 )
587 .latest();
588
589 Self::make(out.location, out.ir_node.into_inner())
590 } else {
591 Self::make(
592 self.location.clone(),
593 HydroNode::CrossSingleton {
594 left: Box::new(self.ir_node.into_inner()),
595 right: Box::new(Self::other_ir_node(other)),
596 metadata: self.location.new_node_metadata(CollectionKind::Optional {
597 bound: B::BOUND_KIND,
598 element_type: stageleft::quote_type::<
599 <Self as ZipResult<'a, O>>::ElementType,
600 >()
601 .into(),
602 }),
603 },
604 )
605 }
606 }
607
608 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
609 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
610 ///
611 /// Useful for conditionally processing, such as only emitting a singleton's value outside
612 /// a tick if some other condition is satisfied.
613 ///
614 /// # Example
615 /// ```rust
616 /// # #[cfg(feature = "deploy")] {
617 /// # use hydro_lang::prelude::*;
618 /// # use futures::StreamExt;
619 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
620 /// let tick = process.tick();
621 /// // ticks are lazy by default, forces the second tick to run
622 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
623 ///
624 /// let batch_first_tick = process
625 /// .source_iter(q!(vec![1]))
626 /// .batch(&tick, nondet!(/** test */));
627 /// let batch_second_tick = process
628 /// .source_iter(q!(vec![1, 2, 3]))
629 /// .batch(&tick, nondet!(/** test */))
630 /// .defer_tick(); // appears on the second tick
631 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
632 /// batch_first_tick.chain(batch_second_tick).count()
633 /// .filter_if_some(some_on_first_tick)
634 /// .all_ticks()
635 /// # }, |mut stream| async move {
636 /// // [1]
637 /// # for w in vec![1] {
638 /// # assert_eq!(stream.next().await.unwrap(), w);
639 /// # }
640 /// # }));
641 /// # }
642 /// ```
643 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B> {
644 self.zip::<Optional<(), L, B>>(signal.map(q!(|_u| ())))
645 .map(q!(|(d, _signal)| d))
646 }
647
648 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
649 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
650 ///
651 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
652 /// the condition.
653 ///
654 /// # Example
655 /// ```rust
656 /// # #[cfg(feature = "deploy")] {
657 /// # use hydro_lang::prelude::*;
658 /// # use futures::StreamExt;
659 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
660 /// let tick = process.tick();
661 /// // ticks are lazy by default, forces the second tick to run
662 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
663 ///
664 /// let batch_first_tick = process
665 /// .source_iter(q!(vec![1]))
666 /// .batch(&tick, nondet!(/** test */));
667 /// let batch_second_tick = process
668 /// .source_iter(q!(vec![1, 2, 3]))
669 /// .batch(&tick, nondet!(/** test */))
670 /// .defer_tick(); // appears on the second tick
671 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
672 /// batch_first_tick.chain(batch_second_tick).count()
673 /// .filter_if_none(some_on_first_tick)
674 /// .all_ticks()
675 /// # }, |mut stream| async move {
676 /// // [3]
677 /// # for w in vec![3] {
678 /// # assert_eq!(stream.next().await.unwrap(), w);
679 /// # }
680 /// # }));
681 /// # }
682 /// ```
683 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B> {
684 self.filter_if_some(
685 other
686 .map(q!(|_| ()))
687 .into_singleton()
688 .filter(q!(|o| o.is_none())),
689 )
690 }
691
692 /// An operator which allows you to "name" a `HydroNode`.
693 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
694 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
695 {
696 let mut node = self.ir_node.borrow_mut();
697 let metadata = node.metadata_mut();
698 metadata.tag = Some(name.to_owned());
699 }
700 self
701 }
702}
703
704impl<'a, T, L, B: Boundedness> Singleton<Option<T>, L, B>
705where
706 L: Location<'a>,
707{
708 /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
709 /// the inner `Option`.
710 ///
711 /// This is implemented as an identity [`Singleton::filter_map`], passing through the
712 /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
713 /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
714 ///
715 /// # Example
716 /// ```rust
717 /// # #[cfg(feature = "deploy")] {
718 /// # use hydro_lang::prelude::*;
719 /// # use futures::StreamExt;
720 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
721 /// let tick = process.tick();
722 /// let singleton = tick.singleton(q!(Some(42)));
723 /// singleton.into_optional().all_ticks()
724 /// # }, |mut stream| async move {
725 /// // 42
726 /// # assert_eq!(stream.next().await.unwrap(), 42);
727 /// # }));
728 /// # }
729 /// ```
730 pub fn into_optional(self) -> Optional<T, L, B> {
731 self.filter_map(q!(|v| v))
732 }
733}
734
735impl<'a, T, L, B: Boundedness> Singleton<T, Atomic<L>, B>
736where
737 L: Location<'a> + NoTick,
738{
739 /// Returns a singleton value corresponding to the latest snapshot of the singleton
740 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
741 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
742 /// all snapshots of this singleton into the atomic-associated tick will observe the
743 /// same value each tick.
744 ///
745 /// # Non-Determinism
746 /// Because this picks a snapshot of a singleton whose value is continuously changing,
747 /// the output singleton has a non-deterministic value since the snapshot can be at an
748 /// arbitrary point in time.
749 pub fn snapshot_atomic(self, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
750 Singleton::new(
751 self.location.clone().tick,
752 HydroNode::Batch {
753 inner: Box::new(self.ir_node.into_inner()),
754 metadata: self
755 .location
756 .tick
757 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
758 },
759 )
760 }
761
762 /// Returns this singleton back into a top-level, asynchronous execution context where updates
763 /// to the value will be asynchronously propagated.
764 pub fn end_atomic(self) -> Singleton<T, L, B> {
765 Singleton::new(
766 self.location.tick.l.clone(),
767 HydroNode::EndAtomic {
768 inner: Box::new(self.ir_node.into_inner()),
769 metadata: self
770 .location
771 .tick
772 .l
773 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
774 },
775 )
776 }
777}
778
779impl<'a, T, L, B: Boundedness> Singleton<T, L, B>
780where
781 L: Location<'a>,
782{
783 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
784 /// will observe the same version of the value and will be executed synchronously before any
785 /// outputs are yielded (in [`Optional::end_atomic`]).
786 ///
787 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
788 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
789 /// a different version).
790 ///
791 /// Entering an atomic section requires a [`Tick`] argument that declares where the singleton will
792 /// be atomically processed. Snapshotting an singleton into the _same_ [`Tick`] will preserve the
793 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
794 pub fn atomic(self, tick: &Tick<L>) -> Singleton<T, Atomic<L>, B> {
795 let out_location = Atomic { tick: tick.clone() };
796 Singleton::new(
797 out_location.clone(),
798 HydroNode::BeginAtomic {
799 inner: Box::new(self.ir_node.into_inner()),
800 metadata: out_location
801 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
802 },
803 )
804 }
805
806 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
807 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
808 /// relevant data that contributed to the snapshot at tick `t`.
809 ///
810 /// # Non-Determinism
811 /// Because this picks a snapshot of a singleton whose value is continuously changing,
812 /// the output singleton has a non-deterministic value since the snapshot can be at an
813 /// arbitrary point in time.
814 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
815 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
816 Singleton::new(
817 tick.clone(),
818 HydroNode::Batch {
819 inner: Box::new(self.ir_node.into_inner()),
820 metadata: tick
821 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
822 },
823 )
824 }
825
826 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
827 /// with order corresponding to increasing prefixes of data contributing to the singleton.
828 ///
829 /// # Non-Determinism
830 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
831 /// to non-deterministic batching and arrival of inputs, the output stream is
832 /// non-deterministic.
833 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
834 where
835 L: NoTick,
836 {
837 sliced! {
838 let snapshot = use(self, nondet);
839 snapshot.into_stream()
840 }
841 .weaken_retries()
842 }
843
844 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
845 /// value taken at various points in time. Because the input singleton may be
846 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
847 /// represent the value of the singleton given some prefix of the streams leading up to
848 /// it.
849 ///
850 /// # Non-Determinism
851 /// The output stream is non-deterministic in which elements are sampled, since this
852 /// is controlled by a clock.
853 pub fn sample_every(
854 self,
855 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
856 nondet: NonDet,
857 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
858 where
859 L: NoTick + NoAtomic,
860 {
861 let samples = self.location.source_interval(interval, nondet);
862 sliced! {
863 let snapshot = use(self, nondet);
864 let sample_batch = use(samples, nondet);
865
866 snapshot.filter_if_some(sample_batch.first()).into_stream()
867 }
868 .weaken_retries()
869 }
870
871 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
872 /// implies that `B == Bounded`.
873 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
874 where
875 B: IsBounded,
876 {
877 Singleton::new(self.location, self.ir_node.into_inner())
878 }
879
880 /// Clones this bounded singleton into a tick, returning a singleton that has the
881 /// same value as the outer singleton. Because the outer singleton is bounded, this
882 /// is deterministic because there is only a single immutable version.
883 pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
884 where
885 B: IsBounded,
886 T: Clone,
887 {
888 // TODO(shadaj): avoid printing simulator logs for this snapshot
889 self.snapshot(
890 tick,
891 nondet!(/** bounded top-level singleton so deterministic */),
892 )
893 }
894
895 /// Converts this singleton into a [`Stream`] containing a single element, the value.
896 ///
897 /// # Example
898 /// ```rust
899 /// # #[cfg(feature = "deploy")] {
900 /// # use hydro_lang::prelude::*;
901 /// # use futures::StreamExt;
902 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
903 /// let tick = process.tick();
904 /// let batch_input = process
905 /// .source_iter(q!(vec![123, 456]))
906 /// .batch(&tick, nondet!(/** test */));
907 /// batch_input.clone().chain(
908 /// batch_input.count().into_stream()
909 /// ).all_ticks()
910 /// # }, |mut stream| async move {
911 /// // [123, 456, 2]
912 /// # for w in vec![123, 456, 2] {
913 /// # assert_eq!(stream.next().await.unwrap(), w);
914 /// # }
915 /// # }));
916 /// # }
917 /// ```
918 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
919 where
920 B: IsBounded,
921 {
922 Stream::new(
923 self.location.clone(),
924 HydroNode::Cast {
925 inner: Box::new(self.ir_node.into_inner()),
926 metadata: self.location.new_node_metadata(Stream::<
927 T,
928 Tick<L>,
929 Bounded,
930 TotalOrder,
931 ExactlyOnce,
932 >::collection_kind()),
933 },
934 )
935 }
936}
937
938impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
939where
940 L: Location<'a>,
941{
942 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
943 /// which will stream the value computed in _each_ tick as a separate stream element.
944 ///
945 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
946 /// producing one element in the output for each tick. This is useful for batched computations,
947 /// where the results from each tick must be combined together.
948 ///
949 /// # Example
950 /// ```rust
951 /// # #[cfg(feature = "deploy")] {
952 /// # use hydro_lang::prelude::*;
953 /// # use futures::StreamExt;
954 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
955 /// let tick = process.tick();
956 /// # // ticks are lazy by default, forces the second tick to run
957 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
958 /// # let batch_first_tick = process
959 /// # .source_iter(q!(vec![1]))
960 /// # .batch(&tick, nondet!(/** test */));
961 /// # let batch_second_tick = process
962 /// # .source_iter(q!(vec![1, 2, 3]))
963 /// # .batch(&tick, nondet!(/** test */))
964 /// # .defer_tick(); // appears on the second tick
965 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
966 /// input_batch // first tick: [1], second tick: [1, 2, 3]
967 /// .count()
968 /// .all_ticks()
969 /// # }, |mut stream| async move {
970 /// // [1, 3]
971 /// # for w in vec![1, 3] {
972 /// # assert_eq!(stream.next().await.unwrap(), w);
973 /// # }
974 /// # }));
975 /// # }
976 /// ```
977 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
978 self.into_stream().all_ticks()
979 }
980
981 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
982 /// which will stream the value computed in _each_ tick as a separate stream element.
983 ///
984 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
985 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
986 /// singleton's [`Tick`] context.
987 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
988 self.into_stream().all_ticks_atomic()
989 }
990
991 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
992 /// be asynchronously updated with the latest value of the singleton inside the tick.
993 ///
994 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
995 /// tick that tracks the inner value. This is useful for getting the value as of the
996 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
997 ///
998 /// # Example
999 /// ```rust
1000 /// # #[cfg(feature = "deploy")] {
1001 /// # use hydro_lang::prelude::*;
1002 /// # use futures::StreamExt;
1003 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1004 /// let tick = process.tick();
1005 /// # // ticks are lazy by default, forces the second tick to run
1006 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1007 /// # let batch_first_tick = process
1008 /// # .source_iter(q!(vec![1]))
1009 /// # .batch(&tick, nondet!(/** test */));
1010 /// # let batch_second_tick = process
1011 /// # .source_iter(q!(vec![1, 2, 3]))
1012 /// # .batch(&tick, nondet!(/** test */))
1013 /// # .defer_tick(); // appears on the second tick
1014 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1015 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1016 /// .count()
1017 /// .latest()
1018 /// # .sample_eager(nondet!(/** test */))
1019 /// # }, |mut stream| async move {
1020 /// // asynchronously changes from 1 ~> 3
1021 /// # for w in vec![1, 3] {
1022 /// # assert_eq!(stream.next().await.unwrap(), w);
1023 /// # }
1024 /// # }));
1025 /// # }
1026 /// ```
1027 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1028 Singleton::new(
1029 self.location.outer().clone(),
1030 HydroNode::YieldConcat {
1031 inner: Box::new(self.ir_node.into_inner()),
1032 metadata: self
1033 .location
1034 .outer()
1035 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1036 },
1037 )
1038 }
1039
1040 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1041 /// be updated with the latest value of the singleton inside the tick.
1042 ///
1043 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1044 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1045 /// singleton's [`Tick`] context.
1046 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1047 let out_location = Atomic {
1048 tick: self.location.clone(),
1049 };
1050 Singleton::new(
1051 out_location.clone(),
1052 HydroNode::YieldConcat {
1053 inner: Box::new(self.ir_node.into_inner()),
1054 metadata: out_location
1055 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1056 },
1057 )
1058 }
1059}
1060
1061#[doc(hidden)]
1062/// Helper trait that determines the output collection type for [`Singleton::zip`].
1063///
1064/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1065/// [`Singleton`].
1066#[sealed::sealed]
1067pub trait ZipResult<'a, Other> {
1068 /// The output collection type.
1069 type Out;
1070 /// The type of the tupled output value.
1071 type ElementType;
1072 /// The type of the other collection's value.
1073 type OtherType;
1074 /// The location where the tupled result will be materialized.
1075 type Location: Location<'a>;
1076
1077 /// The location of the second input to the `zip`.
1078 fn other_location(other: &Other) -> Self::Location;
1079 /// The IR node of the second input to the `zip`.
1080 fn other_ir_node(other: Other) -> HydroNode;
1081
1082 /// Constructs the output live collection given an IR node containing the zip result.
1083 fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1084}
1085
1086#[sealed::sealed]
1087impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1088where
1089 L: Location<'a>,
1090{
1091 type Out = Singleton<(T, U), L, B>;
1092 type ElementType = (T, U);
1093 type OtherType = U;
1094 type Location = L;
1095
1096 fn other_location(other: &Singleton<U, L, B>) -> L {
1097 other.location.clone()
1098 }
1099
1100 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1101 other.ir_node.into_inner()
1102 }
1103
1104 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1105 Singleton::new(
1106 location.clone(),
1107 HydroNode::Cast {
1108 inner: Box::new(ir_node),
1109 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1110 },
1111 )
1112 }
1113}
1114
1115#[sealed::sealed]
1116impl<'a, T, U, L, B: Boundedness> ZipResult<'a, Optional<U, L, B>> for Singleton<T, L, B>
1117where
1118 L: Location<'a>,
1119{
1120 type Out = Optional<(T, U), L, B>;
1121 type ElementType = (T, U);
1122 type OtherType = U;
1123 type Location = L;
1124
1125 fn other_location(other: &Optional<U, L, B>) -> L {
1126 other.location.clone()
1127 }
1128
1129 fn other_ir_node(other: Optional<U, L, B>) -> HydroNode {
1130 other.ir_node.into_inner()
1131 }
1132
1133 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1134 Optional::new(location, ir_node)
1135 }
1136}
1137
1138#[cfg(test)]
1139mod tests {
1140 #[cfg(feature = "deploy")]
1141 use futures::{SinkExt, StreamExt};
1142 #[cfg(feature = "deploy")]
1143 use hydro_deploy::Deployment;
1144 #[cfg(any(feature = "deploy", feature = "sim"))]
1145 use stageleft::q;
1146
1147 #[cfg(any(feature = "deploy", feature = "sim"))]
1148 use crate::compile::builder::FlowBuilder;
1149 #[cfg(feature = "deploy")]
1150 use crate::live_collections::stream::ExactlyOnce;
1151 #[cfg(any(feature = "deploy", feature = "sim"))]
1152 use crate::location::Location;
1153 #[cfg(any(feature = "deploy", feature = "sim"))]
1154 use crate::nondet::nondet;
1155
1156 #[cfg(feature = "deploy")]
1157 #[tokio::test]
1158 async fn tick_cycle_cardinality() {
1159 let mut deployment = Deployment::new();
1160
1161 let mut flow = FlowBuilder::new();
1162 let node = flow.process::<()>();
1163 let external = flow.external::<()>();
1164
1165 let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1166
1167 let node_tick = node.tick();
1168 let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1169 let counts = singleton
1170 .clone()
1171 .into_stream()
1172 .count()
1173 .filter_if_some(input.batch(&node_tick, nondet!(/** testing */)).first())
1174 .all_ticks()
1175 .send_bincode_external(&external);
1176 complete_cycle.complete_next_tick(singleton);
1177
1178 let nodes = flow
1179 .with_process(&node, deployment.Localhost())
1180 .with_external(&external, deployment.Localhost())
1181 .deploy(&mut deployment);
1182
1183 deployment.deploy().await.unwrap();
1184
1185 let mut tick_trigger = nodes.connect(input_send).await;
1186 let mut external_out = nodes.connect(counts).await;
1187
1188 deployment.start().await.unwrap();
1189
1190 tick_trigger.send(()).await.unwrap();
1191
1192 assert_eq!(external_out.next().await.unwrap(), 1);
1193
1194 tick_trigger.send(()).await.unwrap();
1195
1196 assert_eq!(external_out.next().await.unwrap(), 1);
1197 }
1198
1199 #[cfg(feature = "sim")]
1200 #[test]
1201 #[should_panic]
1202 fn sim_fold_intermediate_states() {
1203 let mut flow = FlowBuilder::new();
1204 let node = flow.process::<()>();
1205
1206 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1207 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1208
1209 let tick = node.tick();
1210 let batch = folded.snapshot(&tick, nondet!(/** test */));
1211 let out_recv = batch.all_ticks().sim_output();
1212
1213 flow.sim().exhaustive(async || {
1214 assert_eq!(out_recv.next().await.unwrap(), 10);
1215 });
1216 }
1217
1218 #[cfg(feature = "sim")]
1219 #[test]
1220 fn sim_fold_intermediate_state_count() {
1221 let mut flow = FlowBuilder::new();
1222 let node = flow.process::<()>();
1223
1224 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1225 let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1226
1227 let tick = node.tick();
1228 let batch = folded.snapshot(&tick, nondet!(/** test */));
1229 let out_recv = batch.all_ticks().sim_output();
1230
1231 let instance_count = flow.sim().exhaustive(async || {
1232 let out = out_recv.collect::<Vec<_>>().await;
1233 assert_eq!(out.last(), Some(&10));
1234 });
1235
1236 assert_eq!(
1237 instance_count,
1238 16 // 2^4 possible subsets of intermediates (including initial state)
1239 )
1240 }
1241
1242 #[cfg(feature = "sim")]
1243 #[test]
1244 fn sim_fold_no_repeat_initial() {
1245 // check that we don't repeat the initial state of the fold in autonomous decisions
1246
1247 let mut flow = FlowBuilder::new();
1248 let node = flow.process::<()>();
1249
1250 let (in_port, input) = node.sim_input();
1251 let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1252
1253 let tick = node.tick();
1254 let batch = folded.snapshot(&tick, nondet!(/** test */));
1255 let out_recv = batch.all_ticks().sim_output();
1256
1257 flow.sim().exhaustive(async || {
1258 assert_eq!(out_recv.next().await.unwrap(), 0);
1259
1260 in_port.send(123);
1261
1262 assert_eq!(out_recv.next().await.unwrap(), 123);
1263 });
1264 }
1265
1266 #[cfg(feature = "sim")]
1267 #[test]
1268 #[should_panic]
1269 fn sim_fold_repeats_snapshots() {
1270 // when the tick is driven by a snapshot AND something else, the snapshot can
1271 // "stutter" and repeat the same state multiple times
1272
1273 let mut flow = FlowBuilder::new();
1274 let node = flow.process::<()>();
1275
1276 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1277 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1278
1279 let tick = node.tick();
1280 let batch = source
1281 .batch(&tick, nondet!(/** test */))
1282 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1283 let out_recv = batch.all_ticks().sim_output();
1284
1285 flow.sim().exhaustive(async || {
1286 if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1287 {
1288 panic!("repeated snapshot");
1289 }
1290 });
1291 }
1292
1293 #[cfg(feature = "sim")]
1294 #[test]
1295 fn sim_fold_repeats_snapshots_count() {
1296 // check the number of instances
1297 let mut flow = FlowBuilder::new();
1298 let node = flow.process::<()>();
1299
1300 let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1301 let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1302
1303 let tick = node.tick();
1304 let batch = source
1305 .batch(&tick, nondet!(/** test */))
1306 .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1307 let out_recv = batch.all_ticks().sim_output();
1308
1309 let count = flow.sim().exhaustive(async || {
1310 let _ = out_recv.collect::<Vec<_>>().await;
1311 });
1312
1313 assert_eq!(count, 52);
1314 // don't have a combinatorial explanation for this number yet, but checked via logs
1315 }
1316
1317 #[cfg(feature = "sim")]
1318 #[test]
1319 fn sim_top_level_singleton_exhaustive() {
1320 // ensures that top-level singletons have only one snapshot
1321 let mut flow = FlowBuilder::new();
1322 let node = flow.process::<()>();
1323
1324 let singleton = node.singleton(q!(1));
1325 let tick = node.tick();
1326 let batch = singleton.snapshot(&tick, nondet!(/** test */));
1327 let out_recv = batch.all_ticks().sim_output();
1328
1329 let count = flow.sim().exhaustive(async || {
1330 let _ = out_recv.collect::<Vec<_>>().await;
1331 });
1332
1333 assert_eq!(count, 1);
1334 }
1335
1336 #[cfg(feature = "sim")]
1337 #[test]
1338 fn sim_top_level_singleton_join_count() {
1339 // if a tick consumes a static snapshot and a stream batch, only the batch require space
1340 // exploration
1341
1342 let mut flow = FlowBuilder::new();
1343 let node = flow.process::<()>();
1344
1345 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1346 let tick = node.tick();
1347 let batch = source_iter
1348 .batch(&tick, nondet!(/** test */))
1349 .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1350 let out_recv = batch.all_ticks().sim_output();
1351
1352 let instance_count = flow.sim().exhaustive(async || {
1353 let _ = out_recv.collect::<Vec<_>>().await;
1354 });
1355
1356 assert_eq!(
1357 instance_count,
1358 16 // 2^4 ways to split up (including a possibly empty first batch)
1359 )
1360 }
1361
1362 #[cfg(feature = "sim")]
1363 #[test]
1364 fn top_level_singleton_into_stream_no_replay() {
1365 let mut flow = FlowBuilder::new();
1366 let node = flow.process::<()>();
1367
1368 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1369 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1370
1371 let out_recv = folded.into_stream().sim_output();
1372
1373 flow.sim().exhaustive(async || {
1374 out_recv.assert_yields_only([10]).await;
1375 });
1376 }
1377}