1use core::cell::RefCell;
4use core::future::{poll_fn, Future};
5use core::marker::PhantomData;
6use core::ops::{Deref, DerefMut};
7use core::task::{Context, Poll};
8
9use crate::blocking_mutex::raw::RawMutex;
10use crate::blocking_mutex::Mutex;
11use crate::waitqueue::MultiWakerRegistration;
12
13#[derive(Debug)]
69pub struct Watch<M: RawMutex, T: Clone, const N: usize> {
70 mutex: Mutex<M, RefCell<WatchState<T, N>>>,
71}
72
73#[derive(Debug)]
74struct WatchState<T: Clone, const N: usize> {
75 data: Option<T>,
76 current_id: u64,
77 wakers: MultiWakerRegistration<N>,
78 receiver_count: usize,
79}
80
/// Private half of the watch interface.
///
/// The trait is not `pub`, so code outside this module can neither implement it nor name
/// its methods; it is the supertrait that seals [`WatchBehavior`].
trait SealedWatchBehavior<T> {
    /// Polls for the stored value. When ready, `id` is set to the id of the returned value.
    fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;

    /// Like `poll_get`, but only becomes ready once the stored value satisfies `f`.
    fn poll_get_and(
        &self,
        id: &mut u64,
        f: &mut dyn Fn(&T) -> bool,
        cx: &mut Context<'_>,
    ) -> Poll<T>;

    /// Polls for a value whose id is newer than `id`. When ready, `id` is updated.
    fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;

    /// Non-blocking variant of `poll_changed`.
    fn try_changed(&self, id: &mut u64) -> Option<T>;

    /// Like `poll_changed`, but the newer value must also satisfy `f`.
    fn poll_changed_and(
        &self,
        id: &mut u64,
        f: &mut dyn Fn(&T) -> bool,
        cx: &mut Context<'_>,
    ) -> Poll<T>;

    /// Non-blocking variant of `poll_changed_and`.
    fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;

    /// Records that one counted receiver has gone away.
    fn drop_receiver(&self);

    /// Removes the stored value.
    fn clear(&self);

    /// Stores `val` as the new value and notifies waiting receivers.
    fn send(&self, val: T);

