1
0

tasks.rs 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. use crate::innerlude::{remove_future, spawn, Runtime};
  2. use crate::ScopeId;
  3. use futures_util::task::ArcWake;
  4. use std::pin::Pin;
  5. use std::sync::Arc;
  6. use std::task::Waker;
  7. use std::{cell::Cell, future::Future};
  8. use std::{cell::RefCell, rc::Rc};
  /// Handle identifying a single task spawned onto the runtime.
  ///
  /// A `Task` wraps the task's key in the runtime's task table. It is a cheap `Copy` handle that can be used to
  /// pause, resume, wake, or stop (cancel) the task it refers to.
  #[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
  #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
  pub struct Task(pub(crate) usize);
  15. impl Task {
  16. /// Start a new future on the same thread as the rest of the VirtualDom.
  17. ///
  18. /// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
  19. /// and long running tasks.
  20. ///
  21. /// Whenever the component that owns this future is dropped, the future will be dropped as well.
  22. ///
  23. /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
  24. /// will only occur when the VirtualDom itself has been dropped.
  25. pub fn new(task: impl Future<Output = ()> + 'static) -> Self {
  26. spawn(task)
  27. }
  28. /// Drop the task immediately.
  29. ///
  30. /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you
  31. pub fn stop(self) {
  32. remove_future(self);
  33. }
  34. /// Pause the task.
  35. pub fn pause(&self) {
  36. Runtime::with(|rt| rt.tasks.borrow()[self.0].active.set(false));
  37. }
  38. /// Check if the task is paused.
  39. pub fn paused(&self) -> bool {
  40. Runtime::with(|rt| !rt.tasks.borrow()[self.0].active.get()).unwrap_or_default()
  41. }
  42. pub fn wake(&self) {
  43. Runtime::with(|rt| _ = rt.sender.unbounded_send(SchedulerMsg::TaskNotified(*self)));
  44. }
  45. pub fn set_active(&self, active: bool) {
  46. Runtime::with(|rt| rt.tasks.borrow()[self.0].active.set(active));
  47. }
  48. /// Resume the task.
  49. pub fn resume(&self) {
  50. Runtime::with(|rt| {
  51. // set the active flag, and then ping the scheduler to ensure the task gets queued
  52. let was_active = rt.tasks.borrow()[self.0].active.replace(true);
  53. if !was_active {
  54. _ = rt.sender.unbounded_send(SchedulerMsg::TaskNotified(*self));
  55. }
  56. });
  57. }
  58. }
  59. impl Runtime {
  60. /// Start a new future on the same thread as the rest of the VirtualDom.
  61. ///
  62. /// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
  63. /// and long running tasks.
  64. ///
  65. /// Whenever the component that owns this future is dropped, the future will be dropped as well.
  66. ///
  67. /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
  68. /// will only occur when the VirtualDom itself has been dropped.
  69. pub fn spawn(&self, scope: ScopeId, task: impl Future<Output = ()> + 'static) -> Task {
  70. // Insert the task, temporarily holding a borrow on the tasks map
  71. let (task, task_id) = {
  72. let mut tasks = self.tasks.borrow_mut();
  73. let entry = tasks.vacant_entry();
  74. let task_id = Task(entry.key());
  75. let task = Rc::new(LocalTask {
  76. scope,
  77. active: Cell::new(true),
  78. parent: self.current_task(),
  79. task: RefCell::new(Box::pin(task)),
  80. waker: futures_util::task::waker(Arc::new(LocalTaskHandle {
  81. id: task_id,
  82. tx: self.sender.clone(),
  83. })),
  84. });
  85. entry.insert(task.clone());
  86. (task, task_id)
  87. };
  88. // Get a borrow on the task, holding no borrows on the tasks map
  89. debug_assert!(self.tasks.try_borrow_mut().is_ok());
  90. debug_assert!(task.task.try_borrow_mut().is_ok());
  91. self.sender
  92. .unbounded_send(SchedulerMsg::TaskNotified(task_id))
  93. .expect("Scheduler should exist");
  94. task_id
  95. }
  96. /// Get the currently running task
  97. pub fn current_task(&self) -> Option<Task> {
  98. self.current_task.get()
  99. }
  100. /// Get the parent task of the given task, if it exists
  101. pub fn parent_task(&self, task: Task) -> Option<Task> {
  102. self.tasks.borrow().get(task.0)?.parent
  103. }
  104. /// Add this task to the queue of tasks that will manually get poked when the scheduler is flushed
  105. pub(crate) fn add_to_flush_table(&self) -> Task {
  106. let value = self.current_task().unwrap();
  107. self.flush_table.borrow_mut().insert(value);
  108. value
  109. }
  110. pub(crate) fn handle_task_wakeup(&self, id: Task) {
  111. debug_assert!(Runtime::current().is_some(), "Must be in a dioxus runtime");
  112. let task = self.tasks.borrow().get(id.0).cloned();
  113. // The task was removed from the scheduler, so we can just ignore it
  114. let Some(task) = task else {
  115. return;
  116. };
  117. // If a task woke up but is paused, we can just ignore it
  118. if !task.active.get() {
  119. return;
  120. }
  121. let mut cx = std::task::Context::from_waker(&task.waker);
  122. // update the scope stack
  123. self.scope_stack.borrow_mut().push(task.scope);
  124. self.rendering.set(false);
  125. self.current_task.set(Some(id));
  126. if task.task.borrow_mut().as_mut().poll(&mut cx).is_ready() {
  127. // Remove it from the scope so we dont try to double drop it when the scope dropes
  128. self.get_state(task.scope)
  129. .unwrap()
  130. .spawned_tasks
  131. .borrow_mut()
  132. .remove(&id);
  133. // Remove it from the scheduler
  134. self.tasks.borrow_mut().try_remove(id.0);
  135. }
  136. // Remove the scope from the stack
  137. self.scope_stack.borrow_mut().pop();
  138. self.rendering.set(true);
  139. self.current_task.set(None);
  140. }
  141. /// Drop the future with the given TaskId
  142. ///
  143. /// This does not abort the task, so you'll want to wrap it in an abort handle if that's important to you
  144. pub(crate) fn remove_task(&self, id: Task) -> Option<Rc<LocalTask>> {
  145. self.tasks.borrow_mut().try_remove(id.0)
  146. }
  147. }
  148. /// the task itself is the waker
  149. pub(crate) struct LocalTask {
  150. scope: ScopeId,
  151. parent: Option<Task>,
  152. task: RefCell<Pin<Box<dyn Future<Output = ()> + 'static>>>,
  153. waker: Waker,
  154. active: Cell<bool>,
  155. }
  156. /// The type of message that can be sent to the scheduler.
  157. ///
  158. /// These messages control how the scheduler will process updates to the UI.
  159. #[derive(Debug)]
  160. pub(crate) enum SchedulerMsg {
  161. /// Immediate updates from Components that mark them as dirty
  162. Immediate(ScopeId),
  163. /// A task has woken and needs to be progressed
  164. TaskNotified(Task),
  165. }
  166. struct LocalTaskHandle {
  167. id: Task,
  168. tx: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
  169. }
  170. impl ArcWake for LocalTaskHandle {
  171. fn wake_by_ref(arc_self: &Arc<Self>) {
  172. _ = arc_self
  173. .tx
  174. .unbounded_send(SchedulerMsg::TaskNotified(arc_self.id));
  175. }
  176. }