
# Active Object

Video transcript & code

Back in Episode #127, we wrote a Ruby equivalent to a basic Elixir program using threads and queues. That example did a decent job demonstrating that we can emulate an Erlang-style Actor model by passing simple arrays from thread to thread via queues. But for the actual task at hand – calculating a list of fibonacci numbers in parallel – the approach felt a bit like overkill.

Today let's revisit this problem, but apply a different pattern in order to arrive at a solution. As before, we'll use a deliberately naive fibonacci function, in order to simulate some processor-intensive activity that we want to split across multiple CPU cores.

```
def fib_calc(n)
  case n
  when 0, 1 then 1
  else fib_calc(n-1) + fib_calc(n-2)
  end
end
```
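As a quick sanity check, here are the first few values this definition produces. With both base cases returning 1, the sequence runs 1, 1, 2, 3, 5, 8, and so on:

```ruby
def fib_calc(n)
  case n
  when 0, 1 then 1
  else fib_calc(n-1) + fib_calc(n-2)
  end
end

p fib_calc(0)   # => 1
p fib_calc(5)   # => 8
p fib_calc(10)  # => 89
```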

We're going to make use of the timed, sized thread-safe queue class that we built up over a series of recent episodes. We've renamed this class `TapasQueue`. We'll also tell Ruby to immediately abort when a thread encounters an exception, in order to ease debugging.

```
require "./tapas_queue"

Thread.abort_on_exception = true
```

First off, we're going to create a new class we'll call `WorkQueue`. The job of a Work Queue is to keep a queue of jobs to perform, and distribute those jobs out to a pool of "worker" threads.

A work queue will receive a thread count on initialization. It will queue up work to be done in a `@jobs` queue. We cap this queue, somewhat arbitrarily, at 10 items. As we learned in the "dead thread" episode, it's usually a good idea to have some kind of limit on queue size so that we can more quickly discover if they are not being emptied in a timely fashion.
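The standard library's `SizedQueue` illustrates the same backpressure idea (this is a generic sketch, not the `TapasQueue` implementation itself): pushing onto a full queue blocks the producer until a consumer makes room.

```ruby
require "thread"

q = SizedQueue.new(2)
q.push(1)
q.push(2)

producer = Thread.new { q.push(3) }  # blocks: the queue is already full
sleep 0.2
blocked = producer.status            # typically "sleep" while waiting for room

q.pop            # consuming one item unblocks the producer
producer.join    # push of 3 has now completed
```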

The next step in initializing our work queue is to set up the thread pool. We use the `thread_count` to determine how many threads to start. Each thread delegates its actual logic to a method called `#dispatch`, which we'll get to in a moment.

We need a way to put new jobs into the work queue. We satisfy this requirement with an `#enqueue` method.

We also want to be able to shut down the work queue when we have no more work for it. We write a `#stop` method. It enqueues a special `:die` symbol once for each thread in the pool. Then it waits for all the threads to end, using the `Thread#join` method.

Next up, we tackle the heart of the worker threads, the `#dispatch` method. This method loops indefinitely. On each iteration, it pops a job off of the work queue, raising an exception if it waits longer than 30 seconds.

If the job is the special `:die` instruction, it immediately breaks out of the loop. This will result in the thread terminating.

Otherwise, it executes the job. This consists of simply sending the `#call` message to the `job`. What this accomplishes is left up to the job object.

This work queue is completely generic: its pool of threads simply calls each job that is enqueued. The jobs themselves could be simple procs or lambdas, or they could be complex objects that respond to the `#call` method.

```
class WorkQueue
  def initialize(thread_count)
    @jobs    = TapasQueue.new(10)
    @threads = thread_count.times.map { |worker_num|
      Thread.new do
        dispatch(worker_num)
      end
    }
  end

  def enqueue(job)
    @jobs.push(job)
  end

  def stop
    @threads.size.times do
      enqueue(:die)
    end
    @threads.each(&:join)
  end

  private

  def dispatch(worker_num)
    $log.puts "W#{worker_num}: Starting"
    loop do
      job = @jobs.pop(30) do
        raise "W#{worker_num}: Timed out waiting for job"
      end
      break if job == :die
      $log.puts "W#{worker_num}: working..."
      job.call
    end
    $log.puts "W#{worker_num}: terminating"
  end
end
```
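As noted above, any object that responds to `#call` can serve as a job. Here's a tiny sketch of that idea (`ReportJob` is a made-up example, not part of the episode's code):

```ruby
# A lambda and a purpose-built object are interchangeable as jobs,
# because the work queue only ever sends them #call.
greeting_job = -> { "hello from a lambda" }

class ReportJob
  def call
    "hello from a callable object"
  end
end

[greeting_job, ReportJob.new].each do |job|
  puts job.call
end
```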

Now to put this work queue to work. Let's write a method to compute a batch of fibonacci solutions. It'll receive a thread count and a list of numbers to compute. We'll instantiate a work queue to process the calculations, using the passed `thread_count` to determine the size of the thread pool. We'll also set up a queue to collect answers as they are produced, and an empty hash intended to contain the final set of solutions.

We create a producer thread which loops through the list of numbers to process. For each one, it enqueues a lambda onto the work queue. The lambda contains code to calculate a single answer, and then push the answer, along with the original number, onto the `answer_queue`.

We then create a collector thread. This thread waits on the answer queue. As answers arrive, it fills in the `solutions` hash. This thread will finish once there is a solution for every given number.

You might be wondering why we don't simply have the job lambdas fill in the solutions hash directly. The reason is that we don't know if Hash operations are thread-safe on all Ruby implementations. For maximum safety, we would need to surround all `solutions` hash access in a Mutex synchronization block. That wouldn't be a terrible solution, but as a general rule of thumb, when writing threaded programs we want to make the individual threads as independent as possible. We try to avoid making them share and potentially contend over a resource. Every time we add shared resources and manual synchronization, we increase our chances of introducing subtle threading bugs.
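For comparison, the rejected alternative would look something like this sketch: every worker thread writes to the shared hash, and each write must take the same lock.

```ruby
require "thread"

solutions = {}
lock      = Mutex.new

threads = (1..4).map do |n|
  Thread.new do
    result = n * n  # stand-in for a real fib_calc(n) computation
    # Every thread contends for this one lock on every write.
    lock.synchronize { solutions[n] = result }
  end
end
threads.each(&:join)
# solutions now maps each n to its result, e.g. {1=>1, 2=>4, 3=>9, 4=>16}
```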

So instead, we have our jobs push results into a thread-safe answer queue, and we delegate the job of collating the results to a dedicated thread. Since this thread is the only one touching the `solutions` hash until it finishes, there is no contention.

Finally, this method waits for all the threads to stop, and returns the completed `solutions` hash.

```
def compute_batch(thread_count, numbers)
  work_queue   = WorkQueue.new(thread_count)
  answer_queue = TapasQueue.new(10) # capped like the jobs queue
  solutions    = {}

  producer = Thread.new do
    numbers.each do |n|
      $log.puts "P: Pushing #{n}"
      work_queue.enqueue(-> do
        result = fib_calc(n)
        answer_queue.push([n, result])
      end)
    end
  end

  collector = Thread.new do
    while solutions.size < numbers.size
      number, answer = answer_queue.pop(30) do
        raise "Collector timed out waiting for an answer"
      end
      $log.puts "C: Collecting answer for #{number}"
      solutions[number] = answer
    end
  end

  producer.join
  collector.join
  work_queue.stop
  solutions
end
```

We wrap up by writing an invocation of this method with four threads specified, and our original list of input numbers from episode 127.

```
numbers = [
  27, 33, 35, 11, 36, 29, 18, 37, 21, 31, 19, 10, 14, 30,
  15, 17, 23, 28, 25, 34, 22, 20, 13, 16, 32, 12, 26, 24,
]

p compute_batch(4, numbers)
```

Let's run this code:

```
$ rvm ruby-2.0.0 do ruby pool.rb
W2: Starting
W1: StartingW0: Starting
P: Pushing 27
W3: Starting

P: Pushing 33
W3: working...
P: Pushing 35
W0: working...P: Pushing 11
W1: working...

P: Pushing 36
W2: working...
P: Pushing 29
W2: working...C: Collecting answer for 11

W3: working...
C: Collecting answer for 33P: Pushing 18

C: Collecting answer for 29P: Pushing 37
W0: working...

P: Pushing 21
P: Pushing 31W3: working...
W1: working...

C: Collecting answer for 21P: Pushing 19W0: working...

P: Pushing 10
W1: working...P: Pushing 14
W0: working...
P: Pushing 30
W2: working...

P: Pushing 15C: Collecting answer for 19
W1: working...C: Collecting answer for 10

P: Pushing 17
W0: working...
C: Collecting answer for 30P: Pushing 23
W2: working...
W1: working...
P: Pushing 28

C: Collecting answer for 15P: Pushing 25
W0: working...
P: Pushing 34
W2: working...

P: Pushing 22
W1: working...
C: Collecting answer for 25P: Pushing 20W0: working...

P: Pushing 13
W2: working...
P: Pushing 16W3: working...

P: Pushing 32W0: working...

P: Pushing 12W2: working...

P: Pushing 26W1: working...

P: Pushing 24W3: working...

W0: working...
W1: terminating
W2: terminating
W3: terminating
W0: terminating
```

The output is a little messy because the `puts` statements from various threads are stepping on each other a bit. But we can see that multiple workers are popping work off the queue and executing it.
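If we wanted cleaner output, one hypothetical fix (not part of the episode's code) is to funnel every message through a single mutex-guarded logger, so each line is written atomically:

```ruby
require "thread"

# A minimal synchronized logger sketch; $log in the listings above
# could point at something like this.
class SyncLogger
  def initialize(io)
    @io   = io
    @lock = Mutex.new
  end

  def puts(message)
    # Only one thread at a time may write, so lines never interleave.
    @lock.synchronize { @io.puts(message) }
  end
end

$log = SyncLogger.new($stderr)
```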

We've implemented a basic thread pool in Ruby. If we benchmark this solution under a Ruby implementation where threads are truly independent, we can see a modest improvement in performance on my two-core machine when it goes from one thread to two.

```
require "benchmark"

Benchmark.bm(3) do |b|
  (1..6).each do |thread_count|
    b.report("#{thread_count}:") {
      compute_batch(thread_count, numbers)
    }
  end
end
```
```
$ rvm jruby do ruby pool.rb
user     system      total        real
1:    3.290000   0.010000   3.300000 (  2.886000)
2:    3.070000   0.010000   3.080000 (  1.672000)
3:    3.930000   0.010000   3.940000 (  1.856000)
4:    4.880000   0.020000   4.900000 (  1.694000)
5:    4.330000   0.010000   4.340000 (  1.609000)
6:    4.790000   0.020000   4.810000 (  1.947000)
```

There are many patterns for concurrent processing, and we'll cover some others in future episodes. For cases when we can easily break the task to be done into discrete, independent chunks of work, a work queue coupled with a thread pool is a straightforward, easy-to-understand approach. And it's an approach that can be found at the heart of many applications, including some web servers.
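As a closing sketch, here is the same pattern reduced to Ruby's standard library `Queue` (thread-safe, though without the timeout support of `TapasQueue`): a pool of workers drains a job queue until each one receives a `:die` poison pill.

```ruby
require "thread"

jobs    = Queue.new
results = Queue.new

workers = 4.times.map do
  Thread.new do
    loop do
      job = jobs.pop
      break if job == :die     # poison pill: one per worker
      results.push(job.call)
    end
  end
end

10.times { |n| jobs.push(-> { n * n }) }  # enqueue some toy jobs
4.times  { jobs.push(:die) }              # then one pill per worker
workers.each(&:join)

squares = []
squares << results.pop until results.empty?
# squares holds 0, 1, 4, ..., 81 in completion order
```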

That's all for now. Happy hacking!