Browse Source

work on upward pass

= 2 years ago
parent
commit
4096861c59
2 changed files with 45 additions and 27 deletions
  1. 45 7
      packages/native-core/src/tree.rs
  2. 0 20
      packages/native-core/src/worker_pool.rs

+ 45 - 7
packages/native-core/src/tree.rs

@@ -1,4 +1,5 @@
 use core::panic;
+use crossbeam_deque::{Injector, Stealer, Worker};
 use parking_lot::lock_api::RawMutex as _;
 use parking_lot::{RawMutex, RwLock};
 use slab::Slab;
@@ -6,6 +7,7 @@ use std::cell::UnsafeCell;
 use std::collections::VecDeque;
 use std::marker::PhantomData;
 use std::sync::Arc;
+use std::thread;
 
 #[derive(Hash, PartialEq, Eq, Clone, Copy, Debug, PartialOrd, Ord)]
 pub struct NodeId(pub usize);
@@ -824,15 +826,51 @@ trait UpwardPass<T> {
     fn upward_pass(&mut self, node: &mut T, parent: Option<&mut T>) -> bool;
 
     fn resolve_pass(&mut self, tree: &mut impl TreeView<T>, starting_nodes: &[NodeId]) {
-        let mut stack = Vec::new();
+        let global = Injector::default();
         for node in starting_nodes {
-            stack.push(*node);
+            global.push(*node);
         }
-        while let Some(node_id) = stack.pop() {
-            let (node, parent) = tree.node_parent_mut(node_id).unwrap();
-            if self.upward_pass(node, parent) {
-                stack.push(tree.parent_id(node_id).unwrap());
+
+        let core_count = thread::available_parallelism()
+            .map(|c| c.get())
+            .unwrap_or(1);
+        let workers: Vec<_> = (0..core_count).map(|_| Worker::new_fifo()).collect();
+        let stealers: Vec<_> = workers.iter().map(|w| w.stealer()).collect();
+        let shared_view = SharedView::new(tree);
+        thread::scope(|s| {
+            let global = &global;
+            let stealers = &stealers;
+            for (_, w) in (0..core_count).zip(workers.into_iter()) {
+                let mut shared_view = shared_view.clone();
+                s.spawn(move || {
+                    while let Some(id) = find_task(&w, &global, &stealers) {
+                        let (node, parent) = shared_view.node_parent_mut(id).unwrap();
+                        if self.upward_pass(node, parent) {
+                            if let Some(id) = shared_view.parent_id(id) {
+                                w.push(id);
+                            }
+                        }
+                    }
+                });
             }
-        }
+        });
     }
 }
+
+fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
+    // Pop a task from the local queue, if not empty.
+    local.pop().or_else(|| {
+        // Otherwise, we need to look for a task elsewhere.
+        std::iter::repeat_with(|| {
+            // Try stealing a batch of tasks from the global queue.
+            global
+                .steal_batch_and_pop(local)
+                // Or try stealing a task from one of the other threads.
+                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
+        })
+        // Loop while no task was stolen and any steal operation needs to be retried.
+        .find(|s| !s.is_retry())
+        // Extract the stolen task, if there is one.
+        .and_then(|s| s.success())
+    })
+}

+ 0 - 20
packages/native-core/src/worker_pool.rs

@@ -1,20 +0,0 @@
-use crossbeam_deque::{Injector, Stealer, Worker};
-use std::iter;
-
-fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
-    // Pop a task from the local queue, if not empty.
-    local.pop().or_else(|| {
-        // Otherwise, we need to look for a task elsewhere.
-        iter::repeat_with(|| {
-            // Try stealing a batch of tasks from the global queue.
-            global
-                .steal_batch_and_pop(local)
-                // Or try stealing a task from one of the other threads.
-                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
-        })
-        // Loop while no task was stolen and any steal operation needs to be retried.
-        .find(|s| !s.is_retry())
-        // Extract the stolen task, if there is one.
-        .and_then(|s| s.success())
-    })
-}