瀏覽代碼

feat: pair down usecoroutine and make it much happier

Jonathan Kelley 3 年之前
父節點
當前提交
fbaf25dc6b
共有 1 個文件被更改,包括 102 次插入139 次删除
  1. 102 139
      packages/hooks/src/usecoroutine.rs

+ 102 - 139
packages/hooks/src/usecoroutine.rs

@@ -1,181 +1,144 @@
-#![warn(clippy::pedantic)]
-
-use dioxus_core::exports::bumpalo;
-use dioxus_core::{LazyNodes, ScopeState, TaskId};
-use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
-use std::any::Any;
+use dioxus_core::{ScopeState, TaskId};
+pub use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
 use std::future::Future;
-use std::{cell::Cell, rc::Rc};
+use std::rc::Rc;
 
 /// Maintain a handle over a future that can be paused, resumed, and canceled.
 ///
-/// This is an upgraded form of [`use_future`] with lots of bells-and-whistles.
+/// This is an upgraded form of [`use_future`] with an integrated channel system.
+/// Specifically, the coroutine generated here comes with an [`UnboundedChannel`]
+/// built into it - saving you the hassle of building your own.
 ///
-/// [`use_coroutine`] is well suited for long-running tasks and is very customizable.
+/// Addititionally, coroutines are automatically injected as shared contexts, so
+/// downstream components can tap into a coroutine's channel and send messages
+/// into a singular async event loop.
 ///
+/// This makes it effective for apps that need to interact with an event loop or
+/// some asynchronous code without thinking too hard about state.
 ///
-/// ## Long running tasks
+/// ## Global State
 ///
+/// Typically, writing apps that handle concurrency properly can be difficult,
+/// so the intention of this hook is to make it easy to join and poll async tasks
+/// concurrently in a centralized place. You'll find that you can have much better
+/// control over your app's state if you centralize your async actions, even under
+/// the same concurrent context. This makes it easier to prevent undeseriable
+/// states in your UI while various async tasks are already running.
 ///
+/// This hook is especially powerful when combined with Fermi. We can store important
+/// global data in a coroutine, and then access display-level values from the rest
+/// of our app through atoms.
 ///
-/// ## One-off tasks
+/// ## UseCallback instead
 ///
+/// However, you must plan out your own concurrency and synchronization. If you
+/// don't care about actions in your app being synchronized, you can use [`use_callback`]
+/// hook to spawn multiple tasks and run them concurrently.
 ///
-/// ## Cancellation
+/// ## Example
 ///
+/// ```rust, ignore
+/// enum Action {
+///     Start,
+///     Stop,
+/// }
 ///
-/// ## Global State
-#[allow(clippy::mut_from_ref)]
-pub fn use_coroutine<O: 'static>(cx: &ScopeState) -> &mut UseCoroutine<O, ()> {
+/// let chat_client = use_coroutine(&cx, |rx: UnboundedReceiver<Action>| async move {
+///     while let Some(action) = rx.next().await {
+///         match action {
+///             Action::Start => {}
+///             Action::Stop => {},
+///         }
+///     }
+/// });
+///
+///
+/// cx.render(rsx!{
+///     button {
+///         onclick: move |_| chat_client.send(Action::Start),
+///         "Start Chat Service"
+///     }
+/// })
+/// ```
+pub fn use_coroutine<M, G, F>(cx: &ScopeState, init: G) -> &CoroutineHandle<M>
+where
+    M: 'static,
+    G: FnOnce(UnboundedReceiver<M>) -> F,
+    F: Future<Output = ()> + 'static,
+{
     cx.use_hook(|_| {
-        //
-        UseCoroutine {
-            val: Cell::new(None),
-            rx: Cell::new(None),
-            tx: None,
-            first_run: true,
-            deps: vec![],
-            dep_cnt: 0,
-            needs_regen: false,
-            auto_start: true,
-        }
+        let (tx, rx) = futures_channel::mpsc::unbounded();
+        let task = cx.push_future(init(rx));
+        cx.provide_context(CoroutineHandle { tx, task })
     })
 }
 
-pub struct UseCoroutine<O, M = ()> {
-    val: Cell<Option<O>>,
-    rx: Cell<Option<UnboundedReceiver<M>>>,
-    tx: Option<UnboundedSender<M>>,
-    first_run: bool,
-    deps: Vec<Box<dyn Any>>,
-    dep_cnt: usize,
-    needs_regen: bool,
-    auto_start: bool,
+/// Get a handle to a coroutine higher in the tree
+///
+/// See the docs for [`use_coroutine`] for more details.
+pub fn use_coroutine_handle<M: 'static>(cx: &ScopeState) -> Option<&Rc<CoroutineHandle<M>>> {
+    cx.use_hook(|_| cx.consume_context::<CoroutineHandle<M>>())
+        .as_ref()
 }
 
