1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
//! The master module provides the Master struct which instructs Workers which
//! portions of the image they should render and collects their results to combine
//! into the final image.

use std::path::PathBuf;
use std::io::prelude::*;
use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::iter;
use std::time::SystemTime;

use bincode::{Infinite, serialize, deserialize};
use image;
use mio::tcp::{TcpStream, Shutdown};
use mio::*;

use film::Image;
use exec::Config;
use exec::distrib::{worker, Instructions, Frame};
use sampler::BlockQueue;

/// Stores distributed rendering status. The frame is either `InProgress` and contains
/// partially rendered results from the workers who've reported the frame or is `Completed`
/// and has been saved out to disk.
#[derive(Debug)]
enum DistributedFrame {
    InProgress {
        // The number of workers who have reported results for this
        // frame so far
        num_reporting: usize,
        // Partially accumulated image; tiles from each reporting worker are
        // added into it as they arrive
        render: Image,
        // Start time of this frame, when we got the first tiles in from a worker
        first_tile_recv: SystemTime,
    },
    // All workers have reported and the frame was written to disk. Replacing
    // `InProgress` with this variant also drops the render, freeing its pixels.
    Completed,
}

impl DistributedFrame {
    /// Begin tracking a new in-progress frame: no workers have reported yet,
    /// the render target is a fresh image of the requested dimensions, and
    /// the frame timer starts now.
    pub fn start(img_dim: (usize, usize)) -> DistributedFrame {
        let started_at = SystemTime::now();
        let render = Image::new(img_dim);
        DistributedFrame::InProgress {
            render: render,
            first_tile_recv: started_at,
            num_reporting: 0,
        }
    }
}

/// Buffer for collecting results from a worker asynchronously. The buffer is filled
/// as we get readable events from the workers until it reaches the expected size.
/// After this the Frame is decoded and accumulated in the appropriate `DistributedFrame`
#[derive(Clone, Debug)]
struct WorkerBuffer {
    /// Raw bytes received from the worker so far (length header + payload)
    pub buf: Vec<u8>,
    /// Total number of bytes we expect for the current message. Starts at 8
    /// (the u64 length header) and is replaced by the decoded length once
    /// the header has been fully read.
    pub expected_size: usize,
    /// How many bytes of `buf` have actually been filled by socket reads
    pub currently_read: usize,
}

impl WorkerBuffer {
    /// Create an empty buffer, initially expecting just the 8 byte length
    /// header. Pre-allocate room for the header so the first reads don't
    /// need to grow the Vec.
    pub fn new() -> WorkerBuffer {
        WorkerBuffer { buf: Vec::with_capacity(8), expected_size: 8, currently_read: 0 }
    }
}

/// The Master organizes the set of Worker processes and instructions them what parts
/// of the scene to render. As workers report results the master collects them and
/// saves out the PNG once all workers have reported the frame.
pub struct Master {
    /// Hostnames of the workers to send work too
    workers: Vec<String>,
    connections: Vec<TcpStream>,
    /// Temporary buffers to store worker results in as they're
    /// read in over TCP
    worker_buffers: Vec<WorkerBuffer>,
    config: Config,
    /// List of the frames we're collecting or have completed
    frames: HashMap<usize, DistributedFrame>,
    img_dim: (usize, usize),
    /// Number of 8x8 blocks we're assigning per worker
    blocks_per_worker: usize,
    /// Remainder of blocks that will be tacked on to the last
    /// worker's assignment
    blocks_remainder: usize,
}

