1use core::future::Future;
4use core::marker::PhantomData;
5use core::ops::{Deref, DerefMut};
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use super::{PubSubBehavior, PubSubChannel, WaitResult};
10use crate::blocking_mutex::raw::RawMutex;
11
12#[derive(Debug)]
14pub struct Sub<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
15 next_message_id: u64,
17 channel: &'a PSB,
19 _phantom: PhantomData<T>,
20}
21
22impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Sub<'a, PSB, T> {
23 pub(super) fn new(next_message_id: u64, channel: &'a PSB) -> Self {
24 Self {
25 next_message_id,
26 channel,
27 _phantom: Default::default(),
28 }
29 }
30
31 pub fn next_message<'s>(&'s mut self) -> SubscriberWaitFuture<'s, 'a, PSB, T> {
33 SubscriberWaitFuture { subscriber: self }
34 }
35
36 pub async fn next_message_pure(&mut self) -> T {
38 loop {
39 match self.next_message().await {
40 WaitResult::Lagged(_) => continue,
41 WaitResult::Message(message) => break message,
42 }
43 }
44 }
45
46 pub fn try_next_message(&mut self) -> Option<WaitResult<T>> {
50 match self.channel.get_message_with_context(&mut self.next_message_id, None) {
51 Poll::Ready(result) => Some(result),
52 Poll::Pending => None,
53 }
54 }
55
56 pub fn try_next_message_pure(&mut self) -> Option<T> {
60 loop {
61 match self.try_next_message() {
62 Some(WaitResult::Lagged(_)) => continue,
63 Some(WaitResult::Message(message)) => break Some(message),
64 None => break None,
65 }
66 }
67 }
68
69 pub fn available(&self) -> u64 {
72 self.channel.available(self.next_message_id)
73 }
74
75 pub fn capacity(&self) -> usize {
77 self.channel.capacity()
78 }
79
80 pub fn free_capacity(&self) -> usize {
84 self.channel.free_capacity()
85 }
86
87 pub fn clear(&self) {
89 self.channel.clear();
90 }
91
92 pub fn len(&self) -> usize {
95 self.channel.len()
96 }
97
98 pub fn is_empty(&self) -> bool {
100 self.channel.is_empty()
101 }
102
103 pub fn is_full(&self) -> bool {
105 self.channel.is_full()
106 }
107}
108
109impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {
110 fn drop(&mut self) {
111 self.channel.unregister_subscriber(self.next_message_id)
112 }
113}
114
115impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for Sub<'a, PSB, T> {}
116
117impl<'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> futures_core::Stream for Sub<'a, PSB, T> {
120 type Item = T;
121
122 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123 match self
124 .channel
125 .get_message_with_context(&mut self.next_message_id, Some(cx))
126 {
127 Poll::Ready(WaitResult::Message(message)) => Poll::Ready(Some(message)),
128 Poll::Ready(WaitResult::Lagged(_)) => {
129 cx.waker().wake_by_ref();
130 Poll::Pending
131 }
132 Poll::Pending => Poll::Pending,
133 }
134 }
135}
136
137pub struct DynSubscriber<'a, T: Clone>(pub(super) Sub<'a, dyn PubSubBehavior<T> + 'a, T>);
139
140impl<'a, T: Clone> Deref for DynSubscriber<'a, T> {
141 type Target = Sub<'a, dyn PubSubBehavior<T> + 'a, T>;
142
143 fn deref(&self) -> &Self::Target {
144 &self.0
145 }
146}
147
148impl<'a, T: Clone> DerefMut for DynSubscriber<'a, T> {
149 fn deref_mut(&mut self) -> &mut Self::Target {
150 &mut self.0
151 }
152}
153
154#[derive(Debug)]
156pub struct Subscriber<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize>(
157 pub(super) Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>,
158);
159
160impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> Deref
161 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
162{
163 type Target = Sub<'a, PubSubChannel<M, T, CAP, SUBS, PUBS>, T>;
164
165 fn deref(&self) -> &Self::Target {
166 &self.0
167 }
168}
169
170impl<'a, M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> DerefMut
171 for Subscriber<'a, M, T, CAP, SUBS, PUBS>
172{
173 fn deref_mut(&mut self) -> &mut Self::Target {
174 &mut self.0
175 }
176}
177
178#[must_use = "futures do nothing unless you `.await` or poll them"]
180#[derive(Debug)]
181pub struct SubscriberWaitFuture<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> {
182 subscriber: &'s mut Sub<'a, PSB, T>,
183}
184
185impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Future for SubscriberWaitFuture<'s, 'a, PSB, T> {
186 type Output = WaitResult<T>;
187
188 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189 self.subscriber
190 .channel
191 .get_message_with_context(&mut self.subscriber.next_message_id, Some(cx))
192 }
193}
194
195impl<'s, 'a, PSB: PubSubBehavior<T> + ?Sized, T: Clone> Unpin for SubscriberWaitFuture<'s, 'a, PSB, T> {}