embassy_sync/pubsub/
publisher.rs

1//! Implementation of anything directly publisher related
2
3use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel};
10use crate::blocking_mutex::raw::RawMutex;
11
12/// A publisher to a channel
13#[derive(Debug)]
14pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
15    /// The channel we are a publisher for
16    channel: &'a PSB,
17    _phantom: PhantomData<T>,
18}
19
20impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
21    pub(super) fn new(channel: &'a PSB) -> Self {
22        Self {
23            channel,
24            _phantom: Default::default(),
25        }
26    }
27
28    /// Publish a message right now even when the queue is full.
29    /// This may cause a subscriber to miss an older message.
30    pub fn publish_immediate(&self, message: T) {
31        self.channel.publish_immediate(message)
32    }
33
34    /// Publish a message. But if the message queue is full, wait for all subscribers to have read the last message
35    pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> {
36        PublisherWaitFuture {
37            message: Some(message),
38            publisher: self,
39        }
40    }
41
42    /// Publish a message if there is space in the message queue
43    pub fn try_publish(&self, message: T) -> Result<(), T> {
44        self.channel.publish_with_context(message, None)
45    }
46
47    /// Returns the maximum number of elements the ***channel*** can hold.
48    pub fn capacity(&self) -> usize {
49        self.channel.capacity()
50    }
51
52    /// Returns the free capacity of the ***channel***.
53    ///
54    /// This is equivalent to `capacity() - len()`
55    pub fn free_capacity(&self) -> usize {
56        self.channel.free_capacity()
57    }
58
59    /// Clears all elements in the ***channel***.
60    pub fn clear(&self) {
61        self.channel.clear();
62    }
63
64    /// Returns the number of elements currently in the ***channel***.
65    pub fn len(&self) -> usize {
66        self.channel.len()
67    }
68
69    /// Returns whether the ***channel*** is empty.
70    pub fn is_empty(&self) -> bool {
71        self.channel.is_empty()
72    }
73
74    /// Returns whether the ***channel*** is full.
75    pub fn is_full(&self) -> bool {
76        self.channel.is_full()
77    }
78
79    /// Create a [`futures_sink::Sink`] adapter for this publisher.
80    #[inline]
81    pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> {
82        PubSink { publ: self, fut: None }
83    }
84}
85
86impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
87    fn drop(&mut self) {
88        self.channel.unregister_publisher()
89    }
90}
91
92/// A publisher that holds a dynamic reference to the channel
93pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior<T> + 'a, T>);
94
95impl<'a, T: Clone> Deref for DynPublisher<'a, T> {
96    type Target = Pub<'a, dyn PubSubBehavior<T> + 'a, T>;
97
98    fn deref(&self) -> &Self::Target {
99        &self.0
100    }
101}
102
103impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> {
104    fn deref_mut(&mut self) -> &mut Self::Target {
105        &mut self.0
106    }
107}
108
109/// A publisher that holds a generic reference to the channel
110#[derive(Debug)]
111pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
112    pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
113);
114
115impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
116    for Publisher<'a, M, T, CAP, SUBS, PUBS>
117{
118    type Target = Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
119
120    fn deref(&self) -> &Self::Target {
121        &self.0
122    }
123}
124
125impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
126    for Publisher<'a, M, T, CAP, SUBS, PUBS>
127{
128    fn deref_mut(&mut self) -> &mut Self::Target {
129        &mut self.0
130    }
131}
132
133/// A publisher that can only use the `publish_immediate` function, but it doesn't have to be registered with the channel.
134/// (So an infinite amount is possible)
135#[derive(Debug)]
136pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
137    /// The channel we are a publisher for
138    channel: &'a PSB,
139    _phantom: PhantomData<T>,
140}
141
142impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
143    pub(super) fn new(channel: &'a PSB) -> Self {
144        Self {
145            channel,
146            _phantom: Default::default(),
147        }
148    }
149    /// Publish the message right now even when the queue is full.
150    /// This may cause a subscriber to miss an older message.
151    pub fn publish_immediate(&self, message: T) {
152        self.channel.publish_immediate(message)
153    }
154
155    /// Publish a message if there is space in the message queue
156    pub fn try_publish(&self, message: T) -> Result<(), T> {
157        self.channel.publish_with_context(message, None)
158    }
159
160    /// Returns the maximum number of elements the ***channel*** can hold.
161    pub fn capacity(&self) -> usize {
162        self.channel.capacity()
163    }
164
165    /// Returns the free capacity of the ***channel***.
166    ///
167    /// This is equivalent to `capacity() - len()`
168    pub fn free_capacity(&self) -> usize {
169        self.channel.free_capacity()
170    }
171
172    /// Clears all elements in the ***channel***.
173    pub fn clear(&self) {
174        self.channel.clear();
175    }
176
177    /// Returns the number of elements currently in the ***channel***.
178    pub fn len(&self) -> usize {
179        self.channel.len()
180    }
181
182    /// Returns whether the ***channel*** is empty.
183    pub fn is_empty(&self) -> bool {
184        self.channel.is_empty()
185    }
186
187    /// Returns whether the ***channel*** is full.
188    pub fn is_full(&self) -> bool {
189        self.channel.is_full()
190    }
191}
192
193/// An immediate publisher that holds a dynamic reference to the channel
194pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>);
195
196impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> {
197    type Target = ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>;
198
199    fn deref(&self) -> &Self::Target {
200        &self.0
201    }
202}
203
204impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> {
205    fn deref_mut(&mut self) -> &mut Self::Target {
206        &mut self.0
207    }
208}
209
210/// An immediate publisher that holds a generic reference to the channel
211#[derive(Debug)]
212pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
213    pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
214);
215
216impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
217    for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
218{
219    type Target = ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
220
221    fn deref(&self) -> &Self::Target {
222        &self.0
223    }
224}
225
226impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
227    for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
228{
229    fn deref_mut(&mut self) -> &mut Self::Target {
230        &mut self.0
231    }
232}
233
234#[must_use = "Sinks do nothing unless polled"]
235/// [`futures_sink::Sink`] adapter for [`Pub`].
236#[derive(Debug)]
237pub struct PubSink<'a, 'p, PSB, T>
238where
239    T: Clone,
240    PSB: PubSubBehavior<T> + ?Sized,
241{
242    publ: &'p Pub<'a, PSB, T>,
243    fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>,
244}
245
246impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T>
247where
248    PSB: PubSubBehavior<T> + ?Sized,
249    T: Clone,
250{
251    /// Try to make progress on the pending future if we have one.
252    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
253        let Some(mut fut) = self.fut.take() else {
254            return Poll::Ready(());
255        };
256
257        if Pin::new(&mut fut).poll(cx).is_pending() {
258            self.fut = Some(fut);
259            return Poll::Pending;
260        }
261
262        Poll::Ready(())
263    }
264}
265
266impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T>
267where
268    PSB: PubSubBehavior<T> + ?Sized,
269    T: Clone,
270{
271    type Error = core::convert::Infallible;
272
273    #[inline]
274    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
275        self.poll(cx).map(Ok)
276    }
277
278    #[inline]
279    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
280        self.fut = Some(self.publ.publish(item));
281
282        Ok(())
283    }
284
285    #[inline]
286    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
287        self.poll(cx).map(Ok)
288    }
289
290    #[inline]
291    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
292        self.poll(cx).map(Ok)
293    }
294}
295
296/// Future for the publisher wait action
297#[must_use = "futures do nothing unless you `.await` or poll them"]
298#[derive(Debug)]
299pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
300    /// The message we need to publish
301    message: Option<T>,
302    publisher: &'s Pub<'a, PSB, T>,
303}
304
305impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> {
306    type Output = ();
307
308    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
309        let message = self.message.take().unwrap();
310        match self.publisher.channel.publish_with_context(message, Some(cx)) {
311            Ok(()) => Poll::Ready(()),
312            Err(message) => {
313                self.message = Some(message);
314                Poll::Pending
315            }
316        }
317    }
318}
319
320impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {}