axum_adapter.rs 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. use crate::events;
  2. use axum::extract::ws::{Message, WebSocket};
  3. use dioxus_core::prelude::*;
  4. use futures_util::{
  5. future::{select, Either},
  6. pin_mut, SinkExt, StreamExt,
  7. };
  8. use tokio::sync::mpsc;
  9. use tokio_stream::wrappers::UnboundedReceiverStream;
  10. use tokio_util::task::LocalPoolHandle;
  11. impl crate::Liveview {
  12. pub async fn upgrade_axum(&self, ws: WebSocket, app: fn(Scope) -> Element) {
  13. connect(ws, self.pool.clone(), app, ()).await;
  14. }
  15. pub async fn upgrade_axum_with_props<T>(
  16. &self,
  17. ws: WebSocket,
  18. app: fn(Scope<T>) -> Element,
  19. props: T,
  20. ) where
  21. T: Send + Sync + 'static,
  22. {
  23. connect(ws, self.pool.clone(), app, props).await;
  24. }
  25. }
  26. pub async fn connect<T>(
  27. socket: WebSocket,
  28. pool: LocalPoolHandle,
  29. app: fn(Scope<T>) -> Element,
  30. props: T,
  31. ) where
  32. T: Send + Sync + 'static,
  33. {
  34. let (mut user_ws_tx, mut user_ws_rx) = socket.split();
  35. let (event_tx, event_rx) = mpsc::unbounded_channel();
  36. let (edits_tx, edits_rx) = mpsc::unbounded_channel();
  37. let mut edits_rx = UnboundedReceiverStream::new(edits_rx);
  38. let mut event_rx = UnboundedReceiverStream::new(event_rx);
  39. let vdom_fut = pool.clone().spawn_pinned(move || async move {
  40. let mut vdom = VirtualDom::new_with_props(app, props);
  41. let edits = vdom.rebuild();
  42. let serialized = serde_json::to_string(&edits.edits).unwrap();
  43. edits_tx.send(serialized).unwrap();
  44. loop {
  45. let new_event = {
  46. let vdom_fut = vdom.wait_for_work();
  47. pin_mut!(vdom_fut);
  48. match select(event_rx.next(), vdom_fut).await {
  49. Either::Left((l, _)) => l,
  50. Either::Right((_, _)) => None,
  51. }
  52. };
  53. if let Some(new_event) = new_event {
  54. vdom.handle_message(dioxus_core::SchedulerMsg::Event(new_event));
  55. } else {
  56. let mutations = vdom.work_with_deadline(|| false);
  57. for mutation in mutations {
  58. let edits = serde_json::to_string(&mutation.edits).unwrap();
  59. edits_tx.send(edits).unwrap();
  60. }
  61. }
  62. }
  63. });
  64. loop {
  65. match select(user_ws_rx.next(), edits_rx.next()).await {
  66. Either::Left((l, _)) => {
  67. if let Some(Ok(msg)) = l {
  68. if let Ok(Some(msg)) = msg.to_text().map(events::parse_ipc_message) {
  69. let user_event = events::trigger_from_serialized(msg.params);
  70. event_tx.send(user_event).unwrap();
  71. } else {
  72. break;
  73. }
  74. } else {
  75. break;
  76. }
  77. }
  78. Either::Right((edits, _)) => {
  79. if let Some(edits) = edits {
  80. // send the edits to the client
  81. if user_ws_tx.send(Message::Text(edits)).await.is_err() {
  82. break;
  83. }
  84. } else {
  85. break;
  86. }
  87. }
  88. }
  89. }
  90. vdom_fut.abort();
  91. }