use std::path::PathBuf;
use std::io::prelude::*;
use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::iter;
use std::time::SystemTime;
use bincode::{Infinite, serialize, deserialize};
use image;
use mio::tcp::{TcpStream, Shutdown};
use mio::*;
use film::Image;
use exec::Config;
use exec::distrib::{worker, Instructions, Frame};
use sampler::BlockQueue;
#[derive(Debug)]
enum DistributedFrame {
InProgress {
num_reporting: usize,
render: Image,
first_tile_recv: SystemTime,
},
Completed,
}
impl DistributedFrame {
pub fn start(img_dim: (usize, usize)) -> DistributedFrame {
DistributedFrame::InProgress {
num_reporting: 0,
render: Image::new(img_dim),
first_tile_recv: SystemTime::now(),
}
}
}
#[derive(Clone, Debug)]
struct WorkerBuffer {
pub buf: Vec<u8>,
pub expected_size: usize,
pub currently_read: usize,
}
impl WorkerBuffer {
pub fn new() -> WorkerBuffer {
WorkerBuffer { buf: Vec::new(), expected_size: 8, currently_read: 0 }
}
}
pub struct Master {
workers: Vec<String>,
connections: Vec<TcpStream>,
worker_buffers: Vec<WorkerBuffer>,
config: Config,
frames: HashMap<usize, DistributedFrame>,
img_dim: (usize, usize),
blocks_per_worker: usize,
blocks_remainder: usize,
}
impl Master {
pub fn start_workers(workers: Vec<String>, config: Config, img_dim: (usize, usize))
-> (Master, EventLoop<Master>) {
let queue = BlockQueue::new((img_dim.0 as u32, img_dim.1 as u32), (8, 8), (0, 0));
let blocks_per_worker = queue.len() / workers.len();
let blocks_remainder = queue.len() % workers.len();
let mut event_loop = EventLoop::<Master>::new().unwrap();
let mut connections = Vec::new();
for (i, host) in workers.iter().enumerate() {
let addr = (&host[..], worker::PORT).to_socket_addrs().unwrap().next().unwrap();
match TcpStream::connect(&addr) {
Ok(stream) => {
if let Err(e) = event_loop.register(&stream, Token(i), EventSet::all(), PollOpt::level()) {
panic!("Error registering stream from {}: {}", host, e);
}
connections.push(stream);
},
Err(e) => panic!("Failed to contact worker {}: {:?}", host, e),
}
}
let worker_buffers: Vec<_> = iter::repeat(WorkerBuffer::new()).take(workers.len()).collect();
let master = Master { workers: workers, connections: connections,
worker_buffers: worker_buffers, config: config,
frames: HashMap::new(),
img_dim: img_dim,
blocks_per_worker: blocks_per_worker,
blocks_remainder: blocks_remainder };
(master, event_loop)
}
fn save_results(&mut self, frame: Frame) {
let frame_num = frame.frame as usize;
let img_dim = self.img_dim;
let mut df = self.frames.entry(frame_num).or_insert_with(|| DistributedFrame::start(img_dim));
let mut finished = false;
match *df {
DistributedFrame::InProgress { ref mut num_reporting, ref mut render, ref first_tile_recv } => {
render.add_blocks(frame.block_size, &frame.blocks, &frame.pixels);
*num_reporting += 1;
if *num_reporting == self.workers.len() {
let render_time = first_tile_recv.elapsed().expect("Failed to get rendering time?");
let out_file = match self.config.out_path.extension() {
Some(_) => self.config.out_path.clone(),
None => self.config.out_path.join(
PathBuf::from(format!("frame{:05}.png", frame_num))),
};
let img = render.get_srgb8();
let dim = render.dimensions();
match image::save_buffer(&out_file.as_path(), &img[..], dim.0 as u32,
dim.1 as u32, image::RGB(8)) {
Ok(_) => {},
Err(e) => println!("Error saving image, {}", e),
};
println!("Frame {}: time between receiving first and last tile {:4}s",
frame_num, render_time.as_secs() as f64 + render_time.subsec_nanos() as f64 * 1e-9);
println!("Frame {}: rendered to '{}'\n--------------------", frame_num, out_file.display());
finished = true;
}
},
DistributedFrame::Completed => println!("Worker reporting on completed frame {}?", frame_num),
}
if finished {
*df = DistributedFrame::Completed;
}
}
fn read_worker_buffer(&mut self, worker: usize) -> bool {
let buf = &mut self.worker_buffers[worker];
if buf.currently_read < 8 {
buf.buf.extend(iter::repeat(0u8).take(8));
match self.connections[worker].read(&mut buf.buf[buf.currently_read..]) {
Ok(n) => buf.currently_read += n,
Err(e) => println!("Error reading results from worker {}: {}", self.workers[worker], e),
}
if buf.currently_read == buf.expected_size {
buf.expected_size = deserialize(&buf.buf[..]).unwrap();
buf.buf.extend(iter::repeat(0u8).take(buf.expected_size - 8));
}
}
if buf.currently_read >= 8 {
match self.connections[worker].read(&mut buf.buf[buf.currently_read..]) {
Ok(n) => buf.currently_read += n,
Err(e) => println!("Error reading results from worker {}: {}", self.workers[worker], e),
}
}
buf.currently_read == buf.expected_size
}
}
impl Handler for Master {
type Timeout = ();
type Message = ();
fn ready(&mut self, event_loop: &mut EventLoop<Master>, token: Token, event: EventSet) {
let worker = token.as_usize();
if event.is_error() {
panic!("Error connecting to {}", self.workers[worker]);
}
if event.is_hup() {
if let Err(e) = self.connections[worker].shutdown(Shutdown::Both) {
println!("Error shutting down worker {}: {}", worker, e);
}
if let Err(e) = event_loop.deregister(&self.connections[worker]) {
println!("Error deregistering worker {}: {}", worker, e);
}
}
if event.is_writable() {
let b_start = worker * self.blocks_per_worker;
let b_count =
if worker == self.workers.len() - 1 {
self.blocks_per_worker + self.blocks_remainder
} else {
self.blocks_per_worker
};
let instr = Instructions::new(&self.config.scene_file,
(self.config.frame_info.start, self.config.frame_info.end),
b_start, b_count);
let bytes = serialize(&instr, Infinite).unwrap();
if let Err(e) = self.connections[worker].write_all(&bytes[..]) {
println!("Failed to send instructions to {}: {:?}", self.workers[worker], e);
}
event_loop.reregister(&self.connections[worker], token,
EventSet::readable() | EventSet::error() | EventSet::hup(),
PollOpt::level()).expect("Re-registering failed");
}
if event.is_readable() && self.read_worker_buffer(worker) {
let frame = deserialize(&self.worker_buffers[worker].buf[..]).unwrap();
self.save_results(frame);
self.worker_buffers[worker].buf.clear();
self.worker_buffers[worker].expected_size = 8;
self.worker_buffers[worker].currently_read = 0;
}
let all_complete = self.frames.values().fold(true,
|all, v| {
match *v {
DistributedFrame::Completed => true && all,
_ => false,
}
});
let num_frames = self.config.frame_info.end - self.config.frame_info.start + 1;
if self.frames.len() == num_frames && all_complete {
event_loop.shutdown();
}
}
}