use {Handler, Evented, Poll, NotifyError, Token};
use event::{IoEvent, EventSet, PollOpt};
use notify::Notify;
use timer::{Timer, Timeout, TimerResult};
use std::{cmp, io, fmt, thread, usize};
use std::default::Default;
/// Tunable parameters used when constructing an `EventLoop`.
///
/// Obtain the defaults with `EventLoopConfig::new()` (or `Default::default()`)
/// and override individual values through the chainable setters.
#[derive(Clone, Debug)]
pub struct EventLoopConfig {
    /// Capacity requested for the loop's notification queue.
    notify_capacity: usize,
    /// Upper bound passed to the notification queue when counting the
    /// messages to deliver in one tick.
    messages_per_tick: usize,
    /// Timer tick duration in milliseconds, forwarded to the timer.
    timer_tick_ms: u64,
    /// Wheel size forwarded to the timer.
    timer_wheel_size: usize,
    /// Capacity forwarded to the timer.
    timer_capacity: usize,
}

impl EventLoopConfig {
    /// Returns a configuration populated with the built-in defaults:
    /// 4096 queued notifications, 256 messages per tick, a 100 ms timer
    /// tick, a 1024-slot timer wheel and room for 65536 timeouts.
    pub fn new() -> EventLoopConfig {
        EventLoopConfig {
            notify_capacity: 4_096,
            messages_per_tick: 256,
            timer_tick_ms: 100,
            timer_wheel_size: 1_024,
            timer_capacity: 65_536,
        }
    }

    /// Overrides the notification queue capacity; returns `self` for chaining.
    pub fn notify_capacity(&mut self, capacity: usize) -> &mut Self {
        self.notify_capacity = capacity;
        self
    }

    /// Overrides the per-tick message limit; returns `self` for chaining.
    pub fn messages_per_tick(&mut self, messages: usize) -> &mut Self {
        self.messages_per_tick = messages;
        self
    }

    /// Overrides the timer tick length in milliseconds; returns `self` for
    /// chaining.
    pub fn timer_tick_ms(&mut self, ms: u64) -> &mut Self {
        self.timer_tick_ms = ms;
        self
    }

    /// Overrides the timer wheel size; returns `self` for chaining.
    pub fn timer_wheel_size(&mut self, size: usize) -> &mut Self {
        self.timer_wheel_size = size;
        self
    }

    /// Overrides the timer capacity; returns `self` for chaining.
    pub fn timer_capacity(&mut self, cap: usize) -> &mut Self {
        self.timer_capacity = cap;
        self
    }
}

impl Default for EventLoopConfig {
    /// Equivalent to `EventLoopConfig::new()`.
    fn default() -> Self {
        Self::new()
    }
}
/// Event loop that dispatches I/O readiness events, queued messages and
/// timer expirations to a `Handler`.
#[derive(Debug)]
pub struct EventLoop<H: Handler> {
/// Continuation flag: set by `configured()` and `run()`, cleared by
/// `shutdown()`; `run()` keeps ticking while it is `true`.
run: bool,
/// Poller used to register I/O handles and wait for readiness events.
poll: Poll,
/// Timer holding pending timeouts, each carrying an `H::Timeout` value.
timer: Timer<H::Timeout>,
/// Queue of `H::Message` values; `channel()` hands out senders for it.
notify: Notify<H::Message>,
/// Configuration the loop was built with (`messages_per_tick` is read on
/// every tick).
config: EventLoopConfig,
}
// Token under which the loop registers its own notification queue with the
// poller. Events carrying this token are consumed internally and never reach
// `Handler::ready`, so callers should avoid registering `Token(usize::MAX)`.
const NOTIFY: Token = Token(usize::MAX);
impl<H: Handler> EventLoop<H> {
    /// Creates an event loop using the default `EventLoopConfig`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `EventLoop::configured`.
    pub fn new() -> io::Result<EventLoop<H>> {
        EventLoop::configured(EventLoopConfig::new())
    }