    /// Lets `f` edit the stored value in place, then notifies waiting receivers.
    fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));

    /// Lets `f` edit the stored value in place; receivers are notified only if `f` returns `true`.
    fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);
}
122
123#[allow(private_bounds)]
125pub trait WatchBehavior<T: Clone>: SealedWatchBehavior<T> {
126 fn try_get(&self, id: Option<&mut u64>) -> Option<T>;
128
129 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
132
133 fn contains_value(&self) -> bool;
135}
136
137impl<M: RawMutex, T: Clone, const N: usize> SealedWatchBehavior<T> for Watch<M, T, N> {
138 fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
139 self.mutex.lock(|state| {
140 let mut s = state.borrow_mut();
141 match &s.data {
142 Some(data) => {
143 *id = s.current_id;
144 Poll::Ready(data.clone())
145 }
146 None => {
147 s.wakers.register(cx.waker());
148 Poll::Pending
149 }
150 }
151 })
152 }
153
154 fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
155 self.mutex.lock(|state| {
156 let mut s = state.borrow_mut();
157 match s.data {
158 Some(ref data) if f(data) => {
159 *id = s.current_id;
160 Poll::Ready(data.clone())
161 }
162 _ => {
163 s.wakers.register(cx.waker());
164 Poll::Pending
165 }
166 }
167 })
168 }
169
170 fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
171 self.mutex.lock(|state| {
172 let mut s = state.borrow_mut();
173 match (&s.data, s.current_id > *id) {
174 (Some(data), true) => {
175 *id = s.current_id;
176 Poll::Ready(data.clone())
177 }
178 _ => {
179 s.wakers.register(cx.waker());
180 Poll::Pending
181 }
182 }
183 })
184 }
185
186 fn try_changed(&self, id: &mut u64) -> Option<T> {
187 self.mutex.lock(|state| {
188 let s = state.borrow();
189 match s.current_id > *id {
190 true => {
191 *id = s.current_id;
192 s.data.clone()
193 }
194 false => None,
195 }
196 })
197 }
198
199 fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
200 self.mutex.lock(|state| {
201 let mut s = state.borrow_mut();
202 match (&s.data, s.current_id > *id) {
203 (Some(data), true) if f(data) => {
204 *id = s.current_id;
205 Poll::Ready(data.clone())
206 }
207 _ => {
208 s.wakers.register(cx.waker());
209 Poll::Pending
210 }
211 }
212 })
213 }
214
215 fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
216 self.mutex.lock(|state| {
217 let s = state.borrow();
218 match (&s.data, s.current_id > *id) {
219 (Some(data), true) if f(data) => {
220 *id = s.current_id;
221 s.data.clone()
222 }
223 _ => None,
224 }
225 })
226 }
227
228 fn drop_receiver(&self) {
229 self.mutex.lock(|state| {
230 let mut s = state.borrow_mut();
231 s.receiver_count -= 1;
232 })
233 }
234
235 fn clear(&self) {
236 self.mutex.lock(|state| {
237 let mut s = state.borrow_mut();
238 s.data = None;
239 })
240 }
241
242 fn send(&self, val: T) {
243 self.mutex.lock(|state| {
244 let mut s = state.borrow_mut();
245 s.data = Some(val);
246 s.current_id += 1;
247 s.wakers.wake();
248 })
249 }
250
251 fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
252 self.mutex.lock(|state| {
253 let mut s = state.borrow_mut();
254 f(&mut s.data);
255 s.current_id += 1;
256 s.wakers.wake();
257 })
258 }
259
260 fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) {
261 self.mutex.lock(|state| {
262 let mut s = state.borrow_mut();
263 if f(&mut s.data) {
264 s.current_id += 1;
265 s.wakers.wake();
266 }
267 })
268 }
269}
270
271impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N> {
272 fn try_get(&self, id: Option<&mut u64>) -> Option<T> {
273 self.mutex.lock(|state| {
274 let s = state.borrow();
275 if let Some(id) = id {
276 *id = s.current_id;
277 }
278 s.data.clone()
279 })
280 }
281
282 fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
283 self.mutex.lock(|state| {
284 let s = state.borrow();
285 match s.data {
286 Some(ref data) if f(data) => {
287 if let Some(id) = id {
288 *id = s.current_id;
289 }
290 Some(data.clone())
291 }
292 _ => None,
293 }
294 })
295 }
296
297 fn contains_value(&self) -> bool {
298 self.mutex.lock(|state| state.borrow().data.is_some())
299 }
300}
301
302impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
303 pub const fn new() -> Self {
305 Self {
306 mutex: Mutex::new(RefCell::new(WatchState {
307 data: None,
308 current_id: 0,
309 wakers: MultiWakerRegistration::new(),
310 receiver_count: 0,
311 })),
312 }
313 }
314
315 pub const fn new_with(data: T) -> Self {
317 Self {
318 mutex: Mutex::new(RefCell::new(WatchState {
319 data: Some(data),
320 current_id: 0,
321 wakers: MultiWakerRegistration::new(),
322 receiver_count: 0,
323 })),
324 }
325 }
326
327 pub fn sender(&self) -> Sender<'_, M, T, N> {
329 Sender(Snd::new(self))
330 }
331
332 pub fn dyn_sender(&self) -> DynSender<'_, T> {
334 DynSender(Snd::new(self))
335 }
336
337 pub fn receiver(&self) -> Option<Receiver<'_, M, T, N>> {
340 self.mutex.lock(|state| {
341 let mut s = state.borrow_mut();
342 if s.receiver_count < N {
343 s.receiver_count += 1;
344 Some(Receiver(Rcv::new(self, 0)))
345 } else {
346 None
347 }
348 })
349 }
350
351 pub fn dyn_receiver(&self) -> Option<DynReceiver<'_, T>> {
354 self.mutex.lock(|state| {
355 let mut s = state.borrow_mut();
356 if s.receiver_count < N {
357 s.receiver_count += 1;
358 Some(DynReceiver(Rcv::new(self, 0)))
359 } else {
360 None
361 }
362 })
363 }
364
365 pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> {
367 AnonReceiver(AnonRcv::new(self, 0))
368 }
369
370 pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> {
372 DynAnonReceiver(AnonRcv::new(self, 0))
373 }
374
375 pub fn get_msg_id(&self) -> u64 {
379 self.mutex.lock(|state| state.borrow().current_id)
380 }
381
382 pub fn try_get(&self) -> Option<T> {
384 WatchBehavior::try_get(self, None)
385 }
386
387 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
389 where
390 F: Fn(&T) -> bool,
391 {
392 WatchBehavior::try_get_and(self, None, &mut f)
393 }
394}
395
396#[derive(Debug)]
398pub struct Snd<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
399 watch: &'a W,
400 _phantom: PhantomData<T>,
401}
402
403impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Clone for Snd<'a, T, W> {
404 fn clone(&self) -> Self {
405 Self {
406 watch: self.watch,
407 _phantom: PhantomData,
408 }
409 }
410}
411
412impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
413 fn new(watch: &'a W) -> Self {
415 Self {
416 watch,
417 _phantom: PhantomData,
418 }
419 }
420
421 pub fn send(&self, val: T) {
423 self.watch.send(val)
424 }
425
426 pub fn clear(&self) {
429 self.watch.clear()
430 }
431
432 pub fn try_get(&self) -> Option<T> {
434 self.watch.try_get(None)
435 }
436
437 pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
440 where
441 F: Fn(&T) -> bool,
442 {
443 self.watch.try_get_and(None, &mut f)
444 }
445
446 pub fn contains_value(&self) -> bool {
448 self.watch.contains_value()
449 }
450
451 pub fn send_modify<F>(&self, mut f: F)
453 where
454 F: Fn(&mut Option<T>),
455 {
456 self.watch.send_modify(&mut f)
457 }
458
459 pub fn send_if_modified<F>(&self, mut f: F)
462 where
463 F: Fn(&mut Option<T>) -> bool,
464 {
465 self.watch.send_if_modified(&mut f)
466 }
467}
468
469#[derive(Debug)]
474pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch<M, T, N>>);
475
476impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> {
477 fn clone(&self) -> Self {
478 Self(self.0.clone())
479 }
480}
481
482impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> {
483 pub fn as_dyn(self) -> DynSender<'a, T> {
485 DynSender(Snd::new(self.watch))
486 }
487}
488
489impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynSender<'a, T>> for Sender<'a, M, T, N> {
490 fn into(self) -> DynSender<'a, T> {
491 self.as_dyn()
492 }
493}
494
495impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> {
496 type Target = Snd<'a, T, Watch<M, T, N>>;
497
498 fn deref(&self) -> &Self::Target {
499 &self.0
500 }
501}
502
503impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> {
504 fn deref_mut(&mut self) -> &mut Self::Target {
505 &mut self.0
506 }
507}
508
509pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior<T> + 'a>);
513
514impl<'a, T: Clone> Clone for DynSender<'a, T> {
515 fn clone(&self) -> Self {
516 Self(self.0.clone())
517 }
518}
519
520impl<'a, T: Clone> Deref for DynSender<'a, T> {
521 type Target = Snd<'a, T, dyn WatchBehavior<T> + 'a>;
522
523 fn deref(&self) -> &Self::Target {
524 &self.0
525 }
526}
527
528impl<'a, T: Clone> DerefMut for DynSender<'a, T> {
529 fn deref_mut(&mut self) -> &mut Self::Target {
530 &mut self.0
531 }
532}
533
534pub struct Rcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
536 watch: &'a W,
537 at_id: u64,
538 _phantom: PhantomData<T>,
539}
540
541impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
542 fn new(watch: &'a W, at_id: u64) -> Self {
544 Self {
545 watch,
546 at_id,
547 _phantom: PhantomData,
548 }
549 }
550
551 pub fn get(&mut self) -> impl Future<Output = T> + '_ {
555 poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx))
556 }
557
558 pub fn try_get(&mut self) -> Option<T> {
560 self.watch.try_get(Some(&mut self.at_id))
561 }
562
563 pub async fn get_and<F>(&mut self, mut f: F) -> T
568 where
569 F: Fn(&T) -> bool,
570 {
571 poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await
572 }
573
574 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
577 where
578 F: Fn(&T) -> bool,
579 {
580 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
581 }
582
583 pub async fn changed(&mut self) -> T {
587 poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await
588 }
589
590 pub fn try_changed(&mut self) -> Option<T> {
592 self.watch.try_changed(&mut self.at_id)
593 }
594
595 pub async fn changed_and<F>(&mut self, mut f: F) -> T
600 where
601 F: Fn(&T) -> bool,
602 {
603 poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await
604 }
605
606 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
609 where
610 F: Fn(&T) -> bool,
611 {
612 self.watch.try_changed_and(&mut self.at_id, &mut f)
613 }
614
615 pub fn contains_value(&self) -> bool {
618 self.watch.contains_value()
619 }
620}
621
622impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
623 fn drop(&mut self) {
624 self.watch.drop_receiver();
625 }
626}
627
628#[derive(Debug)]
630pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
631 watch: &'a W,
632 at_id: u64,
633 _phantom: PhantomData<T>,
634}
635
636impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> {
637 fn new(watch: &'a W, at_id: u64) -> Self {
639 Self {
640 watch,
641 at_id,
642 _phantom: PhantomData,
643 }
644 }
645
646 pub fn try_get(&mut self) -> Option<T> {
648 self.watch.try_get(Some(&mut self.at_id))
649 }
650
651 pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
654 where
655 F: Fn(&T) -> bool,
656 {
657 self.watch.try_get_and(Some(&mut self.at_id), &mut f)
658 }
659
660 pub fn try_changed(&mut self) -> Option<T> {
662 self.watch.try_changed(&mut self.at_id)
663 }
664
665 pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
668 where
669 F: Fn(&T) -> bool,
670 {
671 self.watch.try_changed_and(&mut self.at_id, &mut f)
672 }
673
674 pub fn contains_value(&self) -> bool {
677 self.watch.contains_value()
678 }
679}
680
681pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
683
684impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> {
685 pub fn as_dyn(self) -> DynReceiver<'a, T> {
687 let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id));
688 core::mem::forget(self); rcv
690 }
691}
692
693impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynReceiver<'a, T>> for Receiver<'a, M, T, N> {
694 fn into(self) -> DynReceiver<'a, T> {
695 self.as_dyn()
696 }
697}
698
699impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> {
700 type Target = Rcv<'a, T, Watch<M, T, N>>;
701
702 fn deref(&self) -> &Self::Target {
703 &self.0
704 }
705}
706
707impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> {
708 fn deref_mut(&mut self) -> &mut Self::Target {
709 &mut self.0
710 }
711}
712
713pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior<T> + 'a>);
718
719impl<'a, T: Clone> Deref for DynReceiver<'a, T> {
720 type Target = Rcv<'a, T, dyn WatchBehavior<T> + 'a>;
721
722 fn deref(&self) -> &Self::Target {
723 &self.0
724 }
725}
726
727impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
728 fn deref_mut(&mut self) -> &mut Self::Target {
729 &mut self.0
730 }
731}
732
733#[derive(Debug)]
735pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
736
737impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
738 pub fn as_dyn(self) -> DynAnonReceiver<'a, T> {
740 let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id));
741 core::mem::forget(self); rcv
743 }
744}
745
746impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> {
747 fn into(self) -> DynAnonReceiver<'a, T> {
748 self.as_dyn()
749 }
750}
751
752impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> {
753 type Target = AnonRcv<'a, T, Watch<M, T, N>>;
754
755 fn deref(&self) -> &Self::Target {
756 &self.0
757 }
758}
759
760impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> {
761 fn deref_mut(&mut self) -> &mut Self::Target {
762 &mut self.0
763 }
764}
765
766pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
771
772impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> {
773 type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>;
774
775 fn deref(&self) -> &Self::Target {
776 &self.0
777 }
778}
779
780impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> {
781 fn deref_mut(&mut self) -> &mut Self::Target {
782 &mut self.0
783 }
784}
785
#[cfg(test)]
mod tests {
    use futures_executor::block_on;

