tasks.rs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. //! The TaskQueue serves as a centralized async store for all tasks in Dioxus.
  2. //! When a component renders, it may submit an async task to the queue.
  3. //!
  4. //! Then the task complete, it is emitted from the virtual dom in the event loop, which is then fed back into the virtualdom
  5. //! as an event trigger.
  6. //!
  7. //! When a component is scheduled to re-render, the awaing task must be dumped from the queue.
  8. //!
  9. //! This is all pretty unsafe stuff.
  10. //! The major invariant here is that tasks that enter the queue may be invalidated during transitions.
  11. use std::{
  12. cell::Cell,
  13. sync::{Arc, RwLock},
  14. };
  15. use futures_util::{stream::FuturesUnordered, Future, Stream, StreamExt};
  16. use slotmap::{DefaultKey, SlotMap};
  17. use crate::innerlude::{EventTrigger, FiberTask, ScopeId};
  18. pub type TaskSubmitter = Arc<dyn Fn(FiberTask)>;
  19. pub struct TaskQueue {
  20. slots: Arc<RwLock<FuturesUnordered<FiberTask>>>,
  21. // slots: Arc<RwLock<SlotMap<DefaultKey, DTask>>>,
  22. submitter: TaskSubmitter,
  23. }
  24. impl TaskQueue {
  25. pub fn new() -> Self {
  26. let slots = Arc::new(RwLock::new(FuturesUnordered::new()));
  27. let slots2 = slots.clone();
  28. let submitter = Arc::new(move |task| {
  29. let mut slots = slots2.write().unwrap();
  30. log::debug!("Task submitted into global task queue");
  31. slots.push(task);
  32. });
  33. Self { slots, submitter }
  34. }
  35. pub fn new_submitter(&self) -> TaskSubmitter {
  36. self.submitter.clone()
  37. }
  38. pub fn submit_task(&mut self, task: FiberTask) {
  39. self.slots.write().unwrap().push(task);
  40. // TaskHandle { key }
  41. }
  42. pub fn is_empty(&self) -> bool {
  43. self.slots.read().unwrap().is_empty()
  44. }
  45. pub fn len(&self) -> usize {
  46. self.slots.read().unwrap().len()
  47. }
  48. pub async fn next(&mut self) -> Option<EventTrigger> {
  49. let mut slots = self.slots.write().unwrap();
  50. slots.next().await
  51. }
  52. }
  53. // impl Stream for TaskQueue {
  54. // type Item = EventTrigger;
  55. // /// We can never be finished polling
  56. // fn poll_next(
  57. // self: Pin<&mut Self>,
  58. // cx: &mut std::task::Context<'_>,
  59. // ) -> std::task::Poll<Option<Self::Item>> {
  60. // // let yield_every = self.len();
  61. // // let mut polled = 0;
  62. // let mut slots = self.slots.write().unwrap();
  63. // for (_key, slot) in slots.iter_mut() {
  64. // if slot.dead.get() {
  65. // continue;
  66. // }
  67. // let r = slot.fut;
  68. // // let fut = unsafe { &mut *r };
  69. // // use futures::{future::Future, poll, FutureExt};
  70. // let f2 = fut.as_mut();
  71. // let w = cx.waker();
  72. // let mut cx = Context::from_waker(&w);
  73. // // Pin::new_unchecked(pointer)
  74. // // use std::future::Future;
  75. // match f2.poll(&mut cx) {
  76. // Poll::Ready(_) => {
  77. // let trigger = EventTrigger::new_from_task(slot.originator);
  78. // slot.dead.set(true);
  79. // return Poll::Ready(Some(trigger));
  80. // }
  81. // Poll::Pending => continue,
  82. // }
  83. // }
  84. // // we tried polling every active task.
  85. // // give up and relinquish controlto the parent
  86. // // We have polled a large number of futures in a row without yielding.
  87. // // To ensure we do not starve other tasks waiting on the executor,
  88. // // we yield here, but immediately wake ourselves up to continue.
  89. // // cx.waker().wake_by_ref();
  90. // return Poll::Pending;
  91. // }
  92. // }
  93. pub struct TaskHandle {
  94. key: DefaultKey,
  95. }
  96. pub struct DTask {
  97. fut: FiberTask,
  98. originator: ScopeId,
  99. dead: Cell<bool>,
  100. }
  101. impl DTask {
  102. pub fn new(fut: FiberTask, originator: ScopeId) -> Self {
  103. Self {
  104. fut,
  105. originator,
  106. dead: Cell::new(false),
  107. }
  108. }
  109. pub fn debug_new(fut: FiberTask) -> Self {
  110. let originator = ScopeId::default();
  111. Self {
  112. fut,
  113. originator,
  114. dead: Cell::new(false),
  115. }
  116. }
  117. }