|
@@ -9,30 +9,158 @@
|
|
//! This is all pretty unsafe stuff.
|
|
//! This is all pretty unsafe stuff.
|
|
//! The major invariant here is that tasks that enter the queue may be invalidated during transitions.
|
|
//! The major invariant here is that tasks that enter the queue may be invalidated during transitions.
|
|
|
|
|
|
-use std::pin::Pin;
|
|
|
|
|
|
+use std::{
|
|
|
|
+ cell::Cell,
|
|
|
|
+ pin::Pin,
|
|
|
|
+ sync::{Arc, RwLock},
|
|
|
|
+ task::{Context, Poll},
|
|
|
|
+};
|
|
|
|
|
|
use futures::{Future, Stream, StreamExt};
|
|
use futures::{Future, Stream, StreamExt};
|
|
use slotmap::{DefaultKey, SlotMap};
|
|
use slotmap::{DefaultKey, SlotMap};
|
|
|
|
|
|
-use crate::events::EventTrigger;
|
|
|
|
|
|
+use crate::{events::EventTrigger, prelude::ScopeIdx};
|
|
|
|
|
|
pub struct TaskQueue {
|
|
pub struct TaskQueue {
|
|
- slots: SlotMap<DefaultKey, Task>,
|
|
|
|
|
|
+ slots: Arc<RwLock<SlotMap<DefaultKey, DTask>>>,
|
|
|
|
+ submitter: Arc<dyn Fn(DTask)>,
|
|
}
|
|
}
|
|
|
|
|
|
impl TaskQueue {
|
|
impl TaskQueue {
|
|
- unsafe fn push_task(&mut self, task: Task) -> TaskHandle {
|
|
|
|
- todo!()
|
|
|
|
|
|
+ pub fn new() -> Self {
|
|
|
|
+ let slots = Arc::new(RwLock::new(SlotMap::new()));
|
|
|
|
+
|
|
|
|
+ let slots2 = slots.clone();
|
|
|
|
+
|
|
|
|
+ let submitter = Arc::new(move |task| {
|
|
|
|
+ let mut slots = slots2.write().unwrap();
|
|
|
|
+ slots.insert(task);
|
|
|
|
+ });
|
|
|
|
+ Self { slots, submitter }
|
|
}
|
|
}
|
|
|
|
|
|
- async fn next(&mut self) -> EventTrigger {
|
|
|
|
- for (key, task) in self.slots.iter_mut() {
|
|
|
|
- let ptr = task.0;
|
|
|
|
- }
|
|
|
|
- todo!()
|
|
|
|
|
|
+ fn push_task(&mut self, task: DTask) -> TaskHandle {
|
|
|
|
+ let key = self.slots.write().unwrap().insert(task);
|
|
|
|
+ TaskHandle {}
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fn is_empty(&self) -> bool {
|
|
|
|
+ self.slots.read().unwrap().is_empty()
|
|
|
|
+ }
|
|
|
|
+ fn len(&self) -> usize {
|
|
|
|
+ self.slots.read().unwrap().len()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-struct Task(*mut Pin<Box<dyn Future<Output = ()>>>);
|
|
|
|
|
|
+impl Stream for TaskQueue {
|
|
|
|
+ type Item = EventTrigger;
|
|
|
|
+
|
|
|
|
+ /// We can never be finished polling
|
|
|
|
+ fn poll_next(
|
|
|
|
+ mut self: Pin<&mut Self>,
|
|
|
|
+ cx: &mut std::task::Context<'_>,
|
|
|
|
+ ) -> std::task::Poll<Option<Self::Item>> {
|
|
|
|
+ // let yield_every = self.len();
|
|
|
|
+ // let mut polled = 0;
|
|
|
|
+
|
|
|
|
+ let mut slots = self.slots.write().unwrap();
|
|
|
|
+ for (key, slot) in slots.iter_mut() {
|
|
|
|
+ if slot.dead.get() {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ let r = slot.fut;
|
|
|
|
+ let mut fut = unsafe { &mut *r };
|
|
|
|
+ // use futures::{future::Future, poll, FutureExt};
|
|
|
|
+
|
|
|
|
+ let f2 = fut.as_mut();
|
|
|
|
+ let w = cx.waker();
|
|
|
|
+ let mut cx = Context::from_waker(&w);
|
|
|
|
+
|
|
|
|
+ // Pin::new_unchecked(pointer)
|
|
|
|
+ // use std::future::Future;
|
|
|
|
+ match f2.poll(&mut cx) {
|
|
|
|
+ Poll::Ready(_) => {
|
|
|
|
+ let trigger = EventTrigger::new_from_task(slot.originator);
|
|
|
|
+ slot.dead.set(true);
|
|
|
|
+ return Poll::Ready(Some(trigger));
|
|
|
|
+ }
|
|
|
|
+ Poll::Pending => continue,
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // we tried polling every active task.
|
|
|
|
+ // give up and relinquish controlto the parent
|
|
|
|
+
|
|
|
|
+ // We have polled a large number of futures in a row without yielding.
|
|
|
|
+ // To ensure we do not starve other tasks waiting on the executor,
|
|
|
|
+ // we yield here, but immediately wake ourselves up to continue.
|
|
|
|
+ // cx.waker().wake_by_ref();
|
|
|
|
+ return Poll::Pending;
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
|
|
struct TaskHandle {}
|
|
struct TaskHandle {}
|
|
|
|
+
|
|
|
|
+pub struct DTask {
|
|
|
|
+ fut: *mut Pin<Box<dyn Future<Output = ()>>>,
|
|
|
|
+ originator: ScopeIdx,
|
|
|
|
+ dead: Cell<bool>,
|
|
|
|
+}
|
|
|
|
+impl DTask {
|
|
|
|
+ pub fn new(fut: &mut Pin<Box<dyn Future<Output = ()>>>, originator: ScopeIdx) -> Self {
|
|
|
|
+ Self {
|
|
|
|
+ fut,
|
|
|
|
+ originator,
|
|
|
|
+ dead: Cell::new(false),
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ fn debug_new(fut: &mut Pin<Box<dyn Future<Output = ()>>>) -> Self {
|
|
|
|
+ let originator = ScopeIdx::default();
|
|
|
|
+ Self {
|
|
|
|
+ fut,
|
|
|
|
+ originator,
|
|
|
|
+ dead: Cell::new(false),
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+mod tests {
|
|
|
|
+ use std::time::Duration;
|
|
|
|
+
|
|
|
|
+ use super::*;
|
|
|
|
+ use bumpalo::Bump;
|
|
|
|
+
|
|
|
|
+ #[async_std::test]
|
|
|
|
+ async fn example() {
|
|
|
|
+ let bump = Bump::new();
|
|
|
|
+ type RawTask = Pin<Box<dyn Future<Output = ()>>>;
|
|
|
|
+ // build the three
|
|
|
|
+ let f1 = bump.alloc(Box::pin(async {
|
|
|
|
+ //
|
|
|
|
+ async_std::task::sleep(Duration::from_secs(3)).await;
|
|
|
|
+ println!("3 sec")
|
|
|
|
+ }) as RawTask);
|
|
|
|
+
|
|
|
|
+ let f2 = bump.alloc(Box::pin(async {
|
|
|
|
+ //
|
|
|
|
+ async_std::task::sleep(Duration::from_secs(2)).await;
|
|
|
|
+ println!("2 sec")
|
|
|
|
+ }) as RawTask);
|
|
|
|
+
|
|
|
|
+ let f3 = bump.alloc(Box::pin(async {
|
|
|
|
+ //
|
|
|
|
+ async_std::task::sleep(Duration::from_secs(1)).await;
|
|
|
|
+ println!("1 sec");
|
|
|
|
+ }) as RawTask);
|
|
|
|
+
|
|
|
|
+ let mut queue = TaskQueue::new();
|
|
|
|
+ queue.push_task(DTask::debug_new(f1));
|
|
|
|
+ queue.push_task(DTask::debug_new(f2));
|
|
|
|
+ queue.push_task(DTask::debug_new(f3));
|
|
|
|
+
|
|
|
|
+ while !queue.is_empty() {
|
|
|
|
+ let next = queue.next().await;
|
|
|
|
+ println!("Event received {:#?}", next);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|