Mutex
Video transcript & code
I've decided that before we talk more about concurrency patterns using threads and queues, it would be a good idea to introduce some of the lower-level building blocks that make concurrent programming in Ruby possible.
Today I want to address one of the most fundamental concepts in threading: exclusivity.
To illustrate this concept, let's build our own queue class, similar to the Queue class in Ruby's standard library. We'll have it store its queued items internally in an array. The #push method will simply delegate to that array.
The #pop method is a little more complicated. Remember, Ruby's Queue class will cause a thread to block until there is an item available in the queue. In order to emulate that behavior, we'll write a loop that just goes around and around so long as the queue is empty. This is what is known as a "busy wait", and it's a really inefficient, processor-intensive way to wait for something to happen. However, we haven't yet introduced the threading primitives that will help us wait more efficiently, so this will have to do.
Once an item is available, we do a little logging, and then return the item from the front of the array. Remember, this is a queue, so the word "pop" means first-in-first-out, not last-in-first-out as it is implemented on the Array class.
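If the naming seems backwards, here's a quick aside (not part of the episode's code) showing the difference on a plain array:

items = ["a", "b", "c"]
items.pop    # => "c"  -- Array#pop takes from the end (last-in-first-out)
items.shift  # => "a"  -- Array#shift takes from the front (first-in-first-out)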
Let's test out our queue. We create an instance. Then we write a little producer thread. It will loop forever. On each iteration, it will pause briefly to simulate the time it takes to build a widget. Then it will generate a widget and push it onto the queue.
Next we write a method for consuming widgets. It takes the name of a thread, and a reference to the queue. Inside, it loops, each time popping a widget off of the queue and logging the fact that it did so. We also put in an assertion that the widget will never be nil. Since #pop only returns when there is an item available, this seems like a reasonable assumption.
For ease of debugging we set Thread.abort_on_exception to true, as we learned about in the last episode. Then we kick off three consumer threads. Finally, we wait for the consumer threads to finish.
class MyQueue
  def initialize
    @items = []
  end

  def push(obj)
    @items.push(obj)
  end

  def pop
    while @items.empty?
      # do nothing
    end
    puts "Pop goes the weasel!"
    @items.shift
  end
end

q = MyQueue.new

producer = Thread.new do
  i = 0
  loop do
    sleep 0.1
    widget = "widget#{i += 1}"
    puts "producing #{widget}"
    q.push(widget)
  end
end

def consume(thread, q)
  loop do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "FATAL: widget should never be nil"
      exit!(1)
    end
  end
end

Thread.abort_on_exception = true
threads = (1..3).map do |n|
  Thread.new { consume("thread #{n}", q) }
end
threads.each(&:join)
Let's run this code. It runs for a moment or two… and then terminates! We can see from the error message that our assertion that #pop would always return a widget was somehow violated.
$ ruby naive_queue.rb
producing widget1
Pop goes the weasel!
thread 3 consuming widget1
Pop goes the weasel!
producing widget2
FATAL: widget should never be nil
So how did this happen? Let's take a closer look at the #pop code. When a new item is pushed onto an empty queue, the busy-wait loop ends. Then it outputs some logging info, and returns the first item from the array.
But remember, there is more than one consumer thread. And while those three things are happening in one thread, another thread is also discovering that the queue is no longer empty, logging, and then grabbing the first element. And since these actions are concurrent, that other thread might get to the part where it shifts an element off of the array before this one does. In which case, this thread will be sending #shift to an empty array, and returning nil.
This is what's called a critical section. It's a segment of code that can't afford to have multiple threads executing it at once. Once one thread discovers there's an item available, it needs to be assured that it will be the only thread executing this code until it reaches the end of the section. In other words, it needs exclusivity.
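To see the same kind of race in miniature, here's a sketch of my own (not from the episode) using a plain counter. The read-increment-write in the middle is a critical section: when two threads interleave inside it, updates get lost. (MRI's global lock makes the loss hard to trigger in practice; on JRuby or Rubinius it shows up readily.)

counter = 0

threads = 10.times.map do
  Thread.new do
    100_000.times do
      # Read, add, write back: three steps with nothing guaranteeing exclusivity.
      counter += 1
    end
  end
end
threads.each(&:join)

puts counter  # expected 1_000_000, but can come up short when threads interleave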
The easiest way to guarantee exclusivity is using Thread.exclusive. When we wrap it around the code, it guarantees that only one thread can execute that section of code at once.
class MyQueue
  def initialize
    @items = []
  end

  def push(obj)
    @items.push(obj)
  end

  def pop
    Thread.exclusive do
      while @items.empty?
        # do nothing
      end
      puts "Pop goes the weasel!"
      @items.shift
    end
  end
end

q = MyQueue.new

producer = Thread.new do
  i = 0
  loop do
    sleep 0.1
    widget = "widget#{i += 1}"
    puts "producing #{widget}"
    q.push(widget)
  end
end

def consume(thread, q)
  loop do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "FATAL: widget should never be nil"
      exit!(1)
    end
  end
end

Thread.abort_on_exception = true
threads = (1..3).map do |n|
  Thread.new { consume("thread #{n}", q) }
end
threads.each(&:join)
We can run this again and see that we no longer get a failure. Instead, we see the consumers steadily consuming widgets from the producer.
Thread.exclusive is easy, but it's also a bit heavy-handed. The problem with Thread.exclusive is that while any thread is within a Thread.exclusive block, no other exclusive blocks anywhere in the program can be executed. This isn't a big deal in our program, because we only have one exclusive block and only one queue. But in larger programs, we could easily be interfering with completely unrelated code. And we might not even know it, because Thread.exclusive is used internally by Ruby's libraries. Not to mention that if we had multiple queues in our program, threads interacting with all of those queues would be held up anytime one thread entered any one queue's #pop method.
So Thread.exclusive is probably best left as an implementation detail of Ruby's internal libraries. What we need is an exclusion mechanism specific to an individual queue object.
For this, we turn to a mutex. "Mutex" is short for "mutual exclusion", and that's exactly what a mutex provides: it lets us grant threads exclusive access to segments of code in a very granular way.
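Crucially, every Mutex object is its own independent lock, so exclusivity can be scoped to exactly the resource that needs it. A quick sketch of my own (the lock names are mine, not from the episode):

lock_a = Mutex.new
lock_b = Mutex.new

# A thread holding lock_a for a while...
holder = Thread.new { lock_a.synchronize { sleep 1 } }

# ...does not delay a thread that only needs lock_b.
Thread.new { lock_b.synchronize { puts "lock_b was free" } }.join

holder.join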
We add a Mutex named @lock to our queue. Where we had previously used Thread.exclusive, we change the code to use @lock.synchronize.
class MyQueue
  def initialize
    @lock = Mutex.new
    @items = []
  end

  def push(obj)
    @items.push(obj)
  end

  def pop
    @lock.synchronize do
      while @items.empty?
        # do nothing
      end
      puts "Pop goes the weasel!"
      @items.shift
    end
  end
end

q = MyQueue.new

producer = Thread.new do
  i = 0
  loop do
    sleep 0.1
    widget = "widget#{i += 1}"
    puts "producing #{widget}"
    q.push(widget)
  end
end

def consume(thread, q)
  loop do
    widget = q.pop
    if widget
      puts "#{thread} consuming #{widget}"
    else
      puts "FATAL: widget should never be nil"
      exit!(1)
    end
  end
end

Thread.abort_on_exception = true
threads = (1..3).map do |n|
  Thread.new { consume("thread #{n}", q) }
end
threads.each(&:join)
And that's it! We can run the code again, and see that it still ticks along happily producing and consuming without any failures.
Now that you've seen what a mutex can do, I'll tell you a little secret: in Ruby 2.0, all that Thread.exclusive does is call #synchronize on a single, global Mutex object.
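In other words, it behaves roughly like this sketch (a simplification of my own, not the actual MRI source; the module and constant names are made up):

module ExclusiveSketch
  GLOBAL_LOCK = Mutex.new

  # Every caller shares this one lock, which is why unrelated
  # exclusive blocks end up waiting on each other.
  def self.exclusive(&block)
    GLOBAL_LOCK.synchronize(&block)
  end
end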
The Mutex is one of the basic building blocks of multithreaded code. It's the simplest possible way to keep multiple threads from interfering with each other as they interact with a shared resource, in this case a queue. In the next episode we'll look at another fundamental concurrency primitive, the "condition variable". Until then, happy hacking!