embassy_sync/
channel.rs

1//! A queue for sending values between asynchronous tasks.
2//!
3//! It can be used concurrently by multiple producers (senders) and multiple
4//! consumers (receivers), i.e. it is an  "MPMC channel".
5//!
6//! Receivers are competing for messages. So a message that is received by
7//! one receiver is not received by any other.
8//!
9//! This queue takes a Mutex type so that various
10//! targets can be attained. For example, a ThreadModeMutex can be used
11//! for single-core Cortex-M targets where messages are only passed
12//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
13//! can also be used for single-core targets where messages are to be
14//! passed from exception mode e.g. out of an interrupt handler.
15//!
16//! This module provides a bounded channel that has a limit on the number of
17//! messages that it can store, and if this limit is reached, trying to send
18//! another message will result in an error being returned.
19//!
20//! # Example: Message passing between task and interrupt handler
21//!
22//! ```rust
23//! use embassy_sync::channel::Channel;
24//! use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
25//!
26//! static SHARED_CHANNEL: Channel<CriticalSectionRawMutex, u32, 8> = Channel::new();
27//!
28//! fn my_interrupt_handler() {
29//!     // Do some work..
30//!     // ...
31//!     if let Err(e) = SHARED_CHANNEL.sender().try_send(42) {
32//!         // Channel is full..
33//!     }
34//! }
35//!
36//! async fn my_async_task() {
37//!     // ...
38//!     let receiver = SHARED_CHANNEL.receiver();
39//!     loop {
40//!         let data_from_interrupt = receiver.receive().await;
41//!         // Do something with the data.
42//!     }
43//! }
44//! ```
45
46use core::cell::RefCell;
47use core::future::Future;
48use core::pin::Pin;
49use core::task::{Context, Poll};
50
51use heapless::Deque;
52
53use crate::blocking_mutex::raw::RawMutex;
54use crate::blocking_mutex::Mutex;
55use crate::waitqueue::WakerRegistration;
56
/// Send-only access to a [`Channel`].
///
/// A `Sender` only stores a shared reference to the channel, so it is `Copy`
/// and every method simply forwards to the channel it points at.
#[derive(Debug)]
pub struct Sender<'ch, M, T, const N: usize>
where
    M: RawMutex,
{
    /// The channel all operations on this handle are forwarded to.
    channel: &'ch Channel<M, T, N>,
}

// `Clone`/`Copy` are implemented by hand: a derive would add `M: Clone` and
// `T: Clone` bounds even though only the reference is copied.
impl<'ch, M, T, const N: usize> Clone for Sender<'ch, M, T, N>
where
    M: RawMutex,
{
    fn clone(&self) -> Self {
        *self
    }
}

impl<'ch, M, T, const N: usize> Copy for Sender<'ch, M, T, N> where M: RawMutex {}

impl<'ch, M, T, const N: usize> Sender<'ch, M, T, N>
where
    M: RawMutex,
{
    /// Sends a value, waiting until there is capacity.
    ///
    /// The returned future borrows the channel for `'ch`, not this handle.
    ///
    /// See [`Channel::send()`]
    pub fn send(&self, message: T) -> SendFuture<'ch, M, T, N> {
        self.channel.send(message)
    }

    /// Attempt to immediately send a message.
    ///
    /// On failure the message is handed back inside [`TrySendError::Full`].
    ///
    /// See [`Channel::try_send()`]
    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
        self.channel.try_send(message)
    }

    /// Allows a `poll_fn` to poll until the channel is ready to send.
    ///
    /// See [`Channel::poll_ready_to_send()`]
    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
        self.channel.poll_ready_to_send(cx)
    }

    /// Returns the maximum number of elements the channel can hold.
    ///
    /// See [`Channel::capacity()`]
    pub const fn capacity(&self) -> usize {
        self.channel.capacity()
    }

    /// Returns the free capacity of the channel.
    ///
    /// See [`Channel::free_capacity()`]
    pub fn free_capacity(&self) -> usize {
        self.channel.free_capacity()
    }

    /// Clears all elements in the channel.
    ///
    /// See [`Channel::clear()`]
    pub fn clear(&self) {
        self.channel.clear();
    }

    /// Returns the number of elements currently in the channel.
    ///
    /// See [`Channel::len()`]
    pub fn len(&self) -> usize {
        self.channel.len()
    }

    /// Returns whether the channel is empty.
    ///
    /// See [`Channel::is_empty()`]
    pub fn is_empty(&self) -> bool {
        self.channel.is_empty()
    }

    /// Returns whether the channel is full.
    ///
    /// See [`Channel::is_full()`]
    pub fn is_full(&self) -> bool {
        self.channel.is_full()
    }
}
144
/// Send-only access to a [`Channel`] without knowing channel size.
///
/// The channel is reached through a trait object, so neither the mutex type
/// nor the capacity appears in this type.
pub struct DynamicSender<'ch, T> {
    /// Type-erased reference to the underlying channel.
    pub(crate) channel: &'ch dyn DynamicChannel<T>,
}

