In Progress
Unit 1, Lesson 21
In Progress

Mutex

Video transcript & code

I've decided that before we talk more about concurrency patterns using threads and queues, it would be a good idea to introduce some of the lower-level building blocks that make concurrent programming in Ruby possible.

Today I want to address one of the most fundamental concepts in threading: exclusivity.

To illustrate this concept, let's build our own queue class, similar to Ruby's standard Queue library. We'll have it store its queued items internally in an array. The #push method will simply delegate to that array.

The #pop method is a little more complicated. Remember, Ruby's Queue class will cause a thread to block until there is an item available in the queue. In order to emulate that behavior, we'll write a loop that that just goes around and around so long as the queue is empty. This is what as known as a "busy wait", and it's a really inefficient, processor-intensive way to wait for something to happen. However, we haven't yet introduced the threading primitives that will help us wait more efficiently, so this will have to do.

Once an item is available, we do a little logging, and then return the item from the front of the array. Remember, this is a queue, so the word "pop" means first-in-first-out, not last-in-first-out as it is implemented on the Array class.

Let's test out our queue. We create an instance. Then we write a little producer thread. It will loop forever. On each iteration, it will pause briefly to simulate the time it takes to build a widget. Then it will generate a widget and push it onto the queue.

Next we write a method for consuming widgets. It takes the name of a thread, and a reference to the queue. Inside it loops, each time popping a widget off of the queue and logging the fact that it did so. We also put an assertion in that the widget will never be nil. Since #pop only returns when there is an item available, this seems like a reasonable assumption.

For ease of debugging we set Thread.abort_on_exception to true, as we learned about in the last episode. Then we kick off three consumer threads. Finally, we wait for the consumer threads to finish.

class MyQueue
  def initialize
    @items = []
  end

  def push(obj)
    @items.push(obj)
  end

  def pop
    while(@items.empty?)
      # do nothing
    end
    puts "Pop goes the weasel!"
    @items.shift
  end
end

q = MyQueue.new

producer = Thread.new do
  i = 0
  loop do
    sleep 0.1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
end

def consume(thread, q)
  loop do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "FATAL: widget should never be nil"
      exit!(1)
    end
  end
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

threads.each(&:join)

Let's run this code. It runs for a moment or two… and then terminates! We can see from the error message that our assertion #pop would always return a widget was somehow violated.

$ ruby naive_queue.rb 
producing widget1
Pop goes the weasel!
thread 3 consuming widget1
Pop goes the weasel!
producing widget2FATAL: widget should never be nil

So how did this happen? Let's take a closer look at the #pop code. When a new item is pushed onto an empty queue, the busy-wait loop ends. Then it outputs some logging info, and returns the first item from the array.

But remember, there is more than one consumer thread. And while those three things are happening in one thread, another thread is also discovering that the queue is no longer empty, logging, and then grabbing the first element. And since these actions are concurrent, that other thread might get to the part where it shifts an element off of the array before this one does. In which case, this thread will be sending #shift to an empty array, and returning nil.

This is what's called a critical section. It's a segment of code that can't afford to have multiple threads executing it at once. Once one thread discovers there's an item available, it needs to be assured that it will be the only thread executing this code until it reaches the end of the section. In other words, it needs exclusivity.

The easiest way to guarantee exclusivity is using Thread.exclusive. When we wrap these around the code, it guarantees that only one thread can execute that section of code at once.

class MyQueue
  def initialize
    @items = []
  end

  def push(obj)
    @items.push(obj)
  end

  def pop
    Thread.exclusive do
      while(@items.empty?)
        # do nothing
      end
      puts "Pop goes the weasel!"
      @items.shift
    end
  end
end

q = MyQueue.new

producer = Thread.new do
  i = 0
  loop do
    sleep 0.1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
end

def consume(thread, q)
  loop do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "FATAL: widget should never be nil"
      exit!(1)
    end
  end
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

threads.each(&:join)

We can run this again and see that we no longer get a failure. Instead, we see the consumers steadily consuming widgets from the producer.

Thread.exclusive is easy, but it's also a bit heavy-handed. The problem with Thread.exclusive is that while any thread is within a Thread.exclusive block, no other exclusive blocks anywhere in the program can be executed. This isn't a big deal in our program, because we only have one exclusive block and only one queue. But in larger programs, we could easily be interfering with completely unrelated code. And we might not even know it, because Thread.exclusive is used internally by Ruby's libraries. Not to mention that if we had multiple queues in our program, threads interacting with all of those queues would be held up anytime one thread entered one of the queues' #pop method.

So Thread.exclusive is probably best left as an implementation detail of Ruby's internal libraries. What we need is an exclusion mechanism specific to an individual queue object.

For this, we turn to a mutex. "Mutex" is short for "mutual exclusion", and that's exactly what a mutex does: it gives us a way to give threads exclusive access to segments of code in a very granular way.

We add a Mutex named @lock to our queue. Where we had previously used Thread.exclusive, we change the code to use @lock.synchronize.

class MyQueue
  def initialize
    @lock  = Mutex.new
    @items = []
  end

  def push(obj)
    @items.push(obj)
  end

  def pop
    @lock.synchronize do
      while(@items.empty?)
        # do nothing
      end
      puts "Pop goes the weasel!"
      @items.shift
    end
  end
end

q = MyQueue.new

producer = Thread.new do
  i = 0
  loop do
    sleep 0.1
    widget = "widget#{i+=1}"
    puts "producing #{widget}"
    q.push(widget)
  end
end

def consume(thread, q)
  loop do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "FATAL: widget should never be nil"
      exit!(1)
    end
  end
end

Thread.abort_on_exception = true

threads = (1..3).map do |n|
  Thread.new do consume("thread #{n}", q) end
end

threads.each(&:join)

And that's it! We can run the code again, and see that it still ticks along happily producing and consuming without any failures.

Now that you've seen what a mutex can do, I'll tell you a little secret: in Ruby 2.0, all that Thread.exclusive does is call #synchronize on a single, global Mutex object.

The Mutex is one of the basic building blocks of multithreaded code. It's the simplest possible way to keep multiple threads from interfering with each other as they interact with a shared resource—in this case, a queue. In the next episode we'll look at another fundamental concurrency primitive, the "condition variable". Until then, happy hacking!

Responses