In Progress
Unit 1, Lesson 1
In Progress

Async with the Reactor Pattern

Have you ever wondered how async frameworks such as Ruby’s EventMachine or the Node JavaScript runtime work under the hood? Those frameworks are built on a technology known as the Reactor Pattern, and today we’re going to learn about this pattern by example. With a little help from marshmallows, chocolate, and graham crackers!

Video transcript & code

Async with Reactor

In this series, we’ve been looking at efficient ways to put together campfire snacks. Specifically “s’mores”, which consist of chocolate melted over a toasted marshmallow and sandwiched between two graham crackers.

Our code has focused on coordinating the work of three campers, each with their own responsibility in the s’more creation process.

In the last episode we made a small but important improvement to our process: we went from a polling loop to a multiplexing-and-demultiplexing model. This approach ensures that our loop waits exactly until a s’more prepper is ready for a new task. It doesn’t poll too often, or wait too long.

While this change has improved the efficiency of our implementation, it left the essential structure unchanged.

The basic pattern here is a hardcoded loop which is very specific to the task of assembling s’mores. It knows all about the different s’more-cooking roles, and parcels out work according to that knowledge.

This is fine for a program which is only about creating campfire snacks. But we’re using this code as a simplified stand-in for more complex programs such as web application servers. In those programs, there’s often a need to handle all kinds of potentially unrelated I/O operations within a single thread of execution.

So here’s a question: how could we generalize our event-handling loop? Is there some way we could abstract away the multiplexing-and-demultiplexing aspect, and just “plug in” application-specific handlers for different events?

Let’s take a crack at this.

We’ll start by introducing a new role. We’ll call it Expediter.

An Expediter’s job is to coordinate work around the campfire.

In the initializer we initialize a single instance variable: @ready_handlers.

We make this a Hash in which the default for an uninitialized slot is an empty array.

(You can learn more about this method of giving Hashes default values in Episode #032)

Next we add a method called when_ready. It takes a prepper object, like a MarshmallowWrangler or a Toaster, and a handler block to be associated with that prepper.

This is a registration method: it registers an action to be associated with a given prepper being ready for new work, by adding it to a list in the @ready_handlers Hash-of-Arrays.

Finally, we add a method named expedite.

This is where we’re going to move our multiplexing and demultiplexing logic. We start out with a call to wait_for_ready, passing it all the preppers that have been registered.

Remember, this method will simulate sleeping until one of the preppers it was passed is ready for new work. Then it returns a list of ready preppers, which we capture in a variable.

Once we have a list of ready preppers, we cycle through them.

For each one we get a list of handler blocks that have been registered for when that prepper is ready.

And then we simply call each handler.


class Expediter
  def initialize
    @ready_handlers = Hash.new{|h, k| h[k] = Array.new}
  end

  def when_ready(prepper, &handler)
    @ready_handlers[prepper] << handler
  end

  def expedite
    ready_preppers = wait_for_ready(@ready_handlers.keys)
    ready_preppers.each do |prepper|
      handlers_for_prepper = @ready_handlers[prepper]
      handlers_for_prepper.each(&:call)
    end
  end
end

And that’s it! That’s our expediter. Now to put it into action!

We’ll get rid of the old event loop completely.

In its place, we create a new Expediter.

Then we begin registering handlers using when_ready.

Our first handler says what to do when our marshmallow wrangler is ready for new work.

In this case, we search for any s’more orders that haven’t been started.

We log that we are starting on that order.

And we tell the wrangler to start extracting a marshmallow from the bag.

Next up, we specify what to do when our camper responsible for toasting marshmallows is ready.

In this case, we look for s’mores orders that have a marshmallow ready,

and if we find one, we log that we are toasting it and give it to the toaster to hold over the fire for a while.

And so on for the s’more assembler: when they have nothing to do, we’ll hand them the toasted marshmallow and tell them to get started putting a s’more together.


expediter = Expediter.new
expediter.when_ready(wrangler) do
  if(smore = smores.detect{|s| s.state == :not_started})
    log "Finding a marshmallow for #{smore.camper}"
    wrangler.start(smore)
  end
end
expediter.when_ready(toaster) do
  if(smore = smores.detect{|s| s.state == :has_marshmallow})
    log "Toasting marshmallow for #{smore.camper}"
    toaster.start(smore)
  end
end
expediter.when_ready(assembler) do
  if(smore = smores.detect{|s| s.state == :toasted})
    log "Assembling a smore for #{smore.camper}"
    assembler.start(smore)
  end
end

Now that we’ve told the ‘Expediter’ how to handle various events, we tell it to just keep expediting until all the s’more orders are finished.


expediter.expedite until smores.all?{|smore| smore.state == :ready }

We need to make one more small change before all this will work.


if(smore = smores.detect{|s| s.state == :not_started})

We need to make sure none of these handlers will try to give a s’more order to two workers at once. To do that, we need to make sure that s’more orders that are currently being handled by a prepper are marked as being in an intermediate state, so they won’t be selected for the next prepper yet.

To arrange for this, we go up to our Prepper base class.

We add a new line to the start method which puts the s’more order into a new :in_prep state. This way s’more orders in progress will be taken out of consideration until their current prepper marks them as completed.


class Prepper
    # ...
  def start(smore)
    @current_smore = smore
    @current_smore.state = :in_prep
  end
    # ...
end

OK let’s give this a whirl.

We get the same output as before! This is exactly what we hoped for.


expediter = Expediter.new
expediter.when_ready(wrangler) do
  if(smore = smores.detect{|s| s.state == :not_started})
    log "Finding a marshmallow for #{smore.camper}"
    wrangler.start(smore)
  end
end
expediter.when_ready(toaster) do
  if(smore = smores.detect{|s| s.state == :has_marshmallow})
    log "Toasting marshmallow for #{smore.camper}"
    toaster.start(smore)
  end
end
expediter.when_ready(assembler) do
  if(smore = smores.detect{|s| s.state == :toasted})
    log "Assembling a smore for #{smore.camper}"
    assembler.start(smore)
  end
end

Our new code is not wildly different in shape from the multiplexed event loop we had before. But it differs in one key respect: instead of a single hardcoded event loop, we are now registering handlers for prepper readiness events. We could do this registration anywhere in the program — it doesn’t have to happen all in one place as it does here. There’s no longer any need to edit a single location in the codebase every time we add a new type of prepper or a new activity that should trigger when a prepper is ready for work.

What from When

What we’ve done here is decouple the what from the when. We’ve abstracted our multiplexing and demultiplexing event loop into the generic Expediter class. And we can register arbitrary preppers and actions against this loop. In fact we could even register new actions inside other actions!

Reactor Pattern

There is a name for this pattern: it’s called the Reactor pattern. The Reactor pattern is how frameworks like Node.js or Ruby’s EventMachine library enable a single thread of execution to handle arbitrary numbers of I/O events, registered from various parts of an application.

As I/O handles become ready, the Reactor demultiplexes them to the handlers registered against them. One by one, these handlers are executed synchronously. They may register new handlers, which in turn will be triggered when their associated I/O completes. Once the Reactor has no more events to process, it goes back to waiting for new ones.

We’ve succeeded in making our s’more-making instructions more generic and open to future extension. But we haven’t really made any progress in making the code tell a clean, sequential story about making s’mores. We’ll tackle that task in future episodes. Until then,

Happy hacking!

Responses