embassy_sync/
zerocopy_channel.rs

//! A zero-copy queue for sending values between asynchronous tasks.
//!
//! It can be used concurrently by a producer (sender) and a
//! consumer (receiver), i.e. it is an "SPSC channel".
5//!
//! This queue takes a Mutex type so that it can be used on a variety of
//! targets. For example, a ThreadModeMutex can be used
//! for single-core Cortex-M targets where messages are only passed
//! between tasks running in thread mode. Similarly, a CriticalSectionMutex
//! can be used for single-core targets where messages are to be
//! passed from exception mode, e.g. out of an interrupt handler.
12//!
//! This module provides a bounded channel that has a limit on the number of
//! messages that it can store. If this limit is reached, `try_send` returns
//! `None`, while `send` waits until a slot has been freed.
16
17use core::cell::RefCell;
18use core::future::{poll_fn, Future};
19use core::marker::PhantomData;
20use core::task::{Context, Poll};
21
22use crate::blocking_mutex::raw::RawMutex;
23use crate::blocking_mutex::Mutex;
24use crate::waitqueue::WakerRegistration;
25
26/// A bounded zero-copy channel for communicating between asynchronous tasks
27/// with backpressure.
28///
29/// The channel will buffer up to the provided number of messages.  Once the
30/// buffer is full, attempts to `send` new messages will wait until a message is
31/// received from the channel.
32///
33/// All data sent will become available in the same order as it was sent.
34///
35/// The channel requires a buffer of recyclable elements.  Writing to the channel is done through
36/// an `&mut T`.
37#[derive(Debug)]
38pub struct Channel<'a, M: RawMutex, T> {
39    buf: BufferPtr<T>,
40    phantom: PhantomData<&'a mut T>,
41    state: Mutex<M, RefCell<State>>,
42}
43
44impl<'a, M: RawMutex, T> Channel<'a, M, T> {
45    /// Initialize a new [`Channel`].
46    ///
47    /// The provided buffer will be used and reused by the channel's logic, and thus dictates the
48    /// channel's capacity.
49    pub fn new(buf: &'a mut [T]) -> Self {
50        let len = buf.len();
51        assert!(len != 0);
52
53        Self {
54            buf: BufferPtr(buf.as_mut_ptr()),
55            phantom: PhantomData,
56            state: Mutex::new(RefCell::new(State {
57                capacity: len,
58                front: 0,
59                back: 0,
60                full: false,
61                send_waker: WakerRegistration::new(),
62                receive_waker: WakerRegistration::new(),
63            })),
64        }
65    }
66
67    /// Creates a [`Sender`] and [`Receiver`] from an existing channel.
68    ///
69    /// Further Senders and Receivers can be created through [`Sender::borrow`] and
70    /// [`Receiver::borrow`] respectively.
71    pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
72        (Sender { channel: self }, Receiver { channel: self })
73    }
74
75    /// Clears all elements in the channel.
76    pub fn clear(&mut self) {
77        self.state.lock(|s| {
78            s.borrow_mut().clear();
79        });
80    }
81
82    /// Returns the number of elements currently in the channel.
83    pub fn len(&self) -> usize {
84        self.state.lock(|s| s.borrow().len())
85    }
86
87    /// Returns whether the channel is empty.
88    pub fn is_empty(&self) -> bool {
89        self.state.lock(|s| s.borrow().is_empty())
90    }
91
92    /// Returns whether the channel is full.
93    pub fn is_full(&self) -> bool {
94        self.state.lock(|s| s.borrow().is_full())
95    }
96}
97
/// Thin wrapper around the raw pointer to the first slot of the channel's
/// buffer, so that `Send`/`Sync` can be implemented for it.
#[repr(transparent)]
#[derive(Debug)]
struct BufferPtr<T>(*mut T);

