|
@@ -1,12 +1,21 @@
|
|
-use std::{cell::RefCell, mem, ops::DerefMut, pin::Pin, process::Output, rc::Rc, sync::Arc};
|
|
|
|
|
|
+use std::{
|
|
|
|
+ cell::{RefCell, UnsafeCell},
|
|
|
|
+ marker::PhantomData,
|
|
|
|
+ mem::{self, MaybeUninit},
|
|
|
|
+ ops::DerefMut,
|
|
|
|
+ pin::Pin,
|
|
|
|
+ process::Output,
|
|
|
|
+ rc::Rc,
|
|
|
|
+ sync::Arc,
|
|
|
|
+};
|
|
|
|
|
|
use futures_task::{waker, ArcWake, Context, RawWaker, RawWakerVTable, Waker};
|
|
use futures_task::{waker, ArcWake, Context, RawWaker, RawWakerVTable, Waker};
|
|
use futures_util::{pin_mut, Future, FutureExt};
|
|
use futures_util::{pin_mut, Future, FutureExt};
|
|
use slab::Slab;
|
|
use slab::Slab;
|
|
|
|
|
|
-use crate::ScopeId;
|
|
|
|
|
|
+use crate::{Element, ScopeId};
|
|
|
|
|
|
-use super::{HandleInner, SchedulerHandle, SchedulerMsg};
|
|
|
|
|
|
+use super::{waker::RcWake, HandleInner, SchedulerHandle, SchedulerMsg};
|
|
|
|
|
|
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
|
|
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
|
|
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
|
|
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
|
|
@@ -14,27 +23,25 @@ pub struct TaskId(pub usize);
|
|
|
|
|
|
/// the task itself is the waker
|
|
/// the task itself is the waker
|
|
|
|
|
|
-#[derive(Clone)]
|
|
|
|
pub struct LocalTask {
|
|
pub struct LocalTask {
|
|
id: TaskId,
|
|
id: TaskId,
|
|
scope: ScopeId,
|
|
scope: ScopeId,
|
|
tx: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
|
|
tx: futures_channel::mpsc::UnboundedSender<SchedulerMsg>,
|
|
- pub task: *mut dyn Future<Output = ()>,
|
|
|
|
|
|
+ pub task: UnsafeCell<Pin<Box<dyn Future<Output = ()> + 'static>>>,
|
|
}
|
|
}
|
|
|
|
|
|
impl HandleInner {
|
|
impl HandleInner {
|
|
pub fn spawn(&self, scope: ScopeId, task: impl Future<Output = ()> + 'static) -> TaskId {
|
|
pub fn spawn(&self, scope: ScopeId, task: impl Future<Output = ()> + 'static) -> TaskId {
|
|
let mut tasks = self.tasks.borrow_mut();
|
|
let mut tasks = self.tasks.borrow_mut();
|
|
-
|
|
|
|
let entry = tasks.vacant_entry();
|
|
let entry = tasks.vacant_entry();
|
|
let task_id = TaskId(entry.key());
|
|
let task_id = TaskId(entry.key());
|
|
|
|
|
|
- entry.insert(LocalTask {
|
|
|
|
|
|
+ entry.insert(Rc::new(LocalTask {
|
|
id: task_id,
|
|
id: task_id,
|
|
tx: self.sender.clone(),
|
|
tx: self.sender.clone(),
|
|
- task: Box::into_raw(Box::new(task)),
|
|
|
|
|
|
+ task: UnsafeCell::new(Box::pin(task)),
|
|
scope,
|
|
scope,
|
|
- });
|
|
|
|
|
|
+ }));
|
|
|
|
|
|
self.sender
|
|
self.sender
|
|
.unbounded_send(SchedulerMsg::TaskNotified(task_id))
|
|
.unbounded_send(SchedulerMsg::TaskNotified(task_id))
|
|
@@ -54,28 +61,15 @@ impl HandleInner {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-pub fn make_task_waker(task: &LocalTask) -> Waker {
|
|
|
|
- let raw = RawWaker::new(task as *const LocalTask as *const _, task_vtable());
|
|
|
|
- unsafe { Waker::from_raw(raw) }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-fn task_vtable() -> &'static RawWakerVTable {
|
|
|
|
- &RawWakerVTable::new(clone, wake, wake_by_ref, drop_task)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-unsafe fn clone(data: *const ()) -> RawWaker {
|
|
|
|
- RawWaker::new(data as *const (), task_vtable())
|
|
|
|
-}
|
|
|
|
-unsafe fn wake(data: *const ()) {
|
|
|
|
- wake_by_ref(data);
|
|
|
|
-}
|
|
|
|
-unsafe fn wake_by_ref(data: *const ()) {
|
|
|
|
- let task = &*(data as *const LocalTask);
|
|
|
|
- task.tx
|
|
|
|
- .unbounded_send(SchedulerMsg::TaskNotified(task.id))
|
|
|
|
- .expect("Scheduler should exist");
|
|
|
|
|
|
+pub fn make_task_waker(task: Rc<LocalTask>) -> Waker {
|
|
|
|
+ let ptr = Rc::into_raw(task).cast::<()>();
|
|
|
|
+ super::waker::make_rc_waker(task)
|
|
}
|
|
}
|
|
|
|
|
|
-unsafe fn drop_task(_data: *const ()) {
|
|
|
|
- // doesnt do anything
|
|
|
|
|
|
+impl RcWake for LocalTask {
|
|
|
|
+ fn wake_by_ref(arc_self: &Rc<Self>) {
|
|
|
|
+ _ = arc_self
|
|
|
|
+ .tx
|
|
|
|
+ .unbounded_send(SchedulerMsg::TaskNotified(arc_self.id));
|
|
|
|
+ }
|
|
}
|
|
}
|