Bläddra i källkod

Merge pull request #1727 from ealmloff/video-streaming

Add a video streaming example
Jonathan Kelley 1 år sedan
förälder
incheckning
318cae7bf8
4 ändrade filer med 193 tillägg och 9 borttagningar
  1. 1 0
      .gitignore
  2. 1 0
      Cargo.toml
  3. 184 0
      examples/video_stream.rs
  4. 7 9
      packages/desktop/src/protocol.rs

+ 1 - 0
.gitignore

@@ -4,6 +4,7 @@
 /dist
 Cargo.lock
 .DS_Store
+/examples/assets/test_video.mp4
 
 .vscode/*
 !.vscode/settings.json

+ 1 - 0
Cargo.toml

@@ -133,3 +133,4 @@ fern = { version = "0.6.0", features = ["colored"] }
 env_logger = "0.10.0"
 simple_logger = "4.0.0"
 thiserror = { workspace = true }
+http-range = "0.1.5"

+ 184 - 0
examples/video_stream.rs

@@ -0,0 +1,184 @@
+use dioxus::prelude::*;
+use dioxus_desktop::wry::http;
+use dioxus_desktop::wry::http::Response;
+use dioxus_desktop::{use_asset_handler, AssetRequest};
+use http::{header::*, response::Builder as ResponseBuilder, status::StatusCode};
+use std::borrow::Cow;
+use std::{io::SeekFrom, path::PathBuf};
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncSeekExt;
+use tokio::io::AsyncWriteExt;
+
+const VIDEO_PATH: &str = "./examples/assets/test_video.mp4";
+
+fn main() {
+    let video_file = PathBuf::from(VIDEO_PATH);
+    if !video_file.exists() {
+        tokio::runtime::Runtime::new()
+            .unwrap()
+            .block_on(async move {
+                println!("Downloading video file...");
+                let video_url =
+                    "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4";
+                let mut response = reqwest::get(video_url).await.unwrap();
+                let mut file = tokio::fs::File::create(&video_file).await.unwrap();
+                while let Some(chunk) = response.chunk().await.unwrap() {
+                    file.write_all(&chunk).await.unwrap();
+                }
+            });
+    }
+    dioxus_desktop::launch(app);
+}
+
+fn app(cx: Scope) -> Element {
+    use_asset_handler(cx, move |request: &AssetRequest| {
+        let request = request.clone();
+        async move {
+            let video_file = PathBuf::from(VIDEO_PATH);
+            let mut file = tokio::fs::File::open(&video_file).await.unwrap();
+            let response: Option<Response<Cow<'static, [u8]>>> =
+                match get_stream_response(&mut file, &request).await {
+                    Ok(response) => Some(response.map(Cow::Owned)),
+                    Err(err) => {
+                        eprintln!("Error: {}", err);
+                        None
+                    }
+                };
+            response
+        }
+    });
+
+    render! {
+        div { video { src: "test_video.mp4", autoplay: true, controls: true, width: 640, height: 480 } }
+    }
+}
+
+async fn get_stream_response(
+    asset: &mut (impl tokio::io::AsyncSeek + tokio::io::AsyncRead + Unpin + Send + Sync),
+    request: &AssetRequest,
+) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error>> {
+    // get stream length
+    let len = {
+        let old_pos = asset.stream_position().await?;
+        let len = asset.seek(SeekFrom::End(0)).await?;
+        asset.seek(SeekFrom::Start(old_pos)).await?;
+        len
+    };
+
+    let mut resp = ResponseBuilder::new().header(CONTENT_TYPE, "video/mp4");
+
+    // if the webview sent a range header, we need to send a 206 in return
+    // Actually only macOS and Windows are supported. Linux will ALWAYS return empty headers.
+    let http_response = if let Some(range_header) = request.headers().get("range") {
+        let not_satisfiable = || {
+            ResponseBuilder::new()
+                .status(StatusCode::RANGE_NOT_SATISFIABLE)
+                .header(CONTENT_RANGE, format!("bytes */{len}"))
+                .body(vec![])
+        };
+
+        // parse range header
+        let ranges = if let Ok(ranges) = http_range::HttpRange::parse(range_header.to_str()?, len) {
+            ranges
+                .iter()
+                // map the output back to spec range <start-end>, example: 0-499
+                .map(|r| (r.start, r.start + r.length - 1))
+                .collect::<Vec<_>>()
+        } else {
+            return Ok(not_satisfiable()?);
+        };
+
+        /// The Maximum bytes we send in one range
+        const MAX_LEN: u64 = 1000 * 1024;
+
+        if ranges.len() == 1 {
+            let &(start, mut end) = ranges.first().unwrap();
+
+            // check if a range is not satisfiable
+            //
+            // this should be already taken care of by HttpRange::parse
+            // but checking here again for extra assurance
+            if start >= len || end >= len || end < start {
+                return Ok(not_satisfiable()?);
+            }
+
+            // adjust end byte for MAX_LEN
+            end = start + (end - start).min(len - start).min(MAX_LEN - 1);
+
+            // calculate number of bytes needed to be read
+            let bytes_to_read = end + 1 - start;
+
+            // allocate a buf with a suitable capacity
+            let mut buf = Vec::with_capacity(bytes_to_read as usize);
+            // seek the file to the starting byte
+            asset.seek(SeekFrom::Start(start)).await?;
+            // read the needed bytes
+            asset.take(bytes_to_read).read_to_end(&mut buf).await?;
+
+            resp = resp.header(CONTENT_RANGE, format!("bytes {start}-{end}/{len}"));
+            resp = resp.header(CONTENT_LENGTH, end + 1 - start);
+            resp = resp.status(StatusCode::PARTIAL_CONTENT);
+            resp.body(buf)
+        } else {
+            let mut buf = Vec::new();
+            let ranges = ranges
+                .iter()
+                .filter_map(|&(start, mut end)| {
+                    // filter out unsatisfiable ranges
+                    //
+                    // this should be already taken care of by HttpRange::parse
+                    // but checking here again for extra assurance
+                    if start >= len || end >= len || end < start {
+                        None
+                    } else {
+                        // adjust end byte for MAX_LEN
+                        end = start + (end - start).min(len - start).min(MAX_LEN - 1);
+                        Some((start, end))
+                    }
+                })
+                .collect::<Vec<_>>();
+
+            let boundary = format!("{:x}", rand::random::<u64>());
+            let boundary_sep = format!("\r\n--{boundary}\r\n");
+            let boundary_closer = format!("\r\n--{boundary}\r\n");
+
+            resp = resp.header(
+                CONTENT_TYPE,
+                format!("multipart/byteranges; boundary={boundary}"),
+            );
+
+            for (end, start) in ranges {
+                // a new range is being written, write the range boundary
+                buf.write_all(boundary_sep.as_bytes()).await?;
+
+                // write the needed headers `Content-Type` and `Content-Range`
+                buf.write_all(format!("{CONTENT_TYPE}: video/mp4\r\n").as_bytes())
+                    .await?;
+                buf.write_all(format!("{CONTENT_RANGE}: bytes {start}-{end}/{len}\r\n").as_bytes())
+                    .await?;
+
+                // write the separator to indicate the start of the range body
+                buf.write_all("\r\n".as_bytes()).await?;
+
+                // calculate number of bytes needed to be read
+                let bytes_to_read = end + 1 - start;
+
+                let mut local_buf = vec![0_u8; bytes_to_read as usize];
+                asset.seek(SeekFrom::Start(start)).await?;
+                asset.read_exact(&mut local_buf).await?;
+                buf.extend_from_slice(&local_buf);
+            }
+            // all ranges have been written, write the closing boundary
+            buf.write_all(boundary_closer.as_bytes()).await?;
+
+            resp.body(buf)
+        }
+    } else {
+        resp = resp.header(CONTENT_LENGTH, len);
+        let mut buf = Vec::with_capacity(len as usize);
+        asset.read_to_end(&mut buf).await?;
+        resp.body(buf)
+    };
+
+    http_response.map_err(Into::into)
+}

