Procházet zdrojové kódy

Add liveview Salvo integration (#538)

Chrislearn Young před 2 roky
rodič
revize
e4466fee0a

+ 5 - 1
packages/liveview/Cargo.toml

@@ -37,12 +37,16 @@ warp = { version = "0.3", optional = true }
 axum = { version = "0.5.1", optional = true, features = ["ws"] }
 tower = { version = "0.4.12", optional = true }
 
+# salvo
+salvo = { version = "0.32.0", optional = true, features = ["ws"] }
+
 [dev-dependencies]
 tokio = { version = "1", features = ["full"] }
 dioxus = { path = "../dioxus" }
 warp = "0.3"
 axum = { version = "0.5.1", features = ["ws"] }
+salvo = { version = "0.32.0", features = ["affix", "ws"] }
 tower = "0.4.12"
 
 [features]
-default = []
+default = []

+ 55 - 0
packages/liveview/examples/salvo.rs

@@ -0,0 +1,55 @@
+#[cfg(not(feature = "salvo"))]
+fn main() {}
+
+#[cfg(feature = "salvo")]
+#[tokio::main]
+async fn main() {
+    use std::sync::Arc;
+
+    use dioxus_core::{Element, LazyNodes, Scope};
+    use dioxus_liveview as liveview;
+    use dioxus_liveview::Liveview;
+    use salvo::extra::affix;
+    use salvo::extra::ws::WsHandler;
+    use salvo::prelude::*;
+
+    fn app(cx: Scope) -> Element {
+        cx.render(LazyNodes::new(|f| f.text(format_args!("hello world!"))))
+    }
+
+    pretty_env_logger::init();
+
+    let addr = ([127, 0, 0, 1], 3030);
+
+    // todo: compactify this routing under one liveview::app method
+    let view = liveview::new(addr);
+    let router = Router::new()
+        .hoop(affix::inject(Arc::new(view)))
+        .get(index)
+        .push(Router::with_path("app").get(connect));
+    Server::new(TcpListener::bind(addr)).serve(router).await;
+
+    #[handler]
+    fn index(depot: &mut Depot, res: &mut Response) {
+        let view = depot.obtain::<Arc<Liveview>>().unwrap();
+        let body = view.body("<title>Dioxus LiveView</title>");
+        res.render(Text::Html(body));
+    }
+
+    #[handler]
+    async fn connect(
+        req: &mut Request,
+        depot: &mut Depot,
+        res: &mut Response,
+    ) -> Result<(), StatusError> {
+        let view = depot.obtain::<Arc<Liveview>>().unwrap().clone();
+        let fut = WsHandler::new().handle(req, res)?;
+        let fut = async move {
+            if let Some(ws) = fut.await {
+                view.upgrade_salvo(ws, app).await;
+            }
+        };
+        tokio::task::spawn(fut);
+        Ok(())
+    }
+}

+ 110 - 0
packages/liveview/src/adapters/salvo_adapter.rs

@@ -0,0 +1,110 @@
+use crate::events;
+use dioxus_core::prelude::*;
+use futures_util::{pin_mut, SinkExt, StreamExt};
+use salvo::extra::ws::{Message, WebSocket};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::UnboundedReceiverStream;
+use tokio_util::task::LocalPoolHandle;
+
+impl crate::Liveview {
+    pub async fn upgrade_salvo(&self, ws: salvo::extra::ws::WebSocket, app: fn(Scope) -> Element) {
+        connect(ws, self.pool.clone(), app, ()).await;
+    }
+    pub async fn upgrade_salvo_with_props<T>(
+        &self,
+        ws: salvo::extra::ws::WebSocket,
+        app: fn(Scope<T>) -> Element,
+        props: T,
+    ) where
+        T: Send + Sync + 'static,
+    {
+        connect(ws, self.pool.clone(), app, props).await;
+    }
+}
+
+pub async fn connect<T>(
+    ws: WebSocket,
+    pool: LocalPoolHandle,
+    app: fn(Scope<T>) -> Element,
+    props: T,
+) where
+    T: Send + Sync + 'static,
+{
+    // Use a counter to assign a new unique ID for this user.
+
+    // Split the socket into a sender and receive of messages.
+    let (mut user_ws_tx, mut user_ws_rx) = ws.split();
+
+    let (event_tx, event_rx) = mpsc::unbounded_channel();
+    let (edits_tx, edits_rx) = mpsc::unbounded_channel();
+
+    let mut edits_rx = UnboundedReceiverStream::new(edits_rx);
+    let mut event_rx = UnboundedReceiverStream::new(event_rx);
+
+    let vdom_fut = pool.spawn_pinned(move || async move {
+        let mut vdom = VirtualDom::new_with_props(app, props);
+
+        let edits = vdom.rebuild();
+
+        let serialized = serde_json::to_string(&edits.edits).unwrap();
+        edits_tx.send(serialized).unwrap();
+
+        loop {
+            use futures_util::future::{select, Either};
+
+            let new_event = {
+                let vdom_fut = vdom.wait_for_work();
+
+                pin_mut!(vdom_fut);
+
+                match select(event_rx.next(), vdom_fut).await {
+                    Either::Left((l, _)) => l,
+                    Either::Right((_, _)) => None,
+                }
+            };
+
+            if let Some(new_event) = new_event {
+                vdom.handle_message(dioxus_core::SchedulerMsg::Event(new_event));
+            } else {
+                let mutations = vdom.work_with_deadline(|| false);
+                for mutation in mutations {
+                    let edits = serde_json::to_string(&mutation.edits).unwrap();
+                    edits_tx.send(edits).unwrap();
+                }
+            }
+        }
+    });
+
+    loop {
+        use futures_util::future::{select, Either};
+
+        match select(user_ws_rx.next(), edits_rx.next()).await {
+            Either::Left((l, _)) => {
+                if let Some(Ok(msg)) = l {
+                    if let Ok(Some(msg)) = msg.to_str().map(events::parse_ipc_message) {
+                        if msg.method == "user_event" {
+                            let user_event = events::trigger_from_serialized(msg.params);
+                            event_tx.send(user_event).unwrap();
+                        }
+                    } else {
+                        break;
+                    }
+                } else {
+                    break;
+                }
+            }
+            Either::Right((edits, _)) => {
+                if let Some(edits) = edits {
+                    // send the edits to the client
+                    if user_ws_tx.send(Message::text(edits)).await.is_err() {
+                        break;
+                    }
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+
+    vdom_fut.abort();
+}

+ 3 - 0
packages/liveview/src/lib.rs

@@ -7,6 +7,9 @@ pub mod adapters {
 
     #[cfg(feature = "axum")]
     pub mod axum_adapter;
+
+    #[cfg(feature = "salvo")]
+    pub mod salvo_adapter;
 }
 
 use std::net::SocketAddr;