Thread Interruptions

Video transcript & code

Recently we explored a strategy for cleanly shutting down threads by setting a $shutdown flag and having the threads periodically check the flag's state.

q = MyQueue.new(1)
$shutdown = false

trap("INT") do
  $shutdown = true
end

producer = Thread.new do
  i = 0
  until($shutdown) do
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget, 0.1)
    sleep 0.1
  end
  puts "Producer shutting down"
end

#...

If you take a look at the documentation for the Thread class, you might come across what appear to be some much more direct and straightforward ways to terminate a thread. There is the Thread#kill method, which immediately ends a thread. And there is the Thread#raise method, which causes a given exception to be raised within the receiving thread.

So why bother with shutdown flags if we have the ability to immediately kill or raise an exception within any thread?

To understand why these methods aren't as useful as they first appear, let's take a look at some contrived example code. This code is pretty simple: it starts a thread. The thread does some work, simulated with a sleep. Then it does some cleanup inside an ensure block, also simulated with a sleep.

Outside the thread, we sleep for a second and a half, long enough for the thread to complete its work and then get partway through cleanup. Then we send it the #raise message, forcing an exception to be raised in whatever the thread happens to be doing at the time.

Let's run this program and see what it outputs.

t = Thread.new do
  begin
    puts "BEGIN WORK"
    sleep 1
    puts "END WORK"
  ensure
    puts "BEGIN CLEANUP"
    sleep 1
    puts "END CLEANUP"
  end
end

sleep 1.5
puts "RAISE"
t.raise "Abort!"
result = t.join rescue $!
puts "Result: #{result.inspect}"

Look closely at this output. What's missing?

We can see that the thread was in the middle of cleanup when the exception was forced. There's no "END CLEANUP" in the output, because it was interrupted at that point.

The entire point of using ensure clauses in our programs is to ensure that the cleanup needed to retain consistency will always be performed, even if an exception is raised. We use ensure clauses to do things like close files, shut down network connections, release memory in native extensions, or write data to a file. If an ensure block is interrupted in the middle, the results can be anywhere from anomalous to catastrophic. How catastrophic? Consider the case where a database file write is never completed, leaving a file corrupted. Or the case where an emergency "STOP" signal is never sent to a remotely piloted vehicle, leaving it moving forwards as the control program shuts down due to an error.

Because of the nature of these clauses, we generally try to write them in such a way that they can't raise an exception themselves. And that's where Thread#raise and Thread#kill become a problem: they can cause an exception to be raised, or a termination to occur, at literally any point in the target thread. And because of the inherently unpredictable nature of asynchronously executing threads, there's no way for us to predict what the target thread will be doing when a #raise or #kill takes effect. We've gone to great lengths in this example to cause the interrupt to occur at a specific point in execution, but in the real world we are unlikely to have anything like these predictable second-long waits.
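Thread#kill has the same problem as Thread#raise. Here's a small sketch of my own (not from the episode's code) in which a thread is killed partway through its ensure block. The log array and the cleanup_started queue are just illustrative names; the queue is there so that the main thread only calls #kill once cleanup has definitely begun.

```ruby
require "thread" # needed for Queue on older Rubies; harmless on newer ones

log = []
cleanup_started = Queue.new

t = Thread.new do
  begin
    log << "BEGIN WORK"
    log << "END WORK"
  ensure
    log << "BEGIN CLEANUP"
    cleanup_started << true # tell the main thread we are inside ensure
    sleep 0.5               # simulated cleanup
    log << "END CLEANUP"    # never runs if the thread is killed mid-cleanup
  end
end

cleanup_started.pop # wait until the thread is in its ensure block
t.kill              # terminate it partway through cleanup
t.join

puts log.inspect
```

When I reason through this, "END CLEANUP" never makes it into the log: the kill lands inside the ensure block, and the rest of the block is simply abandoned.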

In effect, these methods are the Kanye West of Ruby concurrency, interrupting execution at unpredictable and potentially inopportune moments. And it's not just Ruby that has this problem; the Java threading API once had methods like these, but they have since been deprecated for exactly these reasons.

So OK, you might say; no big deal, we just won't use them. Unfortunately, it's possible to introduce thread interruptions without knowing it. Remember the Timeout library? Let's take a look at how it's implemented.

def timeout(sec, klass = nil)   #:yield: +sec+
  return yield(sec) if sec == nil or sec.zero?
  message = "execution expired"
  e = Error
  bt = catch((klass||ExitException).new) do |exception|
    begin
      x = Thread.current
      y = Thread.start {
        begin
          sleep sec
        rescue => e
          x.raise e
        else
          x.raise exception, message
        end
      }
      return yield(sec)
    rescue (klass||ExitException) => e
      e.backtrace
    ensure
      if y
        y.kill
        y.join # make sure y is dead.
      end
    end
  end
  rej = /\A#{Regexp.quote(__FILE__)}:#{__LINE__-4}\z/o
  bt.reject! {|m| rej =~ m}
  level = -caller(CALLER_OFFSET).size
  while THIS_FILE =~ bt[level]
    bt.delete_at(level)
  end
  raise(e, message, bt)
end

Ruby's timeout library is designed as a generic timeout for any operation. The way it accomplishes this is by starting a special thread, having that thread sleep for the specified amount of time, and then triggering an exception to be raised in the original thread using Thread#raise.
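To see what that means in practice, here's a hedged sketch that swaps our explicit Thread#raise for a Timeout.timeout call. I'm assuming a Ruby whose timeout library still delivers the timeout with Thread#raise, which is true of the versions I know. The log array and the timings are made up for illustration.

```ruby
require "timeout"

log = []

result = begin
  Timeout.timeout(0.2) do
    begin
      log << "BEGIN WORK"
      log << "END WORK"
    ensure
      log << "BEGIN CLEANUP"
      sleep 1              # simulated cleanup, longer than the timeout
      log << "END CLEANUP" # skipped when the timeout fires mid-cleanup
    end
  end
rescue Timeout::Error => e
  e
end

puts log.inspect
puts "Result: #{result.inspect}"
```

Nothing in this code calls #raise or #kill directly, and yet the cleanup is cut short in exactly the same way as before.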

And now at last we can begin to see why, back in episode 136, I said that the timeout library was unsafe. By raising exceptions at arbitrary points in other threads, the timeout library will nearly always introduce instability into a threaded program.

I say "nearly". And you might recall that in episode 136 I said it was possible to use timeouts safely in Ruby 2.0. The reason is this: in version 2.0, Ruby introduced a new class method on Thread called handle_interrupt.

By way of introduction, let's use Thread.handle_interrupt in our example code. We pass a hash to it which maps from a type of error to a symbol specifying a strategy for dealing with that kind of exception. Then we nest the rest of the code inside the block passed to this method.

t = Thread.new do
  Thread.handle_interrupt(RuntimeError => :never) do
    begin
      puts "BEGIN WORK"
      sleep 1
      puts "END WORK"
    ensure
      puts "BEGIN CLEANUP"
      sleep 1
      puts "END CLEANUP"
    end
  end
end

sleep 1.5
puts "RAISE"
t.raise "Abort!"
result = t.join rescue $!
puts "Result: #{result.inspect}"

This time when we run the code we see that it reaches the end of cleanup despite the error being raised halfway through. What happened here is that Ruby saw that the exception being raised was matched by the argument we gave to .handle_interrupt. It then deferred the raising of the exception until after the block was complete. Note that it didn't simply swallow the exception. It was still raised eventually, just not within the .handle_interrupt block.
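If you want to watch the deferral happen, Ruby 2.0 also added Thread.pending_interrupt?, which reports whether the current thread has an interrupt queued up. The following is a minimal sketch of my own; the ready and proceed queues are illustrative names I'm using to make sure the #raise arrives while the thread is inside the protected block.

```ruby
require "thread" # needed for Queue on older Rubies; harmless on newer ones

log = []
ready = Queue.new
proceed = Queue.new

t = Thread.new do
  begin
    Thread.handle_interrupt(RuntimeError => :never) do
      ready << true # we are now inside the protected block
      proceed.pop   # wait until the main thread has called #raise
      log << "pending=#{Thread.pending_interrupt?}"
      log << "END PROTECTED"
    end
  rescue RuntimeError => e
    # The deferred exception surfaces once the protected block is done.
    log << "rescued #{e.message}"
  end
end

ready.pop
t.raise "Abort!"
proceed << true
t.join

puts log.inspect
```

The exception sits in the thread's queue while the block finishes, and only then is it raised and rescued.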

If Thread#raise is the Kanye West of threads, you can think of .handle_interrupt as a ring of beefy security guards around the stage. They know the show schedule, and they aren't going to let anyone interrupt out of turn.

When we use .handle_interrupt we can choose from three possible strategies for dealing with exceptions. There's :never, which we've seen already. Then there's :immediate, which is the same as when we don't use .handle_interrupt at all. This might seem pointless, but by nesting calls to .handle_interrupt we can set up carefully demarcated areas in which it is OK for exceptions to be raised at any point. For instance, we could re-enable thread interrupts within the work segment of our example code by putting a second .handle_interrupt call inside the first. This ensures that we can interrupt the "work" of the thread, but the cleanup section is still protected. Of course, this opens up the possibility of undefined behavior unless we are extremely careful to only call methods that we know are interrupt-safe within the "work" section.

t = Thread.new do
  Thread.handle_interrupt(RuntimeError => :never) do
    begin
      Thread.handle_interrupt(RuntimeError => :immediate) do
        puts "BEGIN WORK"
        sleep 1
        puts "END WORK"
      end
    ensure
      puts "BEGIN CLEANUP"
      sleep 1
      puts "END CLEANUP"
    end
  end
end

sleep 1.5
puts "RAISE"
t.raise "Abort!"
result = t.join rescue $!
puts "Result: #{result.inspect}"

The final strategy is called :on_blocking. :on_blocking enables exceptions to be forced into the thread, but only while it is blocked. Operations which may block a thread include performing I/O, sleeping, and waiting on a condition variable. The :on_blocking strategy is nice because it allows a thread to be interrupted while it is asleep in the queue, waiting for something to happen. And since there are a finite number of operations which can cause a thread to block, so long as we are careful about how we handle those operations we don't have to worry about exceptions cropping up any-old-where.

Thread.handle_interrupt(RuntimeError => :on_blocking) do ... end

Let's return to our threadsafe queue class. We've moved all of the code for waiting to either push or pop an item into the #wait_for_condition method. Let's make the behavior of this method in the face of random interrupts more predictable by wrapping the whole thing in a .handle_interrupt call. We'll use the :on_blocking strategy to limit the points at which exceptions can be raised. With this protection in place, there are only three possible points that could produce an unexpected interrupt: the call to ConditionVariable#wait, or, depending on how the calling code is written, one of the two calls to condition_predicate. Since this predicate is only used to check if the queue is either full or empty, this shouldn't be an issue.

None of these calls occurs in the middle of sensitive cleanup code, so this code should be safe now.

def wait_for_condition(
    cv, condition_predicate, timeout=:never, timeout_policy=->{nil})
  Thread.handle_interrupt(RuntimeError => :on_blocking) do
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_predicate.call && cv_timeout.to_f >= 0
          cv.wait(@lock, cv_timeout)
        end
        if condition_predicate.call
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end
end

You might have noticed I told .handle_interrupt to handle only RuntimeError and its descendants. I honestly chose this subset because that's exactly what Ruby's own Queue class uses in Ruby 2.0 and later. I'm not sure why the implementors chose this rather than letting it handle all exceptions this way, and I feel like a good argument could be made for changing this to apply to all exceptions by specifying the Exception root class.

Thread.handle_interrupt(Exception => :on_blocking) do ... end
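To make the difference concrete, here's a hedged sketch comparing the two masks. Halt is a hypothetical exception class I've invented for this example; the important thing is that it is not a RuntimeError. The cleanup_completed? helper is likewise just an illustrative name.

```ruby
require "thread" # needed for Queue on older Rubies; harmless on newer ones

# Hypothetical exception that does NOT descend from RuntimeError.
class Halt < StandardError; end

# Runs a thread whose cleanup is protected by the given mask, raises Halt
# in it mid-cleanup, and reports whether the cleanup ran to completion.
def cleanup_completed?(masked_class)
  log = []
  in_cleanup = Queue.new
  t = Thread.new do
    begin
      Thread.handle_interrupt(masked_class => :never) do
        begin
          log << "WORK"
        ensure
          in_cleanup << true
          sleep 0.3 # simulated cleanup
          log << "END CLEANUP"
        end
      end
    rescue Halt
      # Swallow the exception so the thread exits quietly.
    end
  end
  in_cleanup.pop # wait until the thread is inside its ensure block
  t.raise Halt
  t.join
  log.include?("END CLEANUP")
end

puts "RuntimeError mask: #{cleanup_completed?(RuntimeError)}"
puts "Exception mask:    #{cleanup_completed?(Exception)}"
```

With the RuntimeError mask, Halt slips straight through and cuts the cleanup short. With the Exception mask, it is deferred until the cleanup has finished.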

Now you know why I gave such dire warnings about using the timeout library several episodes ago, and you know how to defend your Ruby 2.0 code from random interruptions. Happy hacking!