    /// Creates an event loop from an explicit configuration.
    ///
    /// Builds the poller, the timer and the notification queue, then
    /// registers the queue with the poller under the internal `NOTIFY`
    /// token (readable + writable, edge-triggered).
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced when creating the poller, creating
    /// the notification queue, or registering the queue with the poller.
    pub fn configured(config: EventLoopConfig) -> io::Result<EventLoop<H>> {
        let mut poll = try!(Poll::new());
        let mut timer = Timer::new(
            config.timer_tick_ms,
            config.timer_wheel_size,
            config.timer_capacity);
        let notify = try!(Notify::with_capacity(config.notify_capacity));

        // The queue itself is an event source for the poller.
        let interest = EventSet::readable() | EventSet::writable();
        try!(poll.register(&notify, NOTIFY, interest, PollOpt::edge()));

        timer.setup();

        Ok(EventLoop {
            run: true,
            poll: poll,
            timer: timer,
            notify: notify,
            config: config,
        })
    }

    /// Returns a new `Sender` attached to this loop's notification queue.
    pub fn channel(&self) -> Sender<H::Message> {
        Sender::new(self.notify.clone())
    }

    /// Schedules `token` with the timer after `delay` (milliseconds, per the
    /// timer API name) and returns the timer's result, which on success
    /// carries a `Timeout` handle usable with `clear_timeout`.
    pub fn timeout_ms(&mut self, token: H::Timeout, delay: u64) -> TimerResult<Timeout> {
        self.timer.timeout_ms(token, delay)
    }

    /// Asks the timer to clear `timeout` and returns the timer's answer.
    pub fn clear_timeout(&mut self, timeout: Timeout) -> bool {
        self.timer.clear(timeout)
    }

    /// Clears the run flag so that `run()` stops after the current tick.
    pub fn shutdown(&mut self) {
        self.run = false;
    }

    /// Reports whether the run flag is currently set.
    pub fn is_running(&self) -> bool {
        self.run
    }

