warp_adapter.rs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. use crate::events;
  2. use dioxus_core::prelude::*;
  3. use futures_util::{pin_mut, SinkExt, StreamExt};
  4. use tokio::sync::mpsc;
  5. use tokio_stream::wrappers::UnboundedReceiverStream;
  6. use tokio_util::task::LocalPoolHandle;
  7. use warp::ws::{Message, WebSocket};
  #[cfg(feature = "warp")]
  impl crate::Liveview {
      /// Runs a liveview session for `app` over the upgraded websocket `ws`.
      ///
      /// The app is started with unit props (`()`). The returned future completes
      /// when [`connect`] returns.
      pub async fn upgrade(&self, ws: warp::ws::WebSocket, app: fn(Scope) -> Element) {
          connect(ws, self.pool.clone(), app, ()).await;
      }

      /// Runs a liveview session for `app` over `ws`, passing `props` through to
      /// [`connect`].
      ///
      /// The returned future completes when [`connect`] returns.
      pub async fn upgrade_with_props<T>(
          &self,
          ws: warp::ws::WebSocket,
          app: fn(Scope) -> Element,
          props: T,
      ) where
          T: Send + Sync + 'static,
      {
          // The session future must be awaited: futures are lazy, so without
          // `.await` the connection would never be serviced.
          connect(ws, self.pool.clone(), app, props).await;
      }
  }
  20. pub async fn connect<T>(ws: WebSocket, pool: LocalPoolHandle, app: fn(Scope) -> Element, props: T)
  21. where
  22. T: Send + Sync+ 'static
  23. {
  24. // Use a counter to assign a new unique ID for this user.
  25. // Split the socket into a sender and receive of messages.
  26. let (mut user_ws_tx, mut user_ws_rx) = ws.split();
  27. let (event_tx, event_rx) = mpsc::unbounded_channel();
  28. let (edits_tx, edits_rx) = mpsc::unbounded_channel();
  29. let mut edits_rx = UnboundedReceiverStream::new(edits_rx);
  30. let mut event_rx = UnboundedReceiverStream::new(event_rx);
  31. let vdom_fut = pool.spawn_pinned(move || async move {
  32. let mut vdom = VirtualDom::new_with_props(app, props);
  33. let edits = vdom.rebuild();
  34. let serialized = serde_json::to_string(&edits.edits).unwrap();
  35. edits_tx.send(serialized).unwrap();
  36. loop {
  37. use futures_util::future::{select, Either};
  38. let new_event = {
  39. let vdom_fut = vdom.wait_for_work();
  40. pin_mut!(vdom_fut);
  41. match select(event_rx.next(), vdom_fut).await {
  42. Either::Left((l, _)) => l,
  43. Either::Right((_, _)) => None,
  44. }
  45. };
  46. if let Some(new_event) = new_event {
  47. vdom.handle_message(dioxus_core::SchedulerMsg::Event(new_event));
  48. } else {
  49. let mutations = vdom.work_with_deadline(|| false);
  50. for mutation in mutations {
  51. let edits = serde_json::to_string(&mutation.edits).unwrap();
  52. edits_tx.send(edits).unwrap();
  53. }
  54. }
  55. }
  56. });
  57. loop {
  58. use futures_util::future::{select, Either};
  59. match select(user_ws_rx.next(), edits_rx.next()).await {
  60. Either::Left((l, _)) => {
  61. if let Some(Ok(msg)) = l {
  62. if let Ok(Some(msg)) = msg.to_str().map(events::parse_ipc_message) {
  63. if msg.method == "user_event" {
  64. let user_event = events::trigger_from_serialized(msg.params);
  65. event_tx.send(user_event).unwrap();
  66. }
  67. } else {
  68. break;
  69. }
  70. } else {
  71. break;
  72. }
  73. }
  74. Either::Right((edits, _)) => {
  75. if let Some(edits) = edits {
  76. // send the edits to the client
  77. if user_ws_tx.send(Message::text(edits)).await.is_err() {
  78. break;
  79. }
  80. } else {
  81. break;
  82. }
  83. }
  84. }
  85. }
  86. vdom_fut.abort();
  87. }