video_stream.rs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. use dioxus::desktop::wry::http;
  2. use dioxus::desktop::wry::http::Response;
  3. use dioxus::desktop::{use_asset_handler, AssetRequest};
  4. use dioxus::prelude::*;
  5. use http::{header::*, response::Builder as ResponseBuilder, status::StatusCode};
  6. use std::{io::SeekFrom, path::PathBuf};
  7. use tokio::io::AsyncReadExt;
  8. use tokio::io::AsyncSeekExt;
  9. use tokio::io::AsyncWriteExt;
  10. const VIDEO_PATH: &str = "./examples/assets/test_video.mp4";
  11. fn main() {
  12. let video_file = PathBuf::from(VIDEO_PATH);
  13. if !video_file.exists() {
  14. tokio::runtime::Runtime::new()
  15. .unwrap()
  16. .block_on(async move {
  17. println!("Downloading video file...");
  18. let video_url =
  19. "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4";
  20. let mut response = reqwest::get(video_url).await.unwrap();
  21. let mut file = tokio::fs::File::create(&video_file).await.unwrap();
  22. while let Some(chunk) = response.chunk().await.unwrap() {
  23. file.write_all(&chunk).await.unwrap();
  24. }
  25. });
  26. }
  27. launch_desktop(app);
  28. }
  29. fn app() -> Element {
  30. use_asset_handler("videos", move |request, responder| {
  31. // Using dioxus::spawn works, but is slower than a dedicated thread
  32. tokio::task::spawn(async move {
  33. let video_file = PathBuf::from(VIDEO_PATH);
  34. let mut file = tokio::fs::File::open(&video_file).await.unwrap();
  35. match get_stream_response(&mut file, &request).await {
  36. Ok(response) => responder.respond(response),
  37. Err(err) => eprintln!("Error: {}", err),
  38. }
  39. });
  40. });
  41. rsx! {
  42. div {
  43. video {
  44. src: "/videos/test_video.mp4",
  45. autoplay: true,
  46. controls: true,
  47. width: 640,
  48. height: 480
  49. }
  50. }
  51. }
  52. }
  53. /// This was taken from wry's example
  54. async fn get_stream_response(
  55. asset: &mut (impl tokio::io::AsyncSeek + tokio::io::AsyncRead + Unpin + Send + Sync),
  56. request: &AssetRequest,
  57. ) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error>> {
  58. // get stream length
  59. let len = {
  60. let old_pos = asset.stream_position().await?;
  61. let len = asset.seek(SeekFrom::End(0)).await?;
  62. asset.seek(SeekFrom::Start(old_pos)).await?;
  63. len
  64. };
  65. let mut resp = ResponseBuilder::new().header(CONTENT_TYPE, "video/mp4");
  66. // if the webview sent a range header, we need to send a 206 in return
  67. // Actually only macOS and Windows are supported. Linux will ALWAYS return empty headers.
  68. let http_response = if let Some(range_header) = request.headers().get("range") {
  69. let not_satisfiable = || {
  70. ResponseBuilder::new()
  71. .status(StatusCode::RANGE_NOT_SATISFIABLE)
  72. .header(CONTENT_RANGE, format!("bytes */{len}"))
  73. .body(vec![])
  74. };
  75. // parse range header
  76. let ranges = if let Ok(ranges) = http_range::HttpRange::parse(range_header.to_str()?, len) {
  77. ranges
  78. .iter()
  79. // map the output back to spec range <start-end>, example: 0-499
  80. .map(|r| (r.start, r.start + r.length - 1))
  81. .collect::<Vec<_>>()
  82. } else {
  83. return Ok(not_satisfiable()?);
  84. };
  85. /// The Maximum bytes we send in one range
  86. const MAX_LEN: u64 = 1000 * 1024;
  87. if ranges.len() == 1 {
  88. let &(start, mut end) = ranges.first().unwrap();
  89. // check if a range is not satisfiable
  90. //
  91. // this should be already taken care of by HttpRange::parse
  92. // but checking here again for extra assurance
  93. if start >= len || end >= len || end < start {
  94. return Ok(not_satisfiable()?);
  95. }
  96. // adjust end byte for MAX_LEN
  97. end = start + (end - start).min(len - start).min(MAX_LEN - 1);
  98. // calculate number of bytes needed to be read
  99. let bytes_to_read = end + 1 - start;
  100. // allocate a buf with a suitable capacity
  101. let mut buf = Vec::with_capacity(bytes_to_read as usize);
  102. // seek the file to the starting byte
  103. asset.seek(SeekFrom::Start(start)).await?;
  104. // read the needed bytes
  105. asset.take(bytes_to_read).read_to_end(&mut buf).await?;
  106. resp = resp.header(CONTENT_RANGE, format!("bytes {start}-{end}/{len}"));
  107. resp = resp.header(CONTENT_LENGTH, end + 1 - start);
  108. resp = resp.status(StatusCode::PARTIAL_CONTENT);
  109. resp.body(buf)
  110. } else {
  111. let mut buf = Vec::new();
  112. let ranges = ranges
  113. .iter()
  114. .filter_map(|&(start, mut end)| {
  115. // filter out unsatisfiable ranges
  116. //
  117. // this should be already taken care of by HttpRange::parse
  118. // but checking here again for extra assurance
  119. if start >= len || end >= len || end < start {
  120. None
  121. } else {
  122. // adjust end byte for MAX_LEN
  123. end = start + (end - start).min(len - start).min(MAX_LEN - 1);
  124. Some((start, end))
  125. }
  126. })
  127. .collect::<Vec<_>>();
  128. let boundary = format!("{:x}", rand::random::<u64>());
  129. let boundary_sep = format!("\r\n--{boundary}\r\n");
  130. let boundary_closer = format!("\r\n--{boundary}\r\n");
  131. resp = resp.header(
  132. CONTENT_TYPE,
  133. format!("multipart/byteranges; boundary={boundary}"),
  134. );
  135. for (end, start) in ranges {
  136. // a new range is being written, write the range boundary
  137. buf.write_all(boundary_sep.as_bytes()).await?;
  138. // write the needed headers `Content-Type` and `Content-Range`
  139. buf.write_all(format!("{CONTENT_TYPE}: video/mp4\r\n").as_bytes())
  140. .await?;
  141. buf.write_all(format!("{CONTENT_RANGE}: bytes {start}-{end}/{len}\r\n").as_bytes())
  142. .await?;
  143. // write the separator to indicate the start of the range body
  144. buf.write_all("\r\n".as_bytes()).await?;
  145. // calculate number of bytes needed to be read
  146. let bytes_to_read = end + 1 - start;
  147. let mut local_buf = vec![0_u8; bytes_to_read as usize];
  148. asset.seek(SeekFrom::Start(start)).await?;
  149. asset.read_exact(&mut local_buf).await?;
  150. buf.extend_from_slice(&local_buf);
  151. }
  152. // all ranges have been written, write the closing boundary
  153. buf.write_all(boundary_closer.as_bytes()).await?;
  154. resp.body(buf)
  155. }
  156. } else {
  157. resp = resp.header(CONTENT_LENGTH, len);
  158. let mut buf = Vec::with_capacity(len as usize);
  159. asset.read_to_end(&mut buf).await?;
  160. resp.body(buf)
  161. };
  162. http_response.map_err(Into::into)
  163. }