Unit 1, Lesson 21

Condition Variable

In the last episode we started writing our own thread-aware queue class. Right now, when a thread wants to pop an item off the queue, the #pop method implements a busy-wait loop to continually check if there is an item available. This is an extremely inefficient and CPU-intensive way for a thread to wait. We'd like to find a better way.
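To make the problem concrete, here is a rough sketch of what a busy-wait #pop tends to look like. This is not the code from the previous episode; the class name BusyWaitQueue and its internals are assumptions made up for illustration.

```ruby
require "thread" # harmless no-op on modern Rubies

# Hypothetical busy-wait queue, for illustration only.
class BusyWaitQueue
  def initialize
    @lock  = Mutex.new
    @items = []
  end

  def push(obj)
    @lock.synchronize { @items.push(obj) }
  end

  def pop
    # Spin: grab the lock, look for an item, release, repeat.
    # The thread never sleeps, so it burns CPU the whole time it waits.
    loop do
      @lock.synchronize do
        return @items.shift unless @items.empty?
      end
    end
  end
end

q = BusyWaitQueue.new
consumer = Thread.new { q.pop } # starts spinning right away
sleep 0.05                      # the consumer loops uselessly during this pause
q.push("widget1")
result = consumer.value         # joins the thread and returns what #pop returned
puts "consumer got #{result}"
```

The consumer does eventually get its widget, but only after checking the array over and over while nothing was there.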

Enter the condition variable. Let me explain what a condition variable does in terms of an analogy.

Let's say I go to the deli to pick up some roast beef and swiss cheese. It's a busy day at the deli, and there are four other customers waiting at the counter with me. There are only two workers behind the counter, and each of them can only fulfill one order at a time.

To keep things organized, the deli has a system. When customers arrive at the deli counter, each of them takes a number from a machine that spits out numbered cards. Then they wait. As soon as one of the deli workers is free, they call out "now serving number 57", and whoever has that card gets to step forward and place their order.

A condition variable implements this system in software. The customers are threads which need access to a scarce resource or collection of resources. The deli workers are the collection of resources. The threads each wait on the condition variable, which is the equivalent of taking a number. When another thread releases a resource, it signals the condition variable, which then wakes up the next thread in line. In effect, it calls out that thread's number.
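Here is a minimal sketch of the deli in Ruby. Only Mutex, ConditionVariable, #wait, and #signal come from Ruby itself; the names counter_lock, worker_free, free_workers, and served are invented for this example.

```ruby
require "thread" # harmless no-op on modern Rubies

counter_lock = Mutex.new             # guards the shared state below
worker_free  = ConditionVariable.new # the "now serving" announcement
free_workers = 0                     # hypothetical shared state: idle deli workers
served       = []                    # which customers have been served

customers = (1..3).map do |n|
  Thread.new do
    counter_lock.synchronize do
      # Take a number: sleep until some worker is free.
      worker_free.wait(counter_lock) while free_workers.zero?
      free_workers -= 1
      served << n
    end
  end
end

# Three times over, a worker becomes free and calls out the next number.
3.times do
  counter_lock.synchronize do
    free_workers += 1
    worker_free.signal # wakes at most one waiting customer
  end
end

customers.each(&:join)
puts "served #{served.size} customers"
```

Notice that each customer checks free_workers before waiting. If a worker was already free when the customer arrived, there is nobody to wait for, and the signal that announced it is long gone.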

Let's apply a condition variable to our queue class. First we need to require the "thread" library to gain access to the ConditionVariable class. Next we add a new instance variable pointing to a ConditionVariable. We call it @item_available, because that's the state that this variable will represent.

In the #push method, after we've added the new object to the @items array we #signal the condition. If there are any threads waiting to pop an item off this will wake one—and only one!—of them up.

In the #pop method we get rid of the busy-wait loop. In its place, we send #wait to the condition variable. This will cause the current thread to go to sleep until another thread signals that an item is available.

We pass the @lock mutex into this call. This part is important! A condition variable always works along with a mutex. Passing the mutex in allows the condition variable to release the mutex while the thread sleeps, so that other threads can enter this critical section, and then to reacquire it before #wait returns. This might seem like exactly what we're trying to avoid, but remember, those threads will encounter the exact same #wait call and be forced to get in line. If #wait didn't release the mutex, those threads would not be able to get far enough into this method to "take their number" at the deli counter, so to speak.

On the end of this line we qualify it: we say "only wait if the items array is empty". The whole point of waiting is to wait for an item to be available from the queue. If there is already one available, then there is no point waiting.
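To see the mutex hand-off in isolation, here is a small sketch separate from the queue class. The names lock, ready, done, and log are assumptions for this example. If #wait held on to the mutex while sleeping, the main thread's synchronize block would never get in and the script would hang.

```ruby
require "thread" # harmless no-op on modern Rubies

lock  = Mutex.new
ready = ConditionVariable.new
done  = false # hypothetical flag the waiter is waiting on
log   = []

waiter = Thread.new do
  lock.synchronize do
    log << :waiter_holds_lock
    ready.wait(lock) until done # gives up the lock while asleep
    log << :waiter_woke_up      # the lock is held again by the time #wait returns
  end
end

# Pause until the waiter has gone to sleep inside #wait.
sleep 0.01 until waiter.status == "sleep"

# The waiter is still inside its synchronize block, yet we can take the lock.
lock.synchronize do
  log << :main_got_lock
  done = true
  ready.signal
end

waiter.join
p log
```

The log shows the main thread getting the lock in between the waiter's two entries, even though the waiter never left its synchronize block.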

require "thread"

class MyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

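  def push(obj)
    # Hold the lock so the append and the signal can't race with a consumer in #pop.
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end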

  def pop
    @lock.synchronize do
      # Re-check after every wakeup: another consumer may have taken the item first.
      @item_available.wait(@lock) while @items.empty?
      puts "Pop goes the weasel!"
      @items.shift
    end
  end
end

q = MyQueue.new

producer = Thread.new do
  i = 0
  loop do
    sleep 0.1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
end

def consume(thread, q)
  loop do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "FATAL: widget should never be nil"
      exit!(1)
    end
  end
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new { consume("thread #{n}", q) }
end

threads.each(&:join)

That's all we need to do. When we run this code, we can see that the producer is producing and the consumers are consuming. Not only that, but it moves quite a bit faster than the old busy-wait version. Now that the threads aren't occupying the CPU with endless looping, the program can process widgets as quickly as they are manufactured.

We now have a queue which is largely equivalent in functionality to Ruby's standard library Queue. In the next episode, we'll look at how to add a feature the standard Queue lacks: the ability to specify a timeout for "pop" operations.
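For comparison, here is a small sketch of the same producer/consumer hand-off using Ruby's built-in Queue. The :done sentinel and the consumed array are assumptions added so that the example finishes on its own instead of looping forever.

```ruby
require "thread" # harmless no-op on modern Rubies

q = Queue.new
consumed = []

consumer = Thread.new do
  # Queue#pop blocks until an item is available, just like our #pop.
  while (widget = q.pop) != :done
    consumed << widget
  end
end

3.times { |i| q.push("widget#{i + 1}") }
q.push(:done) # hypothetical sentinel telling the consumer to stop
consumer.join

p consumed
```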

Until then, happy hacking!
