use std::cell::UnsafeCell;
use std::cmp;
use std::fmt;
use std::mem;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, SeqCst};
use epoch::{self, Atomic, Pin, Ptr};
/// Maximum number of deferred-destruction entries a single `Bag` can hold.
///
/// The `strict_gc` feature shrinks the capacity so bags fill up (and get
/// flushed into the queue) much more aggressively, which stresses the
/// collector in tests.
#[cfg(not(feature = "strict_gc"))]
const MAX_OBJECTS: usize = 64;
#[cfg(feature = "strict_gc")]
const MAX_OBJECTS: usize = 4;
/// The global epoch counter; advanced by `epoch::thread::try_advance`.
pub static EPOCH: AtomicUsize = ATOMIC_USIZE_INIT;
/// A bag of deferred destruction routines for garbage objects.
pub struct Bag {
    /// Number of occupied slots in `objects`.
    len: AtomicUsize,
    /// Fixed array of (destructor, object pointer, count) triples.
    /// Pointer and destructor are type-erased to `*mut u8`.
    objects: [UnsafeCell<(unsafe fn(*mut u8, usize), *mut u8, usize)>; MAX_OBJECTS],
    /// Global epoch at the time the bag was pushed into the queue.
    /// Only meaningful after `Garbage::push` has set it.
    epoch: usize,
    /// Next bag in the intrusive queue of sealed bags.
    next: Atomic<Bag>,
}
impl Bag {
pub fn new() -> Self {
Bag {
len: AtomicUsize::new(0),
objects: unsafe { mem::zeroed() },
epoch: unsafe { mem::uninitialized() },
next: Atomic::null(0),
}
}
pub fn is_empty(&self) -> bool {
self.len.load(Relaxed) == 0
}
pub fn try_insert<T>(&self, destroy: unsafe fn(*mut T, usize), object: *mut T, count: usize)
-> bool {
let destroy: unsafe fn(*mut u8, usize) = unsafe { mem::transmute(destroy) };
let object = object as *mut u8;
let mut len = self.len.load(Acquire);
loop {
if len == self.objects.len() {
return false;
}
match self.len.compare_exchange(len, len + 1, AcqRel, Acquire) {
Ok(_) => {
unsafe { *self.objects[len].get() = (destroy, object, count) }
return true;
}
Err(l) => len = l,
}
}
}
unsafe fn destroy_all_objects(&self) {
for cell in self.objects.iter().take(self.len.load(Relaxed)) {
let (destroy, object, count) = *cell.get();
destroy(object, count);
}
}
}
/// A concurrent queue of garbage bags plus one `pending` bag being filled.
pub struct Garbage {
    /// Oldest bag in the queue. The bag at `head` is a sentinel whose
    /// contents have already been destroyed.
    head: Atomic<Bag>,
    /// Youngest bag in the queue (may lag behind the true tail by one node).
    tail: Atomic<Bag>,
    /// Bag currently accepting new garbage; sealed and pushed when full.
    pending: Atomic<Bag>,
}
// SAFETY: all shared state is behind atomics; the raw pointers stored inside
// bags are only dereferenced by the thread that successfully pops the bag.
unsafe impl Send for Garbage {}
unsafe impl Sync for Garbage {}
impl Garbage {
    /// Returns a new, empty garbage queue containing only a sentinel bag.
    pub fn new() -> Self {
        let garbage = Garbage {
            head: Atomic::null(0),
            tail: Atomic::null(0),
            pending: Atomic::null(0),
        };
        // NOTE(review): a zeroed `Pin` is conjured only to satisfy the
        // `store_box` signature during construction, while no other thread
        // can observe the queue — assumes `Pin` is zeroable; confirm.
        let pin = unsafe { &mem::zeroed::<Pin>() };
        let sentinel = garbage.head.store_box(Box::new(Bag::new()), 0, pin);
        garbage.tail.store(sentinel);
        garbage
    }
    /// Swaps the `pending` bag for a fresh one, pushing the old (non-null)
    /// bag into the queue, then nudges the epoch forward and collects.
    ///
    /// On CAS failure returns the bag another thread installed instead.
    fn replace_pending<'p>(&self, old: Ptr<'p, Bag>, pin: &'p Pin)
        -> Result<Ptr<'p, Bag>, Ptr<'p, Bag>> {
        match self.pending.cas_box(old, Box::new(Bag::new()), 0) {
            Ok(new) => {
                if !old.is_null() {
                    // We unlinked `old`, so we own it again as a Box.
                    let bag = unsafe { Box::from_raw(old.as_raw()) };
                    self.push(bag, pin);
                }
                epoch::thread::try_advance(pin);
                self.collect(pin);
                Ok(new)
            }
            Err((pending, _)) => Err(pending),
        }
    }
    /// Defers deallocation (without running destructors) of `count` objects
    /// starting at `object`.
    pub unsafe fn defer_free<T>(&self, object: *mut T, count: usize, pin: &Pin) {
        unsafe fn free<T>(ptr: *mut T, count: usize) {
            // Length 0: deallocate the buffer without dropping elements.
            drop(Vec::from_raw_parts(ptr, 0, count));
        }
        self.defer_destroy(free, object, count, pin);
    }
    /// Defers dropping (running destructors) and deallocation of `count`
    /// objects starting at `object`.
    pub unsafe fn defer_drop<T>(&self, object: *mut T, count: usize, pin: &Pin) {
        unsafe fn destruct<T>(ptr: *mut T, count: usize) {
            // Length == capacity: drop all elements, then deallocate.
            drop(Vec::from_raw_parts(ptr, count, count));
        }
        self.defer_destroy(destruct, object, count, pin);
    }
    /// Defers an arbitrary destruction routine for `object`/`count`.
    ///
    /// Retries until the triple lands in some pending bag, replacing the
    /// pending bag whenever it is full (or missing).
    pub unsafe fn defer_destroy<T>(
        &self,
        destroy: unsafe fn(*mut T, usize),
        object: *mut T,
        count: usize,
        pin: &Pin
    ) {
        let mut pending = self.pending.load(pin);
        loop {
            match pending.as_ref() {
                Some(p) if p.try_insert(destroy, object, count) => break,
                _ => {
                    // Bag full or null: install a fresh one and retry with
                    // whichever bag ends up current.
                    match self.replace_pending(pending, pin) {
                        Ok(p) => pending = p,
                        Err(p) => pending = p,
                    }
                }
            }
        }
    }
    /// Seals the current (non-empty) pending bag into the queue so its
    /// contents become eligible for collection.
    pub fn flush(&self, pin: &Pin) {
        let mut pending = self.pending.load(pin);
        loop {
            match pending.as_ref() {
                None => break,
                Some(p) => {
                    if p.is_empty() {
                        // Nothing to flush.
                        break;
                    } else {
                        match self.replace_pending(pending, pin) {
                            Ok(_) => break,
                            Err(p) => pending = p,
                        }
                    }
                }
            }
        }
    }
    /// Destroys up to `COLLECT_STEPS` bags whose epoch is at least two
    /// behind the current global epoch (incremental, bounded work per call).
    pub fn collect(&self, pin: &Pin) {
        const COLLECT_STEPS: usize = 8;
        let epoch = EPOCH.load(SeqCst);
        let condition = |bag: &Bag| {
            // Wrapping distance between the bag's epoch and the current one;
            // a bag is safe to destroy once it is more than two epochs old.
            let diff = epoch.wrapping_sub(bag.epoch);
            cmp::min(diff, 0usize.wrapping_sub(diff)) > 2
        };
        for _ in 0..COLLECT_STEPS {
            match self.try_pop_if(&condition, pin) {
                None => break,
                Some(bag) => unsafe { bag.destroy_all_objects() },
            }
        }
    }
    /// Appends a sealed bag to the queue (Michael-Scott style enqueue:
    /// link at the true tail, then swing `self.tail` forward).
    fn push(&self, mut bag: Box<Bag>, pin: &Pin) {
        bag.epoch = EPOCH.load(SeqCst);
        let mut tail = self.tail.load(pin);
        loop {
            let next = tail.unwrap().next.load(pin);
            if next.is_null() {
                match tail.unwrap().next.cas_box(next, bag, 0) {
                    Ok(bag) => {
                        // Best-effort tail swing; failure means someone else
                        // already advanced it.
                        let _ = self.tail.cas(tail, bag);
                        break;
                    }
                    Err((t, b)) => {
                        tail = t;
                        bag = b;
                    }
                }
            } else {
                // Tail is lagging: help advance it before retrying.
                match self.tail.cas(tail, next) {
                    Ok(()) => tail = next,
                    Err(t) => tail = t,
                }
            }
        }
    }
    /// Pops the bag after the sentinel if `condition` approves it.
    ///
    /// The popped-off old sentinel node is itself deferred for freeing; the
    /// returned bag becomes the new sentinel, so its contents may be
    /// destroyed by the caller but its node must stay allocated.
    fn try_pop_if<'p, F>(&self, condition: F, pin: &'p Pin) -> Option<&'p Bag>
        where F: Fn(&Bag) -> bool
    {
        let mut head = self.head.load(pin);
        loop {
            let next = head.unwrap().next.load(pin);
            match next.as_ref() {
                Some(n) if condition(n) => {
                    match self.head.cas(head, next) {
                        Ok(()) => {
                            // Old sentinel node is unreachable for new
                            // readers; free it after the grace period.
                            unsafe { epoch::defer_free(head.as_raw(), 1, pin) }
                            return Some(n);
                        }
                        Err(h) => head = h,
                    }
                }
                // Queue empty, or the front bag is still too young.
                None | Some(_) => return None,
            }
        }
    }
}
impl Drop for Garbage {
    fn drop(&mut self) {
        unsafe {
            // No other thread may touch the queue during drop, so plain
            // `Relaxed` loads of the raw pointers suffice.
            let pending = self.pending.load_raw(Relaxed).0;
            if !pending.is_null() {
                (*pending).destroy_all_objects();
                // Free the bag node without running `Bag`'s field drops:
                // a capacity-1, length-0 Vec just deallocates.
                drop(Vec::from_raw_parts(pending, 0, 1));
            }
            let mut head = self.head.load_raw(Relaxed).0;
            loop {
                let next = (*head).next.load_raw(Relaxed).0;
                // The first node is the sentinel (already-destroyed
                // contents), so it is freed without destroying objects;
                // every subsequent node has its objects destroyed below
                // before being freed on the next iteration.
                drop(Vec::from_raw_parts(head, 0, 1));
                if next.is_null() {
                    break;
                }
                head = next;
                (*head).destroy_all_objects();
            }
        }
    }
}
impl fmt::Debug for Garbage {
    /// Opaque representation: the internal bag queue is not traversed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Garbage { ... }")
    }
}
/// Returns a reference to the lazily-initialized global garbage queue.
///
/// The pointer is stored as a `usize` in a static; 0 means "not yet
/// initialized". Once installed, the instance is never freed (except via
/// `destroy_global`), so handing out `&'static` is sound.
fn global() -> &'static Garbage {
    static GLOBAL: AtomicUsize = ATOMIC_USIZE_INIT;
    let current = GLOBAL.load(Acquire);
    let garbage = if current == 0 {
        // Race with other initializers: allocate, then try to install.
        let raw = Box::into_raw(Box::new(Garbage::new()));
        let new = raw as usize;
        let previous = GLOBAL.compare_and_swap(0, new, AcqRel);
        if previous == 0 {
            // We won: our allocation is now the global instance.
            new
        } else {
            // Another thread won the race; discard ours and use theirs.
            unsafe { drop(Box::from_raw(raw)); }
            previous
        }
    } else {
        current
    };
    unsafe { &*(garbage as *const Garbage) }
}
/// Appends a sealed bag to the global garbage queue.
pub fn push(bag: Box<Bag>, pin: &Pin) {
    let garbage = global();
    garbage.push(bag, pin);
}
/// Runs one bounded, incremental collection step on the global queue.
pub fn collect(pin: &Pin) {
    let garbage = global();
    garbage.collect(pin);
}
/// Destroys the global garbage queue, running all remaining destructors.
///
/// # Safety
///
/// Must only be called when no other thread can access the global queue
/// again. Note that a later call to `global()` would silently allocate a
/// fresh instance rather than fail.
#[cfg(feature = "internals")]
pub unsafe fn destroy_global() {
    let global = global() as *const Garbage as *mut Garbage;
    drop(Box::from_raw(global));
}
#[cfg(test)]
mod tests {
    extern crate rand;
    use std::mem;
    use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
    use std::sync::atomic::Ordering::SeqCst;
    use std::sync::Arc;
    use std::thread;
    use self::rand::{Rng, thread_rng};
    use super::Garbage;
    use ::epoch;

