warp_adapter.rs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. //! Dioxus utilities for the [Warp](https://docs.rs/warp/latest/warp/index.html) server framework.
  2. //!
  3. //! # Example
  4. //! ```rust
  5. //! #![allow(non_snake_case)]
  6. //! use dioxus::prelude::*;
  7. //! use dioxus_fullstack::prelude::*;
  8. //!
  9. //! fn main() {
  10. //! #[cfg(feature = "web")]
  11. //! dioxus_web::launch_cfg(app, dioxus_web::Config::new().hydrate(true));
  12. //! #[cfg(feature = "ssr")]
  13. //! {
  14. //! GetServerData::register().unwrap();
  15. //! tokio::runtime::Runtime::new()
  16. //! .unwrap()
  17. //! .block_on(async move {
  18. //! let routes = serve_dioxus_application("", ServeConfigBuilder::new(app, ()));
  19. //! warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
  20. //! });
  21. //! }
  22. //! }
  23. //!
  24. //! fn app(cx: Scope) -> Element {
  25. //! let text = use_state(cx, || "...".to_string());
  26. //!
  27. //! cx.render(rsx! {
  28. //! button {
  29. //! onclick: move |_| {
  30. //! to_owned![text];
  31. //! async move {
  32. //! if let Ok(data) = get_server_data().await {
  33. //! text.set(data);
  34. //! }
  35. //! }
  36. //! },
  37. //! "Run a server function"
  38. //! }
  39. //! "Server said: {text}"
  40. //! })
  41. //! }
  42. //!
  43. //! #[server(GetServerData)]
  44. //! async fn get_server_data() -> Result<String, ServerFnError> {
  45. //! Ok("Hello from the server!".to_string())
  46. //! }
  47. //!
  48. //! ```
  49. use crate::{
  50. prelude::*, render::SSRState, serve_config::ServeConfig, server_fn::DioxusServerFnRegistry,
  51. };
  52. use dioxus_core::VirtualDom;
  53. use server_fn::{Encoding, Payload, ServerFunctionRegistry};
  54. use std::error::Error;
  55. use std::sync::Arc;
  56. use tokio::task::spawn_blocking;
  57. use warp::path::FullPath;
  58. use warp::{
  59. filters::BoxedFilter,
  60. http::{Response, StatusCode},
  61. hyper::body::Bytes,
  62. path, Filter, Reply,
  63. };
  64. /// Registers server functions with a custom handler function. This allows you to pass custom context to your server functions by generating a [`DioxusServerContext`] from the request.
  65. ///
  66. /// # Example
  67. /// ```rust
  68. /// use warp::{body, header, hyper::HeaderMap, path, post, Filter};
  69. ///
  70. /// #[tokio::main]
  71. /// async fn main() {
  72. /// let routes = register_server_fns_with_handler("", |full_route, func| {
  73. /// path(full_route)
  74. /// .and(post())
  75. /// .and(header::headers_cloned())
  76. /// .and(body::bytes())
  77. /// .and_then(move |headers: HeaderMap, body| {
  78. /// let func = func.clone();
  79. /// async move {
  80. /// // Add the headers to the server function context
  81. /// server_fn_handler((headers.clone(),), func, headers, body).await
  82. /// }
  83. /// })
  84. /// });
  85. /// warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
  86. /// }
  87. /// ```
  88. pub fn register_server_fns_with_handler<H, F, R>(
  89. server_fn_route: &'static str,
  90. mut handler: H,
  91. ) -> BoxedFilter<(R,)>
  92. where
  93. H: FnMut(String, ServerFunction) -> F,
  94. F: Filter<Extract = (R,), Error = warp::Rejection> + Send + Sync + 'static,
  95. F::Extract: Send,
  96. R: Reply + 'static,
  97. {
  98. let mut filter: Option<BoxedFilter<F::Extract>> = None;
  99. for server_fn_path in DioxusServerFnRegistry::paths_registered() {
  100. let func = DioxusServerFnRegistry::get(server_fn_path).unwrap();
  101. let full_route = format!("{server_fn_route}/{server_fn_path}")
  102. .trim_start_matches('/')
  103. .to_string();
  104. let route = handler(full_route, func.clone()).boxed();
  105. if let Some(boxed_filter) = filter.take() {
  106. filter = Some(boxed_filter.or(route).unify().boxed());
  107. } else {
  108. filter = Some(route);
  109. }
  110. }
  111. filter.expect("No server functions found")
  112. }
  113. /// Registers server functions with the default handler. This handler function will pass an empty [`DioxusServerContext`] to your server functions.
  114. ///
  115. /// # Example
  116. /// ```rust
  117. /// use dioxus_fullstack::prelude::*;
  118. ///
  119. /// #[tokio::main]
  120. /// async fn main() {
  121. /// let routes = register_server_fns("");
  122. /// warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
  123. /// }
  124. /// ```
  125. pub fn register_server_fns(server_fn_route: &'static str) -> BoxedFilter<(impl Reply,)> {
  126. register_server_fns_with_handler(server_fn_route, |full_route, func| {
  127. path(full_route.clone())
  128. .and(warp::post().or(warp::get()).unify())
  129. .and(request_parts())
  130. .and(warp::body::bytes())
  131. .and_then(move |parts, bytes| {
  132. let func = func.clone();
  133. async move {
  134. server_fn_handler(DioxusServerContext::default(), func, parts, bytes).await
  135. }
  136. })
  137. })
  138. }
  139. /// Serves the Dioxus application. This will serve a complete server side rendered application.
  140. /// This will serve static assets, server render the application, register server functions, and intigrate with hot reloading.
  141. ///
  142. /// # Example
  143. /// ```rust
  144. /// #![allow(non_snake_case)]
  145. /// use dioxus::prelude::*;
  146. /// use dioxus_fullstack::prelude::*;
  147. ///
  148. /// #[tokio::main]
  149. /// async fn main() {
  150. /// let routes = serve_dioxus_application("", ServeConfigBuilder::new(app, ()));
  151. /// warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
  152. /// }
  153. ///
  154. /// fn app(cx: Scope) -> Element {
  155. /// todo!()
  156. /// }
  157. /// ```
  158. pub fn serve_dioxus_application<P: Clone + serde::Serialize + Send + Sync + 'static>(
  159. server_fn_route: &'static str,
  160. cfg: impl Into<ServeConfig<P>>,
  161. ) -> BoxedFilter<(impl Reply,)> {
  162. let cfg = cfg.into();
  163. // Serve the dist folder and the index.html file
  164. let serve_dir = warp::fs::dir(cfg.assets_path);
  165. connect_hot_reload()
  166. .or(register_server_fns(server_fn_route))
  167. .or(warp::path::end().and(render_ssr(cfg)))
  168. .or(serve_dir)
  169. .boxed()
  170. }
  171. /// Server render the application.
  172. pub fn render_ssr<P: Clone + serde::Serialize + Send + Sync + 'static>(
  173. cfg: ServeConfig<P>,
  174. ) -> impl Filter<Extract = (impl Reply,), Error = warp::Rejection> + Clone {
  175. warp::get()
  176. .and(request_parts())
  177. .and(with_ssr_state())
  178. .map(move |parts, renderer: SSRState| {
  179. let parts = Arc::new(parts);
  180. let server_context = DioxusServerContext::new(parts);
  181. let mut vdom = VirtualDom::new_with_props(cfg.app, cfg.props.clone())
  182. .with_root_context(server_context.clone());
  183. let _ = vdom.rebuild();
  184. let html = renderer.render_vdom(&vdom, &cfg);
  185. let mut res = Response::builder();
  186. *res.headers_mut().expect("empty request should be valid") =
  187. server_context.take_response_headers();
  188. res.header("Content-Type", "text/html")
  189. .body(Bytes::from(html))
  190. .unwrap()
  191. })
  192. }
  193. /// An extractor for the request parts (used in [DioxusServerContext]). This will extract the method, uri, query, and headers from the request.
  194. pub fn request_parts(
  195. ) -> impl Filter<Extract = (RequestParts,), Error = warp::reject::Rejection> + Clone {
  196. warp::method()
  197. .and(warp::filters::path::full())
  198. .and(
  199. warp::filters::query::raw()
  200. .or(warp::any().map(String::new))
  201. .unify(),
  202. )
  203. .and(warp::header::headers_cloned())
  204. .and_then(move |method, path: FullPath, query, headers| async move {
  205. http::uri::Builder::new()
  206. .path_and_query(format!("{}?{}", path.as_str(), query))
  207. .build()
  208. .map_err(|err| {
  209. warp::reject::custom(FailedToReadBody(format!("Failed to build uri: {}", err)))
  210. })
  211. .map(|uri| RequestParts {
  212. method,
  213. uri,
  214. headers,
  215. ..Default::default()
  216. })
  217. })
  218. }
  219. fn with_ssr_state() -> impl Filter<Extract = (SSRState,), Error = std::convert::Infallible> + Clone
  220. {
  221. let state = SSRState::default();
  222. warp::any().map(move || state.clone())
  223. }
