Timed Queue
Video transcript & code
In episode 136, we talked about the problem of threads silently dying. And we looked at how this can lead to other threads silently hanging indefinitely as they wait for a resource or signal that will never come.
A good strategy for detecting errors like this quickly is to never allow a thread to wait indefinitely for something. Instead, wherever a thread might wait for a resource, we provide a timeout for the wait. The timeout should be long enough that it will never be exceeded as long as the program is operating normally.
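Condition variables, discussed next, are not the only place this principle applies. As a small sketch, Thread#join takes an optional limit in seconds and returns nil if the thread is still running when the limit expires, instead of blocking forever. The half-second worker here is just a stand-in for real work:

```ruby
# Thread#join takes an optional limit in seconds. It returns the thread
# if it finished in time, or nil if the limit expired first.
worker = Thread.new do
  sleep 0.5 # stand-in for slow work
  :done
end

first_try = worker.join(0.1)
puts "still running after 0.1s" if first_try.nil?

# A second, more generous limit lets the worker finish normally.
second_try = worker.join(2)
puts "finished with #{second_try.value.inspect}" if second_try
```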
Some of Ruby's threading primitives let us optionally specify timeouts for operations that might block the thread. For instance, when waiting for a condition variable to signal that a resource is available, we can specify an optional number of seconds to wait. If the condition variable doesn't signal within that time, the #wait call times out.
When we specify a timeout, it's important to check, after sending the #wait message, whether we actually have the resource we were waiting for or whether the wait timed out before the resource became available.
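require "thread"
cv = ConditionVariable.new
l = Mutex.new
resource_ready = false
waiter = Thread.new do
  l.synchronize do
    # Wait at most one second for a signal on cv.
    cv.wait(l, 1)
    # #wait returns in both cases, so check the resource itself.
    if resource_ready
      puts "Acquired the resource"
    else
      puts "Timed out without acquiring"
    end
  end
end
waiter.join # keep the script alive until the waiter finishes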
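The single check above covers the basic idea. Condition variables can also wake up without a matching signal, so a common refinement is to re-check the condition in a loop and wait only for whatever time remains. A sketch of that idea follows; the wait_until helper is hypothetical, not part of Ruby, and it assumes the caller already holds the mutex:

```ruby
require "thread"

# Hypothetical helper (not part of Ruby): with +mutex+ held, wait on +cv+
# until the block returns truthy or +timeout+ seconds have passed.
# Returns the block's last result, so a falsy value means "timed out".
def wait_until(cv, mutex, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until (ready = yield)
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    break if remaining <= 0
    # A wakeup may be spurious, so the loop re-checks the condition.
    cv.wait(mutex, remaining)
  end
  ready
end

cv = ConditionVariable.new
l = Mutex.new
resource_ready = false

signaler = Thread.new do
  sleep 0.1
  l.synchronize do
    resource_ready = true
    cv.signal
  end
end

acquired = l.synchronize { wait_until(cv, l, 1) { resource_ready } }
puts acquired ? "Acquired the resource" : "Timed out without acquiring"
signaler.join

# Nothing will ever make this condition true, so it times out.
gave_up = l.synchronize { wait_until(cv, l, 0.05) { false } }
puts gave_up ? "Acquired the resource" : "Timed out without acquiring"
```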
By the way, one second is an eternity in CPU terms. Ruby methods that take a timeout measured in seconds generally accept a floating-point number, which lets us specify a period of less than a second. For instance, we could specify a fifty-millisecond wait by passing 0.05.
cv.wait(l, 0.05)
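To see a fractional timeout in action, this sketch times a 50 ms wait with Ruby's monotonic clock. Nobody ever signals the condition variable, so only the timeout ends the wait; the exact elapsed time printed will vary from run to run:

```ruby
require "thread"

cv = ConditionVariable.new
l = Mutex.new

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
# Nobody signals cv, so this returns once the 50 ms timeout expires.
l.synchronize { cv.wait(l, 0.05) }
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

puts format("waited about %.3f seconds", elapsed)
```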
Let's return to the queue class we've been building. Right now, the #pop method will wait forever for a new item to be available in the queue. Let's add an argument to that method, enabling the caller to specify a timeout in seconds. We'll default it to a special symbol indicating that the method should wait indefinitely.
Ruby's condition variable class does not use symbolic placeholders. So before we pass the timeout on, we have to translate the special :never symbol to nil, which is how we tell the ConditionVariable#wait method to wait indefinitely. At this point you might be wondering why we didn't just make the default value of the argument nil. This is a legitimate question, but personally I feel that the value of the self-documenting symbolic argument is worth a little extra code. See episodes 108 and 111 for more on this topic.
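One possible refinement is to do the translation in a small helper that also rejects anything that is neither :never nor a number, so a typo such as :forever fails with a clear message instead of reaching #wait. This is only a sketch; the cv_timeout_for name and its validation rules are assumptions, not part of the episode's code:

```ruby
# Hypothetical helper: translate the self-documenting :never placeholder
# into the nil that ConditionVariable#wait understands, and reject
# anything that is neither :never nor a non-negative number.
def cv_timeout_for(timeout)
  case timeout
  when :never then nil
  when Numeric
    raise ArgumentError, "timeout must be non-negative" if timeout.negative?
    timeout
  else
    raise ArgumentError,
          "timeout must be :never or a number of seconds, got #{timeout.inspect}"
  end
end

p cv_timeout_for(:never) # prints nil
p cv_timeout_for(0.05)   # prints 0.05
```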
Next, we change the call to ConditionVariable#wait to include the timeout. Now we have to handle the case where this call times out before a new item becomes available. We add a check after the #wait to see whether there are any items in the queue. If so, we continue as before to shift an item off the front of the queue and return it. Otherwise, we return nil.
With the class updated, we change our producer thread to wait a second between producing widgets, and our consumer threads to time out after half a second of waiting for a widget. Instead of exiting when #pop returns nil, the consumers just report the timeout and go around again.
When we run this code, we can see that the consumer regularly times out as the slow producer fails to feed it widgets fast enough.
require "thread"
class MyQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end
  def push(obj)
    # Push and signal under the lock, so a consumer can't miss the signal
    # between checking for items and starting to wait.
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end
  def pop(timeout = :never)
    # ConditionVariable#wait uses nil to mean "wait indefinitely".
    cv_timeout = timeout == :never ? nil : timeout
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      # Test empty? rather than any?: [nil].any? is false, which would
      # leave a queued nil stuck at the head of the queue.
      if @items.empty?
        nil # timed out without an item
      else
        @items.shift
      end
    end
  end
end
q = MyQueue.new
producer = Thread.new do
  i = 0
  loop do
    sleep 1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
end
def consume(thread, q)
  loop do
    widget = q.pop(0.5)
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "Timed out waiting for a widget"
    end
  end
end
Thread.abort_on_exception = true
threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end
producer.join
threads.each(&:join)
$ ruby timed_queue.rb
Timed out waiting for a widgetTimed out waiting for a widget
Timed out waiting for a widget
producing widget1
thread 2 consuming widget1
Timed out waiting for a widget
Timed out waiting for a widget
Timed out waiting for a widget
Timed out waiting for a widget
Timed out waiting for a widget
producing widget2
thread 2 consuming widget2
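The #pop above checks the queue only once after #wait returns. A condition variable can wake up spuriously, and with several consumers a signal can wake one thread only for another consumer to grab the item first; in either case #pop gives up before its timeout has really elapsed. Here is a sketch of one way to tighten this, assuming a deadline on the monotonic clock is acceptable (the DeadlineQueue name is made up for this example). It re-checks in a loop and waits only for the time remaining:

```ruby
require "thread"

# Sketch: a timed #pop that keeps waiting until an item arrives or the
# whole timeout has elapsed, even if #wait returns early.
class DeadlineQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never)
    deadline = timeout == :never ? nil : now + timeout
    @lock.synchronize do
      while @items.empty?
        remaining = deadline && deadline - now
        return nil if remaining && remaining <= 0 # really timed out
        # nil waits indefinitely; otherwise wait only for the time left.
        @item_available.wait(@lock, remaining)
      end
      @items.shift
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

q = DeadlineQueue.new

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
missing = q.pop(0.2)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts format("pop gave up after %.2fs with %p", elapsed, missing)

producer = Thread.new { sleep 0.05; q.push("widget1") }
widget = q.pop(1)
producer.join
puts "got #{widget}"
```

The trade-off is a little more code in #pop in exchange for a timeout that means "at least this long" rather than "at most one wakeup".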
One of the most common reasons to loop with a timeout like this is to facilitate clean thread shutdown. Right now, our threads just loop until we hit Control-C, which immediately terminates them in the middle of whatever they were doing. Let's change this. We'll introduce a global variable called $shutdown, defaulting to false. Then we'll trap the interrupt triggered by Control-C and handle it by setting $shutdown to true.
Inside each thread, we modify the loops to run until $shutdown is true. Once their main loops end, the threads log that they are shutting down before finishing. Note that in the consumer threads, if #pop had no timeout and the queue was empty, a consumer might wait forever for an item even though the producer had already shut down. It's the timeout that lets each consumer thread periodically stop waiting on the queue and do housekeeping, such as checking whether the $shutdown flag has been set.
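To isolate why the timeout matters for shutdown, here is a stripped-down sketch without the queue class or the signal trap: a plain local variable stands in for $shutdown, and nothing is ever pushed. Because each wait gives up after 0.1 seconds, the consumer notices the flag and exits shortly after it is set; with an untimed wait it would block forever:

```ruby
require "thread"

lock = Mutex.new
item_available = ConditionVariable.new
items = []
shutdown = false # stands in for the $shutdown global set by the INT trap

consumer = Thread.new do
  until shutdown
    lock.synchronize do
      # The timeout bounds each wait, so the loop re-checks the flag
      # at least every 0.1 seconds even though nothing is ever pushed.
      item_available.wait(lock, 0.1) if items.empty?
      item = items.shift
      puts "consumed #{item}" if item
    end
  end
  :stopped
end

sleep 0.3       # let the consumer time out a few times
shutdown = true # nobody signals item_available; only the timeout wakes it
flag_set_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result = consumer.join(1)&.value
waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - flag_set_at
puts format("consumer returned %p %.2fs after the flag was set", result, waited)
```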
require "thread"
class MyQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end
  def push(obj)
    # Push and signal under the lock so a waiting consumer can't miss it.
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end
  def pop(timeout = :never)
    cv_timeout = timeout == :never ? nil : timeout
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      if @items.empty?
        nil # timed out without an item
      else
        @items.shift
      end
    end
  end
end
q = MyQueue.new
# Set to true when Control-C is pressed; every thread's loop checks it.
$shutdown = false
trap("INT") do
  $shutdown = true
end
producer = Thread.new do
  i = 0
  until($shutdown) do
    sleep 1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end
def consume(thread, q)
  until($shutdown) do
    # The 0.5s timeout lets the loop re-check $shutdown regularly.
    widget = q.pop(0.5)
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "Timed out waiting for a widget"
    end
  end
  puts "Consumer #{thread} shutting down"
end
Thread.abort_on_exception = true
threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end
producer.join
threads.each(&:join)
Let's run this code and see what happens. After a few seconds we hit Control-C. This time, instead of instantly exiting with a stack trace, we see the threads cleanly shutting down.
$ ruby shutdown.rb
Timed out waiting for a widgetTimed out waiting for a widget
Timed out waiting for a widget
producing widget1
thread 3 consuming widget1
Timed out waiting for a widget
Timed out waiting for a widget
Timed out waiting for a widget
Timed out waiting for a widgetTimed out waiting for a widget
^Cproducing widget2
Producer shutting downthread 3 consuming widget2
Consumer thread 3 shutting down
Timed out waiting for a widget
Consumer thread 1 shutting downTimed out waiting for a widget
Consumer thread 2 shutting down
Before we finish, let's make one more change to our queue class. Right now, our queue has a subtle limitation: if the queue can legitimately contain nil values, there is no way to tell the difference between a successful queue pop and a queue pop that timed out.
Now, I want to stop right here and say that if your application is passing around nil values from one thread to another, you should probably reconsider your design. But be that as it may, when we're coding a utility class, it's not our job to impose limits on how applications make use of it.
Instead, let's take inspiration from the #fetch method that we've discussed in previous episodes. We'll enable #pop to take an optional block, which will determine the policy for handling timeouts. We'll default this policy to one that simply returns nil, and we'll replace the line that currently returns nil with one that executes the timeout policy.
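As a reminder of the pattern being borrowed, Hash#fetch takes a block that runs only when the key is missing, so a stored nil and an absent key stay distinguishable. The settings hash here is just an illustration:

```ruby
settings = { retries: nil }

# The key exists, so fetch returns its value even though it is nil.
p settings.fetch(:retries) { :missing } # prints nil

# The key is absent, so fetch runs the block and returns its result.
p settings.fetch(:timeout) { :missing } # prints :missing
```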
require "thread"
class MyQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end
  def push(obj)
    # Push and signal under the lock so a waiting consumer can't miss it.
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end
  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    # Without a block, a timeout simply returns nil.
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      # Test empty? rather than any?: [nil].any? is false, so any?
      # would mistake a queued nil for a timeout.
      if @items.empty?
        timeout_policy.call
      else
        @items.shift
      end
    end
  end
end
q = MyQueue.new
$shutdown = false
trap("INT") do
  $shutdown = true
end
producer = Thread.new do
  i = 0
  until($shutdown) do
    sleep 1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end
def consume(thread, q)
  until($shutdown) do
    widget = q.pop(0.5)
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "Timed out waiting for a widget"
    end
  end
  puts "Consumer #{thread} shutting down"
end
Thread.abort_on_exception = true
threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end
producer.join
threads.each(&:join)
This immediately gives client code a great deal of flexibility. If the code wants to use a special placeholder value to represent a timeout, it can.
widget = q.pop(0.5) { :timed_out }
if widget != :timed_out
  puts "#{thread} consuming #{widget}"
else
  puts "Timed out waiting for a widget"
end
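Another policy the block makes possible is raising an exception, so a timeout can never be mistaken for a queued value, nil included. Below is a self-contained sketch; using Timeout::Error from Ruby's standard timeout library is just one choice, and any exception class would do. It checks @items.empty? rather than @items.any?, because [nil].any? is false:

```ruby
require "thread"
require "timeout" # only for the Timeout::Error class

class MyQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end

  def push(obj)
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    timeout_policy ||= -> { nil }
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      # empty? (not any?) so that a queued nil counts as an item
      @items.empty? ? timeout_policy.call : @items.shift
    end
  end
end

q = MyQueue.new
q.push(nil) # a legitimate nil item

# Policy: turn a timeout into an exception instead of a return value.
first = q.pop(0.1) { raise Timeout::Error, "no widget within 0.1s" }
puts "popped #{first.inspect}"

begin
  q.pop(0.1) { raise Timeout::Error, "no widget within 0.1s" }
rescue Timeout::Error => e
  timeout_message = e.message
  puts "timed out: #{timeout_message}"
end
```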
Or let's say we want to end the consumer thread in the case of a timeout. To do that, we can use the block passed to #pop to return from the consume method, which ends the loop and the thread along with it.
require "thread"
class MyQueue
  def initialize
    @lock = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
  end
  def push(obj)
    # Push and signal under the lock so a waiting consumer can't miss it.
    @lock.synchronize do
      @items.push(obj)
      @item_available.signal
    end
  end
  def pop(timeout = :never, &timeout_policy)
    cv_timeout = timeout == :never ? nil : timeout
    timeout_policy ||= ->{nil}
    @lock.synchronize do
      @item_available.wait(@lock, cv_timeout) if @items.empty?
      # Test empty? rather than any?: [nil].any? is false.
      if @items.empty?
        timeout_policy.call
      else
        @items.shift
      end
    end
  end
end
q = MyQueue.new
$shutdown = false
trap("INT") do
  $shutdown = true
end
producer = Thread.new do
  i = 0
  until($shutdown) do
    sleep 1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
  puts "Producer shutting down"
end
def consume(thread, q)
  until($shutdown) do
    widget = q.pop(0.5) do
      puts "#{thread} timed out; shutting down"
      # The block is a proc, so `return` exits consume, ending the thread.
      return
    end
    puts "#{thread} consuming #{widget}"
  end
  puts "Consumer #{thread} shutting down"
end
Thread.abort_on_exception = true
threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end
producer.join
threads.each(&:join)
Let's run this version and see what happens.
$ ruby break_on_timeout.rb
thread 1 timed out; shutting down
thread 2 timed out; shutting down
thread 3 timed out; shutting down
producing widget1
producing widget2
producing widget3
producing widget4
producing widget5
^Cproducing widget6
Producer shutting down
As we can see, all three consumers time out and exit before the slow producer has made even its first widget.
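The `return` inside the block works because a block captured with & becomes a proc, and `return` in a proc returns from the method where the block was written (here, consume), as long as that method is still running. Mutex#synchronize releases the lock on the way out. A tiny sketch with a hypothetical pop_with_policy stand-in shows both:

```ruby
require "thread"

# Hypothetical stand-in for MyQueue#pop timing out: it runs the
# caller's policy while holding the lock, just as MyQueue#pop does.
def pop_with_policy(lock, &timeout_policy)
  lock.synchronize { timeout_policy.call }
end

def consume(lock)
  loop do
    pop_with_policy(lock) do
      # A block given with & becomes a proc, so this `return` leaves
      # consume itself, not just the block or pop_with_policy.
      return :gave_up
    end
  end
end

lock = Mutex.new
result = consume(lock)
# Mutex#synchronize releases the lock even on a non-local return.
puts "consume returned #{result.inspect}; lock held: #{lock.locked?}"
```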
We've now implemented a simple timed queue. In the next episode, we'll apply timeouts to a size-constrained queue. Until then, happy hacking!