    /// Deferred garbage is buffered in the pending bag until flushed.
    #[test]
    fn smoke() {
        let g = Garbage::new();
        epoch::pin(|pin| {
            let a = Box::into_raw(Box::new(7));
            unsafe { g.defer_free(a, 1, pin) }
            assert!(!g.pending.load(pin).unwrap().is_empty());
        });
    }

    /// `flush` always leaves the pending bag empty.
    #[test]
    fn flush_pending() {
        let g = Garbage::new();
        let mut rng = thread_rng();
        for _ in 0..100_000 {
            epoch::pin(|pin| unsafe {
                let a = Box::into_raw(Box::new(7));
                g.defer_drop(a, 1, pin);
                // Flush at random points (~1% of iterations).
                if rng.gen_range(0, 100) == 0 {
                    g.flush(pin);
                    assert!(g.pending.load(pin).unwrap().is_empty());
                }
            });
        }
    }

    /// Collection proceeds in bounded steps: no single `collect` call may
    /// destroy more than COLLECT_STEPS * MAX_OBJECTS (< 1000) objects.
    #[test]
    fn incremental() {
        const COUNT: usize = 100_000;
        static DESTROYS: AtomicUsize = ATOMIC_USIZE_INIT;
        let g = Garbage::new();
        epoch::pin(|pin| unsafe {
            for _ in 0..COUNT {
                let a = Box::into_raw(Box::new(7i32));
                unsafe fn destroy(ptr: *mut i32, count: usize) {
                    drop(Box::from_raw(ptr));
                    DESTROYS.fetch_add(count, SeqCst);
                }
                g.defer_destroy(destroy, a, 1, pin);
            }
            g.flush(pin);
        });
        let mut last = 0;
        while last < COUNT {
            let curr = DESTROYS.load(SeqCst);
            // Each round of pin+collect destroys a bounded batch.
            assert!(curr - last < 1000);
            last = curr;
            epoch::pin(|pin| g.collect(pin));
        }
        assert!(DESTROYS.load(SeqCst) == COUNT);
    }

