Bläddra i källkod

implemented liveview axum adapter

Ian 3 år sedan
förälder
incheckning
a632d9b12b
2 ändrade filer med 75 tillägg och 2 borttagningar
  1. 7 2
      packages/liveview/Cargo.toml
  2. 68 0
      packages/liveview/src/adapters/axum_adapter.rs

+ 7 - 2
packages/liveview/Cargo.toml

@@ -30,10 +30,15 @@ dioxus-core = { path = "../core", features = ["serialize"] }
 # warp
 # warp
 warp = { version = "0.3", optional = true }
 warp = { version = "0.3", optional = true }
 
 
+# axum
+axum = { version = "0.5.1", optional = true, features = ["ws"] }
+tower = { version = "0.4.12", optional = true }
+
 [dev-dependencies]
 [dev-dependencies]
 dioxus-liveview = { path = "./", features = ["warp"] }
 dioxus-liveview = { path = "./", features = ["warp"] }
 warp = "0.3"
 warp = "0.3"
-
+axum = { version = "0.5.1", features = ["ws"] }
+tower = "0.4.12"
 
 
 [features]
 [features]
-default = []
+default = []

+ 68 - 0
packages/liveview/src/adapters/axum_adapter.rs

@@ -1 +1,69 @@
+use crate::{events, Liveview};
+use axum::extract::ws::{Message, WebSocket};
+use dioxus_core::prelude::*;
+use futures_util::{
+    future::{select, Either},
+    pin_mut, SinkExt, StreamExt,
+};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::UnboundedReceiverStream;
 
 
+pub async fn connect(socket: WebSocket, liveview: Liveview, app: fn(Scope) -> Element) {
+    let (mut user_ws_tx, mut user_ws_rx) = socket.split();
+    let (event_tx, event_rx) = mpsc::unbounded_channel();
+    let (edits_tx, edits_rx) = mpsc::unbounded_channel();
+    let mut edits_rx = UnboundedReceiverStream::new(edits_rx);
+    let mut event_rx = UnboundedReceiverStream::new(event_rx);
+    let vdom_fut = liveview.pool.clone().spawn_pinned(move || async move {
+        let mut vdom = VirtualDom::new(app);
+        let edits = vdom.rebuild();
+        let serialized = serde_json::to_string(&edits.edits).unwrap();
+        edits_tx.send(serialized).unwrap();
+        loop {
+            let new_event = {
+                let vdom_fut = vdom.wait_for_work();
+                pin_mut!(vdom_fut);
+                match select(event_rx.next(), vdom_fut).await {
+                    Either::Left((l, _)) => l,
+                    Either::Right((_, _)) => None,
+                }
+            };
+            if let Some(new_event) = new_event {
+                vdom.handle_message(dioxus_core::SchedulerMsg::Event(new_event));
+            } else {
+                let mutations = vdom.work_with_deadline(|| false);
+                for mutation in mutations {
+                    let edits = serde_json::to_string(&mutation.edits).unwrap();
+                    edits_tx.send(edits).unwrap();
+                }
+            }
+        }
+    });
+    loop {
+        match select(user_ws_rx.next(), edits_rx.next()).await {
+            Either::Left((l, _)) => {
+                if let Some(Ok(msg)) = l {
+                    if let Ok(Some(msg)) = msg.to_text().map(events::parse_ipc_message) {
+                        let user_event = events::trigger_from_serialized(msg.params);
+                        event_tx.send(user_event).unwrap();
+                    } else {
+                        break;
+                    }
+                } else {
+                    break;
+                }
+            }
+            Either::Right((edits, _)) => {
+                if let Some(edits) = edits {
+                    // send the edits to the client
+                    if user_ws_tx.send(Message::Text(edits)).await.is_err() {
+                        break;
+                    }
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+    vdom_fut.abort();
+}