pool.rs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. use crate::LiveViewError;
  2. use dioxus_core::prelude::*;
  3. use dioxus_html::HtmlEvent;
  4. use futures_util::{pin_mut, SinkExt, StreamExt};
  5. use std::time::Duration;
  6. use tokio_util::task::LocalPoolHandle;
  7. #[derive(Clone)]
  8. pub struct LiveViewPool {
  9. pub(crate) pool: LocalPoolHandle,
  10. }
  11. impl Default for LiveViewPool {
  12. fn default() -> Self {
  13. Self::new()
  14. }
  15. }
  16. impl LiveViewPool {
  17. pub fn new() -> Self {
  18. LiveViewPool {
  19. pool: LocalPoolHandle::new(16),
  20. }
  21. }
  22. pub async fn launch(
  23. &self,
  24. ws: impl LiveViewSocket,
  25. app: fn(Scope<()>) -> Element,
  26. ) -> Result<(), LiveViewError> {
  27. self.launch_with_props(ws, app, ()).await
  28. }
  29. pub async fn launch_with_props<T: Send + 'static>(
  30. &self,
  31. ws: impl LiveViewSocket,
  32. app: fn(Scope<T>) -> Element,
  33. props: T,
  34. ) -> Result<(), LiveViewError> {
  35. self.launch_virtualdom(ws, move || VirtualDom::new_with_props(app, props))
  36. .await
  37. }
  38. pub async fn launch_virtualdom<F: FnOnce() -> VirtualDom + Send + 'static>(
  39. &self,
  40. ws: impl LiveViewSocket,
  41. make_app: F,
  42. ) -> Result<(), LiveViewError> {
  43. match self.pool.spawn_pinned(move || run(make_app(), ws)).await {
  44. Ok(Ok(_)) => Ok(()),
  45. Ok(Err(e)) => Err(e),
  46. Err(_) => Err(LiveViewError::SendingFailed),
  47. }
  48. }
  49. }
  50. /// A LiveViewSocket is a Sink and Stream of Strings that Dioxus uses to communicate with the client
  51. ///
  52. /// Most websockets from most HTTP frameworks can be converted into a LiveViewSocket using the appropriate adapter.
  53. ///
  54. /// You can also convert your own socket into a LiveViewSocket by implementing this trait. This trait is an auto trait,
  55. /// meaning that as long as your type implements Stream and Sink, you can use it as a LiveViewSocket.
  56. ///
  57. /// For example, the axum implementation is a really small transform:
  58. ///
  59. /// ```rust, ignore
  60. /// pub fn axum_socket(ws: WebSocket) -> impl LiveViewSocket {
  61. /// ws.map(transform_rx)
  62. /// .with(transform_tx)
  63. /// .sink_map_err(|_| LiveViewError::SendingFailed)
  64. /// }
  65. ///
  66. /// fn transform_rx(message: Result<Message, axum::Error>) -> Result<String, LiveViewError> {
  67. /// message
  68. /// .map_err(|_| LiveViewError::SendingFailed)?
  69. /// .into_text()
  70. /// .map_err(|_| LiveViewError::SendingFailed)
  71. /// }
  72. ///
  73. /// async fn transform_tx(message: String) -> Result<Message, axum::Error> {
  74. /// Ok(Message::Text(message))
  75. /// }
  76. /// ```
  77. pub trait LiveViewSocket:
  78. SinkExt<String, Error = LiveViewError>
  79. + StreamExt<Item = Result<String, LiveViewError>>
  80. + Send
  81. + 'static
  82. {
  83. }
  84. impl<S> LiveViewSocket for S where
  85. S: SinkExt<String, Error = LiveViewError>
  86. + StreamExt<Item = Result<String, LiveViewError>>
  87. + Send
  88. + 'static
  89. {
  90. }
  91. /// The primary event loop for the VirtualDom waiting for user input
  92. ///
  93. /// This function makes it easy to integrate Dioxus LiveView with any socket-based framework.
  94. ///
  95. /// As long as your framework can provide a Sink and Stream of Strings, you can use this function.
  96. ///
  97. /// You might need to transform the error types of the web backend into the LiveView error type.
  98. pub async fn run(mut vdom: VirtualDom, ws: impl LiveViewSocket) -> Result<(), LiveViewError> {
  99. #[cfg(all(feature = "hot-reload", debug_assertions))]
  100. let mut hot_reload_rx = {
  101. let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
  102. dioxus_hot_reload::connect(move |template| {
  103. let _ = tx.send(template);
  104. });
  105. rx
  106. };
  107. // todo: use an efficient binary packed format for this
  108. let edits = serde_json::to_string(&vdom.rebuild()).unwrap();
  109. // pin the futures so we can use select!
  110. pin_mut!(ws);
  111. // send the initial render to the client
  112. ws.send(edits).await?;
  113. // desktop uses this wrapper struct thing around the actual event itself
  114. // this is sorta driven by tao/wry
  115. #[derive(serde::Deserialize)]
  116. struct IpcMessage {
  117. params: HtmlEvent,
  118. }
  119. loop {
  120. #[cfg(all(feature = "hot-reload", debug_assertions))]
  121. let hot_reload_wait = hot_reload_rx.recv();
  122. #[cfg(not(all(feature = "hot-reload", debug_assertions)))]
  123. let hot_reload_wait: std::future::Pending<Option<()>> = std::future::pending();
  124. tokio::select! {
  125. // poll any futures or suspense
  126. _ = vdom.wait_for_work() => {}
  127. evt = ws.next() => {
  128. match evt.as_ref().map(|o| o.as_deref()) {
  129. // respond with a pong every ping to keep the websocket alive
  130. Some(Ok("__ping__")) => {
  131. ws.send("__pong__".to_string()).await?;
  132. }
  133. Some(Ok(evt)) => {
  134. if let Ok(IpcMessage { params }) = serde_json::from_str::<IpcMessage>(evt) {
  135. vdom.handle_event(&params.name, params.data.into_any(), params.element, params.bubbles);
  136. }
  137. }
  138. // log this I guess? when would we get an error here?
  139. Some(Err(_e)) => {},
  140. None => return Ok(()),
  141. }
  142. }
  143. Some(msg) = hot_reload_wait => {
  144. #[cfg(all(feature = "hot-reload", debug_assertions))]
  145. match msg{
  146. dioxus_hot_reload::HotReloadMsg::UpdateTemplate(new_template) => {
  147. vdom.replace_template(new_template);
  148. }
  149. dioxus_hot_reload::HotReloadMsg::Shutdown => {
  150. std::process::exit(0);
  151. },
  152. }
  153. #[cfg(not(all(feature = "hot-reload", debug_assertions)))]
  154. let () = msg;
  155. }
  156. }
  157. let edits = vdom
  158. .render_with_deadline(tokio::time::sleep(Duration::from_millis(10)))
  159. .await;
  160. ws.send(serde_json::to_string(&edits).unwrap()).await?;
  161. }
  162. }