Unit 1, Lesson 21

Threads are Hard

Video transcript & code

It's a truism that writing low-level multithreaded code is one of the harder challenges in programming. Just to show you an example, let's take another look at the queue class we've been building.

require "thread"

class MyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @items.push(obj)
    @item_available.signal
  end

  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      if @items.any?
        @items.shift
      else
        timeout_policy.call
      end
    end
  end
end

There is actually a latent bug in this code. Have you noticed it?

A few episodes back, we surrounded part of the #pop code with a mutex synchronization block. Back then, the #push method was just one line, pushing a new object onto the @items array. This one-liner didn't need any special synchronization.

But since then we've also added a condition variable to signal when items are available. And now this method has two lines. This is a problem.

Why? Imagine a queue is empty. In a multithreaded program, it is entirely possible that the first line could be executed, pushing a new item onto the queue; then one consumer could immediately pop that item off the queue; then the second line could be executed, notifying another sleeping consumer thread that a new item is available. This notification would be a lie, since the first consumer already claimed that item.
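To make that interleaving concrete, here's a scripted, single-threaded replay of the scenario. The thread names P (producer), A, and B (consumers) are hypothetical stand-ins for whatever the scheduler actually chooses to run:

```ruby
items = []
log   = []

# Unsynchronized #push is two separate statements; the scheduler is free
# to run other threads in between them.
items.push("widget1")                        # #push, statement 1 of 2
log << "P: pushed widget1"

# Consumer A pops the item before the signal is ever sent.
log << "A: popped #{items.shift}"

# #push's second statement finally runs, waking sleeping consumer B...
log << "P: signaled item available"

# ...who finds the queue empty again.
log << "B: woke to empty queue? #{items.empty?}"

puts log
```

The "lie" is the last two steps: the signal promises an item that consumer A has already claimed.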

Let's see if we can force this scenario. First, I'll put a line of logging code between the two lines of #push, which should increase the chances that another thread gets scheduled to run in between them. Then we'll set up one producer thread and three consumers, all going as fast as they can.

require "thread"

class MyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @items.push(obj)
    puts "Signaling #{obj} is available"
    @item_available.signal
  end

  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      if @items.any?
        @items.shift
      else
        timeout_policy.call
      end
    end
  end
end

q = MyQueue.new
$shutdown = false

trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end

def consume(thread, q)
  until($shutdown) do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "BUG: No widget popped"
      exit(1)
    end
  end
  puts "Consumer #{thread} shutting down"
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)

When I run this it only takes a fraction of a second before the program ends because of the bug.

Still, even in this fraction of a second, several widgets were successfully pushed and popped before the bug manifested. If we hadn't set up the scenario to make it extra likely that the bug would be triggered, the program might have run for minutes or hours before finally dying. Or worse, not dying but exhibiting anomalous behavior.

How do we fix the bug? It's simple enough: we surround the offending lines in a mutex-synchronized block:

require "thread"

class MyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @lock.synchronize do
      @items.push(obj)
      puts "Signaling #{obj} is available"
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      if @items.any?
        @items.shift
      else
        timeout_policy.call
      end
    end
  end
end

q = MyQueue.new
$shutdown = false

trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end

def consume(thread, q)
  until($shutdown) do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "BUG: No widget popped in #{thread}"
      exit(1)
    end
  end
  puts "Consumer #{thread} shutting down"
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)

When we run this code… we still see the BUG?!!!

$ ruby fixed.rb 
producing widget1
Signaling widget1 is available
producing widget2
thread 1 consuming widget1Signaling widget2 is available

producing widget3BUG: No widget popped in thread 2

thread 1 consuming widget2
Signaling widget3 is available
producing widget4

What's going on this time? As it turns out, we implemented the timeout code in the #pop method all wrong. Consider the case where thread 1 is waiting for a new item. Since it is waiting, the mutex has been released. Meanwhile, thread 2 is just finishing handling the last item. The producer enqueues a new item, and thread 2, seeing that an item is available, doesn't bother waiting and immediately snatches it. When thread 1 finally wakes up as a result of being signaled from #push, the queue is empty again.

If your head hurts right now, you're not alone. If I had more time I'd draw a picture to illustrate all this, but I'm not sure how much it would help.
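In lieu of a picture, here's the same sequence as a scripted, single-threaded replay; the step order is the hypothetical one just described, written out in program order:

```ruby
items = []
log   = []

log << "T1: empty queue, waiting on @item_available (lock released)"

# The producer pushes under the lock and signals. The signal makes T1
# runnable, but T1 still has to reacquire @lock before #wait returns.
items.push("widget9")
log << "P : pushed widget9 and signaled"

# T2, just finishing its previous pop, wins the race for @lock
# and takes the item without ever waiting.
log << "T2: grabbed @lock first, popped #{items.shift}"

# Only now does T1 reacquire the lock -- and the queue is empty again.
log << "T1: reacquired @lock, queue empty? #{items.empty?}"

puts log
```

The key detail is that being signaled and holding the lock are two separate events, and anything can happen in between.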

Here's what we need to do. At the top of the pop method, we calculate a deadline, which is a wallclock time past which our timeout has expired. We use a special symbol :never to indicate that we should wait indefinitely.

We then surround the waiting and queue shifting code in a loop. We move the calculation of the timeout argument to #wait inside this loop. In the case of an indefinite wait it will be nil; otherwise it will be the remaining difference between now and the deadline.

We rearrange the code that determines if #wait will be called, and add a new condition ensuring that we will never call #wait with a timeout of less than zero.

We add a new case to the if statement inside this loop. In the case where the queue is still empty after waiting and the deadline has not yet expired—in other words, the case where this thread has been woken up but another thread has emptied the queue before it could acquire a lock—we will simply loop around and wait again.

Finally, we add break statements to the other two cases in this statement so that they will break out of the loop if either an item is successfully popped, or the timeout expires.

require "thread"

class MyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @lock.synchronize do
      @items.push(obj)
      puts "Signaling #{obj} is available"
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    deadline   = timeout == :never ? :never : Time.now + timeout
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if @items.empty? && cv_timeout.to_f >= 0
          @item_available.wait(@lock, cv_timeout) 
        end
        if @items.any?
          break @items.shift
        elsif deadline == :never || deadline > Time.now
          next
        else
          break timeout_policy.call
        end
      end
    end
  end
end

q = MyQueue.new
$shutdown = false

trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end

def consume(thread, q)
  until($shutdown) do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "BUG: No widget popped in #{thread}"
      exit(1)
    end
  end
  puts "Consumer #{thread} shutting down"
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)

Now, finally, we can successfully run this program without unexpectedly getting a nil back from #pop.

$ ruby fixed2.rb 
producing widget1
Signaling widget1 is available
producing widget2
Signaling widget2 is available
producing widget3
Signaling widget3 is available

These kinds of latent bugs are the nightmare of multithreaded coding. I'd like to say I allowed them to sneak in just so I could demonstrate the dangers of coding with threads, but the truth is I simply hadn't noticed them until now. How did I finally realize they existed? Not by seeing them manifest. No; I only realized them after inspecting the code several times and realizing, from my experience writing multithreaded code, and also by comparison to other people's implementations of similar classes, that I'd made a mistake.

Just to scare you a little bit more: remember how I said that the original version of the #push method was safe because it was a one-liner pushing an item onto the @items array? Well, that version was only safe in MRI, due to the happy accident that Arrays are implemented in C code and their methods therefore execute in the context of the Global Interpreter Lock. In Rubinius or JRuby, that simple Array push alone would have been a potential bug, since Arrays are not inherently thread-safe in those implementations.
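This is one more reason that, in production code, you'd usually reach for the thread-safe Queue that ships with Ruby (Thread::Queue) rather than a hand-rolled one: it does all of this locking internally, on every implementation. A minimal sketch:

```ruby
require "thread"   # Queue is built in; this require is harmless on modern Rubies

q = Queue.new

producer = Thread.new do
  3.times { |i| q.push("widget#{i + 1}") }
end

consumer = Thread.new do
  3.times.map { q.pop }    # pop blocks until an item is available
end

producer.join
p consumer.value           # the three widgets, in FIFO order
```

Newer Rubies (3.2 and later) even add a timeout: keyword to Queue#pop, much like the timeout we built by hand in this episode.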

The moral of this story is that threading is hard; and often the only way to find bugs before they manifest in production code is through careful code inspection. If you are working with threads and are lucky enough to have someone else on your team who has written multithreaded code, it's a good idea to have them do a code review on any change or addition you make. Bribe them with cookies if you need to.

But threads aren't all fear and pain. Once you work the kinks out, there are few things more satisfying than watching many independent threads of execution performing an intricately choreographed dance, passing information back and forth and efficiently getting work done. And as we'll see in future episodes, all of the elegant higher-level patterns for concurrency are built on these simple primitives. So take heart—and happy hacking!
