1use core::cell::RefCell;
47use core::future::Future;
48use core::pin::Pin;
49use core::task::{Context, Poll};
50
51use heapless::Deque;
52
53use crate::blocking_mutex::raw::RawMutex;
54use crate::blocking_mutex::Mutex;
55use crate::waitqueue::WakerRegistration;
56
57#[derive(Debug)]
59pub struct Sender<'ch, M, T, const N: usize>
60where
61 M: RawMutex,
62{
63 channel: &'ch Channel<M, T, N>,
64}
65
66impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
67where
68 M: RawMutex,
69{
70 fn clone(&self) -> Self {
71 *self
72 }
73}
74
75impl<'ch, M, T, const N: usize> Copy for Sender<'ch, M, T, N> where M: RawMutex {}
76
77impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
78where
79 M: RawMutex,
80{
81 pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
85 self.channel.send(message)
86 }
87
88 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
92 self.channel.try_send(message)
93 }
94
95 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
99 self.channel.poll_ready_to_send(cx)
100 }
101
102 pub const fn capacity(&self) -> usize {
106 self.channel.capacity()
107 }
108
109 pub fn free_capacity(&self) -> usize {
113 self.channel.free_capacity()
114 }
115
116 pub fn clear(&self) {
120 self.channel.clear();
121 }
122
123 pub fn len(&self) -> usize {
127 self.channel.len()
128 }
129
130 pub fn is_empty(&self) -> bool {
134 self.channel.is_empty()
135 }
136
137 pub fn is_full(&self) -> bool {
141 self.channel.is_full()
142 }
143}
144
145pub struct DynamicSender<'ch, T> {
147 pub(crate) channel: &'ch dyn DynamicChannel<T>,
148}
149
150impl<'ch, T> Clone for DynamicSender<'ch, T> {
151 fn clone(&self) -> Self {
152 *self
153 }
154}
155
156impl<'ch, T> Copy for DynamicSender<'ch, T> {}
157
158impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
159where
160 M: RawMutex,
161{
162 fn from(s: Sender<'ch, M, T, N>) -> Self {
163 Self { channel: s.channel }
164 }
165}
166
167impl<'ch, T> DynamicSender<'ch, T> {
168 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
172 DynamicSendFuture {
173 channel: self.channel,
174 message: Some(message),
175 }
176 }
177
178 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
182 self.channel.try_send_with_context(message, None)
183 }
184
185 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
189 self.channel.poll_ready_to_send(cx)
190 }
191}
192
193pub struct SendDynamicSender<'ch, T> {
196 pub(crate) channel: &'ch dyn DynamicChannel<T>,
197}
198
199impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
200 fn clone(&self) -> Self {
201 *self
202 }
203}
204
205impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
206unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
207unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
208
209impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
210where
211 M: RawMutex + Sync + Send,
212{
213 fn from(s: Sender<'ch, M, T, N>) -> Self {
214 Self { channel: s.channel }
215 }
216}
217
218impl<'ch, T> SendDynamicSender<'ch, T> {
219 pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
223 DynamicSendFuture {
224 channel: self.channel,
225 message: Some(message),
226 }
227 }
228
229 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
233 self.channel.try_send_with_context(message, None)
234 }
235
236 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
240 self.channel.poll_ready_to_send(cx)
241 }
242}
243
244#[derive(Debug)]
246pub struct Receiver<'ch, M, T, const N: usize>
247where
248 M: RawMutex,
249{
250 channel: &'ch Channel<M, T, N>,
251}
252
253impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
254where
255 M: RawMutex,
256{
257 fn clone(&self) -> Self {
258 *self
259 }
260}
261
262impl<'ch, M, T, const N: usize> Copy for Receiver<'ch, M, T, N> where M: RawMutex {}
263
264impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
265where
266 M: RawMutex,
267{
268 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
272 self.channel.receive()
273 }
274
275 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
279 self.channel.ready_to_receive()
280 }
281
282 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
286 self.channel.try_receive()
287 }
288
289 pub fn try_peek(&self) -> Result<T, TryReceiveError>
293 where
294 T: Clone,
295 {
296 self.channel.try_peek()
297 }
298
299 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
303 self.channel.poll_ready_to_receive(cx)
304 }
305
306 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
310 self.channel.poll_receive(cx)
311 }
312
313 pub const fn capacity(&self) -> usize {
317 self.channel.capacity()
318 }
319
320 pub fn free_capacity(&self) -> usize {
324 self.channel.free_capacity()
325 }
326
327 pub fn clear(&self) {
331 self.channel.clear();
332 }
333
334 pub fn len(&self) -> usize {
338 self.channel.len()
339 }
340
341 pub fn is_empty(&self) -> bool {
345 self.channel.is_empty()
346 }
347
348 pub fn is_full(&self) -> bool {
352 self.channel.is_full()
353 }
354}
355
356pub struct DynamicReceiver<'ch, T> {
358 pub(crate) channel: &'ch dyn DynamicChannel<T>,
359}
360
361impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
362 fn clone(&self) -> Self {
363 *self
364 }
365}
366
367impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
368
369impl<'ch, T> DynamicReceiver<'ch, T> {
370 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
374 DynamicReceiveFuture { channel: self.channel }
375 }
376
377 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
381 self.channel.try_receive_with_context(None)
382 }
383
384 pub fn try_peek(&self) -> Result<T, TryReceiveError>
388 where
389 T: Clone,
390 {
391 self.channel.try_peek_with_context(None)
392 }
393
394 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
398 self.channel.poll_ready_to_receive(cx)
399 }
400
401 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
405 self.channel.poll_receive(cx)
406 }
407}
408
409impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
410where
411 M: RawMutex,
412{
413 fn from(s: Receiver<'ch, M, T, N>) -> Self {
414 Self { channel: s.channel }
415 }
416}
417
418pub struct SendDynamicReceiver<'ch, T> {
421 pub(crate) channel: &'ch dyn DynamicChannel<T>,
422}
423
424#[deprecated(since = "0.7.1", note = "please use `SendDynamicReceiver` instead")]
427pub type SendableDynamicReceiver<'ch, T> = SendDynamicReceiver<'ch, T>;
428
429impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> {
430 fn clone(&self) -> Self {
431 *self
432 }
433}
434
435impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {}
436unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {}
437unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {}
438
439impl<'ch, T> SendDynamicReceiver<'ch, T> {
440 pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
444 DynamicReceiveFuture { channel: self.channel }
445 }
446
447 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
451 self.channel.try_receive_with_context(None)
452 }
453
454 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
458 self.channel.poll_ready_to_receive(cx)
459 }
460
461 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
465 self.channel.poll_receive(cx)
466 }
467}
468
469impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendDynamicReceiver<'ch, T>
470where
471 M: RawMutex + Sync + Send,
472{
473 fn from(s: Receiver<'ch, M, T, N>) -> Self {
474 Self { channel: s.channel }
475 }
476}
477
478impl<'ch, M, T, const N: usize> futures_core::Stream for Receiver<'ch, M, T, N>
479where
480 M: RawMutex,
481{
482 type Item = T;
483
484 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
485 self.channel.poll_receive(cx).map(Some)
486 }
487}
488
489#[must_use = "futures do nothing unless you `.await` or poll them"]
491#[derive(Debug)]
492pub struct ReceiveFuture<'ch, M, T, const N: usize>
493where
494 M: RawMutex,
495{
496 channel: &'ch Channel<M, T, N>,
497}
498
499impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
500where
501 M: RawMutex,
502{
503 type Output = T;
504
505 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
506 self.channel.poll_receive(cx)
507 }
508}
509
510#[must_use = "futures do nothing unless you `.await` or poll them"]
512#[derive(Debug)]
513pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
514where
515 M: RawMutex,
516{
517 channel: &'ch Channel<M, T, N>,
518}
519
520impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
521where
522 M: RawMutex,
523{
524 type Output = ();
525
526 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
527 self.channel.poll_ready_to_receive(cx)
528 }
529}
530
531#[must_use = "futures do nothing unless you `.await` or poll them"]
533pub struct DynamicReceiveFuture<'ch, T> {
534 channel: &'ch dyn DynamicChannel<T>,
535}
536
537impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
538 type Output = T;
539
540 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
541 match self.channel.try_receive_with_context(Some(cx)) {
542 Ok(v) => Poll::Ready(v),
543 Err(TryReceiveError::Empty) => Poll::Pending,
544 }
545 }
546}
547
548impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
549 fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
550 Self { channel: value.channel }
551 }
552}
553
554#[must_use = "futures do nothing unless you `.await` or poll them"]
556#[derive(Debug)]
557pub struct SendFuture<'ch, M, T, const N: usize>
558where
559 M: RawMutex,
560{
561 channel: &'ch Channel<M, T, N>,
562 message: Option<T>,
563}
564
565impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
566where
567 M: RawMutex,
568{
569 type Output = ();
570
571 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
572 match self.message.take() {
573 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
574 Ok(..) => Poll::Ready(()),
575 Err(TrySendError::Full(m)) => {
576 self.message = Some(m);
577 Poll::Pending
578 }
579 },
580 None => panic!("Message cannot be None"),
581 }
582 }
583}
584
585impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
586
587#[must_use = "futures do nothing unless you `.await` or poll them"]
589pub struct DynamicSendFuture<'ch, T> {
590 channel: &'ch dyn DynamicChannel<T>,
591 message: Option<T>,
592}
593
594impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
595 type Output = ();
596
597 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
598 match self.message.take() {
599 Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
600 Ok(..) => Poll::Ready(()),
601 Err(TrySendError::Full(m)) => {
602 self.message = Some(m);
603 Poll::Pending
604 }
605 },
606 None => panic!("Message cannot be None"),
607 }
608 }
609}
610
611impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
612
613impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
614 fn from(value: SendFuture<'ch, M, T, N>) -> Self {
615 Self {
616 channel: value.channel,
617 message: value.message,
618 }
619 }
620}
621
622pub(crate) trait DynamicChannel<T> {
623 fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;
624
625 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;
626
627 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
628 where
629 T: Clone;
630
631 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
632 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;
633
634 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
635}
636
637#[derive(PartialEq, Eq, Clone, Copy, Debug)]
639#[cfg_attr(feature = "defmt", derive(defmt::Format))]
640pub enum TryReceiveError {
641 Empty,
643}
644
645#[derive(PartialEq, Eq, Clone, Copy, Debug)]
647#[cfg_attr(feature = "defmt", derive(defmt::Format))]
648pub enum TrySendError<T> {
649 Full(T),
652}
653
654#[derive(Debug)]
655struct ChannelState<T, const N: usize> {
656 queue: Deque<T, N>,
657 receiver_waker: WakerRegistration,
658 senders_waker: WakerRegistration,
659}
660
661impl<T, const N: usize> ChannelState<T, N> {
662 const fn new() -> Self {
663 ChannelState {
664 queue: Deque::new(),
665 receiver_waker: WakerRegistration::new(),
666 senders_waker: WakerRegistration::new(),
667 }
668 }
669
670 fn try_receive(&mut self) -> Result<T, TryReceiveError> {
671 self.try_receive_with_context(None)
672 }
673
674 fn try_peek(&mut self) -> Result<T, TryReceiveError>
675 where
676 T: Clone,
677 {
678 self.try_peek_with_context(None)
679 }
680
681 fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
682 where
683 T: Clone,
684 {
685 if self.queue.is_full() {
686 self.senders_waker.wake();
687 }
688
689 if let Some(message) = self.queue.front() {
690 Ok(message.clone())
691 } else {
692 if let Some(cx) = cx {
693 self.receiver_waker.register(cx.waker());
694 }
695 Err(TryReceiveError::Empty)
696 }
697 }
698
699 fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
700 if self.queue.is_full() {
701 self.senders_waker.wake();
702 }
703
704 if let Some(message) = self.queue.pop_front() {
705 Ok(message)
706 } else {
707 if let Some(cx) = cx {
708 self.receiver_waker.register(cx.waker());
709 }
710 Err(TryReceiveError::Empty)
711 }
712 }
713
714 fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
715 if self.queue.is_full() {
716 self.senders_waker.wake();
717 }
718
719 if let Some(message) = self.queue.pop_front() {
720 Poll::Ready(message)
721 } else {
722 self.receiver_waker.register(cx.waker());
723 Poll::Pending
724 }
725 }
726
727 fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
728 self.receiver_waker.register(cx.waker());
729
730 if !self.queue.is_empty() {
731 Poll::Ready(())
732 } else {
733 Poll::Pending
734 }
735 }
736
737 fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
738 self.try_send_with_context(message, None)
739 }
740
741 fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
742 match self.queue.push_back(message) {
743 Ok(()) => {
744 self.receiver_waker.wake();
745 Ok(())
746 }
747 Err(message) => {
748 if let Some(cx) = cx {
749 self.senders_waker.register(cx.waker());
750 }
751 Err(TrySendError::Full(message))
752 }
753 }
754 }
755
756 fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
757 self.senders_waker.register(cx.waker());
758
759 if !self.queue.is_full() {
760 Poll::Ready(())
761 } else {
762 Poll::Pending
763 }
764 }
765
766 fn clear(&mut self) {
767 if self.queue.is_full() {
768 self.senders_waker.wake();
769 }
770 self.queue.clear();
771 }
772
773 fn len(&self) -> usize {
774 self.queue.len()
775 }
776
777 fn is_empty(&self) -> bool {
778 self.queue.is_empty()
779 }
780
781 fn is_full(&self) -> bool {
782 self.queue.is_full()
783 }
784}
785
786#[derive(Debug)]
795pub struct Channel<M, T, const N: usize>
796where
797 M: RawMutex,
798{
799 inner: Mutex<M, RefCell<ChannelState<T, N>>>,
800}
801
802impl<M, T, const N: usize> Channel<M, T, N>
803where
804 M: RawMutex,
805{
806 pub const fn new() -> Self {
816 Self {
817 inner: Mutex::new(RefCell::new(ChannelState::new())),
818 }
819 }
820
821 fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
822 self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
823 }
824
825 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
826 self.lock(|c| c.try_receive_with_context(cx))
827 }
828
829 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
830 where
831 T: Clone,
832 {
833 self.lock(|c| c.try_peek_with_context(cx))
834 }
835
836 pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
838 self.lock(|c| c.poll_receive(cx))
839 }
840
841 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
842 self.lock(|c| c.try_send_with_context(m, cx))
843 }
844
845 pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
847 self.lock(|c| c.poll_ready_to_receive(cx))
848 }
849
850 pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
852 self.lock(|c| c.poll_ready_to_send(cx))
853 }
854
855 pub fn sender(&self) -> Sender<'_, M, T, N> {
857 Sender { channel: self }
858 }
859
860 pub fn receiver(&self) -> Receiver<'_, M, T, N> {
862 Receiver { channel: self }
863 }
864
865 pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
867 DynamicSender { channel: self }
868 }
869
870 pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
872 DynamicReceiver { channel: self }
873 }
874
875 pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
880 SendFuture {
881 channel: self,
882 message: Some(message),
883 }
884 }
885
886 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
897 self.lock(|c| c.try_send(message))
898 }
899
900 pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
905 ReceiveFuture { channel: self }
906 }
907
908 pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
913 ReceiveReadyFuture { channel: self }
914 }
915
916 pub fn try_receive(&self) -> Result<T, TryReceiveError> {
921 self.lock(|c| c.try_receive())
922 }
923
924 pub fn try_peek(&self) -> Result<T, TryReceiveError>
929 where
930 T: Clone,
931 {
932 self.lock(|c| c.try_peek())
933 }
934
935 pub const fn capacity(&self) -> usize {
937 N
938 }
939
940 pub fn free_capacity(&self) -> usize {
944 N - self.len()
945 }
946
947 pub fn clear(&self) {
949 self.lock(|c| c.clear());
950 }
951
952 pub fn len(&self) -> usize {
954 self.lock(|c| c.len())
955 }
956
957 pub fn is_empty(&self) -> bool {
959 self.lock(|c| c.is_empty())
960 }
961
962 pub fn is_full(&self) -> bool {
964 self.lock(|c| c.is_full())
965 }
966}
967
968impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
971where
972 M: RawMutex,
973{
974 fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
975 Channel::try_send_with_context(self, m, cx)
976 }
977
978 fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
979 Channel::try_receive_with_context(self, cx)
980 }
981
982 fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
983 where
984 T: Clone,
985 {
986 Channel::try_peek_with_context(self, cx)
987 }
988
989 fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
990 Channel::poll_ready_to_send(self, cx)
991 }
992
993 fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
994 Channel::poll_ready_to_receive(self, cx)
995 }
996
997 fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
998 Channel::poll_receive(self, cx)
999 }
1000}
1001
1002impl<M, T, const N: usize> futures_core::Stream for Channel<M, T, N>
1003where
1004 M: RawMutex,
1005{
1006 type Item = T;
1007
1008 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1009 self.poll_receive(cx).map(Some)
1010 }
1011}
1012
1013#[cfg(test)]
1014mod tests {
1015 use core::time::Duration;
1016
1017 use futures_executor::ThreadPool;
1018 use futures_timer::Delay;
1019 use futures_util::task::SpawnExt;
1020 use static_cell::StaticCell;
1021
1022 use super::*;
1023 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
1024
1025 fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
1026 c.queue.capacity() - c.queue.len()
1027 }
1028
1029 #[test]
1030 fn sending_once() {
1031 let mut c = ChannelState::<u32, 3>::new();
1032 assert!(c.try_send(1).is_ok());
1033 assert_eq!(capacity(&c), 2);
1034 }
1035
1036 #[test]
1037 fn sending_when_full() {
1038 let mut c = ChannelState::<u32, 3>::new();
1039 let _ = c.try_send(1);
1040 let _ = c.try_send(1);
1041 let _ = c.try_send(1);
1042 match c.try_send(2) {
1043 Err(TrySendError::Full(2)) => assert!(true),
1044 _ => assert!(false),
1045 }
1046 assert_eq!(capacity(&c), 0);
1047 }
1048
1049 #[test]
1050 fn receiving_once_with_one_send() {
1051 let mut c = ChannelState::<u32, 3>::new();
1052 assert!(c.try_send(1).is_ok());
1053 assert_eq!(c.try_receive().unwrap(), 1);
1054 assert_eq!(capacity(&c), 3);
1055 }
1056
1057 #[test]
1058 fn receiving_when_empty() {
1059 let mut c = ChannelState::<u32, 3>::new();
1060 match c.try_receive() {
1061 Err(TryReceiveError::Empty) => assert!(true),
1062 _ => assert!(false),
1063 }
1064 assert_eq!(capacity(&c), 3);
1065 }
1066
1067 #[test]
1068 fn simple_send_and_receive() {
1069 let c = Channel::<NoopRawMutex, u32, 3>::new();
1070 assert!(c.try_send(1).is_ok());
1071 assert_eq!(c.try_peek().unwrap(), 1);
1072 assert_eq!(c.try_peek().unwrap(), 1);
1073 assert_eq!(c.try_receive().unwrap(), 1);
1074 }
1075
1076 #[test]
1077 fn cloning() {
1078 let c = Channel::<NoopRawMutex, u32, 3>::new();
1079 let r1 = c.receiver();
1080 let s1 = c.sender();
1081
1082 let _ = r1.clone();
1083 let _ = s1.clone();
1084 }
1085
1086 #[test]
1087 fn dynamic_dispatch_into() {
1088 let c = Channel::<NoopRawMutex, u32, 3>::new();
1089 let s: DynamicSender<'_, u32> = c.sender().into();
1090 let r: DynamicReceiver<'_, u32> = c.receiver().into();
1091
1092 assert!(s.try_send(1).is_ok());
1093 assert_eq!(r.try_receive().unwrap(), 1);
1094 }
1095
1096 #[test]
1097 fn dynamic_dispatch_constructor() {
1098 let c = Channel::<NoopRawMutex, u32, 3>::new();
1099 let s = c.dyn_sender();
1100 let r = c.dyn_receiver();
1101
1102 assert!(s.try_send(1).is_ok());
1103 assert_eq!(r.try_peek().unwrap(), 1);
1104 assert_eq!(r.try_peek().unwrap(), 1);
1105 assert_eq!(r.try_receive().unwrap(), 1);
1106 }
1107
1108 #[futures_test::test]
1109 async fn receiver_receives_given_try_send_async() {
1110 let executor = ThreadPool::new().unwrap();
1111
1112 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
1113 let c = &*CHANNEL.init(Channel::new());
1114 let c2 = c;
1115 assert!(executor
1116 .spawn(async move {
1117 assert!(c2.try_send(1).is_ok());
1118 })
1119 .is_ok());
1120 assert_eq!(c.receive().await, 1);
1121 }
1122
1123 #[futures_test::test]
1124 async fn sender_send_completes_if_capacity() {
1125 let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
1126 c.send(1).await;
1127 assert_eq!(c.receive().await, 1);
1128 }
1129
1130 #[futures_test::test]
1131 async fn senders_sends_wait_until_capacity() {
1132 let executor = ThreadPool::new().unwrap();
1133
1134 static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
1135 let c = &*CHANNEL.init(Channel::new());
1136 assert!(c.try_send(1).is_ok());
1137
1138 let c2 = c;
1139 let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
1140 let c2 = c;
1141 let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
1142 Delay::new(Duration::from_millis(500)).await;
1145 assert_eq!(c.receive().await, 1);
1146 assert!(executor
1147 .spawn(async move {
1148 loop {
1149 c.receive().await;
1150 }
1151 })
1152 .is_ok());
1153 send_task_1.unwrap().await;
1154 send_task_2.unwrap().await;
1155 }
1156}