1use core::cell::{RefCell, UnsafeCell};
4use core::convert::Infallible;
5use core::future::Future;
6use core::ops::Range;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9
10use crate::blocking_mutex::raw::RawMutex;
11use crate::blocking_mutex::Mutex;
12use crate::ring_buffer::RingBuffer;
13use crate::waitqueue::WakerRegistration;
14
15#[derive(Debug)]
17pub struct Writer<'p, M, const N: usize>
18where
19 M: RawMutex,
20{
21 pipe: &'p Pipe<M, N>,
22}
23
24impl<'p, M, const N: usize> Clone for Writer<'p, M, N>
25where
26 M: RawMutex,
27{
28 fn clone(&self) -> Self {
29 *self
30 }
31}
32
33impl<'p, M, const N: usize> Copy for Writer<'p, M, N> where M: RawMutex {}
34
35impl<'p, M, const N: usize> Writer<'p, M, N>
36where
37 M: RawMutex,
38{
39 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
43 self.pipe.write(buf)
44 }
45
46 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
50 self.pipe.try_write(buf)
51 }
52}
53
54#[must_use = "futures do nothing unless you `.await` or poll them"]
56#[derive(Debug)]
57pub struct WriteFuture<'p, M, const N: usize>
58where
59 M: RawMutex,
60{
61 pipe: &'p Pipe<M, N>,
62 buf: &'p [u8],
63}
64
65impl<'p, M, const N: usize> Future for WriteFuture<'p, M, N>
66where
67 M: RawMutex,
68{
69 type Output = usize;
70
71 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72 match self.pipe.try_write_with_context(Some(cx), self.buf) {
73 Ok(n) => Poll::Ready(n),
74 Err(TryWriteError::Full) => Poll::Pending,
75 }
76 }
77}
78
79impl<'p, M, const N: usize> Unpin for WriteFuture<'p, M, N> where M: RawMutex {}
80
81#[derive(Debug)]
83pub struct Reader<'p, M, const N: usize>
84where
85 M: RawMutex,
86{
87 pipe: &'p Pipe<M, N>,
88}
89
90impl<'p, M, const N: usize> Reader<'p, M, N>
91where
92 M: RawMutex,
93{
94 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
98 self.pipe.read(buf)
99 }
100
101 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
105 self.pipe.try_read(buf)
106 }
107
108 pub fn fill_buf(&mut self) -> FillBufFuture<'_, M, N> {
114 FillBufFuture { pipe: Some(self.pipe) }
115 }
116
117 pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
123 unsafe { self.pipe.try_fill_buf_with_context(None) }
124 }
125
126 pub fn consume(&mut self, amt: usize) {
128 self.pipe.consume(amt)
129 }
130}
131
132#[must_use = "futures do nothing unless you `.await` or poll them"]
134#[derive(Debug)]
135pub struct ReadFuture<'p, M, const N: usize>
136where
137 M: RawMutex,
138{
139 pipe: &'p Pipe<M, N>,
140 buf: &'p mut [u8],
141}
142
143impl<'p, M, const N: usize> Future for ReadFuture<'p, M, N>
144where
145 M: RawMutex,
146{
147 type Output = usize;
148
149 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
150 match self.pipe.try_read_with_context(Some(cx), self.buf) {
151 Ok(n) => Poll::Ready(n),
152 Err(TryReadError::Empty) => Poll::Pending,
153 }
154 }
155}
156
157impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {}
158
159#[must_use = "futures do nothing unless you `.await` or poll them"]
161#[derive(Debug)]
162pub struct FillBufFuture<'p, M, const N: usize>
163where
164 M: RawMutex,
165{
166 pipe: Option<&'p Pipe<M, N>>,
167}
168
169impl<'p, M, const N: usize> Future for FillBufFuture<'p, M, N>
170where
171 M: RawMutex,
172{
173 type Output = &'p [u8];
174
175 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176 let pipe = self.pipe.take().unwrap();
177 match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
178 Ok(buf) => Poll::Ready(buf),
179 Err(TryReadError::Empty) => {
180 self.pipe = Some(pipe);
181 Poll::Pending
182 }
183 }
184 }
185}
186
187impl<'p, M, const N: usize> Unpin for FillBufFuture<'p, M, N> where M: RawMutex {}
188
189#[derive(PartialEq, Eq, Clone, Copy, Debug)]
191#[cfg_attr(feature = "defmt", derive(defmt::Format))]
192pub enum TryReadError {
193 Empty,
196}
197
198#[derive(PartialEq, Eq, Clone, Copy, Debug)]
200#[cfg_attr(feature = "defmt", derive(defmt::Format))]
201pub enum TryWriteError {
202 Full,
205}
206
207#[derive(Debug)]
208struct PipeState<const N: usize> {
209 buffer: RingBuffer<N>,
210 read_waker: WakerRegistration,
211 write_waker: WakerRegistration,
212}
213
214#[repr(transparent)]
215#[derive(Debug)]
216struct Buffer<const N: usize>(UnsafeCell<[u8; N]>);
217
218impl<const N: usize> Buffer<N> {
219 unsafe fn get<'a>(&self, r: Range<usize>) -> &'a [u8] {
220 let p = self.0.get() as *const u8;
221 core::slice::from_raw_parts(p.add(r.start), r.end - r.start)
222 }
223
224 unsafe fn get_mut<'a>(&self, r: Range<usize>) -> &'a mut [u8] {
225 let p = self.0.get() as *mut u8;
226 core::slice::from_raw_parts_mut(p.add(r.start), r.end - r.start)
227 }
228}
229
230unsafe impl<const N: usize> Send for Buffer<N> {}
231unsafe impl<const N: usize> Sync for Buffer<N> {}
232
233#[derive(Debug)]
241pub struct Pipe<M, const N: usize>
242where
243 M: RawMutex,
244{
245 buf: Buffer<N>,
246 inner: Mutex<M, RefCell<PipeState<N>>>,
247}
248
249impl<M, const N: usize> Pipe<M, N>
250where
251 M: RawMutex,
252{
253 pub const fn new() -> Self {
263 Self {
264 buf: Buffer(UnsafeCell::new([0; N])),
265 inner: Mutex::new(RefCell::new(PipeState {
266 buffer: RingBuffer::new(),
267 read_waker: WakerRegistration::new(),
268 write_waker: WakerRegistration::new(),
269 })),
270 }
271 }
272
273 fn lock<R>(&self, f: impl FnOnce(&mut PipeState<N>) -> R) -> R {
274 self.inner.lock(|rc| f(&mut *rc.borrow_mut()))
275 }
276
277 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
278 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
279 let s = &mut *rc.borrow_mut();
280
281 if s.buffer.is_full() {
282 s.write_waker.wake();
283 }
284
285 let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
286 if available.is_empty() {
287 if let Some(cx) = cx {
288 s.read_waker.register(cx.waker());
289 }
290 return Err(TryReadError::Empty);
291 }
292
293 let n = available.len().min(buf.len());
294 buf[..n].copy_from_slice(&available[..n]);
295 s.buffer.pop(n);
296 Ok(n)
297 })
298 }
299
300 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
303 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
304 let s = &mut *rc.borrow_mut();
305
306 if s.buffer.is_full() {
307 s.write_waker.wake();
308 }
309
310 let available = unsafe { self.buf.get(s.buffer.pop_buf()) };
311 if available.is_empty() {
312 if let Some(cx) = cx {
313 s.read_waker.register(cx.waker());
314 }
315 return Err(TryReadError::Empty);
316 }
317
318 Ok(available)
319 })
320 }
321
322 fn consume(&self, amt: usize) {
323 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
324 let s = &mut *rc.borrow_mut();
325 let available = s.buffer.pop_buf();
326 assert!(amt <= available.len());
327 s.buffer.pop(amt);
328 })
329 }
330
331 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
332 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
333 let s = &mut *rc.borrow_mut();
334
335 if s.buffer.is_empty() {
336 s.read_waker.wake();
337 }
338
339 let available = unsafe { self.buf.get_mut(s.buffer.push_buf()) };
340 if available.is_empty() {
341 if let Some(cx) = cx {
342 s.write_waker.register(cx.waker());
343 }
344 return Err(TryWriteError::Full);
345 }
346
347 let n = available.len().min(buf.len());
348 available[..n].copy_from_slice(&buf[..n]);
349 s.buffer.push(n);
350 Ok(n)
351 })
352 }
353
354 pub fn split(&mut self) -> (Reader<'_, M, N>, Writer<'_, M, N>) {
362 (Reader { pipe: self }, Writer { pipe: self })
363 }
364
365 pub fn write<'a>(&'a self, buf: &'a [u8]) -> WriteFuture<'a, M, N> {
380 WriteFuture { pipe: self, buf }
381 }
382
383 pub async fn write_all(&self, mut buf: &[u8]) {
387 while !buf.is_empty() {
388 let n = self.write(buf).await;
389 buf = &buf[n..];
390 }
391 }
392
393 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
399 self.try_write_with_context(None, buf)
400 }
401
402 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> ReadFuture<'a, M, N> {
417 ReadFuture { pipe: self, buf }
418 }
419
420 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
426 self.try_read_with_context(None, buf)
427 }
428
429 pub fn clear(&self) {
431 self.inner.lock(|rc: &RefCell<PipeState<N>>| {
432 let s = &mut *rc.borrow_mut();
433
434 s.buffer.clear();
435 s.write_waker.wake();
436 })
437 }
438
439 pub fn is_full(&self) -> bool {
441 self.len() == N
442 }
443
444 pub fn is_empty(&self) -> bool {
446 self.len() == 0
447 }
448
449 pub fn capacity(&self) -> usize {
453 N
454 }
455
456 pub fn len(&self) -> usize {
458 self.lock(|c| c.buffer.len())
459 }
460
461 pub fn free_capacity(&self) -> usize {
465 N - self.len()
466 }
467}
468
469impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Pipe<M, N> {
470 type Error = Infallible;
471}
472
473impl<M: RawMutex, const N: usize> embedded_io_async::Read for Pipe<M, N> {
474 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
475 Ok(Pipe::read(self, buf).await)
476 }
477}
478
479impl<M: RawMutex, const N: usize> embedded_io_async::Write for Pipe<M, N> {
480 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
481 Ok(Pipe::write(self, buf).await)
482 }
483
484 async fn flush(&mut self) -> Result<(), Self::Error> {
485 Ok(())
486 }
487}
488
489impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for &Pipe<M, N> {
490 type Error = Infallible;
491}
492
493impl<M: RawMutex, const N: usize> embedded_io_async::Read for &Pipe<M, N> {
494 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
495 Ok(Pipe::read(self, buf).await)
496 }
497}
498
499impl<M: RawMutex, const N: usize> embedded_io_async::Write for &Pipe<M, N> {
500 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
501 Ok(Pipe::write(self, buf).await)
502 }
503
504 async fn flush(&mut self) -> Result<(), Self::Error> {
505 Ok(())
506 }
507}
508
509impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Reader<'_, M, N> {
510 type Error = Infallible;
511}
512
513impl<M: RawMutex, const N: usize> embedded_io_async::Read for Reader<'_, M, N> {
514 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
515 Ok(Reader::read(self, buf).await)
516 }
517}
518
519impl<M: RawMutex, const N: usize> embedded_io_async::BufRead for Reader<'_, M, N> {
520 async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
521 Ok(Reader::fill_buf(self).await)
522 }
523
524 fn consume(&mut self, amt: usize) {
525 Reader::consume(self, amt)
526 }
527}
528
529impl<M: RawMutex, const N: usize> embedded_io_async::ErrorType for Writer<'_, M, N> {
530 type Error = Infallible;
531}
532
533impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N> {
534 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
535 Ok(Writer::write(self, buf).await)
536 }
537
538 async fn flush(&mut self) -> Result<(), Self::Error> {
539 Ok(())
540 }
541}
542
543pub(crate) trait DynamicPipe {
548 fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>;
549 fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>;
550
551 fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>;
552 fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>;
553
554 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>;
555 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>;
556
557 fn consume(&self, amt: usize);
558 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>;
559}
560
561impl<M, const N: usize> DynamicPipe for Pipe<M, N>
562where
563 M: RawMutex,
564{
565 fn consume(&self, amt: usize) {
566 Pipe::consume(self, amt)
567 }
568
569 unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
570 Pipe::try_fill_buf_with_context(self, cx)
571 }
572
573 fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
574 Pipe::write(self, buf).into()
575 }
576
577 fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
578 Pipe::read(self, buf).into()
579 }
580
581 fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
582 Pipe::try_read(self, buf)
583 }
584
585 fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
586 Pipe::try_write(self, buf)
587 }
588
589 fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
590 Pipe::try_write_with_context(self, cx, buf)
591 }
592
593 fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
594 Pipe::try_read_with_context(self, cx, buf)
595 }
596}
597
598pub struct DynamicWriter<'p> {
600 pipe: &'p dyn DynamicPipe,
601}
602
603impl<'p> Clone for DynamicWriter<'p> {
604 fn clone(&self) -> Self {
605 *self
606 }
607}
608
609impl<'p> Copy for DynamicWriter<'p> {}
610
611impl<'p> DynamicWriter<'p> {
612 pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
616 self.pipe.write(buf)
617 }
618
619 pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
623 self.pipe.try_write(buf)
624 }
625}
626
627impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p>
628where
629 M: RawMutex,
630{
631 fn from(value: Writer<'p, M, N>) -> Self {
632 Self { pipe: value.pipe }
633 }
634}
635
636#[must_use = "futures do nothing unless you `.await` or poll them"]
638pub struct DynamicWriteFuture<'p> {
639 pipe: &'p dyn DynamicPipe,
640 buf: &'p [u8],
641}
642
643impl<'p> Future for DynamicWriteFuture<'p> {
644 type Output = usize;
645
646 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
647 match self.pipe.try_write_with_context(Some(cx), self.buf) {
648 Ok(n) => Poll::Ready(n),
649 Err(TryWriteError::Full) => Poll::Pending,
650 }
651 }
652}
653
654impl<'p> Unpin for DynamicWriteFuture<'p> {}
655
656impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p>
657where
658 M: RawMutex,
659{
660 fn from(value: WriteFuture<'p, M, N>) -> Self {
661 Self {
662 pipe: value.pipe,
663 buf: value.buf,
664 }
665 }
666}
667
668pub struct DynamicReader<'p> {
670 pipe: &'p dyn DynamicPipe,
671}
672
673impl<'p> DynamicReader<'p> {
674 pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
678 self.pipe.read(buf)
679 }
680
681 pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
685 self.pipe.try_read(buf)
686 }
687
688 pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> {
694 DynamicFillBufFuture { pipe: Some(self.pipe) }
695 }
696
697 pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
703 unsafe { self.pipe.try_fill_buf_with_context(None) }
704 }
705
706 pub fn consume(&mut self, amt: usize) {
708 self.pipe.consume(amt)
709 }
710}
711
712impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p>
713where
714 M: RawMutex,
715{
716 fn from(value: Reader<'p, M, N>) -> Self {
717 Self { pipe: value.pipe }
718 }
719}
720
721#[must_use = "futures do nothing unless you `.await` or poll them"]
723pub struct DynamicReadFuture<'p> {
724 pipe: &'p dyn DynamicPipe,
725 buf: &'p mut [u8],
726}
727
728impl<'p> Future for DynamicReadFuture<'p> {
729 type Output = usize;
730
731 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
732 match self.pipe.try_read_with_context(Some(cx), self.buf) {
733 Ok(n) => Poll::Ready(n),
734 Err(TryReadError::Empty) => Poll::Pending,
735 }
736 }
737}
738
739impl<'p> Unpin for DynamicReadFuture<'p> {}
740
741impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p>
742where
743 M: RawMutex,
744{
745 fn from(value: ReadFuture<'p, M, N>) -> Self {
746 Self {
747 pipe: value.pipe,
748 buf: value.buf,
749 }
750 }
751}
752
753#[must_use = "futures do nothing unless you `.await` or poll them"]
755pub struct DynamicFillBufFuture<'p> {
756 pipe: Option<&'p dyn DynamicPipe>,
757}
758
759impl<'p> Future for DynamicFillBufFuture<'p> {
760 type Output = &'p [u8];
761
762 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
763 let pipe = self.pipe.take().unwrap();
764 match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
765 Ok(buf) => Poll::Ready(buf),
766 Err(TryReadError::Empty) => {
767 self.pipe = Some(pipe);
768 Poll::Pending
769 }
770 }
771 }
772}
773
774impl<'p> Unpin for DynamicFillBufFuture<'p> {}
775
776impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p>
777where
778 M: RawMutex,
779{
780 fn from(value: FillBufFuture<'p, M, N>) -> Self {
781 Self {
782 pipe: value.pipe.map(|p| p as &dyn DynamicPipe),
783 }
784 }
785}
786
787#[cfg(test)]
788mod tests {
789 use futures_executor::ThreadPool;
790 use futures_util::task::SpawnExt;
791 use static_cell::StaticCell;
792
793 use super::*;
794 use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex};
795
796 #[test]
797 fn writing_once() {
798 let c = Pipe::<NoopRawMutex, 3>::new();
799 assert!(c.try_write(&[1]).is_ok());
800 assert_eq!(c.free_capacity(), 2);
801 }
802
803 #[test]
804 fn writing_when_full() {
805 let c = Pipe::<NoopRawMutex, 3>::new();
806 assert_eq!(c.try_write(&[42]), Ok(1));
807 assert_eq!(c.try_write(&[43]), Ok(1));
808 assert_eq!(c.try_write(&[44]), Ok(1));
809 assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full));
810 assert_eq!(c.free_capacity(), 0);
811 }
812
813 #[test]
814 fn receiving_once_with_one_send() {
815 let c = Pipe::<NoopRawMutex, 3>::new();
816 assert!(c.try_write(&[42]).is_ok());
817 let mut buf = [0; 16];
818 assert_eq!(c.try_read(&mut buf), Ok(1));
819 assert_eq!(buf[0], 42);
820 assert_eq!(c.free_capacity(), 3);
821 }
822
823 #[test]
824 fn receiving_when_empty() {
825 let c = Pipe::<NoopRawMutex, 3>::new();
826 let mut buf = [0; 16];
827 assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty));
828 assert_eq!(c.free_capacity(), 3);
829 }
830
831 #[test]
832 fn simple_send_and_receive() {
833 let c = Pipe::<NoopRawMutex, 3>::new();
834 assert!(c.try_write(&[42]).is_ok());
835 let mut buf = [0; 16];
836 assert_eq!(c.try_read(&mut buf), Ok(1));
837 assert_eq!(buf[0], 42);
838 }
839
840 #[test]
841 fn read_buf() {
842 let mut c = Pipe::<NoopRawMutex, 3>::new();
843 let (mut r, w) = c.split();
844 assert!(w.try_write(&[42, 43]).is_ok());
845 let buf = r.try_fill_buf().unwrap();
846 assert_eq!(buf, &[42, 43]);
847 let buf = r.try_fill_buf().unwrap();
848 assert_eq!(buf, &[42, 43]);
849 r.consume(1);
850 let buf = r.try_fill_buf().unwrap();
851 assert_eq!(buf, &[43]);
852 r.consume(1);
853 assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty));
854 assert_eq!(w.try_write(&[44, 45, 46]), Ok(1));
855 assert_eq!(w.try_write(&[45, 46]), Ok(2));
856 let buf = r.try_fill_buf().unwrap();
857 assert_eq!(buf, &[44]); r.consume(1);
859 let buf = r.try_fill_buf().unwrap();
860 assert_eq!(buf, &[45, 46]);
861 assert!(w.try_write(&[47]).is_ok());
862 let buf = r.try_fill_buf().unwrap();
863 assert_eq!(buf, &[45, 46, 47]);
864 r.consume(3);
865 }
866
867 #[test]
868 fn writer_is_cloneable() {
869 let mut c = Pipe::<NoopRawMutex, 3>::new();
870 let (_r, w) = c.split();
871 let _ = w.clone();
872 }
873
874 #[test]
875 fn dynamic_dispatch_pipe() {
876 let mut c = Pipe::<NoopRawMutex, 3>::new();
877 let (r, w) = c.split();
878 let (mut r, w): (DynamicReader<'_>, DynamicWriter<'_>) = (r.into(), w.into());
879
880 assert!(w.try_write(&[42, 43]).is_ok());
881 let buf = r.try_fill_buf().unwrap();
882 assert_eq!(buf, &[42, 43]);
883 let buf = r.try_fill_buf().unwrap();
884 assert_eq!(buf, &[42, 43]);
885 r.consume(1);
886 let buf = r.try_fill_buf().unwrap();
887 assert_eq!(buf, &[43]);
888 r.consume(1);
889 assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty));
890 assert_eq!(w.try_write(&[44, 45, 46]), Ok(1));
891 assert_eq!(w.try_write(&[45, 46]), Ok(2));
892 let buf = r.try_fill_buf().unwrap();
893 assert_eq!(buf, &[44]); r.consume(1);
895 let buf = r.try_fill_buf().unwrap();
896 assert_eq!(buf, &[45, 46]);
897 assert!(w.try_write(&[47]).is_ok());
898 let buf = r.try_fill_buf().unwrap();
899 assert_eq!(buf, &[45, 46, 47]);
900 r.consume(3);
901 }
902
903 #[futures_test::test]
904 async fn receiver_receives_given_try_write_async() {
905 let executor = ThreadPool::new().unwrap();
906
907 static CHANNEL: StaticCell<Pipe<CriticalSectionRawMutex, 3>> = StaticCell::new();
908 let c = &*CHANNEL.init(Pipe::new());
909 let c2 = c;
910 let f = async move {
911 assert_eq!(c2.try_write(&[42]), Ok(1));
912 };
913 executor.spawn(f).unwrap();
914 let mut buf = [0; 16];
915 assert_eq!(c.read(&mut buf).await, 1);
916 assert_eq!(buf[0], 42);
917 }
918
919 #[futures_test::test]
920 async fn sender_send_completes_if_capacity() {
921 let c = Pipe::<CriticalSectionRawMutex, 1>::new();
922 c.write(&[42]).await;
923 let mut buf = [0; 16];
924 assert_eq!(c.read(&mut buf).await, 1);
925 assert_eq!(buf[0], 42);
926 }
927}