embassy_sync/
zerocopy_channel.rs1use core::cell::RefCell;
18use core::future::{poll_fn, Future};
19use core::marker::PhantomData;
20use core::task::{Context, Poll};
21
22use crate::blocking_mutex::raw::RawMutex;
23use crate::blocking_mutex::Mutex;
24use crate::waitqueue::WakerRegistration;
25
26#[derive(Debug)]
38pub struct Channel<'a, M: RawMutex, T> {
39 buf: BufferPtr<T>,
40 phantom: PhantomData<&'a mut T>,
41 state: Mutex<M, RefCell<State>>,
42}
43
44impl<'a, M: RawMutex, T> Channel<'a, M, T> {
45 pub fn new(buf: &'a mut [T]) -> Self {
50 let len = buf.len();
51 assert!(len != 0);
52
53 Self {
54 buf: BufferPtr(buf.as_mut_ptr()),
55 phantom: PhantomData,
56 state: Mutex::new(RefCell::new(State {
57 capacity: len,
58 front: 0,
59 back: 0,
60 full: false,
61 send_waker: WakerRegistration::new(),
62 receive_waker: WakerRegistration::new(),
63 })),
64 }
65 }
66
67 pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
72 (Sender { channel: self }, Receiver { channel: self })
73 }
74
75 pub fn clear(&mut self) {
77 self.state.lock(|s| {
78 s.borrow_mut().clear();
79 });
80 }
81
82 pub fn len(&self) -> usize {
84 self.state.lock(|s| s.borrow().len())
85 }
86
87 pub fn is_empty(&self) -> bool {
89 self.state.lock(|s| s.borrow().is_empty())
90 }
91
92 pub fn is_full(&self) -> bool {
94 self.state.lock(|s| s.borrow().is_full())
95 }
96}
97
98#[repr(transparent)]
99#[derive(Debug)]
100struct BufferPtr<T>(*mut T);
101
102impl<T> BufferPtr<T> {
103 unsafe fn add(&self, count: usize) -> *mut T {
104 self.0.add(count)
105 }
106}
107
108unsafe impl<T> Send for BufferPtr<T> {}
109unsafe impl<T> Sync for BufferPtr<T> {}
110
111#[derive(Debug)]
113pub struct Sender<'a, M: RawMutex, T> {
114 channel: &'a Channel<'a, M, T>,
115}
116
117impl<'a, M: RawMutex, T> Sender<'a, M, T> {
118 pub fn borrow(&mut self) -> Sender<'_, M, T> {
120 Sender { channel: self.channel }
121 }
122
123 pub fn try_send(&mut self) -> Option<&mut T> {
125 self.channel.state.lock(|s| {
126 let s = &mut *s.borrow_mut();
127 match s.push_index() {
128 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
129 None => None,
130 }
131 })
132 }
133
134 pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
136 self.channel.state.lock(|s| {
137 let s = &mut *s.borrow_mut();
138 match s.push_index() {
139 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
140 None => {
141 s.receive_waker.register(cx.waker());
142 Poll::Pending
143 }
144 }
145 })
146 }
147
148 pub fn send(&mut self) -> impl Future<Output = &mut T> {
150 poll_fn(|cx| {
151 self.channel.state.lock(|s| {
152 let s = &mut *s.borrow_mut();
153 match s.push_index() {
154 Some(i) => {
155 let r = unsafe { &mut *self.channel.buf.add(i) };
156 Poll::Ready(r)
157 }
158 None => {
159 s.receive_waker.register(cx.waker());
160 Poll::Pending
161 }
162 }
163 })
164 })
165 }
166
167 pub fn send_done(&mut self) {
169 self.channel.state.lock(|s| s.borrow_mut().push_done())
170 }
171
172 pub fn clear(&mut self) {
174 self.channel.state.lock(|s| {
175 s.borrow_mut().clear();
176 });
177 }
178
179 pub fn len(&self) -> usize {
181 self.channel.state.lock(|s| s.borrow().len())
182 }
183
184 pub fn is_empty(&self) -> bool {
186 self.channel.state.lock(|s| s.borrow().is_empty())
187 }
188
189 pub fn is_full(&self) -> bool {
191 self.channel.state.lock(|s| s.borrow().is_full())
192 }
193}
194
195#[derive(Debug)]
197pub struct Receiver<'a, M: RawMutex, T> {
198 channel: &'a Channel<'a, M, T>,
199}
200
201impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
202 pub fn borrow(&mut self) -> Receiver<'_, M, T> {
204 Receiver { channel: self.channel }
205 }
206
207 pub fn try_receive(&mut self) -> Option<&mut T> {
209 self.channel.state.lock(|s| {
210 let s = &mut *s.borrow_mut();
211 match s.pop_index() {
212 Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
213 None => None,
214 }
215 })
216 }
217
218 pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> {
220 self.channel.state.lock(|s| {
221 let s = &mut *s.borrow_mut();
222 match s.pop_index() {
223 Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
224 None => {
225 s.send_waker.register(cx.waker());
226 Poll::Pending
227 }
228 }
229 })
230 }
231
232 pub fn receive(&mut self) -> impl Future<Output = &mut T> {
234 poll_fn(|cx| {
235 self.channel.state.lock(|s| {
236 let s = &mut *s.borrow_mut();
237 match s.pop_index() {
238 Some(i) => {
239 let r = unsafe { &mut *self.channel.buf.add(i) };
240 Poll::Ready(r)
241 }
242 None => {
243 s.send_waker.register(cx.waker());
244 Poll::Pending
245 }
246 }
247 })
248 })
249 }
250
251 pub fn receive_done(&mut self) {
253 self.channel.state.lock(|s| s.borrow_mut().pop_done())
254 }
255
256 pub fn clear(&mut self) {
258 self.channel.state.lock(|s| {
259 s.borrow_mut().clear();
260 });
261 }
262
263 pub fn len(&self) -> usize {
265 self.channel.state.lock(|s| s.borrow().len())
266 }
267
268 pub fn is_empty(&self) -> bool {
270 self.channel.state.lock(|s| s.borrow().is_empty())
271 }
272
273 pub fn is_full(&self) -> bool {
275 self.channel.state.lock(|s| s.borrow().is_full())
276 }
277}
278
279#[derive(Debug)]
280struct State {
281 capacity: usize,
283
284 front: usize,
286 back: usize,
288
289 full: bool,
292
293 send_waker: WakerRegistration,
294 receive_waker: WakerRegistration,
295}
296
297impl State {
298 fn increment(&self, i: usize) -> usize {
299 if i + 1 == self.capacity {
300 0
301 } else {
302 i + 1
303 }
304 }
305
306 fn clear(&mut self) {
307 if self.full {
308 self.receive_waker.wake();
309 }
310 self.front = 0;
311 self.back = 0;
312 self.full = false;
313 }
314
315 fn len(&self) -> usize {
316 if !self.full {
317 if self.back >= self.front {
318 self.back - self.front
319 } else {
320 self.capacity + self.back - self.front
321 }
322 } else {
323 self.capacity
324 }
325 }
326
327 fn is_full(&self) -> bool {
328 self.full
329 }
330
331 fn is_empty(&self) -> bool {
332 self.front == self.back && !self.full
333 }
334
335 fn push_index(&mut self) -> Option<usize> {
336 match self.is_full() {
337 true => None,
338 false => Some(self.back),
339 }
340 }
341
342 fn push_done(&mut self) {
343 assert!(!self.is_full());
344 self.back = self.increment(self.back);
345 if self.back == self.front {
346 self.full = true;
347 }
348 self.send_waker.wake();
349 }
350
351 fn pop_index(&mut self) -> Option<usize> {
352 match self.is_empty() {
353 true => None,
354 false => Some(self.front),
355 }
356 }
357
358 fn pop_done(&mut self) {
359 assert!(!self.is_empty());
360 self.front = self.increment(self.front);
361 self.full = false;
362 self.receive_waker.wake();
363 }
364}