//! Defines the trait that must be implemented by ISPC task execution systems
//! and provides a default threaded one for use.

use libc;
use num_cpus;

use std::time::Duration;
use std::cell::RefCell;
use std::sync::{Arc, RwLock, Mutex};
use std::sync::atomic::{self, AtomicUsize};
use std::thread::{self, JoinHandle};

use task::{ISPCTaskFn, Context};

/// Trait to be implemented to provide ISPC task execution functionality.
///
/// The runtime [required functions](http://ispc.github.io/ispc.html#task-parallelism-runtime-requirements)
/// for the ISPC task runtime will be forwarded directly to your struct, making this interface unsafe.
pub trait TaskSystem {
    /// Alloc is called when memory must be allocated to store parameters to pass to a task
    /// and must return a pointer to an allocation of `size` bytes aligned to `align`.
    ///
    /// The `handle_ptr` will be `NULL` if this is the first time launch has been called in
    /// the function or is the first launch call after an explicit `sync` statement. Both
    /// situations should be treated equivalently as creating a new execution context for tasks.
    /// The `handle_ptr` should be set to some context tracking facility so that you can later
    /// track task groups launched in the context and perform finer grained synchronization in
    /// `sync`.
    unsafe fn alloc(&self, handle_ptr: *mut *mut libc::c_void, size: i64, align: i32)
                    -> *mut libc::c_void;
    /// Launch is called when a new group of tasks is being launched and should schedule them to
    /// be executed in some way.
    ///
    /// The `handle_ptr` will point to the same handle you set up in `alloc` and can be used to
    /// associate groups of tasks with a context of execution as mentioned before. The function `f`
    /// should be executed `count0 * count1 * count2` times and the indices passed to the function
    /// should be as if running in a nested for loop, though no serial ordering is actually
    /// required.
    /// The `data` pointer points to the ISPC task specific parameter pointer and
    /// should be passed through to the function.
    ///
    /// For example, a serial task launcher could just run the tasks in a nested loop:
    ///
    /// ```ignore
    /// let total_tasks = count0 * count1 * count2;
    /// for z in 0..count2 {
    ///     for y in 0..count1 {
    ///         for x in 0..count0 {
    ///             let task_id = x + y * count0 + z * count0 * count1;
    ///             f(data, thread_id, total_threads, task_id, total_tasks,
    ///               x, y, z, count0, count1, count2);
    ///         }
    ///     }
    /// }
    /// ```
    unsafe fn launch(&self, handle_ptr: *mut *mut libc::c_void, f: ISPCTaskFn,
                     data: *mut libc::c_void, count0: i32, count1: i32, count2: i32);
    /// Synchronize an execution context with the tasks it's launched. Use `handle` to determine
    /// the task context that's being synchronized.
    ///
    /// This function should not return until all tasks launched within the context being
    /// synchronized with have been completed. You can use the `handle` to determine which context
    /// is being synchronized with and thus which tasks must be completed before returning.
    unsafe fn sync(&self, handle: *mut libc::c_void);
}

// Thread local storage to store the thread's id, otherwise we don't know
// who we are in sync. The thread id starts at an invalid value but will be set
// upon thread launch.
thread_local!(static THREAD_ID: RefCell<usize> = RefCell::new(0));

/// A multithreaded execution environment for the tasks launched in ISPC
pub struct Parallel {
    context_list: RwLock<Vec<Arc<Context>>>,
    next_context_id: AtomicUsize,
    threads: Mutex<Vec<JoinHandle<()>>>,
    chunk_size: usize,
}

impl Parallel {
    /// Create a parallel task execution environment that will use `num_cpus` threads
    /// to run tasks.
    pub fn new() -> Arc<Parallel> {
        Parallel::oversubscribed(1.0)
    }
    /// Create an oversubscribed parallel task execution environment that will use
    /// `oversubscribe * num_cpus` threads to run tasks.
    pub fn oversubscribed(oversubscribe: f32) -> Arc<Parallel> {
        assert!(oversubscribe >= 1.0);
        let par = Arc::new(Parallel {
            context_list: RwLock::new(Vec::new()),
            next_context_id: AtomicUsize::new(0),
            threads: Mutex::new(Vec::new()),
            chunk_size: 8,
        });
        {
            let mut threads = par.threads.lock().unwrap();
            let num_threads = (oversubscribe * num_cpus::get() as f32) as usize;
            let chunk_size = par.chunk_size;
            for i in 0..num_threads {
                let task_sys = Arc::clone(&par);
                // Note that the spawned thread ids start at 1 since the main thread is 0
                threads.push(thread::spawn(move || {
                    Parallel::worker_thread(task_sys, i + 1, num_threads + 1, chunk_size)
                }));
            }
        }
        par
    }
    /// Return a context that has remaining tasks left to be executed by a thread, or `None`
    /// if no contexts have remaining tasks.
    ///
    /// Note that due to threading issues you shouldn't assume the context returned actually has
    /// outstanding tasks by the time it's returned to the caller and a chunk is requested.
    fn get_context(&self) -> Option<Arc<Context>> {
        self.context_list.read().unwrap().iter().find(|c| !c.current_tasks_done()).cloned()
    }
    fn worker_thread(task_sys: Arc<Parallel>, thread: usize, total_threads: usize,
                     chunk_size: usize) {
        THREAD_ID.with(|f| *f.borrow_mut() = thread);
        loop {
            // Get a task group to run
            while let Some(c) = task_sys.get_context() {
                for tg in c.iter() {
                    for chunk in tg.chunks(chunk_size) {
                        chunk.execute(thread as i32, total_threads as i32);
                    }
                }
            }
            // We ran out of contexts to get, so wait a bit for a new group to get launched.
            // TODO: This could result in some threads remaining parked even if new contexts
            // have been launched, if they're unparked and then immediately park again. It would
            // be better to set up a condition variable or similar that the workers can wait on
            // to be signaled when new work arrives.
            thread::park();
        }
    }
}

impl TaskSystem for Parallel {
    unsafe fn alloc(&self, handle_ptr: *mut *mut libc::c_void, size: i64, align: i32)
                    -> *mut libc::c_void {
        // If the handle is null this is the first time this function has spawned tasks
        // and we should create a new Context in the context list for it; otherwise
        // it's the pointer to the context we should append the new group to.
        if (*handle_ptr).is_null() {
            let mut context_list = self.context_list.write().unwrap();
            // This is a bit hairy. We allocate the new task context in an `Arc`, hand a raw
            // pointer to it back to ISPC through `handle_ptr`, and keep the `Arc` in our
            // context list so the context is freed properly when we erase it from the
            // vector in `sync`.
            let c = Arc::new(Context::new(self.next_context_id.fetch_add(1, atomic::Ordering::SeqCst)));
            {
                let h = &*c;
                *handle_ptr = h as *const Context as *mut libc::c_void;
            }
            context_list.push(c);
            let ctx = context_list.last().unwrap();
            ctx.alloc(size as usize, align as usize)
        } else {
            let context_list = self.context_list.read().unwrap();
            let handle_ctx = *handle_ptr as *mut Context;
            let ctx = context_list.iter().find(|c| (*handle_ctx).id == c.id).unwrap();
            ctx.alloc(size as usize, align as usize)
        }
    }
    unsafe fn launch(&self, handle_ptr: *mut *mut libc::c_void, f: ISPCTaskFn,
                     data: *mut libc::c_void, count0: i32, count1: i32, count2: i32) {
        // Push the tasks being launched on to the list of task groups for this context
        let context: &mut Context = &mut *(*handle_ptr as *mut Context);
        context.launch((count0, count1, count2), data, f);
        // Unpark any sleeping threads since we have jobs for them
        let threads = self.threads.lock().unwrap();
        for t in threads.iter() {
            t.thread().unpark();
        }
    }
    unsafe fn sync(&self, handle: *mut libc::c_void) {
        let context: &mut Context = &mut *(handle as *mut Context);
        let thread = THREAD_ID.with(|f| *f.borrow());
        let total_threads = num_cpus::get();
        // Make sure
        // all tasks are done, executing them if not, for this simple serial version.
        // TODO: In the future we'd wait on each Group's semaphore or atomic bool.
        // The waiting thread could help execute tasks as well; otherwise it might be
        // possible to deadlock, where all threads are waiting for some enqueued tasks but
        // no threads are available to run them. Just running tasks in our context is not
        // sufficient to prevent deadlock, because those tasks could in turn launch & sync
        // and get stuck, so if our tasks aren't done and there are none left to run in our
        // context we should start running tasks from other contexts to help out.
        for tg in context.iter() {
            for chunk in tg.chunks(self.chunk_size) {
                // TODO: We need to figure out which thread we are
                chunk.execute(thread as i32, total_threads as i32);
            }
        }
        // If all the tasks for this context have been finished we're done syncing and can
        // clean up memory and remove the context from the context list. Otherwise there are
        // some unfinished groups further down the tree that were spawned by our direct tasks,
        // which those tasks are now syncing on, and we need to help out. Since we don't know
        // the tree structure, our best option is to just start grabbing chunks from unfinished
        // groups in the context list and run them to at least ensure global forward progress,
        // which will eventually get the work we're waiting on to finish.
        // After each chunk execution we should check if our syncing context is done
        // and break.
        while !context.current_tasks_done() {
            // Get a task group to run
            while let Some(c) = self.get_context() {
                let mut ran_some = false;
                for tg in c.iter() {
                    for chunk in tg.chunks(self.chunk_size) {
                        ran_some = true;
                        chunk.execute(thread as i32, total_threads as i32);
                    }
                }
                if !ran_some {
                    thread::sleep(Duration::from_millis(50));
                }
            }
        }
        // Now erase this context from our vector
        let mut context_list = self.context_list.write().unwrap();
        let pos = context_list.iter().position(|c| context.id == c.id).unwrap();
        context_list.remove(pos);
    }
}
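
// The TODOs above (parked workers racing new launches in `worker_thread`, and the
// fixed 50ms sleep in `sync`) could both be addressed by a condition variable paired
// with a generation counter that `launch` bumps. The sketch below is illustrative
// only: `WorkSignal` is an assumed name, not part of this crate's API, and it uses
// only `std`. A worker would call `wait` with the last generation it observed
// instead of `thread::park()`, and `launch` would call `notify` instead of
// unparking every thread; a wakeup that races with a new launch cannot be lost
// because the generation check happens under the lock.
#[allow(dead_code)]
struct WorkSignal {
    generation: std::sync::Mutex<u64>,
    cvar: std::sync::Condvar,
}

#[allow(dead_code)]
impl WorkSignal {
    fn new() -> WorkSignal {
        WorkSignal {
            generation: std::sync::Mutex::new(0),
            cvar: std::sync::Condvar::new(),
        }
    }
    // Called by `launch` when new task groups arrive; wakes every waiting worker.
    fn notify(&self) {
        let mut g = self.generation.lock().unwrap();
        *g += 1;
        self.cvar.notify_all();
    }
    // Called by an idle worker with the generation it last observed; blocks until
    // `notify` has been called at least once since then. The loop also guards
    // against spurious wakeups from `Condvar::wait`.
    fn wait(&self, last_seen: u64) -> u64 {
        let mut g = self.generation.lock().unwrap();
        while *g == last_seen {
            g = self.cvar.wait(g).unwrap();
        }
        *g
    }
}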