1
0
Эх сурвалжийг харах

make the video streaming async

Evan Almloff 1 жил өмнө
parent
commit
dfb118e06c

+ 1 - 0
.gitignore

@@ -4,6 +4,7 @@
 /dist
 Cargo.lock
 .DS_Store
+/examples/assets/test_video.mp4
 
 .vscode/*
 !.vscode/settings.json

+ 57 - 51
examples/video_stream.rs

@@ -4,56 +4,64 @@ use dioxus_desktop::wry::http::Response;
 use dioxus_desktop::{use_asset_handler, AssetRequest};
 use http::{header::*, response::Builder as ResponseBuilder, status::StatusCode};
 use std::borrow::Cow;
-use std::sync::{Arc, Mutex};
-use std::{
-    io::{Read, Seek, SeekFrom, Write},
-    path::PathBuf,
-};
+use std::{io::SeekFrom, path::PathBuf};
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncSeekExt;
+use tokio::io::AsyncWriteExt;
+
+const VIDEO_PATH: &str = "./examples/assets/test_video.mp4";
 
 fn main() {
+    let video_file = PathBuf::from(VIDEO_PATH);
+    if !video_file.exists() {
+        tokio::runtime::Runtime::new()
+            .unwrap()
+            .block_on(async move {
+                println!("Downloading video file...");
+                let video_url =
+                    "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4";
+                let mut response = reqwest::get(video_url).await.unwrap();
+                let mut file = tokio::fs::File::create(&video_file).await.unwrap();
+                while let Some(chunk) = response.chunk().await.unwrap() {
+                    file.write_all(&chunk).await.unwrap();
+                }
+            });
+    }
     dioxus_desktop::launch(app);
 }
 
 fn app(cx: Scope) -> Element {
-    let id = Arc::new(Mutex::new(0));
     use_asset_handler(cx, move |request: &AssetRequest| {
-        let response: Option<Response<Cow<'static, [u8]>>> = match get_stream_response(request, &id)
-        {
-            Ok(response) => Some(response.map(Cow::Owned)),
-            Err(err) => {
-                eprintln!("Error: {}", err);
-                None
-            }
-        };
-        async move { response }
+        let request = request.clone();
+        async move {
+            let video_file = PathBuf::from(VIDEO_PATH);
+            let mut file = tokio::fs::File::open(&video_file).await.unwrap();
+            let response: Option<Response<Cow<'static, [u8]>>> =
+                match get_stream_response(&mut file, &request).await {
+                    Ok(response) => Some(response.map(Cow::Owned)),
+                    Err(err) => {
+                        eprintln!("Error: {}", err);
+                        None
+                    }
+                };
+            response
+        }
     });
 
-    cx.render(rsx! {
-        div {
-            video {
-                src: "test_video.mp4",
-                autoplay: true,
-                controls: true,
-                width: 640,
-                height: 480,
-            }
-        }
-    })
+    render! {
+        div { video { src: "test_video.mp4", autoplay: true, controls: true, width: 640, height: 480 } }
+    }
 }
 
-fn get_stream_response(
+async fn get_stream_response(
+    asset: &mut (impl tokio::io::AsyncSeek + tokio::io::AsyncRead + Unpin + Send + Sync),
     request: &AssetRequest,
-    boundary_id: &Arc<Mutex<i32>>,
 ) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error>> {
-    let video_file = PathBuf::from("./examples/assets/test_video.mp4");
-
-    let mut file = std::fs::File::open(&video_file)?;
-
-    // get file length
+    // get stream length
     let len = {
-        let old_pos = file.stream_position()?;
-        let len = file.seek(SeekFrom::End(0))?;
-        file.seek(SeekFrom::Start(old_pos))?;
+        let old_pos = asset.stream_position().await?;
+        let len = asset.seek(SeekFrom::End(0)).await?;
+        asset.seek(SeekFrom::Start(old_pos)).await?;
         len
     };
 
@@ -103,9 +111,9 @@ fn get_stream_response(
             // allocate a buf with a suitable capacity
             let mut buf = Vec::with_capacity(bytes_to_read as usize);
             // seek the file to the starting byte
-            file.seek(SeekFrom::Start(start))?;
+            asset.seek(SeekFrom::Start(start)).await?;
             // read the needed bytes
-            file.take(bytes_to_read).read_to_end(&mut buf)?;
+            asset.take(bytes_to_read).read_to_end(&mut buf).await?;
 
             resp = resp.header(CONTENT_RANGE, format!("bytes {start}-{end}/{len}"));
             resp = resp.header(CONTENT_LENGTH, end + 1 - start);
@@ -130,9 +138,7 @@ fn get_stream_response(
                 })
                 .collect::<Vec<_>>();
 
-            let mut id = boundary_id.lock().unwrap();
-            *id += 1;
-            let boundary = format!("sadasq2e{id}");
+            let boundary = format!("{:x}", rand::random::<u64>());
             let boundary_sep = format!("\r\n--{boundary}\r\n");
             let boundary_closer = format!("\r\n--{boundary}\r\n");
 
@@ -143,34 +149,34 @@ fn get_stream_response(
 
             for (end, start) in ranges {
                 // a new range is being written, write the range boundary
-                buf.write_all(boundary_sep.as_bytes())?;
+                buf.write_all(boundary_sep.as_bytes()).await?;
 
                 // write the needed headers `Content-Type` and `Content-Range`
-                buf.write_all(format!("{CONTENT_TYPE}: video/mp4\r\n").as_bytes())?;
-                buf.write_all(
-                    format!("{CONTENT_RANGE}: bytes {start}-{end}/{len}\r\n").as_bytes(),
-                )?;
+                buf.write_all(format!("{CONTENT_TYPE}: video/mp4\r\n").as_bytes())
+                    .await?;
+                buf.write_all(format!("{CONTENT_RANGE}: bytes {start}-{end}/{len}\r\n").as_bytes())
+                    .await?;
 
                 // write the separator to indicate the start of the range body
-                buf.write_all("\r\n".as_bytes())?;
+                buf.write_all("\r\n".as_bytes()).await?;
 
                 // calculate number of bytes needed to be read
                 let bytes_to_read = end + 1 - start;
 
                 let mut local_buf = vec![0_u8; bytes_to_read as usize];
-                file.seek(SeekFrom::Start(start))?;
-                file.read_exact(&mut local_buf)?;
+                asset.seek(SeekFrom::Start(start)).await?;
+                asset.read_exact(&mut local_buf).await?;
                 buf.extend_from_slice(&local_buf);
             }
             // all ranges have been written, write the closing boundary
-            buf.write_all(boundary_closer.as_bytes())?;
+            buf.write_all(boundary_closer.as_bytes()).await?;
 
             resp.body(buf)
         }
     } else {
         resp = resp.header(CONTENT_LENGTH, len);
         let mut buf = Vec::with_capacity(len as usize);
-        file.read_to_end(&mut buf)?;
+        asset.read_to_end(&mut buf).await?;
         resp.body(buf)
     };
 

+ 7 - 9
packages/desktop/src/protocol.rs

@@ -4,7 +4,7 @@ use slab::Slab;
 use std::{
     borrow::Cow,
     future::Future,
-    ops::{Deref, DerefMut},
+    ops::Deref,
     path::{Path, PathBuf},
     pin::Pin,
     rc::Rc,
@@ -72,10 +72,11 @@ pub type AssetResponse = Response<Cow<'static, [u8]>>;
 pub trait AssetFuture: Future<Output = Option<AssetResponse>> + Send + Sync + 'static {}
 impl<T: Future<Output = Option<AssetResponse>> + Send + Sync + 'static> AssetFuture for T {}
 
+#[derive(Debug, Clone)]
 /// A request for an asset. This is a wrapper around [`Request<Vec<u8>>`] that provides methods specific to asset requests.
 pub struct AssetRequest {
     path: PathBuf,
-    request: Request<Vec<u8>>,
+    request: Arc<Request<Vec<u8>>>,
 }
 
 impl AssetRequest {
@@ -90,7 +91,10 @@ impl From<Request<Vec<u8>>> for AssetRequest {
         let decoded = urlencoding::decode(request.uri().path().trim_start_matches('/'))
             .expect("expected URL to be UTF-8 encoded");
         let path = PathBuf::from(&*decoded);
-        Self { request, path }
+        Self {
+            request: Arc::new(request),
+            path,
+        }
     }
 }
 
@@ -102,12 +106,6 @@ impl Deref for AssetRequest {
     }
 }
 
-impl DerefMut for AssetRequest {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.request
-    }
-}
-
 /// A handler that takes an [`AssetRequest`] and returns a future that either loads the asset, or returns `None`.
 /// This handler is stashed indefinitely in a context object, so it must be `'static`.
 pub trait AssetHandler<F: AssetFuture>: Send + Sync + 'static {