瀏覽代碼

Fix warp fullstack adapter

Evan Almloff 1 年之前
父節點
當前提交
831bcfd8e7
共有 2 個文件被更改,包括 41 次插入125 次删除
  1. 0 2
      packages/fullstack/examples/warp-hello-world/src/main.rs
  2. 41 123
      packages/fullstack/src/adapters/warp_adapter.rs

+ 0 - 2
packages/fullstack/examples/warp-hello-world/src/main.rs

@@ -50,8 +50,6 @@ fn app(cx: Scope<AppProps>) -> Element {
 async fn post_server_data(data: String) -> Result<(), ServerFnError> {
     // The server context contains information about the current request and allows you to modify the response.
     let cx = server_context();
-    cx.response_headers_mut()
-        .insert("Set-Cookie", "foo=bar".parse().unwrap());
     println!("Server received: {}", data);
     println!("Request parts are {:?}", cx.request_parts());
 

+ 41 - 123
packages/fullstack/src/adapters/warp_adapter.rs

@@ -46,15 +46,19 @@
 //!
 //! ```
 
+use crate::layer::Service;
 use crate::{
     prelude::*, render::SSRState, serve_config::ServeConfig, server_fn::DioxusServerFnRegistry,
 };
 
+use crate::server_fn_service;
 use server_fn::{Encoding, Payload, ServerFunctionRegistry};
 use std::error::Error;
 use std::sync::Arc;
+use std::sync::RwLock;
 use tokio::task::spawn_blocking;
 use warp::path::FullPath;
+use warp::Rejection;
 use warp::{
     filters::BoxedFilter,
     http::{Response, StatusCode},
@@ -70,19 +74,22 @@ use warp::{
 ///
 /// #[tokio::main]
 /// async fn main() {
-///     let routes = register_server_fns_with_handler("", |full_route, func| {
+///     let routes = register_server_fns_with_handler(server_fn_route, |full_route, func| {
 ///         path(full_route)
-///             .and(post())
-///             .and(header::headers_cloned())
-///             .and(body::bytes())
-///             .and_then(move |headers: HeaderMap, body| {
-///                 let func = func.clone();
-///                 async move {
-///                     // Add the headers to the server function context
-///                     server_fn_handler((headers.clone(),), func, headers, body).await
-///                 }
-///             })
-///     });
+///         .and(warp::post().or(warp::get()).unify())
+///         .and(request_parts())
+///         .and(warp::body::bytes())
+///         .and_then(move |parts, bytes: bytes::Bytes| {
+///             let mut service = server_fn_service(DioxusServerContext::default(), func.clone());
+///             async move {
+///                 let req = warp::hyper::Request::from_parts(parts, bytes.into());
+///                 service.run(req).await.map_err(|err| {
+///                     log::error!("Server function error: {}", err);
+///                     warp::reject::reject()
+///                 })
+///             }
+///         })
+/// })
 ///     warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
 /// }
 /// ```
@@ -130,10 +137,14 @@ pub fn register_server_fns(server_fn_route: &'static str) -> BoxedFilter<(impl R
             .and(warp::post().or(warp::get()).unify())
             .and(request_parts())
             .and(warp::body::bytes())
-            .and_then(move |parts, bytes| {
-                let func = func.clone();
+            .and_then(move |parts, bytes: bytes::Bytes| {
+                let mut service = server_fn_service(DioxusServerContext::default(), func.clone());
                 async move {
-                    server_fn_handler(DioxusServerContext::default(), func, parts, bytes).await
+                    let req = warp::hyper::Request::from_parts(parts, bytes.into());
+                    service.run(req).await.map_err(|err| {
+                        log::error!("Server function error: {}", err);
+                        warp::reject::reject()
+                    })
                 }
             })
     })
@@ -185,9 +196,9 @@ pub fn render_ssr<P: Clone + serde::Serialize + Send + Sync + 'static>(
     warp::get()
         .and(request_parts())
         .and(with_ssr_state(&cfg))
-        .then(move |parts: RequestParts, renderer: SSRState| {
+        .then(move |parts: http::request::Parts, renderer: SSRState| {
             let route = parts.uri.path().to_string();
-            let parts = Arc::new(parts);
+            let parts = Arc::new(RwLock::new(parts));
             let cfg = cfg.clone();
             async move {
                 let server_context = DioxusServerContext::new(parts);
@@ -202,7 +213,10 @@ pub fn render_ssr<P: Clone + serde::Serialize + Send + Sync + 'static>(
                             .unwrap();
 
                         let headers_mut = res.headers_mut();
-                        *headers_mut = server_context.take_response_headers();
+                        let headers = server_context.response_parts().unwrap().headers.clone();
+                        for (key, value) in headers.iter() {
+                            headers_mut.insert(key, value.clone());
+                        }
                         freshness.write(headers_mut);
 
                         res
@@ -221,7 +235,7 @@ pub fn render_ssr<P: Clone + serde::Serialize + Send + Sync + 'static>(
 
 /// An extractor for the request parts (used in [DioxusServerContext]). This will extract the method, uri, query, and headers from the request.
 pub fn request_parts(
-) -> impl Filter<Extract = (RequestParts,), Error = warp::reject::Rejection> + Clone {
+) -> impl Filter<Extract = (http::request::Parts,), Error = warp::reject::Rejection> + Clone {
     warp::method()
         .and(warp::filters::path::full())
         .and(
@@ -237,11 +251,14 @@ pub fn request_parts(
                 .map_err(|err| {
                     warp::reject::custom(FailedToReadBody(format!("Failed to build uri: {}", err)))
                 })
-                .map(|uri| RequestParts {
-                    method,
-                    uri,
-                    headers,
-                    ..Default::default()
+                .map(|uri| {
+                    let mut req = http::Request::builder()
+                        .method(method)
+                        .uri(uri)
+                        .body(())
+                        .unwrap();
+                    req.headers_mut().extend(headers);
+                    req.into_parts().0
                 })
         })
 }
@@ -263,105 +280,6 @@ struct RecieveFailed(String);
 
 impl warp::reject::Reject for RecieveFailed {}
 
-/// A default handler for server functions. It will deserialize the request body, call the server function, and serialize the response.
-pub async fn server_fn_handler(
-    server_context: impl Into<(DioxusServerContext)>,
-    function: server_fn::ServerFnTraitObj<()>,
-    parts: RequestParts,
-    body: Bytes,
-) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
-    let mut server_context = server_context.into();
-
-    let parts = Arc::new(parts);
-
-    server_context.parts = parts.clone();
-
-    // Because the future returned by `server_fn_handler` is `Send`, and the future returned by this function must be send, we need to spawn a new runtime
-    let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
-    spawn_blocking({
-        move || {
-            tokio::runtime::Runtime::new()
-                .expect("couldn't spawn runtime")
-                .block_on(async move {
-                    let query = parts
-                        .uri
-                        .query()
-                        .unwrap_or_default()
-                        .as_bytes()
-                        .to_vec()
-                        .into();
-                    let data = match function.encoding() {
-                        Encoding::Url | Encoding::Cbor => &body,
-                        Encoding::GetJSON | Encoding::GetCBOR => &query,
-                    };
-                    let server_function_future = function.call((), data);
-                    let server_function_future =
-                        ProvideServerContext::new(server_function_future, server_context.clone());
-                    let resp = match server_function_future.await {
-                        Ok(serialized) => {
-                            // if this is Accept: application/json then send a serialized JSON response
-                            let accept_header = parts
-                                .headers
-                                .get("Accept")
-                                .as_ref()
-                                .and_then(|value| value.to_str().ok());
-                            let mut res = Response::builder();
-
-                            *res.headers_mut().expect("empty request should be valid") =
-                                server_context.take_response_headers();
-
-                            if accept_header == Some("application/json")
-                                || accept_header
-                                    == Some(
-                                        "application/\
-                                            x-www-form-urlencoded",
-                                    )
-                                || accept_header == Some("application/cbor")
-                            {
-                                res = res.status(StatusCode::OK);
-                            }
-
-                            let resp = match serialized {
-                                Payload::Binary(data) => res
-                                    .header("Content-Type", "application/cbor")
-                                    .body(Bytes::from(data)),
-                                Payload::Url(data) => res
-                                    .header(
-                                        "Content-Type",
-                                        "application/\
-                                        x-www-form-urlencoded",
-                                    )
-                                    .body(Bytes::from(data)),
-                                Payload::Json(data) => res
-                                    .header("Content-Type", "application/json")
-                                    .body(Bytes::from(data)),
-                            };
-
-                            Box::new(resp.unwrap())
-                        }
-                        Err(e) => report_err(e),
-                    };
-
-                    if resp_tx.send(resp).is_err() {
-                        eprintln!("Error sending response");
-                    }
-                })
-        }
-    });
-    resp_rx.await.map_err(|err| {
-        warp::reject::custom(RecieveFailed(format!("Failed to recieve response {err}")))
-    })
-}
-
-fn report_err<E: Error>(e: E) -> Box<dyn warp::Reply> {
-    Box::new(
-        Response::builder()
-            .status(StatusCode::INTERNAL_SERVER_ERROR)
-            .body(format!("Error: {}", e))
-            .unwrap(),
-    ) as Box<dyn warp::Reply>
-}
-
 /// Register the web RSX hot reloading endpoint. This will enable hot reloading for your application in debug mode when you call [`dioxus_hot_reload::hot_reload_init`].
 ///
 /// # Example