impl Master {
    /// Create a new master that will contact the worker nodes passed and
    /// send instructions on what parts of the scene to start rendering.
    ///
    /// The image's 8x8 blocks are divided evenly among the workers; any
    /// remainder is tacked on to the last worker's assignment. Panics if a
    /// worker can't be contacted or its stream can't be registered with the
    /// event loop, since we don't do distributed fault tolerance.
    pub fn start_workers(workers: Vec<String>, config: Config, img_dim: (usize, usize))
                         -> (Master, EventLoop<Master>) {
        // Figure out how many blocks we have for this image and assign them to our workers
        let queue = BlockQueue::new((img_dim.0 as u32, img_dim.1 as u32), (8, 8), (0, 0));
        let blocks_per_worker = queue.len() / workers.len();
        let blocks_remainder = queue.len() % workers.len();

        let mut event_loop = EventLoop::<Master>::new().unwrap();
        let mut connections = Vec::new();

        // Connect to each worker and add them to the event loop
        for (i, host) in workers.iter().enumerate() {
            let addr = (&host[..], worker::PORT).to_socket_addrs().unwrap().next().unwrap();
            match TcpStream::connect(&addr) {
                Ok(stream) => {
                    // Each worker is identified in the event loop by their index in the vec
                    if let Err(e) = event_loop.register(&stream, Token(i), EventSet::all(), PollOpt::level()) {
                        panic!("Error registering stream from {}: {}", host, e);
                    }
                    connections.push(stream);
                },
                Err(e) => panic!("Failed to contact worker {}: {:?}", host, e),
            }
        }
        let worker_buffers: Vec<_> = iter::repeat(WorkerBuffer::new()).take(workers.len()).collect();
        let master = Master { workers: workers, connections: connections,
                              worker_buffers: worker_buffers, config: config,
                              frames: HashMap::new(),
                              img_dim: img_dim,
                              blocks_per_worker: blocks_per_worker,
                              blocks_remainder: blocks_remainder };
        (master, event_loop)
    }
    /// Read a result frame from a worker and save it into the list of frames we're collecting from
    /// all workers. Will save out the final render if all workers have reported results for this
    /// frame.
    ///
    /// The output path is `config.out_path` itself if it has an extension,
    /// otherwise `frame{:05}.png` inside that directory. Failures to save the
    /// image are logged, not fatal.
    fn save_results(&mut self, frame: Frame) {
        let frame_num = frame.frame as usize;
        let img_dim = self.img_dim;
        // Find the frame being reported and create it if we haven't received parts of this frame yet
        let mut df = self.frames.entry(frame_num).or_insert_with(|| DistributedFrame::start(img_dim));

        let mut finished = false;
        match *df {
            DistributedFrame::InProgress { ref mut num_reporting, ref mut render, ref first_tile_recv } => {
                // Collect results from the worker and see if we've finished the frame and can save
                // it out
                render.add_blocks(frame.block_size, &frame.blocks, &frame.pixels);
                *num_reporting += 1;
                if *num_reporting == self.workers.len() {
                    let render_time = first_tile_recv.elapsed().expect("Failed to get rendering time?");
                    let out_file = match self.config.out_path.extension() {
                        Some(_) => self.config.out_path.clone(),
                        None => self.config.out_path.join(
                            PathBuf::from(format!("frame{:05}.png", frame_num))),
                    };
                    let img = render.get_srgb8();
                    let dim = render.dimensions();
                    match image::save_buffer(&out_file.as_path(), &img[..], dim.0 as u32,
                    dim.1 as u32, image::RGB(8)) {
                        Ok(_) => {},
                        Err(e) => println!("Error saving image, {}", e),
                    };
                    println!("Frame {}: time between receiving first and last tile {:4}s",
                             frame_num, render_time.as_secs() as f64 + render_time.subsec_nanos() as f64 * 1e-9);
                    println!("Frame {}: rendered to '{}'\n--------------------", frame_num, out_file.display());
                    finished = true;
                }
            },
            DistributedFrame::Completed => println!("Worker reporting on completed frame {}?", frame_num),
        }
        // This is a bit awkward, since we borrow df in the match we can't mark it finished in there
        if finished {
            *df = DistributedFrame::Completed;
        }
    }
    /// Read results from a worker and accumulate this data in its worker buffer. Returns true if
    /// we've read all the data being sent (`currently_read == expected_size`)
    /// and can decode the buffer. Read errors are logged and we rely on the
    /// next readable event to make further progress.
    fn read_worker_buffer(&mut self, worker: usize) -> bool {
        let buf = &mut self.worker_buffers[worker];
        // If we haven't read the size of data being sent, read that now
        if buf.currently_read < 8 {
            // First 8 bytes are a u64 specifying the number of bytes being sent.
            // Use resize, not extend: a partial header read on an earlier event
            // leaves currently_read < 8, and unconditionally extending by 8 again
            // would grow the buffer past the header and permanently desync
            // currently_read from expected_size. resize(8, _) is a no-op once
            // the buffer already holds the 8 header slots.
            buf.buf.resize(8, 0u8);
            match self.connections[worker].read(&mut buf.buf[buf.currently_read..]) {
                Ok(n) => buf.currently_read += n,
                Err(e) => println!("Error reading results from worker {}: {}", self.workers[worker], e),
            }
            if buf.currently_read == buf.expected_size {
                // How many bytes we expect to get from the worker for a frame
                buf.expected_size = deserialize(&buf.buf[..]).unwrap();
                // Extend the Vec so we've got enough room for the remaning bytes, minus the 8 for the
                // encoded size header.
                // NOTE(review): a malformed size < 8 would underflow and panic
                // here — workers are trusted to send well-formed headers.
                buf.buf.extend(iter::repeat(0u8).take(buf.expected_size - 8));
            }
        }
        // If we've finished reading the size header we can now start reading the frame data
        if buf.currently_read >= 8 {
            match self.connections[worker].read(&mut buf.buf[buf.currently_read..]) {
                Ok(n) => buf.currently_read += n,
                Err(e) => println!("Error reading results from worker {}: {}", self.workers[worker], e),
            }
        }
        buf.currently_read == buf.expected_size
    }
}

