Unit 1, Lesson 1

Active Object

Video transcript & code

Back in episode #127, we wrote a Ruby equivalent to a basic Elixir program using threads and queues. That example did a decent job of demonstrating that we can emulate an Erlang-style Actor model by passing simple arrays from thread to thread via queues. But for the actual task at hand, calculating a list of Fibonacci numbers in parallel, the approach felt a bit like overkill.

Today let's revisit this problem, but apply a different pattern to arrive at a solution. As before, we'll use a deliberately naive Fibonacci function to simulate some processor-intensive activity that we want to split across multiple CPU cores.

def fib_calc(n)
  case n
  when 0, 1 then 1
  else fib_calc(n-1) + fib_calc(n-2)
  end
end
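As a quick sanity check, we can confirm the function's behavior on small inputs. With this definition the sequence starts 1, 1, 2, 3, 5, and the exponential recursion is exactly what makes it a useful stand-in for CPU-bound work:

```ruby
# Deliberately naive: each call fans out into two recursive calls,
# so the work grows exponentially with n.
def fib_calc(n)
  case n
  when 0, 1 then 1
  else fib_calc(n - 1) + fib_calc(n - 2)
  end
end

p fib_calc(10) # => 89
```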

We're going to make use of the timed, sized thread-safe queue class that we built up over a series of recent episodes. We've renamed this class TapasQueue. We'll also tell Ruby to immediately abort when a thread encounters an exception, in order to ease debugging.

require "./tapas_queue"

Thread.abort_on_exception = true

# The listings below report progress through a global $log;
# pointing it at standard output keeps them runnable as shown.
$log = $stdout
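If you don't have the episode's tapas_queue.rb handy, here is a minimal stand-in that provides the interface the rest of the code relies on: a size-bounded, blocking #push, and a #pop that takes a timeout in seconds and yields to a block on expiry. This is a sketch inferred from how the class is used below, not the implementation built up in the earlier episodes:

```ruby
require "thread"

# Minimal stand-in for the episode's TapasQueue: a bounded,
# thread-safe queue whose #pop takes a timeout and invokes the
# given block if no item arrives in time.
class TapasQueue
  def initialize(max_size)
    @max_size        = max_size
    @items           = []
    @lock            = Mutex.new
    @space_available = ConditionVariable.new
    @item_available  = ConditionVariable.new
  end

  def push(item)
    @lock.synchronize do
      # Block while the queue is at capacity.
      @space_available.wait(@lock) while @items.size >= @max_size
      @items << item
      @item_available.signal
    end
  end

  def pop(timeout)
    deadline = Time.now + timeout
    @lock.synchronize do
      while @items.empty?
        remaining = deadline - Time.now
        # Timed out: hand control to the caller's block.
        return yield if remaining <= 0
        @item_available.wait(@lock, remaining)
      end
      item = @items.shift
      @space_available.signal
      item
    end
  end
end
```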

First off, we're going to create a new class we'll call WorkQueue. The job of a work queue is to hold a queue of jobs to perform and distribute those jobs to a pool of "worker" threads.

A work queue will receive a thread count on initialization. It will queue up work to be done in a @jobs queue. We cap this queue, somewhat arbitrarily, at 10 items. As we learned in the "dead thread" episode, it's usually a good idea to put some kind of limit on queue size so that we can discover more quickly when a queue is not being emptied in a timely fashion.

The next step in initializing our work queue is to set up the thread pool. We use the thread_count to determine how many threads to start. Each thread delegates its actual logic to a method called #dispatch, which we'll get to in a moment.

We need a way to put new jobs into the work queue. We satisfy this requirement with an #enqueue method.

We also want to be able to shut down the work queue when we have no more work for it. We write a #stop method. It enqueues a special :die symbol once for each thread in the pool. Then it waits for all the threads to end, using the Thread#join method.

Next up, we tackle the heart of the worker threads, the #dispatch method. This method loops indefinitely. On each iteration, it pops a job off of the work queue, raising an exception if it waits longer than 30 seconds.

If the job is the special :die instruction, it immediately breaks out of the loop. This will result in the thread terminating.

Otherwise, it executes the job. This consists of simply sending the #call message to the job. What this accomplishes is left up to the job object.

This work queue is completely generic: its pool of threads simply calls each job that is enqueued. The jobs themselves could be simple procs or lambdas, or they could be complex objects that respond to the #call method.

class WorkQueue
  def initialize(thread_count)
    @jobs    = TapasQueue.new(10)
    @threads = thread_count.times.map {|worker_num|
      Thread.new do
        dispatch(worker_num)
      end
    }
  end

  def enqueue(job)
    @jobs.push(job)
  end

  def stop
    $log.puts "Stopping thread pool"
    @threads.size.times do
      enqueue(:die)
    end
    @threads.each(&:join)
    $log.puts "Thread pool stopped"
  end

  private

  def dispatch(worker_num)
    $log.puts "W#{worker_num}: Starting"
    loop do
      job = @jobs.pop(30) do
        raise "W#{worker_num}: Timed out waiting for job"
      end
      break if job == :die
      $log.puts "W#{worker_num}: working..."
      job.call
    end
    $log.puts "W#{worker_num}: terminating"
  end
end
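The :die poison-pill shutdown and the "anything that responds to #call" job contract can both be demonstrated on their own using Ruby's standard-library Queue in place of TapasQueue. The CountingJob class here is a hypothetical example job, not part of the episode's code:

```ruby
require "thread"

# A job needn't be a lambda; any object that responds to #call works.
class CountingJob
  def initialize(results)
    @results = results
  end

  def call
    @results << :done
  end
end

jobs    = Queue.new  # stdlib thread-safe queue (unbounded)
results = Queue.new

workers = 3.times.map do
  Thread.new do
    loop do
      job = jobs.pop
      break if job == :die  # poison pill: terminates this worker
      job.call
    end
  end
end

5.times { jobs.push(CountingJob.new(results)) }
3.times { jobs.push(:die) }  # one :die per worker shuts the pool down
workers.each(&:join)

p results.size # => 5
```

Because the queue is FIFO, all five jobs are popped and executed before any worker sees its :die instruction.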

Now to put this work queue to work. Let's write a method to compute a batch of Fibonacci solutions. It will receive a thread count and a list of numbers to compute. We'll instantiate a work queue to process the calculations, using the passed thread_count to determine the size of the thread pool. We'll also set up a queue to collect answers as they are produced, and an empty hash intended to hold the final set of solutions.

We create a producer thread which loops through the list of numbers to process. For each one, it enqueues a lambda onto the work queue. The lambda contains code to calculate a single answer, and then push the answer, along with the original number, onto the answer_queue.

We then create a collector thread. This thread waits on the answer queue. As answers arrive, it fills in the solutions hash. This thread will finish once there is a solution for every given number.

You might be wondering why we don't simply have the job lambdas fill in the solutions hash directly. The reason is that we don't know whether Hash operations are thread-safe on all Ruby implementations. For maximum safety, we would need to surround all access to the solutions hash with a Mutex synchronization block. That wouldn't be a terrible solution, but as a general rule of thumb, when writing threaded programs we want to make the individual threads as independent as possible. We try to avoid making them share, and potentially contend over, a resource. Every time we add shared resources and manual synchronization, we increase our chances of introducing subtle threading bugs.
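For comparison, the mutex-guarded alternative described above would look roughly like this, with every worker contending for the same lock around the shared hash (a sketch only, with a trivial computation standing in for fib_calc):

```ruby
require "thread"

solutions = {}
lock      = Mutex.new

threads = [5, 6, 7].map do |n|
  Thread.new do
    result = n * n  # stand-in for fib_calc(n)
    # Every thread must take the same lock to touch the shared hash.
    lock.synchronize { solutions[n] = result }
  end
end
threads.each(&:join)

p solutions # => {5=>25, 6=>36, 7=>49} (key order may vary)
```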

So instead, we have our jobs push results into a thread-safe answer queue, and we delegate the job of collating the results to a dedicated thread. Since this thread is the only one touching the solutions hash until it finishes, there is no contention.

Finally, this method waits for all the threads to stop, and returns the completed solutions hash.

def compute_batch(thread_count, numbers)
  work_queue   = WorkQueue.new(thread_count)
  answer_queue = TapasQueue.new(10)
  solutions    = {}

  producer = Thread.new do
    numbers.each do |n|
      $log.puts "P: Pushing #{n}"
      work_queue.enqueue(-> do
        result = fib_calc(n)
        answer_queue.push(number: n, result: result)
      end)
    end
  end

  collector = Thread.new do
    while solutions.size < numbers.size
      answer = answer_queue.pop(30) do
        raise "Collector timed out waiting for an answer"
      end
      $log.puts "C: Collecting answer for #{answer[:number]}"
      solutions[answer[:number]] = answer[:result]
    end
  end
  producer.join
  collector.join
  work_queue.stop
  solutions
end

We wrap up by invoking this method with four threads specified, and our original list of input numbers from episode #127.

numbers = [
  27, 33, 35, 11, 36, 29, 18, 37, 21, 31, 19, 10, 14, 30,
  15, 17, 23, 28, 25, 34, 22, 20, 13, 16, 32, 12, 26, 24,
]

p compute_batch(4, numbers)

Let's run this code:

$ rvm ruby-2.0.0 do ruby pool.rb                                                                       
W2: Starting
W1: StartingW0: Starting
P: Pushing 27
W3: Starting

P: Pushing 33
W3: working...
P: Pushing 35
C: Collecting answer for 27
W0: working...P: Pushing 11
W1: working...

P: Pushing 36
W2: working...
P: Pushing 29
W2: working...C: Collecting answer for 11

W3: working...
C: Collecting answer for 33P: Pushing 18

C: Collecting answer for 29P: Pushing 37
W0: working...
C: Collecting answer for 35

C: Collecting answer for 18
P: Pushing 21
P: Pushing 31W3: working...
W1: working...

C: Collecting answer for 21P: Pushing 19W0: working...


P: Pushing 10
W1: working...P: Pushing 14
W0: working...
C: Collecting answer for 31
P: Pushing 30
W2: working...

C: Collecting answer for 36
P: Pushing 15C: Collecting answer for 19
W1: working...C: Collecting answer for 10

C: Collecting answer for 14

P: Pushing 17
W0: working...
C: Collecting answer for 30P: Pushing 23
W2: working...
W1: working...
P: Pushing 28

C: Collecting answer for 15P: Pushing 25
W0: working...
P: Pushing 34
W2: working...
C: Collecting answer for 17

C: Collecting answer for 23
C: Collecting answer for 28
P: Pushing 22
W1: working...
C: Collecting answer for 25P: Pushing 20W0: working...


C: Collecting answer for 37
C: Collecting answer for 22
P: Pushing 13
W2: working...
P: Pushing 16W3: working...
C: Collecting answer for 20

P: Pushing 32W0: working...
C: Collecting answer for 34

C: Collecting answer for 13
C: Collecting answer for 16
P: Pushing 12W2: working...

P: Pushing 26W1: working...
C: Collecting answer for 12

C: Collecting answer for 32
P: Pushing 24W3: working...

W0: working...
C: Collecting answer for 26
C: Collecting answer for 24
Stopping thread pool
W1: terminating
W2: terminating
W3: terminating
W0: terminating
Thread pool stopped

The output is a little messy because the puts statements from various threads are stepping on each other a bit. But we can see that multiple workers are popping work off the queue and executing it.

We've implemented a basic thread pool in Ruby. If we benchmark this solution under a Ruby implementation where threads run truly in parallel, such as JRuby, we can see a modest performance improvement on my two-core machine when going from one thread to two.

require "benchmark"

Benchmark.bm(3) do |b|
  (1..6).each do |num_threads|
    b.report("#{num_threads}:") {
      compute_batch(num_threads, numbers)
    }
  end
end
$ rvm jruby do ruby pool.rb 
          user     system      total        real
1:    3.290000   0.010000   3.300000 (  2.886000)
2:    3.070000   0.010000   3.080000 (  1.672000)
3:    3.930000   0.010000   3.940000 (  1.856000)
4:    4.880000   0.020000   4.900000 (  1.694000)
5:    4.330000   0.010000   4.340000 (  1.609000)
6:    4.790000   0.020000   4.810000 (  1.947000)

There are many patterns for concurrent processing, and we'll cover some others in future episodes. For cases when we can easily break the task to be done into discrete, independent chunks of work, a work queue coupled with a thread pool is a straightforward, easy-to-understand approach. And it's an approach that can be found at the heart of many applications, including some web servers.

That's all for now. Happy hacking!
