tasks.rs 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. use crate::innerlude::{remove_future, spawn, Runtime};
  2. use crate::ScopeId;
  3. use futures_util::task::ArcWake;
  4. use std::sync::Arc;
  5. use std::task::Waker;
  6. use std::{cell::Cell, future::Future};
  7. use std::{cell::RefCell, rc::Rc};
  8. use std::{pin::Pin, task::Poll};
  /// A unique identifier for a task that has been spawned onto the runtime.
  ///
  /// `Task` is a small, copyable handle: it does not own the future it refers to. It can be used
  /// to cancel, pause, resume, wake, or poll that task.
  #[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
  #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
  pub struct Task(pub(crate) usize);
  15. impl Task {
  16. /// Start a new future on the same thread as the rest of the VirtualDom.
  17. ///
  18. /// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
  19. /// and long running tasks.
  20. ///
  21. /// Whenever the component that owns this future is dropped, the future will be dropped as well.
  22. ///
  23. /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
  24. /// will only occur when the VirtualDom itself has been dropped.
  25. pub fn new(task: impl Future<Output = ()> + 'static) -> Self {
  26. spawn(task)
  27. }
  28. /// Drop the task immediately.
  29. ///
  30. /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you
  31. pub fn cancel(self) {
  32. remove_future(self);
  33. }
  34. /// Pause the task.
  35. pub fn pause(&self) {
  36. self.set_active(false);
  37. }
  38. /// Resume the task.
  39. pub fn resume(&self) {
  40. self.set_active(true);
  41. }
  42. /// Check if the task is paused.
  43. pub fn paused(&self) -> bool {
  44. Runtime::with(|rt| {
  45. if let Some(task) = rt.tasks.borrow().get(self.0) {
  46. !task.active.get()
  47. } else {
  48. false
  49. }
  50. })
  51. .unwrap_or_default()
  52. }
  53. /// Wake the task.
  54. pub fn wake(&self) {
  55. Runtime::with(|rt| _ = rt.sender.unbounded_send(SchedulerMsg::TaskNotified(*self)));
  56. }
  57. /// Poll the task immediately.
  58. pub fn poll_now(&self) -> Poll<()> {
  59. Runtime::with(|rt| rt.handle_task_wakeup(*self)).unwrap()
  60. }
  61. /// Set the task as active or paused.
  62. pub fn set_active(&self, active: bool) {
  63. Runtime::with(|rt| {
  64. if let Some(task) = rt.tasks.borrow().get(self.0) {
  65. let was_active = task.active.replace(active);
  66. if !was_active && active {
  67. _ = rt.sender.unbounded_send(SchedulerMsg::TaskNotified(*self));
  68. }
  69. }
  70. });
  71. }
  72. }
  73. impl Runtime {
  74. /// Start a new future on the same thread as the rest of the VirtualDom.
  75. ///
  76. /// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
  77. /// and long running tasks.
  78. ///
  79. /// Whenever the component that owns this future is dropped, the future will be dropped as well.
  80. ///
  81. /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
  82. /// will only occur when the VirtualDom itself has been dropped.
  83. pub fn spawn(&self, scope: ScopeId, task: impl Future<Output = ()> + 'static) -> Task {
  84. // Insert the task, temporarily holding a borrow on the tasks map
  85. let (task, task_id) = {
  86. let mut tasks = self.tasks.borrow_mut();
  87. let entry = tasks.vacant_entry();
  88. let task_id = Task(entry.key());
  89. let task = Rc::new(LocalTask {
  90. scope,
  91. active: Cell::new(true),
  92. parent: self.current_task(),
  93. task: RefCell::new(Box::pin(task)),
  94. waker: futures_util::task::waker(Arc::new(LocalTaskHandle {
  95. id: task_id,
  96. tx: self.sender.clone(),
  97. })),
  98. });
  99. entry.insert(task.clone());
  100. (task, task_id)
  101. };
  102. // Get a borrow on the task, holding no borrows on the tasks map
  103. debug_assert!(self.tasks.try_borrow_mut().is_ok());
  104. debug_assert!(task.task.try_borrow_mut().is_ok());
  105. self.sender
  106. .unbounded_send(SchedulerMsg::TaskNotified(task_id))
  107. .expect("Scheduler should exist");
  108. task_id
  109. }
  110. /// Get the currently running task
  111. pub fn current_task(&self) -> Option<Task> {
  112. self.current_task.get()
  113. }
  114. /// Get the parent task of the given task, if it exists
  115. pub fn parent_task(&self, task: Task) -> Option<Task> {
  116. self.tasks.borrow().get(task.0)?.parent
  117. }
  118. pub(crate) fn task_scope(&self, task: Task) -> Option<ScopeId> {
  119. self.tasks.borrow().get(task.0).map(|t| t.scope)
  120. }
  121. pub(crate) fn handle_task_wakeup(&self, id: Task) -> Poll<()> {
  122. debug_assert!(Runtime::current().is_some(), "Must be in a dioxus runtime");
  123. let task = self.tasks.borrow().get(id.0).cloned();
  124. // The task was removed from the scheduler, so we can just ignore it
  125. let Some(task) = task else {
  126. return Poll::Ready(());
  127. };
  128. // If a task woke up but is paused, we can just ignore it
  129. if !task.active.get() {
  130. return Poll::Pending;
  131. }
  132. let mut cx = std::task::Context::from_waker(&task.waker);
  133. // update the scope stack
  134. self.scope_stack.borrow_mut().push(task.scope);
  135. self.rendering.set(false);
  136. self.current_task.set(Some(id));
  137. let poll_result = task.task.borrow_mut().as_mut().poll(&mut cx);
  138. if poll_result.is_ready() {
  139. // Remove it from the scope so we dont try to double drop it when the scope dropes
  140. self.get_state(task.scope)
  141. .unwrap()
  142. .spawned_tasks
  143. .borrow_mut()
  144. .remove(&id);
  145. self.remove_task(id);
  146. }
  147. // Remove the scope from the stack
  148. self.scope_stack.borrow_mut().pop();
  149. self.rendering.set(true);
  150. self.current_task.set(None);
  151. poll_result
  152. }
  153. /// Drop the future with the given TaskId
  154. ///
  155. /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you
  156. pub(crate) fn remove_task(&self, id: Task) -> Option<Rc<LocalTask>> {
  157. self.suspended_tasks.borrow_mut().remove(&id);
  158. self.tasks.borrow_mut().try_remove(id.0)
  159. }
  160. }
  /// A spawned future together with the bookkeeping the runtime keeps for it.
  ///
  /// The waker is stored next to the future; it is built from a separate `LocalTaskHandle` that
  /// notifies the scheduler when the task is woken.
  pub(crate) struct LocalTask {
      /// The scope the task was spawned with; pushed onto the scope stack while the task is polled.
      scope: ScopeId,
      /// The task that was current when this task was spawned, if any.
      parent: Option<Task>,
      /// The future itself, pinned and boxed so it can be polled in place.
      task: RefCell<Pin<Box<dyn Future<Output = ()> + 'static>>>,
      /// The waker handed to the future on every poll.
      waker: Waker,
      /// `false` while the task is paused; paused tasks are not polled.
      active: Cell<bool>,
  }
  /// The type of message that can be sent to the scheduler.
  ///
  /// These messages control how the scheduler will process updates to the UI.
  #[derive(Debug)]
  pub(crate) enum SchedulerMsg {
      /// An immediate update from a component, identified by its scope, that has been marked dirty
      Immediate(ScopeId),
      /// A task has woken (or was just spawned or resumed) and needs to be progressed
      TaskNotified(Task),
  }
  /// The data behind a task's waker: waking sends a `TaskNotified` message for `id` over `tx`.
  struct LocalTaskHandle {
      /// The task to report as woken.
      id: Task,
      /// Sending half of the channel that carries `SchedulerMsg`s.
      tx: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
  }
  183. impl ArcWake for LocalTaskHandle {
  184. fn wake_by_ref(arc_self: &Arc<Self>) {
  185. _ = arc_self
  186. .tx
  187. .unbounded_send(SchedulerMsg::TaskNotified(arc_self.id));
  188. }
  189. }