impl Handler for Master {
    type Timeout = ();
    type Message = ();

    fn ready(&mut self, event_loop: &mut EventLoop<Master>, token: Token, event: EventSet) {
        let worker = token.as_usize();
        if event.is_error() {
            // We don't do distributed error handling so should abort if we fail to
            // connect for now
            panic!("Error connecting to {}", self.workers[worker]);
        }
        // If the worker has terminated, shutdown the read end of the connection
        if event.is_hup() {
            if let Err(e) = self.connections[worker].shutdown(Shutdown::Both) {
                println!("Error shutting down worker {}: {}", worker, e);
            }
            // Remove the connection from the event loop
            if let Err(e) = event_loop.deregister(&self.connections[worker]) {
                println!("Error deregistering worker {}: {}", worker, e);
            }
        }
        // A worker is ready to receive instructions from us
        if event.is_writable() {
            let b_start = worker * self.blocks_per_worker;
            let b_count =
                if worker == self.workers.len() - 1 {
                    self.blocks_per_worker + self.blocks_remainder
                } else {
                    self.blocks_per_worker
                };
            let instr = Instructions::new(&self.config.scene_file,
                                          (self.config.frame_info.start, self.config.frame_info.end),
                                          b_start, b_count);
            // Encode and send our instructions to the worker
            let bytes = serialize(&instr, Infinite).unwrap();
            if let Err(e) = self.connections[worker].write_all(&bytes[..]) {
                println!("Failed to send instructions to {}: {:?}", self.workers[worker], e);
            }
            // Register that we no longer care about writable events on this connection
            event_loop.reregister(&self.connections[worker], token,
                                  EventSet::readable() | EventSet::error() | EventSet::hup(),
                                  PollOpt::level()).expect("Re-registering failed");
        }
        // Some results are available from a worker
        // Read results from the worker, if we've accumulated all the data being sent
        // decode and accumulate the frame
        if event.is_readable() && self.read_worker_buffer(worker) {
            let frame = deserialize(&self.worker_buffers[worker].buf[..]).unwrap();
            self.save_results(frame);
            // Clean up the worker buffer for the next frame
            self.worker_buffers[worker].buf.clear();
            self.worker_buffers[worker].expected_size = 8;
            self.worker_buffers[worker].currently_read = 0;
        }
        // After getting results from the worker we check if we've completed all our frames
        // and exit if so
        let all_complete = self.frames.values().fold(true,
                                |all, v| {
                                    match *v {
                                        DistributedFrame::Completed => true && all,
                                        _ => false,
                                    }
                                });
        // The frame start/end range is inclusive, so we must add 1 here
        let num_frames = self.config.frame_info.end - self.config.frame_info.start + 1;
        if self.frames.len() == num_frames && all_complete {
            event_loop.shutdown();
        }
    }
}