|
@@ -16,28 +16,28 @@ use std::{
|
|
|
task::{Context, Poll},
|
|
|
};
|
|
|
|
|
|
-use futures_util::{Future, Stream};
|
|
|
+use futures_util::{stream::FuturesUnordered, Future, Stream, StreamExt};
|
|
|
use slotmap::{DefaultKey, SlotMap};
|
|
|
|
|
|
-use crate::{events::EventTrigger, innerlude::ScopeIdx};
|
|
|
+use crate::innerlude::{EventTrigger, FiberTask, ScopeIdx};
|
|
|
|
|
|
-pub type TaskSubmitter = Arc<dyn Fn(DTask)>;
|
|
|
+pub type TaskSubmitter = Arc<dyn Fn(FiberTask)>;
|
|
|
|
|
|
pub struct TaskQueue {
|
|
|
- slots: Arc<RwLock<SlotMap<DefaultKey, DTask>>>,
|
|
|
+ slots: Arc<RwLock<FuturesUnordered<FiberTask>>>,
|
|
|
+ // slots: Arc<RwLock<SlotMap<DefaultKey, DTask>>>,
|
|
|
submitter: TaskSubmitter,
|
|
|
}
|
|
|
|
|
|
impl TaskQueue {
|
|
|
pub fn new() -> Self {
|
|
|
- let slots = Arc::new(RwLock::new(SlotMap::new()));
|
|
|
-
|
|
|
+ let slots = Arc::new(RwLock::new(FuturesUnordered::new()));
|
|
|
let slots2 = slots.clone();
|
|
|
|
|
|
let submitter = Arc::new(move |task| {
|
|
|
let mut slots = slots2.write().unwrap();
|
|
|
log::debug!("Task submitted into global task queue");
|
|
|
- slots.insert(task);
|
|
|
+ slots.push(task);
|
|
|
});
|
|
|
Self { slots, submitter }
|
|
|
}
|
|
@@ -46,9 +46,9 @@ impl TaskQueue {
|
|
|
self.submitter.clone()
|
|
|
}
|
|
|
|
|
|
- pub fn submit_task(&mut self, task: DTask) -> TaskHandle {
|
|
|
- let key = self.slots.write().unwrap().insert(task);
|
|
|
- TaskHandle { key }
|
|
|
+ pub fn submit_task(&mut self, task: FiberTask) {
|
|
|
+ self.slots.write().unwrap().push(task);
|
|
|
+ // TaskHandle { key }
|
|
|
}
|
|
|
|
|
|
pub fn is_empty(&self) -> bool {
|
|
@@ -57,73 +57,78 @@ impl TaskQueue {
|
|
|
pub fn len(&self) -> usize {
|
|
|
self.slots.read().unwrap().len()
|
|
|
}
|
|
|
-}
|
|
|
-
|
|
|
-impl Stream for TaskQueue {
|
|
|
- type Item = EventTrigger;
|
|
|
-
|
|
|
- /// We can never be finished polling
|
|
|
- fn poll_next(
|
|
|
- self: Pin<&mut Self>,
|
|
|
- cx: &mut std::task::Context<'_>,
|
|
|
- ) -> std::task::Poll<Option<Self::Item>> {
|
|
|
- // let yield_every = self.len();
|
|
|
- // let mut polled = 0;
|
|
|
|
|
|
+ pub async fn next(&mut self) -> Option<EventTrigger> {
|
|
|
let mut slots = self.slots.write().unwrap();
|
|
|
- for (_key, slot) in slots.iter_mut() {
|
|
|
- if slot.dead.get() {
|
|
|
- continue;
|
|
|
- }
|
|
|
- let r = slot.fut;
|
|
|
- let fut = unsafe { &mut *r };
|
|
|
- // use futures::{future::Future, poll, FutureExt};
|
|
|
-
|
|
|
- let f2 = fut.as_mut();
|
|
|
- let w = cx.waker();
|
|
|
- let mut cx = Context::from_waker(&w);
|
|
|
-
|
|
|
- // Pin::new_unchecked(pointer)
|
|
|
- // use std::future::Future;
|
|
|
- match f2.poll(&mut cx) {
|
|
|
- Poll::Ready(_) => {
|
|
|
- let trigger = EventTrigger::new_from_task(slot.originator);
|
|
|
- slot.dead.set(true);
|
|
|
- return Poll::Ready(Some(trigger));
|
|
|
- }
|
|
|
- Poll::Pending => continue,
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // we tried polling every active task.
|
|
|
- // give up and relinquish controlto the parent
|
|
|
-
|
|
|
- // We have polled a large number of futures in a row without yielding.
|
|
|
- // To ensure we do not starve other tasks waiting on the executor,
|
|
|
- // we yield here, but immediately wake ourselves up to continue.
|
|
|
- // cx.waker().wake_by_ref();
|
|
|
- return Poll::Pending;
|
|
|
+ slots.next().await
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// impl Stream for TaskQueue {
|
|
|
+// type Item = EventTrigger;
|
|
|
+
|
|
|
+// /// We can never be finished polling
|
|
|
+// fn poll_next(
|
|
|
+// self: Pin<&mut Self>,
|
|
|
+// cx: &mut std::task::Context<'_>,
|
|
|
+// ) -> std::task::Poll<Option<Self::Item>> {
|
|
|
+// // let yield_every = self.len();
|
|
|
+// // let mut polled = 0;
|
|
|
+
|
|
|
+// let mut slots = self.slots.write().unwrap();
|
|
|
+// for (_key, slot) in slots.iter_mut() {
|
|
|
+// if slot.dead.get() {
|
|
|
+// continue;
|
|
|
+// }
|
|
|
+// let r = slot.fut;
|
|
|
+// // let fut = unsafe { &mut *r };
|
|
|
+// // use futures::{future::Future, poll, FutureExt};
|
|
|
+
|
|
|
+// let f2 = fut.as_mut();
|
|
|
+// let w = cx.waker();
|
|
|
+// let mut cx = Context::from_waker(&w);
|
|
|
+
|
|
|
+// // Pin::new_unchecked(pointer)
|
|
|
+// // use std::future::Future;
|
|
|
+// match f2.poll(&mut cx) {
|
|
|
+// Poll::Ready(_) => {
|
|
|
+// let trigger = EventTrigger::new_from_task(slot.originator);
|
|
|
+// slot.dead.set(true);
|
|
|
+// return Poll::Ready(Some(trigger));
|
|
|
+// }
|
|
|
+// Poll::Pending => continue,
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
+// // we tried polling every active task.
|
|
|
+// // give up and relinquish controlto the parent
|
|
|
+
|
|
|
+// // We have polled a large number of futures in a row without yielding.
|
|
|
+// // To ensure we do not starve other tasks waiting on the executor,
|
|
|
+// // we yield here, but immediately wake ourselves up to continue.
|
|
|
+// // cx.waker().wake_by_ref();
|
|
|
+// return Poll::Pending;
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
pub struct TaskHandle {
|
|
|
key: DefaultKey,
|
|
|
}
|
|
|
|
|
|
pub struct DTask {
|
|
|
- fut: *mut Pin<Box<dyn Future<Output = ()>>>,
|
|
|
+ fut: FiberTask,
|
|
|
originator: ScopeIdx,
|
|
|
dead: Cell<bool>,
|
|
|
}
|
|
|
impl DTask {
|
|
|
- pub fn new(fut: &mut Pin<Box<dyn Future<Output = ()>>>, originator: ScopeIdx) -> Self {
|
|
|
+ pub fn new(fut: FiberTask, originator: ScopeIdx) -> Self {
|
|
|
Self {
|
|
|
fut,
|
|
|
originator,
|
|
|
dead: Cell::new(false),
|
|
|
}
|
|
|
}
|
|
|
- pub fn debug_new(fut: &mut Pin<Box<dyn Future<Output = ()>>>) -> Self {
|
|
|
+ pub fn debug_new(fut: FiberTask) -> Self {
|
|
|
let originator = ScopeIdx::default();
|
|
|
Self {
|
|
|
fut,
|