use_coroutine.rs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. use dioxus_core::{ScopeState, TaskId};
  2. pub use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  3. use std::future::Future;
  4. /// Maintain a handle over a future that can be paused, resumed, and canceled.
  5. ///
  6. /// This is an upgraded form of [`use_future`] with an integrated channel system.
  7. /// Specifically, the coroutine generated here comes with an [`UnboundedChannel`]
  8. /// built into it - saving you the hassle of building your own.
  9. ///
  10. /// Addititionally, coroutines are automatically injected as shared contexts, so
  11. /// downstream components can tap into a coroutine's channel and send messages
  12. /// into a singular async event loop.
  13. ///
  14. /// This makes it effective for apps that need to interact with an event loop or
  15. /// some asynchronous code without thinking too hard about state.
  16. ///
  17. /// ## Global State
  18. ///
  19. /// Typically, writing apps that handle concurrency properly can be difficult,
  20. /// so the intention of this hook is to make it easy to join and poll async tasks
  21. /// concurrently in a centralized place. You'll find that you can have much better
  22. /// control over your app's state if you centralize your async actions, even under
  23. /// the same concurrent context. This makes it easier to prevent undeseriable
  24. /// states in your UI while various async tasks are already running.
  25. ///
  26. /// This hook is especially powerful when combined with Fermi. We can store important
  27. /// global data in a coroutine, and then access display-level values from the rest
  28. /// of our app through atoms.
  29. ///
  30. /// ## UseCallback instead
  31. ///
  32. /// However, you must plan out your own concurrency and synchronization. If you
  33. /// don't care about actions in your app being synchronized, you can use [`use_callback`]
  34. /// hook to spawn multiple tasks and run them concurrently.
  35. ///
  36. /// ### Notice
  37. /// In order to use ``rx.next().await``, you will need to extend the ``Stream`` trait (used by ``UnboundedReceiver``)
  38. /// by adding the ``futures-util`` crate as a dependency and adding ``StreamExt`` into scope via ``use futures_util::stream::StreamExt;``
  39. ///
  40. /// ## Example
  41. ///
  42. /// ```rust, ignore
  43. /// enum Action {
  44. /// Start,
  45. /// Stop,
  46. /// }
  47. ///
  48. /// let chat_client = use_coroutine(cx, |mut rx: UnboundedReceiver<Action>| async move {
  49. /// while let Some(action) = rx.next().await {
  50. /// match action {
  51. /// Action::Start => {}
  52. /// Action::Stop => {},
  53. /// }
  54. /// }
  55. /// });
  56. ///
  57. ///
  58. /// cx.render(rsx!{
  59. /// button {
  60. /// onclick: move |_| chat_client.send(Action::Start),
  61. /// "Start Chat Service"
  62. /// }
  63. /// })
  64. /// ```
  65. pub fn use_coroutine<M, G, F>(cx: &ScopeState, init: G) -> &Coroutine<M>
  66. where
  67. M: 'static,
  68. G: FnOnce(UnboundedReceiver<M>) -> F,
  69. F: Future<Output = ()> + 'static,
  70. {
  71. cx.use_hook(|| {
  72. let (tx, rx) = futures_channel::mpsc::unbounded();
  73. let task = cx.push_future(init(rx));
  74. cx.provide_context(Coroutine { tx, task })
  75. })
  76. }
  77. /// Get a handle to a coroutine higher in the tree
  78. ///
  79. /// See the docs for [`use_coroutine`] for more details.
  80. #[must_use]
  81. pub fn use_coroutine_handle<M: 'static>(cx: &ScopeState) -> Option<&Coroutine<M>> {
  82. cx.use_hook(|| cx.consume_context::<Coroutine<M>>())
  83. .as_ref()
  84. }
  85. pub struct Coroutine<T> {
  86. tx: UnboundedSender<T>,
  87. task: TaskId,
  88. }
  89. // for use in futures
  90. impl<T> Clone for Coroutine<T> {
  91. fn clone(&self) -> Self {
  92. Self {
  93. tx: self.tx.clone(),
  94. task: self.task,
  95. }
  96. }
  97. }
  98. impl<T> Coroutine<T> {
  99. /// Get the ID of this coroutine
  100. #[must_use]
  101. pub fn task_id(&self) -> TaskId {
  102. self.task
  103. }
  104. /// Send a message to the coroutine
  105. pub fn send(&self, msg: T) {
  106. let _ = self.tx.unbounded_send(msg);
  107. }
  108. }
  109. impl<T> PartialEq for Coroutine<T> {
  110. fn eq(&self, other: &Self) -> bool {
  111. self.task == other.task
  112. }
  113. }
  #[cfg(test)]
  mod tests {
      // This module contains no `#[test]` functions; its items only have to
      // type-check against `use_coroutine`, so unused-code warnings are silenced.
      #![allow(unused)]

      use super::*;
      use dioxus_core::prelude::*;
      use futures_channel::mpsc::unbounded;
      use futures_util::StreamExt;

      /// Messages accepted by `complex_task`.
      enum Actions {
          CloseAll,
          OpenAll,
      }

      /// Calls `use_coroutine` with three shapes of initializer: an inline
      /// `async move` closure, a plain `async fn`, and a closure that forwards
      /// the receiver to an `async fn` taking an extra argument.
      fn app(cx: Scope, name: String) -> Element {
          let task = use_coroutine(cx, |mut inbox: UnboundedReceiver<i32>| async move {
              while let Some(msg) = inbox.next().await {
                  println!("got message: {msg}");
              }
          });

          let task2 = use_coroutine(cx, view_task);

          let task3 = use_coroutine(cx, |rx| complex_task(rx, 10));

          todo!()
      }

      /// Prints every `i32` received until the channel yields `None`.
      async fn view_task(mut rx: UnboundedReceiver<i32>) {
          while let Some(msg) = rx.next().await {
              println!("got message: {msg}");
          }
      }

      /// Receives `Actions` in a loop; both handlers are still `todo!()` stubs.
      async fn complex_task(mut rx: UnboundedReceiver<Actions>, name: i32) {
          while let Some(msg) = rx.next().await {
              match msg {
                  Actions::CloseAll => todo!(),
                  Actions::OpenAll => todo!(),
              }
          }
      }
  }