+ 7 - 9
packages/desktop/src/protocol.rs

@@ -4,7 +4,7 @@ use slab::Slab;
 use std::{
     borrow::Cow,
     future::Future,
-    ops::{Deref, DerefMut},
+    ops::Deref,
     path::{Path, PathBuf},
     pin::Pin,
     rc::Rc,
@@ -72,10 +72,11 @@ pub type AssetResponse = Response<Cow<'static, [u8]>>;
 pub trait AssetFuture: Future<Output = Option<AssetResponse>> + Send + Sync + 'static {}
 impl<T: Future<Output = Option<AssetResponse>> + Send + Sync + 'static> AssetFuture for T {}
 
+#[derive(Debug, Clone)]
 /// A request for an asset. This is a wrapper around [`Request<Vec<u8>>`] that provides methods specific to asset requests.
 pub struct AssetRequest {
     path: PathBuf,
-    request: Request<Vec<u8>>,
+    request: Arc<Request<Vec<u8>>>,
 }
 
 impl AssetRequest {
@@ -90,7 +91,10 @@ impl From<Request<Vec<u8>>> for AssetRequest {
         let decoded = urlencoding::decode(request.uri().path().trim_start_matches('/'))
             .expect("expected URL to be UTF-8 encoded");
         let path = PathBuf::from(&*decoded);
-        Self { request, path }
+        Self {
+            request: Arc::new(request),
+            path,
+        }
     }
 }
 
@@ -102,12 +106,6 @@ impl Deref for AssetRequest {
     }
 }
 
-impl DerefMut for AssetRequest {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.request
-    }
-}
-
 /// A handler that takes an [`AssetRequest`] and returns a future that either loads the asset, or returns `None`.
 /// This handler is stashed indefinitely in a context object, so it must be `'static`.
 pub trait AssetHandler<F: AssetFuture>: Send + Sync + 'static {