Unit 1, Lesson 21

Bounded Queue

Video transcript & code

For a few episodes now we've been building our own thread-safe queue implementation. One thing we haven't tackled yet is enabling our queue to be bounded—that is, to have a fixed maximum capacity. You may remember that in episode 136 we saw how making queues bounded could help us catch dead threads and other concurrency problems early by making it obvious when a queue wasn't being emptied fast enough, or at all.
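
To make "bounded" concrete before we build our own, here is a minimal sketch using SizedQueue from Ruby's standard library. It is not the queue we have been writing in this series; it just shows how a fixed capacity behaves. The capacity of 2 and the symbols pushed are arbitrary choices for illustration.

```ruby
require "thread"

# SizedQueue is Ruby's built-in bounded queue, used here only to
# illustrate the concept.
q = SizedQueue.new(2)   # capacity of two items
q.push(:a)
q.push(:b)

overflow = nil
begin
  # Passing true as the second argument asks for a non-blocking push,
  # which raises ThreadError instead of waiting when the queue is full.
  q.push(:c, true)
rescue ThreadError => e
  overflow = e
end

puts "size=#{q.size} max=#{q.max} overflow=#{overflow.class}"
```

A plain `q.push(:c)` would instead block until another thread popped an item, which is the behavior that makes a stalled consumer easy to notice.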

Let's add the ability to set a maximum size on our queue class. But first, let's do a little refactoring. We're going to pull out a generic version of the code that waits for an item to be available in the queue. We start by copying the current contents of the #pop method, and pasting it into a new method called #wait_for_condition.

The new method will take four arguments:

  1. The condition variable which will signal that the condition may now be satisfied.
  2. A condition_predicate, which will be a callable object that returns true if the condition we are waiting for is currently satisfied.
  3. A timeout, which defaults to :never.
  4. A timeout policy, which defaults to returning nil.

We remove the timeout policy defaulting from the method body. Then we go through the method, replacing each part that refers specifically to waiting for an item to be available with its generic equivalent from the argument list. Instead of shifting an item off of the queue, we yield to the block passed to this generic method. We also change the break statements to return statements.

Now we go back to the #pop method and replace its body with a call to #wait_for_condition, filling in each argument with the specifics of waiting for an item to be available. We pass a block to the method telling it what to do when the condition is satisfied, which in this case is to shift the first item off of the queue.

require "thread"

class MyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    timeout_policy ||= ->{nil}
    wait_for_condition(
      @item_available,
      ->{@items.any?},
      timeout,
      timeout_policy) do 

      @items.shift 
    end
  end

  # Waits on cv until condition_predicate returns true, then yields
  # to the block while still holding the lock.
  def wait_for_condition(
      cv, condition_predicate, timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        # Time left until the deadline; nil means wait indefinitely.
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_predicate.call && cv_timeout.to_f >= 0
          cv.wait(@lock, cv_timeout)
        end
        if condition_predicate.call
          return yield
        elsif deadline == :never || deadline > Time.now
          # Woke up without the condition holding; wait again.
          next
        else
          return timeout_policy.call
        end
      end
    end
  end
end

q = MyQueue.new
$shutdown = false

trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
    sleep 1
  end
  puts "Producer shutting down"
end

def consume(thread, q)
  until($shutdown) do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "BUG: No widget popped in #{thread}"
      exit(1)
    end
  end
  puts "Consumer #{thread} shutting down"
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)
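
One detail in #wait_for_condition that is easy to skim past is the deadline arithmetic: the deadline is computed once, and the time remaining is recomputed on every pass through the loop, so a wakeup that doesn't satisfy the condition can't extend the total wait. Here is a small sketch of just that arithmetic. The helper names deadline_for and remaining_wait are hypothetical, introduced only for this illustration, and the `now` parameter exists so the example doesn't depend on the real clock.

```ruby
# Hypothetical helpers that isolate the deadline arithmetic;
# they are not part of the MyQueue class.
def deadline_for(timeout, now = Time.now)
  timeout == :never ? :never : now + timeout
end

def remaining_wait(deadline, now = Time.now)
  # nil is what ConditionVariable#wait takes to mean "no timeout".
  deadline == :never ? nil : deadline - now
end

start    = Time.at(0)
deadline = deadline_for(0.5, start)

early = remaining_wait(deadline, start + 0.25)  # still time left
late  = remaining_wait(deadline, start + 0.75)  # negative: deadline passed

puts "early=#{early} late=#{late} never=#{remaining_wait(:never).inspect}"
```

A negative remainder is why the method checks `cv_timeout.to_f >= 0` before waiting: once the deadline has passed there is nothing left to wait for, and the loop falls through to the timeout policy.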

Now let's think about setting a limit on the number of items the queue can hold. First we'll need the initializer to take an optional argument setting the max size. It will default to unlimited size. We will save this setting in an instance variable.

We have a new condition that we care about in a bounded queue. We still care when an item becomes available in the queue, but if there is a size limit we also care when space becomes available. We add a new condition variable we can use to signal this condition.

We'll need to check whether the queue is full, so we add a #full? predicate method to make this easy.

Next, we modify the #push method. Since we may have to wait for space to be available, we add a new timeout argument. We also add a block argument which will be used as a timeout policy. We default this policy to raise an exception when a #push times out on a full queue.
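
The block-as-policy idiom described here can be sketched on its own, apart from the queue. The method finish_push below is a hypothetical stand-in, with a `timed_out` flag in place of real waiting; it only shows how an optional block overrides a default policy that raises.

```ruby
# Hypothetical stand-in for the tail end of a push. The timed_out flag
# replaces the real waiting logic so the policy handling stands alone.
def finish_push(timed_out, &timeout_policy)
  # No block given: fall back to a policy that raises.
  timeout_policy ||= -> { raise "Push timed out" }
  return timeout_policy.call if timed_out
  :pushed
end

ok      = finish_push(false)               # no timeout, policy unused
dropped = finish_push(true) { :dropped }   # caller-supplied policy

default_error = nil
begin
  finish_push(true)                        # default policy raises
rescue RuntimeError => e
  default_error = e.message
end

puts [ok, dropped, default_error].inspect
```

Raising by default makes a full queue loud, while a caller that would rather drop the item or log and carry on can say so at the call site.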

We then replace the lock synchronization with a call to our new #wait_for_condition method. We pass the @space_available condition variable. For a condition predicate, we supply a lambda checking that the queue is not full. We also pass the timeout and timeout policy.

Now we have a #push method which will wait, possibly with a timeout, until there is space available in the queue. We need to make one more change: We return to the #pop method, and add a line signaling that there is space available after the shift.

OK, let's test it out.

We give our test queue a max size of 10. We update the queue push to time out after a tenth of a second. We make sure our producer thread is producing widgets at a fast clip, and set up our consumers to be a little bit slower.

require "thread"

class MyQueue
  def initialize(max_size = :infinite)
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
    @max_size = max_size
    @space_available = ConditionVariable.new
  end

  def push(obj, timeout=:never, &timeout_policy)
    timeout_policy ||= -> do
      raise "Push timed out"
    end
    wait_for_condition(
      @space_available,
      ->{!full?},
      timeout,
      timeout_policy) do

      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    timeout_policy ||= ->{nil}
    wait_for_condition(
      @item_available,
      ->{@items.any?},
      timeout,
      timeout_policy) do 

      item = @items.shift 
      @space_available.signal unless full?
      item
    end
  end

  private

  def full?
    return false if @max_size == :infinite
    @max_size <= @items.size
  end

  def wait_for_condition(
      cv, condition_predicate, timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_predicate.call && cv_timeout.to_f >= 0
          cv.wait(@lock, cv_timeout) 
        end
        if condition_predicate.call
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end
end

q = MyQueue.new(10)
$shutdown = false

trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget, 0.1)
    sleep 0.1
  end
  puts "Producer shutting down"
end

def consume(thread, q)
  until($shutdown) do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "BUG: No widget popped in #{thread}"
      exit(1)
    end
    sleep 0.5
  end
  puts "Consumer #{thread} shutting down"
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)

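Then we run the code. It runs for a while, but the producer adds roughly ten widgets per second while the three consumers together remove only about six, so eventually the queue fills up and we see a "Push timed out" error.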

And that's it: we've now created a thread-safe, optionally bounded, timeout-enabled queue suitable for communicating from one thread to another.

There are a few more concerns we should address before we call this production quality, mostly having to do with thread interrupts. But we'll deal with that another day. Happy hacking!
