warp_adapter.rs 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. use crate::{liveview_eventloop, LiveView, LiveViewError};
  2. use dioxus_core::prelude::*;
  3. use futures_util::{SinkExt, StreamExt};
  4. use warp::ws::{Message, WebSocket};
  5. impl LiveView {
  6. pub async fn upgrade_warp(
  7. self,
  8. ws: WebSocket,
  9. app: fn(Scope<()>) -> Element,
  10. ) -> Result<(), LiveViewError> {
  11. self.upgrade_warp_with_props(ws, app, ()).await
  12. }
  13. pub async fn upgrade_warp_with_props<T: Send + 'static>(
  14. self,
  15. ws: WebSocket,
  16. app: fn(Scope<T>) -> Element,
  17. props: T,
  18. ) -> Result<(), LiveViewError> {
  19. let (ws_tx, ws_rx) = ws.split();
  20. let ws_tx = ws_tx
  21. .with(transform_warp)
  22. .sink_map_err(|_| LiveViewError::SendingFailed);
  23. let ws_rx = ws_rx.map(transform_warp_rx);
  24. match self
  25. .pool
  26. .spawn_pinned(move || liveview_eventloop(app, props, ws_tx, ws_rx))
  27. .await
  28. {
  29. Ok(Ok(_)) => Ok(()),
  30. Ok(Err(e)) => Err(e),
  31. Err(_) => Err(LiveViewError::SendingFailed),
  32. }
  33. }
  34. }
  35. fn transform_warp_rx(f: Result<Message, warp::Error>) -> Result<String, LiveViewError> {
  36. // destructure the message into the buffer we got from warp
  37. let msg = f.map_err(|_| LiveViewError::SendingFailed)?.into_bytes();
  38. // transform it back into a string, saving us the allocation
  39. let msg = String::from_utf8(msg).map_err(|_| LiveViewError::SendingFailed)?;
  40. Ok(msg)
  41. }
  42. async fn transform_warp(message: String) -> Result<Message, warp::Error> {
  43. Ok(Message::text(message))
  44. }