// Hand-written so that copying the handle does not require `T: Clone`/`T: Copy`.
impl<'ch, T> Clone for DynamicSender<'ch, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'ch, T> Copy for DynamicSender<'ch, T> {}
157
158impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for DynamicSender<'ch, T>
159where
160    M: RawMutex,
161{
162    fn from(s: Sender<'ch, M, T, N>) -> Self {
163        Self { channel: s.channel }
164    }
165}
166
impl<'ch, T> DynamicSender<'ch, T> {
    /// Sends a value, waiting until there is capacity.
    ///
    /// The message is stored in the returned future until it can be queued.
    ///
    /// See [`Channel::send()`]
    pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
        DynamicSendFuture {
            channel: self.channel,
            message: Some(message),
        }
    }

    /// Attempt to immediately send a message.
    ///
    /// See [`Channel::try_send()`]
    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
        // `None`: no waker is registered, so a full channel just reports the error.
        self.channel.try_send_with_context(message, None)
    }

    /// Allows a `poll_fn` to poll until the channel is ready to send.
    ///
    /// See [`Channel::poll_ready_to_send()`]
    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
        self.channel.poll_ready_to_send(cx)
    }
}
192
/// Send-only access to a [`Channel`] without knowing channel size.
/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
pub struct SendDynamicSender<'ch, T> {
    /// Type-erased reference to the underlying channel.
    pub(crate) channel: &'ch dyn DynamicChannel<T>,
}

// Hand-written so that copying the handle does not require `T: Clone`/`T: Copy`.
impl<'ch, T> Clone for SendDynamicSender<'ch, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'ch, T> Copy for SendDynamicSender<'ch, T> {}
// SAFETY (review note): `&dyn DynamicChannel<T>` is neither `Send` nor `Sync` by itself, so
// both impls are asserted manually. The `From<Sender>` conversion in this file only accepts
// `M: RawMutex + Sync + Send`; presumably that bound is what makes the shared channel usable
// from several threads when `T: Send` — confirm against `blocking_mutex::Mutex`. The field is
// `pub(crate)`, so any other in-crate construction has to uphold the same requirement.
unsafe impl<'ch, T: Send> Send for SendDynamicSender<'ch, T> {}
unsafe impl<'ch, T: Send> Sync for SendDynamicSender<'ch, T> {}
208
209impl<'ch, M, T, const N: usize> From<Sender<'ch, M, T, N>> for SendDynamicSender<'ch, T>
210where
211    M: RawMutex + Sync + Send,
212{
213    fn from(s: Sender<'ch, M, T, N>) -> Self {
214        Self { channel: s.channel }
215    }
216}
217
impl<'ch, T> SendDynamicSender<'ch, T> {
    /// Sends a value, waiting until there is capacity.
    ///
    /// The message is stored in the returned future until it can be queued.
    ///
    /// See [`Channel::send()`]
    pub fn send(&self, message: T) -> DynamicSendFuture<'ch, T> {
        DynamicSendFuture {
            channel: self.channel,
            message: Some(message),
        }
    }

    /// Attempt to immediately send a message.
    ///
    /// See [`Channel::try_send()`]
    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
        // `None`: no waker is registered, so a full channel just reports the error.
        self.channel.try_send_with_context(message, None)
    }

    /// Allows a `poll_fn` to poll until the channel is ready to send.
    ///
    /// See [`Channel::poll_ready_to_send()`]
    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
        self.channel.poll_ready_to_send(cx)
    }
}
243
/// Receive-only access to a [`Channel`].
///
/// A `Receiver` only stores a shared reference to the channel, so it is `Copy`.
#[derive(Debug)]
pub struct Receiver<'ch, M, T, const N: usize>
where
    M: RawMutex,
{
    /// The channel all operations on this handle are forwarded to.
    channel: &'ch Channel<M, T, N>,
}

// `Clone`/`Copy` are implemented by hand: a derive would add `M: Clone` and
// `T: Clone` bounds even though only the reference is copied.
impl<'ch, M, T, const N: usize> Clone for Receiver<'ch, M, T, N>
where
    M: RawMutex,
{
    fn clone(&self) -> Self {
        *self
    }
}

