embassy_sync/
pipe.rs

1//! Async byte stream pipe.
2
3use core::cell::{RefCell, UnsafeCell};
4use core::convert::Infallible;
5use core::future::Future;
6use core::ops::Range;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9
10use crate::blocking_mutex::raw::RawMutex;
11use crate::blocking_mutex::Mutex;
12use crate::ring_buffer::RingBuffer;
13use crate::waitqueue::WakerRegistration;
14
15/// Write-only access to a [`Pipe`].
16#[derive(Debug)]
17pub struct Writer<'p, M, const N: usize>
18where
19    M: RawMutex,
20{
21    pipe: &'p Pipe<M, N>,
22}
23
24impl<'p, M, const N: usize> Clone for Writer<'p, M, N>
25where
26    M: RawMutex,
27{
28    fn clone(&self) -> Self {
29        *self
30    }
31}
32
33impl<'p, M, const N: usize> Copy for Writer<'p, M, N> where M: RawMutex {}
34
35impl<'p, M, const N: usize> Writer<'p, M, N>
36where
37    M: RawMutex,
38{
39    /// Write some bytes to the pipe.
40    ///
41    /// See [`Pipe::write()`]
42    pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
43        self.pipe.write(buf)
44    }
45
46    /// Attempt to immediately write some bytes to the pipe.
47    ///
48    /// See [`Pipe::try_write()`]
49    pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
50        self.pipe.try_write(buf)
51    }
52}
53
54/// Future returned by [`Pipe::write`] and  [`Writer::write`].
55#[must_use = "futures do nothing unless you `.await` or poll them"]
56#[derive(Debug)]
57pub struct WriteFuture<'p, M, const N: usize>
58where
59    M: RawMutex,
60{
61    pipe: &'p Pipe<M, N>,
62    buf: &'p [u8],
63}
64
65impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N>
66where
67    M: RawMutex,
68{
69    type Output = usize;
70
71    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72        match self.pipe.try_write_with_context(Some(cx), self.buf) {
73            Ok(n) => Poll::Ready(n),
74            Err(TryWriteError::Full) => Poll::Pending,
75        }
76    }
77}
78
79impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
80
81/// Read-only access to a [`Pipe`].
82#[derive(Debug)]
83pub struct Reader<'p, M, const N: usize>
84where
85    M: RawMutex,
86{
87    pipe: &'p Pipe<M, N>,
88}
89
90impl<'p, M, const N: usize> Reader<'p, M, N>
91where
92    M: RawMutex,
93{
94    /// Read some bytes from the pipe.
95    ///
96    /// See [`Pipe::read()`]
97    pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
98        self.pipe.read(buf)
99    }
100
101    /// Attempt to immediately read some bytes from the pipe.
102    ///
103    /// See [`Pipe::try_read()`]
104    pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
105        self.pipe.try_read(buf)
106    }
107
108    /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
109    ///
110    /// If no bytes are currently available to read, this function waits until at least one byte is available.
111    ///
112    /// If the reader is at end-of-file (EOF), an empty slice is returned.
113    pub fn fill_buf(&mut self) -> FillBufFuture<'_, M, N> {
114        FillBufFuture { pipe: Some(self.pipe) }
115    }
116
117    /// Try returning contents of the internal buffer.
118    ///
119    /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`.
120    ///
121    /// If the reader is at end-of-file (EOF), an empty slice is returned.
122    pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
123        unsafe { self.pipe.try_fill_buf_with_context(None) }
124    }
125
126    /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
127    pub fn consume(&mut self, amt: usize) {
128        self.pipe.consume(amt)
129    }
130}
131
132/// Future returned by [`Pipe::read`] and  [`Reader::read`].
133#[must_use = "futures do nothing unless you `.await` or poll them"]
134#[derive(Debug)]
135pub struct ReadFuture<'p, M, const N: usize>
136where
137    M: RawMutex,
138{
139    pipe: &'p Pipe<M, N>,
140    buf: &'p mut [u8],
141}
142
143impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N>
144where
145    M: RawMutex,
146{
147    type Output = usize;
148
149    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
150        match self.pipe.try_read_with_context(Some(cx), self.buf) {
151            Ok(n) => Poll::Ready(n),
152            Err(TryReadError::Empty) => Poll::Pending,
153        }
154    }
155}
156
157impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
158
159/// Future returned by [`Reader::fill_buf`].
160#[must_use = "futures do nothing unless you `.await` or poll them"]
161#[derive(Debug)]
162pub struct FillBufFuture<'p, M, const N: usize>
163where
164    M: RawMutex,
165{
166    pipe: Option<&'p Pipe<M, N>>,
167}
168
169impl<'p, M, const N: usize> Future for FillBufFuture<'p, M, N>
170where
171    M: RawMutex,
172{
173    type Output = &'p [u8];
174
175    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176        let pipe = self.pipe.take().unwrap();
177        match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
178            Ok(buf) => Poll::Ready(buf),
179            Err(TryReadError::Empty) => {
180                self.pipe = Some(pipe);
181                Poll::Pending
182            }
183        }
184    }
185}
186
187impl<'p, M, const N: usize> Unpin for FillBufFuture<'p, M, N> where M: RawMutex {}
188
/// Error returned by [`try_read`](Pipe::try_read).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TryReadError {
    /// The pipe currently holds no data, so nothing could be read
    /// without waiting for a writer.
    Empty,
}
197
/// Error returned by [`try_write`](Pipe::try_write).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TryWriteError {
    /// The pipe currently has no free space, so nothing could be written
    /// without waiting for a reader.
    Full,
}
206
/// Mutable bookkeeping of a [`Pipe`], kept behind the pipe's mutex.
#[derive(Debug)]
struct PipeState<const N: usize> {
    /// Read/write positions for the pipe's byte storage (the bytes themselves
    /// live in `Pipe::buf`).
    buffer: RingBuffer<N>,
    /// Waker of the task waiting for data to become readable.
    read_waker: WakerRegistration,
    /// Waker of the task waiting for free space to write into.
    write_waker: WakerRegistration,
}
213
/// Raw byte storage of a pipe.
///
/// The bytes sit in an `UnsafeCell` so that slices into them can be handed out
/// through a shared reference; all aliasing discipline is left to the caller.
#[repr(transparent)]
#[derive(Debug)]
struct Buffer<const N: usize>(UnsafeCell<[u8; N]>);

impl<const N: usize> Buffer<N> {
    /// Returns a shared view of the bytes in `r`, with a caller-chosen lifetime.
    ///
    /// # Safety
    ///
    /// `r` must satisfy `r.start <= r.end <= N`, the returned slice must not
    /// outlive `self`, and no mutable view overlapping `r` may exist while it
    /// is alive.
    unsafe fn get<'a>(&self, r: Range<usize>) -> &'a [u8] {
        let Range { start, end } = r;
        let base: *const u8 = self.0.get().cast::<u8>();
        core::slice::from_raw_parts(base.add(start), end - start)
    }

    /// Returns an exclusive view of the bytes in `r`, with a caller-chosen lifetime.
    ///
    /// # Safety
    ///
    /// `r` must satisfy `r.start <= r.end <= N`, the returned slice must not
    /// outlive `self`, and no other view overlapping `r` may exist while it
    /// is alive.
    unsafe fn get_mut<'a>(&self, r: Range<usize>) -> &'a mut [u8] {
        let Range { start, end } = r;
        let base: *mut u8 = self.0.get().cast::<u8>();
        core::slice::from_raw_parts_mut(base.add(start), end - start)
    }
}

// `UnsafeCell` opts out of `Sync`; the buffer is only ever accessed through the
// `unsafe` accessors above, whose callers are responsible for synchronization.
unsafe impl<const N: usize> Send for Buffer<N> {}
unsafe impl<const N: usize> Sync for Buffer<N> {}
232
233/// A bounded byte-oriented pipe for communicating between asynchronous tasks
234/// with backpressure.
235///
236/// The pipe will buffer up to the provided number of bytes. Once the
237/// buffer is full, attempts to `write` new bytes will wait until buffer space is freed up.
238///
239/// All data written will become available in the same order as it was written.
240#[derive(Debug)]
241pub struct Pipe<M, const N: usize>
242where
243    M: RawMutex,
244{
245    buf: Buffer<N>,
246    inner: Mutex<M, RefCell<PipeState<N>>>,
247}
248
249impl<M, const N: usize> Pipe<M, N>
250where
251    M: RawMutex,
252{
253    /// Establish a new bounded pipe. For example, to create one with a NoopMutex:
254    ///
255    /// ```
256    /// use embassy_sync::pipe::Pipe;
257    /// use embassy_sync::blocking_mutex::raw::NoopRawMutex;
258    ///
259    /// // Declare a bounded pipe, with a buffer of 256 bytes.
260    /// let mut pipe = Pipe::<NoopRawMutex, 256>::new();
261    /// ```
262    pub const fn new() -> Self {
263        Self {
264            buf: Buffer(UnsafeCell::new([0; N])),
265            inner: Mutex::new(RefCell::new(PipeState {
266                buffer: RingBuffer::new(),
267                read_waker: WakerRegistration::new(),
268                write_waker: WakerRegistration::new(),
269            })),
270        }
271    }
272
273    fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R {
274        self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
275    }
276
277    fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
278        self.inner.lock(|rc: &RefCell<PipeState<N>>| {
279            let s = &mut *rc.borrow_mut();
280
281            if s.buffer.is_full() {
282                s.write_waker.wake();
283            }
284
285            let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
286            if available.is_empty() {
287                if let Some(cx) = cx {
288                    s.read_waker.register(cx.waker());
289                }
290                return Err(TryReadError::Empty);
291            }
292
293            let n = available.len().min(buf.len());
294            buf[..n].copy_from_slice(&available[..n]);
295            s.buffer.pop(n);
296            Ok(n)
297        })
298    }
299
300    // safety: While the returned slice is alive,
301    // no `read` or `consume` methods in the pipe must be called.
302    unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
303        self.inner.lock(|rc: &RefCell<PipeState<N>>| {
304            let s = &mut *rc.borrow_mut();
305
306            if s.buffer.is_full() {
307                s.write_waker.wake();
308            }
309
310            let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
311            if available.is_empty() {
312                if let Some(cx) = cx {
313                    s.read_waker.register(cx.waker());
314                }
315                return Err(TryReadError::Empty);
316            }
317
318            Ok(available)
319        })
320    }
321
322    fn consume(&self, amt: usize) {
323        self.inner.lock(|rc: &RefCell<PipeState<N>>| {
324            let s = &mut *rc.borrow_mut();
325            let available = s.buffer.pop_buf();
326            assert!(amt <= available.len());
327            s.buffer.pop(amt);
328        })
329    }
330
331    fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
332        self.inner.lock(|rc: &RefCell<PipeState<N>>| {
333            let s = &mut *rc.borrow_mut();
334
335            if s.buffer.is_empty() {
336                s.read_waker.wake();
337            }
338
339            let available = unsafe { self.buf.get_mut(s.buffer.push_buf()) };
340            if available.is_empty() {
341                if let Some(cx) = cx {
342                    s.write_waker.register(cx.waker());
343                }
344                return Err(TryWriteError::Full);
345            }
346
347            let n = available.len().min(buf.len());
348            available[..n].copy_from_slice(&buf[..n]);
349            s.buffer.push(n);
350            Ok(n)
351        })
352    }
353
354    /// Split this pipe into a BufRead-capable reader and a writer.
355    ///
356    /// The reader and writer borrow the current pipe mutably, so it is not
357    /// possible to use it directly while they exist. This is needed because
358    /// implementing `BufRead` requires there is a single reader.
359    ///
360    /// The writer is cloneable, the reader is not.
361    pub fn split(&mut self) -> (Reader<'_, M, N>, Writer<'_, M, N>) {
362        (Reader { pipe: self }, Writer { pipe: self })
363    }
364
365    /// Write some bytes to the pipe.
366    ///
367    /// This method writes a nonzero amount of bytes from `buf` into the pipe, and
368    /// returns the amount of bytes written.
369    ///
370    /// If it is not possible to write a nonzero amount of bytes because the pipe's buffer is full,
371    /// this method will wait until it isn't. See [`try_write`](Self::try_write) for a variant that
372    /// returns an error instead of waiting.
373    ///
374    /// It is not guaranteed that all bytes in the buffer are written, even if there's enough
375    /// free space in the pipe buffer for all. In other words, it is possible for `write` to return
376    /// without writing all of `buf` (returning a number less than `buf.len()`) and still leave
377    /// free space in the pipe buffer. You should always `write` in a loop, or use helpers like
378    /// `write_all` from the `embedded-io` crate.
379    pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
380        WriteFuture { pipe: self, buf }
381    }
382
383    /// Write all bytes to the pipe.
384    ///
385    /// This method writes all bytes from `buf` into the pipe
386    pub async fn write_all(&self, mut buf: &[u8]) {
387        while !buf.is_empty() {
388            let n = self.write(buf).await;
389            buf = &buf[n..];
390        }
391    }
392
393    /// Attempt to immediately write some bytes to the pipe.
394    ///
395    /// This method will either write a nonzero amount of bytes to the pipe immediately,
396    /// or return an error if the pipe is empty. See [`write`](Self::write) for a variant
397    /// that waits instead of returning an error.
398    pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
399        self.try_write_with_context(None, buf)
400    }
401
402    /// Read some bytes from the pipe.
403    ///
404    /// This method reads a nonzero amount of bytes from the pipe into `buf` and
405    /// returns the amount of bytes read.
406    ///
407    /// If it is not possible to read a nonzero amount of bytes because the pipe's buffer is empty,
408    /// this method will wait until it isn't. See [`try_read`](Self::try_read) for a variant that
409    /// returns an error instead of waiting.
410    ///
411    /// It is not guaranteed that all bytes in the buffer are read, even if there's enough
412    /// space in `buf` for all. In other words, it is possible for `read` to return
413    /// without filling `buf` (returning a number less than `buf.len()`) and still leave bytes
414    /// in the pipe buffer. You should always `read` in a loop, or use helpers like
415    /// `read_exact` from the `embedded-io` crate.
416    pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
417        ReadFuture { pipe: self, buf }
418    }
419
420    /// Attempt to immediately read some bytes from the pipe.
421    ///
422    /// This method will either read a nonzero amount of bytes from the pipe immediately,
423    /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant
424    /// that waits instead of returning an error.
425    pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
426        self.try_read_with_context(None, buf)
427    }
428
429    /// Clear the data in the pipe's buffer.
430    pub fn clear(&self) {
431        self.inner.lock(|rc: &RefCell<PipeState<N>>| {
432            let s = &mut *rc.borrow_mut();
433
434            s.buffer.clear();
435            s.write_waker.wake();
436        })
437    }
438
439    /// Return whether the pipe is full (no free space in the buffer)
440    pub fn is_full(&self) -> bool {
441        self.len() == N
442    }
443
444    /// Return whether the pipe is empty (no data buffered)
445    pub fn is_empty(&self) -> bool {
446        self.len() == 0
447    }
448
449    /// Total byte capacity.
450    ///
451    /// This is the same as the `N` generic param.
452    pub fn capacity(&self) -> usize {
453        N
454    }
455
456    /// Used byte capacity.
457    pub fn len(&self) -> usize {
458        self.lock(|c| c.buffer.len())
459    }
460
461    /// Free byte capacity.
462    ///
463    /// This is equivalent to `capacity() - len()`
464    pub fn free_capacity(&self) -> usize {
465        N - self.len()
466    }
467}
468
469impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Pipe<M, N> {
470    type Error = Infallible;
471}
472
473impl<M: RawMutex, const N: usize> embedded_io_async::Read for Pipe<M, N> {
474    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
475        Ok(Pipe::read(self, buf).await)
476    }
477}
478
479impl<M: RawMutex, const N: usize> embedded_io_async::Write for Pipe<M, N> {
480    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
481        Ok(Pipe::write(self, buf).await)
482    }
483
484    async fn flush(&mut self) -> Result<(), Self::Error> {
485        Ok(())
486    }
487}
488
489impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for &Pipe<M, N> {
490    type Error = Infallible;
491}
492
493impl<M: RawMutex, const N: usize> embedded_io_async::Read for &Pipe<M, N> {
494    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
495        Ok(Pipe::read(self, buf).await)
496    }
497}
498
499impl<M: RawMutex, const N: usize> embedded_io_async::Write for &Pipe<M, N> {
500    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
501        Ok(Pipe::write(self, buf).await)
502    }
503
504    async fn flush(&mut self) -> Result<(), Self::Error> {
505        Ok(())
506    }
507}
508
509impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Reader<'_, M, N> {
510    type Error = Infallible;
511}
512
513impl<M: RawMutex, const N: usize> embedded_io_async::Read for Reader<'_, M, N> {
514    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
515        Ok(Reader::read(self, buf).await)
516    }
517}
518
519impl<M: RawMutex, const N: usize> embedded_io_async::BufRead for Reader<'_, M, N> {
520    async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
521        Ok(Reader::fill_buf(self).await)
522    }
523
524    fn consume(&mut self, amt: usize) {
525        Reader::consume(self, amt)
526    }
527}
528
529impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Writer<'_, M, N> {
530    type Error = Infallible;
531}
532
533impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N> {
534    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
535        Ok(Writer::write(self, buf).await)
536    }
537
538    async fn flush(&mut self) -> Result<(), Self::Error> {
539        Ok(())
540    }
541}
542
543//
544// Type-erased variants
545//
546
547pub(crate) trait DynamicPipe {
548    fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>;
549    fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>;
550
551    fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>;
552    fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>;
553
554    fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>;
555    fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>;
556
557    fn consume(&self, amt: usize);
558    unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>;
559}
560
561impl<M, const N: usize> DynamicPipe for Pipe<M, N>
562where
563    M: RawMutex,
564{
565    fn consume(&self, amt: usize) {
566        Pipe::consume(self, amt)
567    }
568
569    unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
570        Pipe::try_fill_buf_with_context(self, cx)
571    }
572
573    fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
574        Pipe::write(self, buf).into()
575    }
576
577    fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
578        Pipe::read(self, buf).into()
579    }
580
581    fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
582        Pipe::try_read(self, buf)
583    }
584
585    fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
586        Pipe::try_write(self, buf)
587    }
588
589    fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
590        Pipe::try_write_with_context(self, cx, buf)
591    }
592
593    fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
594        Pipe::try_read_with_context(self, cx, buf)
595    }
596}
597
/// Write-only access to the dynamic pipe.
///
/// Type-erased counterpart of [`Writer`]: the pipe is held as a trait object,
/// so this type carries neither the mutex type nor the capacity as generic
/// parameters. Create one with `From<Writer>`.
pub struct DynamicWriter<'p> {
    /// The type-erased pipe that every write is forwarded to.
    pipe: &'p dyn DynamicPipe,
}
602
603impl<'p> Clone for DynamicWriter<'p> {
604    fn clone(&self) -> Self {
605        *self
606    }
607}
608
609impl<'p> Copy for DynamicWriter<'p> {}
610
611impl<'p> DynamicWriter<'p> {
612    /// Write some bytes to the pipe.
613    ///
614    /// See [`Pipe::write()`]
615    pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
616        self.pipe.write(buf)
617    }
618
619    /// Attempt to immediately write some bytes to the pipe.
620    ///
621    /// See [`Pipe::try_write()`]
622    pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
623        self.pipe.try_write(buf)
624    }
625}
626
627impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p>
628where
629    M: RawMutex,
630{
631    fn from(value: Writer<'p, M, N>) -> Self {
632        Self { pipe: value.pipe }
633    }
634}
635
/// Future returned by [`DynamicWriter::write`].
///
/// Resolves to the number of bytes that were written to the pipe.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct DynamicWriteFuture<'p> {
    /// Type-erased destination pipe.
    pipe: &'p dyn DynamicPipe,
    /// Bytes the caller wants to write.
    buf: &'p [u8],
}
642
643impl<'p> Future for DynamicWriteFuture<'p> {
644    type Output = usize;
645
646    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
647        match self.pipe.try_write_with_context(Some(cx), self.buf) {
648            Ok(n) => Poll::Ready(n),
649            Err(TryWriteError::Full) => Poll::Pending,
650        }
651    }
652}
653
654impl<'p> Unpin for DynamicWriteFuture<'p> {}
655
656impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p>
657where
658    M: RawMutex,
659{
660    fn from(value: WriteFuture<'p, M, N>) -> Self {
661        Self {
662            pipe: value.pipe,
663            buf: value.buf,
664        }
665    }
666}
667
/// Read-only access to a dynamic pipe.
///
/// Type-erased counterpart of [`Reader`]: the pipe is held as a trait object,
/// so this type carries neither the mutex type nor the capacity as generic
/// parameters. Create one with `From<Reader>`.
pub struct DynamicReader<'p> {
    /// The type-erased pipe that every read is forwarded to.
    pipe: &'p dyn DynamicPipe,
}
672
673impl<'p> DynamicReader<'p> {
674    /// Read some bytes from the pipe.
675    ///
676    /// See [`Pipe::read()`]
677    pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
678        self.pipe.read(buf)
679    }
680
681    /// Attempt to immediately read some bytes from the pipe.
682    ///
683    /// See [`Pipe::try_read()`]
684    pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
685        self.pipe.try_read(buf)
686    }
687
688    /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
689    ///
690    /// If no bytes are currently available to read, this function waits until at least one byte is available.
691    ///
692    /// If the reader is at end-of-file (EOF), an empty slice is returned.
693    pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> {
694        DynamicFillBufFuture { pipe: Some(self.pipe) }
695    }
696
697    /// Try returning contents of the internal buffer.
698    ///
699    /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`.
700    ///
701    /// If the reader is at end-of-file (EOF), an empty slice is returned.
702    pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
703        unsafe { self.pipe.try_fill_buf_with_context(None) }
704    }
705
706    /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
707    pub fn consume(&mut self, amt: usize) {
708        self.pipe.consume(amt)
709    }
710}
711
712impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p>
713where
714    M: RawMutex,
715{
716    fn from(value: Reader<'p, M, N>) -> Self {
717        Self { pipe: value.pipe }
718    }
719}
720
/// Future returned by [`DynamicReader::read`].
///
/// Resolves to the number of bytes that were read from the pipe.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct DynamicReadFuture<'p> {
    /// Type-erased source pipe.
    pipe: &'p dyn DynamicPipe,
    /// Caller-provided destination for the bytes read.
    buf: &'p mut [u8],
}
727
728impl<'p> Future for DynamicReadFuture<'p> {
729    type Output = usize;
730
731    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
732        match self.pipe.try_read_with_context(Some(cx), self.buf) {
733            Ok(n) => Poll::Ready(n),
734            Err(TryReadError::Empty) => Poll::Pending,
735        }
736    }
737}
738
739impl<'p> Unpin for DynamicReadFuture<'p> {}
740
741impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p>
742where
743    M: RawMutex,
744{
745    fn from(value: ReadFuture<'p, M, N>) -> Self {
746        Self {
747            pipe: value.pipe,
748            buf: value.buf,
749        }
750    }
751}
752
/// Future returned by [`DynamicReader::fill_buf`].
///
/// Resolves to a slice of the bytes currently readable from the pipe.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct DynamicFillBufFuture<'p> {
    /// Type-erased source pipe. Taken out (left as `None`) once the future has completed.
    pipe: Option<&'p dyn DynamicPipe>,
}
758
759impl<'p> Future for DynamicFillBufFuture<'p> {
760    type Output = &'p [u8];
761
762    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
763        let pipe = self.pipe.take().unwrap();
764        match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
765            Ok(buf) => Poll::Ready(buf),
766            Err(TryReadError::Empty) => {
767                self.pipe = Some(pipe);
768                Poll::Pending
769            }
770        }
771    }
772}
773
774impl<'p> Unpin for DynamicFillBufFuture<'p> {}
775
776impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p>
777where
778    M: RawMutex,
779{
780    fn from(value: FillBufFuture<'p, M, N>) -> Self {
781        Self {
782            pipe: value.pipe.map(|p| p as &dyn DynamicPipe),
783        }
784    }
785}
786
#[cfg(test)]
mod tests {
    use futures_executor::ThreadPool;
    use futures_util::task::SpawnExt;
    use static_cell::StaticCell;

    use super::*;
    use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};

    /// One written byte takes one byte of capacity.
    #[test]
    fn writing_once() {
        let pipe = Pipe::<NoopRawMutex, 3>::new();
        assert!(pipe.try_write(&[1]).is_ok());
        assert_eq!(pipe.free_capacity(), 2);
    }

    /// Once all three bytes are used, a further write reports `Full`.
    #[test]
    fn writing_when_full() {
        let pipe = Pipe::<NoopRawMutex, 3>::new();
        for byte in 42u8..=44 {
            assert_eq!(pipe.try_write(&[byte]), Ok(1));
        }
        assert_eq!(pipe.try_write(&[45]), Err(TryWriteError::Full));
        assert_eq!(pipe.free_capacity(), 0);
    }

    /// Reading back the only byte frees the whole capacity again.
    #[test]
    fn receiving_once_with_one_send() {
        let pipe = Pipe::<NoopRawMutex, 3>::new();
        assert!(pipe.try_write(&[42]).is_ok());

        let mut out = [0; 16];
        assert_eq!(pipe.try_read(&mut out), Ok(1));
        assert_eq!(out[0], 42);
        assert_eq!(pipe.free_capacity(), 3);
    }

    /// Reading from a fresh pipe reports `Empty` and changes nothing.
    #[test]
    fn receiving_when_empty() {
        let pipe = Pipe::<NoopRawMutex, 3>::new();
        let mut out = [0; 16];
        assert_eq!(pipe.try_read(&mut out), Err(TryReadError::Empty));
        assert_eq!(pipe.free_capacity(), 3);
    }

    /// A byte written is the byte read.
    #[test]
    fn simple_send_and_receive() {
        let pipe = Pipe::<NoopRawMutex, 3>::new();
        assert!(pipe.try_write(&[42]).is_ok());

        let mut out = [0; 16];
        assert_eq!(pipe.try_read(&mut out), Ok(1));
        assert_eq!(out[0], 42);
    }

    /// `try_fill_buf`/`consume` through the statically typed reader.
    #[test]
    fn read_buf() {
        let mut pipe = Pipe::<NoopRawMutex, 3>::new();
        let (mut reader, writer) = pipe.split();

        assert!(writer.try_write(&[42, 43]).is_ok());
        // Peeking twice yields the same bytes: nothing is consumed yet.
        assert_eq!(reader.try_fill_buf().unwrap(), &[42, 43]);
        assert_eq!(reader.try_fill_buf().unwrap(), &[42, 43]);

        reader.consume(1);
        assert_eq!(reader.try_fill_buf().unwrap(), &[43]);

        reader.consume(1);
        assert_eq!(reader.try_fill_buf(), Err(TryReadError::Empty));

        assert_eq!(writer.try_write(&[44, 45, 46]), Ok(1));
        assert_eq!(writer.try_write(&[45, 46]), Ok(2));
        // only one byte due to wraparound.
        assert_eq!(reader.try_fill_buf().unwrap(), &[44]);

        reader.consume(1);
        assert_eq!(reader.try_fill_buf().unwrap(), &[45, 46]);

        assert!(writer.try_write(&[47]).is_ok());
        assert_eq!(reader.try_fill_buf().unwrap(), &[45, 46, 47]);
        reader.consume(3);
    }

    /// The writer half can be cloned.
    #[test]
    fn writer_is_cloneable() {
        let mut pipe = Pipe::<NoopRawMutex, 3>::new();
        let (_reader, writer) = pipe.split();
        let _ = writer.clone();
    }

    /// Same sequence as `read_buf`, through the type-erased reader and writer.
    #[test]
    fn dynamic_dispatch_pipe() {
        let mut pipe = Pipe::<NoopRawMutex, 3>::new();
        let (typed_reader, typed_writer) = pipe.split();
        let mut reader: DynamicReader<'_> = typed_reader.into();
        let writer: DynamicWriter<'_> = typed_writer.into();

        assert!(writer.try_write(&[42, 43]).is_ok());
        // Peeking twice yields the same bytes: nothing is consumed yet.
        assert_eq!(reader.try_fill_buf().unwrap(), &[42, 43]);
        assert_eq!(reader.try_fill_buf().unwrap(), &[42, 43]);

        reader.consume(1);
        assert_eq!(reader.try_fill_buf().unwrap(), &[43]);

        reader.consume(1);
        assert_eq!(reader.try_fill_buf(), Err(TryReadError::Empty));

        assert_eq!(writer.try_write(&[44, 45, 46]), Ok(1));
        assert_eq!(writer.try_write(&[45, 46]), Ok(2));
        // only one byte due to wraparound.
        assert_eq!(reader.try_fill_buf().unwrap(), &[44]);

        reader.consume(1);
        assert_eq!(reader.try_fill_buf().unwrap(), &[45, 46]);

        assert!(writer.try_write(&[47]).is_ok());
        assert_eq!(reader.try_fill_buf().unwrap(), &[45, 46, 47]);
        reader.consume(3);
    }

    /// An awaiting reader receives a byte written from a spawned task.
    #[futures_test::test]
    async fn receiver_receives_given_try_write_async() {
        let executor = ThreadPool::new().unwrap();

        static PIPE: StaticCell<Pipe<CriticalSectionRawMutex, 3>> = StaticCell::new();
        let pipe = &*PIPE.init(Pipe::new());
        let producer_side = pipe;
        let producer = async move {
            assert_eq!(producer_side.try_write(&[42]), Ok(1));
        };
        executor.spawn(producer).unwrap();

        let mut out = [0; 16];
        assert_eq!(pipe.read(&mut out).await, 1);
        assert_eq!(out[0], 42);
    }

    /// With free capacity, an awaited write completes and the byte can be read back.
    #[futures_test::test]
    async fn sender_send_completes_if_capacity() {
        let pipe = Pipe::<CriticalSectionRawMutex, 1>::new();
        pipe.write(&[42]).await;

        let mut out = [0; 16];
        assert_eq!(pipe.read(&mut out).await, 1);
        assert_eq!(out[0], 42);
    }
}