|
@@ -1,110 +1,75 @@
|
|
-use crate::events;
|
|
|
|
|
|
+use std::{convert::Infallible, time::Duration};
|
|
|
|
+
|
|
|
|
+use crate::{
|
|
|
|
+ events::{self, IpcMessage},
|
|
|
|
+ LiveView, LiveViewError,
|
|
|
|
+};
|
|
use dioxus_core::prelude::*;
|
|
use dioxus_core::prelude::*;
|
|
|
|
+use dioxus_html::a;
|
|
use futures_util::{pin_mut, SinkExt, StreamExt};
|
|
use futures_util::{pin_mut, SinkExt, StreamExt};
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::mpsc;
|
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
|
use tokio_util::task::LocalPoolHandle;
|
|
use tokio_util::task::LocalPoolHandle;
|
|
use warp::ws::{Message, WebSocket};
|
|
use warp::ws::{Message, WebSocket};
|
|
|
|
|
|
-impl crate::Liveview {
|
|
|
|
- pub async fn upgrade_warp(&self, ws: warp::ws::WebSocket, app: fn(Scope) -> Element) {
|
|
|
|
- connect(ws, self.pool.clone(), app, ()).await;
|
|
|
|
- }
|
|
|
|
- pub async fn upgrade_warp_with_props<T>(
|
|
|
|
- &self,
|
|
|
|
- ws: warp::ws::WebSocket,
|
|
|
|
|
|
+impl LiveView {
|
|
|
|
+ pub async fn upgrade_warp(self, ws: WebSocket, app: fn(Scope<()>) -> Element) {}
|
|
|
|
+
|
|
|
|
+ pub async fn upgrade_warp_with_props<T: Send + 'static>(
|
|
|
|
+ self,
|
|
|
|
+ ws: WebSocket,
|
|
app: fn(Scope<T>) -> Element,
|
|
app: fn(Scope<T>) -> Element,
|
|
props: T,
|
|
props: T,
|
|
- ) where
|
|
|
|
- T: Send + Sync + 'static,
|
|
|
|
- {
|
|
|
|
- connect(ws, self.pool.clone(), app, props).await;
|
|
|
|
|
|
+ ) {
|
|
|
|
+ self.pool
|
|
|
|
+ .spawn_pinned(move || liveview_eventloop(app, props, ws))
|
|
|
|
+ .await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-pub async fn connect<T>(
|
|
|
|
- ws: WebSocket,
|
|
|
|
- pool: LocalPoolHandle,
|
|
|
|
- app: fn(Scope<T>) -> Element,
|
|
|
|
|
|
+async fn liveview_eventloop<T>(
|
|
|
|
+ app: Component<T>,
|
|
props: T,
|
|
props: T,
|
|
-) where
|
|
|
|
- T: Send + Sync + 'static,
|
|
|
|
|
|
+ mut ws: WebSocket,
|
|
|
|
+) -> Result<(), LiveViewError>
|
|
|
|
+where
|
|
|
|
+ T: Send + 'static,
|
|
{
|
|
{
|
|
- // Use a counter to assign a new unique ID for this user.
|
|
|
|
-
|
|
|
|
- // Split the socket into a sender and receive of messages.
|
|
|
|
- let (mut user_ws_tx, mut user_ws_rx) = ws.split();
|
|
|
|
-
|
|
|
|
- let (event_tx, event_rx) = mpsc::unbounded_channel();
|
|
|
|
- let (edits_tx, edits_rx) = mpsc::unbounded_channel();
|
|
|
|
-
|
|
|
|
- let mut edits_rx = UnboundedReceiverStream::new(edits_rx);
|
|
|
|
- let mut event_rx = UnboundedReceiverStream::new(event_rx);
|
|
|
|
-
|
|
|
|
- let vdom_fut = pool.spawn_pinned(move || async move {
|
|
|
|
- let mut vdom = VirtualDom::new_with_props(app, props);
|
|
|
|
-
|
|
|
|
- let edits = vdom.rebuild();
|
|
|
|
-
|
|
|
|
- let serialized = serde_json::to_string(&edits.edits).unwrap();
|
|
|
|
- edits_tx.send(serialized).unwrap();
|
|
|
|
-
|
|
|
|
- loop {
|
|
|
|
- use futures_util::future::{select, Either};
|
|
|
|
-
|
|
|
|
- let new_event = {
|
|
|
|
- let vdom_fut = vdom.wait_for_work();
|
|
|
|
-
|
|
|
|
- pin_mut!(vdom_fut);
|
|
|
|
-
|
|
|
|
- match select(event_rx.next(), vdom_fut).await {
|
|
|
|
- Either::Left((l, _)) => l,
|
|
|
|
- Either::Right((_, _)) => None,
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
-
|
|
|
|
- if let Some(new_event) = new_event {
|
|
|
|
- vdom.handle_message(dioxus_core::SchedulerMsg::Event(new_event));
|
|
|
|
- } else {
|
|
|
|
- let mutations = vdom.work_with_deadline(|| false);
|
|
|
|
- for mutation in mutations {
|
|
|
|
- let edits = serde_json::to_string(&mutation.edits).unwrap();
|
|
|
|
- edits_tx.send(edits).unwrap();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
|
|
+ let mut vdom = VirtualDom::new_with_props(app, props);
|
|
|
|
+ let edits = serde_json::to_string(&vdom.rebuild()).unwrap();
|
|
|
|
+ ws.send(Message::text(edits)).await.unwrap();
|
|
|
|
|
|
loop {
|
|
loop {
|
|
- use futures_util::future::{select, Either};
|
|
|
|
|
|
+ tokio::select! {
|
|
|
|
+ // poll any futures or suspense
|
|
|
|
+ _ = vdom.wait_for_work() => {}
|
|
|
|
+
|
|
|
|
+ evt = ws.next() => {
|
|
|
|
+ match evt {
|
|
|
|
+ Some(Ok(evt)) => {
|
|
|
|
+ if let Ok(evt) = evt.to_str() {
|
|
|
|
+ let IpcMessage { name, element, bubbles, data } = serde_json::from_str(evt).unwrap();
|
|
|
|
|
|
- match select(user_ws_rx.next(), edits_rx.next()).await {
|
|
|
|
- Either::Left((l, _)) => {
|
|
|
|
- if let Some(Ok(msg)) = l {
|
|
|
|
- if let Ok(Some(msg)) = msg.to_str().map(events::parse_ipc_message) {
|
|
|
|
- if msg.method == "user_event" {
|
|
|
|
- let user_event = events::trigger_from_serialized(msg.params);
|
|
|
|
- event_tx.send(user_event).unwrap();
|
|
|
|
|
|
+ vdom.handle_event(&name, data, element, bubbles);
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- break;
|
|
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- Either::Right((edits, _)) => {
|
|
|
|
- if let Some(edits) = edits {
|
|
|
|
- // send the edits to the client
|
|
|
|
- if user_ws_tx.send(Message::text(edits)).await.is_err() {
|
|
|
|
- break;
|
|
|
|
|
|
+ Some(Err(e)) => {
|
|
|
|
+ // log this I guess?
|
|
|
|
+ // when would we get an error here?
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- break;
|
|
|
|
|
|
+ None => break,
|
|
}
|
|
}
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
- vdom_fut.abort();
|
|
|
|
|
|
+ let edits = vdom
|
|
|
|
+ .render_with_deadline(tokio::time::sleep(Duration::from_millis(10)))
|
|
|
|
+ .await;
|
|
|
|
+
|
|
|
|
+ ws.send(Message::text(serde_json::to_string(&edits).unwrap()))
|
|
|
|
+ .await
|
|
|
|
+ .unwrap();
|
|
|
|
+ }
|
|
|
|
+ Ok(()) as Result<(), LiveViewError>
|
|
}
|
|
}
|