    /// Unflushed garbage stays buffered: nothing is destroyed until `flush`.
    #[test]
    fn buffering() {
        const COUNT: usize = 10;
        static DESTROYS: AtomicUsize = ATOMIC_USIZE_INIT;
        let g = Garbage::new();
        epoch::pin(|pin| unsafe {
            for _ in 0..COUNT {
                let a = Box::into_raw(Box::new(7i32));
                unsafe fn destroy(ptr: *mut i32, count: usize) {
                    drop(Box::from_raw(ptr));
                    DESTROYS.fetch_add(count, SeqCst);
                }
                g.defer_destroy(destroy, a, 1, pin);
            }
        });
        for _ in 0..100_000 {
            epoch::pin(|pin| {
                g.collect(pin);
                // Pending bag never flushed, so its contents survive.
                assert!(DESTROYS.load(SeqCst) < COUNT);
            });
        }
        epoch::pin(|pin| g.flush(pin));
        while DESTROYS.load(SeqCst) < COUNT {
            epoch::pin(|pin| g.collect(pin));
        }
        assert_eq!(DESTROYS.load(SeqCst), COUNT);
    }

    /// `defer_drop` runs element destructors exactly once per element.
    #[test]
    fn count_drops() {
        const COUNT: usize = 100_000;
        static DROPS: AtomicUsize = ATOMIC_USIZE_INIT;
        struct Elem(i32);
        impl Drop for Elem {
            fn drop(&mut self) {
                DROPS.fetch_add(1, SeqCst);
            }
        }
        let g = Garbage::new();
        epoch::pin(|pin| unsafe {
            for _ in 0..COUNT {
                let a = Box::into_raw(Box::new(Elem(7i32)));
                g.defer_drop(a, 1, pin);
            }
            g.flush(pin);
        });
        while DROPS.load(SeqCst) < COUNT {
            epoch::pin(|pin| g.collect(pin));
        }
        assert_eq!(DROPS.load(SeqCst), COUNT);
    }