impl<T> BufferPtr<T> {
    /// Returns the pointer to the slot `count` elements past the start.
    ///
    /// # Safety
    ///
    /// Same contract as the raw pointer `add` method: the resulting pointer
    /// must stay within the allocation the wrapped pointer was derived from.
    unsafe fn add(&self, count: usize) -> *mut T {
        let Self(base) = *self;
        base.add(count)
    }
}

// NOTE(review): both impls are unconditional in `T`. `Channel` also carries a
// `PhantomData<&'a mut T>`, which presumably supplies the bounds on `T` for the
// channel as a whole; confirm that this is sufficient.
unsafe impl<T> Send for BufferPtr<T> {}
unsafe impl<T> Sync for BufferPtr<T> {}
110
111/// Send-only access to a [`Channel`].
112#[derive(Debug)]
113pub struct Sender<'a, M: RawMutex, T> {
114    channel: &'a Channel<'a, M, T>,
115}
116
117impl<'a, M: RawMutex, T> Sender<'a, M, T> {
118    /// Creates one further [`Sender`] over the same channel.
119    pub fn borrow(&mut self) -> Sender<'_, M, T> {
120        Sender { channel: self.channel }
121    }
122
123    /// Attempts to send a value over the channel.
124    pub fn try_send(&mut self) -> Option<&mut T> {
125        self.channel.state.lock(|s| {
126            let s = &mut *s.borrow_mut();
127            match s.push_index() {
128                Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
129                None => None,
130            }
131        })
132    }
133
134    /// Attempts to send a value over the channel.
135    pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
136        self.channel.state.lock(|s| {
137            let s = &mut *s.borrow_mut();
138            match s.push_index() {
139                Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
140                None => {
141                    s.receive_waker.register(cx.waker());
142                    Poll::Pending
143                }
144            }
145        })
146    }
147
148    /// Asynchronously send a value over the channel.
149    pub fn send(&mut self) -> impl Future<Output = &mut T> {
150        poll_fn(|cx| {
151            self.channel.state.lock(|s| {
152                let s = &mut *s.borrow_mut();
153                match s.push_index() {
154                    Some(i) => {
155                        let r = unsafe { &mut *self.channel.buf.add(i) };
156                        Poll::Ready(r)
157                    }
158                    None => {
159                        s.receive_waker.register(cx.waker());
160                        Poll::Pending
161                    }
162                }
163            })
164        })
165    }
166
167    /// Notify the channel that the sending of the value has been finalized.
168    pub fn send_done(&mut self) {
169        self.channel.state.lock(|s| s.borrow_mut().push_done())
170    }
171
172    /// Clears all elements in the channel.
173    pub fn clear(&mut self) {
174        self.channel.state.lock(|s| {
175            s.borrow_mut().clear();
176        });
177    }
178
179    /// Returns the number of elements currently in the channel.
180    pub fn len(&self) -> usize {
181        self.channel.state.lock(|s| s.borrow().len())
182    }
183
184    /// Returns whether the channel is empty.
185    pub fn is_empty(&self) -> bool {
186        self.channel.state.lock(|s| s.borrow().is_empty())
187    }
188
189    /// Returns whether the channel is full.
190    pub fn is_full(&self) -> bool {
191        self.channel.state.lock(|s| s.borrow().is_full())
192    }
193}
194
195/// Receive-only access to a [`Channel`].
196#[derive(Debug)]
197pub struct Receiver<'a, M: RawMutex, T> {
198    channel: &'a Channel<'a, M, T>,
199}
200
201impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
202    /// Creates one further [`Receiver`] over the same channel.
203    pub fn borrow(&mut self) -> Receiver<'_, M, T> {
204        Receiver { channel: self.channel }
205    }
206
207    /// Attempts to receive a value over the channel.
208    pub fn try_receive(&mut self) -> Option<&mut T> {
209        self.channel.state.lock(|s| {
210            let s = &mut *s.borrow_mut();
211            match s.pop_index() {
212                Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
213                None => None,
214            }
215        })
216    }
217
218    /// Attempts to asynchronously receive a value over the channel.
219    pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
220        self.channel.state.lock(|s| {
221            let s = &mut *s.borrow_mut();
222            match s.pop_index() {
223                Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
224                None => {
225                    s.send_waker.register(cx.waker());
226                    Poll::Pending
227                }
228            }
229        })
230    }
231
232    /// Asynchronously receive a value over the channel.
233    pub fn receive(&mut self) -> impl Future<Output = &mut T> {
234        poll_fn(|cx| {
235            self.channel.state.lock(|s| {
236                let s = &mut *s.borrow_mut();
237                match s.pop_index() {
238                    Some(i) => {
239                        let r = unsafe { &mut *self.channel.buf.add(i) };
240                        Poll::Ready(r)
241                    }
242                    None => {
243                        s.send_waker.register(cx.waker());
244                        Poll::Pending
245                    }
246                }
247            })
248        })
249    }
250
251    /// Notify the channel that the receiving of the value has been finalized.
252    pub fn receive_done(&mut self) {
253        self.channel.state.lock(|s| s.borrow_mut().pop_done())
254    }
255
256    /// Clears all elements in the channel.
257    pub fn clear(&mut self) {
258        self.channel.state.lock(|s| {
259            s.borrow_mut().clear();
260        });
261    }
262
263    /// Returns the number of elements currently in the channel.
264    pub fn len(&self) -> usize {
265        self.channel.state.lock(|s| s.borrow().len())
266    }
267
268    /// Returns whether the channel is empty.
269    pub fn is_empty(&self) -> bool {
270        self.channel.state.lock(|s| s.borrow().is_empty())
271    }
272
273    /// Returns whether the channel is full.
274    pub fn is_full(&self) -> bool {
275        self.channel.state.lock(|s| s.borrow().is_full())
276    }
277}
278
279#[derive(Debug)]
280struct State {
281    /// Maximum number of elements the channel can hold.
282    capacity: usize,
283
284    /// Front index. Always 0..=(N-1)
285    front: usize,
286    /// Back index. Always 0..=(N-1).
287    back: usize,
288
289    /// Used to distinguish "empty" and "full" cases when `front == back`.
290    /// May only be `true` if `front == back`, always `false` otherwise.
291    full: bool,
292
293    send_waker: WakerRegistration,
294    receive_waker: WakerRegistration,
295}
296
297impl State {
298    fn increment(&self, i: usize) -> usize {
299        if i + 1 == self.capacity {
300            0
301        } else {
302            i + 1
303        }
304    }
305
306    fn clear(&mut self) {
307        if self.full {
308            self.receive_waker.wake();
309        }
310        self.front = 0;
311        self.back = 0;
312        self.full = false;
313    }
314
315    fn len(&self) -> usize {
316        if !self.full {
317            if self.back >= self.front {
318                self.back - self.front
319            } else {
320                self.capacity + self.back - self.front
321            }
322        } else {
323            self.capacity
324        }
325    }
326
327    fn is_full(&self) -> bool {
328        self.full
329    }
330
331    fn is_empty(&self) -> bool {
332        self.front == self.back && !self.full
333    }
334
335    fn push_index(&mut self) -> Option<usize> {
336        match self.is_full() {
337            true => None,
338            false => Some(self.back),
339        }
340    }
341
342    fn push_done(&mut self) {
343        assert!(!self.is_full());
344        self.back = self.increment(self.back);
345        if self.back == self.front {
346            self.full = true;
347        }
348        self.send_waker.wake();
349    }
350
351    fn pop_index(&mut self) -> Option<usize> {
352        match self.is_empty() {
353            true => None,
354            false => Some(self.front),
355        }
356    }
357
358    fn pop_done(&mut self) {
359        assert!(!self.is_empty());
360        self.front = self.increment(self.front);
361        self.full = false;
362        self.receive_waker.wake();
363    }
364}