warp_adapter.rs 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. use crate::events;
  2. use dioxus_core::prelude::*;
  3. use futures_util::{pin_mut, SinkExt, StreamExt};
  4. use tokio::sync::mpsc;
  5. use tokio_stream::wrappers::UnboundedReceiverStream;
  6. use tokio_util::task::LocalPoolHandle;
  7. use warp::ws::{Message, WebSocket};
  8. #[cfg(feature = "warp")]
  9. impl crate::Liveview {
  10. pub async fn upgrade(&self, ws: warp::ws::WebSocket, app: fn(Scope) -> Element) {
  11. connect(ws, self.pool.clone(), app).await;
  12. }
  13. }
  14. pub async fn connect(ws: WebSocket, pool: LocalPoolHandle, app: fn(Scope) -> Element) {
  15. // Use a counter to assign a new unique ID for this user.
  16. // Split the socket into a sender and receive of messages.
  17. let (mut user_ws_tx, mut user_ws_rx) = ws.split();
  18. let (event_tx, event_rx) = mpsc::unbounded_channel();
  19. let (edits_tx, edits_rx) = mpsc::unbounded_channel();
  20. let mut edits_rx = UnboundedReceiverStream::new(edits_rx);
  21. let mut event_rx = UnboundedReceiverStream::new(event_rx);
  22. let vdom_fut = pool.spawn_pinned(move || async move {
  23. let mut vdom = VirtualDom::new(app);
  24. let edits = vdom.rebuild();
  25. let serialized = serde_json::to_string(&edits.edits).unwrap();
  26. edits_tx.send(serialized).unwrap();
  27. loop {
  28. use futures_util::future::{select, Either};
  29. let new_event = {
  30. let vdom_fut = vdom.wait_for_work();
  31. pin_mut!(vdom_fut);
  32. match select(event_rx.next(), vdom_fut).await {
  33. Either::Left((l, _)) => l,
  34. Either::Right((_, _)) => None,
  35. }
  36. };
  37. if let Some(new_event) = new_event {
  38. vdom.handle_message(dioxus_core::SchedulerMsg::Event(new_event));
  39. } else {
  40. let mutations = vdom.work_with_deadline(|| false);
  41. for mutation in mutations {
  42. let edits = serde_json::to_string(&mutation.edits).unwrap();
  43. edits_tx.send(edits).unwrap();
  44. }
  45. }
  46. }
  47. });
  48. loop {
  49. use futures_util::future::{select, Either};
  50. match select(user_ws_rx.next(), edits_rx.next()).await {
  51. Either::Left((l, _)) => {
  52. if let Some(Ok(msg)) = l {
  53. if let Ok(Some(msg)) = msg.to_str().map(events::parse_ipc_message) {
  54. if msg.method == "user_event" {
  55. let user_event = events::trigger_from_serialized(msg.params);
  56. event_tx.send(user_event).unwrap();
  57. }
  58. } else {
  59. break;
  60. }
  61. } else {
  62. break;
  63. }
  64. }
  65. Either::Right((edits, _)) => {
  66. if let Some(edits) = edits {
  67. // send the edits to the client
  68. if user_ws_tx.send(Message::text(edits)).await.is_err() {
  69. break;
  70. }
  71. } else {
  72. break;
  73. }
  74. }
  75. }
  76. }
  77. vdom_fut.abort();
  78. }