-pub enum FutureState<'a, T> {
-    Pending,
-    Complete(&'a T),
-    Regenerating(&'a T), // the old value
+pub struct CoroutineHandle<T> {
+    tx: UnboundedSender<T>,
+    task: TaskId,
 }
 
-impl<O> UseCoroutine<O, ()> {
-    /// explicitly set the type of the channel used by the coroutine
-    fn with_channel<S>(&mut self) -> &mut UseCoroutine<O, S> {
-        if self.first_run {
-            // self.provide_context()
-        }
-        todo!()
+impl<T> CoroutineHandle<T> {
+    /// Get the ID of this coroutine
+    #[must_use]
+    pub fn task_id(&self) -> TaskId {
+        self.task
     }
 
-    /// explicitly set the type of the channel used by the coroutine
-    fn with_channel_isolate<S>(&mut self) -> &mut UseCoroutine<O, S> {
-        todo!()
+    /// Send a message to the coroutine
+    pub fn send(&self, msg: T) {
+        let _ = self.tx.unbounded_send(msg);
     }
 }
 
-impl<O, M> UseCoroutine<O, M> {
-    pub fn is_running(&self) -> bool {
-        false
-        // self.running.get()
-    }
-
-    pub fn start(&self) {
-        // if !self.is_running() {
-        //     if let Some(mut fut) = self.create_fut.take() {
-        //         let fut = fut();
-        //         let ready_handle = self.running.clone();
-
-        //         let task = self.cx.push_future(async move {
-        //             ready_handle.set(true);
-        //             fut.await;
-        //             ready_handle.set(false);
-        //         });
+#[cfg(test)]
+mod tests {
+    #![allow(unused)]
 
-        //         self.task_id.set(Some(task));
-        //     }
-        // }
-    }
+    use super::*;
+    use dioxus_core::exports::futures_channel::mpsc::unbounded;
+    use dioxus_core::prelude::*;
+    use futures_util::StreamExt;
 
-    pub fn send(&self, msg: M) {
-        if let Some(tx) = self.tx.clone() {
-            if tx.unbounded_send(msg).is_err() {
-                log::error!("Failed to send message");
+    fn app(cx: Scope, name: String) -> Element {
+        let task = use_coroutine(&cx, |mut rx: UnboundedReceiver<i32>| async move {
+            while let Some(msg) = rx.next().await {
+                println!("got message: {}", msg);
             }
-        }
-    }
+        });
 
-    // todo: wire these up, either into the task system or into the coroutine system itself
-    // we would have change how we poll the coroutine and how its awaken
+        let task2 = use_coroutine(&cx, view_task);
 
-    fn build<F: Future<Output = O>>(&mut self, f: impl FnOnce(UnboundedReceiver<M>) -> F) -> &Self {
-        self.first_run = false;
-        if self.auto_start || self.needs_regen {
-            //
-        }
+        let task3 = use_coroutine(&cx, |rx| complex_task(rx, 10));
 
-        self
-    }
-
-    pub fn auto_start(mut self, start: bool) -> Self {
-        // if start && self.run_count.get() == 1 {
-        //     self.start();
-        // }
-        self
+        None
     }
 
-    /// Add this value to the dependency list
-    ///
-    /// This is a hook and should be called during the initial hook process.
-    /// It should •not• be called in a conditional.
-    pub fn with_dep<F: 'static + PartialEq + Clone>(&mut self, dependency: &F) -> &mut Self {
-        if let Some(dep) = self.deps.get_mut(self.dep_cnt) {
-            if let Some(saved_dep) = dep.downcast_mut::<F>() {
-                if dependency != saved_dep {
-                    *saved_dep = dependency.clone();
-                    self.needs_regen = true;
-                }
-            };
-        } else {
-            self.deps.push(Box::new(dependency.to_owned()));
-            self.needs_regen = true;
+    async fn view_task(mut rx: UnboundedReceiver<i32>) {
+        while let Some(msg) = rx.next().await {
+            println!("got message: {}", msg);
         }
-
-        self
     }
 
-    pub fn restart_if(&self, f: impl FnOnce() -> bool) -> &Self {
-        self
+    enum Actions {
+        CloseAll,
+        OpenAll,
     }
 
-    // pub fn resume(&self) {}
-    // pub fn stop(&self) {}
-    // pub fn restart(&self) {}
-}
-
-pub struct CoroutineContext<T> {
-    tx: UnboundedSender<T>,
-}
-
-#[cfg(test)]
-mod tests {
-    #![allow(unused)]
-
-    use super::*;
-    use dioxus_core::exports::futures_channel::mpsc::unbounded;
-    use dioxus_core::prelude::*;
-    use futures_util::StreamExt;
-
-    fn app(cx: Scope, name: String) -> Element {
-        let task = use_coroutine(&cx)
-            .with_dep(&name)
-            .with_channel::<i32>()
-            .build(|mut rx| async move {
-                while let Some(msg) = rx.next().await {
-                    println!("got message: {}", msg);
-                }
-            });
-
-        None
+    async fn complex_task(mut rx: UnboundedReceiver<Actions>, name: i32) {
+        while let Some(msg) = rx.next().await {
+            match msg {
+                Actions::CloseAll => todo!(),
+                Actions::OpenAll => todo!(),
+            }
+        }
     }
 }