set_state_batch.rs 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. use futures_util::StreamExt;
  2. /*
  3. furtures_channel provides us some batching simply due to how Rust's async works.
  4. Any hook that uses schedule_update is simply deferring to unbounded_send. Multiple
  5. unbounded_sends can be linked together in succession provided there isn't an "await"
  6. between them. Our internal batching mechanism simply waits for the "schedule_update"
  7. to fire and then pulls any messages off the unbounded_send queue.
  8. Additionally, due to how our "time slicing" works we'll always come back and check
  9. in for new work if the deadline hasn't expired. On average, our deadline should be
  10. about 10ms, which is way more than enough for diffing/creating to happen.
  11. */
  12. #[async_std::test]
  13. async fn batch() {
  14. let (sender, mut recver) = futures_channel::mpsc::unbounded::<i32>();
  15. let _handle = async_std::task::spawn(async move {
  16. let _msg = recver.next().await;
  17. while let Ok(msg) = recver.try_next() {
  18. println!("{:#?}", msg);
  19. }
  20. let _msg = recver.next().await;
  21. while let Ok(msg) = recver.try_next() {
  22. println!("{:#?}", msg);
  23. }
  24. });
  25. sender.unbounded_send(1).unwrap();
  26. sender.unbounded_send(2).unwrap();
  27. sender.unbounded_send(3).unwrap();
  28. sender.unbounded_send(4).unwrap();
  29. async_std::task::sleep(std::time::Duration::from_millis(100)).await;
  30. sender.unbounded_send(5).unwrap();
  31. sender.unbounded_send(6).unwrap();
  32. sender.unbounded_send(7).unwrap();
  33. sender.unbounded_send(8).unwrap();
  34. }