    /// Registers `io` with the poller under `token`, with the given interest
    /// set and poll options.
    ///
    /// # Errors
    ///
    /// Returns whatever error the poller reports.
    pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: EventSet, opt: PollOpt) -> io::Result<()>
        where E: Evented
    {
        self.poll.register(io, token, interest, opt)
    }

    /// Updates the registration of `io` with a new token, interest set
    /// and/or poll options.
    ///
    /// # Errors
    ///
    /// Returns whatever error the poller reports.
    pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: EventSet, opt: PollOpt) -> io::Result<()>
        where E: Evented
    {
        self.poll.reregister(io, token, interest, opt)
    }

    /// Removes `io` from the poller.
    ///
    /// # Errors
    ///
    /// Returns whatever error the poller reports.
    pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented {
        self.poll.deregister(io)
    }

    /// Sets the run flag and ticks the loop until the flag is cleared (for
    /// example by a handler calling `shutdown()`).
    ///
    /// # Errors
    ///
    /// Stops and returns the first error produced by `run_once`.
    pub fn run(&mut self, handler: &mut H) -> io::Result<()> {
        self.run = true;

        while self.run {
            try!(self.run_once(handler, None));
        }

        Ok(())
    }

    /// Performs a single tick of the loop.
    ///
    /// In order: counts queued messages, polls for I/O, dispatches I/O
    /// events, delivers the counted messages, fires expired timeouts and
    /// finally calls `Handler::tick`.
    ///
    /// `timeout_ms` bounds how long the poll may wait; it is further capped
    /// by the time until the next timer tick and forced to `0` when messages
    /// are already waiting.
    ///
    /// # Errors
    ///
    /// Returns any poll error other than `ErrorKind::Interrupted`; an
    /// interrupted poll is reported through `Handler::interrupted` and the
    /// tick continues with zero I/O events.
    pub fn run_once(&mut self, handler: &mut H, mut timeout_ms: Option<usize>) -> io::Result<()> {
        trace!("event loop tick");

        // Count the messages that are already queued. If there are any, the
        // poll below must not wait.
        // NOTE(review): the meaning of the boolean passed to `check` is
        // defined by `Notify`, which is not visible here - confirm.
        let mut messages = self.notify.check(self.config.messages_per_tick, true);
        let pending = messages > 0;

        if pending {
            timeout_ms = Some(0);
        }

        let events = match self.io_poll(timeout_ms) {
            Ok(e) => e,
            Err(err) => {
                if err.kind() == io::ErrorKind::Interrupted {
                    handler.interrupted(self);
                    0
                } else {
                    return Err(err);
                }
            }
        };

        if !pending {
            // Nothing was queued before polling (`messages` is 0 here), so
            // look again for messages that arrived in the meantime.
            let remaining = self.config.messages_per_tick - messages;
            messages += self.notify.check(remaining, false);
        }

        self.io_process(handler, events);
        self.notify(handler, messages);
        self.timer_process(handler);
        handler.tick(self);

        Ok(())
    }

    /// Polls for I/O, waiting no longer than the smaller of `timeout` and
    /// the time until the next timer tick. Waits without a bound only when
    /// both are absent. Returns the number of events reported by the poller.
    #[inline]
    fn io_poll(&mut self, timeout: Option<usize>) -> io::Result<usize> {
        // Clamp before narrowing so the value cannot wrap where `usize` is
        // narrower than `u64`.
        let next_tick = self.timer.next_tick_in_ms()
            .map(|ms| cmp::min(ms, usize::MAX as u64) as usize);

        let timeout = match (timeout, next_tick) {
            (Some(a), Some(b)) => Some(cmp::min(a, b)),
            (Some(a), None) => Some(a),
            (None, Some(b)) => Some(b),
            _ => None,
        };

        self.poll.poll(timeout)
    }

    /// Walks the first `cnt` events held by the poller. Events for the
    /// internal `NOTIFY` token trigger queue cleanup; every other event is
    /// forwarded to the handler.
    fn io_process(&mut self, handler: &mut H, cnt: usize) {
        for i in 0..cnt {
            let evt = self.poll.event(i);

            trace!("event={:?}", evt);

            match evt.token {
                NOTIFY => self.notify.cleanup(),
                _ => self.io_event(handler, evt)
            }
        }
    }

    /// Hands one I/O event to `Handler::ready`.
    fn io_event(&mut self, handler: &mut H, evt: IoEvent) {
        handler.ready(self, evt.token, evt.kind);
    }

    /// Delivers exactly `cnt` messages to `Handler::notify`.
    ///
    /// When the queue momentarily yields nothing, the thread yields and the
    /// poll is retried, so this does not return until `cnt` messages have
    /// been delivered.
    fn notify(&mut self, handler: &mut H, mut cnt: usize) {
        while cnt > 0 {
            match self.notify.poll() {
                Some(msg) => {
                    handler.notify(self, msg);
                    cnt -= 1;
                },
                // NOTE(review): presumably a message was counted before its
                // sender finished publishing it - confirm against `Notify`.
                None => thread::yield_now(),
            }
        }
    }

    /// Advances the timer to its current time and passes every timeout it
    /// releases to `Handler::timeout`.
    fn timer_process(&mut self, handler: &mut H) {
        // Sample the time once so timeouts scheduled by the handler during
        // this pass are judged against the same instant.
        let now = self.timer.now();

        while let Some(t) = self.timer.tick_to(now) {
            handler.timeout(self, t);
        }
    }
}
// Marks `EventLoop` as shareable by reference across threads.
// NOTE(review): nothing visible in this file establishes why this is sound;
// the `&self` methods are `channel()` and `is_running()` - confirm that
// `Poll`, `Timer` and `Notify` tolerate shared access before relying on it.
unsafe impl<H: Handler> Sync for EventLoop<H> { }
/// Closes the notification queue when the event loop is dropped.
impl<H: Handler> Drop for EventLoop<H> {
    fn drop(&mut self) {
        // Shut the queue so that it stops being usable once the loop that
        // drains it is gone.
        // NOTE(review): exact effect on outstanding senders is defined by
        // `Notify::close` - confirm.
        let queue = &self.notify;
        queue.close();
    }
}
/// Handle for sending messages of type `M` to an event loop; obtained from
/// `EventLoop::channel` and cloneable.
pub struct Sender<M: Send> {
// Handle to the notification queue that `send` pushes messages onto.
notify: Notify<M>
}
/// Cloning a sender clones its underlying queue handle.
impl<M: Send> Clone for Sender<M> {
    fn clone(&self) -> Sender<M> {
        let queue = self.notify.clone();
        Sender { notify: queue }
    }
}
/// Opaque debug output: the message type is not required to be `Debug`, so
/// only a fixed placeholder is printed.
impl<M: Send> fmt::Debug for Sender<M> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Sender<?> {{ ... }}")
    }
}
// Marks `Sender` as shareable by reference across threads.
// NOTE(review): soundness depends on `Notify::notify` being safe to call
// concurrently through `&self`; `Notify` is not visible here - confirm.
unsafe impl<M: Send> Sync for Sender<M> { }
impl<M: Send> Sender<M> {
    /// Wraps a notification queue handle in a `Sender`.
    fn new(notify: Notify<M>) -> Sender<M> {
        Sender { notify: notify }
    }

