Timed Queue

In episode 136, we talked about the problem of threads silently dying. And we looked at how this can lead to other threads silently hanging indefinitely as they wait for a resource or signal that will never come.

A good strategy for ensuring errors like this are detected quickly is to never allow a thread to wait indefinitely for something. Instead, whenever a thread might wait for a resource, we provide a timeout for the wait. The timeout should be long enough that it will never be exceeded so long as the program is operating normally.

Some of Ruby's threading primitives give us the ability to optionally specify timeouts for operations that might block the thread. For instance, when waiting for a condition variable to signal that a resource is available, we can specify an optional number of seconds to wait. If the condition variable doesn't signal within that time, the #wait call times out.

When we specify a timeout, after we send the #wait message it's important to check to see if we actually have the resource we were waiting for, or if the wait timed out before the resource became available.

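require "thread"

cv = ConditionVariable.new
l  = Mutex.new
resource_ready = false # nothing sets this, so the wait below times out

waiter = Thread.new do
  l.synchronize do
    cv.wait(l, 1) # give up after one second
    if resource_ready
      puts "Acquired the resource"
    else
      puts "Timed out without acquiring"
    end
  end
end

waiter.join # keep the script alive until the waiter has reported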

By the way, one second is an eternity in CPU terms. In Ruby, whenever a method takes a timeout measured in seconds, we can use a floating point number to specify a time period of less than a second. So for instance we could specify a fifty millisecond wait by passing 0.05.

cv.wait(l, 0.05)
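
To see both outcomes of a timed wait side by side, here is a minimal sketch. The helper name wait_for_resource and its signal_after option are made up for illustration; only Mutex, ConditionVariable#wait and ConditionVariable#signal come from Ruby itself.

```ruby
require "thread"

# Hypothetical helper: wait up to `timeout` seconds for a resource.
# If `signal_after` is given, a second thread marks the resource ready
# and signals after that many seconds.
def wait_for_resource(timeout, signal_after: nil)
  lock  = Mutex.new
  cv    = ConditionVariable.new
  ready = false

  if signal_after
    Thread.new do
      sleep signal_after
      lock.synchronize do
        ready = true
        cv.signal
      end
    end
  end

  lock.synchronize do
    cv.wait(lock, timeout) unless ready
    # Check the state itself rather than trusting that the wait
    # returned for the reason we hoped.
    ready ? :acquired : :timed_out
  end
end

puts wait_for_resource(0.05)                  # nobody ever signals
puts wait_for_resource(1, signal_after: 0.01) # signalled well before the timeout
```

The first call is expected to report :timed_out after roughly fifty milliseconds, and the second to report :acquired long before its one-second limit.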

Let's return to the queue class we've been building. Right now, the #pop method will wait forever for a new item to be available in the queue. Let's add an argument to that method, enabling the caller to specify a timeout in seconds. We'll default it to a special symbol indicating that the method should wait indefinitely.

Ruby's condition variable class does not use symbolic placeholders. So before we pass the timeout on we have to translate the special :never symbol to nil, which is how we tell the ConditionVariable#wait method to wait indefinitely. At this point you might be wondering why we didn't just make the default value of the argument be nil. This is a legitimate question, but personally I feel that the value of the self-documenting symbolic argument is worth a little extra code. See episodes 108 and 111 for more on this topic.

Next we change the call to ConditionVariable#wait to include the timeout. Now we have to handle the case where this call times out before a new item becomes available. We add a check after the #wait to see if there are any items in the queue. If so, we continue as before to shift an item off of the front of the queue and return it. Otherwise, we return nil.

With the class updated, we change our producer thread to wait a second in between producing widgets, and our consumer thread to time out after half a second of waiting for a widget. Instead of exiting when #pop returns nil, we just report the timeout and go around again.

When we run this code, we can see that the consumer regularly times out as the slow producer fails to feed it widgets fast enough.

require "thread"

class MyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

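  def push(obj)
    # Hold the lock so a consumer about to wait cannot miss the signal
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never)
    cv_timeout = timeout == :never ? nil : timeout
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      # Array#any? ignores nil and false items, so test for emptiness instead
      if @items.empty?
        nil # timed out with nothing to pop
      else
        @items.shift
      end
    end
  end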
end

q = MyQueue.new

producer = Thread.new do
  i = 0
  loop do
    sleep 1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
end

def consume(thread, q)
  loop do
    widget = q.pop(0.5)
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "Timed out waiting for a widget"
    end
  end
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)

$ ruby timed_queue.rb
Timed out waiting for a widgetTimed out waiting for a widget
Timed out waiting for a widget

producing widget1
thread 2 consuming widget1
Timed out waiting for a widget
Timed out waiting for a widget
Timed out waiting for a widget
Timed out waiting for a widget
Timed out waiting for a widget
producing widget2
thread 2 consuming widget2
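
One caveat when several consumers share a queue: a waiter can wake up and find that another consumer has already taken the item, so a single check after the wait may report a timeout earlier than the caller asked for. A common remedy is to wait in a loop against a deadline. What follows is only a sketch of that idea; the class name DeadlineQueue and the private helper now are invented for this example, and the deadline is measured with Process.clock_gettime on the monotonic clock.

```ruby
require "thread"

# Hypothetical variant of the episode's queue that re-checks in a loop.
class DeadlineQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never)
    deadline = timeout == :never ? nil : now + timeout
    @lock.synchronize do
      while @items.empty?
        # nil means "wait indefinitely", as with ConditionVariable#wait
        remaining = deadline && deadline - now
        return nil if remaining && remaining <= 0
        @item_available.wait(@lock, remaining)
      end
      @items.shift
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

q = DeadlineQueue.new
p q.pop(0.05)   # empty queue: gives up once the deadline has passed
q.push("widget1")
p q.pop(0.05)   # item already waiting: returns immediately
```

The consumer loops in this episode behave the same way with either version; the difference only shows when a wakeup arrives and the queue is still empty.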

One of the most common reasons to loop with a timeout like this is as a way to facilitate clean thread shutdown. Right now, our threads just loop until we hit Control-C, which immediately terminates them in the middle of whatever they were doing. Let's change this. We'll introduce a variable called $shutdown. It will default to false. Then we'll trap the interrupt triggered by Control-C, and handle it by setting the shutdown variable to true.

Inside each thread, we modify the loops to loop until shutdown is true. Once their main loop ends, the threads log the fact that they are shutting down before finishing. Note that in the consumer threads, if we had no #pop timeout and the queue was empty, the thread might wait forever to pop an item even though the producer had already shut down. It's the presence of the timeout which enables the consumer thread to periodically stop waiting on the queue and do housekeeping, such as checking to see if the $shutdown flag has been set.

require "thread"

class MyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

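  def push(obj)
    # Hold the lock so a consumer about to wait cannot miss the signal
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never)
    cv_timeout = timeout == :never ? nil : timeout
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      # Array#any? ignores nil and false items, so test for emptiness instead
      if @items.empty?
        nil # timed out with nothing to pop
      else
        @items.shift
      end
    end
  end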
end

q = MyQueue.new
$shutdown = false

trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    sleep 1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end

def consume(thread, q)
  until($shutdown) do
    widget = q.pop(0.5)
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "Timed out waiting for a widget"
    end
  end
  puts "Consumer #{thread} shutting down"
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)

Let's run this code and see what happens. After a few seconds we hit Control-C. This time, instead of instantly exiting with a stack trace, we see the threads cleanly shutting down.

$ ruby shutdown.rb 
Timed out waiting for a widgetTimed out waiting for a widget
Timed out waiting for a widget

producing widget1
thread 3 consuming widget1
Timed out waiting for a widget
Timed out waiting for a widget
Timed out waiting for a widget
Timed out waiting for a widgetTimed out waiting for a widget

^Cproducing widget2
Producer shutting downthread 3 consuming widget2

Consumer thread 3 shutting down
Timed out waiting for a widget
Consumer thread 1 shutting downTimed out waiting for a widget

Consumer thread 2 shutting down
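
The same "never wait indefinitely" rule can be applied to the final join as well. Here is a small sketch, assuming a worker whose bounded sleep stands in for q.pop(0.5) and a local flag that stands in for $shutdown; the method name run_worker_until_shutdown is invented for the example. Thread#join accepts a limit in seconds and returns nil if the thread has not finished by then.

```ruby
require "thread"

# Hypothetical demo: ask a worker to stop, then wait a bounded time for it.
def run_worker_until_shutdown(join_limit)
  shutdown = false

  worker = Thread.new do
    # The short sleep plays the role of a pop with a timeout: the
    # worker regularly comes back to look at the flag.
    sleep 0.01 until shutdown
    :clean_exit
  end

  sleep 0.05
  shutdown = true # in the episode, the trap("INT") handler does this

  if worker.join(join_limit) # nil means the limit expired first
    worker.value
  else
    worker.kill              # last resort for a worker that ignored the flag
    :forced_exit
  end
end

puts run_worker_until_shutdown(1)
```

Because the worker checks the flag every ten milliseconds, it should finish well inside the one-second limit and the method should report :clean_exit.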

Before we finish, let's make one more change to our queue class. Right now, our queue has a subtle limitation: if the queue can legitimately contain nil values, there is no way to tell the difference between a successful queue pop and a queue pop that timed out.

Now, I want to stop right here and say that if your application is passing around nil values from one thread to another, you should probably reconsider your design. But be that as it may, when we're coding a utility class it's not our job to impose limits on how applications make use of it.

Instead, let's take inspiration from the #fetch method that we've discussed in previous episodes. We'll enable #pop to take an optional block, which will determine the policy for handling timeouts. We'll default this policy to one that simply returns nil. And we'll replace the line that currently returns nil with one that executes the timeout policy.

require "thread"

class MyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

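  def push(obj)
    # Hold the lock so a consumer about to wait cannot miss the signal
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      # Array#any? is false for a queue holding only nil, so test for emptiness
      if @items.empty?
        timeout_policy.call
      else
        @items.shift
      end
    end
  end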
end

q = MyQueue.new
$shutdown = false

trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    sleep 1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end

def consume(thread, q)
  until($shutdown) do
    widget = q.pop(0.5)
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "Timed out waiting for a widget"
    end
  end
  puts "Consumer #{thread} shutting down"
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)

This immediately gives client code a great deal of flexibility. If the code wants to use a special placeholder value to represent a timeout, it can.

widget = q.pop(0.5){:timed_out}
if widget != :timed_out
  puts "#{thread} consuming #{widget}"
else
  puts "Timed out waiting for a widget"
end
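
When nil really is a legitimate item, a symbol could in principle collide with real data too, so a unique sentinel object is one way to be certain. The sketch below is self-contained, so it repeats a compact queue under the invented name PolicyQueue; the TIMED_OUT constant is likewise made up for the example. It tests @items.empty? rather than @items.any?, because [nil].any? is false in Ruby.

```ruby
require "thread"

# Compact copy of a block-taking queue so the example runs on its own.
class PolicyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    timeout_policy ||= -> { nil }
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      if @items.empty?
        timeout_policy.call
      else
        @items.shift
      end
    end
  end
end

TIMED_OUT = Object.new # hypothetical sentinel, identical only to itself

q = PolicyQueue.new
q.push(nil) # a legitimate nil item

first  = q.pop(0.05) { TIMED_OUT } # the nil we pushed
second = q.pop(0.05) { TIMED_OUT } # queue is empty now, so the policy runs

puts "first pop timed out?  #{first.equal?(TIMED_OUT)}"
puts "second pop timed out? #{second.equal?(TIMED_OUT)}"
```

Comparing with equal? checks object identity, so no value a producer pushes can be mistaken for the timeout marker.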

Or let's say we want to end the consumer thread in the case of a timeout. To do that we can simply use the block passed to #pop to return from the consumer method, which takes us out of the loop.

require "thread"

class MyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

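  def push(obj)
    # Hold the lock so a consumer about to wait cannot miss the signal
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      # Array#any? is false for a queue holding only nil, so test for emptiness
      if @items.empty?
        timeout_policy.call
      else
        @items.shift
      end
    end
  end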
end

q = MyQueue.new
$shutdown = false

trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    sleep 1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end

def consume(thread, q)
  until($shutdown) do
    widget = q.pop(0.5) do
      puts "#{thread} timed out; shutting down"
      return
    end
    puts "#{thread} consuming #{widget}"
  end
  puts "Consumer #{thread} shutting down"
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

producer.join
threads.each(&:join)

Let's run this version and see what happens.

$ ruby break_on_timeout.rb 
thread 1 timed out; shutting down
thread 2 timed out; shutting down
thread 3 timed out; shutting down
producing widget1
producing widget2
producing widget3
producing widget4
producing widget5
^Cproducing widget6
Producer shutting down

As we can see, all the consumers die out as the slow producer can't give them widgets fast enough: each one gives up after half a second, before the first widget has even been produced.

We've now implemented a simple timed queue. In the next episode, we'll apply timeouts to a size-constrained queue. Until then, happy hacking!
