Unit 1, Lesson 21

Queue

Video transcript & code

In most Ruby programs when we need a queue of items we just use an Array. After all, we can easily push objects into an array with the shovel operator, and then pop them off the other end with #shift.

queue = []
queue << :a << :b << :c
queue.shift                     # => :a
queue.shift                     # => :b
queue.shift                     # => :c

But when we introduce threads to the mix, a raw Array alone is insufficient for communication between them. Here's an example where we queue up three objects. Then we start six threads, each of which will dequeue one object and print it. Then we call Thread.pass to allow control to pass to another thread. After that we queue up three more objects. Finally, we wait for all the threads to finish.

When we run this, we can see that only three threads pulled objects off the queue. The last three objects were never processed. This is because all of the threads grabbed an object as soon as they could, whether the queue had anything in it or not. The ones that tried to get items when the queue was empty got nil instead.

queue = []
queue << :a << :b << :c
1.upto(6) do |n|
  Thread.new do puts "Thread #{n}: Got #{queue.shift.inspect}" end
end
Thread.pass
queue << :d << :e << :f
Thread.list.reject{|t| t == Thread.main}.each(&:join)
# >> Thread 6: Got :a
# >> Thread 1: Got :b
# >> Thread 3: Got :c
# >> Thread 4: Got nil
# >> Thread 5: Got nil
# >> Thread 2: Got nil

Now let's try using the Queue class in place of an Array. We need to require the thread library in order to use it. Again, we enqueue three objects to begin with. Note that we have to do this on separate lines because the Queue shovel operator currently doesn't return self, so it can't be chained. This oversight has been corrected in Ruby trunk.

Then we fire up six threads as before, and give them a chance to execute by calling Thread.pass in the main thread.

At this point we can run the code and see that only three threads finish.

require 'thread'

queue = Queue.new
queue << :a 
queue << :b 
queue << :c
1.upto(6) do |n|
  Thread.new do puts "Thread #{n}: Got #{queue.shift.inspect}" end
end
Thread.pass

# >> Thread 6: Got :a
# >> Thread 3: Got :b
# >> Thread 2: Got :c

What about the other three threads? The queue is currently empty, as it will tell us if we ask. If we then ask the queue how many threads are waiting, it tells us that there are three. This is because, unlike with a raw Array, when a thread tries to pop an element off an empty queue, the operation blocks. The thread won't continue until the queue has a new item to give it.

require 'thread'

queue = Queue.new
queue << :a 
queue << :b 
queue << :c
1.upto(6) do |n|
  Thread.new do puts "Thread #{n}: Got #{queue.shift.inspect}" end
end
Thread.pass
queue.size                      # => 0
queue.empty?                    # => true
queue.num_waiting               # => 3

# >> Thread 5: Got :a
# >> Thread 1: Got :b
# >> Thread 3: Got :c
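
To see the blocking behavior in isolation, here is a minimal sketch with a single consumer thread. It polls #num_waiting instead of relying on Thread.pass, so it does not depend on how the threads happen to be scheduled. The variable names are just illustrative.

```ruby
require 'thread'

# An empty Array hands back nil immediately.
from_array = [].shift                   # => nil

queue = Queue.new
consumer = Thread.new { queue.shift }   # blocks: the queue is empty

# Wait until the consumer is actually parked on the queue.
sleep 0.01 until queue.num_waiting == 1
waiting_before = queue.num_waiting      # => 1

queue << :hello                         # wakes the waiting consumer
received = consumer.value               # joins the thread, returns the block's result
waiting_after = queue.num_waiting       # => 0
```

Thread#value joins the thread and returns whatever its block returned, which here is the item the consumer was waiting for.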

Now let's put three more items on the queue and then wait for all threads to finish.

require 'thread'

queue = Queue.new
queue << :a 
queue << :b 
queue << :c
1.upto(6) do |n|
  Thread.new do puts "Thread #{n}: Got #{queue.shift.inspect}" end
end
Thread.pass
queue.num_waiting               # => 3
queue << :d
queue << :e
queue << :f
Thread.list.reject{|t| t == Thread.main}.each(&:join)

# >> Thread 1: Got :a
# >> Thread 2: Got :b
# >> Thread 3: Got :c
# >> Thread 5: Got :d
# >> Thread 6: Got :e
# >> Thread 4: Got :f

This time, all six threads eventually get an element and finish.

If we execute this a few times, we can see that the order the threads execute in is effectively arbitrary. The queue will wake them up for a new item in the order in which they started waiting. You might think this means that threads which started earlier will get first dibs. But in practice it all boils down to how Ruby and/or the operating system schedules threads, and there's no guarantee that a thread started right before another will reach the call to queue.shift first.
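
Here is a rough sketch of one way to check that claim without depending on the order. It assumes a second Queue (named got here, purely for illustration) used as a thread-safe collector, and it keeps references to the threads rather than filtering Thread.list. Which thread gets which item can vary from run to run, but every item is handed out exactly once.

```ruby
require 'thread'

queue = Queue.new
got   = Queue.new   # collects [thread number, item] pairs safely across threads

threads = 1.upto(6).map do |n|
  Thread.new { got << [n, queue.shift] }
end

[:a, :b, :c, :d, :e, :f].each { |item| queue << item }
threads.each(&:join)

# Drain the collector; all threads are finished, so nothing blocks here.
pairs = Array.new(got.size) { got.shift }

items_seen   = pairs.map(&:last).sort    # every item, exactly once
threads_seen = pairs.map(&:first).sort   # every thread, exactly once
```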

One other thing: queues have a number of aliases for their methods. In this example I've used the shovel operator to push new elements onto the end, and #shift to pop elements off the head of the list. This is consistent with how we might use an Array as a queue. However, if we don't like this language we have several other options.

First of all, instead of #shift and shovel we can use #push and #pop. Be aware, though, that these have different semantics than their counterparts on Array. Using #push and #pop on an Array gives us a last-in, first-out stack, whereas pushing and popping on a Queue is first-in, first-out.

require 'thread'

q = Queue.new
q.push 123
q.push 456
q.pop                           # => 123
q.pop                           # => 456

a = []
a.push 123
a.push 456
a.pop                           # => 456
a.pop                           # => 123

On the other hand, if we want to make it very clear that we are working with a queue and not some other data structure, we can use the aliases #enq and #deq to put items into, and take items off of, the queue.

require 'thread'

q = Queue.new
q.enq 123
q.enq 456
q.deq                           # => 123
q.deq                           # => 456

Again, these methods are all just aliases for the same fundamental insertion and removal operations. Which ones you choose to use is a matter of taste.
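
As a quick illustration, the aliases can be mixed freely on the same queue, and the first-in, first-out order stays the same:

```ruby
require 'thread'

q = Queue.new
q << 1                          # shovel
q.push 2                        # same insertion operation
q.enq 3                         # same again

first  = q.shift                # => 1
second = q.pop                  # => 2
third  = q.deq                  # => 3
```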

So to recap, a Queue is similar to an Array. Like an Array, we can push things in on one end and pull them off the other. Unlike an Array, that's pretty much all we can do with it. And also unlike an Array, it causes any thread asking for the next item to block until one is available. When another thread pushes an item into the queue, the queue wakes up one of the waiting threads and hands it that item.
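
Put together, the recap might look something like this minimal producer/consumer sketch. The :done sentinel used to stop the consumer is an assumption of this example, not something Queue itself provides.

```ruby
require 'thread'

queue   = Queue.new
results = []

consumer = Thread.new do
  loop do
    item = queue.shift          # blocks while the queue is empty
    break if item == :done      # :done is a made-up stop signal for this sketch
    results << item * 2
  end
end

# The main thread acts as the producer.
[1, 2, 3].each { |n| queue << n }
queue << :done

consumer.join
results                         # => [2, 4, 6]
```

With a single consumer, the items are processed in the order they were pushed, so the results come out in order too.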

It may not seem like it does much, but thread-aware queues are a fundamental building block of higher-level concurrent programming patterns. In the next episode, we'll apply them to an actual programming problem. Until then, happy hacking!
