Bläddra i källkod

wip: rip out unsafe task engine

Jonathan Kelley 3 år sedan
förälder
incheckning
c7d001cbb4

+ 1 - 1
examples/tasks.rs

@@ -12,7 +12,7 @@ fn main() {
 fn app(cx: Scope<()>) -> Element {
     let mut count = use_state(&cx, || 0);
 
-    cx.push_task(|| async move {
+    cx.push_future(|| async move {
         tokio::time::sleep(Duration::from_millis(100)).await;
         count += 1;
     });

+ 1 - 1
packages/core/examples/borrowed.rs

@@ -7,7 +7,7 @@ fn main() {}
 
 fn app(cx: Scope) -> Element {
     cx.render(rsx!(div {
-        app2 (
+        app2(
             p: "asd"
         )
     }))

+ 2 - 2
packages/core/src/lib.rs

@@ -67,8 +67,8 @@ pub(crate) mod innerlude {
 pub use crate::innerlude::{
     Attribute, Component, DioxusElement, DomEdit, Element, ElementId, EventHandler, EventPriority,
     IntoVNode, LazyNodes, Listener, Mutations, NodeFactory, Properties, SchedulerMsg, Scope,
-    ScopeId, ScopeState, UserEvent, VComponent, VElement, VFragment, VNode, VPlaceholder, VText,
-    VirtualDom,
+    ScopeId, ScopeState, TaskId, UserEvent, VComponent, VElement, VFragment, VNode, VPlaceholder,
+    VText, VirtualDom,
 };
 
 pub mod prelude {

+ 75 - 87
packages/core/src/scopes.rs

@@ -1,5 +1,7 @@
+use crate::{innerlude::*, unsafe_utils::extend_vnode};
+use bumpalo::Bump;
 use futures_channel::mpsc::UnboundedSender;
-use fxhash::{FxHashMap, FxHashSet};
+use fxhash::FxHashMap;
 use slab::Slab;
 use std::{
     any::{Any, TypeId},
@@ -11,9 +13,6 @@ use std::{
     rc::Rc,
 };
 
-use crate::{innerlude::*, unsafe_utils::extend_vnode};
-use bumpalo::{boxed::Box as BumpBox, Bump};
-
 pub(crate) type FcSlot = *const ();
 
 pub(crate) struct Heuristic {
@@ -26,14 +25,13 @@ pub(crate) struct Heuristic {
 //
 // has an internal heuristics engine to pre-allocate arenas to the right size
 pub(crate) struct ScopeArena {
-    pub pending_futures: RefCell<FxHashSet<ScopeId>>,
-    pub scope_counter: Cell<usize>,
-    pub sender: UnboundedSender<SchedulerMsg>,
+    pub scope_gen: Cell<usize>,
     pub bump: Bump,
     pub scopes: RefCell<FxHashMap<ScopeId, *mut ScopeState>>,
     pub heuristics: RefCell<FxHashMap<FcSlot, Heuristic>>,
     pub free_scopes: RefCell<Vec<*mut ScopeState>>,
     pub nodes: RefCell<Slab<*const VNode<'static>>>,
+    pub tasks: Rc<TaskQueue>,
 }
 
 impl ScopeArena {
@@ -61,14 +59,13 @@ impl ScopeArena {
         debug_assert_eq!(root_id, 0);
 
         Self {
-            scope_counter: Cell::new(0),
+            scope_gen: Cell::new(0),
             bump,
-            pending_futures: RefCell::new(FxHashSet::default()),
             scopes: RefCell::new(FxHashMap::default()),
             heuristics: RefCell::new(FxHashMap::default()),
             free_scopes: RefCell::new(Vec::new()),
             nodes: RefCell::new(nodes),
-            sender,
+            tasks: TaskQueue::new(sender),
         }
     }
 
@@ -92,8 +89,8 @@ impl ScopeArena {
         subtree: u32,
     ) -> ScopeId {
         // Increment the ScopeId system. ScopeIDs are never reused
-        let new_scope_id = ScopeId(self.scope_counter.get());
-        self.scope_counter.set(self.scope_counter.get() + 1);
+        let new_scope_id = ScopeId(self.scope_gen.get());
+        self.scope_gen.set(self.scope_gen.get() + 1);
 
         // Get the height of the scope
         let height = parent_scope
@@ -129,9 +126,9 @@ impl ScopeArena {
                     height,
                     container,
                     new_scope_id,
-                    self.sender.clone(),
                     parent_scope,
                     vcomp,
+                    self.tasks.clone(),
                     self.heuristics
                         .borrow()
                         .get(&fc_ptr)
@@ -236,15 +233,11 @@ impl ScopeArena {
             // - We've dropped all references to the wip bump frame with "ensure_drop_safety"
             unsafe { scope.reset_wip_frame() };
 
-            let mut items = scope.items.borrow_mut();
-
-            // just forget about our suspended nodes while we're at it
-            items.tasks.clear();
+            let items = scope.items.borrow();
 
             // guarantee that we haven't screwed up - there should be no latent references anywhere
             debug_assert!(items.listeners.is_empty());
             debug_assert!(items.borrowed_props.is_empty());
-            debug_assert!(items.tasks.is_empty());
         }
 
         // safety: this is definitely not dropped
@@ -263,10 +256,6 @@ impl ScopeArena {
         let props = scope.props.borrow();
         let render = props.as_ref().unwrap();
         if let Some(node) = render.render(scope) {
-            if !scope.items.borrow().tasks.is_empty() {
-                self.pending_futures.borrow_mut().insert(id);
-            }
-
             let frame = scope.wip_frame();
             let node = frame.bump.alloc(node);
             frame.node.set(unsafe { extend_vnode(node) });
@@ -366,26 +355,6 @@ impl<P> Clone for Scope<'_, P> {
     }
 }
 
-impl<'src, P> Scope<'src, P> {
-    pub fn register_task(self, task: impl Future<Output = ()> + 'src) {
-        let fut: &'src mut dyn Future<Output = ()> = self.scope.bump().alloc(task);
-        let boxed_fut: BumpBox<'src, dyn Future<Output = ()> + 'src> =
-            unsafe { BumpBox::from_raw(fut) };
-        let pinned_fut: Pin<BumpBox<'src, _>> = boxed_fut.into();
-
-        // erase the 'src lifetime for self-referential storage
-        // todo: provide a miri test around this
-        // concerned about segfaulting
-        let self_ref_fut = unsafe { std::mem::transmute(pinned_fut) };
-
-        self.sender
-            .unbounded_send(SchedulerMsg::NewTask(self.our_arena_idx))
-            .unwrap();
-
-        self.scope.items.borrow_mut().tasks.push(self_ref_fut);
-    }
-}
-
 impl<'a, P> std::ops::Deref for Scope<'a, P> {
     // rust will auto deref again to the original 'a lifetime at the call site
     type Target = &'a ScopeState;
@@ -402,6 +371,14 @@ impl<'a, P> std::ops::Deref for Scope<'a, P> {
 #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
 pub struct ScopeId(pub usize);
 
+/// A task's unique identifier.
+///
+/// `TaskId` is a `usize` that is unique across the entire VirtualDOM and across time. TaskIDs will never be reused
+/// once a Task has been completed.
+#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
+#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
+pub struct TaskId(pub usize);
+
 /// Every component in Dioxus is represented by a `ScopeState`.
 ///
 /// Scopes contain the state for hooks, the component's props, and other lifecycle information.
@@ -416,12 +393,10 @@ pub struct ScopeState {
     pub(crate) container: ElementId,
     pub(crate) our_arena_idx: ScopeId,
     pub(crate) height: u32,
-    pub(crate) sender: UnboundedSender<SchedulerMsg>,
 
     // todo: subtrees
     pub(crate) is_subtree_root: Cell<bool>,
     pub(crate) subtree: Cell<u32>,
-
     pub(crate) props: RefCell<Option<Box<dyn AnyProps>>>,
 
     // nodes, items
@@ -436,12 +411,12 @@ pub struct ScopeState {
 
     // shared state -> todo: move this out of scopestate
     pub(crate) shared_contexts: RefCell<HashMap<TypeId, Rc<dyn Any>>>,
+    pub(crate) tasks: Rc<TaskQueue>,
 }
 
 pub struct SelfReferentialItems<'a> {
     pub(crate) listeners: Vec<&'a Listener<'a>>,
     pub(crate) borrowed_props: Vec<&'a VComponent<'a>>,
-    pub(crate) tasks: Vec<Pin<BumpBox<'a, dyn Future<Output = ()>>>>,
 }
 
 // Public methods exposed to libraries and components
@@ -450,13 +425,12 @@ impl ScopeState {
         height: u32,
         container: ElementId,
         our_arena_idx: ScopeId,
-        sender: UnboundedSender<SchedulerMsg>,
         parent_scope: Option<*mut ScopeState>,
         vcomp: Box<dyn AnyProps>,
+        tasks: Rc<TaskQueue>,
         (node_capacity, hook_capacity): (usize, usize),
     ) -> Self {
         ScopeState {
-            sender,
             container,
             our_arena_idx,
             parent_scope,
@@ -470,12 +444,12 @@ impl ScopeState {
 
             generation: 0.into(),
 
+            tasks,
             shared_contexts: Default::default(),
 
             items: RefCell::new(SelfReferentialItems {
                 listeners: Default::default(),
                 borrowed_props: Default::default(),
-                tasks: Default::default(),
             }),
 
             hook_arena: Bump::new(),
@@ -590,7 +564,7 @@ impl ScopeState {
     ///
     /// ## Notice: you should prefer using prepare_update and get_scope_id
     pub fn schedule_update(&self) -> Rc<dyn Fn() + 'static> {
-        let (chan, id) = (self.sender.clone(), self.scope_id());
+        let (chan, id) = (self.tasks.sender.clone(), self.scope_id());
         Rc::new(move || {
             let _ = chan.unbounded_send(SchedulerMsg::Immediate(id));
         })
@@ -602,7 +576,7 @@ impl ScopeState {
     ///
     /// This method should be used when you want to schedule an update for a component
     pub fn schedule_update_any(&self) -> Rc<dyn Fn(ScopeId)> {
-        let chan = self.sender.clone();
+        let chan = self.tasks.sender.clone();
         Rc::new(move |id| {
             let _ = chan.unbounded_send(SchedulerMsg::Immediate(id));
         })
@@ -619,7 +593,10 @@ impl ScopeState {
     ///
     /// `ScopeId` is not unique for the lifetime of the VirtualDom - a ScopeId will be reused if a component is unmounted.
     pub fn needs_update_any(&self, id: ScopeId) {
-        let _ = self.sender.unbounded_send(SchedulerMsg::Immediate(id));
+        let _ = self
+            .tasks
+            .sender
+            .unbounded_send(SchedulerMsg::Immediate(id));
     }
 
     /// Get the Root Node of this scope
@@ -682,38 +659,19 @@ impl ScopeState {
     /// Pushes the future onto the poll queue to be polled after the component renders.
     ///
     /// The future is forcibly dropped if the component is not ready by the next render
-    pub fn push_task<'src, F>(&'src self, fut: F) -> usize
-    where
-        F: Future<Output = ()>,
-        F::Output: 'src,
-        F: 'src,
-    {
-        // wrap it in a type that will actually drop the contents
-        //
-        // Safety: we just made the pointer above and will promise not to alias it!
-        // The main reason we do this through from_raw is because Bumpalo's box does
-        // not support unsized coercion
-
-        let fut: &'src mut dyn Future<Output = ()> = self.bump().alloc(fut);
-        let boxed_fut: BumpBox<'src, dyn Future<Output = ()> + 'src> =
-            unsafe { BumpBox::from_raw(fut) };
-        let pinned_fut: Pin<BumpBox<'src, _>> = boxed_fut.into();
-
-        // erase the 'src lifetime for self-referential storage
-        // todo: provide a miri test around this
-        // concerned about segfaulting
-        let self_ref_fut = unsafe {
-            std::mem::transmute::<Pin<BumpBox<'src, _>>, Pin<BumpBox<'static, _>>>(pinned_fut)
-        };
-
-        self.sender
+    pub fn push_future(&self, fut: impl Future<Output = ()> + 'static) -> TaskId {
+        // wake up the scheduler if it is sleeping
+        self.tasks
+            .sender
             .unbounded_send(SchedulerMsg::NewTask(self.our_arena_idx))
             .unwrap();
 
-        // Push the future into the tasks
-        let mut items = self.items.borrow_mut();
-        items.tasks.push(self_ref_fut);
-        items.tasks.len() - 1
+        self.tasks.push_fut(fut)
+    }
+
+    // todo: attach some state to the future to know if we should poll it
+    pub fn remove_future(&self, id: TaskId) {
+        self.tasks.remove_fut(id);
     }
 
     /// Take a lazy VNode structure and actually build it with the context of the VDom's efficient VNode allocator.
@@ -834,11 +792,6 @@ impl ScopeState {
         self.generation.set(self.generation.get() + 1);
     }
 
-    /// Get the [`Bump`] of the WIP frame.
-    pub(crate) fn bump(&self) -> &Bump {
-        &self.wip_frame().bump
-    }
-
     // todo: disable bookkeeping on drop (unncessary)
     pub(crate) fn reset(&mut self) {
         // first: book keaping
@@ -855,11 +808,9 @@ impl ScopeState {
         let SelfReferentialItems {
             borrowed_props,
             listeners,
-            tasks,
         } = self.items.get_mut();
         borrowed_props.clear();
         listeners.clear();
-        tasks.clear();
         self.frames[0].reset();
         self.frames[1].reset();
 
@@ -905,6 +856,43 @@ impl BumpFrame {
     }
 }
 
+pub(crate) struct TaskQueue {
+    pub(crate) tasks: RefCell<FxHashMap<TaskId, InnerTask>>,
+    gen: Cell<usize>,
+    sender: UnboundedSender<SchedulerMsg>,
+}
+pub(crate) type InnerTask = Pin<Box<dyn Future<Output = ()>>>;
+impl TaskQueue {
+    fn new(sender: UnboundedSender<SchedulerMsg>) -> Rc<Self> {
+        Rc::new(Self {
+            tasks: RefCell::new(FxHashMap::default()),
+            gen: Cell::new(0),
+            sender,
+        })
+    }
+    fn push_fut(&self, task: impl Future<Output = ()> + 'static) -> TaskId {
+        let pinned = Box::pin(task);
+        let id = self.gen.get();
+        self.gen.set(id + 1);
+        let tid = TaskId(id);
+
+        self.tasks.borrow_mut().insert(tid, pinned);
+        tid
+    }
+    fn remove_fut(&self, id: TaskId) {
+        if let Ok(mut tasks) = self.tasks.try_borrow_mut() {
+            let _ = tasks.remove(&id);
+        } else {
+            // todo: it should be okay to remote a fut while the queue is being polled
+            // However, it's not currently possible to do that.
+            log::debug!("Unable to remove task from task queue. This is probably a bug.");
+        }
+    }
+    pub(crate) fn has_tasks(&self) -> bool {
+        !self.tasks.borrow().is_empty()
+    }
+}
+
 #[test]
 fn sizeof() {
     dbg!(std::mem::size_of::<ScopeState>());

+ 16 - 30
packages/core/src/virtual_dom.rs

@@ -7,7 +7,6 @@ use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use futures_util::{Future, StreamExt};
 use fxhash::FxHashSet;
 use indexmap::IndexSet;
-use smallvec::SmallVec;
 use std::{any::Any, collections::VecDeque, iter::FromIterator, pin::Pin, sync::Arc, task::Poll};
 
 /// A virtual node s ystem that progresses user events and diffs UI trees.
@@ -312,16 +311,16 @@ impl VirtualDom {
             }
 
             if self.pending_messages.is_empty() {
-                if self.scopes.pending_futures.borrow().is_empty() {
-                    self.pending_messages
-                        .push_front(self.channel.1.next().await.unwrap());
-                } else {
+                if self.scopes.tasks.has_tasks() {
                     use futures_util::future::{select, Either};
 
                     match select(PollTasks(&mut self.scopes), self.channel.1.next()).await {
                         Either::Left((_, _)) => {}
                         Either::Right((msg, _)) => self.pending_messages.push_front(msg.unwrap()),
                     }
+                } else {
+                    self.pending_messages
+                        .push_front(self.channel.1.next().await.unwrap());
                 }
             }
 
@@ -345,8 +344,8 @@ impl VirtualDom {
 
     pub fn process_message(&mut self, msg: SchedulerMsg) {
         match msg {
-            SchedulerMsg::NewTask(id) => {
-                self.scopes.pending_futures.borrow_mut().insert(id);
+            SchedulerMsg::NewTask(_id) => {
+                // uh, not sure? I think end up re-polling it anyways
             }
             SchedulerMsg::Event(event) => {
                 if let Some(element) = event.element {
@@ -705,33 +704,20 @@ impl<'a> Future for PollTasks<'a> {
     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
         let mut all_pending = true;
 
-        let mut unfinished_tasks: SmallVec<[_; 10]> = smallvec::smallvec![];
-        let mut scopes_to_clear: SmallVec<[_; 10]> = smallvec::smallvec![];
-
-        // Poll every scope manually
-        for fut in self.0.pending_futures.borrow().iter().copied() {
-            let scope = self.0.get_scope(fut).expect("Scope should never be moved");
+        let mut tasks = self.0.tasks.tasks.borrow_mut();
+        let mut to_remove = vec![];
 
-            let mut items = scope.items.borrow_mut();
-
-            // really this should just be retain_mut but that doesn't exist yet
-            while let Some(mut task) = items.tasks.pop() {
-                if task.as_mut().poll(cx).is_ready() {
-                    all_pending = false
-                } else {
-                    unfinished_tasks.push(task);
-                }
-            }
-
-            if unfinished_tasks.is_empty() {
-                scopes_to_clear.push(fut);
+        // this would be better served by retain
+        for (id, task) in tasks.iter_mut() {
+            if task.as_mut().poll(cx).is_ready() {
+                to_remove.push(*id);
+            } else {
+                all_pending = false;
             }
-
-            items.tasks.extend(unfinished_tasks.drain(..));
         }
 
-        for scope in scopes_to_clear {
-            self.0.pending_futures.borrow_mut().remove(&scope);
+        for id in to_remove {
+            tasks.remove(&id);
         }
 
         // Resolve the future if any singular task is ready

+ 1 - 1
packages/desktop/examples/async.rs

@@ -19,7 +19,7 @@ fn app(cx: Scope<()>) -> Element {
     let mut count = use_state(&cx, || 0);
     log::debug!("count is {:?}", count);
 
-    cx.push_task(|| async move {
+    cx.push_future(|| async move {
         tokio::time::sleep(Duration::from_millis(1000)).await;
         println!("count is now {:?}", count);
         count += 1;

+ 5 - 2
packages/hooks/src/lib.rs

@@ -10,5 +10,8 @@ pub use use_shared_state::*;
 mod usecoroutine;
 pub use usecoroutine::*;
 
-mod usemodel;
-pub use usemodel::*;
+mod usefuture;
+pub use usefuture::*;
+
+// mod usemodel;
+// pub use usemodel::*;

+ 8 - 11
packages/hooks/src/usecoroutine.rs

@@ -1,10 +1,6 @@
 use dioxus_core::ScopeState;
 use std::future::Future;
-use std::{
-    cell::{Cell, RefCell},
-    pin::Pin,
-    rc::Rc,
-};
+use std::{cell::Cell, pin::Pin, rc::Rc};
 /*
 
 
@@ -24,8 +20,7 @@ pub fn use_coroutine<'a, F>(
     create_future: impl FnOnce() -> F,
 ) -> CoroutineHandle<'a>
 where
-    F: Future<Output = ()>,
-    F: 'static,
+    F: Future<Output = ()> + 'static,
 {
     cx.use_hook(
         move |_| State {
@@ -35,11 +30,13 @@ where
         },
         |state| {
             let f = create_future();
-            state.pending_fut.set(Some(Box::pin(f)));
+            let id = cx.push_future(f);
 
-            if let Some(fut) = state.running_fut.as_mut() {
-                cx.push_task(fut);
-            }
+            // state.pending_fut.set(Some(Box::pin(f)));
+
+            // if let Some(fut) = state.running_fut.as_mut() {
+            //     cx.push_future(fut);
+            // }
 
             // if let Some(fut) = state.running_fut.take() {
             // state.running.set(true);

+ 34 - 0
packages/hooks/src/usefuture.rs

@@ -0,0 +1,34 @@
+use dioxus_core::{ScopeState, TaskId};
+use std::{cell::Cell, future::Future};
+
+pub fn use_future<'a, T: 'static, F: Future<Output = T>>(
+    cx: &'a ScopeState,
+    f: impl FnOnce() -> F,
+) -> FutureHandle<'a, T> {
+    cx.use_hook(
+        |_| {
+            //
+            UseFutureInner {
+                needs_regen: true,
+                task: None,
+            }
+        },
+        |_| {
+            //
+            FutureHandle {
+                cx,
+                value: Cell::new(None),
+            }
+        },
+    )
+}
+
+struct UseFutureInner {
+    needs_regen: bool,
+    task: Option<TaskId>,
+}
+
+pub struct FutureHandle<'a, T> {
+    cx: &'a ScopeState,
+    value: Cell<Option<T>>,
+}

+ 1 - 1
packages/web/examples/async.rs

@@ -17,7 +17,7 @@ fn main() {
 static App: Component<()> = |cx| {
     let mut count = use_state(&cx, || 0);
 
-    cx.push_task(|| async move {
+    cx.push_future(|| async move {
         TimeoutFuture::new(100).await;
         count += 1;
     });