Răsfoiți Sursa

Merge pull request #1264 from Demonthos/spawn-pinned

Don't spawn a new tokio runtime every fullstack request
Jonathan Kelley 1 an în urmă
părinte
comite
b526fa3ebc
2 a modificat fișierele cu 29 adăugiri și 22 ștergeri
  1. 2 1
      packages/fullstack/Cargo.toml
  2. 27 21
      packages/fullstack/src/adapters/mod.rs

+ 2 - 1
packages/fullstack/Cargo.toml

@@ -41,6 +41,7 @@ log = { workspace = true }
 once_cell = "1.17.1"
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["full"], optional = true }
+tokio-util = { version = "0.7.8", features = ["rt"], optional = true }
 object-pool = "0.5.4"
 anymap = "0.12.1"
 
@@ -68,7 +69,7 @@ hot-reload = ["serde_json", "futures-util"]
 warp = ["dep:warp", "ssr"]
 axum = ["dep:axum", "tower-http", "ssr"]
 salvo = ["dep:salvo", "ssr"]
-ssr = ["server_fn/ssr", "tokio", "dioxus-ssr", "tower", "hyper", "http", "http-body", "dioxus-router/ssr", "tokio-stream"]
+ssr = ["server_fn/ssr", "tokio", "tokio-util", "dioxus-ssr", "tower", "hyper", "http", "http-body", "dioxus-router/ssr", "tokio-stream"]
 default-tls = ["server_fn/default-tls"]
 rustls = ["server_fn/rustls"]
 

+ 27 - 21
packages/fullstack/src/adapters/mod.rs

@@ -17,12 +17,9 @@ pub mod salvo_adapter;
 #[cfg(feature = "warp")]
 pub mod warp_adapter;
 
-use std::sync::{Arc, RwLock};
-
 use http::StatusCode;
-
 use server_fn::{Encoding, Payload};
-use tokio::task::spawn_blocking;
+use std::sync::{Arc, RwLock};
 
 use crate::{
     layer::{BoxedService, Service},
@@ -93,27 +90,22 @@ impl Service for ServerFnHandler {
 
             // Because the future returned by `server_fn_handler` is `Send`, and the future returned by this function must be send, we need to spawn a new runtime
             let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
-            spawn_blocking({
+            let pool = get_local_pool();
+            pool.spawn_pinned({
                 let function = function.clone();
                 let mut server_context = server_context.clone();
                 server_context.parts = parts;
-                move || {
-                    tokio::runtime::Runtime::new()
-                        .expect("couldn't spawn runtime")
-                        .block_on(async move {
-                            let data = match function.encoding() {
-                                Encoding::Url | Encoding::Cbor => &body,
-                                Encoding::GetJSON | Encoding::GetCBOR => &query,
-                            };
-                            let server_function_future = function.call((), data);
-                            let server_function_future = ProvideServerContext::new(
-                                server_function_future,
-                                server_context.clone(),
-                            );
-                            let resp = server_function_future.await;
+                move || async move {
+                    let data = match function.encoding() {
+                        Encoding::Url | Encoding::Cbor => &body,
+                        Encoding::GetJSON | Encoding::GetCBOR => &query,
+                    };
+                    let server_function_future = function.call((), data);
+                    let server_function_future =
+                        ProvideServerContext::new(server_function_future, server_context.clone());
+                    let resp = server_function_future.await;
 
-                            resp_tx.send(resp).unwrap();
-                        })
+                    resp_tx.send(resp).unwrap();
                 }
             });
             let result = resp_rx.await.unwrap();
@@ -158,3 +150,17 @@ impl Service for ServerFnHandler {
         })
     }
 }
+
+fn get_local_pool() -> tokio_util::task::LocalPoolHandle {
+    use once_cell::sync::OnceCell;
+    static LOCAL_POOL: OnceCell<tokio_util::task::LocalPoolHandle> = OnceCell::new();
+    LOCAL_POOL
+        .get_or_init(|| {
+            tokio_util::task::LocalPoolHandle::new(
+                std::thread::available_parallelism()
+                    .map(Into::into)
+                    .unwrap_or(1),
+            )
+        })
+        .clone()
+}