Evented Programming

Evented Programming and its patterns

Bruno Michel

What is Evented Programming?

Evented Programming

Non-blocking I/O in a reactor

So, what is this reactor thing?

Reactor == a single-threaded while loop
           that triggers callbacks on events

Something like:

while reactor_running?
  events = wait_for_events   # hypothetical: block until some I/O is ready
  events.each do |event|
    event.callbacks.each do |cb|
      cb.call(event)
    end
  end
end
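To make that loop concrete, here is a toy single-threaded reactor in Node.js. It is only a sketch: the names `Reactor`, `on`, `emit` and `run` are hypothetical, and events come from an in-memory queue, whereas a real reactor such as libev blocks in `epoll` or `kqueue` waiting for I/O.

```javascript
// Toy reactor (hypothetical API): one thread, one loop, callbacks per event name.
class Reactor {
  constructor() {
    this.callbacks = new Map(); // event name -> array of callbacks
    this.queue = [];            // pending events, in arrival order
  }
  on(name, cb) {
    if (!this.callbacks.has(name)) this.callbacks.set(name, []);
    this.callbacks.get(name).push(cb);
  }
  emit(name, payload) {
    this.queue.push({ name, payload }); // queued, not called right away
  }
  run() {
    // Loop while there is work; a callback may emit new events.
    while (this.queue.length > 0) {
      const event = this.queue.shift();
      for (const cb of this.callbacks.get(event.name) || []) {
        cb(event.payload);
      }
    }
  }
}

const reactor = new Reactor();
const log = [];
reactor.on('connect', (who) => {
  log.push(`connect ${who}`);
  reactor.emit('data', `hello from ${who}`);
});
reactor.on('data', (msg) => log.push(`data ${msg}`));
reactor.emit('connect', 'alice');
reactor.run();
console.log(log); // [ 'connect alice', 'data hello from alice' ]
```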

And how does it work?

3 ways to handle incoming requests:

  • One request per process
  • One request per thread
  • And Evented Programming

Implementations

  • C: libev & libevent
  • JS: Node.js
  • Python: Twisted & Gevent
  • Perl: POE
  • Ruby: EventMachine & Cool.io
  • Java: Netty

Implementations

Examples will be in:

  • Node.js (JS)
  • EventMachine (Ruby)

First example: Echo Server

var net = require('net');

var server = net.createServer(function (socket) {
  socket.pipe(socket);
});

server.listen(1337, "127.0.0.1");

First example: Echo Server

require 'eventmachine'
class EchoServer < EM::Connection
  def receive_data(data)
    send_data(data)
  end
end
EM.run do
  EM.start_server("127.0.0.1", 1337, EchoServer)
end
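A quick way to check the Node.js echo server: the sketch below starts the same server on port 0 (an assumption, so the OS picks a free port instead of 1337), connects a client, sends `ping` and collects what comes back.

```javascript
const net = require('net');

// Same echo server as above: whatever is read is written straight back.
const server = net.createServer((socket) => socket.pipe(socket));

let echoed = '';
server.listen(0, '127.0.0.1', () => {
  const { port } = server.address();
  const client = net.connect(port, '127.0.0.1', () => {
    client.end('ping'); // send "ping", then close our writing side
  });
  client.setEncoding('utf8');
  client.on('data', (chunk) => { echoed += chunk; });
  client.on('end', () => {
    console.log(`echoed: ${echoed}`);
    server.close(); // stop listening so the process can exit
  });
});
```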

How to handle errors?

Explosions

Error handling with return codes (C)

int res;
res = listen(sock, backlog);
if (res < 0) {
    perror("Listen error");
    return -1;
}

Error handling with exceptions (Ruby)

begin
  File.open('test.rb', 'w') do |f|
    f.puts "Hello World!"
  end
rescue SystemCallError, IOError => e
  puts e.message
end

And for Evented Programming?

Neither works :(

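Because the function returns before the error can happen: the error is only detected later, inside the reactor, long after the caller's rescue block or return-code check has run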
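A small Node.js illustration of the problem: the `try/catch` wraps the call, but `fs.readFile` returns before the file system reports anything, so the error can only reach the callback. The path is a made-up one that is assumed not to exist.

```javascript
const fs = require('fs');

let caughtSync = false;
let errorInCallback = null;

try {
  // readFile only *starts* the read and returns immediately.
  fs.readFile('/this/file/does/not/exist', (err) => {
    errorInCallback = err; // the failure arrives here, on a later loop turn
  });
} catch (e) {
  caughtSync = true; // never reached: nothing is thrown synchronously
}

console.log('caught by try/catch?', caughtSync); // caught by try/catch? false
```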

Other possibilities

  • Add a second function, errback
  • Add a parameter to the callback
  • Promises / Deferrables / Futures
  • etc.

Node.js: callback with a parameter

var fs = require('fs');
fs.readFile('/etc/passwd', 'utf8', function (err, data) {
  if (err) throw err;   // the error is passed in, not thrown at the caller
  console.log(data);
});

EventMachine: Deferrables

http = EM::HttpRequest.new("http://google.fr/").get
http.errback do |error|
  puts "Error #{error}"
end
http.callback do
  puts "Yippee, Google is up!"
end
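The core idea of a Deferrable is that callbacks and errbacks can be attached before or after the result is known. Below is a hedged Node.js sketch of that idea; the `Deferrable` class here is a hypothetical port written for illustration, not EventMachine's API, and it settles only once, which is a simplification.

```javascript
// Minimal Deferrable in the spirit of EM::Deferrable (hypothetical JS port).
class Deferrable {
  constructor() {
    this.state = 'pending'; // 'pending' | 'succeeded' | 'failed'
    this.args = [];
    this.callbacks = [];
    this.errbacks = [];
  }
  callback(fn) {
    if (this.state === 'succeeded') fn(...this.args); // already done: run now
    else if (this.state === 'pending') this.callbacks.push(fn);
    return this;
  }
  errback(fn) {
    if (this.state === 'failed') fn(...this.args);
    else if (this.state === 'pending') this.errbacks.push(fn);
    return this;
  }
  succeed(...args) { this.settle('succeeded', args, this.callbacks); }
  fail(...args) { this.settle('failed', args, this.errbacks); }
  settle(state, args, fns) {
    if (this.state !== 'pending') return; // settle only once
    this.state = state;
    this.args = args;
    fns.forEach((fn) => fn(...args));
    this.callbacks = [];
    this.errbacks = [];
  }
}

// Usage: a fake "request" that fails asynchronously.
const req = new Deferrable();
const seen = [];
req.callback((body) => seen.push(`ok: ${body}`));
req.errback((err) => seen.push(`error: ${err}`));
setImmediate(() => req.fail('timeout'));
```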

Some patterns

Patterns

The two really basic patterns

Sequence

Parallel

Sequence

fs.open('results', 'w', function(err, fd) {
  fs.write(fd, results, function(err, written, f) {
    fs.close(fd, function(err) {
        done();
    });
  });
});
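A runnable version of that sequence, as a sketch: the file lives in the OS temp directory, and `results` and `done` are assumed placeholders since the slide leaves them undefined. Each step only starts inside the previous step's callback, and every error is passed to `done`.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical file name and payload for the demo.
const file = path.join(os.tmpdir(), `results-${process.pid}.txt`);
const results = 'some results\n';
const steps = [];

function done(err) {
  if (err) throw err;
  steps.push('done');
}

fs.open(file, 'w', (err, fd) => {
  if (err) return done(err);
  steps.push('open');
  fs.write(fd, results, (err) => {
    if (err) return done(err);
    steps.push('write');
    fs.close(fd, (err) => {
      if (err) return done(err);
      steps.push('close');
      done();
    });
  });
});
```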

Parallel

var counter = 3;
fs.readFile("part.1", onRead);
fs.readFile("part.2", onRead);
fs.readFile("part.3", onRead);
function onRead(err, content) {
  if (err) throw err;
  if (--counter === 0) { done(); }
}
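The counter above throws away the file contents and cannot report an error to `done`. A slightly more complete sketch of the same counter technique, with a hypothetical `parallel` helper: results are stored by index, and `done` is called exactly once, on the first error or when the counter reaches zero. The reads are faked with timers.

```javascript
// Hypothetical helper: run all tasks at once, collect results in task order.
function parallel(tasks, done) {
  const results = new Array(tasks.length);
  let pending = tasks.length;
  let finished = false;
  if (pending === 0) return done(null, results);
  tasks.forEach((task, i) => {
    task((err, value) => {
      if (finished) return; // ignore late callbacks after an error
      if (err) { finished = true; return done(err); }
      results[i] = value;
      if (--pending === 0) { finished = true; done(null, results); }
    });
  });
}

// Usage: three fake reads that complete in reverse order.
function fakeRead(name, delayMs) {
  return (cb) => setTimeout(() => cb(null, `content of ${name}`), delayMs);
}

let combined = null;
parallel(
  [fakeRead('part.1', 30), fakeRead('part.2', 20), fakeRead('part.3', 10)],
  (err, parts) => {
    if (err) throw err;
    combined = parts;
    console.log(parts); // task order, not completion order
  }
);
```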

We can do better!

Many Node.js libraries implement these two patterns.

Example: GoWithTheFlow.js (sequence)

Flow().seq(function(next) {
  console.log("first job");
  fs.readFile(filename, next);
}).seq(function(next, err, data) {
  console.log("second job. run *after* first job");
  next();
});

Example: GoWithTheFlow.js (parallel)

Flow().par(function(next) {
  console.log("job foo");
  next(null, "foo");
}).par(function(next) {
  console.log("job bar");
  next(null, "bar");
}).seq(function(next, errs, results) {
  console.log("job run *after* foo and bar");
});

Mix of sequences and parallels

Mix

And what about EventMachine?

EventMachine has EM::Iterator!

EM::Iterator for a sequence

cmds = ['pwd', 'uptime', 'uname', 'date']
EM::Iterator.new(cmds).map(proc{ |cmd,iter|
  EM.system(cmd) { |out,status| iter.return(out) }
}, proc{ |results|
  p results
})

EM::Iterator for parallel actions

cmds = ['pwd', 'uptime', 'uname', 'date']
EM::Iterator.new(cmds, cmds.length).map(proc{ |cmd,iter|
  EM.system(cmd) { |out,status| iter.return(out) }
}, proc{ |results|
  p results
})

Bonus: adjust concurrency

cmds = ['pwd', 'uptime', 'uname', 'date']
EM::Iterator.new(cmds, 2).map(proc{ |cmd,iter|
  EM.system(cmd) { |out,status| iter.return(out) }
}, proc{ |results|
  p results
})
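The concurrency argument of EM::Iterator is what turns a sequence (1) into a fully parallel run (`cmds.length`) or anything in between. A Node.js sketch of the same idea, assuming each worker reports back through a callback; `mapLimit` is a hypothetical helper name, and the commands are faked with timers rather than executed.

```javascript
// Hypothetical mapLimit: at most `limit` workers in flight, results in input order.
function mapLimit(items, limit, worker, done) {
  const results = new Array(items.length);
  let next = 0;      // index of the next item to start
  let running = 0;   // workers currently in flight
  let completed = 0;
  if (items.length === 0) return done(results);

  function startMore() {
    while (running < limit && next < items.length) {
      const i = next++;
      running++;
      worker(items[i], (value) => {
        results[i] = value;
        running--;
        completed++;
        if (completed === items.length) done(results);
        else startMore(); // a slot is free: start the next item
      });
    }
  }
  startMore();
}

// Usage: same command list, concurrency 2, fake async work.
let inFlight = 0;
let maxInFlight = 0;
let output = null;
const cmds = ['pwd', 'uptime', 'uname', 'date'];
mapLimit(cmds, 2, (cmd, ret) => {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  setTimeout(() => { inFlight--; ret(`output of ${cmd}`); }, 10);
}, (results) => { output = results; });
```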

Next pattern: pool of connections

pool = EM::Pool.new
4.times { pool.add EM::HttpRequest.new(url) }
many_paths.each do |path|
  pool.perform do |conn|
    req = conn.get(:path => path, :keepalive => true)
    req.callback { puts "Size: #{req.response.size}" }
  end
end
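The idea behind a connection pool: a fixed number of connections is shared, and a request waits until one is free. Below is a hedged Node.js sketch with hypothetical names (`Pool`, `perform`, `release`). EM::Pool hands the connection back when the deferrable returned by the block completes; this simpler version asks each job to call `release` explicitly. Connections and requests are faked.

```javascript
// Hypothetical minimal pool: fixed resources plus a FIFO of waiting jobs.
class Pool {
  constructor(resources) {
    this.idle = [...resources];
    this.waiting = [];
  }
  perform(job) {
    const resource = this.idle.shift();
    if (resource === undefined) {
      this.waiting.push(job); // every resource is busy: queue the job
      return;
    }
    job(resource, () => this.release(resource));
  }
  release(resource) {
    const job = this.waiting.shift();
    if (job) job(resource, () => this.release(resource)); // hand it straight over
    else this.idle.push(resource);
  }
}

// Usage: 2 fake "connections", 5 paths to fetch.
const pool = new Pool(['conn-A', 'conn-B']);
const fetched = [];
let busy = 0;
let maxBusy = 0;
['/a', '/b', '/c', '/d', '/e'].forEach((path) => {
  pool.perform((conn, release) => {
    busy++;
    maxBusy = Math.max(maxBusy, busy);
    setTimeout(() => {
      fetched.push(`${path} via ${conn}`);
      busy--;
      release();
    }, 5);
  });
});
```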

Node.js < EventMachine?

Nope

They are different beasts

But Node.js has its own cool features too

Streams

var fs   = require('fs'),
    zlib = require('zlib');
var filestream  = fs.createReadStream('file.txt'),
    gzipstream  = zlib.createGzip(),
    writestream = fs.createWriteStream('file.txt.gz');
filestream.pipe(gzipstream)
          .pipe(writestream)
          .on('finish', function () { console.log("done"); });
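Chained `.pipe()` calls do not forward errors from one stream to the next. A sketch of the same gzip chain with Node's `stream.pipeline`, which reports an error from any stage in one callback; the temp-file names and contents are assumptions for the demo.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const zlib = require('zlib');
const { pipeline } = require('stream');

// Hypothetical input file, created just for this demo.
const src = path.join(os.tmpdir(), `file-${process.pid}.txt`);
const dst = `${src}.gz`;
fs.writeFileSync(src, 'hello streams\n'.repeat(1000));

let pipelineDone = false;
let pipelineError = null;
pipeline(
  fs.createReadStream(src),
  zlib.createGzip(),
  fs.createWriteStream(dst),
  (err) => {
    // Called once: with an error from ANY stage, or with no error on success.
    pipelineDone = true;
    pipelineError = err;
    console.log(err ? `failed: ${err.message}` : 'done');
  }
);
```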

More patterns

Patterns

Timeout

http = EM::HttpRequest.new("http://google.fr/").get
http.errback do |error|
  puts "Error #{error}"
end
http.callback do
  puts "Yippee, Google is up!"
end
http.timeout(10, :timeout)
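`timeout(10, :timeout)` makes the deferrable fail if nothing has happened within 10 seconds. The same guard can be sketched in Node.js around any callback-style function; `withTimeout` is a hypothetical name, and the two requests are faked with timers.

```javascript
// Hypothetical helper: the callback fires exactly once, with the real
// result or with a timeout error, whichever comes first.
function withTimeout(fn, ms, callback) {
  let settled = false;
  const timer = setTimeout(() => {
    if (settled) return;
    settled = true;
    callback(new Error('timeout'));
  }, ms);
  fn((err, result) => {
    if (settled) return; // too late: the timeout already fired
    settled = true;
    clearTimeout(timer);
    callback(err, result);
  });
}

// Usage: a fast fake request succeeds, a slow one times out.
const outcomes = {};
withTimeout((cb) => setTimeout(() => cb(null, 'fast body'), 10), 100,
  (err, body) => { outcomes.fast = err ? err.message : body; });
withTimeout((cb) => setTimeout(() => cb(null, 'slow body'), 300), 100,
  (err, body) => { outcomes.slow = err ? err.message : body; });
```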

Join Points: example of a chat

var http = require('http');
http.createServer(function (req, res) {
  if (req.method === 'POST') {
    // Accept the new message
  } else {
    // Wait for the next message
  }
}).listen(1337, "127.0.0.1");

Node.js solution: events

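var http = require('http');
var EventEmitter = require('events').EventEmitter;
var chat = new EventEmitter();
http.createServer(function (req, res) {
  if (req.method === 'POST') {
    req.on('data', function (msg) {
      chat.emit("message", msg); });
    req.on('end', function () { res.end(); });
  } else {
    // once, not on: each waiting client needs only the next message
    chat.once("message", function (msg) { res.end(msg); });
  }
}).listen(1337, "127.0.0.1");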
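Why `once` rather than `on` for the waiting clients: each GET needs exactly one message, and with `on` its listener would stay registered after `res.end()`, leaking one listener per request and writing to a finished response on the next message. A minimal Node.js sketch without the HTTP part; the client names are made up.

```javascript
const EventEmitter = require('events');

// Three clients are waiting (long-polling) for the next chat message.
const chat = new EventEmitter();
const delivered = [];
['alice', 'bob', 'carol'].forEach((name) => {
  chat.once('message', (msg) => delivered.push(`${name} got ${msg}`));
});

const waitingBefore = chat.listenerCount('message'); // 3
chat.emit('message', 'hi!');
const waitingAfter = chat.listenerCount('message');  // 0: once() removed them
```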

EventMachine solution: channel

chan = EM::Channel.new
chan.subscribe {|msg| send_data(msg) }
chan << "A message"
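A channel is a small publish/subscribe object. The sketch below is a hypothetical JavaScript counterpart of that idea (names `Channel`, `subscribe`, `unsubscribe`, `push` chosen to mirror the Ruby slide); it delivers messages synchronously, which is a simplification.

```javascript
// Hypothetical channel: subscribe returns an id, push reaches every
// current subscriber, unsubscribe removes one by id.
class Channel {
  constructor() {
    this.subscribers = new Map();
    this.nextId = 1;
  }
  subscribe(fn) {
    const id = this.nextId++;
    this.subscribers.set(id, fn);
    return id;
  }
  unsubscribe(id) {
    this.subscribers.delete(id);
  }
  push(msg) {
    for (const fn of this.subscribers.values()) fn(msg);
  }
}

const chan = new Channel();
const inbox = [];
const sid = chan.subscribe((msg) => inbox.push(`client1: ${msg}`));
chan.subscribe((msg) => inbox.push(`client2: ${msg}`));
chan.push('A message');
chan.unsubscribe(sid); // client1 leaves the chat
chan.push('Another message');
```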

Asynchronous is hard

Remember that example?

fs.open('results', 'w', function(err, fd) {
  fs.write(fd, results, function(err, written, f) {
    fs.close(fd, function(err) {
        done();
    });
  });
});

How to refactor it?

Extract functions and give them names
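A sketch of that refactoring applied to the open/write/close sequence, with a temp file and a placeholder `done` as assumptions. Note how `fd` now has to be passed along by hand between the named steps.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical file name and payload for the demo.
const file = path.join(os.tmpdir(), `results-named-${process.pid}.txt`);
const results = 'some results\n';
let finished = false;

function done(err) {
  if (err) throw err;
  finished = true;
}

function openFile() {
  fs.open(file, 'w', writeResults);
}

function writeResults(err, fd) {
  if (err) return done(err);
  fs.write(fd, results, (err) => closeFile(err, fd)); // fd threaded by hand
}

function closeFile(writeErr, fd) {
  // Close even if the write failed, then report the first error.
  fs.close(fd, (closeErr) => done(writeErr || closeErr));
}

openFile();
```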

How to refactor it?

But be warned: you will lose context!

In fact, the flow becomes harder to follow

A magic solution

Magic wand

Ruby 1.9 Fibers

fib = Fiber.new do
  x, y = 0, 1
  loop do
    Fiber.yield y
    x, y = y, x+y
  end
end
20.times { puts fib.resume }
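For comparison, JavaScript generators are the closest built-in analogue in Node.js, with one difference: a generator can only suspend in its own body, while a Ruby fiber can be suspended from any method it calls. The same Fibonacci example:

```javascript
function* fibonacci() {
  let [x, y] = [0, 1];
  for (;;) {
    yield y; // suspend here, like Fiber.yield
    [x, y] = [y, x + y];
  }
}

const fib = fibonacci();
const first20 = [];
for (let i = 0; i < 20; i++) {
  first20.push(fib.next().value); // like fib.resume
}
console.log(first20.join('\n'));
```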

Back to EventMachine

def async_fetch(url)
  f = Fiber.current
  http = EM::HttpRequest.new(url).get :timeout => 10
  http.callback { f.resume(http) }
  http.errback { f.resume(http) }
  return Fiber.yield
end

Back to EventMachine (2)

EM.run do
  Fiber.new do
    data = async_fetch('http://www.google.com/')
    puts "Fetched page: #{data.response_header.status}"
    EM.stop
  end.resume
end
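The same trick can be sketched in Node.js with a generator and a small runner: the generator yields a function taking a node-style callback, and the runner resumes the generator from inside that callback, the way `async_fetch` resumes its fiber. `run` and `asyncFetch` are hypothetical names, and the fetch is faked with a timer.

```javascript
// Hypothetical co-style runner for generators that yield thunks.
function run(genFn) {
  const gen = genFn();
  function step(method, arg) {
    const next = gen[method](arg); // resume the generator (next or throw)
    if (next.done) return;
    next.value((err, result) => {
      if (err) step('throw', err); // surfaces as an exception at the yield
      else step('next', result);
    });
  }
  step('next');
}

// Fake async fetch: answers after a short delay.
function asyncFetch(url) {
  return (cb) => setTimeout(() => cb(null, { url, status: 200 }), 10);
}

let fetchedStatus = null;
run(function* () {
  const data = yield asyncFetch('http://www.google.com/'); // reads like sync code
  fetchedStatus = data.status;
  console.log(`Fetched page: ${data.status}`);
});
```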

Fibers vs Threads

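Fibers: cooperative scheduling (a fiber runs until it explicitly yields)
Threads: preemptive scheduling (the scheduler can switch threads at any point)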

EM-Synchrony

EventMachine.synchrony do
  multi = EventMachine::Synchrony::Multi.new
  multi.add :a, EM::HttpRequest.new(url1).aget
  multi.add :b, EM::HttpRequest.new(url2).aget
  res = multi.perform
  p res
  EM.stop
end
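In Node.js, a comparable shape is `Promise.all` inside an `async` function: start both requests, then suspend once until both have answered. This is an analogy, not EM-Synchrony's API; `fakeGet` and the URLs are hypothetical stand-ins for a real HTTP client.

```javascript
// Hypothetical promise-returning GET, faked with a timer.
function fakeGet(url, delayMs) {
  return new Promise((resolve) => {
    setTimeout(() => resolve({ url, status: 200 }), delayMs);
  });
}

let multiResult = null;
(async () => {
  // Both requests start immediately; await suspends until both are done.
  const [a, b] = await Promise.all([
    fakeGet('http://example.com/1', 30),
    fakeGet('http://example.com/2', 10),
  ]);
  multiResult = { a: a.url, b: b.url };
  console.log(multiResult);
})();
```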

Too magical to be true?

Not perfect

Each fiber has a small stack (4 KB in Ruby 1.9)

Don't use it with Rails, for example: its deep call stacks can overflow a fiber's stack

Limits?

Limits

Don't block the reactor

EM.run do
  # many things...
  on_a_callback do
    compute_pi_decimals(1_000_000)
  end
end
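A Node.js sketch of what blocking the reactor means: a timer due in 0 ms cannot fire while a callback keeps the single thread busy. `blockFor` is a hypothetical stand-in for `compute_pi_decimals`.

```javascript
const scheduledAt = Date.now();
let timerDelay = null;
setTimeout(() => {
  timerDelay = Date.now() - scheduledAt;
  console.log(`timer fired after ${timerDelay} ms instead of ~0 ms`);
}, 0);

// Busy-wait for about `ms` milliseconds without ever yielding to the loop.
function blockFor(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) { /* burn CPU */ }
}
blockFor(200);
```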

In fact, only one thread

One thread, one process...

We are using only one core of our computer!

Solutions

  1. Split your task into many small parts and let the event loop run between them
  2. Delegate the computation to another thread or process
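Both solutions, sketched in Node.js on a toy computation (summing 1..n, standing in for a real CPU-heavy task). The first splits the work into chunks and yields with `setImmediate` between them; the second moves it to another thread with `worker_threads`, which can use another core. Function names and the chunk size are assumptions.

```javascript
const { Worker } = require('worker_threads');

// Solution 1: compute in chunks, giving the event loop a turn between chunks.
function sumInChunks(n, chunkSize, callback) {
  let i = 1;
  let sum = 0;
  function chunk() {
    const stop = Math.min(n, i + chunkSize - 1);
    for (; i <= stop; i++) sum += i;
    if (i > n) callback(sum);
    else setImmediate(chunk); // let timers and I/O callbacks run
  }
  chunk();
}

// Solution 2: delegate the same computation to a worker thread.
function sumInWorker(n, callback) {
  const worker = new Worker(`
    const { parentPort, workerData } = require('worker_threads');
    let sum = 0;
    for (let i = 1; i <= workerData; i++) sum += i;
    parentPort.postMessage(sum);
  `, { eval: true, workerData: n });
  worker.once('message', callback);
}

const N = 1000000;
const answers = {};
sumInChunks(N, 10000, (sum) => { answers.chunked = sum; });
sumInWorker(N, (sum) => { answers.worker = sum; });
```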

Conclusion

That's all folks

Questions?