Condition Variable
Video transcript & code
In the last episode we started writing our own thread-aware queue class. Right now, when a thread wants to pop an item off the queue, the #pop method implements a busy-wait loop to continually check if there is an item available. This is an extremely inefficient and CPU-intensive way for a thread to wait. We'd like to find a better way.
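To make the problem concrete, here is a rough sketch of what a busy-wait #pop can look like. This is not the code from the last episode: the BusyWaitQueue class and its spins counter are hypothetical names made up for illustration, and the Thread.pass call is an added assumption so that the producer gets a chance to run.

```ruby
require "thread"

# Hypothetical busy-wait queue, for illustration only.
class BusyWaitQueue
  attr_reader :spins

  def initialize
    @lock  = Mutex.new
    @items = []
    @spins = 0 # counts how many times we checked and found nothing
  end

  def push(obj)
    @lock.synchronize { @items.push(obj) }
  end

  def pop
    loop do
      @lock.synchronize do
        # Return from #pop as soon as an item shows up.
        return @items.shift unless @items.empty?
        @spins += 1
      end
      # Give other threads a turn; the loop still burns CPU while it waits.
      Thread.pass
    end
  end
end

q = BusyWaitQueue.new
consumer = Thread.new { q.pop }
sleep 0.05
q.push("widget1")
popped = consumer.value
puts "popped #{popped} after #{q.spins} wasted checks"
```

The number of wasted checks varies from run to run, but every one of them is CPU time spent doing nothing useful.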
Enter the condition variable. Let me explain what a condition variable does in terms of an analogy.
Let's say I go to the deli to pick up some roast beef and swiss cheese. It's a busy day at the deli, and there are four other customers waiting at the counter with me. There are only two workers behind the counter, and each of them can only fulfill one order at a time.
In order to keep things organized, the deli has a system. Each of the customers takes a number from a machine that spits out numbered cards when they arrive at the deli counter. Then they wait. As soon as one of the deli workers is free, they call out "now serving number 57", and whoever has that card gets to step forward and place their order.
A condition variable implements this system in software. The customers are threads which need access to a scarce resource or collection of resources. The deli workers are the collection of resources. The threads each wait on the condition variable, which is the equivalent of taking a number. When another thread releases a resource, it signals the condition variable, which then wakes up the next thread in line. In effect, it calls out that thread's number.
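Before we touch the queue, here is a minimal sketch of the deli in code. The names free_workers, counter_free, and served are hypothetical and exist only for this illustration. One customer thread waits for a worker, and the main thread plays the part of a worker becoming free.

```ruby
require "thread"

lock         = Mutex.new
counter_free = ConditionVariable.new
free_workers = 0   # hypothetical stand-in for "deli workers available"
served       = []

customer = Thread.new do
  lock.synchronize do
    # Take a number and wait. The check is repeated after waking up,
    # in case the worker is no longer free by then.
    counter_free.wait(lock) while free_workers.zero?
    free_workers -= 1
    served << :customer
  end
end

# A worker becomes free and calls out the next number.
lock.synchronize do
  free_workers += 1
  counter_free.signal
end

customer.join
puts "served: #{served.inspect}"
```

It doesn't matter whether the customer or the worker gets to the mutex first: if the worker is already free, the customer never waits at all.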
Let's apply a condition variable to our queue class. First we need to require the "thread" library to gain access to the ConditionVariable class. Next we add a new instance variable pointing to a ConditionVariable. We call it @item_available, because that's the state that this variable will represent.
In the #push method, after we've added the new object to the @items array, we #signal the condition. If there are any threads waiting to pop an item off, this will wake up one of them (and only one!).
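The "only one" behavior can be observed directly. In the sketch below, three threads wait on the same condition variable and we signal once. The waiting counter and the woken queue are hypothetical helpers for this demonstration. ConditionVariable#broadcast, which wakes every waiter, is used at the end only to let the remaining threads finish.

```ruby
require "thread"

lock    = Mutex.new
cond    = ConditionVariable.new
waiting = 0          # how many threads have "taken a number"
woken   = Queue.new  # records which threads were woken

threads = (1..3).map do |n|
  Thread.new do
    lock.synchronize do
      waiting += 1
      cond.wait(lock) # no condition check here: we only observe wake-ups
      woken << n
    end
  end
end

# Crude polling, for the demo only: once we see 3 while holding the
# lock, every thread must be parked inside #wait.
sleep 0.01 until lock.synchronize { waiting == 3 }

lock.synchronize { cond.signal }
woken.pop                      # blocks until one thread has been woken
sleep 0.2                      # give the others a chance to (wrongly) wake
woken_after_signal = 1 + woken.size

lock.synchronize { cond.broadcast } # release the remaining two
threads.each(&:join)
woken_after_broadcast = 1 + woken.size

puts "after signal: #{woken_after_signal}, after broadcast: #{woken_after_broadcast}"
```

Which of the three threads gets woken by the single signal is up to the scheduler, so the sketch counts wake-ups rather than naming a thread.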
In the #pop method we get rid of the busy-wait loop. In its place, we send #wait to the condition variable. This will cause the current thread to go to sleep until another thread signals that an item is available.
We pass the @lock mutex into this call. This part is important! A condition variable always works together with a mutex. Passing the mutex in allows the condition variable to temporarily release it while the thread sleeps, so that other threads can enter this critical section. When the thread is woken, #wait re-acquires the mutex before returning. Releasing the mutex might seem like exactly what we're trying to avoid, but remember, those threads will encounter the exact same #wait call and be forced to get in line. If #wait didn't release the mutex, those threads would not be able to get far enough into this method to "take their number" at the deli counter, so to speak.
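Here is a small sketch that shows the mutex being released during #wait. The variable names (parked, go, entered_while_waiter_inside) are hypothetical. The waiter thread never leaves its synchronize block until the end, yet the main thread is still able to take the same mutex while the waiter sleeps.

```ruby
require "thread"

lock   = Mutex.new
ready  = ConditionVariable.new
parked = false
go     = false

waiter = Thread.new do
  lock.synchronize do
    parked = true
    ready.wait(lock) until go # the mutex is released while we sleep here
    lock.owned?               # thread's result: do we hold the mutex again?
  end
end

sleep 0.01 until parked # crude polling, for the demo only

entered_while_waiter_inside = lock.synchronize do
  # We can only get in here because #wait released the mutex.
  still_inside = waiter.alive?
  go = true
  ready.signal
  still_inside
end

reacquired = waiter.value
puts "main entered while waiter was parked: #{entered_while_waiter_inside}"
puts "waiter held the mutex again after waking: #{reacquired}"
```

Mutex#owned? reports whether the calling thread currently holds the mutex, which is how the waiter confirms that #wait handed the lock back before returning.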
At the end of this line we add a qualifier: "only wait if the items array is empty". The whole point of waiting is to wait for an item to become available in the queue. If one is already available, there is no point in waiting.
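One caveat about that qualifier. A woken thread has to re-acquire the mutex before it continues, and another consumer may slip in and take the item first. A common defensive variant therefore re-checks the condition in a loop instead of checking it once. The sketch below is not the code from this episode: LoopingQueue is a hypothetical name, and its #push also takes the lock, which is an assumption of this sketch.

```ruby
require "thread"

# Hypothetical variant of the episode's queue, for illustration only.
class LoopingQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop
    @lock.synchronize do
      # Go back to sleep if the item we were woken for is already gone.
      @item_available.wait(@lock) while @items.empty?
      @items.shift
    end
  end
end

q = LoopingQueue.new
results = Queue.new

consumers = (1..3).map do
  Thread.new { 5.times { results << q.pop } }
end
15.times { |i| q.push("widget#{i + 1}") }
consumers.each(&:join)

consumed = Array.new(results.size) { results.pop }
puts "consumed #{consumed.size} widgets, nils: #{consumed.count(&:nil?)}"
```

With the loop in place, #pop can only return once it has seen a non-empty array while holding the lock, so it never hands back nil for an empty queue.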
require "thread"

class MyQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    # Hold the lock while changing @items, so that the push and the signal
    # cannot interleave with a consumer's emptiness check in #pop.
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop
    @lock.synchronize do
      # Sleep until signaled; #wait releases @lock while sleeping.
      @item_available.wait(@lock) if @items.empty?
      puts "Pop goes the weasel!"
      @items.shift
    end
  end
end

q = MyQueue.new

# The producer manufactures a new widget every tenth of a second.
producer = Thread.new do
  i = 0
  loop do
    sleep 0.1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
end

def consume(thread, q)
  loop do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "FATAL: widget should never be nil"
      exit!(1)
    end
  end
end

Thread.abort_on_exception = true

# Three consumers compete for the widgets. This demo runs until interrupted.
threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end
threads.each(&:join)
That's all we need to do. When we run this code, we can see that the producer is producing and the consumers are consuming. Not only that, but it moves quite a bit faster than the old busy-wait version. Now that the threads aren't occupying the CPU with endless looping, the program is able to process widgets as quickly as they are manufactured.
We now have a queue which is largely equivalent in functionality to Ruby's standard library Queue. In the next episode, we'll look at how to add a feature the standard Queue lacks: the ability to specify a timeout for "pop" operations.
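For comparison, here is a short sketch of the same producer/consumer arrangement using the standard library Queue, whose #pop also blocks until an item is available. The :done sentinel and the consumed queue are hypothetical additions so that this example terminates instead of running forever.

```ruby
require "thread"

q        = Queue.new
consumed = Queue.new

consumers = (1..3).map do |n|
  Thread.new do
    # Queue#pop blocks until something is pushed; :done is a made-up
    # sentinel that tells a consumer to stop.
    while (widget = q.pop) != :done
      consumed << "thread #{n} consumed #{widget}"
    end
  end
end

5.times { |i| q.push("widget#{i + 1}") }
3.times { q.push(:done) } # one sentinel per consumer
consumers.each(&:join)

puts "#{consumed.size} widgets consumed"
```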
Until then, happy hacking!