embassy_sync/
watch.rs

1//! A synchronization primitive for passing the latest value to **multiple** receivers.
2
3use core::cell::RefCell;
4use core::future::{poll_fn, Future};
5use core::marker::PhantomData;
6use core::ops::{Deref, DerefMut};
7use core::task::{Context, Poll};
8
9use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex;
11use crate::waitqueue::MultiWakerRegistration;
12
13/// The `Watch` is a single-slot signaling primitive that allows _multiple_ (`N`) receivers to concurrently await
14/// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers,
15/// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous
16/// value when a new one is sent, without waiting for all receivers to read the previous value.
17///
18/// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks
19/// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are
20/// always provided with the latest value.
21///
22/// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`]
23/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`]
24/// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the
25/// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel.
26/// ```
27///
28/// use futures_executor::block_on;
29/// use embassy_sync::watch::Watch;
30/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
31///
32/// let f = async {
33///
34/// static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
35///
36/// // Obtain receivers and sender
37/// let mut rcv0 = WATCH.receiver().unwrap();
38/// let mut rcv1 = WATCH.dyn_receiver().unwrap();
39/// let mut snd = WATCH.sender();
40///
41/// // No more receivers, and no update
42/// assert!(WATCH.receiver().is_none());
43/// assert_eq!(rcv1.try_changed(), None);
44///
45/// snd.send(10);
46///
47/// // Receive the new value (async or try)
48/// assert_eq!(rcv0.changed().await, 10);
49/// assert_eq!(rcv1.try_changed(), Some(10));
50///
51/// // No update
52/// assert_eq!(rcv0.try_changed(), None);
53/// assert_eq!(rcv1.try_changed(), None);
54///
55/// snd.send(20);
56///
57/// // Using `get` marks the value as seen
58/// assert_eq!(rcv1.get().await, 20);
59/// assert_eq!(rcv1.try_changed(), None);
60///
61/// // But `get` also returns when unchanged
62/// assert_eq!(rcv1.get().await, 20);
63/// assert_eq!(rcv1.get().await, 20);
64///
65/// };
66/// block_on(f);
67/// ```
68#[derive(Debug)]
69pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
70    mutex: Mutex<M, RefCell<WatchState<T, N>>>,
71}
72
73#[derive(Debug)]
74struct WatchState<T: Clone, const N: usize> {
75    data: Option<T>,
76    current_id: u64,
77    wakers: MultiWakerRegistration<N>,
78    receiver_count: usize,
79}
80
/// Private half of the `Watch` interface, used by the sender and receiver handles.
///
/// Every `id` argument is the caller's last-seen message id; methods that "mark as seen"
/// overwrite it with the current message id of the `Watch`.
trait SealedWatchBehavior<T> {
    /// Poll the `Watch` for the current value, marking it as seen.
    fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;

    /// Poll the `Watch` for the current value if it matches the predicate function
    /// `f`, marking it as seen.
    fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;

    /// Poll the `Watch` for a value newer than `id`, marking it as seen.
    fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;

    /// Poll the `Watch` for a value newer than `id` that matches the predicate function
    /// `f`, marking it as seen.
    fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;

    /// Non-blocking variant of `poll_changed`: returns the value if it has changed,
    /// marking it as seen.
    fn try_changed(&self, id: &mut u64) -> Option<T>;

    /// Non-blocking variant of `poll_changed_and`: returns the value if it has changed
    /// and matches the predicate function `f`, marking it as seen.
    fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;

    /// Sends a new value to the `Watch`.
    fn send(&self, val: T);

