axtask/
wait_queue.rs

1use alloc::collections::VecDeque;
2use alloc::sync::Arc;
3
4use kernel_guard::{NoOp, NoPreemptIrqSave};
5use kspin::{SpinNoIrq, SpinNoIrqGuard};
6
7use crate::{AxTaskRef, CurrentTask, current_run_queue, select_run_queue};
8
9/// A queue to store sleeping tasks.
10///
11/// # Examples
12///
13/// ```
14/// use axtask::WaitQueue;
15/// use core::sync::atomic::{AtomicU32, Ordering};
16///
17/// static VALUE: AtomicU32 = AtomicU32::new(0);
18/// static WQ: WaitQueue = WaitQueue::new();
19///
20/// axtask::init_scheduler();
21/// // spawn a new task that updates `VALUE` and notifies the main task
22/// axtask::spawn(|| {
23///     assert_eq!(VALUE.load(Ordering::Relaxed), 0);
24///     VALUE.fetch_add(1, Ordering::Relaxed);
25///     WQ.notify_one(true); // wake up the main task
26/// });
27///
28/// WQ.wait(); // block until `notify()` is called
29/// assert_eq!(VALUE.load(Ordering::Relaxed), 1);
30/// ```
31pub struct WaitQueue {
32    queue: SpinNoIrq<VecDeque<AxTaskRef>>,
33}
34
35pub(crate) type WaitQueueGuard<'a> = SpinNoIrqGuard<'a, VecDeque<AxTaskRef>>;
36
37impl WaitQueue {
38    /// Creates an empty wait queue.
39    pub const fn new() -> Self {
40        Self {
41            queue: SpinNoIrq::new(VecDeque::new()),
42        }
43    }
44
45    /// Creates an empty wait queue with space for at least `capacity` elements.
46    pub fn with_capacity(capacity: usize) -> Self {
47        Self {
48            queue: SpinNoIrq::new(VecDeque::with_capacity(capacity)),
49        }
50    }
51
52    /// Cancel events by removing the task from the wait queue.
53    /// If `from_timer_list` is true, try to remove the task from the timer list.
54    fn cancel_events(&self, curr: CurrentTask, _from_timer_list: bool) {
55        // A task can be wake up only one events (timer or `notify()`), remove
56        // the event from another queue.
57        if curr.in_wait_queue() {
58            // wake up by timer (timeout).
59            self.queue.lock().retain(|t| !curr.ptr_eq(t));
60            curr.set_in_wait_queue(false);
61        }
62
63        // Try to cancel a timer event from timer lists.
64        // Just mark task's current timer ticket ID as expired.
65        #[cfg(feature = "irq")]
66        if _from_timer_list {
67            curr.timer_ticket_expired();
68            // Note:
69            //  this task is still not removed from timer list of target CPU,
70            //  which may cause some redundant timer events because it still needs to
71            //  go through the process of expiring an event from the timer list and invoking the callback.
72            //  (it can be considered a lazy-removal strategy, it will be ignored when it is about to take effect.)
73        }
74    }
75
76    /// Blocks the current task and put it into the wait queue, until other task
77    /// notifies it.
78    pub fn wait(&self) {
79        current_run_queue::<NoPreemptIrqSave>().blocked_resched(self.queue.lock());
80        self.cancel_events(crate::current(), false);
81    }
82
83    /// Blocks the current task and put it into the wait queue, until the given
84    /// `condition` becomes true.
85    ///
86    /// Note that even other tasks notify this task, it will not wake up until
87    /// the condition becomes true.
88    pub fn wait_until<F>(&self, condition: F)
89    where
90        F: Fn() -> bool,
91    {
92        let curr = crate::current();
93        loop {
94            let mut rq = current_run_queue::<NoPreemptIrqSave>();
95            let wq = self.queue.lock();
96            if condition() {
97                break;
98            }
99            rq.blocked_resched(wq);
100            // Preemption may occur here.
101        }
102        self.cancel_events(curr, false);
103    }
104
105    /// Blocks the current task and put it into the wait queue, until other tasks
106    /// notify it, or the given duration has elapsed.
107    #[cfg(feature = "irq")]
108    pub fn wait_timeout(&self, dur: core::time::Duration) -> bool {
109        let mut rq = current_run_queue::<NoPreemptIrqSave>();
110        let curr = crate::current();
111        let deadline = axhal::time::wall_time() + dur;
112        debug!(
113            "task wait_timeout: {} deadline={:?}",
114            curr.id_name(),
115            deadline
116        );
117        crate::timers::set_alarm_wakeup(deadline, curr.clone());
118
119        rq.blocked_resched(self.queue.lock());
120
121        let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out
122
123        // Always try to remove the task from the timer list.
124        self.cancel_events(curr, true);
125        timeout
126    }
127
128    /// Blocks the current task and put it into the wait queue, until the given
129    /// `condition` becomes true, or the given duration has elapsed.
130    ///
131    /// Note that even other tasks notify this task, it will not wake up until
132    /// the above conditions are met.
133    #[cfg(feature = "irq")]
134    pub fn wait_timeout_until<F>(&self, dur: core::time::Duration, condition: F) -> bool
135    where
136        F: Fn() -> bool,
137    {
138        let curr = crate::current();
139        let deadline = axhal::time::wall_time() + dur;
140        debug!(
141            "task wait_timeout: {}, deadline={:?}",
142            curr.id_name(),
143            deadline
144        );
145        crate::timers::set_alarm_wakeup(deadline, curr.clone());
146
147        let mut timeout = true;
148        loop {
149            let mut rq = current_run_queue::<NoPreemptIrqSave>();
150            if axhal::time::wall_time() >= deadline {
151                break;
152            }
153            let wq = self.queue.lock();
154            if condition() {
155                timeout = false;
156                break;
157            }
158
159            rq.blocked_resched(wq);
160            // Preemption may occur here.
161        }
162        // Always try to remove the task from the timer list.
163        self.cancel_events(curr, true);
164        timeout
165    }
166
167    /// Wakes up one task in the wait queue, usually the first one.
168    ///
169    /// If `resched` is true, the current task will be preempted when the
170    /// preemption is enabled.
171    pub fn notify_one(&self, resched: bool) -> bool {
172        let mut wq = self.queue.lock();
173        if let Some(task) = wq.pop_front() {
174            unblock_one_task(task, resched);
175            true
176        } else {
177            false
178        }
179    }
180
181    /// Wakes all tasks in the wait queue.
182    ///
183    /// If `resched` is true, the current task will be preempted when the
184    /// preemption is enabled.
185    pub fn notify_all(&self, resched: bool) {
186        while self.notify_one(resched) {
187            // loop until the wait queue is empty
188        }
189    }
190
191    /// Wake up the given task in the wait queue.
192    ///
193    /// If `resched` is true, the current task will be preempted when the
194    /// preemption is enabled.
195    pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool {
196        let mut wq = self.queue.lock();
197        if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) {
198            unblock_one_task(wq.remove(index).unwrap(), resched);
199            true
200        } else {
201            false
202        }
203    }
204}
205
206fn unblock_one_task(task: AxTaskRef, resched: bool) {
207    // Mark task as not in wait queue.
208    task.set_in_wait_queue(false);
209    // Select run queue by the CPU set of the task.
210    // Use `NoOp` kernel guard here because the function is called with holding the
211    // lock of wait queue, where the irq and preemption are disabled.
212    select_run_queue::<NoOp>(&task).unblock_task(task, resched)
213}