Explorar o código

wip: reactive context instead of effect

Jonathan Kelley hai 1 ano
pai
achega
968f24a7b3

+ 2 - 0
Cargo.lock

@@ -2476,6 +2476,7 @@ version = "0.4.3"
 dependencies = [
  "dioxus",
  "dioxus-ssr",
+ "flume",
  "futures-channel",
  "futures-util",
  "longest-increasing-subsequence",
@@ -2883,6 +2884,7 @@ version = "0.4.3"
 dependencies = [
  "dioxus",
  "dioxus-core",
+ "flume",
  "futures-channel",
  "futures-util",
  "generational-box",

+ 1 - 0
packages/core/Cargo.toml

@@ -20,6 +20,7 @@ slab = { workspace = true }
 futures-channel = { workspace = true }
 tracing = { workspace = true }
 serde = { version = "1", features = ["derive"], optional = true }
+flume = { version = "0.11.0", default-features = false, features = ["async"]}
 
 
 [dev-dependencies]

+ 4 - 28
packages/core/src/global_context.rs

@@ -252,34 +252,10 @@ pub fn after_render(f: impl FnMut() + 'static) {
 /// Effects rely on this to ensure that they only run effects after the DOM has been updated. Without flush_sync effects
 /// are run immediately before diffing the DOM, which causes all sorts of out-of-sync weirdness.
 pub async fn flush_sync() {
-    let mut polled = false;
-
-    let _task =
-        FlushKey(Runtime::with(|rt| rt.add_to_flush_table()).expect("to be in a dioxus runtime"));
-
-    // Poll without giving the waker to anyone
-    // The runtime will manually wake this task up when it's ready
-    poll_fn(|_| {
-        if !polled {
-            polled = true;
-            futures_util::task::Poll::Pending
-        } else {
-            futures_util::task::Poll::Ready(())
-        }
-    })
-    .await;
-
-    // If the the future got polled, then we don't need to prevent it from being dropped
-    // If we had generational indicies on tasks we could simply let the task remain in the queue and just be a no-op
-    // when it's run
-    std::mem::forget(_task);
-
-    struct FlushKey(Task);
-    impl Drop for FlushKey {
-        fn drop(&mut self) {
-            Runtime::with(|rt| rt.flush_table.borrow_mut().remove(&self.0));
-        }
-    }
+    _ = Runtime::with(|rt| rt.flush.clone())
+        .unwrap()
+        .recv_async()
+        .await;
 }
 
 /// Use a hook with a cleanup function

+ 6 - 3
packages/core/src/runtime.rs

@@ -33,19 +33,22 @@ pub struct Runtime {
     pub(crate) sender: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
 
     // Tasks waiting to be manually resumed when we call wait_for_work
-    pub(crate) flush_table: RefCell<FxHashSet<Task>>,
+    pub(crate) flush: flume::Receiver<()>,
 }
 
 impl Runtime {
-    pub(crate) fn new(sender: futures_channel::mpsc::UnboundedSender<SchedulerMsg>) -> Rc<Self> {
+    pub(crate) fn new(
+        sender: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
+        flush: flume::Receiver<()>,
+    ) -> Rc<Self> {
         Rc::new(Self {
             sender,
+            flush,
             rendering: Cell::new(true),
             scope_states: Default::default(),
             scope_stack: Default::default(),
             current_task: Default::default(),
             tasks: Default::default(),
-            flush_table: Default::default(),
         })
     }
 

+ 0 - 7
packages/core/src/tasks.rs

@@ -122,13 +122,6 @@ impl Runtime {
         self.tasks.borrow().get(task.0)?.parent
     }
 
-    /// Add this task to the queue of tasks that will manually get poked when the scheduler is flushed
-    pub(crate) fn add_to_flush_table(&self) -> Task {
-        let value = self.current_task().unwrap();
-        self.flush_table.borrow_mut().insert(value);
-        value
-    }
-
     pub(crate) fn handle_task_wakeup(&self, id: Task) {
         debug_assert!(Runtime::current().is_some(), "Must be in a dioxus runtime");
 

+ 6 - 14
packages/core/src/virtual_dom.rs

@@ -202,6 +202,7 @@ pub struct VirtualDom {
     pub(crate) suspended_scopes: FxHashSet<ScopeId>,
 
     rx: futures_channel::mpsc::UnboundedReceiver<SchedulerMsg>,
+    flush_tx: flume::Sender<()>,
 }
 
 impl VirtualDom {
@@ -305,10 +306,12 @@ impl VirtualDom {
     /// ```
     pub(crate) fn new_with_component(root: impl AnyProps + 'static) -> Self {
         let (tx, rx) = futures_channel::mpsc::unbounded();
+        let (flush_tx, flush_rx) = flume::unbounded(); // I don't think this needs to be unbounded
 
         let mut dom = Self {
             rx,
-            runtime: Runtime::new(tx),
+            flush_tx,
+            runtime: Runtime::new(tx, flush_rx),
             scopes: Default::default(),
             dirty_scopes: Default::default(),
             templates: Default::default(),
@@ -419,8 +422,8 @@ impl VirtualDom {
     /// let dom = VirtualDom::new(app);
     /// ```
     pub async fn wait_for_work(&mut self) {
-        // Ping tasks waiting on the flush table - they're waiting for sync stuff to be done before progressing
-        self.clear_flush_table();
+        // Send the flush signal to the runtime
+        _ = self.flush_tx.try_send(());
 
         // And then poll the futures
         self.poll_tasks().await;
@@ -448,17 +451,6 @@ impl VirtualDom {
         }
     }
 
-    fn clear_flush_table(&mut self) {
-        // Make sure we set the runtime since we're running user code
-        let _runtime = RuntimeGuard::new(self.runtime.clone());
-
-        // Manually flush tasks that called `flush().await`
-        // Tasks that might've been waiting for `flush` finally have a chance to run to their next await point
-        for task in self.runtime.flush_table.take() {
-            self.runtime.handle_task_wakeup(task);
-        }
-    }
-
     /// Process all events in the queue until there are no more left
     pub fn process_events(&mut self) {
         let _runtime = RuntimeGuard::new(self.runtime.clone());

+ 1 - 0
packages/signals/Cargo.toml

@@ -22,6 +22,7 @@ once_cell = "1.18.0"
 rustc-hash = { workspace = true }
 futures-channel = { workspace = true }
 futures-util = { workspace = true }
+flume = { version = "0.11.0", default-features = false, features = ["async"] }
 
 [dev-dependencies]
 dioxus = { workspace = true }

+ 84 - 0
packages/signals/src/rc.rs

@@ -0,0 +1,84 @@
+use dioxus_core::prelude::ScopeId;
+use generational_box::{GenerationalBoxId, SyncStorage};
+use rustc_hash::FxHashSet;
+use std::{cell::RefCell, hash::Hash};
+
+use crate::{CopyValue, Readable};
+
+/// A context for signal reads and writes to be directed to
+///
+/// When a signal calls .read(), it will look for the current ReactiveContext to read from.
+/// If it doesn't find it, then it will try and insert a context into the nearest component scope via context api.
+///
+/// When the ReactiveContext drops, it will remove itself from the the associated contexts attached to signal
+#[derive(Clone, Copy, PartialEq)]
+pub struct ReactiveContext {
+    // todo: we dont need to use syncstorage per say
+    inner: CopyValue<Inner, SyncStorage>,
+}
+
+thread_local! {
+    static CURRENT: RefCell<Vec<ReactiveContext>> = RefCell::new(vec![]);
+}
+
+impl ReactiveContext {
+    /// Get the current reactive context
+    ///
+    /// If this was set manually, then that value will be returned.
+    ///
+    /// If there's no current reactive context, then a new one will be created at the current scope and returned.
+    pub fn current() -> Self {
+        todo!()
+    }
+
+    /// Run this function in the context of this reactive context
+    ///
+    /// This will set the current reactive context to this context for the duration of the function.
+    /// You can then get information about the current subscriptions.
+    pub fn run_in(&self, f: impl FnOnce()) {
+        todo!()
+    }
+
+    /// Marks this reactive context as dirty
+    ///
+    /// If there's a scope associated with this context, then it will be marked as dirty too
+    pub fn mark_dirty(&self) {}
+
+    /// Clear all subscribers from this reactive context
+    pub fn clear_subscribers(&self) {
+        todo!()
+    }
+
+    /// Wait for this reactive context to change
+    pub async fn changed(&self) {
+        let waiter = self.inner.read().waiters.1.clone();
+
+        _ = waiter.recv_async().await;
+    }
+}
+
+impl Hash for ReactiveContext {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.inner.id().hash(state);
+    }
+}
+
+struct Inner {
+    // Set of signals bound to this context
+    subscribers: FxHashSet<GenerationalBoxId>,
+
+    // The scope that this context is associated with
+    // This is only relevant when RC is being used to call update on a signal
+    scope: Option<ScopeId>,
+
+    // Futures will call .changed().await
+    waiters: (flume::Sender<()>, flume::Receiver<()>),
+}
+
+impl Inner {}
+
+impl Drop for Inner {
+    fn drop(&mut self) {
+        todo!()
+    }
+}

+ 74 - 64
packages/signals/src/signal.rs

@@ -1,6 +1,7 @@
 use crate::{
     get_effect_ref, read::Readable, write::Writable, CopyValue, Effect, EffectInner,
-    EffectStackRef, GlobalMemo, GlobalSignal, MappedSignal, ReadOnlySignal, EFFECT_STACK,
+    EffectStackRef, GlobalMemo, GlobalSignal, MappedSignal, ReactiveContext,
+    ReactiveContextProvider, ReadSignal, EFFECT_STACK,
 };
 use dioxus_core::{
     prelude::{
@@ -13,6 +14,7 @@ use parking_lot::RwLock;
 use std::{
     any::Any,
     cell::RefCell,
+    collections::HashSet,
     ops::{Deref, DerefMut},
     rc::Rc,
     sync::Arc,
@@ -59,17 +61,18 @@ pub type SyncSignal<T> = Signal<T, SyncStorage>;
 
 /// The data stored for tracking in a signal.
 pub struct SignalData<T> {
-    pub(crate) subscribers: Arc<RwLock<SignalSubscribers>>,
+    pub(crate) subscribers: Arc<RwLock<HashSet<ReactiveContext>>>,
     pub(crate) update_any: Arc<dyn Fn(ScopeId) + Sync + Send>,
     pub(crate) effect_ref: EffectStackRef,
     pub(crate) value: T,
 }
 
-#[derive(Default)]
-pub(crate) struct SignalSubscribers {
-    pub(crate) subscribers: Vec<ScopeId>,
-    pub(crate) effect_subscribers: Vec<GenerationalBoxId>,
-}
+// #[derive(Default)]
+// pub(crate) struct SignalSubscribers {
+//     // todo: use a linear map here for faster scans
+//     pub(crate) subscribers: FxHashSet<ScopeId>,
+//     pub(crate) effect_subscribers: FxHashSet<GenerationalBoxId>,
+// }
 
 impl<T: 'static> Signal<T> {
     /// Creates a new Signal. Signals are a Copy state management solution with automatic dependency tracking.
@@ -214,42 +217,44 @@ impl<T: 'static, S: Storage<SignalData<T>>> Signal<T, S> {
     }
 
     fn update_subscribers(&self) {
-        {
-            let inner = self.inner.read();
-            for &scope_id in &*inner.subscribers.read().subscribers {
-                tracing::trace!(
-                    "Write on {:?} triggered update on {:?}",
-                    self.inner.value,
-                    scope_id
-                );
-                (inner.update_any)(scope_id);
-            }
-        }
-
-        let self_read = &self.inner.read();
-        let subscribers = {
-            let effects = &mut self_read.subscribers.write().effect_subscribers;
-            std::mem::take(&mut *effects)
-        };
-        let effect_ref = &self_read.effect_ref;
-        for effect in subscribers {
-            tracing::trace!(
-                "Write on {:?} triggered effect {:?}",
-                self.inner.value,
-                effect
-            );
-            effect_ref.rerun_effect(effect);
-        }
+        todo!()
+        // {
+        //     let inner = self.inner.read();
+        //     for &scope_id in inner.subscribers.read().subscribers.iter() {
+        //         tracing::trace!(
+        //             "Write on {:?} triggered update on {:?}",
+        //             self.inner.value,
+        //             scope_id
+        //         );
+        //         (inner.update_any)(scope_id);
+        //     }
+        // }
+
+        // let self_read = &self.inner.read();
+        // let subscribers = {
+        //     let effects = &mut self_read.subscribers.write().effect_subscribers;
+        //     std::mem::take(&mut *effects)
+        // };
+        // let effect_ref = &self_read.effect_ref;
+        // for effect in subscribers {
+        //     tracing::trace!(
+        //         "Write on {:?} triggered effect {:?}",
+        //         self.inner.value,
+        //         effect
+        //     );
+        //     effect_ref.queue_effect(effect);
+        // }
     }
 
     /// Unsubscribe this scope from the signal's effect list
     pub fn unsubscribe(&self, scope: ScopeId) {
-        self.inner
-            .read()
-            .subscribers
-            .write()
-            .subscribers
-            .retain(|s| *s != scope);
+        todo!()
+        // self.inner
+        //     .read()
+        //     .subscribers
+        //     .write()
+        //     .subscribers
+        //     .retain(|s| *s != scope);
     }
 
     /// Map the signal to a new type.
@@ -284,31 +289,36 @@ impl<T, S: Storage<SignalData<T>>> Readable<T> for Signal<T, S> {
     #[track_caller]
     fn read(&self) -> S::Ref<T> {
         let inner = self.inner.read();
-        if let Some(effect) = EFFECT_STACK.with(|stack| stack.current()) {
-            let subscribers = inner.subscribers.read();
-            if !subscribers.effect_subscribers.contains(&effect.inner.id()) {
-                drop(subscribers);
-                let mut subscribers = inner.subscribers.write();
-                subscribers.effect_subscribers.push(effect.inner.id());
-            }
-        } else if let Some(current_scope_id) = current_scope_id() {
-            // only subscribe if the vdom is rendering
-            if dioxus_core::vdom_is_rendering() {
-                tracing::trace!(
-                    "{:?} subscribed to {:?}",
-                    self.inner.value,
-                    current_scope_id
-                );
-                let subscribers = inner.subscribers.read();
-                if !subscribers.subscribers.contains(&current_scope_id) {
-                    drop(subscribers);
-                    let mut subscribers = inner.subscribers.write();
-                    subscribers.subscribers.push(current_scope_id);
-                    let unsubscriber = current_unsubscriber();
-                    subscribers.subscribers.push(unsubscriber.borrow().scope);
-                }
-            }
-        }
+
+        todo!();
+
+        // // if we're in a reactive context, attach the context to the signal via the mapping
+        // if let Some(effect) = ReactiveContextProvider::current() {
+        //     inner
+        //         .subscribers
+        //         .write()
+        //         .effect_subscribers
+        //         .insert(effect.inner.id());
+        // } else if let Some(current_scope_id) = current_scope_id() {
+        //     // only subscribe if the vdom is rendering
+        //     if dioxus_core::vdom_is_rendering() {
+        //         tracing::trace!(
+        //             "{:?} subscribed to {:?}",
+        //             self.inner.value,
+        //             current_scope_id
+        //         );
+
+        //         let mut subscribers = inner.subscribers.write();
+
+        //         if !subscribers.subscribers.contains(&current_scope_id) {
+        //             subscribers.subscribers.insert(current_scope_id);
+        //             subscribers
+        //                 .subscribers
+        //                 .insert(current_unsubscriber().borrow().scope);
+        //         }
+        //     }
+        // }
+
         S::map(inner, |v| &v.value)
     }