Threads are Hard
Video transcript & code
It's a truism that writing low-level multithreaded code is one of the harder challenges in programming. Just to show you an example, let's take another look at the queue class we've been building.
require "thread"

class MyQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @items.push(obj)
    @item_available.signal
  end

  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      if @items.any?
        @items.shift
      else
        timeout_policy.call
      end
    end
  end
end
There is actually a latent bug in this code. Have you noticed it?
A few episodes back, we surrounded part of the #pop code with a mutex synchronization block. Back then, the #push method was just one line, pushing a new object into the @items array. This one-liner didn't need any special synchronization. But since then we've also added a condition variable to signal when items are available, and now the method has two lines. This is a problem.
Why? Imagine a queue is empty. In a multithreaded program, it is entirely possible that the first line could be executed, pushing a new item onto the queue; then one consumer could immediately pop that item off the queue; then the second line could be executed, notifying another sleeping consumer thread that a new item is available. This notification would be a lie, since the first consumer already claimed that item.
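To make that interleaving concrete, here is one possible ordering of events, written out as annotated comments (the thread names are purely illustrative):

# One interleaving that produces the lying signal:
#
#   producer:    @items.push("widget1")      # first line of #push runs
#   consumer A:  calls #pop, sees @items.any?, shifts "widget1" without waiting
#   producer:    @item_available.signal      # second line of #push runs
#   consumer B:  was sleeping in #wait; it wakes up, finds @items empty,
#                and falls through to timeout_policy.call, getting nil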
Let's see if we can force this scenario. First, I'll put a line of logging code in between line 1 and line 2, which should increase the chances that another thread gets scheduled to run in between the two lines. Then we'll set up one producer thread and three consumers, all going as fast as they can.
require "thread"

class MyQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @items.push(obj)
    puts "Signaling #{obj} is available"
    @item_available.signal
  end

  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      if @items.any?
        @items.shift
      else
        timeout_policy.call
      end
    end
  end
end

q = MyQueue.new

$shutdown = false
trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end

def consume(thread, q)
  until($shutdown) do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "BUG: No widget popped"
      exit(1)
    end
  end
  puts "Consumer #{thread} shutting down"
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)
When I run this, it only takes a fraction of a second before the program ends because of the bug.
Still, even in this fraction of a second, several widgets were successfully pushed and popped before the bug manifested. If we hadn't set up the scenario to make it extra likely that the bug would be triggered, the program might have run for minutes or hours before finally dying. Or worse, not dying but exhibiting anomalous behavior.
How do we fix the bug? It's simple enough: we surround the offending lines in a mutex-synchronized block:
require "thread"

class MyQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @lock.synchronize do
      @items.push(obj)
      puts "Signaling #{obj} is available"
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      if @items.any?
        @items.shift
      else
        timeout_policy.call
      end
    end
  end
end

q = MyQueue.new

$shutdown = false
trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end

def consume(thread, q)
  until($shutdown) do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "BUG: No widget popped in #{thread}"
      exit(1)
    end
  end
  puts "Consumer #{thread} shutting down"
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)
When we run this code… we still see the BUG?!!!
$ ruby fixed.rb
producing widget1
Signaling widget1 is available
producing widget2
thread 1 consuming widget1
Signaling widget2 is available
producing widget3
BUG: No widget popped in thread 2
thread 1 consuming widget2
Signaling widget3 is available
producing widget4
What's going on this time? As it turns out, we implemented the timeout code in the #pop method all wrong. Consider the case where thread 1 is waiting for a new item. Since it is waiting, the mutex has been released. Meanwhile, thread 2 is just finishing handling the last item. The producer enqueues a new item, and thread 2, seeing that there is an item available, doesn't bother waiting and immediately snatches it. When thread 1 finally wakes up as a result of being signaled from #push, the queue is empty again.
If your head hurts right now, you're not alone. If I had more time I'd draw a picture to illustrate all this, but I'm not sure how much it would help.
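In lieu of a picture, here is a rough timeline of the failing sequence, sketched as comments (the thread numbers are illustrative, matching the scenario above):

#   thread 1:   calls #pop on an empty queue, releases @lock, sleeps in #wait
#   thread 2:   finishes handling its previous item
#   producer:   pushes a new item and signals @item_available
#   thread 2:   calls #pop, acquires @lock first, sees @items.any?, skips
#               waiting, and shifts the new item
#   thread 1:   wakes up from the signal, reacquires @lock, and finds @items
#               empty again, so it falls through to timeout_policy.call (nil)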
Here's what we need to do. At the top of the pop method, we calculate a deadline, which is a wallclock time past which our timeout has expired. We use a special symbol :never to indicate that we should wait indefinitely.
We then surround the waiting and queue shifting code in a loop. We move the calculation of the timeout argument to #wait inside this loop. In the case of an indefinite wait it will be nil; otherwise it will be the remaining difference between now and the deadline.
We rearrange the code that determines if #wait will be called, and add a new condition ensuring that we will never call #wait with a timeout of less than zero.
We add a new case to the if statement inside this loop. In the case where the queue is still empty after waiting and the deadline has not yet expired (in other words, the case where this thread has been woken up but another thread has emptied the queue before it could acquire a lock), we will simply loop around and wait again.
Finally, we add break statements to the other two cases in this statement so that they will break out of the loop if either an item is successfully popped, or the timeout expires.
require "thread"

class MyQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @lock.synchronize do
      @items.push(obj)
      puts "Signaling #{obj} is available"
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    deadline = timeout == :never ? :never : Time.now + timeout
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if @items.empty? && cv_timeout.to_f >= 0
          @item_available.wait(@lock, cv_timeout)
        end
        if @items.any?
          break @items.shift
        elsif deadline == :never || deadline > Time.now
          next
        else
          break timeout_policy.call
        end
      end
    end
  end
end

q = MyQueue.new

$shutdown = false
trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end

def consume(thread, q)
  until($shutdown) do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "BUG: No widget popped in #{thread}"
      exit(1)
    end
  end
  puts "Consumer #{thread} shutting down"
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)
Now, finally, we can successfully run this program without unexpectedly getting a nil back from #pop.
$ ruby fixed2.rb
producing widget1
Signaling widget1 is available
producing widget2
Signaling widget2 is available
producing widget3
Signaling widget3 is available
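As a side note, the demo only ever calls #pop with the default :never timeout. Here is a minimal sketch, not part of the episode's demo, of how the timeout and timeout_policy arguments behave (the 0.1-second timeout is just an illustrative value):

q = MyQueue.new

# Nothing has been pushed yet, so this wait expires after roughly 0.1 seconds
# and the supplied timeout_policy block provides the return value.
p q.pop(0.1) { :timed_out }    # => :timed_out

# Without a block, an expired wait falls back to the default policy, ->{nil}.
p q.pop(0.1)                   # => nil

# Once an item is available, #pop returns it immediately.
q.push("widget")               # also prints "Signaling widget is available"
p q.pop(0.1) { :timed_out }    # => "widget"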
These kinds of latent bugs are the nightmare of multithreaded coding. I'd like to say I allowed them to sneak in just so I could demonstrate the dangers of coding with threads, but the truth is I simply hadn't noticed them until now. How did I finally realize they existed? Not by seeing them manifest. No; I only found them after inspecting the code several times and realizing, from my experience writing multithreaded code and by comparison to other people's implementations of similar classes, that I'd made a mistake.
Just to scare you a little bit more: remember how I said that the original version of the #push method was safe because it was a one-liner pushing an item onto the @items array? Well, this version was only safe in MRI, due to the happy accident that Arrays are implemented in C code and therefore their methods always execute in the context of the Global Interpreter Lock. In Rubinius or JRuby, that simple Array push alone would have been a potential bug, since Arrays are not inherently thread-safe in those implementations.
The moral of this story is that threading is hard; and often the only way to find bugs before they manifest in production code is through careful code inspection. If you are working with threads and are lucky enough to have someone else on your team who has written multithreaded code, it's a good idea to have them do a code review on any change or addition you make. Bribe them with cookies if you need to.
But threads aren't all fear and pain. Once you work the kinks out, there are few things more satisfying than watching many independent threads of execution performing an intricately choreographed dance, passing information back and forth and efficiently getting work done. And as we'll see in future episodes, all of the elegant higher-level patterns for concurrency are built on these simple primitives. So take heart—and happy hacking!