Browse Source

wip: more work on scheduler

Jonathan Kelley 3 years ago
parent
commit
1cd5e69712

+ 1 - 1
examples/async.rs

@@ -28,7 +28,7 @@ pub static App: FC<()> = |cx| {
             }
             }
             button {
             button {
                 "Start counting"
                 "Start counting"
-                onclick: move |_| task.start()
+                onclick: move |_| task.resume()
             }
             }
             button {
             button {
                 "Switch counting direcion"
                 "Switch counting direcion"

+ 1 - 1
examples/reference/task.rs

@@ -47,7 +47,7 @@ pub static Example: FC<()> = |cx| {
             }
             }
             button {
             button {
                 "Start counting"
                 "Start counting"
-                onclick: move |_| task.start()
+                onclick: move |_| task.resume()
             }
             }
             button {
             button {
                 "Switch counting direcion"
                 "Switch counting direcion"

+ 1 - 1
packages/core/.vscode/settings.json

@@ -1,3 +1,3 @@
 {
 {
-  "rust-analyzer.inlayHints.enable": false
+  "rust-analyzer.inlayHints.enable": true
 }
 }

+ 10 - 13
packages/core/src/diff.rs

@@ -88,9 +88,8 @@
 //! More info on how to improve this diffing algorithm:
 //! More info on how to improve this diffing algorithm:
 //!  - https://hacks.mozilla.org/2019/03/fast-bump-allocated-virtual-doms-with-rust-and-wasm/
 //!  - https://hacks.mozilla.org/2019/03/fast-bump-allocated-virtual-doms-with-rust-and-wasm/
 
 
-use crate::{innerlude::*, scheduler::Scheduler};
+use crate::innerlude::*;
 use fxhash::{FxHashMap, FxHashSet};
 use fxhash::{FxHashMap, FxHashSet};
-use slab::Slab;
 use DomEdit::*;
 use DomEdit::*;
 
 
 /// Our DiffMachine is an iterative tree differ.
 /// Our DiffMachine is an iterative tree differ.
@@ -154,14 +153,7 @@ impl<'bump> DiffMachine<'bump> {
         }
         }
     }
     }
 
 