impl<'ch, M, T, const N: usize> Copy for Receiver<'ch, M, T, N> where M: RawMutex {}
263
264impl<'ch, M, T, const N: usize> Receiver<'ch, M, T, N>
265where
266    M: RawMutex,
267{
268    /// Receive the next value.
269    ///
270    /// See [`Channel::receive()`].
271    pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
272        self.channel.receive()
273    }
274
275    /// Is a value ready to be received in the channel
276    ///
277    /// See [`Channel::ready_to_receive()`].
278    pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
279        self.channel.ready_to_receive()
280    }
281
282    /// Attempt to immediately receive the next value.
283    ///
284    /// See [`Channel::try_receive()`]
285    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
286        self.channel.try_receive()
287    }
288
289    /// Peek at the next value without removing it from the queue.
290    ///
291    /// See [`Channel::try_peek()`]
292    pub fn try_peek(&self) -> Result<T, TryReceiveError>
293    where
294        T: Clone,
295    {
296        self.channel.try_peek()
297    }
298
299    /// Allows a poll_fn to poll until the channel is ready to receive
300    ///
301    /// See [`Channel::poll_ready_to_receive()`]
302    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
303        self.channel.poll_ready_to_receive(cx)
304    }
305
306    /// Poll the channel for the next item
307    ///
308    /// See [`Channel::poll_receive()`]
309    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
310        self.channel.poll_receive(cx)
311    }
312
313    /// Returns the maximum number of elements the channel can hold.
314    ///
315    /// See [`Channel::capacity()`]
316    pub const fn capacity(&self) -> usize {
317        self.channel.capacity()
318    }
319
320    /// Returns the free capacity of the channel.
321    ///
322    /// See [`Channel::free_capacity()`]
323    pub fn free_capacity(&self) -> usize {
324        self.channel.free_capacity()
325    }
326
327    /// Clears all elements in the channel.
328    ///
329    /// See [`Channel::clear()`]
330    pub fn clear(&self) {
331        self.channel.clear();
332    }
333
334    /// Returns the number of elements currently in the channel.
335    ///
336    /// See [`Channel::len()`]
337    pub fn len(&self) -> usize {
338        self.channel.len()
339    }
340
341    /// Returns whether the channel is empty.
342    ///
343    /// See [`Channel::is_empty()`]
344    pub fn is_empty(&self) -> bool {
345        self.channel.is_empty()
346    }
347
348    /// Returns whether the channel is full.
349    ///
350    /// See [`Channel::is_full()`]
351    pub fn is_full(&self) -> bool {
352        self.channel.is_full()
353    }
354}
355
/// Receive-only access to a [`Channel`] without knowing channel size.
///
/// The channel is reached through a trait object, so neither the mutex type
/// nor the capacity appears in this type.
pub struct DynamicReceiver<'ch, T> {
    /// Type-erased reference to the underlying channel.
    pub(crate) channel: &'ch dyn DynamicChannel<T>,
}

// Hand-written so that copying the handle does not require `T: Clone`/`T: Copy`.
impl<'ch, T> Clone for DynamicReceiver<'ch, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'ch, T> Copy for DynamicReceiver<'ch, T> {}
368
369impl<'ch, T> DynamicReceiver<'ch, T> {
370    /// Receive the next value.
371    ///
372    /// See [`Channel::receive()`].
373    pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
374        DynamicReceiveFuture { channel: self.channel }
375    }
376
377    /// Attempt to immediately receive the next value.
378    ///
379    /// See [`Channel::try_receive()`]
380    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
381        self.channel.try_receive_with_context(None)
382    }
383
384    /// Peek at the next value without removing it from the queue.
385    ///
386    /// See [`Channel::try_peek()`]
387    pub fn try_peek(&self) -> Result<T, TryReceiveError>
388    where
389        T: Clone,
390    {
391        self.channel.try_peek_with_context(None)
392    }
393
394    /// Allows a poll_fn to poll until the channel is ready to receive
395    ///
396    /// See [`Channel::poll_ready_to_receive()`]
397    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
398        self.channel.poll_ready_to_receive(cx)
399    }
400
401    /// Poll the channel for the next item
402    ///
403    /// See [`Channel::poll_receive()`]
404    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
405        self.channel.poll_receive(cx)
406    }
407}
408
409impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for DynamicReceiver<'ch, T>
410where
411    M: RawMutex,
412{
413    fn from(s: Receiver<'ch, M, T, N>) -> Self {
414        Self { channel: s.channel }
415    }
416}
417
/// Receive-only access to a [`Channel`] without knowing channel size.
/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
pub struct SendDynamicReceiver<'ch, T> {
    /// Type-erased reference to the underlying channel.
    pub(crate) channel: &'ch dyn DynamicChannel<T>,
}

/// Receive-only access to a [`Channel`] without knowing channel size.
/// This version can be sent between threads but can only be created if the underlying mutex is Sync.
///
/// Old name of [`SendDynamicReceiver`], kept as an alias for existing code.
#[deprecated(since = "0.7.1", note = "please use `SendDynamicReceiver` instead")]
pub type SendableDynamicReceiver<'ch, T> = SendDynamicReceiver<'ch, T>;