    /// Submits `msg` to the notification queue.
    ///
    /// # Errors
    ///
    /// Returns the queue's `NotifyError<M>` when the message is not accepted.
    pub fn send(&self, msg: M) -> Result<(), NotifyError<M>> {
        let outcome = self.notify.notify(msg);
        outcome
    }
}
#[cfg(test)]
#[cfg(unix)]
mod tests {
    use std::str;
    use std::sync::Arc;
    use std::sync::atomic::AtomicIsize;
    use std::sync::atomic::Ordering::SeqCst;
    use super::EventLoop;
    use {unix, Handler, Token, TryRead, TryWrite, EventSet, PollOpt};
    use bytes::{Buf, SliceBuf, ByteBuf};

    /// The event loop struct must not grow beyond 512 bytes.
    #[test]
    pub fn test_event_loop_size() {
        use std::mem;

        let size = mem::size_of::<EventLoop<Funtimes>>();
        assert!(512 >= size);
    }

    /// Handler that counts readable and writable notifications and insists
    /// that every one of them carries `Token(10)`.
    struct Funtimes {
        rcount: Arc<AtomicIsize>,
        wcount: Arc<AtomicIsize>
    }

    impl Funtimes {
        fn new(rcount: Arc<AtomicIsize>, wcount: Arc<AtomicIsize>) -> Funtimes {
            Funtimes { rcount: rcount, wcount: wcount }
        }
    }

    impl Handler for Funtimes {
        type Timeout = usize;
        type Message = ();

        fn ready(&mut self, _event_loop: &mut EventLoop<Funtimes>, token: Token, events: EventSet) {
            if events.is_readable() {
                self.rcount.fetch_add(1, SeqCst);
                assert_eq!(token, Token(10));
            }

            if events.is_writable() {
                self.wcount.fetch_add(1, SeqCst);
                assert_eq!(token, Token(10));
            }
        }
    }

    /// Data written to a pipe before registration must surface as exactly
    /// one readable event, after which the bytes can be read back.
    #[test]
    pub fn test_readable() {
        let mut event_loop = EventLoop::new().ok().expect("Couldn't make event loop");

        let (mut reader, mut writer) = unix::pipe().unwrap();

        let reads = Arc::new(AtomicIsize::new(0));
        let writes = Arc::new(AtomicIsize::new(0));
        let mut handler = Funtimes::new(reads.clone(), writes.clone());

        let payload = "hello".as_bytes();
        writer.try_write_buf(&mut SliceBuf::wrap(payload)).unwrap();

        event_loop.register(&reader, Token(10), EventSet::readable(),
                            PollOpt::edge()).unwrap();

        let _ = event_loop.run_once(&mut handler, None);

        let mut buf = ByteBuf::mut_with_capacity(16);

        assert_eq!(reads.load(SeqCst), 1);

        reader.try_read_buf(&mut buf).unwrap();

        assert_eq!(str::from_utf8(buf.flip().bytes()).unwrap(), "hello");
    }

    /// Handler that fails the test if an event arrives for `Token(1)`.
    pub struct BrokenPipeHandler;

    impl Handler for BrokenPipeHandler {
        type Timeout = ();
        type Message = ();

        fn ready(&mut self, _: &mut EventLoop<Self>, token: Token, _: EventSet) {
            if token == Token(1) {
                panic!("Received ready() on a closed pipe.");
            }
        }
    }

    /// A handle that was dropped after registration must not produce a
    /// `ready` callback on the next tick.
    #[test]
    pub fn broken_pipe() {
        let mut event_loop: EventLoop<BrokenPipeHandler> = EventLoop::new().unwrap();
        let (reader, _) = unix::pipe().unwrap();

        // Registration result is deliberately ignored, as in the original.
        let _ = event_loop.register(&reader, Token(1), EventSet::all(), PollOpt::edge());

        let mut handler = BrokenPipeHandler;

        drop(reader);

        event_loop.run_once(&mut handler, Some(1000)).unwrap();
    }
}