task.rs 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. use futures_util::task::ArcWake;
  2. use super::{Scheduler, SchedulerMsg};
  3. use crate::ScopeId;
  4. use std::cell::RefCell;
  5. use std::future::Future;
  6. use std::pin::Pin;
  7. use std::sync::Arc;
  8. use std::task::Waker;
  /// A task's identifier.
  ///
  /// `TaskId` wraps the `usize` key under which the scheduler stores a spawned task. It is cheap to
  /// copy and can be compared, hashed, and debug-printed.
  ///
  /// An id is only meaningful while its task is still stored by the scheduler. The key is taken
  /// from the task storage's next vacant entry, so once a task has been removed its key may be
  /// handed out again to a later task; do not hold on to ids of finished or removed tasks.
  ///
  /// NOTE(review): this assumes the task storage recycles freed keys (as `slab::Slab` does) —
  /// confirm against the `Scheduler` definition.
  #[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
  #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
  pub struct TaskId(pub usize);
  16. /// the task itself is the waker
  17. pub(crate) struct LocalTask {
  18. pub scope: ScopeId,
  19. pub task: RefCell<Pin<Box<dyn Future<Output = ()> + 'static>>>,
  20. pub waker: Waker,
  21. }
  22. impl Scheduler {
  23. /// Start a new future on the same thread as the rest of the VirtualDom.
  24. ///
  25. /// This future will not contribute to suspense resolving, so you should primarily use this for reacting to changes
  26. /// and long running tasks.
  27. ///
  28. /// Whenever the component that owns this future is dropped, the future will be dropped as well.
  29. ///
  30. /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
  31. /// will only occur when the VirtuaalDom itself has been dropped.
  32. pub fn spawn(&self, scope: ScopeId, task: impl Future<Output = ()> + 'static) -> TaskId {
  33. let mut tasks = self.tasks.borrow_mut();
  34. let entry = tasks.vacant_entry();
  35. let task_id = TaskId(entry.key());
  36. let task = LocalTask {
  37. task: RefCell::new(Box::pin(task)),
  38. scope,
  39. waker: futures_util::task::waker(Arc::new(LocalTaskHandle {
  40. id: task_id,
  41. tx: self.sender.clone(),
  42. })),
  43. };
  44. let mut cx = std::task::Context::from_waker(&task.waker);
  45. if !task.task.borrow_mut().as_mut().poll(&mut cx).is_ready() {
  46. self.sender
  47. .unbounded_send(SchedulerMsg::TaskNotified(task_id))
  48. .expect("Scheduler should exist");
  49. }
  50. entry.insert(task);
  51. task_id
  52. }
  53. /// Drop the future with the given TaskId
  54. ///
  55. /// This does not abort the task, so you'll want to wrap it in an aborthandle if that's important to you
  56. pub fn remove(&self, id: TaskId) -> Option<LocalTask> {
  57. self.tasks.borrow_mut().try_remove(id.0)
  58. }
  59. }
  60. pub struct LocalTaskHandle {
  61. id: TaskId,
  62. tx: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
  63. }
  64. impl ArcWake for LocalTaskHandle {
  65. fn wake_by_ref(arc_self: &Arc<Self>) {
  66. // This can fail if the scheduler has been dropped while the application is shutting down
  67. let _ = arc_self
  68. .tx
  69. .unbounded_send(SchedulerMsg::TaskNotified(arc_self.id));
  70. }
  71. }