// Hand-written so that copying the handle does not require `T: Clone`/`T: Copy`.
impl<'ch, T> Clone for SendDynamicReceiver<'ch, T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<'ch, T> Copy for SendDynamicReceiver<'ch, T> {}
// SAFETY (review note): `&dyn DynamicChannel<T>` is neither `Send` nor `Sync` by itself, so
// both impls are asserted manually. The `From<Receiver>` conversion in this file only accepts
// `M: RawMutex + Sync + Send`; presumably that bound is what makes the shared channel usable
// from several threads when `T: Send` — confirm against `blocking_mutex::Mutex`. The field is
// `pub(crate)`, so any other in-crate construction has to uphold the same requirement.
unsafe impl<'ch, T: Send> Send for SendDynamicReceiver<'ch, T> {}
unsafe impl<'ch, T: Send> Sync for SendDynamicReceiver<'ch, T> {}
438
439impl<'ch, T> SendDynamicReceiver<'ch, T> {
440    /// Receive the next value.
441    ///
442    /// See [`Channel::receive()`].
443    pub fn receive(&self) -> DynamicReceiveFuture<'_, T> {
444        DynamicReceiveFuture { channel: self.channel }
445    }
446
447    /// Attempt to immediately receive the next value.
448    ///
449    /// See [`Channel::try_receive()`]
450    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
451        self.channel.try_receive_with_context(None)
452    }
453
454    /// Allows a poll_fn to poll until the channel is ready to receive
455    ///
456    /// See [`Channel::poll_ready_to_receive()`]
457    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
458        self.channel.poll_ready_to_receive(cx)
459    }
460
461    /// Poll the channel for the next item
462    ///
463    /// See [`Channel::poll_receive()`]
464    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
465        self.channel.poll_receive(cx)
466    }
467}
468
469impl<'ch, M, T, const N: usize> From<Receiver<'ch, M, T, N>> for SendDynamicReceiver<'ch, T>
470where
471    M: RawMutex + Sync + Send,
472{
473    fn from(s: Receiver<'ch, M, T, N>) -> Self {
474        Self { channel: s.channel }
475    }
476}
477
478impl<'ch, M, T, const N: usize> futures_core::Stream for Receiver<'ch, M, T, N>
479where
480    M: RawMutex,
481{
482    type Item = T;
483
484    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
485        self.channel.poll_receive(cx).map(Some)
486    }
487}
488
489/// Future returned by [`Channel::receive`] and  [`Receiver::receive`].
490#[must_use = "futures do nothing unless you `.await` or poll them"]
491#[derive(Debug)]
492pub struct ReceiveFuture<'ch, M, T, const N: usize>
493where
494    M: RawMutex,
495{
496    channel: &'ch Channel<M, T, N>,
497}
498
499impl<'ch, M, T, const N: usize> Future for ReceiveFuture<'ch, M, T, N>
500where
501    M: RawMutex,
502{
503    type Output = T;
504
505    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
506        self.channel.poll_receive(cx)
507    }
508}
509
510/// Future returned by [`Channel::ready_to_receive`] and  [`Receiver::ready_to_receive`].
511#[must_use = "futures do nothing unless you `.await` or poll them"]
512#[derive(Debug)]
513pub struct ReceiveReadyFuture<'ch, M, T, const N: usize>
514where
515    M: RawMutex,
516{
517    channel: &'ch Channel<M, T, N>,
518}
519
520impl<'ch, M, T, const N: usize> Future for ReceiveReadyFuture<'ch, M, T, N>
521where
522    M: RawMutex,
523{
524    type Output = ();
525
526    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
527        self.channel.poll_ready_to_receive(cx)
528    }
529}
530
531/// Future returned by [`DynamicReceiver::receive`].
532#[must_use = "futures do nothing unless you `.await` or poll them"]
533pub struct DynamicReceiveFuture<'ch, T> {
534    channel: &'ch dyn DynamicChannel<T>,
535}
536
537impl<'ch, T> Future for DynamicReceiveFuture<'ch, T> {
538    type Output = T;
539
540    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
541        match self.channel.try_receive_with_context(Some(cx)) {
542            Ok(v) => Poll::Ready(v),
543            Err(TryReceiveError::Empty) => Poll::Pending,
544        }
545    }
546}
547
548impl<'ch, M: RawMutex, T, const N: usize> From<ReceiveFuture<'ch, M, T, N>> for DynamicReceiveFuture<'ch, T> {
549    fn from(value: ReceiveFuture<'ch, M, T, N>) -> Self {
550        Self { channel: value.channel }
551    }
552}
553
554/// Future returned by [`Channel::send`] and  [`Sender::send`].
555#[must_use = "futures do nothing unless you `.await` or poll them"]
556#[derive(Debug)]
557pub struct SendFuture<'ch, M, T, const N: usize>
558where
559    M: RawMutex,
560{
561    channel: &'ch Channel<M, T, N>,
562    message: Option<T>,
563}
564
565impl<'ch, M, T, const N: usize> Future for SendFuture<'ch, M, T, N>
566where
567    M: RawMutex,
568{
569    type Output = ();
570
571    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
572        match self.message.take() {
573            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
574                Ok(..) => Poll::Ready(()),
575                Err(TrySendError::Full(m)) => {
576                    self.message = Some(m);
577                    Poll::Pending
578                }
579            },
580            None => panic!("Message cannot be None"),
581        }
582    }
583}
584
585impl<'ch, M, T, const N: usize> Unpin for SendFuture<'ch, M, T, N> where M: RawMutex {}
586
587/// Future returned by [`DynamicSender::send`].
588#[must_use = "futures do nothing unless you `.await` or poll them"]
589pub struct DynamicSendFuture<'ch, T> {
590    channel: &'ch dyn DynamicChannel<T>,
591    message: Option<T>,
592}
593
594impl<'ch, T> Future for DynamicSendFuture<'ch, T> {
595    type Output = ();
596
597    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
598        match self.message.take() {
599            Some(m) => match self.channel.try_send_with_context(m, Some(cx)) {
600                Ok(..) => Poll::Ready(()),
601                Err(TrySendError::Full(m)) => {
602                    self.message = Some(m);
603                    Poll::Pending
604                }
605            },
606            None => panic!("Message cannot be None"),
607        }
608    }
609}
610
611impl<'ch, T> Unpin for DynamicSendFuture<'ch, T> {}
612
613impl<'ch, M: RawMutex, T, const N: usize> From<SendFuture<'ch, M, T, N>> for DynamicSendFuture<'ch, T> {
614    fn from(value: SendFuture<'ch, M, T, N>) -> Self {
615        Self {
616            channel: value.channel,
617            message: value.message,
618        }
619    }
620}
621
/// Object-safe view of a channel that hides the mutex type and the capacity.
///
/// The dynamic sender/receiver handles and futures in this file talk to the channel
/// exclusively through this trait.
pub(crate) trait DynamicChannel<T> {
    /// Tries to queue `message`; when the channel is full the message is handed back in
    /// [`TrySendError::Full`]. `cx` optionally supplies a waker for the implementation to keep.
    fn try_send_with_context(&self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>>;

    /// Tries to take the next message; `cx` optionally supplies a waker for the
    /// implementation to keep when the channel is empty.
    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>;

    /// Like [`try_receive_with_context`](Self::try_receive_with_context), but returns a clone
    /// of the next message and leaves it queued.
    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
    where
        T: Clone;

    /// Polls until the channel can accept a message.
    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>;
    /// Polls until the channel has a message available.
    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>;

    /// Polls for the next message, removing it from the channel when ready.
    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T>;
}
636
/// Error returned by [`try_receive`](Channel::try_receive).
///
/// The peek variants in this file report the same error when there is nothing to look at.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TryReceiveError {
    /// A message could not be received because the channel is empty.
    Empty,
}
644
/// Error returned by [`try_send`](Channel::try_send).
///
/// The rejected message is handed back to the caller inside the error, so it is not lost.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TrySendError<T> {
    /// The data could not be sent on the channel because the channel is
    /// currently full and sending would require blocking.
    Full(T),
}
653
654#[derive(Debug)]
655struct ChannelState<T, const N: usize> {
656    queue: Deque<T, N>,
657    receiver_waker: WakerRegistration,
658    senders_waker: WakerRegistration,
659}
660
661impl<T, const N: usize> ChannelState<T, N> {
662    const fn new() -> Self {
663        ChannelState {
664            queue: Deque::new(),
665            receiver_waker: WakerRegistration::new(),
666            senders_waker: WakerRegistration::new(),
667        }
668    }
669
670    fn try_receive(&mut self) -> Result<T, TryReceiveError> {
671        self.try_receive_with_context(None)
672    }
673
674    fn try_peek(&mut self) -> Result<T, TryReceiveError>
675    where
676        T: Clone,
677    {
678        self.try_peek_with_context(None)
679    }
680
681    fn try_peek_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
682    where
683        T: Clone,
684    {
685        if self.queue.is_full() {
686            self.senders_waker.wake();
687        }
688
689        if let Some(message) = self.queue.front() {
690            Ok(message.clone())
691        } else {
692            if let Some(cx) = cx {
693                self.receiver_waker.register(cx.waker());
694            }
695            Err(TryReceiveError::Empty)
696        }
697    }
698
699    fn try_receive_with_context(&mut self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
700        if self.queue.is_full() {
701            self.senders_waker.wake();
702        }
703
704        if let Some(message) = self.queue.pop_front() {
705            Ok(message)
706        } else {
707            if let Some(cx) = cx {
708                self.receiver_waker.register(cx.waker());
709            }
710            Err(TryReceiveError::Empty)
711        }
712    }
713
714    fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll<T> {
715        if self.queue.is_full() {
716            self.senders_waker.wake();
717        }
718
719        if let Some(message) = self.queue.pop_front() {
720            Poll::Ready(message)
721        } else {
722            self.receiver_waker.register(cx.waker());
723            Poll::Pending
724        }
725    }
726
727    fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> {
728        self.receiver_waker.register(cx.waker());
729
730        if !self.queue.is_empty() {
731            Poll::Ready(())
732        } else {
733            Poll::Pending
734        }
735    }
736
737    fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
738        self.try_send_with_context(message, None)
739    }
740
741    fn try_send_with_context(&mut self, message: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
742        match self.queue.push_back(message) {
743            Ok(()) => {
744                self.receiver_waker.wake();
745                Ok(())
746            }
747            Err(message) => {
748                if let Some(cx) = cx {
749                    self.senders_waker.register(cx.waker());
750                }
751                Err(TrySendError::Full(message))
752            }
753        }
754    }
755
756    fn poll_ready_to_send(&mut self, cx: &mut Context<'_>) -> Poll<()> {
757        self.senders_waker.register(cx.waker());
758
759        if !self.queue.is_full() {
760            Poll::Ready(())
761        } else {
762            Poll::Pending
763        }
764    }
765
766    fn clear(&mut self) {
767        if self.queue.is_full() {
768            self.senders_waker.wake();
769        }
770        self.queue.clear();
771    }
772
773    fn len(&self) -> usize {
774        self.queue.len()
775    }
776
777    fn is_empty(&self) -> bool {
778        self.queue.is_empty()
779    }
780
781    fn is_full(&self) -> bool {
782        self.queue.is_full()
783    }
784}
785
/// A bounded channel for communicating between asynchronous tasks
/// with backpressure.
///
/// The channel will buffer up to the provided number of messages.  Once the
/// buffer is full, attempts to `send` new messages will wait until a message is
/// received from the channel.
///
/// All data sent will become available in the same order as it was sent.
#[derive(Debug)]
pub struct Channel<M, T, const N: usize>
where
    M: RawMutex,
{
    // Queue and wakers, guarded by the blocking mutex; the `RefCell` provides the
    // mutable access needed while the lock is held.
    inner: Mutex<M, RefCell<ChannelState<T, N>>>,
}
801
impl<M, T, const N: usize> Channel<M, T, N>
where
    M: RawMutex,
{
    /// Establish a new bounded channel. For example, to create one with a `NoopRawMutex`:
    ///
    /// ```
    /// use embassy_sync::channel::Channel;
    /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
    ///
    /// // Declare a bounded channel of 3 u32s.
    /// let mut channel = Channel::<NoopRawMutex, u32, 3>::new();
    /// ```
    pub const fn new() -> Self {
        Self {
            inner: Mutex::new(RefCell::new(ChannelState::new())),
        }
    }

    /// Runs `f` on the channel state while the mutex is held.
    ///
    /// The mutable borrow goes through `try_borrow_mut` wrapped in the crate's `unwrap!`
    /// macro — presumably this panics if the state is already borrowed; confirm against
    /// the macro definition.
    fn lock<R>(&self, f: impl FnOnce(&mut ChannelState<T, N>) -> R) -> R {
        self.inner.lock(|rc| f(&mut *unwrap!(rc.try_borrow_mut())))
    }

    /// Locked wrapper around `ChannelState::try_receive_with_context`.
    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
        self.lock(|c| c.try_receive_with_context(cx))
    }

    /// Locked wrapper around `ChannelState::try_peek_with_context`.
    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
    where
        T: Clone,
    {
        self.lock(|c| c.try_peek_with_context(cx))
    }

    /// Poll the channel for the next message
    pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
        self.lock(|c| c.poll_receive(cx))
    }

    /// Locked wrapper around `ChannelState::try_send_with_context`.
    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
        self.lock(|c| c.try_send_with_context(m, cx))
    }

    /// Allows a `poll_fn` to poll until the channel is ready to receive.
    pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
        self.lock(|c| c.poll_ready_to_receive(cx))
    }

    /// Allows a `poll_fn` to poll until the channel is ready to send.
    pub fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
        self.lock(|c| c.poll_ready_to_send(cx))
    }

    /// Get a sender for this channel.
    pub fn sender(&self) -> Sender<'_, M, T, N> {
        Sender { channel: self }
    }

    /// Get a receiver for this channel.
    pub fn receiver(&self) -> Receiver<'_, M, T, N> {
        Receiver { channel: self }
    }

    /// Get a sender for this channel using dynamic dispatch.
    pub fn dyn_sender(&self) -> DynamicSender<'_, T> {
        DynamicSender { channel: self }
    }

    /// Get a receiver for this channel using dynamic dispatch.
    pub fn dyn_receiver(&self) -> DynamicReceiver<'_, T> {
        DynamicReceiver { channel: self }
    }

    /// Send a value, waiting until there is capacity.
    ///
    /// Sending completes when the value has been pushed to the channel's queue.
    /// This doesn't mean the value has been received yet.
    pub fn send(&self, message: T) -> SendFuture<'_, M, T, N> {
        SendFuture {
            channel: self,
            message: Some(message),
        }
    }

    /// Attempt to immediately send a message.
    ///
    /// This method differs from [`send`](Channel::send) by returning immediately if the channel's
    /// buffer is full, instead of waiting.
    ///
    /// # Errors
    ///
    /// If the channel capacity has been reached, i.e., the channel already holds `N`
    /// buffered values where `N` is the capacity parameter of [`Channel`], then
    /// [`TrySendError::Full`] is returned, carrying the rejected message.
    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
        self.lock(|c| c.try_send(message))
    }

    /// Receive the next value.
    ///
    /// If there are no messages in the channel's buffer, this method will
    /// wait until a message is sent.
    pub fn receive(&self) -> ReceiveFuture<'_, M, T, N> {
        ReceiveFuture { channel: self }
    }

    /// Is a value ready to be received in the channel
    ///
    /// If there are no messages in the channel's buffer, this method will
    /// wait until there is at least one
    pub fn ready_to_receive(&self) -> ReceiveReadyFuture<'_, M, T, N> {
        ReceiveReadyFuture { channel: self }
    }

    /// Attempt to immediately receive a message.
    ///
    /// This method will either receive a message from the channel immediately or return an error
    /// if the channel is empty.
    pub fn try_receive(&self) -> Result<T, TryReceiveError> {
        self.lock(|c| c.try_receive())
    }

    /// Peek at the next value without removing it from the queue.
    ///
    /// This method will either receive a copy of the message from the channel immediately or return
    /// an error if the channel is empty.
    pub fn try_peek(&self) -> Result<T, TryReceiveError>
    where
        T: Clone,
    {
        self.lock(|c| c.try_peek())
    }

    /// Returns the maximum number of elements the channel can hold.
    pub const fn capacity(&self) -> usize {
        N
    }

    /// Returns the free capacity of the channel.
    ///
    /// This is equivalent to `capacity() - len()`
    pub fn free_capacity(&self) -> usize {
        N - self.len()
    }

    /// Clears all elements in the channel.
    pub fn clear(&self) {
        self.lock(|c| c.clear());
    }

    /// Returns the number of elements currently in the channel.
    pub fn len(&self) -> usize {
        self.lock(|c| c.len())
    }

    /// Returns whether the channel is empty.
    pub fn is_empty(&self) -> bool {
        self.lock(|c| c.is_empty())
    }

    /// Returns whether the channel is full.
    pub fn is_full(&self) -> bool {
        self.lock(|c| c.is_full())
    }
}
967
968/// Implements the DynamicChannel to allow creating types that are unaware of the queue size with the
969/// tradeoff cost of dynamic dispatch.
970impl<M, T, const N: usize> DynamicChannel<T> for Channel<M, T, N>
971where
972    M: RawMutex,
973{
974    fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError<T>> {
975        Channel::try_send_with_context(self, m, cx)
976    }
977
978    fn try_receive_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError> {
979        Channel::try_receive_with_context(self, cx)
980    }
981
982    fn try_peek_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<T, TryReceiveError>
983    where
984        T: Clone,
985    {
986        Channel::try_peek_with_context(self, cx)
987    }
988
989    fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()> {
990        Channel::poll_ready_to_send(self, cx)
991    }
992
993    fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> {
994        Channel::poll_ready_to_receive(self, cx)
995    }
996
997    fn poll_receive(&self, cx: &mut Context<'_>) -> Poll<T> {
998        Channel::poll_receive(self, cx)
999    }
1000}
1001
1002impl<M, T, const N: usize> futures_core::Stream for Channel<M, T, N>
1003where
1004    M: RawMutex,
1005{
1006    type Item = T;
1007
1008    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1009        self.poll_receive(cx).map(Some)
1010    }
1011}
1012
#[cfg(test)]
mod tests {
    use core::time::Duration;