    /// Custom `defer_destroy` routines run exactly once per object.
    #[test]
    fn count_destroy() {
        const COUNT: usize = 100_000;
        static DESTROYS: AtomicUsize = ATOMIC_USIZE_INIT;
        let g = Garbage::new();
        epoch::pin(|pin| unsafe {
            for _ in 0..COUNT {
                let a = Box::into_raw(Box::new(7i32));
                unsafe fn destroy(ptr: *mut i32, count: usize) {
                    drop(Box::from_raw(ptr));
                    DESTROYS.fetch_add(count, SeqCst);
                }
                g.defer_destroy(destroy, a, 1, pin);
            }
            g.flush(pin);
        });
        while DESTROYS.load(SeqCst) < COUNT {
            epoch::pin(|pin| g.collect(pin));
        }
        assert_eq!(DESTROYS.load(SeqCst), COUNT);
    }

    /// A single `defer_drop` of an array drops every element.
    #[test]
    fn drop_array() {
        const COUNT: usize = 700;
        static DROPS: AtomicUsize = ATOMIC_USIZE_INIT;
        struct Elem(i32);
        impl Drop for Elem {
            fn drop(&mut self) {
                DROPS.fetch_add(1, SeqCst);
            }
        }
        let g = Garbage::new();
        epoch::pin(|pin| unsafe {
            let mut v = Vec::with_capacity(COUNT);
            for i in 0..COUNT {
                v.push(Elem(i as i32));
            }
            g.defer_drop(v.as_mut_ptr(), v.len(), pin);
            g.flush(pin);
            // Ownership of the buffer was transferred to the collector.
            mem::forget(v);
        });
        while DROPS.load(SeqCst) < COUNT {
            epoch::pin(|pin| g.collect(pin));
        }
        assert_eq!(DROPS.load(SeqCst), COUNT);
    }