/// Custom warp rejection carrying a human-readable error message.
///
/// Despite its name, the only use visible in this file is in `request_parts`, where it
/// reports that the request URI could not be rebuilt.
#[derive(Debug)]
struct FailedToReadBody(String);
// Marker impl so the type can be passed to `warp::reject::custom`.
impl warp::reject::Reject for FailedToReadBody {}
/// Custom warp rejection used by `server_fn_handler` when awaiting the reply of the server
/// function task fails; the wrapped string is the formatted error message.
#[derive(Debug)]
struct RecieveFailed(String);
// Marker impl so the type can be passed to `warp::reject::custom`.
impl warp::reject::Reject for RecieveFailed {}
  230. /// A default handler for server functions. It will deserialize the request body, call the server function, and serialize the response.
  231. pub async fn server_fn_handler(
  232. server_context: impl Into<DioxusServerContext>,
  233. function: ServerFunction,
  234. parts: RequestParts,
  235. body: Bytes,
  236. ) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
  237. let mut server_context = server_context.into();
  238. let parts = Arc::new(parts);
  239. server_context.parts = parts.clone();
  240. // Because the future returned by `server_fn_handler` is `Send`, and the future returned by this function must be send, we need to spawn a new runtime
  241. let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
  242. spawn_blocking({
  243. move || {
  244. tokio::runtime::Runtime::new()
  245. .expect("couldn't spawn runtime")
  246. .block_on(async move {
  247. let query = parts
  248. .uri
  249. .query()
  250. .unwrap_or_default()
  251. .as_bytes()
  252. .to_vec()
  253. .into();
  254. let data = match &function.encoding {
  255. Encoding::Url | Encoding::Cbor => &body,
  256. Encoding::GetJSON | Encoding::GetCBOR => &query,
  257. };
  258. let resp = match (function.trait_obj)(server_context.clone(), data).await {
  259. Ok(serialized) => {
  260. // if this is Accept: application/json then send a serialized JSON response
  261. let accept_header = parts
  262. .headers
  263. .get("Accept")
  264. .as_ref()
  265. .and_then(|value| value.to_str().ok());
  266. let mut res = Response::builder();
  267. *res.headers_mut().expect("empty request should be valid") =
  268. server_context.take_response_headers();
  269. if accept_header == Some("application/json")
  270. || accept_header
  271. == Some(
  272. "application/\
  273. x-www-form-urlencoded",
  274. )
  275. || accept_header == Some("application/cbor")
  276. {
  277. res = res.status(StatusCode::OK);
  278. }
  279. let resp = match serialized {
  280. Payload::Binary(data) => res
  281. .header("Content-Type", "application/cbor")
  282. .body(Bytes::from(data)),
  283. Payload::Url(data) => res
  284. .header(
  285. "Content-Type",
  286. "application/\
  287. x-www-form-urlencoded",
  288. )
  289. .body(Bytes::from(data)),
  290. Payload::Json(data) => res
  291. .header("Content-Type", "application/json")
  292. .body(Bytes::from(data)),
  293. };
  294. Box::new(resp.unwrap())
  295. }
  296. Err(e) => report_err(e),
  297. };
  298. if resp_tx.send(resp).is_err() {
  299. eprintln!("Error sending response");
  300. }
  301. })
  302. }
  303. });
  304. resp_rx.await.map_err(|err| {
  305. warp::reject::custom(RecieveFailed(format!("Failed to recieve response {err}")))
  306. })
  307. }
  308. fn report_err<E: Error>(e: E) -> Box<dyn warp::Reply> {
  309. Box::new(
  310. Response::builder()
  311. .status(StatusCode::INTERNAL_SERVER_ERROR)
  312. .body(format!("Error: {}", e))
  313. .unwrap(),
  314. ) as Box<dyn warp::Reply>
  315. }
  316. /// Register the web RSX hot reloading endpoint. This will enable hot reloading for your application in debug mode when you call [`dioxus_hot_reload::hot_reload_init`].
  317. ///
  318. /// # Example
  319. /// ```rust
  320. /// #![allow(non_snake_case)]
  321. /// use dioxus_fullstack::prelude::*;
  322. ///
  323. /// #[tokio::main]
  324. /// async fn main() {
  325. /// let routes = connect_hot_reload();
  326. /// warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
  327. /// }
  328. /// ```
  329. pub fn connect_hot_reload() -> impl Filter<Extract = (impl Reply,), Error = warp::Rejection> + Clone
  330. {
  331. #[cfg(not(all(debug_assertions, feature = "hot-reload", feature = "ssr")))]
  332. {
  333. warp::path!("_dioxus" / "hot_reload")
  334. .and(warp::ws())
  335. .map(warp::reply)
  336. .map(|reply| warp::reply::with_status(reply, warp::http::StatusCode::NOT_FOUND));
  337. }
  338. #[cfg(all(debug_assertions, feature = "hot-reload", feature = "ssr"))]
  339. {
  340. use crate::hot_reload::HotReloadState;
  341. use futures_util::sink::SinkExt;
  342. use futures_util::StreamExt;
  343. use warp::ws::Message;
  344. let hot_reload = warp::path!("_dioxus" / "hot_reload")
  345. .and(warp::any().then(|| crate::hot_reload::spawn_hot_reload()))
  346. .and(warp::ws())
  347. .map(move |state: &'static HotReloadState, ws: warp::ws::Ws| {
  348. #[cfg(all(debug_assertions, feature = "hot-reload", feature = "ssr"))]
  349. ws.on_upgrade(move |mut websocket| {
  350. async move {
  351. println!("🔥 Hot Reload WebSocket connected");
  352. {
  353. // update any rsx calls that changed before the websocket connected.
  354. {
  355. println!("🔮 Finding updates since last compile...");
  356. let templates_read = state.templates.read().await;
  357. for template in &*templates_read {
  358. if websocket
  359. .send(Message::text(
  360. serde_json::to_string(&template).unwrap(),
  361. ))
  362. .await
  363. .is_err()
  364. {
  365. return;
  366. }
  367. }
  368. }
  369. println!("finished");
  370. }
  371. let mut rx = tokio_stream::wrappers::WatchStream::from_changes(
  372. state.message_receiver.clone(),
  373. );
  374. while let Some(change) = rx.next().await {
  375. if let Some(template) = change {
  376. let template = { serde_json::to_string(&template).unwrap() };
  377. if websocket.send(Message::text(template)).await.is_err() {
  378. break;
  379. };
  380. }
  381. }
  382. }
  383. })
  384. });
  385. let disconnect =
  386. warp::path!("_dioxus" / "disconnect")
  387. .and(warp::ws())
  388. .map(move |ws: warp::ws::Ws| {
  389. println!("disconnect");
  390. #[cfg(all(debug_assertions, feature = "hot-reload", feature = "ssr"))]
  391. ws.on_upgrade(move |mut websocket| async move {
  392. struct DisconnectOnDrop(Option<warp::ws::WebSocket>);
  393. impl Drop for DisconnectOnDrop {
  394. fn drop(&mut self) {
  395. let _ = self.0.take().unwrap().close();
  396. }
  397. }
  398. let _ = websocket.send(Message::text("connected")).await;
  399. let mut ws = DisconnectOnDrop(Some(websocket));
  400. loop {
  401. if ws.0.as_mut().unwrap().next().await.is_none() {
  402. break;
  403. }
  404. }
  405. })
  406. });
  407. disconnect.or(hot_reload)
  408. }
  409. }