embassy_sync/pubsub/
subscriber.rs

1//! Implementation of anything directly subscriber related
2
3use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel, WaitResult};
10use crate::blocking_mutex::raw::RawMutex;
11
12/// A subscriber to a channel
13#[derive(Debug)]
14pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
15    /// The message id of the next message we are yet to receive
16    next_message_id: u64,
17    /// The channel we are a subscriber to
18    channel: &'a PSB,
19    _phantom: PhantomData<T>,
20}
21
22impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
23    pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self {
24        Self {
25            next_message_id,
26            channel,
27            _phantom: Default::default(),
28        }
29    }
30
31    /// Wait for a published message
32    pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
33        SubscriberWaitFuture { subscriber: self }
34    }
35
36    /// Wait for a published message (ignoring lag results)
37    pub async fn next_message_pure(&mut self) -> T {
38        loop {
39            match self.next_message().await {
40                WaitResult::Lagged(_) => continue,
41                WaitResult::Message(message) => break message,
42            }
43        }
44    }
45
46    /// Try to see if there's a published message we haven't received yet.
47    ///
48    /// This function does not peek. The message is received if there is one.
49    pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
50        match self.channel.get_message_with_context(&mut self.next_message_id, None) {
51            Poll::Ready(result) => Some(result),
52            Poll::Pending => None,
53        }
54    }
55
56    /// Try to see if there's a published message we haven't received yet (ignoring lag results).
57    ///
58    /// This function does not peek. The message is received if there is one.
59    pub fn try_next_message_pure(&mut self) -> Option<T> {
60        loop {
61            match self.try_next_message() {
62                Some(WaitResult::Lagged(_)) => continue,
63                Some(WaitResult::Message(message)) => break Some(message),
64                None => break None,
65            }
66        }
67    }
68
69    /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically
70    /// for this subscriber.
71    pub fn available(&self) -> u64 {
72        self.channel.available(self.next_message_id)
73    }
74
75    /// Returns the maximum number of elements the ***channel*** can hold.
76    pub fn capacity(&self) -> usize {
77        self.channel.capacity()
78    }
79
80    /// Returns the free capacity of the ***channel***.
81    ///
82    /// This is equivalent to `capacity() - len()`
83    pub fn free_capacity(&self) -> usize {
84        self.channel.free_capacity()
85    }
86
87    /// Clears all elements in the ***channel***.
88    pub fn clear(&self) {
89        self.channel.clear();
90    }
91
92    /// Returns the number of elements currently in the ***channel***.
93    /// See [Self::available] for how many messages are available for this subscriber.
94    pub fn len(&self) -> usize {
95        self.channel.len()
96    }
97
98    /// Returns whether the ***channel*** is empty.
99    pub fn is_empty(&self) -> bool {
100        self.channel.is_empty()
101    }
102
103    /// Returns whether the ***channel*** is full.
104    pub fn is_full(&self) -> bool {
105        self.channel.is_full()
106    }
107}
108
109impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
110    fn drop(&mut self) {
111        self.channel.unregister_subscriber(self.next_message_id)
112    }
113}
114
115impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
116
117/// Warning: The stream implementation ignores lag results and returns all messages.
118/// This might miss some messages without you knowing it.
119impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_core::Stream for Sub<'a, PSB, T> {
120    type Item = T;
121
122    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123        match self
124            .channel
125            .get_message_with_context(&mut self.next_message_id, Some(cx))
126        {
127            Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
128            Poll::Ready(WaitResult::Lagged(_)) => {
129                cx.waker().wake_by_ref();
130                Poll::Pending
131            }
132            Poll::Pending => Poll::Pending,
133        }
134    }
135}
136
137/// A subscriber that holds a dynamic reference to the channel
138pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
139
140impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
141    type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
142
143    fn deref(&self) -> &Self::Target {
144        &self.0
145    }
146}
147
148impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
149    fn deref_mut(&mut self) -> &mut Self::Target {
150        &mut self.0
151    }
152}
153
154/// A subscriber that holds a generic reference to the channel
155#[derive(Debug)]
156pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
157    pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
158);
159
160impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
161    for Subscriber<'a, M, T, CAP, SUBS, PUBS>
162{
163    type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
164
165    fn deref(&self) -> &Self::Target {
166        &self.0
167    }
168}
169
170impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
171    for Subscriber<'a, M, T, CAP, SUBS, PUBS>
172{
173    fn deref_mut(&mut self) -> &mut Self::Target {
174        &mut self.0
175    }
176}
177
178/// Future for the subscriber wait action
179#[must_use = "futures do nothing unless you `.await` or poll them"]
180#[derive(Debug)]
181pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
182    subscriber: &'s mut Sub<'a, PSB, T>,
183}
184
185impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
186    type Output = WaitResult<T>;
187
188    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189        self.subscriber
190            .channel
191            .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
192    }
193}
194
195impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}