@@ -101,6 +101,10 @@ pub enum SchedulerMsg {
     Immediate(ScopeId),
 
     // tasks
+    Task(TaskMsg),
+}
+
+pub enum TaskMsg {
     SubmitTask(FiberTask, u64),
     ToggleTask(u64),
     PauseTask(u64),
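For orientation, here is a minimal, self-contained sketch of the reshaped message type and how a handler routes it; `FiberTask` and `ScopeId` are simplified stand-ins, and only the `TaskMsg` variants visible in this hunk are shown:

```rust
// Stand-in shapes for illustration only; the real types live elsewhere in the crate.
type FiberTask = ();

pub enum TaskMsg {
    SubmitTask(FiberTask, u64),
    ToggleTask(u64),
    PauseTask(u64),
}

pub enum SchedulerMsg {
    Immediate(usize), // ScopeId simplified to usize for the sketch
    Task(TaskMsg),
}

fn route(msg: SchedulerMsg) {
    match msg {
        // Task traffic now funnels through one outer variant, so it can be
        // delegated wholesale to a dedicated task handler.
        SchedulerMsg::Task(task) => match task {
            TaskMsg::SubmitTask(_fiber, id) => println!("submit task {id}"),
            TaskMsg::ToggleTask(id) => println!("toggle task {id}"),
            TaskMsg::PauseTask(id) => println!("pause task {id}"),
        },
        SchedulerMsg::Immediate(scope) => println!("re-render scope {scope}"),
    }
}

fn main() {
    route(SchedulerMsg::Task(TaskMsg::SubmitTask((), 0)));
}
```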
@@ -152,7 +156,9 @@ pub(crate) struct Scheduler {
 
     pub garbage_scopes: HashSet<ScopeId>,
 
-    pub lane: PriorityLane,
+    pub dirty_scopes: IndexSet<ScopeId>,
+    pub saved_state: Option<SavedDiffWork<'static>>,
+    pub in_progress: bool,
 }
 
 impl Scheduler {
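The three fields added here are lifted verbatim from `PriorityLane`, which the final hunk of this patch deletes: the scheduler stops routing work through per-priority lanes and instead tracks a single `dirty_scopes` set, one parked `saved_state`, and one `in_progress` flag directly on itself.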
@@ -181,7 +187,9 @@ impl Scheduler {
         let task_id = task_counter.get();
         task_counter.set(task_id + 1);
         sender
-            .unbounded_send(SchedulerMsg::SubmitTask(fiber_task, task_id))
+            .unbounded_send(SchedulerMsg::Task(TaskMsg::SubmitTask(
+                fiber_task, task_id,
+            )))
             .unwrap();
         TaskHandle {
             our_id: task_id,
@@ -214,6 +222,12 @@ impl Scheduler {
 
         let async_tasks = FuturesUnordered::new();
 
+        // push a task that would never resolve - prevents us from immediately aborting the scheduler
+        async_tasks.push(Box::pin(async {
+            std::future::pending::<()>().await;
+            ScopeId(0)
+        }) as FiberTask);
+
         Self {
             pool,
 
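The sentinel works around a property of `FuturesUnordered`: with no futures inside, it behaves as an exhausted stream, so a scheduler loop that selects on it would observe `None` and shut down before any real task is submitted. A self-contained sketch of that behavior (using a plain `u64` in place of `ScopeId`):

```rust
use futures::future::{pending, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};

fn main() {
    let mut tasks = FuturesUnordered::new();

    // An empty FuturesUnordered is a finished stream: next() completes with
    // None immediately, which would abort the scheduler's loop on startup.
    assert_eq!(tasks.next().now_or_never(), Some(None::<u64>));

    // Parking one future that never resolves keeps the stream pending forever,
    // mirroring the Box::pin(async { pending().await; ScopeId(0) }) above.
    tasks.push(Box::pin(pending::<u64>()));
    assert_eq!(tasks.next().now_or_never(), None);
}
```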
@@ -235,24 +249,9 @@ impl Scheduler {
 
             garbage_scopes: HashSet::new(),
 
-            // current_priority: EventPriority::Low,
-            lane: PriorityLane::new(),
-        }
-    }
-
-    pub fn manually_poll_events(&mut self) {
-        while let Ok(Some(msg)) = self.receiver.try_next() {
-            match msg {
-                SchedulerMsg::UiEvent(event) => self.ui_events.push_front(event),
-                SchedulerMsg::Immediate(im) => self.pending_immediates.push_front(im),
-
-                // task stuff
-                SchedulerMsg::SubmitTask(_, _) => todo!(),
-                SchedulerMsg::ToggleTask(_) => todo!(),
-                SchedulerMsg::PauseTask(_) => todo!(),
-                SchedulerMsg::ResumeTask(_) => todo!(),
-                SchedulerMsg::DropTask(_) => todo!(),
-            }
+            dirty_scopes: Default::default(),
+            saved_state: None,
+            in_progress: false,
         }
     }
 
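`manually_poll_events` is deleted rather than finished: its job of draining the channel into per-kind queues moves into the reworked `handle_channel_msg` below and into inline drain loops in `work_with_deadline` and `work_sync` later in the patch.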
@@ -272,31 +271,27 @@ impl Scheduler {
             }
         }
 
-        use EventPriority::*;
+        // use EventPriority::*;
 
-        match priority {
-            Immediate => todo!(),
-            High => todo!(),
-            Medium => todo!(),
-            Low => todo!(),
-        }
+        // match priority {
+        //     Immediate => todo!(),
+        //     High => todo!(),
+        //     Medium => todo!(),
+        //     Low => todo!(),
+        // }
 
         discrete
     }
 
     fn prepare_work(&mut self) {
-        while let Some(trigger) = self.ui_events.pop_back() {
-            if let Some(scope) = self.pool.get_scope_mut(trigger.scope) {}
-        }
+        // while let Some(trigger) = self.ui_events.pop_back() {
+        //     if let Some(scope) = self.pool.get_scope_mut(trigger.scope) {}
+        // }
     }
 
     // nothing to do, no events on channels, no work
     pub fn has_any_work(&self) -> bool {
-        self.lane.has_work() || self.has_pending_events()
-    }
-
-    pub fn has_pending_events(&self) -> bool {
-        !self.ui_events.is_empty()
+        !(self.dirty_scopes.is_empty() && self.ui_events.is_empty())
    }
 
     /// re-balance the work lanes, ensuring high-priority work properly bumps away low priority work
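The collapsed `has_any_work` is just De Morgan applied to the two deleted helpers: `!(self.dirty_scopes.is_empty() && self.ui_events.is_empty())` is equivalent to `!self.dirty_scopes.is_empty() || !self.ui_events.is_empty()`, i.e. there is work whenever either queue is non-empty. One behavioral nuance: the old path also consulted `PriorityLane::in_progress`, which no longer participates in the check.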
@@ -304,35 +299,39 @@ impl Scheduler {
 
     fn save_work(&mut self, lane: SavedDiffWork) {
         let saved: SavedDiffWork<'static> = unsafe { std::mem::transmute(lane) };
-        self.lane.saved_state = Some(saved);
+        self.saved_state = Some(saved);
     }
 
-    fn load_work(&mut self) -> SavedDiffWork<'static> {
-        // match self.current_priority {
-        //     EventPriority::Immediate => todo!(),
-        //     EventPriority::High => todo!(),
-        //     EventPriority::Medium => todo!(),
-        //     EventPriority::Low => todo!(),
-        // }
-        unsafe { self.lane.saved_state.take().unwrap().extend() }
+    unsafe fn load_work(&mut self) -> SavedDiffWork<'static> {
+        self.saved_state.take().unwrap().extend()
     }
 
     pub fn handle_channel_msg(&mut self, msg: SchedulerMsg) {
         match msg {
+            //
+            SchedulerMsg::Task(msg) => todo!(),
+
             SchedulerMsg::Immediate(_) => todo!(),
 
             SchedulerMsg::UiEvent(event) => {
                 //
-                self.handle_ui_event(event);
-            }
+                let (discrete, priority) = event_meta(&event);
 
-            //
-            SchedulerMsg::SubmitTask(_, _) => todo!(),
-            SchedulerMsg::ToggleTask(_) => todo!(),
-            SchedulerMsg::PauseTask(_) => todo!(),
-            SchedulerMsg::ResumeTask(_) => todo!(),
-            SchedulerMsg::DropTask(_) => todo!(),
+                if let Some(scope) = self.pool.get_scope_mut(event.scope) {
+                    if let Some(element) = event.mounted_dom_id {
+                        // TODO: bubble properly here
+                        scope.call_listener(event.event, element);
+
+                        while let Ok(Some(dirty_scope)) = self.receiver.try_next() {
+                            //
+                            // self.add_dirty_scope(dirty_scope, trigger.priority)
+                        }
+                    }
+                }
+
+                discrete;
+            }
         }
     }
 
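The substantive change here is marking `load_work` as an `unsafe fn`. `save_work` erases the diff state's lifetime to `'static` via `transmute` so it can be parked on the scheduler; re-attaching a lifetime on load is only sound if everything the saved state borrows is still alive, and that obligation now sits visibly with the caller. A reduced sketch of the park/restore pattern, with a hypothetical `Saved` type standing in for `SavedDiffWork`:

```rust
// Hypothetical stand-in for SavedDiffWork: it borrows data owned elsewhere.
struct Saved<'a> {
    stack: Vec<&'a str>,
}

struct Holder {
    parked: Option<Saved<'static>>,
}

impl Holder {
    fn save(&mut self, work: Saved) {
        // Erase the real lifetime so the borrow checker lets us park the work.
        let work: Saved<'static> = unsafe { std::mem::transmute(work) };
        self.parked = Some(work);
    }

    /// Safety: everything the saved work borrows must still be alive and
    /// unmoved when this is called - the contract the `unsafe fn` states.
    unsafe fn load<'a>(&mut self) -> Saved<'a> {
        unsafe { std::mem::transmute::<Saved<'static>, Saved<'a>>(self.parked.take().unwrap()) }
    }
}

fn main() {
    let owned = String::from("node");
    let mut holder = Holder { parked: None };
    holder.save(Saved { stack: vec![&owned] });
    // `owned` is still alive here, so re-attaching the borrow is sound.
    let restored: Saved = unsafe { holder.load() };
    assert_eq!(restored.stack[0], "node");
}
```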
@@ -346,22 +345,22 @@ impl Scheduler {
     ) -> bool {
         // Work through the current subtree, and commit the results when it finishes
         // When the deadline expires, give back the work
-        let saved_state = self.load_work();
+        let saved_state = unsafe { self.load_work() };
 
         // We have to split away some parts of ourself - current lane is borrowed mutably
-        let mut shared = self.pool.clone();
-        let mut machine = unsafe { saved_state.promote(&mut shared) };
+        let shared = self.pool.clone();
+        let mut machine = unsafe { saved_state.promote(&shared) };
 
         if machine.stack.is_empty() {
             let shared = self.pool.clone();
 
-            self.lane.dirty_scopes.sort_by(|a, b| {
+            self.dirty_scopes.sort_by(|a, b| {
                 let h1 = shared.get_scope(*a).unwrap().height;
                 let h2 = shared.get_scope(*b).unwrap().height;
                 h1.cmp(&h2)
             });
 
-            if let Some(scope) = self.lane.dirty_scopes.pop() {
+            if let Some(scope) = self.dirty_scopes.pop() {
                 let component = self.pool.get_scope(scope).unwrap();
                 let (old, new) = (component.frames.wip_head(), component.frames.fin_head());
                 machine.stack.push(DiffInstruction::Diff { new, old });
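Since the lane is gone, `dirty_scopes` lives directly on the scheduler as an `IndexSet`, and the height sort determines what `pop()` feeds the diff machine: `IndexSet::sort_by` reorders the set in place and `pop()` removes the last entry, so sorting by ascending height makes `pop()` yield the deepest dirty scope. A small sketch with hypothetical scope names and depths:

```rust
use indexmap::IndexSet;

fn main() {
    // Hypothetical scopes tagged with their depth in the component tree
    // (the "height" consulted by the scheduler).
    let heights = [("root", 0u32), ("list", 1), ("row", 2)];
    let mut dirty: IndexSet<&str> = ["row", "root", "list"].into_iter().collect();

    let height_of = |name: &str| heights.iter().find(|(n, _)| *n == name).unwrap().1;

    // sort_by reorders the set in place; pop() removes the *last* entry,
    // so an ascending sort by height makes pop() yield the deepest scope.
    dirty.sort_by(|a, b| height_of(*a).cmp(&height_of(*b)));
    assert_eq!(dirty.pop(), Some("row"));
}
```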
@@ -378,7 +377,7 @@ impl Scheduler {
             false
         } else {
             for node in saved.seen_scopes.drain() {
-                self.lane.dirty_scopes.remove(&node);
+                self.dirty_scopes.remove(&node);
             }
 
             let mut new_mutations = Mutations::new();
@@ -395,9 +394,9 @@ impl Scheduler {
     /// Uses some fairly complex logic to schedule what work should be produced.
     ///
     /// Returns a list of successful mutations.
-    pub async fn work_with_deadline<'a>(
+    pub fn work_with_deadline<'a>(
         &'a mut self,
-        deadline: impl Future<Output = ()>,
+        mut deadline: impl FnMut() -> bool,
     ) -> Vec<Mutations<'a>> {
         /*
         Strategy:
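This signature change removes the async machinery from the work loop: the `impl Future<Output = ()>` deadline (which required `pin_mut!` and `now_or_never()` polling, deleted in the next hunk) becomes a plain `FnMut() -> bool` that the diff loop can call between units of work. A self-contained sketch of the new calling convention, with a hypothetical `DemoScheduler` standing in for the real type:

```rust
use std::time::{Duration, Instant};

// Hypothetical stand-in scheduler; only the calling convention matters here.
struct DemoScheduler;

impl DemoScheduler {
    fn work_with_deadline(&mut self, mut deadline: impl FnMut() -> bool) -> u32 {
        let mut units = 0;
        // The deadline closure is checked between units of diff work; no
        // pinning, fusing, or executor is involved, unlike a Future deadline.
        while !deadline() && units < 1_000_000 {
            units += 1; // pretend to perform one unit of diffing
        }
        units
    }
}

fn main() {
    let mut sched = DemoScheduler;
    let start = Instant::now();
    // e.g. a renderer granting the scheduler ~10ms of a frame's budget
    let done = sched.work_with_deadline(|| start.elapsed() > Duration::from_millis(10));
    println!("committed {done} units of work before the deadline");
}
```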
@@ -429,33 +428,29 @@ impl Scheduler {
             - but if we received both - then we don't need to diff, do we? run as many as we can and then finally diff?
         */
         let mut committed_mutations = Vec::<Mutations<'static>>::new();
-        pin_mut!(deadline);
 
         loop {
-            // Move work out of the scheduler message receiver and into dedicated message lanes
-            self.manually_poll_events();
-
-            // Wait for any new events if we have nothing to do
-            // todo: poll the events once even if there is work to do to prevent starvation
-            if !self.has_any_work() {
-                futures_util::select! {
-                    // todo: find a solution for the exhausted queue problem
-                    // msg = self.async_tasks.next() => {}
-                    msg = self.receiver.next() => {
-                        log::debug!("received work in scheduler");
-                        self.handle_channel_msg(msg.unwrap())
-                    },
-                    _ = (&mut deadline).fuse() => return committed_mutations,
+            // switch our priority, pop off any work
+            for event in self.ui_events.drain(..) {
+                if let Some(scope) = self.pool.get_scope_mut(event.scope) {
+                    if let Some(element) = event.mounted_dom_id {
+                        // TODO: bubble properly here
+                        scope.call_listener(event.event, element);
+
+                        while let Ok(Some(dirty_scope)) = self.receiver.try_next() {
+                            match dirty_scope {
+                                SchedulerMsg::Immediate(im) => {
+                                    self.dirty_scopes.insert(im);
+                                }
+                                _ => todo!(),
+                            }
+                        }
+                    }
                 }
             }
 
-            // switch our priority, pop off any work
-            self.prepare_work();
-
-            let mut deadline_reached = || (&mut deadline).now_or_never().is_some();
-
             let finished_before_deadline =
-                self.work_on_current_lane(&mut deadline_reached, &mut committed_mutations);
+                self.work_on_current_lane(&mut deadline, &mut committed_mutations);
 
             if !finished_before_deadline {
                 break;
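The rebuilt loop inverts the old control flow: instead of parking in `futures_util::select!` until a message or the deadline future fires, it synchronously drains `ui_events`, runs each listener, sweeps any `Immediate` re-render requests those listeners sent into `dirty_scopes`, and then diffs until the `deadline` closure reports that the budget is spent. The `_ => todo!()` arm mirrors the still-unimplemented task handling in `handle_channel_msg` above.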
@@ -471,7 +466,9 @@ impl Scheduler {
     pub fn work_sync<'a>(&'a mut self) -> Vec<Mutations<'a>> {
         let mut committed_mutations = Vec::new();
 
-        self.manually_poll_events();
+        while let Ok(Some(msg)) = self.receiver.try_next() {
+            self.handle_channel_msg(msg);
+        }
 
         if !self.has_any_work() {
             return committed_mutations;
@@ -534,23 +531,3 @@ impl Scheduler {
         }
     }
 }
-
-pub(crate) struct PriorityLane {
-    pub dirty_scopes: IndexSet<ScopeId>,
-    pub saved_state: Option<SavedDiffWork<'static>>,
-    pub in_progress: bool,
-}
-
-impl PriorityLane {
-    pub fn new() -> Self {
-        Self {
-            saved_state: None,
-            dirty_scopes: Default::default(),
-            in_progress: false,
-        }
-    }
-
-    fn has_work(&self) -> bool {
-        !self.dirty_scopes.is_empty() || self.in_progress
-    }
-}
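With its fields absorbed into `Scheduler` at the top of this patch and its `has_work` check folded into `has_any_work`, `PriorityLane` has no remaining users and is removed outright.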