1use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel};
10use crate::blocking_mutex::raw::RawMutex;
11
12#[derive(Debug)]
14pub struct Pub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
15 channel: &'a PSB,
17 _phantom: PhantomData<T>,
18}
19
20impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Pub<'a, PSB, T> {
21 pub(super) fn new(channel: &'a PSB) -> Self {
22 Self {
23 channel,
24 _phantom: Default::default(),
25 }
26 }
27
28 pub fn publish_immediate(&self, message: T) {
31 self.channel.publish_immediate(message)
32 }
33
34 pub fn publish<'s>(&'s self, message: T) -> PublisherWaitFuture<'s, 'a, PSB, T> {
36 PublisherWaitFuture {
37 message: Some(message),
38 publisher: self,
39 }
40 }
41
42 pub fn try_publish(&self, message: T) -> Result<(), T> {
44 self.channel.publish_with_context(message, None)
45 }
46
47 pub fn capacity(&self) -> usize {
49 self.channel.capacity()
50 }
51
52 pub fn free_capacity(&self) -> usize {
56 self.channel.free_capacity()
57 }
58
59 pub fn clear(&self) {
61 self.channel.clear();
62 }
63
64 pub fn len(&self) -> usize {
66 self.channel.len()
67 }
68
69 pub fn is_empty(&self) -> bool {
71 self.channel.is_empty()
72 }
73
74 pub fn is_full(&self) -> bool {
76 self.channel.is_full()
77 }
78
79 #[inline]
81 pub const fn sink(&self) -> PubSink<'a, '_, PSB, T> {
82 PubSink { publ: self, fut: None }
83 }
84}
85
86impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Pub<'a, PSB, T> {
87 fn drop(&mut self) {
88 self.channel.unregister_publisher()
89 }
90}
91
92pub struct DynPublisher<'a, T: Clone>(pub(super) Pub<'a, dyn PubSubBehavior<T> + 'a, T>);
94
95impl<'a, T: Clone> Deref for DynPublisher<'a, T> {
96 type Target = Pub<'a, dyn PubSubBehavior<T> + 'a, T>;
97
98 fn deref(&self) -> &Self::Target {
99 &self.0
100 }
101}
102
103impl<'a, T: Clone> DerefMut for DynPublisher<'a, T> {
104 fn deref_mut(&mut self) -> &mut Self::Target {
105 &mut self.0
106 }
107}
108
109#[derive(Debug)]
111pub struct Publisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
112 pub(super) Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
113);
114
115impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
116 for Publisher<'a, M, T, CAP, SUBS, PUBS>
117{
118 type Target = Pub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
119
120 fn deref(&self) -> &Self::Target {
121 &self.0
122 }
123}
124
125impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
126 for Publisher<'a, M, T, CAP, SUBS, PUBS>
127{
128 fn deref_mut(&mut self) -> &mut Self::Target {
129 &mut self.0
130 }
131}
132
133#[derive(Debug)]
136pub struct ImmediatePub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
137 channel: &'a PSB,
139 _phantom: PhantomData<T>,
140}
141
142impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> {
143 pub(super) fn new(channel: &'a PSB) -> Self {
144 Self {
145 channel,
146 _phantom: Default::default(),
147 }
148 }
149 pub fn publish_immediate(&self, message: T) {
152 self.channel.publish_immediate(message)
153 }
154
155 pub fn try_publish(&self, message: T) -> Result<(), T> {
157 self.channel.publish_with_context(message, None)
158 }
159
160 pub fn capacity(&self) -> usize {
162 self.channel.capacity()
163 }
164
165 pub fn free_capacity(&self) -> usize {
169 self.channel.free_capacity()
170 }
171
172 pub fn clear(&self) {
174 self.channel.clear();
175 }
176
177 pub fn len(&self) -> usize {
179 self.channel.len()
180 }
181
182 pub fn is_empty(&self) -> bool {
184 self.channel.is_empty()
185 }
186
187 pub fn is_full(&self) -> bool {
189 self.channel.is_full()
190 }
191}
192
193pub struct DynImmediatePublisher<'a, T: Clone>(pub(super) ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>);
195
196impl<'a, T: Clone> Deref for DynImmediatePublisher<'a, T> {
197 type Target = ImmediatePub<'a, dyn PubSubBehavior<T> + 'a, T>;
198
199 fn deref(&self) -> &Self::Target {
200 &self.0
201 }
202}
203
204impl<'a, T: Clone> DerefMut for DynImmediatePublisher<'a, T> {
205 fn deref_mut(&mut self) -> &mut Self::Target {
206 &mut self.0
207 }
208}
209
210#[derive(Debug)]
212pub struct ImmediatePublisher<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
213 pub(super) ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
214);
215
216impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
217 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
218{
219 type Target = ImmediatePub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
220
221 fn deref(&self) -> &Self::Target {
222 &self.0
223 }
224}
225
226impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
227 for ImmediatePublisher<'a, M, T, CAP, SUBS, PUBS>
228{
229 fn deref_mut(&mut self) -> &mut Self::Target {
230 &mut self.0
231 }
232}
233
234#[must_use = "Sinks do nothing unless polled"]
235#[derive(Debug)]
237pub struct PubSink<'a, 'p, PSB, T>
238where
239 T: Clone,
240 PSB: PubSubBehavior<T> + ?Sized,
241{
242 publ: &'p Pub<'a, PSB, T>,
243 fut: Option<PublisherWaitFuture<'p, 'a, PSB, T>>,
244}
245
246impl<'a, 'p, PSB, T> PubSink<'a, 'p, PSB, T>
247where
248 PSB: PubSubBehavior<T> + ?Sized,
249 T: Clone,
250{
251 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
253 let Some(mut fut) = self.fut.take() else {
254 return Poll::Ready(());
255 };
256
257 if Pin::new(&mut fut).poll(cx).is_pending() {
258 self.fut = Some(fut);
259 return Poll::Pending;
260 }
261
262 Poll::Ready(())
263 }
264}
265
266impl<'a, 'p, PSB, T> futures_sink::Sink<T> for PubSink<'a, 'p, PSB, T>
267where
268 PSB: PubSubBehavior<T> + ?Sized,
269 T: Clone,
270{
271 type Error = core::convert::Infallible;
272
273 #[inline]
274 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
275 self.poll(cx).map(Ok)
276 }
277
278 #[inline]
279 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
280 self.fut = Some(self.publ.publish(item));
281
282 Ok(())
283 }
284
285 #[inline]
286 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
287 self.poll(cx).map(Ok)
288 }
289
290 #[inline]
291 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
292 self.poll(cx).map(Ok)
293 }
294}
295
296#[must_use = "futures do nothing unless you `.await` or poll them"]
298#[derive(Debug)]
299pub struct PublisherWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
300 message: Option<T>,
302 publisher: &'s Pub<'a, PSB, T>,
303}
304
305impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for PublisherWaitFuture<'s, 'a, PSB, T> {
306 type Output = ();
307
308 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
309 let message = self.message.take().unwrap();
310 match self.publisher.channel.publish_with_context(message, Some(cx)) {
311 Ok(()) => Poll::Ready(()),
312 Err(message) => {
313 self.message = Some(message);
314 Poll::Pending
315 }
316 }
317 }
318}
319
320impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for PublisherWaitFuture<'s, 'a, PSB, T> {}