video_stream.rs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. use dioxus::prelude::*;
  2. use dioxus_desktop::wry::http;
  3. use dioxus_desktop::wry::http::Response;
  4. use dioxus_desktop::{use_asset_handler, AssetRequest};
  5. use http::{header::*, response::Builder as ResponseBuilder, status::StatusCode};
  6. use std::borrow::Cow;
  7. use std::{io::SeekFrom, path::PathBuf};
  8. use tokio::io::AsyncReadExt;
  9. use tokio::io::AsyncSeekExt;
  10. use tokio::io::AsyncWriteExt;
  11. const VIDEO_PATH: &str = "./examples/assets/test_video.mp4";
  12. fn main() {
  13. let video_file = PathBuf::from(VIDEO_PATH);
  14. if !video_file.exists() {
  15. tokio::runtime::Runtime::new()
  16. .unwrap()
  17. .block_on(async move {
  18. println!("Downloading video file...");
  19. let video_url =
  20. "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4";
  21. let mut response = reqwest::get(video_url).await.unwrap();
  22. let mut file = tokio::fs::File::create(&video_file).await.unwrap();
  23. while let Some(chunk) = response.chunk().await.unwrap() {
  24. file.write_all(&chunk).await.unwrap();
  25. }
  26. });
  27. }
  28. dioxus_desktop::launch(app);
  29. }
  30. fn app(cx: Scope) -> Element {
  31. use_asset_handler(cx, move |request: &AssetRequest| {
  32. let request = request.clone();
  33. async move {
  34. let video_file = PathBuf::from(VIDEO_PATH);
  35. let mut file = tokio::fs::File::open(&video_file).await.unwrap();
  36. let response: Option<Response<Cow<'static, [u8]>>> =
  37. match get_stream_response(&mut file, &request).await {
  38. Ok(response) => Some(response.map(Cow::Owned)),
  39. Err(err) => {
  40. eprintln!("Error: {}", err);
  41. None
  42. }
  43. };
  44. response
  45. }
  46. });
  47. render! {
  48. div { video { src: "test_video.mp4", autoplay: true, controls: true, width: 640, height: 480 } }
  49. }
  50. }
  51. async fn get_stream_response(
  52. asset: &mut (impl tokio::io::AsyncSeek + tokio::io::AsyncRead + Unpin + Send + Sync),
  53. request: &AssetRequest,
  54. ) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error>> {
  55. // get stream length
  56. let len = {
  57. let old_pos = asset.stream_position().await?;
  58. let len = asset.seek(SeekFrom::End(0)).await?;
  59. asset.seek(SeekFrom::Start(old_pos)).await?;
  60. len
  61. };
  62. let mut resp = ResponseBuilder::new().header(CONTENT_TYPE, "video/mp4");
  63. // if the webview sent a range header, we need to send a 206 in return
  64. // Actually only macOS and Windows are supported. Linux will ALWAYS return empty headers.
  65. let http_response = if let Some(range_header) = request.headers().get("range") {
  66. let not_satisfiable = || {
  67. ResponseBuilder::new()
  68. .status(StatusCode::RANGE_NOT_SATISFIABLE)
  69. .header(CONTENT_RANGE, format!("bytes */{len}"))
  70. .body(vec![])
  71. };
  72. // parse range header
  73. let ranges = if let Ok(ranges) = http_range::HttpRange::parse(range_header.to_str()?, len) {
  74. ranges
  75. .iter()
  76. // map the output back to spec range <start-end>, example: 0-499
  77. .map(|r| (r.start, r.start + r.length - 1))
  78. .collect::<Vec<_>>()
  79. } else {
  80. return Ok(not_satisfiable()?);
  81. };
  82. /// The Maximum bytes we send in one range
  83. const MAX_LEN: u64 = 1000 * 1024;
  84. if ranges.len() == 1 {
  85. let &(start, mut end) = ranges.first().unwrap();
  86. // check if a range is not satisfiable
  87. //
  88. // this should be already taken care of by HttpRange::parse
  89. // but checking here again for extra assurance
  90. if start >= len || end >= len || end < start {
  91. return Ok(not_satisfiable()?);
  92. }
  93. // adjust end byte for MAX_LEN
  94. end = start + (end - start).min(len - start).min(MAX_LEN - 1);
  95. // calculate number of bytes needed to be read
  96. let bytes_to_read = end + 1 - start;
  97. // allocate a buf with a suitable capacity
  98. let mut buf = Vec::with_capacity(bytes_to_read as usize);
  99. // seek the file to the starting byte
  100. asset.seek(SeekFrom::Start(start)).await?;
  101. // read the needed bytes
  102. asset.take(bytes_to_read).read_to_end(&mut buf).await?;
  103. resp = resp.header(CONTENT_RANGE, format!("bytes {start}-{end}/{len}"));
  104. resp = resp.header(CONTENT_LENGTH, end + 1 - start);
  105. resp = resp.status(StatusCode::PARTIAL_CONTENT);
  106. resp.body(buf)
  107. } else {
  108. let mut buf = Vec::new();
  109. let ranges = ranges
  110. .iter()
  111. .filter_map(|&(start, mut end)| {
  112. // filter out unsatisfiable ranges
  113. //
  114. // this should be already taken care of by HttpRange::parse
  115. // but checking here again for extra assurance
  116. if start >= len || end >= len || end < start {
  117. None
  118. } else {
  119. // adjust end byte for MAX_LEN
  120. end = start + (end - start).min(len - start).min(MAX_LEN - 1);
  121. Some((start, end))
  122. }
  123. })
  124. .collect::<Vec<_>>();
  125. let boundary = format!("{:x}", rand::random::<u64>());
  126. let boundary_sep = format!("\r\n--{boundary}\r\n");
  127. let boundary_closer = format!("\r\n--{boundary}\r\n");
  128. resp = resp.header(
  129. CONTENT_TYPE,
  130. format!("multipart/byteranges; boundary={boundary}"),
  131. );
  132. for (end, start) in ranges {
  133. // a new range is being written, write the range boundary
  134. buf.write_all(boundary_sep.as_bytes()).await?;
  135. // write the needed headers `Content-Type` and `Content-Range`
  136. buf.write_all(format!("{CONTENT_TYPE}: video/mp4\r\n").as_bytes())
  137. .await?;
  138. buf.write_all(format!("{CONTENT_RANGE}: bytes {start}-{end}/{len}\r\n").as_bytes())
  139. .await?;
  140. // write the separator to indicate the start of the range body
  141. buf.write_all("\r\n".as_bytes()).await?;
  142. // calculate number of bytes needed to be read
  143. let bytes_to_read = end + 1 - start;
  144. let mut local_buf = vec![0_u8; bytes_to_read as usize];
  145. asset.seek(SeekFrom::Start(start)).await?;
  146. asset.read_exact(&mut local_buf).await?;
  147. buf.extend_from_slice(&local_buf);
  148. }
  149. // all ranges have been written, write the closing boundary
  150. buf.write_all(boundary_closer.as_bytes()).await?;
  151. resp.body(buf)
  152. }
  153. } else {
  154. resp = resp.header(CONTENT_LENGTH, len);
  155. let mut buf = Vec::with_capacity(len as usize);
  156. asset.read_to_end(&mut buf).await?;
  157. resp.body(buf)
  158. };
  159. http_response.map_err(Into::into)
  160. }