    use futures_executor::ThreadPool;
    use futures_timer::Delay;
    use futures_util::task::SpawnExt;
    use static_cell::StaticCell;

    use super::*;
    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};

    /// Number of free slots left in `c`'s queue (queue capacity minus queue length).
    fn capacity<T, const N: usize>(c: &ChannelState<T, N>) -> usize {
        c.queue.capacity() - c.queue.len()
    }

    /// A single successful send occupies exactly one slot.
    #[test]
    fn sending_once() {
        let mut c = ChannelState::<u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(capacity(&c), 2);
    }

    /// Sending into a full channel hands the rejected message back in `TrySendError::Full`.
    #[test]
    fn sending_when_full() {
        let mut c = ChannelState::<u32, 3>::new();
        // Fill every slot; each setup send must succeed for the test to be meaningful.
        for _ in 0..3 {
            assert!(c.try_send(1).is_ok());
        }
        assert!(matches!(c.try_send(2), Err(TrySendError::Full(2))));
        assert_eq!(capacity(&c), 0);
    }

    /// Receiving the only queued message frees its slot again.
    #[test]
    fn receiving_once_with_one_send() {
        let mut c = ChannelState::<u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(c.try_receive().unwrap(), 1);
        assert_eq!(capacity(&c), 3);
    }

    /// Receiving from an empty channel reports `TryReceiveError::Empty`.
    #[test]
    fn receiving_when_empty() {
        let mut c = ChannelState::<u32, 3>::new();
        assert!(matches!(c.try_receive(), Err(TryReceiveError::Empty)));
        assert_eq!(capacity(&c), 3);
    }

    /// Peeking returns the front message repeatedly; receiving then returns that same message.
    #[test]
    fn simple_send_and_receive() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        assert!(c.try_send(1).is_ok());
        assert_eq!(c.try_peek().unwrap(), 1);
        assert_eq!(c.try_peek().unwrap(), 1);
        assert_eq!(c.try_receive().unwrap(), 1);
    }

    /// Senders and receivers can be cloned.
    #[test]
    fn cloning() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let r1 = c.receiver();
        let s1 = c.sender();

        let _ = r1.clone();
        let _ = s1.clone();
    }

    /// Typed senders/receivers convert into their dynamic counterparts via `into()`.
    #[test]
    fn dynamic_dispatch_into() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let s: DynamicSender<'_, u32> = c.sender().into();
        let r: DynamicReceiver<'_, u32> = c.receiver().into();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    /// Dynamic senders/receivers can be obtained directly from the channel.
    #[test]
    fn dynamic_dispatch_constructor() {
        let c = Channel::<NoopRawMutex, u32, 3>::new();
        let s = c.dyn_sender();
        let r = c.dyn_receiver();

        assert!(s.try_send(1).is_ok());
        assert_eq!(r.try_peek().unwrap(), 1);
        assert_eq!(r.try_peek().unwrap(), 1);
        assert_eq!(r.try_receive().unwrap(), 1);
    }

    /// An awaiting receiver gets a message sent with `try_send` from a spawned task.
    #[futures_test::test]
    async fn receiver_receives_given_try_send_async() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 3>> = StaticCell::new();
        let c = &*CHANNEL.init(Channel::new());
        let c2 = c;
        assert!(executor
            .spawn(async move {
                assert!(c2.try_send(1).is_ok());
            })
            .is_ok());
        assert_eq!(c.receive().await, 1);
    }

    /// `send` completes when there is free capacity.
    #[futures_test::test]
    async fn sender_send_completes_if_capacity() {
        let c = Channel::<CriticalSectionRawMutex, u32, 1>::new();
        c.send(1).await;
        assert_eq!(c.receive().await, 1);
    }

    /// Sends issued against a full channel complete once receivers make room.
    #[futures_test::test]
    async fn senders_sends_wait_until_capacity() {
        let executor = ThreadPool::new().unwrap();

        static CHANNEL: StaticCell<Channel<CriticalSectionRawMutex, u32, 1>> = StaticCell::new();
        let c = &*CHANNEL.init(Channel::new());
        assert!(c.try_send(1).is_ok());

        let c2 = c;
        let send_task_1 = executor.spawn_with_handle(async move { c2.send(2).await });
        let c2 = c;
        let send_task_2 = executor.spawn_with_handle(async move { c2.send(3).await });
        // Wish I could think of a means of determining that the async send is waiting instead.
        // However, I've used the debugger to observe that the send does indeed wait.
        Delay::new(Duration::from_millis(500)).await;
        assert_eq!(c.receive().await, 1);
        // Keep receiving in the background so both pending sends can complete.
        assert!(executor
            .spawn(async move {
                loop {
                    c.receive().await;
                }
            })
            .is_ok());
        send_task_1.unwrap().await;
        send_task_2.unwrap().await;
    }
}