    /// Modifies the stored `Option<T>` in place through the closure `f` and then
    /// notifies the receivers. Nothing is returned.
    fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));

    /// Modifies the stored `Option<T>` in place through the closure `f`; the receivers
    /// are notified only when `f` returns `true`. Nothing is returned.
    fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);

    /// Clears the value of the `Watch`.
    fn clear(&self);

    /// Used when a receiver is dropped to decrement the receiver count.
    ///
    /// ## This method should not be called by the user.
    fn drop_receiver(&self);
}
122
123/// A trait representing the 'inner' behavior of the `Watch`.
124#[allow(private_bounds)]
125pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> {
126    /// Tries to get the value of the `Watch`, marking it as seen, if an id is given.
127    fn try_get(&self, id: Option<&mut u64>) -> Option<T>;
128
129    /// Tries to get the value of the `Watch` if it matches the predicate function
130    /// `f`, marking it as seen.
131    fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
132
133    /// Checks if the `Watch` is been initialized with a value.
134    fn contains_value(&self) -> bool;
135}
136
137impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> {
138    fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
139        self.mutex.lock(|state| {
140            let mut s = state.borrow_mut();
141            match &s.data {
142                Some(data) => {
143                    *id = s.current_id;
144                    Poll::Ready(data.clone())
145                }
146                None => {
147                    s.wakers.register(cx.waker());
148                    Poll::Pending
149                }
150            }
151        })
152    }
153
154    fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
155        self.mutex.lock(|state| {
156            let mut s = state.borrow_mut();
157            match s.data {
158                Some(ref data) if f(data) => {
159                    *id = s.current_id;
160                    Poll::Ready(data.clone())
161                }
162                _ => {
163                    s.wakers.register(cx.waker());
164                    Poll::Pending
165                }
166            }
167        })
168    }
169
170    fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
171        self.mutex.lock(|state| {
172            let mut s = state.borrow_mut();
173            match (&s.data, s.current_id > *id) {
174                (Some(data), true) => {
175                    *id = s.current_id;
176                    Poll::Ready(data.clone())
177                }
178                _ => {
179                    s.wakers.register(cx.waker());
180                    Poll::Pending
181                }
182            }
183        })
184    }
185
186    fn try_changed(&self, id: &mut u64) -> Option<T> {
187        self.mutex.lock(|state| {
188            let s = state.borrow();
189            match s.current_id > *id {
190                true => {
191                    *id = s.current_id;
192                    s.data.clone()
193                }
194                false => None,
195            }
196        })
197    }
198
199    fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
200        self.mutex.lock(|state| {
201            let mut s = state.borrow_mut();
202            match (&s.data, s.current_id > *id) {
203                (Some(data), true) if f(data) => {
204                    *id = s.current_id;
205                    Poll::Ready(data.clone())
206                }
207                _ => {
208                    s.wakers.register(cx.waker());
209                    Poll::Pending
210                }
211            }
212        })
213    }
214
215    fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
216        self.mutex.lock(|state| {
217            let s = state.borrow();
218            match (&s.data, s.current_id > *id) {
219                (Some(data), true) if f(data) => {
220                    *id = s.current_id;
221                    s.data.clone()
222                }
223                _ => None,
224            }
225        })
226    }
227
228    fn drop_receiver(&self) {
229        self.mutex.lock(|state| {
230            let mut s = state.borrow_mut();
231            s.receiver_count -= 1;
232        })
233    }
234
235    fn clear(&self) {
236        self.mutex.lock(|state| {
237            let mut s = state.borrow_mut();
238            s.data = None;
239        })
240    }
241
242    fn send(&self, val: T) {
243        self.mutex.lock(|state| {
244            let mut s = state.borrow_mut();
245            s.data = Some(val);
246            s.current_id += 1;
247            s.wakers.wake();
248        })
249    }
250
251    fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
252        self.mutex.lock(|state| {
253            let mut s = state.borrow_mut();
254            f(&mut s.data);
255            s.current_id += 1;
256            s.wakers.wake();
257        })
258    }
259
260    fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) {
261        self.mutex.lock(|state| {
262            let mut s = state.borrow_mut();
263            if f(&mut s.data) {
264                s.current_id += 1;
265                s.wakers.wake();
266            }
267        })
268    }
269}
270
271impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
272    fn try_get(&self, id: Option<&mut u64>) -> Option<T> {
273        self.mutex.lock(|state| {
274            let s = state.borrow();
275            if let Some(id) = id {
276                *id = s.current_id;
277            }
278            s.data.clone()
279        })
280    }
281
282    fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
283        self.mutex.lock(|state| {
284            let s = state.borrow();
285            match s.data {
286                Some(ref data) if f(data) => {
287                    if let Some(id) = id {
288                        *id = s.current_id;
289                    }
290                    Some(data.clone())
291                }
292                _ => None,
293            }
294        })
295    }
296
297    fn contains_value(&self) -> bool {
298        self.mutex.lock(|state| state.borrow().data.is_some())
299    }
300}
301
302impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
303    /// Create a new `Watch` channel for `N` receivers.
304    pub const fn new() -> Self {
305        Self {
306            mutex: Mutex::new(RefCell::new(WatchState {
307                data: None,
308                current_id: 0,
309                wakers: MultiWakerRegistration::new(),
310                receiver_count: 0,
311            })),
312        }
313    }
314
315    /// Create a new `Watch` channel with default data.
316    pub const fn new_with(data: T) -> Self {
317        Self {
318            mutex: Mutex::new(RefCell::new(WatchState {
319                data: Some(data),
320                current_id: 0,
321                wakers: MultiWakerRegistration::new(),
322                receiver_count: 0,
323            })),
324        }
325    }
326
327    /// Create a new [`Sender`] for the `Watch`.
328    pub fn sender(&self) -> Sender<'_, M, T, N> {
329        Sender(Snd::new(self))
330    }
331
332    /// Create a new [`DynSender`] for the `Watch`.
333    pub fn dyn_sender(&self) -> DynSender<'_, T> {
334        DynSender(Snd::new(self))
335    }
336
337    /// Try to create a new [`Receiver`] for the `Watch`. If the
338    /// maximum number of receivers has been reached, `None` is returned.
339    pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> {
340        self.mutex.lock(|state| {
341            let mut s = state.borrow_mut();
342            if s.receiver_count < N {
343                s.receiver_count += 1;
344                Some(Receiver(Rcv::new(self, 0)))
345            } else {
346                None
347            }
348        })
349    }
350
351    /// Try to create a new [`DynReceiver`] for the `Watch`. If the
352    /// maximum number of receivers has been reached, `None` is returned.
353    pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> {
354        self.mutex.lock(|state| {
355            let mut s = state.borrow_mut();
356            if s.receiver_count < N {
357                s.receiver_count += 1;
358                Some(DynReceiver(Rcv::new(self, 0)))
359            } else {
360                None
361            }
362        })
363    }
364
365    /// Try to create a new [`AnonReceiver`] for the `Watch`.
366    pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> {
367        AnonReceiver(AnonRcv::new(self, 0))
368    }
369
370    /// Try to create a new [`DynAnonReceiver`] for the `Watch`.
371    pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> {
372        DynAnonReceiver(AnonRcv::new(self, 0))
373    }
374
375    /// Returns the message ID of the latest message sent to the `Watch`.
376    ///
377    /// This counter is monotonic, and is incremented every time a new message is sent.
378    pub fn get_msg_id(&self) -> u64 {
379        self.mutex.lock(|state| state.borrow().current_id)
380    }
381
382    /// Tries to get the value of the `Watch`.
383    pub fn try_get(&self) -> Option<T> {
384        WatchBehavior::try_get(self, None)
385    }
386
387    /// Tries to get the value of the `Watch` if it matches the predicate function `f`.
388    pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
389    where
390        F: Fn(&T) -> bool,
391    {
392        WatchBehavior::try_get_and(self, None, &mut f)
393    }
394}
395
396/// A receiver can `.await` a change in the `Watch` value.
397#[derive(Debug)]
398pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
399    watch: &'a W,
400    _phantom: PhantomData<T>,
401}
402
403impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> {
404    fn clone(&self) -> Self {
405        Self {
406            watch: self.watch,
407            _phantom: PhantomData,
408        }
409    }
410}
411
412impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
413    /// Creates a new `Receiver` with a reference to the `Watch`.
414    fn new(watch: &'a W) -> Self {
415        Self {
416            watch,
417            _phantom: PhantomData,
418        }
419    }
420
421    /// Sends a new value to the `Watch`.
422    pub fn send(&self, val: T) {
423        self.watch.send(val)
424    }
425
426    /// Clears the value of the `Watch`.
427    /// This will cause calls to [`Rcv::get`] to be pending.
428    pub fn clear(&self) {
429        self.watch.clear()
430    }
431
432    /// Tries to retrieve the value of the `Watch`.
433    pub fn try_get(&self) -> Option<T> {
434        self.watch.try_get(None)
435    }
436
437    /// Tries to peek the current value of the `Watch` if it matches the predicate
438    /// function `f`.
439    pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
440    where
441        F: Fn(&T) -> bool,
442    {
443        self.watch.try_get_and(None, &mut f)
444    }
445
446    /// Returns true if the `Watch` contains a value.
447    pub fn contains_value(&self) -> bool {
448        self.watch.contains_value()
449    }
450
451    /// Modify the value of the `Watch` using a closure.
452    pub fn send_modify<F>(&self, mut f: F)
453    where
454        F: Fn(&mut Option<T>),
455    {
456        self.watch.send_modify(&mut f)
457    }
458
459    /// Modify the value of the `Watch` using a closure. The closure must return
460    /// `true` if the value was modified, which notifies all receivers.
461    pub fn send_if_modified<F>(&self, mut f: F)
462    where
463        F: Fn(&mut Option<T>) -> bool,
464    {
465        self.watch.send_if_modified(&mut f)
466    }
467}
468
469/// A sender of a `Watch` channel.
470///
471/// For a simpler type definition, consider [`DynSender`] at the expense of
472/// some runtime performance due to dynamic dispatch.
473#[derive(Debug)]
474pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
475
476impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
477    fn clone(&self) -> Self {
478        Self(self.0.clone())
479    }
480}
481
482impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> {
483    /// Converts the `Sender` into a [`DynSender`].
484    pub fn as_dyn(self) -> DynSender<'a, T> {
485        DynSender(Snd::new(self.watch))
486    }
487}
488
489impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> {
490    fn into(self) -> DynSender<'a, T> {
491        self.as_dyn()
492    }
493}
494
495impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> {
496    type Target = Snd<'a, T, Watch<M, T, N>>;
497
498    fn deref(&self) -> &Self::Target {
499        &self.0
500    }
501}
502
503impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> {
504    fn deref_mut(&mut self) -> &mut Self::Target {
505        &mut self.0
506    }
507}
508
509/// A sender which holds a **dynamic** reference to a `Watch` channel.
510///
511/// This is an alternative to [`Sender`] with a simpler type definition,
512pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
513
514impl<'a, T: Clone> Clone for DynSender<'a, T> {
515    fn clone(&self) -> Self {
516        Self(self.0.clone())
517    }
518}
519
520impl<'a, T: Clone> Deref for DynSender<'a, T> {
521    type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>;
522
523    fn deref(&self) -> &Self::Target {
524        &self.0
525    }
526}
527
528impl<'a, T: Clone> DerefMut for DynSender<'a, T> {
529    fn deref_mut(&mut self) -> &mut Self::Target {
530        &mut self.0
531    }
532}
533
534/// A receiver can `.await` a change in the `Watch` value.
535pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
536    watch: &'a W,
537    at_id: u64,
538    _phantom: PhantomData<T>,
539}
540
541impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
542    /// Creates a new `Receiver` with a reference to the `Watch`.
543    fn new(watch: &'a W, at_id: u64) -> Self {
544        Self {
545            watch,
546            at_id,
547            _phantom: PhantomData,
548        }
549    }
550
551    /// Returns the current value of the `Watch` once it is initialized, marking it as seen.
552    ///
553    /// **Note**: Futures do nothing unless you `.await` or poll them.
554    pub fn get(&mut self) -> impl Future<Output = T> + '_ {
555        poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx))
556    }
557
558    /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
559    pub fn try_get(&mut self) -> Option<T> {
560        self.watch.try_get(Some(&mut self.at_id))
561    }
562
563    /// Returns the value of the `Watch` if it matches the predicate function `f`,
564    /// or waits for it to match, marking it as seen.
565    ///
566    /// **Note**: Futures do nothing unless you `.await` or poll them.
567    pub async fn get_and<F>(&mut self, mut f: F) -> T
568    where
569        F: Fn(&T) -> bool,
570    {
571        poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await
572    }
573
574    /// Tries to get the current value of the `Watch` if it matches the predicate
575    /// function `f` without waiting, marking it as seen.
576    pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
577    where
578        F: Fn(&T) -> bool,
579    {
580        self.watch.try_get_and(Some(&mut self.at_id), &mut f)
581    }
582
583    /// Waits for the `Watch` to change and returns the new value, marking it as seen.
584    ///
585    /// **Note**: Futures do nothing unless you `.await` or poll them.
586    pub async fn changed(&mut self) -> T {
587        poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await
588    }
589
590    /// Tries to get the new value of the watch without waiting, marking it as seen.
591    pub fn try_changed(&mut self) -> Option<T> {
592        self.watch.try_changed(&mut self.at_id)
593    }
594
595    /// Waits for the `Watch` to change to a value which satisfies the predicate
596    /// function `f` and returns the new value, marking it as seen.
597    ///
598    /// **Note**: Futures do nothing unless you `.await` or poll them.
599    pub async fn changed_and<F>(&mut self, mut f: F) -> T
600    where
601        F: Fn(&T) -> bool,
602    {
603        poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await
604    }
605
606    /// Tries to get the new value of the watch which satisfies the predicate
607    /// function `f` and returns the new value without waiting, marking it as seen.
608    pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
609    where
610        F: Fn(&T) -> bool,
611    {
612        self.watch.try_changed_and(&mut self.at_id, &mut f)
613    }
614
615    /// Checks if the `Watch` contains a value. If this returns true,
616    /// then awaiting [`Rcv::get`] will return immediately.
617    pub fn contains_value(&self) -> bool {
618        self.watch.contains_value()
619    }
620}
621
622impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
623    fn drop(&mut self) {
624        self.watch.drop_receiver();
625    }
626}
627
628/// A anonymous receiver can NOT `.await` a change in the `Watch` value.
629#[derive(Debug)]
630pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
631    watch: &'a W,
632    at_id: u64,
633    _phantom: PhantomData<T>,
634}
635
636impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> {
637    /// Creates a new `Receiver` with a reference to the `Watch`.
638    fn new(watch: &'a W, at_id: u64) -> Self {
639        Self {
640            watch,
641            at_id,
642            _phantom: PhantomData,
643        }
644    }
645
646    /// Tries to get the current value of the `Watch` without waiting, marking it as seen.
647    pub fn try_get(&mut self) -> Option<T> {
648        self.watch.try_get(Some(&mut self.at_id))
649    }
650
651    /// Tries to get the current value of the `Watch` if it matches the predicate
652    /// function `f` without waiting, marking it as seen.
653    pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
654    where
655        F: Fn(&T) -> bool,
656    {
657        self.watch.try_get_and(Some(&mut self.at_id), &mut f)
658    }
659
660    /// Tries to get the new value of the watch without waiting, marking it as seen.
661    pub fn try_changed(&mut self) -> Option<T> {
662        self.watch.try_changed(&mut self.at_id)
663    }
664
665    /// Tries to get the new value of the watch which satisfies the predicate
666    /// function `f` and returns the new value without waiting, marking it as seen.
667    pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
668    where
669        F: Fn(&T) -> bool,
670    {
671        self.watch.try_changed_and(&mut self.at_id, &mut f)
672    }
673
674    /// Checks if the `Watch` contains a value. If this returns true,
675    /// then awaiting [`Rcv::get`] will return immediately.
676    pub fn contains_value(&self) -> bool {
677        self.watch.contains_value()
678    }
679}
680
681/// A receiver of a `Watch` channel.
682pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
683
684impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
685    /// Converts the `Receiver` into a [`DynReceiver`].
686    pub fn as_dyn(self) -> DynReceiver<'a, T> {
687        let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id));
688        core::mem::forget(self); // Ensures the destructor is not called
689        rcv
690    }
691}
692
693impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> {
694    fn into(self) -> DynReceiver<'a, T> {
695        self.as_dyn()
696    }
697}
698
699impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
700    type Target = Rcv<'a, T, Watch<M, T, N>>;
701
702    fn deref(&self) -> &Self::Target {
703        &self.0
704    }
705}
706
707impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
708    fn deref_mut(&mut self) -> &mut Self::Target {
709        &mut self.0
710    }
711}
712
713/// A receiver which holds a **dynamic** reference to a `Watch` channel.
714///
715/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of
716/// some runtime performance due to dynamic dispatch.
717pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
718
719impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
720    type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
721
722    fn deref(&self) -> &Self::Target {
723        &self.0
724    }
725}
726
727impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
728    fn deref_mut(&mut self) -> &mut Self::Target {
729        &mut self.0
730    }
731}
732
733/// A receiver of a `Watch` channel that cannot `.await` values.
734#[derive(Debug)]
735pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
736
737impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
738    /// Converts the `Receiver` into a [`DynReceiver`].
739    pub fn as_dyn(self) -> DynAnonReceiver<'a, T> {
740        let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id));
741        core::mem::forget(self); // Ensures the destructor is not called
742        rcv
743    }
744}
745
746impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> {
747    fn into(self) -> DynAnonReceiver<'a, T> {
748        self.as_dyn()
749    }
750}
751
752impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> {
753    type Target = AnonRcv<'a, T, Watch<M, T, N>>;
754
755    fn deref(&self) -> &Self::Target {
756        &self.0
757    }
758}
759
760impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> {
761    fn deref_mut(&mut self) -> &mut Self::Target {
762        &mut self.0
763    }
764}
765
766/// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel.
767///
768/// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of
769/// some runtime performance due to dynamic dispatch.
770pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
771
772impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> {
773    type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>;
774
775    fn deref(&self) -> &Self::Target {
776        &self.0
777    }
778}
779
780impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> {
781    fn deref_mut(&mut self) -> &mut Self::Target {
782        &mut self.0
783    }
784}
785
786#[cfg(test)]
787mod tests {
788    use futures_executor::block_on;
789
790    use super::Watch;
791    use crate::blocking_mutex::raw::CriticalSectionRawMutex;
792
793    #[test]
794    fn multiple_sends() {
795        let f = async {
796            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
797
798            // Obtain receiver and sender
799            let mut rcv = WATCH.receiver().unwrap();
800            let snd = WATCH.sender();
801
802            // Not initialized
803            assert_eq!(rcv.try_changed(), None);
804
805            // Receive the new value
806            snd.send(10);
807            assert_eq!(rcv.changed().await, 10);
808
809            // Receive another value
810            snd.send(20);
811            assert_eq!(rcv.try_changed(), Some(20));
812
813            // No update
814            assert_eq!(rcv.try_changed(), None);
815        };
816        block_on(f);
817    }
818
819    #[test]
820    fn all_try_get() {
821        let f = async {
822            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
823
824            // Obtain receiver and sender
825            let mut rcv = WATCH.receiver().unwrap();
826            let snd = WATCH.sender();
827
828            // Not initialized
829            assert_eq!(WATCH.try_get(), None);
830            assert_eq!(rcv.try_get(), None);
831            assert_eq!(snd.try_get(), None);
832
833            // Receive the new value
834            snd.send(10);
835            assert_eq!(WATCH.try_get(), Some(10));
836            assert_eq!(rcv.try_get(), Some(10));
837            assert_eq!(snd.try_get(), Some(10));
838
839            assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10));
840            assert_eq!(rcv.try_get_and(|x| x > &5), Some(10));
841            assert_eq!(snd.try_get_and(|x| x > &5), Some(10));
842
843            assert_eq!(WATCH.try_get_and(|x| x < &5), None);
844            assert_eq!(rcv.try_get_and(|x| x < &5), None);
845            assert_eq!(snd.try_get_and(|x| x < &5), None);
846        };
847        block_on(f);
848    }
849
850    #[test]
851    fn once_lock_like() {
852        let f = async {
853            static CONFIG0: u8 = 10;
854            static CONFIG1: u8 = 20;
855
856            static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();
857
858            // Obtain receiver and sender
859            let mut rcv = WATCH.receiver().unwrap();
860            let snd = WATCH.sender();
861
862            // Not initialized
863            assert_eq!(rcv.try_changed(), None);
864
865            // Receive the new value
866            snd.send(&CONFIG0);
867            let rcv0 = rcv.changed().await;
868            assert_eq!(rcv0, &10);
869
870            // Receive another value
871            snd.send(&CONFIG1);
872            let rcv1 = rcv.try_changed();
873            assert_eq!(rcv1, Some(&20));
874
875            // No update
876            assert_eq!(rcv.try_changed(), None);
877
878            // Ensure similarity with original static
879            assert_eq!(rcv0, &CONFIG0);
880            assert_eq!(rcv1, Some(&CONFIG1));
881        };
882        block_on(f);
883    }
884
885    #[test]
886    fn sender_modify() {
887        let f = async {
888            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
889
890            // Obtain receiver and sender
891            let mut rcv = WATCH.receiver().unwrap();
892            let snd = WATCH.sender();
893
894            // Receive the new value
895            snd.send(10);
896            assert_eq!(rcv.try_changed(), Some(10));
897
898            // Modify the value inplace
899            snd.send_modify(|opt| {
900                if let Some(inner) = opt {
901                    *inner += 5;
902                }
903            });
904
905            // Get the modified value
906            assert_eq!(rcv.try_changed(), Some(15));
907            assert_eq!(rcv.try_changed(), None);
908        };
909        block_on(f);
910    }
911
912    #[test]
913    fn predicate_fn() {
914        let f = async {
915            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
916
917            // Obtain receiver and sender
918            let mut rcv = WATCH.receiver().unwrap();
919            let snd = WATCH.sender();
920
921            snd.send(15);
922            assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
923            assert_eq!(rcv.try_get_and(|x| x < &5), None);
924            assert!(rcv.try_changed().is_none());
925
926            snd.send(20);
927            assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20));
928            assert_eq!(rcv.try_changed_and(|x| x > &5), None);
929
930            snd.send(25);
931            assert_eq!(rcv.try_changed_and(|x| x < &5), None);
932            assert_eq!(rcv.try_changed(), Some(25));
933
934            snd.send(30);
935            assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
936            assert_eq!(rcv.get_and(|x| x > &5).await, 30);
937        };
938        block_on(f);
939    }
940
941    #[test]
942    fn receive_after_create() {
943        let f = async {
944            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
945
946            // Obtain sender and send value
947            let snd = WATCH.sender();
948            snd.send(10);
949
950            // Obtain receiver and receive value
951            let mut rcv = WATCH.receiver().unwrap();
952            assert_eq!(rcv.try_changed(), Some(10));
953        };
954        block_on(f);
955    }
956
957    #[test]
958    fn max_receivers_drop() {
959        let f = async {
960            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
961
962            // Try to create 3 receivers (only 2 can exist at once)
963            let rcv0 = WATCH.receiver();
964            let rcv1 = WATCH.receiver();
965            let rcv2 = WATCH.receiver();
966
967            // Ensure the first two are successful and the third is not
968            assert!(rcv0.is_some());
969            assert!(rcv1.is_some());
970            assert!(rcv2.is_none());
971
972            // Drop the first receiver
973            drop(rcv0);
974
975            // Create another receiver and ensure it is successful
976            let rcv3 = WATCH.receiver();
977            assert!(rcv3.is_some());
978        };
979        block_on(f);
980    }
981
982    #[test]
983    fn multiple_receivers() {
984        let f = async {
985            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
986
987            // Obtain receivers and sender
988            let mut rcv0 = WATCH.receiver().unwrap();
989            let mut rcv1 = WATCH.anon_receiver();
990            let snd = WATCH.sender();
991
992            // No update for both
993            assert_eq!(rcv0.try_changed(), None);
994            assert_eq!(rcv1.try_changed(), None);
995
996            // Send a new value
997            snd.send(0);
998
999            // Both receivers receive the new value
1000            assert_eq!(rcv0.try_changed(), Some(0));
1001            assert_eq!(rcv1.try_changed(), Some(0));
1002        };
1003        block_on(f);
1004    }
1005
1006    #[test]
1007    fn clone_senders() {
1008        let f = async {
1009            // Obtain different ways to send
1010            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
1011            let snd0 = WATCH.sender();
1012            let snd1 = snd0.clone();
1013
1014            // Obtain Receiver
1015            let mut rcv = WATCH.receiver().unwrap().as_dyn();
1016
1017            // Send a value from first sender
1018            snd0.send(10);
1019            assert_eq!(rcv.try_changed(), Some(10));
1020
1021            // Send a value from second sender
1022            snd1.send(20);
1023            assert_eq!(rcv.try_changed(), Some(20));
1024        };
1025        block_on(f);
1026    }
1027
1028    #[test]
1029    fn use_dynamics() {
1030        let f = async {
1031            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1032
1033            // Obtain receiver and sender
1034            let mut anon_rcv = WATCH.dyn_anon_receiver();
1035            let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
1036            let dyn_snd = WATCH.dyn_sender();
1037
1038            // Send a value
1039            dyn_snd.send(10);
1040
1041            // Ensure the dynamic receiver receives the value
1042            assert_eq!(anon_rcv.try_changed(), Some(10));
1043            assert_eq!(dyn_rcv.try_changed(), Some(10));
1044            assert_eq!(dyn_rcv.try_changed(), None);
1045        };
1046        block_on(f);
1047    }
1048
1049    #[test]
1050    fn convert_to_dyn() {
1051        let f = async {
1052            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1053
1054            // Obtain receiver and sender
1055            let anon_rcv = WATCH.anon_receiver();
1056            let rcv = WATCH.receiver().unwrap();
1057            let snd = WATCH.sender();
1058
1059            // Convert to dynamic
1060            let mut dyn_anon_rcv = anon_rcv.as_dyn();
1061            let mut dyn_rcv = rcv.as_dyn();
1062            let dyn_snd = snd.as_dyn();
1063
1064            // Send a value
1065            dyn_snd.send(10);
1066
1067            // Ensure the dynamic receiver receives the value
1068            assert_eq!(dyn_anon_rcv.try_changed(), Some(10));
1069            assert_eq!(dyn_rcv.try_changed(), Some(10));
1070            assert_eq!(dyn_rcv.try_changed(), None);
1071        };
1072        block_on(f);
1073    }
1074
1075    #[test]
1076    fn dynamic_receiver_count() {
1077        let f = async {
1078            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1079
1080            // Obtain receiver and sender
1081            let rcv0 = WATCH.receiver();
1082            let rcv1 = WATCH.receiver();
1083            let rcv2 = WATCH.receiver();
1084
1085            // Ensure the first two are successful and the third is not
1086            assert!(rcv0.is_some());
1087            assert!(rcv1.is_some());
1088            assert!(rcv2.is_none());
1089
1090            // Convert to dynamic
1091            let dyn_rcv0 = rcv0.unwrap().as_dyn();
1092
1093            // Drop the (now dynamic) receiver
1094            drop(dyn_rcv0);
1095
1096            // Create another receiver and ensure it is successful
1097            let rcv3 = WATCH.receiver();
1098            let rcv4 = WATCH.receiver();
1099            assert!(rcv3.is_some());
1100            assert!(rcv4.is_none());
1101        };
1102        block_on(f);
1103    }
1104
1105    #[test]
1106    fn contains_value() {
1107        let f = async {
1108            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
1109
1110            // Obtain receiver and sender
1111            let rcv = WATCH.receiver().unwrap();
1112            let snd = WATCH.sender();
1113
1114            // check if the watch contains a value
1115            assert_eq!(rcv.contains_value(), false);
1116            assert_eq!(snd.contains_value(), false);
1117
1118            // Send a value
1119            snd.send(10);
1120
1121            // check if the watch contains a value
1122            assert_eq!(rcv.contains_value(), true);
1123            assert_eq!(snd.contains_value(), true);
1124        };
1125        block_on(f);
1126    }
1127}