    use super::Watch;
    use crate::blocking_mutex::raw::CriticalSectionRawMutex;

    /// Each send is observed exactly once by `changed` / `try_changed`.
    #[test]
    fn multiple_sends() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

            let mut receiver = WATCH.receiver().unwrap();
            let sender = WATCH.sender();

            // Nothing has been sent yet.
            assert!(receiver.try_changed().is_none());

            sender.send(10);
            assert_eq!(receiver.changed().await, 10);

            sender.send(20);
            assert_eq!(receiver.try_changed(), Some(20));

            // The latest value has already been seen.
            assert!(receiver.try_changed().is_none());
        });
    }

    /// `try_get` / `try_get_and` agree across the channel, a receiver and a sender.
    #[test]
    fn all_try_get() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

            let mut receiver = WATCH.receiver().unwrap();
            let sender = WATCH.sender();

            // Empty channel: nobody sees a value.
            assert!(WATCH.try_get().is_none());
            assert!(receiver.try_get().is_none());
            assert!(sender.try_get().is_none());

            sender.send(10);
            assert_eq!(WATCH.try_get(), Some(10));
            assert_eq!(receiver.try_get(), Some(10));
            assert_eq!(sender.try_get(), Some(10));

            // Predicate accepts the value.
            assert_eq!(WATCH.try_get_and(|x| *x > 5), Some(10));
            assert_eq!(receiver.try_get_and(|x| *x > 5), Some(10));
            assert_eq!(sender.try_get_and(|x| *x > 5), Some(10));

            // Predicate rejects the value.
            assert!(WATCH.try_get_and(|x| *x < 5).is_none());
            assert!(receiver.try_get_and(|x| *x < 5).is_none());
            assert!(sender.try_get_and(|x| *x < 5).is_none());
        });
    }

    /// The channel can hand out `&'static` references, similar to a once-lock.
    #[test]
    fn once_lock_like() {
        block_on(async {
            static CONFIG0: u8 = 10;
            static CONFIG1: u8 = 20;

            static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();

            let mut receiver = WATCH.receiver().unwrap();
            let sender = WATCH.sender();

            assert!(receiver.try_changed().is_none());

            sender.send(&CONFIG0);
            let first = receiver.changed().await;
            assert_eq!(first, &10);

            sender.send(&CONFIG1);
            let second = receiver.try_changed();
            assert_eq!(second, Some(&20));

            assert!(receiver.try_changed().is_none());

            // The references obtained earlier are still valid.
            assert_eq!(first, &CONFIG0);
            assert_eq!(second, Some(&CONFIG1));
        });
    }

    /// `send_modify` edits the stored value in place and counts as a new message.
    #[test]
    fn sender_modify() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

            let mut receiver = WATCH.receiver().unwrap();
            let sender = WATCH.sender();

            sender.send(10);
            assert_eq!(receiver.try_changed(), Some(10));

            sender.send_modify(|slot| {
                if let Some(value) = slot {
                    *value += 5;
                }
            });

            assert_eq!(receiver.try_changed(), Some(15));
            assert!(receiver.try_changed().is_none());
        });
    }

    /// Predicate-taking variants only yield values the predicate accepts.
    #[test]
    fn predicate_fn() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

            let mut receiver = WATCH.receiver().unwrap();
            let sender = WATCH.sender();

            sender.send(15);
            assert_eq!(receiver.try_get_and(|x| *x > 5), Some(15));
            assert!(receiver.try_get_and(|x| *x < 5).is_none());
            assert!(receiver.try_changed().is_none());

            sender.send(20);
            assert_eq!(receiver.try_changed_and(|x| *x > 5), Some(20));
            assert!(receiver.try_changed_and(|x| *x > 5).is_none());

            sender.send(25);
            // A rejected value stays unseen and is still reported by `try_changed`.
            assert!(receiver.try_changed_and(|x| *x < 5).is_none());
            assert_eq!(receiver.try_changed(), Some(25));

            sender.send(30);
            assert_eq!(receiver.changed_and(|x| *x > 5).await, 30);
            assert_eq!(receiver.get_and(|x| *x > 5).await, 30);
        });
    }

    /// A receiver created after a send still sees that value as a change.
    #[test]
    fn receive_after_create() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();

            let sender = WATCH.sender();
            sender.send(10);

            let mut receiver = WATCH.receiver().unwrap();
            assert_eq!(receiver.try_changed(), Some(10));
        });
    }

    /// Dropping a receiver frees its slot for a new one.
    #[test]
    fn max_receivers_drop() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            let first = WATCH.receiver();
            let second = WATCH.receiver();
            let third = WATCH.receiver();

            // Only two receivers fit.
            assert!(first.is_some());
            assert!(second.is_some());
            assert!(third.is_none());

            drop(first);

            let replacement = WATCH.receiver();
            assert!(replacement.is_some());
        });
    }

    /// Counted and anonymous receivers each observe the same send.
    #[test]
    fn multiple_receivers() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            let mut counted = WATCH.receiver().unwrap();
            let mut anonymous = WATCH.anon_receiver();
            let sender = WATCH.sender();

            assert!(counted.try_changed().is_none());
            assert!(anonymous.try_changed().is_none());

            sender.send(0);

            assert_eq!(counted.try_changed(), Some(0));
            assert_eq!(anonymous.try_changed(), Some(0));
        });
    }

    /// Cloned senders publish to the same channel.
    #[test]
    fn clone_senders() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
            let original = WATCH.sender();
            let cloned = original.clone();

            let mut receiver = WATCH.receiver().unwrap().as_dyn();

            original.send(10);
            assert_eq!(receiver.try_changed(), Some(10));

            cloned.send(20);
            assert_eq!(receiver.try_changed(), Some(20));
        });
    }

    /// Type-erased handles obtained directly from the channel work like the typed ones.
    #[test]
    fn use_dynamics() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            let mut anonymous = WATCH.dyn_anon_receiver();
            let mut counted = WATCH.dyn_receiver().unwrap();
            let sender = WATCH.dyn_sender();

            sender.send(10);

            assert_eq!(anonymous.try_changed(), Some(10));
            assert_eq!(counted.try_changed(), Some(10));
            assert!(counted.try_changed().is_none());
        });
    }

    /// Typed handles keep working after conversion with `as_dyn`.
    #[test]
    fn convert_to_dyn() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            let typed_anonymous = WATCH.anon_receiver();
            let typed_counted = WATCH.receiver().unwrap();
            let typed_sender = WATCH.sender();

            let mut anonymous = typed_anonymous.as_dyn();
            let mut counted = typed_counted.as_dyn();
            let sender = typed_sender.as_dyn();

            sender.send(10);

            assert_eq!(anonymous.try_changed(), Some(10));
            assert_eq!(counted.try_changed(), Some(10));
            assert!(counted.try_changed().is_none());
        });
    }

    /// Converting to a dynamic receiver keeps the slot; dropping it frees exactly one.
    #[test]
    fn dynamic_receiver_count() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            let first = WATCH.receiver();
            let second = WATCH.receiver();
            let third = WATCH.receiver();

            assert!(first.is_some());
            assert!(second.is_some());
            assert!(third.is_none());

            let erased = first.unwrap().as_dyn();

            drop(erased);

            // Exactly one slot became available again.
            let fourth = WATCH.receiver();
            let fifth = WATCH.receiver();
            assert!(fourth.is_some());
            assert!(fifth.is_none());
        });
    }

    /// `contains_value` flips from `false` to `true` once something is sent.
    #[test]
    fn contains_value() {
        block_on(async {
            static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();

            let receiver = WATCH.receiver().unwrap();
            let sender = WATCH.sender();

            assert!(!receiver.contains_value());
            assert!(!sender.contains_value());

            sender.send(10);

            assert!(receiver.contains_value());
            assert!(sender.contains_value());
        });
    }
}