1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
//! Defines structs for operating on ISPC task groups and getting chunks //! of a task to be scheduled on to threads use libc; use aligned_alloc; use std::cmp; use std::iter::Iterator; use std::sync::{Mutex, RwLock, Arc}; use std::sync::atomic::{self, AtomicUsize, AtomicPtr}; /// A pointer to an ISPC task function. /// /// The ISPC task function pointer is: /// ```c /// void (*TaskFuncPtr)(void *data, int threadIndex, int threadCount, /// int taskIndex, int taskCount, /// int taskIndex0, int taskIndex1, int taskIndex2, /// int taskCount0, int taskCount1, int taskCount2); /// ``` pub type ISPCTaskFn = extern "C" fn(data: *mut libc::c_void, thread_idx: libc::c_int, thread_cnt: libc::c_int, task_idx: libc::c_int, task_cnt: libc::c_int, task_idx0: libc::c_int, task_idx1: libc::c_int, task_idx2: libc::c_int, task_cnt0: libc::c_int, task_cnt1: libc::c_int, task_cnt2: libc::c_int); /// A list of all task groups spawned by a function in some launch context which /// will be sync'd at an explicit `sync` call or function exit. /// /// **Note:** A Context is done if and only if `ISPCSync` has been called with /// its handle and all of its tasks are finished. Until `ISPCSync` is called on the /// Context's handle more tasks could be launched. /// /// Additionally, because we're not really able to associate a call to `ISPCAlloc` /// with a specific Group care must be taken that the Context is not dropped /// until `ISPCSync` has been called on its handle and all Groups within have /// completed execution. #[derive(Debug)] pub struct Context { /// Task groups launched by this function /// TODO: Must be protected by a Reader-Writer lock, though I don't think we'd want to /// protect each Group, it'd be an RwLock<Vec<Group>> /// PROBLEM: If we're accessing this from multiple threads and have other threads /// working on the group when we want to push a new group on we'll get stuck until those /// tasks finish because they'll have a read lock on the vec to access the Group safely. /// I guess an easy fix would be to push groups behind Arcs? But then how would the /// Chunk get the Arc? tasks: RwLock<Vec<Arc<Group>>>, /// The memory allocated for the various task group's parameters mem: Mutex<Vec<AtomicPtr<libc::c_void>>>, /// A unique identifier for this context pub id: usize, } impl Context { /// Create a new list of tasks for some function with id `id` pub fn new(id: usize) -> Context { Context { tasks: RwLock::new(Vec::new()), mem: Mutex::new(Vec::new()), id } } /// Add a task group for execution that was launched in this context pub fn launch(&self, total: (i32, i32, i32), data: *mut libc::c_void, fcn: ISPCTaskFn) { self.tasks.write().unwrap().push(Arc::new(Group::new(total, AtomicPtr::new(data), fcn))); } /// Check if all tasks currently in the task list are completed /// /// **Note:** A Context is done if and only if ISPCSync has been called with /// its handle and all of its tasks are finished. Until ISPCSync is called on the /// Context's handle more tasks could be launched. /// TODO: With this design we're essentially requiring the thread waiting on the context /// to busy wait since we provide no condition variable to block on. pub fn current_tasks_done(&self) -> bool { self.tasks.read().unwrap().iter().all(|t| t.is_finished()) } /// Allocate some memory for this Context's task groups, returns a pointer to the allocated memory. pub unsafe fn alloc(&self, size: usize, align: usize) -> *mut libc::c_void { // TODO: The README for this lib mentions it may be slow. Maybe use some other allocator? let ptr = aligned_alloc::aligned_alloc(size as usize, align as usize) as *mut libc::c_void; let mut mem = self.mem.lock().unwrap(); mem.push(AtomicPtr::new(ptr)); ptr } /// An iterator over the **current** groups in the context which have remaining tasks to /// run on a thread. If more task groups are added before this iterator has returned /// None those will appear as well. pub fn iter(&self) -> ContextIter { ContextIter { context: self } } /// Get a Group with tasks remaining to be executed, returns None if there /// are no groups left to run in this context. /// /// Note that you can't assume that the Group you get back is guaranteed /// to have tasks remaining since between the time of checking that the /// group has outstanding tasks and getting the group back to call `chunks` /// those remaining tasks may have been taken by another threaad. fn get_active_group(&self) -> Option<Arc<Group>> { let tasks = self.tasks.read().unwrap(); for group in tasks.iter() { if group.has_tasks() { return Some(Arc::clone(&group)); } } None } } impl Drop for Context { /// Release memory for all the tasks in this context /// /// **Note:** that because we're not really able to associate a call to ISPCAlloc /// with a specific Group care must be taken that the Context is not dropped /// until ISPCSync has been called on its handle and all Groups within have /// completed execution. fn drop(&mut self) { let mut mem = self.mem.lock().unwrap(); for ptr in mem.drain(0..) { let m = ptr.load(atomic::Ordering::SeqCst); unsafe { aligned_alloc::aligned_free(m as *mut ()); } } } } /// An iterator over the **current** groups in the context which have remaining tasks to /// run on a thread. If more task groups are added before this iterator has returned /// None those will appear as well. pub struct ContextIter<'a> { context: &'a Context, } impl<'a> Iterator for ContextIter<'a> { type Item = Arc<Group>; /// Get a Group with tasks remaining to be executed, returns None if there /// are no groups left to run in this context. /// /// Note that you can't assume that the Group you get back is guaranteed /// to have tasks remaining since between the time of checking that the /// group has outstanding tasks and getting the group back to call `chunks` /// those remaining tasks may have been taken by another threaad. fn next(&mut self) -> Option<Arc<Group>> { self.context.get_active_group() } } /// A group of tasks spawned by a call to `launch` in ISPC #[derive(Debug)] pub struct Group { /// Current starting index to execute the remaining tasks in this group /// TODO: Protect start by a mutex since it will be modified by `get_chunk` /// which would get a chunk of tasks to be executed along with a copy of the /// total, fcn ptr and data. This would be wrapped in to a struct, `Chunk` which /// would expose next() and behave like an iterator to go through the chunk of tasks /// and run them. Right now we just schedule tasks like in a nested for loop, /// would some tiled scheduling be better? start: AtomicUsize, end: usize, /// Total number of tasks scheduled in this group pub total: (i32, i32, i32), /// Function to run for this task pub fcn: ISPCTaskFn, /// Data pointer to user params to pass to the function pub data: AtomicPtr<libc::c_void>, /// Tracks how many chunks we've given out so far to threads chunks_launched: AtomicUsize, /// Tracks how many of the chunks we gave out are completed. A group is finished /// only when all chunks are done and start >= total tasks, call `is_finished` to check. /// /// I'm unsure whether or semaphore/condvar would be the better choice here /// The TASK_LIST would want to send an alert when new tasks are pushed so in /// Sync we could wait on the context to finish? /// TODO: We can't just have the last chunk executed mark the group as done /// because earlier chunks might still be running! We need to mark ourselves chunks_finished: AtomicUsize, } impl Group { /// Create a new task group for execution of the function pub fn new(total: (i32, i32, i32), data: AtomicPtr<libc::c_void>, fcn: ISPCTaskFn) -> Group { Group { start: AtomicUsize::new(0), end: (total.0 * total.1 * total.2) as usize, total, data, fcn, chunks_launched: AtomicUsize::new(0), chunks_finished: AtomicUsize::new(0) } } /// Get an iterator over `chunk_size` chunks of tasks to be executed for this group pub fn chunks(&self, chunk_size: usize) -> GroupChunks { GroupChunks { group: self, chunk_size } } /// Check if all tasks for this group have been completed pub fn is_finished(&self) -> bool { let finished = self.chunks_finished.load(atomic::Ordering::SeqCst); let launched = self.chunks_launched.load(atomic::Ordering::SeqCst); let start = self.start.load(atomic::Ordering::SeqCst); // This shouldn't happen, if it does some bad threading voodoo is afoot assert!(finished <= launched); finished == launched && start >= self.end } /// Check if this group has tasks left to execute fn has_tasks(&self) -> bool { let start = self.start.load(atomic::Ordering::SeqCst); start < self.end } /// Get a chunk of tasks from the group to run if there are any tasks left to run /// /// `desired_tasks` specifies the number of tasks we'd like the chunk to contain, /// though you may get fewer if there aren't that many tasks left. If the chunk /// you get is the last chunk to be executed (`chunk.end == total.0 * total.1 * total.2`) /// you must mark this group as finished upon completing execution of the chunk fn get_chunk(&self, desired_tasks: usize) -> Option<Chunk> { let start = self.start.fetch_add(desired_tasks, atomic::Ordering::SeqCst); if start < self.end { // Give the chunk 4 tasks or whatever remain let c = Some(Chunk::new(self, start, cmp::min(start + desired_tasks, self.end))); self.chunks_launched.fetch_add(1, atomic::Ordering::SeqCst); c } else { None } } } /// An iterator over chunks of tasks to be executed in a Group pub struct GroupChunks<'a> { group: &'a Group, chunk_size: usize, } impl<'a> Iterator for GroupChunks<'a> { type Item = Chunk<'a>; /// Get the next chunk of tasks to be executed fn next(&mut self) -> Option<Chunk<'a>> { self.group.get_chunk(self.chunk_size) } } /// A chunk of tasks from a Group to be executed /// /// Executes task in the range [start, end) #[derive(Debug)] pub struct Chunk<'a> { /// The next task to be executed in this chunk start: i32, /// The last task to be executed in this chunk end: i32, /// Total number of tasks scheduled in the group this chunk came from total: (i32, i32, i32), /// Function to run for this task fcn: ISPCTaskFn, /// Data pointer to user params to pass to the function data: AtomicPtr<libc::c_void>, /// The group this chunk is running tasks from group: &'a Group, } impl<'a> Chunk<'a> { /// Create a new chunk to execute tasks in the group from [start, end) pub fn new(group: &'a Group, start: usize, end: usize) -> Chunk { let d = AtomicPtr::new(group.data.load(atomic::Ordering::SeqCst)); Chunk { start: start as i32, end: end as i32, total: group.total, fcn: group.fcn, data: d, group } } /// Execute all tasks in this chunk pub fn execute(&self, thread_id: i32, total_threads: i32) { let total_tasks = self.total.0 * self.total.1 * self.total.2; let data = self.data.load(atomic::Ordering::SeqCst); for t in self.start..self.end { let id = self.task_indices(t); (self.fcn)(data, thread_id as libc::c_int, total_threads as libc::c_int, t as libc::c_int, total_tasks as libc::c_int, id.0 as libc::c_int, id.1 as libc::c_int, id.2 as libc::c_int, self.total.0 as libc::c_int, self.total.1 as libc::c_int, self.total.2 as libc::c_int); } // Tell the group this chunk is done self.group.chunks_finished.fetch_add(1, atomic::Ordering::SeqCst); } /// Get the global task id for the task index fn task_indices(&self, id: i32) -> (i32, i32, i32) { (id % self.total.0, (id / self.total.0) % self.total.1, id / (self.total.0 * self.total.1)) } }