소스 검색

chore: use existing scheduler

Jonathan Kelley 2 년 전
부모
커밋
daa5449b6b

+ 3 - 4
packages/core/src/scheduler/mod.rs

@@ -1,5 +1,4 @@
 use crate::ScopeId;
-use futures_util::stream::FuturesUnordered;
 use slab::Slab;
 
 mod suspense;
@@ -24,13 +23,13 @@ pub(crate) enum SchedulerMsg {
     SuspenseNotified(SuspenseId),
 }
 
-use std::{cell::RefCell, rc::Rc, sync::Arc};
+use std::{cell::RefCell, rc::Rc};
 
 pub(crate) struct Scheduler {
     pub sender: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
 
     /// Tasks created with cx.spawn
-    pub tasks: RefCell<FuturesUnordered<LocalTask>>,
+    pub tasks: RefCell<Slab<LocalTask>>,
 
     /// Async components
     pub leaves: RefCell<Slab<SuspenseLeaf>>,
@@ -40,7 +39,7 @@ impl Scheduler {
     pub fn new(sender: futures_channel::mpsc::UnboundedSender<SchedulerMsg>) -> Rc<Self> {
         Rc::new(Scheduler {
             sender,
-            tasks: RefCell::new(FuturesUnordered::new()),
+            tasks: RefCell::new(Slab::new()),
             leaves: RefCell::new(Slab::new()),
         })
     }

+ 2 - 3
packages/core/src/scheduler/suspense.rs

@@ -5,6 +5,7 @@ use crate::ElementId;
 use crate::{innerlude::Mutations, Element, ScopeId};
 use std::future::Future;
 use std::sync::Arc;
+use std::task::Waker;
 use std::{
     cell::{Cell, RefCell},
     collections::HashSet,
@@ -37,12 +38,10 @@ impl SuspenseContext {
 }
 
 pub(crate) struct SuspenseLeaf {
-    pub(crate) id: SuspenseId,
     pub(crate) scope_id: ScopeId,
-    pub(crate) tx: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
     pub(crate) notified: Cell<bool>,
     pub(crate) task: *mut dyn Future<Output = Element<'static>>,
-    pub(crate) waker: Arc<SuspenseHandle>,
+    pub(crate) waker: Waker,
 }
 
 pub struct SuspenseHandle {

+ 6 - 21
packages/core/src/scheduler/task.rs

@@ -1,5 +1,4 @@
 use futures_util::task::ArcWake;
-use futures_util::FutureExt;
 
 use super::{Scheduler, SchedulerMsg};
 use crate::ScopeId;
@@ -21,20 +20,8 @@ pub struct TaskId(pub usize);
 pub(crate) struct LocalTask {
     pub scope: ScopeId,
     pub(super) task: RefCell<Pin<Box<dyn Future<Output = ()> + 'static>>>,
-    id: TaskId,
-    tx: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
     pub waker: Waker,
 }
-impl Future for LocalTask {
-    type Output = ();
-
-    fn poll(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Self::Output> {
-        self.task.borrow_mut().poll_unpin(cx)
-    }
-}
 
 impl Scheduler {
     /// Start a new future on the same thread as the rest of the VirtualDom.
@@ -47,12 +34,12 @@ impl Scheduler {
     /// Spawning a future onto the root scope will cause it to be dropped when the root component is dropped - which
     /// will only occur when the VirtuaalDom itself has been dropped.
     pub fn spawn(&self, scope: ScopeId, task: impl Future<Output = ()> + 'static) -> TaskId {
-        // let entry = tasks.vacant_entry();
-        let task_id = TaskId(0);
+        let mut tasks = self.tasks.borrow_mut();
+
+        let entry = tasks.vacant_entry();
+        let task_id = TaskId(entry.key());
 
         let task = LocalTask {
-            id: task_id,
-            tx: self.sender.clone(),
             task: RefCell::new(Box::pin(task)),
             scope,
             waker: futures_util::task::waker(Arc::new(LocalTaskHandle {
@@ -61,9 +48,7 @@ impl Scheduler {
             })),
         };
 
-        self.tasks.borrow_mut().push(task);
-
-        // println!("Spawning task: {:?}", task_id);
+        entry.insert(task);
 
         self.sender
             .unbounded_send(SchedulerMsg::TaskNotified(task_id))
@@ -76,7 +61,7 @@ impl Scheduler {
     ///
     /// This does nto abort the task, so you'll want to wrap it in an aborthandle if that's important to you
     pub fn remove(&self, id: TaskId) {
-        // self.tasks.borrow_mut().remove(id.0);
+        self.tasks.borrow_mut().remove(id.0);
     }
 }
 

+ 77 - 86
packages/core/src/scheduler/wait.rs

@@ -1,4 +1,3 @@
-use futures_util::task::ArcWake;
 use futures_util::FutureExt;
 use std::{
     rc::Rc,
@@ -21,27 +20,22 @@ impl VirtualDom {
     pub(crate) fn handle_task_wakeup(&mut self, id: TaskId) {
         let mut tasks = self.scheduler.tasks.borrow_mut();
 
-        // let task = match tasks.get(id.0) {
-        //     Some(task) => task,
-        //     None => {
-        //         // The task was removed from the scheduler, so we can just ignore it
-        //         return;
-        //     }
-        // };
-
-        // let mut cx = Context::from_waker(&task.waker);
-
-        // // If the task completes...
-        // println!("polling task");
-        // if task.task.borrow_mut().as_mut().poll(&mut cx).is_ready() {
-        //     // Remove it from the scope so we dont try to double drop it when the scope dropes
-        //     self.scopes[task.scope.0].spawned_tasks.remove(&id);
-
-        //     // Remove it from the scheduler
-        //     tasks.remove(id.0);
-        // } else {
-        //     println!("task not ready yet, but we gave it a handle to the waker");
-        // }
+        let task = match tasks.get(id.0) {
+            Some(task) => task,
+            // The task was removed from the scheduler, so we can just ignore it
+            None => return,
+        };
+
+        let mut cx = Context::from_waker(&task.waker);
+
+        // If the task completes...
+        if task.task.borrow_mut().as_mut().poll(&mut cx).is_ready() {
+            // Remove it from the scope so we dont try to double drop it when the scope dropes
+            self.scopes[task.scope.0].spawned_tasks.remove(&id);
+
+            // Remove it from the scheduler
+            tasks.remove(id.0);
+        }
     }
 
     pub(crate) fn acquire_suspense_boundary(&self, id: ScopeId) -> Rc<SuspenseContext> {
@@ -51,69 +45,66 @@ impl VirtualDom {
     }
 
     pub(crate) fn handle_suspense_wakeup(&mut self, id: SuspenseId) {
-        // let leaf = self
-        //     .scheduler
-        //     .leaves
-        //     .borrow_mut()
-        //     .get(id.0)
-        //     .unwrap()
-        //     .clone();
-
-        // let scope_id = leaf.scope_id;
-
-        // // todo: cache the waker
-        // let waker = leaf.waker();
-        // let mut cx = Context::from_waker(&waker);
-
-        // // Safety: the future is always pinned to the bump arena
-        // let mut pinned = unsafe { std::pin::Pin::new_unchecked(&mut *leaf.task) };
-        // let as_pinned_mut = &mut pinned;
-
-        // // the component finished rendering and gave us nodes
-        // // we should attach them to that component and then render its children
-        // // continue rendering the tree until we hit yet another suspended component
-        // if let Poll::Ready(new_nodes) = as_pinned_mut.poll_unpin(&mut cx) {
-        //     let fiber = self.acquire_suspense_boundary(leaf.scope_id);
-
-        //     let scope = &mut self.scopes[scope_id.0];
-        //     let arena = scope.current_frame();
-
-        //     let ret = arena.bump.alloc(match new_nodes {
-        //         Some(new) => RenderReturn::Ready(new),
-        //         None => RenderReturn::default(),
-        //     });
-
-        //     arena.node.set(ret);
-
-        //     fiber.waiting_on.borrow_mut().remove(&id);
-
-        //     if let RenderReturn::Ready(template) = ret {
-        //         let mutations_ref = &mut fiber.mutations.borrow_mut();
-        //         let mutations = &mut **mutations_ref;
-        //         let template: &VNode = unsafe { std::mem::transmute(template) };
-        //         let mutations: &mut Mutations = unsafe { std::mem::transmute(mutations) };
-
-        //         std::mem::swap(&mut self.mutations, mutations);
-
-        //         let place_holder_id = scope.placeholder.get().unwrap();
-        //         self.scope_stack.push(scope_id);
-        //         let created = self.create(template);
-        //         self.scope_stack.pop();
-        //         mutations.push(Mutation::ReplaceWith {
-        //             id: place_holder_id,
-        //             m: created,
-        //         });
-
-        //         for leaf in self.collected_leaves.drain(..) {
-        //             fiber.waiting_on.borrow_mut().insert(leaf);
-        //         }
-
-        //         std::mem::swap(&mut self.mutations, mutations);
-
-        //         if fiber.waiting_on.borrow().is_empty() {
-        //             self.finished_fibers.push(fiber.id);
-        //         }
-        //     }
-        // }
+        let leaves = self.scheduler.leaves.borrow_mut();
+        let leaf = leaves.get(id.0).unwrap();
+
+        let scope_id = leaf.scope_id;
+
+        // todo: cache the waker
+        let mut cx = Context::from_waker(&leaf.waker);
+
+        // Safety: the future is always pinned to the bump arena
+        let mut pinned = unsafe { std::pin::Pin::new_unchecked(&mut *leaf.task) };
+        let as_pinned_mut = &mut pinned;
+
+        // the component finished rendering and gave us nodes
+        // we should attach them to that component and then render its children
+        // continue rendering the tree until we hit yet another suspended component
+        if let Poll::Ready(new_nodes) = as_pinned_mut.poll_unpin(&mut cx) {
+            let fiber = self.acquire_suspense_boundary(leaf.scope_id);
+
+            let scope = &mut self.scopes[scope_id.0];
+            let arena = scope.current_frame();
+
+            let ret = arena.bump.alloc(match new_nodes {
+                Some(new) => RenderReturn::Ready(new),
+                None => RenderReturn::default(),
+            });
+
+            arena.node.set(ret);
+
+            fiber.waiting_on.borrow_mut().remove(&id);
+
+            if let RenderReturn::Ready(template) = ret {
+                let mutations_ref = &mut fiber.mutations.borrow_mut();
+                let mutations = &mut **mutations_ref;
+                let template: &VNode = unsafe { std::mem::transmute(template) };
+                let mutations: &mut Mutations = unsafe { std::mem::transmute(mutations) };
+
+                std::mem::swap(&mut self.mutations, mutations);
+
+                let place_holder_id = scope.placeholder.get().unwrap();
+                self.scope_stack.push(scope_id);
+
+                drop(leaves);
+
+                let created = self.create(template);
+                self.scope_stack.pop();
+                mutations.push(Mutation::ReplaceWith {
+                    id: place_holder_id,
+                    m: created,
+                });
+
+                for leaf in self.collected_leaves.drain(..) {
+                    fiber.waiting_on.borrow_mut().insert(leaf);
+                }
+
+                std::mem::swap(&mut self.mutations, mutations);
+
+                if fiber.waiting_on.borrow().is_empty() {
+                    self.finished_fibers.push(fiber.id);
+                }
+            }
+        }
     }
 }

+ 3 - 6
packages/core/src/scope_arena.rs

@@ -92,17 +92,14 @@ impl VirtualDom {
             let leaf = SuspenseLeaf {
                 scope_id,
                 task: task.as_mut(),
-                id: suspense_id,
-                tx: self.scheduler.sender.clone(),
                 notified: Default::default(),
-                waker: Arc::new(SuspenseHandle {
+                waker: futures_util::task::waker(Arc::new(SuspenseHandle {
                     id: suspense_id,
                     tx: self.scheduler.sender.clone(),
-                }),
+                })),
             };
 
-            let waker = futures_util::task::waker(leaf.waker.clone());
-            let mut cx = Context::from_waker(&waker);
+            let mut cx = Context::from_waker(&leaf.waker);
 
             // safety: the task is already pinned in the bump arena
             let mut pinned = unsafe { Pin::new_unchecked(task.as_mut()) };

+ 12 - 10
packages/core/src/virtual_dom.rs

@@ -437,16 +437,18 @@ impl VirtualDom {
                                 return;
                             }
 
-                            let res = (&mut *self.scheduler.tasks.borrow_mut()).next().await;
-
-                            if res.is_none() {
-                                // If we have no tasks, then we should wait for a message
-                                if let Some(msg) = self.rx.next().await {
-                                    some_msg = Some(msg);
-                                } else {
-                                    return;
-                                }
-                            }
+                            some_msg = self.rx.next().await
+
+                            // let res = (&mut *self.scheduler.tasks.borrow_mut()).next().await;
+
+                            // if res.is_none() {
+                            //     // If we have no tasks, then we should wait for a message
+                            //     if let Some(msg) = self.rx.next().await {
+                            //         some_msg = Some(msg);
+                            //     } else {
+                            //         return;
+                            //     }
+                            // }
                         }
                     }
                 }