embassy_executor/raw/mod.rs

//! Raw executor.
//!
//! This module exposes "raw" Executor and Task structs for lower-level control.
//!
//! ## WARNING: here be dragons!
//!
//! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
//! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_executor_macros::task) macro, which are fully safe.

#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
mod run_queue;

#[cfg_attr(all(cortex_m, target_has_atomic = "32"), path = "state_atomics_arm.rs")]
#[cfg_attr(
    all(not(cortex_m), any(target_has_atomic = "8", target_has_atomic = "32")),
    path = "state_atomics.rs"
)]
#[cfg_attr(
    not(any(target_has_atomic = "8", target_has_atomic = "32")),
    path = "state_critical_section.rs"
)]
mod state;

#[cfg(feature = "trace")]
pub mod trace;
pub(crate) mod util;
#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
mod waker;
use core::future::Future;
use core::marker::PhantomData;
use core::mem;
use core::pin::Pin;
use core::ptr::NonNull;
#[cfg(not(feature = "arch-avr"))]
use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering;
use core::task::{Context, Poll, Waker};

use embassy_executor_timer_queue::TimerQueueItem;
#[cfg(feature = "arch-avr")]
use portable_atomic::AtomicPtr;

use self::run_queue::{RunQueue, RunQueueItem};
use self::state::State;
use self::util::{SyncUnsafeCell, UninitCell};
pub use self::waker::task_from_waker;
use super::SpawnToken;

#[no_mangle]
extern "Rust" fn __embassy_time_queue_item_from_waker(waker: &Waker) -> &'static mut TimerQueueItem {
    unsafe { task_from_waker(waker).timer_queue_item() }
}

/// Raw task header for use in task pointers.
///
/// A task can be in one of the following states:
///
/// - Not spawned: the task is ready to spawn.
/// - `SPAWNED`: the task is currently spawned and may be running.
/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`.
///    In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without
///    polling the task's future.
///
/// A task's complete life cycle is as follows:
///
/// ```text
/// ┌────────────┐   ┌────────────────────────┐
/// │Not spawned │◄─5┤Not spawned|Run enqueued│
/// │            ├6─►│                        │
/// └─────┬──────┘   └──────▲─────────────────┘
///       1                 │
///       │    ┌────────────┘
///       │    4
/// ┌─────▼────┴─────────┐
/// │Spawned|Run enqueued│
/// │                    │
/// └─────┬▲─────────────┘
///       2│
///       │3
/// ┌─────▼┴─────┐
/// │  Spawned   │
/// │            │
/// └────────────┘
/// ```
///
/// Transitions:
/// - 1: Task is spawned - `AvailableTask::claim -> Executor::spawn`
/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
/// - 3: Task wakes itself, waker wakes task, or task exits - `Waker::wake -> wake_task -> State::run_enqueue`
/// - 4: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
/// - 5: Task is dequeued. The task's future is not polled, because exiting the task replaces its `poll_fn`.
/// - 6: A task is woken while it is not spawned - `wake_task -> State::run_enqueue`
pub(crate) struct TaskHeader {
    pub(crate) state: State,
    pub(crate) run_queue_item: RunQueueItem,
    pub(crate) executor: AtomicPtr<SyncExecutor>,
    poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,

    /// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
    pub(crate) timer_queue_item: TimerQueueItem,
    #[cfg(feature = "trace")]
    pub(crate) name: Option<&'static str>,
    #[cfg(feature = "trace")]
    pub(crate) id: u32,
    #[cfg(feature = "trace")]
    all_tasks_next: AtomicPtr<TaskHeader>,
}

/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct TaskRef {
    ptr: NonNull<TaskHeader>,
}

unsafe impl Send for TaskRef where &'static TaskHeader: Send {}
unsafe impl Sync for TaskRef where &'static TaskHeader: Sync {}

impl TaskRef {
    fn new<F: Future + 'static>(task: &'static TaskStorage<F>) -> Self {
        Self {
            ptr: NonNull::from(task).cast(),
        }
    }

    /// Safety: The pointer must have been obtained with `TaskRef::as_ptr`
    pub(crate) unsafe fn from_ptr(ptr: *const TaskHeader) -> Self {
        Self {
            ptr: NonNull::new_unchecked(ptr as *mut TaskHeader),
        }
    }

    pub(crate) fn header(self) -> &'static TaskHeader {
        unsafe { self.ptr.as_ref() }
    }

    /// Returns a reference to the executor that the task is currently running on.
    pub unsafe fn executor(self) -> Option<&'static Executor> {
        let executor = self.header().executor.load(Ordering::Relaxed);
        executor.as_ref().map(|e| Executor::wrap(e))
    }

    /// Returns a mutable reference to the timer queue item.
    ///
    /// # Safety
    ///
    /// This function must only be called in the context of the integrated timer queue.
    pub unsafe fn timer_queue_item(mut self) -> &'static mut TimerQueueItem {
        unsafe { &mut self.ptr.as_mut().timer_queue_item }
    }

    /// The returned pointer is valid for the entire TaskStorage.
    pub(crate) fn as_ptr(self) -> *const TaskHeader {
        self.ptr.as_ptr()
    }
}

/// Raw storage in which a task can be spawned.
///
/// This struct holds the necessary memory to spawn one task whose future is `F`.
/// At any given time, the `TaskStorage` may be in spawned or not-spawned state. You
/// may spawn it with [`TaskStorage::spawn()`], which will fail if it is already spawned.
///
/// A `TaskStorage` must live forever; it may not be deallocated even after the task has finished
/// running. Hence the relevant methods require `&'static self`. It may be reused, however.
///
/// Internally, the [embassy_executor::task](embassy_executor_macros::task) macro allocates an array of `TaskStorage`s
/// in a `static`. The most common reason to use the raw `TaskStorage` is to have control of where
/// the memory for the task is allocated: on the stack, or on the heap with e.g. `Box::leak`.
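///
/// # Example
///
/// A minimal sketch of manual storage (assuming a hypothetical hand-written, nameable
/// `MyFut: Future`, since the concrete type of an `async fn` future cannot be named in
/// a `static`):
///
/// ```rust,ignore
/// use embassy_executor::raw::TaskStorage;
///
/// // `MyFut` is a placeholder; `TaskStorage::new()` is const, so a `static` works.
/// static STORAGE: TaskStorage<MyFut> = TaskStorage::new();
///
/// // Construct the future in place and get a token to hand to a `Spawner`.
/// let token = STORAGE.spawn(|| MyFut::new());
/// spawner.spawn(token).unwrap();
/// ```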

// repr(C) is needed to guarantee that the TaskHeader is located at offset 0.
// This makes it safe to cast between TaskHeader and TaskStorage pointers.
#[repr(C)]
pub struct TaskStorage<F: Future + 'static> {
    raw: TaskHeader,
    future: UninitCell<F>, // Valid if STATE_SPAWNED
}

unsafe fn poll_exited(_p: TaskRef) {
    // Nothing to do, the task is already !SPAWNED and dequeued.
}

impl<F: Future + 'static> TaskStorage<F> {
    const NEW: Self = Self::new();

    /// Create a new TaskStorage, in not-spawned state.
    pub const fn new() -> Self {
        Self {
            raw: TaskHeader {
                state: State::new(),
                run_queue_item: RunQueueItem::new(),
                executor: AtomicPtr::new(core::ptr::null_mut()),
                // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
                poll_fn: SyncUnsafeCell::new(None),

                timer_queue_item: TimerQueueItem::new(),
                #[cfg(feature = "trace")]
                name: None,
                #[cfg(feature = "trace")]
                id: 0,
                #[cfg(feature = "trace")]
                all_tasks_next: AtomicPtr::new(core::ptr::null_mut()),
            },
            future: UninitCell::uninit(),
        }
    }

    /// Try to spawn the task.
    ///
    /// The `future` closure constructs the future. It's only called if spawning is
    /// actually possible. It is a closure instead of a simple `future: F` param to ensure
    /// the future is constructed in-place, avoiding a temporary copy on the stack thanks to
    /// NRVO optimizations.
    ///
    /// This function will fail if the task is already spawned and has not finished running.
    /// In this case, the error is delayed: a "poisoned" SpawnToken is returned, which will
    /// cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
    ///
    /// Once the task has finished running, you may spawn it again. It is allowed to spawn it
    /// on a different executor.
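    ///
    /// A sketch of the delayed-error behavior, reusing the hypothetical `STORAGE` and
    /// `MyFut` from the type-level example:
    ///
    /// ```rust,ignore
    /// let first = STORAGE.spawn(|| MyFut::new());
    /// spawner.spawn(first).unwrap(); // OK
    ///
    /// // The storage is still occupied, so this returns a "poisoned" token...
    /// let second = STORAGE.spawn(|| MyFut::new());
    /// // ...and the error only surfaces here, at `Spawner::spawn` time.
    /// assert!(spawner.spawn(second).is_err());
    /// ```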
    pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
        let task = AvailableTask::claim(self);
        match task {
            Some(task) => task.initialize(future),
            None => SpawnToken::new_failed(),
        }
    }

    unsafe fn poll(p: TaskRef) {
        let this = &*p.as_ptr().cast::<TaskStorage<F>>();

        let future = Pin::new_unchecked(this.future.as_mut());
        let waker = waker::from_task(p);
        let mut cx = Context::from_waker(&waker);
        match future.poll(&mut cx) {
            Poll::Ready(_) => {
                #[cfg(feature = "trace")]
                let exec_ptr: *const SyncExecutor = this.raw.executor.load(Ordering::Relaxed);

                // As the future has finished and this function will not be called
                // again, we can safely drop the future here.
                this.future.drop_in_place();

                // We replace the poll_fn with a despawn function, so that the task is cleaned up
                // when the executor polls it next.
                this.raw.poll_fn.set(Some(poll_exited));

                // Make sure we despawn last, so that other threads can only spawn the task
                // after we're done with it.
                this.raw.state.despawn();

                #[cfg(feature = "trace")]
                trace::task_end(exec_ptr, &p);
            }
            Poll::Pending => {}
        }

        // The compiler emits a virtual call for the waker drop, but we know
        // it's a no-op for our waker.
        mem::forget(waker);
    }

    #[doc(hidden)]
    #[allow(dead_code)]
    fn _assert_sync(self) {
        fn assert_sync<T: Sync>(_: T) {}

        assert_sync(self)
    }
}

/// An uninitialized [`TaskStorage`].
pub struct AvailableTask<F: Future + 'static> {
    task: &'static TaskStorage<F>,
}

impl<F: Future + 'static> AvailableTask<F> {
    /// Try to claim a [`TaskStorage`].
    ///
    /// This function returns `None` if a task has already been spawned and has not finished running.
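    ///
    /// A sketch of manual claim-then-initialize, which is what [`TaskStorage::spawn()`]
    /// does internally (with the hypothetical `STORAGE` and `MyFut` from the
    /// [`TaskStorage`] examples):
    ///
    /// ```rust,ignore
    /// if let Some(task) = AvailableTask::claim(&STORAGE) {
    ///     // The storage is now reserved; build the future in place and get a token.
    ///     let token = task.initialize(|| MyFut::new());
    ///     spawner.spawn(token).unwrap();
    /// }
    /// ```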
    pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
        task.raw.state.spawn().then(|| Self { task })
    }

    fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
        unsafe {
            self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
            self.task.future.write_in_place(future);

            let task = TaskRef::new(self.task);

            SpawnToken::new(task)
        }
    }

    /// Initialize the [`TaskStorage`] to run the given future.
    pub fn initialize(self, future: impl FnOnce() -> F) -> SpawnToken<F> {
        self.initialize_impl::<F>(future)
    }

    /// Initialize the [`TaskStorage`] to run the given future.
    ///
    /// # Safety
    ///
    /// `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
    /// is an `async fn`, NOT a hand-written `Future`.
    #[doc(hidden)]
    pub unsafe fn __initialize_async_fn<FutFn>(self, future: impl FnOnce() -> F) -> SpawnToken<FutFn> {
        // When send-spawning a task, we construct the future in this thread, and effectively
        // "send" it to the executor thread by enqueuing it in its queue. Therefore, in theory,
        // send-spawning should require the future `F` to be `Send`.
        //
        // The problem is this is more restrictive than needed. Once the future is executing,
        // it is never sent to another thread. It is only sent when spawning. It should be
        // enough for the task's arguments to be Send. (and in practice it's super easy to
        // accidentally make your futures !Send, for example by holding an `Rc` or a `&RefCell` across an `.await`.)
        //
        // We can do it by sending the task args and constructing the future in the executor thread
        // on first poll. However, this cannot be done in-place, so it'll waste stack space for a copy
        // of the args.
        //
        // Luckily, an `async fn` future contains just the args when freshly constructed. So, if the
        // args are Send, it's OK to send a !Send future, as long as we do it before first polling it.
        //
        // (Note: this is how generators are implemented today; it's not officially guaranteed yet,
        // but it's possible it'll be guaranteed in the future. See zulip thread:
        // https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.22only.20before.20poll.22.20Send.20futures )
        //
        // The `FutFn` captures all the args, so if it's Send, the task can be send-spawned.
        // This is why we return `SpawnToken<FutFn>` below.
        //
        // This ONLY holds for `async fn` futures. The other `spawn` methods can be called directly
        // by the user, with arbitrary hand-implemented futures. This is why these return `SpawnToken<F>`.
        self.initialize_impl::<FutFn>(future)
    }
}

/// Raw storage that can hold up to N tasks of the same type.
///
/// This is essentially a `[TaskStorage<F>; N]`.
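///
/// For example, a pool with room for up to four concurrent runs of the same future
/// type (a sketch; `MyFut` is the hypothetical nameable future from the
/// [`TaskStorage`] examples):
///
/// ```rust,ignore
/// use embassy_executor::raw::TaskPool;
///
/// static POOL: TaskPool<MyFut, 4> = TaskPool::new();
///
/// // Each spawn claims the first free slot; a fifth concurrent spawn fails.
/// spawner.spawn(POOL.spawn(|| MyFut::new())).unwrap();
/// ```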
pub struct TaskPool<F: Future + 'static, const N: usize> {
    pool: [TaskStorage<F>; N],
}

impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
    /// Create a new TaskPool, with all tasks in non-spawned state.
    pub const fn new() -> Self {
        Self {
            pool: [TaskStorage::NEW; N],
        }
    }

    fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> SpawnToken<T> {
        match self.pool.iter().find_map(AvailableTask::claim) {
            Some(task) => task.initialize_impl::<T>(future),
            None => SpawnToken::new_failed(),
        }
    }

    /// Try to spawn a task in the pool.
    ///
    /// See [`TaskStorage::spawn()`] for details.
    ///
    /// This will loop over the pool and spawn the task in the first storage that
    /// is currently free. If none is free, a "poisoned" SpawnToken is returned,
    /// which will cause [`Spawner::spawn()`](super::Spawner::spawn) to return the error.
    pub fn spawn(&'static self, future: impl FnOnce() -> F) -> SpawnToken<impl Sized> {
        self.spawn_impl::<F>(future)
    }

    /// Like spawn(), but allows the task to be send-spawned if the args are Send even if
    /// the future is !Send.
    ///
    /// Not covered by semver guarantees. DO NOT call this directly. Intended to be used
    /// by the Embassy macros ONLY.
    ///
    /// SAFETY: `future` must be a closure of the form `move || my_async_fn(args)`, where `my_async_fn`
    /// is an `async fn`, NOT a hand-written `Future`.
    #[doc(hidden)]
    pub unsafe fn _spawn_async_fn<FutFn>(&'static self, future: FutFn) -> SpawnToken<impl Sized>
    where
        FutFn: FnOnce() -> F,
    {
        // See the comment in AvailableTask::__initialize_async_fn for explanation.
        self.spawn_impl::<FutFn>(future)
    }
}

#[derive(Clone, Copy)]
pub(crate) struct Pender(*mut ());

unsafe impl Send for Pender {}
unsafe impl Sync for Pender {}

impl Pender {
    pub(crate) fn pend(self) {
        extern "Rust" {
            fn __pender(context: *mut ());
        }
        unsafe { __pender(self.0) };
    }
}

pub(crate) struct SyncExecutor {
    run_queue: RunQueue,
    pender: Pender,
}

impl SyncExecutor {
    pub(crate) fn new(pender: Pender) -> Self {
        Self {
            run_queue: RunQueue::new(),
            pender,
        }
    }

    /// Enqueue a task in the run queue.
    ///
    /// # Safety
    /// - `task` must be a valid pointer to a spawned task.
    /// - `task` must be set up to run in this executor.
    /// - `task` must NOT be already enqueued (in this executor or another one).
    #[inline(always)]
    unsafe fn enqueue(&self, task: TaskRef, l: state::Token) {
        #[cfg(feature = "trace")]
        trace::task_ready_begin(self, &task);

        if self.run_queue.enqueue(task, l) {
            self.pender.pend();
        }
    }

    pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
        task.header()
            .executor
            .store((self as *const Self).cast_mut(), Ordering::Relaxed);

        #[cfg(feature = "trace")]
        trace::task_new(self, &task);

        state::locked(|l| {
            self.enqueue(task, l);
        })
    }

    /// # Safety
    ///
    /// Same as [`Executor::poll`], plus you must only call this from the thread this executor was created on.
    pub(crate) unsafe fn poll(&'static self) {
        #[cfg(feature = "trace")]
        trace::poll_start(self);

        self.run_queue.dequeue_all(|p| {
            let task = p.header();

            #[cfg(feature = "trace")]
            trace::task_exec_begin(self, &p);

            // Run the task
            task.poll_fn.get().unwrap_unchecked()(p);

            #[cfg(feature = "trace")]
            trace::task_exec_end(self, &p);
        });

        #[cfg(feature = "trace")]
        trace::executor_idle(self)
    }
}

/// Raw executor.
///
/// This is the core of the Embassy executor. It is low-level, requiring manual
/// handling of wakeups and task polling. If you can, prefer using one of the
/// [higher level executors](crate::Executor).
///
/// The raw executor leaves it up to you to handle wakeups and scheduling:
///
/// - To get the executor to do work, call `poll()`. This will poll all queued tasks (all tasks
///   that "want to run").
/// - You must supply a pender function, as shown below. The executor will call it to notify you
///   it has work to do. You must arrange for `poll()` to be called as soon as possible.
/// - Enabling `arch-xx` features will define a pender function for you. This means that you
///   are limited to using the executors provided to you by the architecture/platform
///   implementation. If you need a different executor, you must not enable `arch-xx` features.
///
/// The pender can be called from *any* context: any thread, any interrupt priority
/// level, etc. It may be called synchronously from any `Executor` method call as well.
/// You must deal with this correctly.
///
/// In particular, you must NOT call `poll` directly from the pender callback, as this violates
/// the requirement for `poll` to not be called reentrantly.
///
/// The pender function must be exported with the name `__pender` and have the following signature:
///
/// ```rust
/// #[export_name = "__pender"]
/// fn pender(context: *mut ()) {
///    // schedule `poll()` to be called
/// }
/// ```
///
/// The `context` argument is a piece of arbitrary data the executor will pass to the pender.
/// You can set the `context` when calling [`Executor::new()`]. You can use it to, for example,
/// differentiate between executors, or to pass a pointer to a callback that should be called.
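///
/// For example, a minimal driver loop (a sketch only, with a hypothetical
/// `make_static_executor()` helper and a spin-wait standing in for a real
/// platform-specific sleep/wake mechanism):
///
/// ```rust,ignore
/// use core::sync::atomic::{AtomicBool, Ordering};
/// use embassy_executor::raw::Executor;
///
/// static WORK: AtomicBool = AtomicBool::new(true);
///
/// #[export_name = "__pender"]
/// fn pender(_context: *mut ()) {
///     // Never call `poll()` here; just signal the loop below.
///     WORK.store(true, Ordering::Release);
/// }
///
/// // The `Executor` must live forever, e.g. via `StaticCell` or `Box::leak`.
/// let executor: &'static Executor = make_static_executor();
/// loop {
///     if WORK.swap(false, Ordering::Acquire) {
///         unsafe { executor.poll() };
///     }
///     // A real implementation would sleep here (e.g. WFE on Cortex-M).
/// }
/// ```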
#[repr(transparent)]
pub struct Executor {
    pub(crate) inner: SyncExecutor,

    _not_sync: PhantomData<*mut ()>,
}

impl Executor {
    pub(crate) unsafe fn wrap(inner: &SyncExecutor) -> &Self {
        mem::transmute(inner)
    }

    /// Create a new executor.
    ///
    /// When the executor has work to do, it will call the pender function and pass `context` to it.
    ///
    /// See [`Executor`] docs for details on the pender.
    pub fn new(context: *mut ()) -> Self {
        Self {
            inner: SyncExecutor::new(Pender(context)),
            _not_sync: PhantomData,
        }
    }

    /// Spawn a task in this executor.
    ///
    /// # Safety
    ///
    /// `task` must be a valid pointer to an initialized but not-already-spawned task.
    ///
    /// It is OK to use `unsafe` to call this from a thread that's not the executor thread,
    /// but in that case the task's future must be `Send`, because doing so effectively
    /// sends the task to the executor thread.
    pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
        self.inner.spawn(task)
    }

    /// Poll all queued tasks in this executor.
    ///
    /// This loops over all tasks that are queued to be polled (i.e. they're
    /// freshly spawned or they've been woken). Other tasks are not polled.
    ///
    /// You must call `poll` after receiving a call to the pender. It is OK
    /// to call `poll` even when not requested by the pender, but it wastes
    /// energy.
    ///
    /// # Safety
    ///
    /// You must call `initialize` before calling this method.
    ///
    /// You must NOT call `poll` reentrantly on the same executor.
    ///
    /// In particular, note that `poll` may call the pender synchronously. Therefore, you
    /// must NOT directly call `poll()` from the pender callback. Instead, the callback has to
    /// somehow schedule for `poll()` to be called later, at a time you know for sure there's
    /// no `poll()` already running.
    pub unsafe fn poll(&'static self) {
        self.inner.poll()
    }

    /// Get a spawner that spawns tasks in this executor.
    ///
    /// It is OK to call this method multiple times to obtain multiple
    /// `Spawner`s. You may also copy `Spawner`s.
    pub fn spawner(&'static self) -> super::Spawner {
        super::Spawner::new(self)
    }

    /// Get a unique ID for this Executor.
    pub fn id(&'static self) -> usize {
        &self.inner as *const SyncExecutor as usize
    }
}

/// Wake a task by `TaskRef`.
///
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
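///
/// For example (a sketch; `waker` is assumed to be a `Waker` created by this executor,
/// e.g. one captured inside a future's `poll`):
///
/// ```rust,ignore
/// use embassy_executor::raw::{task_from_waker, wake_task};
///
/// let task = task_from_waker(&waker);
/// wake_task(task);
/// ```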
pub fn wake_task(task: TaskRef) {
    let header = task.header();
    header.state.run_enqueue(|l| {
        // We have just marked the task as scheduled, so enqueue it.
        unsafe {
            let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
            executor.enqueue(task, l);
        }
    });
}

/// Wake a task by `TaskRef` without calling pend.
///
/// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
pub fn wake_task_no_pend(task: TaskRef) {
    let header = task.header();
    header.state.run_enqueue(|l| {
        // We have just marked the task as scheduled, so enqueue it.
        unsafe {
            let executor = header.executor.load(Ordering::Relaxed).as_ref().unwrap_unchecked();
            executor.run_queue.enqueue(task, l);
        }
    });
}