axtask/wait_queue.rs
1use alloc::collections::VecDeque;
2use alloc::sync::Arc;
3
4use kernel_guard::{NoOp, NoPreemptIrqSave};
5use kspin::{SpinNoIrq, SpinNoIrqGuard};
6
7use crate::{AxTaskRef, CurrentTask, current_run_queue, select_run_queue};
8
9/// A queue to store sleeping tasks.
10///
11/// # Examples
12///
13/// ```
14/// use axtask::WaitQueue;
15/// use core::sync::atomic::{AtomicU32, Ordering};
16///
17/// static VALUE: AtomicU32 = AtomicU32::new(0);
18/// static WQ: WaitQueue = WaitQueue::new();
19///
20/// axtask::init_scheduler();
21/// // spawn a new task that updates `VALUE` and notifies the main task
22/// axtask::spawn(|| {
23/// assert_eq!(VALUE.load(Ordering::Relaxed), 0);
24/// VALUE.fetch_add(1, Ordering::Relaxed);
25/// WQ.notify_one(true); // wake up the main task
26/// });
27///
28/// WQ.wait(); // block until `notify()` is called
29/// assert_eq!(VALUE.load(Ordering::Relaxed), 1);
30/// ```
31pub struct WaitQueue {
32 queue: SpinNoIrq<VecDeque<AxTaskRef>>,
33}
34
35pub(crate) type WaitQueueGuard<'a> = SpinNoIrqGuard<'a, VecDeque<AxTaskRef>>;
36
37impl WaitQueue {
38 /// Creates an empty wait queue.
39 pub const fn new() -> Self {
40 Self {
41 queue: SpinNoIrq::new(VecDeque::new()),
42 }
43 }
44
45 /// Creates an empty wait queue with space for at least `capacity` elements.
46 pub fn with_capacity(capacity: usize) -> Self {
47 Self {
48 queue: SpinNoIrq::new(VecDeque::with_capacity(capacity)),
49 }
50 }
51
52 /// Cancel events by removing the task from the wait queue.
53 /// If `from_timer_list` is true, try to remove the task from the timer list.
54 fn cancel_events(&self, curr: CurrentTask, _from_timer_list: bool) {
55 // A task can be wake up only one events (timer or `notify()`), remove
56 // the event from another queue.
57 if curr.in_wait_queue() {
58 // wake up by timer (timeout).
59 self.queue.lock().retain(|t| !curr.ptr_eq(t));
60 curr.set_in_wait_queue(false);
61 }
62
63 // Try to cancel a timer event from timer lists.
64 // Just mark task's current timer ticket ID as expired.
65 #[cfg(feature = "irq")]
66 if _from_timer_list {
67 curr.timer_ticket_expired();
68 // Note:
69 // this task is still not removed from timer list of target CPU,
70 // which may cause some redundant timer events because it still needs to
71 // go through the process of expiring an event from the timer list and invoking the callback.
72 // (it can be considered a lazy-removal strategy, it will be ignored when it is about to take effect.)
73 }
74 }
75
76 /// Blocks the current task and put it into the wait queue, until other task
77 /// notifies it.
78 pub fn wait(&self) {
79 current_run_queue::<NoPreemptIrqSave>().blocked_resched(self.queue.lock());
80 self.cancel_events(crate::current(), false);
81 }
82
83 /// Blocks the current task and put it into the wait queue, until the given
84 /// `condition` becomes true.
85 ///
86 /// Note that even other tasks notify this task, it will not wake up until
87 /// the condition becomes true.
88 pub fn wait_until<F>(&self, condition: F)
89 where
90 F: Fn() -> bool,
91 {
92 let curr = crate::current();
93 loop {
94 let mut rq = current_run_queue::<NoPreemptIrqSave>();
95 let wq = self.queue.lock();
96 if condition() {
97 break;
98 }
99 rq.blocked_resched(wq);
100 // Preemption may occur here.
101 }
102 self.cancel_events(curr, false);
103 }
104
105 /// Blocks the current task and put it into the wait queue, until other tasks
106 /// notify it, or the given duration has elapsed.
107 #[cfg(feature = "irq")]
108 pub fn wait_timeout(&self, dur: core::time::Duration) -> bool {
109 let mut rq = current_run_queue::<NoPreemptIrqSave>();
110 let curr = crate::current();
111 let deadline = axhal::time::wall_time() + dur;
112 debug!(
113 "task wait_timeout: {} deadline={:?}",
114 curr.id_name(),
115 deadline
116 );
117 crate::timers::set_alarm_wakeup(deadline, curr.clone());
118
119 rq.blocked_resched(self.queue.lock());
120
121 let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out
122
123 // Always try to remove the task from the timer list.
124 self.cancel_events(curr, true);
125 timeout
126 }
127
128 /// Blocks the current task and put it into the wait queue, until the given
129 /// `condition` becomes true, or the given duration has elapsed.
130 ///
131 /// Note that even other tasks notify this task, it will not wake up until
132 /// the above conditions are met.
133 #[cfg(feature = "irq")]
134 pub fn wait_timeout_until<F>(&self, dur: core::time::Duration, condition: F) -> bool
135 where
136 F: Fn() -> bool,
137 {
138 let curr = crate::current();
139 let deadline = axhal::time::wall_time() + dur;
140 debug!(
141 "task wait_timeout: {}, deadline={:?}",
142 curr.id_name(),
143 deadline
144 );
145 crate::timers::set_alarm_wakeup(deadline, curr.clone());
146
147 let mut timeout = true;
148 loop {
149 let mut rq = current_run_queue::<NoPreemptIrqSave>();
150 if axhal::time::wall_time() >= deadline {
151 break;
152 }
153 let wq = self.queue.lock();
154 if condition() {
155 timeout = false;
156 break;
157 }
158
159 rq.blocked_resched(wq);
160 // Preemption may occur here.
161 }
162 // Always try to remove the task from the timer list.
163 self.cancel_events(curr, true);
164 timeout
165 }
166
167 /// Wakes up one task in the wait queue, usually the first one.
168 ///
169 /// If `resched` is true, the current task will be preempted when the
170 /// preemption is enabled.
171 pub fn notify_one(&self, resched: bool) -> bool {
172 let mut wq = self.queue.lock();
173 if let Some(task) = wq.pop_front() {
174 unblock_one_task(task, resched);
175 true
176 } else {
177 false
178 }
179 }
180
181 /// Wakes all tasks in the wait queue.
182 ///
183 /// If `resched` is true, the current task will be preempted when the
184 /// preemption is enabled.
185 pub fn notify_all(&self, resched: bool) {
186 while self.notify_one(resched) {
187 // loop until the wait queue is empty
188 }
189 }
190
191 /// Wake up the given task in the wait queue.
192 ///
193 /// If `resched` is true, the current task will be preempted when the
194 /// preemption is enabled.
195 pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool {
196 let mut wq = self.queue.lock();
197 if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) {
198 unblock_one_task(wq.remove(index).unwrap(), resched);
199 true
200 } else {
201 false
202 }
203 }
204}
205
206fn unblock_one_task(task: AxTaskRef, resched: bool) {
207 // Mark task as not in wait queue.
208 task.set_in_wait_queue(false);
209 // Select run queue by the CPU set of the task.
210 // Use `NoOp` kernel guard here because the function is called with holding the
211 // lock of wait queue, where the irq and preemption are disabled.
212 select_run_queue::<NoOp>(&task).unblock_task(task, resched)
213}