-    // pub fn new_headless(shared: &'bump SharedResources) -> Self {
-    //     let edits = Mutations::new();
-    //     let cur_scope = ScopeId(0);
-    //     Self::new(edits, cur_scope, shared)
-    // }
-
-    //
-    pub async fn diff_scope(&'bump mut self, id: ScopeId) {
+    pub fn diff_scope(&'bump mut self, id: ScopeId) {
         if let Some(component) = self.vdom.get_scope_mut(id) {
         if let Some(component) = self.vdom.get_scope_mut(id) {
             let (old, new) = (component.frames.wip_head(), component.frames.fin_head());
             let (old, new) = (component.frames.wip_head(), component.frames.fin_head());
             self.stack.push(DiffInstruction::DiffNode { new, old });
             self.stack.push(DiffInstruction::DiffNode { new, old });
@@ -173,7 +165,9 @@ impl<'bump> DiffMachine<'bump> {
     /// This method implements a depth-first iterative tree traversal.
     /// This method implements a depth-first iterative tree traversal.
     ///
     ///
     /// We do depth-first to maintain high cache locality (nodes were originally generated recursively).
     /// We do depth-first to maintain high cache locality (nodes were originally generated recursively).
-    pub async fn work(&mut self) {
+    ///
+    /// Returns a `bool` indicating that the work completed properly.
+    pub fn work(&mut self, deadline_expired: &mut impl FnMut() -> bool) -> bool {
         while let Some(instruction) = self.stack.pop() {
         while let Some(instruction) = self.stack.pop() {
             // defer to individual functions so the compiler produces better code
             // defer to individual functions so the compiler produces better code
             // large functions tend to be difficult for the compiler to work with
             // large functions tend to be difficult for the compiler to work with
@@ -193,9 +187,12 @@ impl<'bump> DiffMachine<'bump> {
                 DiffInstruction::PrepareMoveNode { node } => self.prepare_move_node(node),
                 DiffInstruction::PrepareMoveNode { node } => self.prepare_move_node(node),
             };
             };
 
 
-            // todo: call this less frequently, there is a bit of overhead involved
-            yield_now().await;
+            if deadline_expired() {
+                return false;
+            }
         }
         }
+
+        true
     }
     }
 
 
     fn prepare_move_node(&mut self, node: &'bump VNode<'bump>) {
     fn prepare_move_node(&mut self, node: &'bump VNode<'bump>) {

+ 105 - 62
packages/core/src/scheduler.rs

@@ -18,7 +18,7 @@ point if being "fast" if you can't also be "responsive."
 
 
 As such, Dioxus can manually decide on what work is most important at any given moment in time. With a properly tuned
 As such, Dioxus can manually decide on what work is most important at any given moment in time. With a properly tuned
 priority system, Dioxus can ensure that user interaction is prioritized and committed as soon as possible (sub 100ms).
 priority system, Dioxus can ensure that user interaction is prioritized and committed as soon as possible (sub 100ms).
-The controller responsible for this priority management is called the "scheduler" and is responsible for juggle many
+The controller responsible for this priority management is called the "scheduler" and is responsible for juggling many
 different types of work simultaneously.
 different types of work simultaneously.
 
 
 # How does it work?
 # How does it work?
@@ -155,7 +155,7 @@ impl Scheduler {
     pub fn new() -> Self {
     pub fn new() -> Self {
         /*
         /*
         Preallocate 2000 elements and 100 scopes to avoid dynamic allocation.
         Preallocate 2000 elements and 100 scopes to avoid dynamic allocation.
-        Perhaps this should be configurable?
+        Perhaps this should be configurable from some external config?
         */
         */
         let components = Rc::new(UnsafeCell::new(Slab::with_capacity(100)));
         let components = Rc::new(UnsafeCell::new(Slab::with_capacity(100)));
         let raw_elements = Rc::new(UnsafeCell::new(Slab::with_capacity(2000)));
         let raw_elements = Rc::new(UnsafeCell::new(Slab::with_capacity(2000)));
@@ -283,21 +283,14 @@ impl Scheduler {
 
 
     // nothing to do, no events on channels, no work
     // nothing to do, no events on channels, no work
     pub fn has_any_work(&self) -> bool {
     pub fn has_any_work(&self) -> bool {
-        self.has_work() || self.has_pending_events() || self.has_pending_garbage()
+        let pending_lanes = self.lanes.iter().find(|f| f.has_work()).is_some();
+        pending_lanes || self.has_pending_events()
     }
     }
 
 
     pub fn has_pending_events(&self) -> bool {
     pub fn has_pending_events(&self) -> bool {
         self.ui_events.len() > 0
         self.ui_events.len() > 0
     }
     }
 
 
-    pub fn has_work(&self) -> bool {
-        self.lanes.iter().find(|f| f.has_work()).is_some()
-    }
-
-    pub fn has_pending_garbage(&self) -> bool {
-        !self.garbage_scopes.is_empty()
-    }
-
     fn shift_priorities(&mut self) {
     fn shift_priorities(&mut self) {
         self.current_priority = match (
         self.current_priority = match (
             self.lanes[0].has_work(),
             self.lanes[0].has_work(),
@@ -312,6 +305,9 @@ impl Scheduler {
         };
         };
     }
     }
 
 
+    /// re-balance the work lanes, ensuring high-priority work properly bumps away low priority work
+    fn balance_lanes(&mut self) {}
+
     fn load_current_lane(&mut self) -> &mut PriorityLane {
     fn load_current_lane(&mut self) -> &mut PriorityLane {
         match self.current_priority {
         match self.current_priority {
             EventPriority::Immediate => todo!(),
             EventPriority::Immediate => todo!(),
@@ -335,6 +331,29 @@ impl Scheduler {
         }
         }
     }
     }
 
 
+    /// Work the scheduler down, not polling any ongoing tasks.
+    ///
+    /// Will use the standard priority-based scheduling, batching, etc, but just won't interact with the async reactor.
+    pub fn work_sync<'a>(&'a mut self) -> Vec<Mutations<'a>> {
+        let mut committed_mutations = Vec::<Mutations<'static>>::new();
+
+        // Internalize any pending work since the last time we ran
+        self.manually_poll_events();
+
+        if !self.has_any_work() {
+            self.pool.clean_up_garbage();
+        }
+
+        while self.has_any_work() {
+            // do work
+
+            // Create work from the pending event queue
+            self.consume_pending_events();
+        }
+
+        committed_mutations
+    }
+
     /// The primary workhorse of the VirtualDOM.
     /// The primary workhorse of the VirtualDOM.
     ///
     ///
     /// Uses some fairly complex logic to schedule what work should be produced.
     /// Uses some fairly complex logic to schedule what work should be produced.
@@ -342,7 +361,7 @@ impl Scheduler {
     /// Returns a list of successful mutations.
     /// Returns a list of successful mutations.
     pub async fn work_with_deadline<'a>(
     pub async fn work_with_deadline<'a>(
         &'a mut self,
         &'a mut self,
-        mut deadline: Pin<Box<impl FusedFuture<Output = ()>>>,
+        mut deadline_reached: Pin<Box<impl FusedFuture<Output = ()>>>,
     ) -> Vec<Mutations<'a>> {
     ) -> Vec<Mutations<'a>> {
         /*
         /*
         Strategy:
         Strategy:
@@ -374,7 +393,7 @@ impl Scheduler {
             // Wait for any new events if we have nothing to do
             // Wait for any new events if we have nothing to do
             if !self.has_any_work() {
             if !self.has_any_work() {
                 self.pool.clean_up_garbage();
                 self.pool.clean_up_garbage();
-                let deadline_expired = self.wait_for_any_trigger(&mut deadline).await;
+                let deadline_expired = self.wait_for_any_trigger(&mut deadline_reached).await;
 
 
                 if deadline_expired {
                 if deadline_expired {
                     return committed_mutations;
                     return committed_mutations;
@@ -384,63 +403,71 @@ impl Scheduler {
             // Create work from the pending event queue
             // Create work from the pending event queue
             self.consume_pending_events();
             self.consume_pending_events();
 
 
-            // Work through the current subtree, and commit the results when it finishes
-            // When the deadline expires, give back the work
+            // shift to the correct lane
             self.shift_priorities();
             self.shift_priorities();
 
 
-            let saved_state = self.load_work();
-
-            // We have to split away some parts of ourself - current lane is borrowed mutably
-            let mut shared = self.pool.clone();
-            let mut machine = unsafe { saved_state.promote(&mut shared) };
+            let mut deadline_reached = || (&mut deadline_reached).now_or_never().is_some();
 
 
-            if machine.stack.is_empty() {
-                let shared = self.pool.clone();
-                self.current_lane().dirty_scopes.sort_by(|a, b| {
-                    let h1 = shared.get_scope(*a).unwrap().height;
-                    let h2 = shared.get_scope(*b).unwrap().height;
-                    h1.cmp(&h2)
-                });
+            let finished_before_deadline =
+                self.work_on_current_lane(&mut deadline_reached, &mut committed_mutations);
 
 
-                if let Some(scope) = self.current_lane().dirty_scopes.pop() {
-                    let component = self.pool.get_scope(scope).unwrap();
-                    let (old, new) = (component.frames.wip_head(), component.frames.fin_head());
-                    machine.stack.push(DiffInstruction::DiffNode { new, old });
-                }
+            if !finished_before_deadline {
+                break;
             }
             }
+        }
 
 
-            let completed = {
-                let fut = machine.work();
-                pin_mut!(fut);
-                use futures_util::future::{select, Either};
-                match select(fut, &mut deadline).await {
-                    Either::Left((_, _other)) => true,
-                    Either::Right((_, _other)) => false,
-                }
-            };
+        committed_mutations
+    }
 
 
-            let machine: DiffMachine<'static> = unsafe { std::mem::transmute(machine) };
-            let mut saved = machine.save();
+    // returns true if the lane is finished
+    pub fn work_on_current_lane(
+        &mut self,
+        deadline_reached: &mut impl FnMut() -> bool,
+        mutations: &mut Vec<Mutations>,
+    ) -> bool {
+        // Work through the current subtree, and commit the results when it finishes
+        // When the deadline expires, give back the work
+        let saved_state = self.load_work();
+
+        // We have to split away some parts of ourself - current lane is borrowed mutably
+        let mut shared = self.pool.clone();
+        let mut machine = unsafe { saved_state.promote(&mut shared) };
+
+        if machine.stack.is_empty() {
+            let shared = self.pool.clone();
+            self.current_lane().dirty_scopes.sort_by(|a, b| {
+                let h1 = shared.get_scope(*a).unwrap().height;
+                let h2 = shared.get_scope(*b).unwrap().height;
+                h1.cmp(&h2)
+            });
+
+            if let Some(scope) = self.current_lane().dirty_scopes.pop() {
+                let component = self.pool.get_scope(scope).unwrap();
+                let (old, new) = (component.frames.wip_head(), component.frames.fin_head());
+                machine.stack.push(DiffInstruction::DiffNode { new, old });
+            }
+        }
 
 
-            if completed {
-                for node in saved.seen_scopes.drain() {
-                    self.current_lane().dirty_scopes.remove(&node);
-                }
+        let deadline_expired = machine.work(deadline_reached);
 
 
-                let mut new_mutations = Mutations::new();
-                std::mem::swap(&mut new_mutations, &mut saved.mutations);
+        let machine: DiffMachine<'static> = unsafe { std::mem::transmute(machine) };
+        let mut saved = machine.save();
 
 
-                committed_mutations.push(new_mutations);
+        if deadline_expired {
+            self.save_work(saved);
+            false
+        } else {
+            for node in saved.seen_scopes.drain() {
+                self.current_lane().dirty_scopes.remove(&node);
             }
             }
 
 
-            self.save_work(saved);
+            let mut new_mutations = Mutations::new();
+            std::mem::swap(&mut new_mutations, &mut saved.mutations);
 
 
-            if !completed {
-                break;
-            }
+            mutations.push(new_mutations);
+            self.save_work(saved);
+            true
         }
         }
-
-        committed_mutations
     }
     }
 
 
     // waits for a trigger, canceling early if the deadline is reached
     // waits for a trigger, canceling early if the deadline is reached
@@ -537,16 +564,32 @@ impl TaskHandle {
     /// Toggles this coroutine off/on.
     /// Toggles this coroutine off/on.
     ///
     ///
     /// This method is not synchronous - your task will not stop immediately.
     /// This method is not synchronous - your task will not stop immediately.
-    pub fn toggle(&self) {}
+    pub fn toggle(&self) {
+        self.sender
+            .unbounded_send(SchedulerMsg::ToggleTask(self.our_id))
+            .unwrap()
+    }
 
 
     /// This method is not synchronous - your task will not stop immediately.
     /// This method is not synchronous - your task will not stop immediately.
-    pub fn start(&self) {}
+    pub fn resume(&self) {
+        self.sender
+            .unbounded_send(SchedulerMsg::ResumeTask(self.our_id))
+            .unwrap()
+    }
 
 
     /// This method is not synchronous - your task will not stop immediately.
     /// This method is not synchronous - your task will not stop immediately.
-    pub fn stop(&self) {}
+    pub fn stop(&self) {
+        self.sender
+            .unbounded_send(SchedulerMsg::ToggleTask(self.our_id))
+            .unwrap()
+    }
 
 
     /// This method is not synchronous - your task will not stop immediately.
     /// This method is not synchronous - your task will not stop immediately.
-    pub fn restart(&self) {}
+    pub fn restart(&self) {
+        self.sender
+            .unbounded_send(SchedulerMsg::ToggleTask(self.our_id))
+            .unwrap()
+    }
 }
 }
 
 
 #[derive(serde::Serialize, serde::Deserialize, Copy, Clone, PartialEq, Eq, Hash, Debug)]
 #[derive(serde::Serialize, serde::Deserialize, Copy, Clone, PartialEq, Eq, Hash, Debug)]
@@ -589,9 +632,9 @@ impl ElementId {
 /// flushed before proceeding. Multiple discrete events is highly unlikely, though.
 /// flushed before proceeding. Multiple discrete events is highly unlikely, though.
 #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, PartialOrd, Ord)]
 #[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, PartialOrd, Ord)]
 pub enum EventPriority {
 pub enum EventPriority {
-    /// Work that must be completed during the EventHandler phase
-    ///
+    /// Work that must be completed during the EventHandler phase.
     ///
     ///
+    /// Currently this is reserved for controlled inputs.
     Immediate = 3,
     Immediate = 3,
 
 
     /// "High Priority" work will not interrupt other high priority work, but will interrupt medium and low priority work.
     /// "High Priority" work will not interrupt other high priority work, but will interrupt medium and low priority work.

+ 17 - 26
packages/core/src/virtual_dom.rs

@@ -209,13 +209,14 @@ impl VirtualDom {
             .get_scope_mut(self.base_scope)
             .get_scope_mut(self.base_scope)
             .expect("The base scope should never be moved");
             .expect("The base scope should never be moved");
 
 
-        // // We run the component. If it succeeds, then we can diff it and add the changes to the dom.
+        // We run the component. If it succeeds, then we can diff it and add the changes to the dom.
         if cur_component.run_scope(&self.scheduler.pool) {
         if cur_component.run_scope(&self.scheduler.pool) {
             diff_machine
             diff_machine
                 .stack
                 .stack
                 .create_node(cur_component.frames.fin_head(), MountType::Append);
                 .create_node(cur_component.frames.fin_head(), MountType::Append);
             diff_machine.stack.scope_stack.push(self.base_scope);
             diff_machine.stack.scope_stack.push(self.base_scope);
-            diff_machine.work().await;
+
+            // let completed = diff_machine.work();
         } else {
         } else {
             // todo: should this be a hard error?
             // todo: should this be a hard error?
             log::warn!(
             log::warn!(
@@ -227,19 +228,7 @@ impl VirtualDom {
         unsafe { std::mem::transmute(diff_machine.mutations) }
         unsafe { std::mem::transmute(diff_machine.mutations) }
     }
     }
 
 
-    pub fn diff_sync<'s>(&'s mut self) -> Mutations<'s> {
-        let mut fut = self.diff_async().boxed_local();
-
-        loop {
-            if let Some(edits) = (&mut fut).now_or_never() {
-                break edits;
-            }
-        }
-    }
-
-    pub async fn diff_async<'s>(&'s mut self) -> Mutations<'s> {
-        let mut diff_machine = DiffMachine::new(Mutations::new(), todo!());
-
+    pub fn diff<'s>(&'s mut self) -> Mutations<'s> {
         let cur_component = self
         let cur_component = self
             .scheduler
             .scheduler
             .pool
             .pool
@@ -247,22 +236,24 @@ impl VirtualDom {
             .expect("The base scope should never be moved");
             .expect("The base scope should never be moved");
 
 
         if cur_component.run_scope(&self.scheduler.pool) {
         if cur_component.run_scope(&self.scheduler.pool) {
-            diff_machine.diff_scope(self.base_scope).await;
+            let mut diff_machine = DiffMachine::new(Mutations::new(), todo!());
+            diff_machine.diff_scope(self.base_scope);
+            diff_machine.mutations
+        } else {
+            Mutations::new()
         }
         }
-
-        diff_machine.mutations
     }
     }
 
 
     /// Runs the virtualdom immediately, not waiting for any suspended nodes to complete.
     /// Runs the virtualdom immediately, not waiting for any suspended nodes to complete.
     ///
     ///
-    /// This method will not wait for any suspended nodes to complete.
-    pub fn run_immediate<'s>(&'s mut self) -> Mutations<'s> {
-        todo!()
-        // use futures_util::FutureExt;
-        // let mut is_ready = || false;
-        // self.run_with_deadline(futures_util::future::ready(()), &mut is_ready)
-        //     .now_or_never()
-        //     .expect("this future will always resolve immediately")
+    /// This method will not wait for any suspended nodes to complete. If there is no pending work, then this method will
+    /// return "None"
+    pub fn run_immediate<'s>(&'s mut self) -> Option<Vec<Mutations<'s>>> {
+        if self.scheduler.has_any_work() {
+            Some(self.scheduler.work_sync())
+        } else {
+            None
+        }
     }
     }
 
 
     /// Run the virtualdom with a deadline.
     /// Run the virtualdom with a deadline.