    /// A custom destructor receives the full element count for an array.
    #[test]
    fn destroy_array() {
        const COUNT: usize = 100_000;
        static DESTROYS: AtomicUsize = ATOMIC_USIZE_INIT;
        let g = Garbage::new();
        epoch::pin(|pin| unsafe {
            let mut v = Vec::with_capacity(COUNT);
            for i in 0..COUNT {
                v.push(i as i32);
            }
            unsafe fn destroy(ptr: *mut i32, count: usize) {
                assert!(count == COUNT);
                drop(Vec::from_raw_parts(ptr, count, count));
                DESTROYS.fetch_add(count, SeqCst);
            }
            g.defer_destroy(destroy, v.as_mut_ptr(), v.len(), pin);
            g.flush(pin);
            // Ownership of the buffer was transferred to the collector.
            mem::forget(v);
        });
        while DESTROYS.load(SeqCst) < COUNT {
            epoch::pin(|pin| g.collect(pin));
        }
        assert_eq!(DESTROYS.load(SeqCst), COUNT);
    }

    /// Dropping the `Garbage` itself destroys all remaining garbage.
    #[test]
    fn drop_garbage() {
        const COUNT: usize = 100_000;
        static DROPS: AtomicUsize = ATOMIC_USIZE_INIT;
        struct Elem(i32);
        impl Drop for Elem {
            fn drop(&mut self) {
                DROPS.fetch_add(1, SeqCst);
            }
        }
        let g = Garbage::new();
        epoch::pin(|pin| unsafe {
            for _ in 0..COUNT {
                let a = Box::into_raw(Box::new(Elem(7i32)));
                g.defer_drop(a, 1, pin);
            }
            g.flush(pin);
        });
        drop(g);
        assert_eq!(DROPS.load(SeqCst), COUNT);
    }

    /// Concurrent deferral from many threads loses nothing: every object is
    /// dropped exactly once by the time the queue itself is dropped.
    #[test]
    fn stress() {
        const THREADS: usize = 8;
        const COUNT: usize = 100_000;
        static DROPS: AtomicUsize = ATOMIC_USIZE_INIT;
        struct Elem(i32);
        impl Drop for Elem {
            fn drop(&mut self) {
                DROPS.fetch_add(1, SeqCst);
            }
        }
        let g = Arc::new(Garbage::new());
        let threads = (0..THREADS).map(|_| {
            let g = g.clone();
            thread::spawn(move || {
                for _ in 0..COUNT {
                    epoch::pin(|pin| unsafe {
                        let a = Box::into_raw(Box::new(Elem(7i32)));
                        g.defer_drop(a, 1, pin);
                    });
                }
            })
        }).collect::<Vec<_>>();
        for t in threads {
            t.join().unwrap();
        }
        drop(g);
        assert_eq!(DROPS.load(SeqCst), COUNT * THREADS);
    }
}