Unit 1, Lesson 21

Refactor Tapas::Queue

Several episodes back we wrote the Tapas::Queue class, a thread-safe queue class that supports both timeouts and size limits. Since then we've been shaving a whole barnyard full of yaks in order to be able to safely refactor that class. Finally, we have a couple of tests in place which, while not comprehensive, should at least give us a sanity check as we change the implementation of Tapas::Queue.

require "spec_helper"
require "tapas/queue"
require "timeout"
require "lockstep"

include Lockstep

module Tapas
  describe Queue do
    specify "waiting for an item" do
      q = Queue.new
      consumer = Thread.new do
        q.pop
      end
      wait_for { consumer.status == "sleep" }
      q.push "hello"
      expect(consumer.value).to eq("hello")
    end

    class FakeCondition
      def wait(timeout)
        SyncThread.interrupt(self, :wait, timeout)
      end
      def signal
        SyncThread.interrupt(self, :signal)
      end
    end

    class FakeLock
      def synchronize
        yield
      end
    end

    specify "waiting to push" do
      producer = SyncThread.new
      consumer = SyncThread.new
      q = Queue.new(3,
        lock: FakeLock.new,
        space_available_condition: space_available = FakeCondition.new,
        item_available_condition:  item_available = FakeCondition.new)
      producer.run(ignore: [:signal]) do
        3.times do |n|
          q.push "item #{n+1}"
        end
      end
      expect(producer).to be_finished
      producer.run(ignore: [:signal]) do
        q.push "item 4"
      end
      expect(producer).to be_interrupted_by(space_available, :wait)
      consumer.run do
        q.pop
      end
      expect(consumer).to be_interrupted_by(space_available, :signal)
      consumer.finish
      expect(producer.resume(ignore: [:signal])).to be_finished
      consumer.run(ignore: [:signal]) do
        3.times.map { q.pop }
      end
      expect(consumer.last_return_value).to eq(["item 2", "item 3", "item 4"])
    end

    def wait_for
      Timeout.timeout 1 do
        sleep 0.001 until yield
      end
    end
  end
end

Within the Tapas::Queue class, we have two calls to a method called #wait_for_condition. We've put all the logic necessary to robustly wait on a condition variable, including the ability to specify a timeout, into this method. Each call requires four arguments: the condition variable to wait on, a callable object used to determine whether the condition holds, a timeout, and a callable policy that specifies what to do in case of a timeout. In addition, it takes a block that specifies what to do if and when the condition becomes true.

class Queue
  def initialize(max_size = :infinite, options={})
    @items           = []
    @max_size        = max_size
    @lock            = options.fetch(:lock) { Lock.new }
    @space_available = options.fetch(:space_available_condition) {
      Condition.new(@lock)
    }
    @item_available  = options.fetch(:item_available_condition) {
      Condition.new(@lock)
    }
  end

  def push(obj, timeout=:never, &timeout_policy)
    timeout_policy ||= -> do
      raise "Push timed out"
    end
    wait_for_condition(
      @space_available,
      ->{!full?},
      timeout,
      timeout_policy) do

      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    timeout_policy ||= ->{nil}
    wait_for_condition(
      @item_available,
      ->{@items.any?},
      timeout,
      timeout_policy) do

      item = @items.shift
      @space_available.signal unless full?
      item
    end
  end

  private

  def full?
    return false if @max_size == :infinite
    @max_size <= @items.size
  end

  def wait_for_condition(
      cv, condition_predicate, timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_predicate.call && cv_timeout.to_f >= 0
          cv.wait(cv_timeout)
        end
        if condition_predicate.call
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end
end
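
To see the shape of that waiting loop in isolation, here is a minimal sketch that uses only Ruby's built-in Mutex and ConditionVariable rather than the Lock and Condition adapters from earlier episodes. The #wait_until helper and the :timed_out return value are hypothetical names chosen for illustration; they are not part of Tapas::Queue. The point is the same as in the listing above: re-check the predicate after every wakeup, and recompute the remaining time on each pass through the loop.

```ruby
# Hypothetical helper (not part of Tapas::Queue): wait until the predicate
# holds or the timeout expires, using Ruby's built-in primitives directly.
def wait_until(lock, cv, timeout, predicate)
  deadline = Time.now + timeout
  lock.synchronize do
    loop do
      # Re-check after every wakeup; a wakeup alone proves nothing.
      return yield if predicate.call
      remaining = deadline - Time.now
      return :timed_out if remaining <= 0
      # Releases the lock while sleeping and re-acquires it on wakeup.
      cv.wait(lock, remaining)
    end
  end
end

lock  = Mutex.new
cv    = ConditionVariable.new
items = []

# Nothing is pushed yet, so this call gives up after roughly 50ms.
timed_out = wait_until(lock, cv, 0.05, -> { items.any? }) { items.shift }

# A producer adds an item and signals the condition variable.
producer = Thread.new do
  sleep 0.01
  lock.synchronize do
    items << "hello"
    cv.signal
  end
end

received = wait_until(lock, cv, 1, -> { items.any? }) { items.shift }
producer.join
```

Unlike #wait_for_condition, this sketch hard-codes its timeout behavior instead of taking a policy, which is one of the things the extra arguments in the real method buy us.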

These calls have been bothering me. Counting the block, they involve five arguments, which is about two more than I'm usually comfortable with. Now that we have a good understanding of the logical conditions that this code may need to wait on, I'd like to capture this knowledge in a more formal, decomposed way. I'm also interested in taking this #wait_for_condition logic and generalizing it so that it can be used for more than just queues.

So today we're going to refactor. And we're going to try our best to do it right, which means making many very small, incremental changes, each of which leaves the code still passing its tests.

We start by copying the #wait_for_condition method into a new class called SpaceAvailableCondition. We also give this class a constructor which saves references to the queue, a mutex lock, and a condition variable. However, for the time being, we do not change the wait_for_condition method to use all of these instance variables. We leave it exactly as we found it.

class SpaceAvailableCondition
  def initialize(queue, lock, condition)
    @queue     = queue
    @lock      = lock
    @condition = condition
  end

  def wait_for_condition(
      cv, condition_predicate, timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_predicate.call && cv_timeout.to_f >= 0
          cv.wait(cv_timeout)
        end
        if condition_predicate.call
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end
end

We update the Queue initializer to make an instance of this new class, passing in itself, its @lock, and the @space_available condition variable.

@space_available = options.fetch(:space_available_condition) {
  Condition.new(@lock)
}
@space_available_condition = SpaceAvailableCondition.new(
  self, @lock, @space_available)

We update the call to #wait_for_condition in the #push method so that it is sent to the @space_available_condition instead of self. Then we run the tests and verify they still pass.

@space_available_condition.wait_for_condition(
  @space_available,
  ->{!full?},
  timeout,
  timeout_policy) do

Next we look to see where else the @space_available variable is used. We find where we send it the #signal message, and change the receiver to be our new @space_available_condition. This means that we need to provide a #signal method on our new class, which we do by delegating the call to the @condition collaborator. We also expose this collaborator with a private reader method, for internal use.

The tests still pass after this change.

def pop(timeout = :never, &timeout_policy)
  # ...
  @space_available_condition.signal unless full?
  # ...
end

class Queue
  class SpaceAvailableCondition
    # ...

    def signal
      condition.signal
    end

    # ...

    private

    attr_reader :condition
  end
end

So far we've made no changes to the copied #wait_for_condition method. But since our logical condition class has its own internal reference to the lower-level condition variable, passing the condition variable to the message is redundant. We remove the argument from the invocation as well as the definition. And we update the implementation to use the internal condition reference.

Our tests still pass.

def push(obj, timeout=:never, &timeout_policy)
  # ...
  @space_available_condition.wait_for_condition(
    ->{!full?},
    timeout,
    timeout_policy) do
    # ...
  end
end

def wait_for_condition(
    condition_predicate, timeout=:never, timeout_policy=->{nil})
  deadline = timeout == :never ? :never : Time.now + timeout
  @lock.synchronize do
    loop do
      cv_timeout = timeout == :never ? nil : deadline - Time.now
      if !condition_predicate.call && cv_timeout.to_f >= 0
        condition.wait(cv_timeout)
      end
      if condition_predicate.call
        return yield
      elsif deadline == :never || deadline > Time.now
        next
      else
        return timeout_policy.call
      end
    end
  end
end

The next argument to #wait_for_condition is a lambda that serves as a predicate to determine whether the condition presently holds. This argument is the next to go. We remove it from the send and from the definition. In place of calls to the condition_predicate, we put sends to a #condition_holds? predicate method. We implement this predicate with logic to determine if there is, in fact, space available in the queue. In order to make this work, we have to expose a private queue reader and make the Queue#full? predicate public.

Our tests continue to pass.

def push(obj, timeout=:never, &timeout_policy)
  # ...
  @space_available_condition.wait_for_condition(timeout, timeout_policy) do
    # ...
  end
end

def wait_for_condition(
    timeout=:never, timeout_policy=->{nil})
  deadline = timeout == :never ? :never : Time.now + timeout
  @lock.synchronize do
    loop do
      cv_timeout = timeout == :never ? nil : deadline - Time.now
      if !condition_holds? && cv_timeout.to_f >= 0
        condition.wait(cv_timeout)
      end
      if condition_holds?
        return yield
      elsif deadline == :never || deadline > Time.now
        next
      else
        return timeout_policy.call
      end
    end
  end
end

class SpaceAvailableCondition
  # ...
  private

  def condition_holds?
    !queue.full?
  end

  attr_reader :queue, :condition
end

class Queue
  # ...
  def full?
    return false if @max_size == :infinite
    @max_size <= @items.size
  end

  private
  # ...
end

Now that we've slimmed down the method signature, we take a look at the message name itself. Since we are sending the message to a SpaceAvailableCondition, the method name is now a bit redundant. We shorten it to simply #wait.

We run the tests again.

def wait(timeout=:never, timeout_policy=->{nil})

# ...

@space_available_condition.wait(timeout, timeout_policy) do

We've now updated all the Queue code that previously dealt with the Condition adapter class to deal with our SpaceAvailableCondition instead. We no longer need an explicit reference to the @space_available instance variable, so we get rid of it and move the acquisition of that object inside the SpaceAvailableCondition instantiation.

@space_available_condition = SpaceAvailableCondition.new(
  self,
  @lock,
  options.fetch(:space_available_condition) {Condition.new(@lock)} )
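
Inlining the lookup like this leans on the block form of Hash#fetch, which only evaluates its block when the key is missing. The following sketch shows that behavior on its own; the symbols :fake_condition and :real_condition and the build_default lambda are placeholders for illustration, standing in for a test double and for Condition.new(@lock).

```ruby
built = []

# Placeholder for the real default, e.g. Condition.new(@lock).
build_default = -> do
  built << :default
  :real_condition
end

# When the caller injects a collaborator, the block is never run.
options  = { space_available_condition: :fake_condition }
injected = options.fetch(:space_available_condition) { build_default.call }
built_after_injection = built.dup

# When the key is absent, the block supplies the default.
fallback = {}.fetch(:space_available_condition) { build_default.call }
```

This is what lets the tests pass in a FakeCondition without a real condition variable ever being constructed.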

Next up, we set our sights on the other invocation of #wait_for_condition, which waits for items to be available. We duplicate our SpaceAvailableCondition, and rename the duplicate to ItemAvailableCondition. We change the #condition_holds? predicate to check if the queue is not empty.

We add an instance of this class to the queue, and replace the uses of the old @item_available condition variable with uses of the replacement. We also have to add the #empty? predicate referenced in the new condition class.

Again, we check that the tests still pass.

class ItemAvailableCondition
  def initialize(queue, lock, condition)
    @queue     = queue
    @lock      = lock
    @condition = condition
  end

  def signal
    condition.signal
  end

  def wait(timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_holds? && cv_timeout.to_f >= 0
          condition.wait(cv_timeout)
        end
        if condition_holds?
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end

  private

  def condition_holds?
    !queue.empty?
  end

  attr_reader :queue, :condition
end

class Queue
  # ...
  def initialize(max_size = :infinite, options={})
    @items           = []
    @max_size        = max_size
    @lock            = options.fetch(:lock) { Lock.new }
    @space_available_condition = SpaceAvailableCondition.new(
      self,
      @lock,
      options.fetch(:space_available_condition) {Condition.new(@lock)} )
    @item_available_condition = ItemAvailableCondition.new(
      self,
      @lock,
      options.fetch(:item_available_condition) {Condition.new(@lock)} )
  end

  def push(obj, timeout=:never, &timeout_policy)
    timeout_policy ||= -> do
      raise "Push timed out"
    end
    @space_available_condition.wait(timeout, timeout_policy) do
      @items.push(obj)
      @item_available_condition.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    timeout_policy ||= ->{nil}
    @item_available_condition.wait(timeout, timeout_policy) do
      item = @items.shift
      @space_available_condition.signal unless full?
      item
    end
  end

  def full?
    return false if @max_size == :infinite
    @max_size <= @items.size
  end

  def empty?
    @items.empty?
  end
end

We now have two nearly identical condition classes. You can probably guess what's coming next. We pull all of the shared behavior between these two classes up into a new base class we call LogicalCondition. As it turns out, the only thing that differs between the two condition classes is their method for determining if the condition presently holds.

Once that's done, we run the tests again.

class LogicalCondition
  def initialize(queue, lock, condition)
    @queue     = queue
    @lock      = lock
    @condition = condition
  end

  def signal
    condition.signal
  end

  def wait(timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_holds? && cv_timeout.to_f >= 0
          condition.wait(cv_timeout)
        end
        if condition_holds?
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end

  private

  attr_reader :queue, :condition
end

class SpaceAvailableCondition < LogicalCondition
  private
  def condition_holds?
    !queue.full?
  end
end

class ItemAvailableCondition < LogicalCondition
  private
  def condition_holds?
    !queue.empty?
  end
end
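
One optional safeguard when a base class relies on a method that only its subclasses define is to give the base class a default that fails loudly. We don't do this in the episode. The sketch below is a stripped-down illustration with hypothetical names (BaseCondition, NotEmptyCondition, ForgetfulCondition, and a public #holds? wrapper) and no locking at all.

```ruby
class BaseCondition
  def initialize(client)
    @client = client
  end

  # Public entry point; defers to the subclass-specific predicate.
  def holds?
    condition_holds?
  end

  private

  attr_reader :client

  # Default implementation: a subclass that forgets to override this
  # gets a descriptive error instead of a NoMethodError.
  def condition_holds?
    raise NotImplementedError,
          "#{self.class} must implement #condition_holds?"
  end
end

class NotEmptyCondition < BaseCondition
  private

  def condition_holds?
    !client.empty?
  end
end

# A subclass that forgot to define the predicate.
class ForgetfulCondition < BaseCondition
end

holds_for_full_array  = NotEmptyCondition.new([1]).holds?
holds_for_empty_array = NotEmptyCondition.new([]).holds?

error = begin
  ForgetfulCondition.new([]).holds?
rescue NotImplementedError => e
  e
end
```

Whether the guard is worth the extra lines is a judgment call; with only two subclasses sitting right next to the base class, leaving it out is reasonable too.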

Looking at this new LogicalCondition class, we can see that its relationship to the Queue is coincidental. In fact, what we now have is a general class for dealing with any kind of logical condition a thread might care to wait on. Accordingly, we remove references to queue and replace them with the more generic "client". Then we move this class out of the Queue class and into its own file. Once more, we run the tests and see that they pass.

module Tapas
  class LogicalCondition
    def initialize(client, lock, condition)
      @client    = client
      @lock      = lock
      @condition = condition
    end

    def signal
      condition.signal
    end

    def wait(timeout=:never, timeout_policy=->{nil})
      deadline = timeout == :never ? :never : Time.now + timeout
      @lock.synchronize do
        loop do
          cv_timeout = timeout == :never ? nil : deadline - Time.now
          if !condition_holds? && cv_timeout.to_f >= 0
            condition.wait(cv_timeout)
          end
          if condition_holds?
            return yield
          elsif deadline == :never || deadline > Time.now
            next
          else
            return timeout_policy.call
          end
        end
      end
    end

    private

    attr_reader :client, :condition
  end
end

At the end of this refactoring, we've slimmed down the Queue class by extracting the general condition-waiting logic. We've simplified the calling convention for waiting on space-available and item-available conditions. And we've given the two scenarios a more formal representation in which the logic that differentiates them (the check for whether their respective condition currently holds) is front and center in its own method, rather than buried in one of five arguments.

class SpaceAvailableCondition < LogicalCondition
  private
  def condition_holds?
    !client.full?
  end
end

class ItemAvailableCondition < LogicalCondition
  private
  def condition_holds?
    !client.empty?
  end
end
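
To illustrate the claim that LogicalCondition is no longer tied to queues, here is a sketch of it serving a different client. Everything outside the LogicalCondition class itself is an assumption made for the example: Gate and GateOpenCondition are hypothetical, SketchCondition is a stand-in for the Condition adapter from earlier episodes (whose implementation isn't shown here), and a plain Mutex stands in for Lock since all we need from it is #synchronize. The Tapas module is omitted to keep the sketch short.

```ruby
# Assumed stand-in for the Condition adapter: wraps a core
# ConditionVariable together with the mutex it waits on.
class SketchCondition
  def initialize(mutex)
    @mutex = mutex
    @cv    = ConditionVariable.new
  end

  def wait(timeout = nil)
    @cv.wait(@mutex, timeout)
  end

  def signal
    @cv.signal
  end
end

# LogicalCondition as listed above, minus the Tapas module.
class LogicalCondition
  def initialize(client, lock, condition)
    @client    = client
    @lock      = lock
    @condition = condition
  end

  def signal
    condition.signal
  end

  def wait(timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_holds? && cv_timeout.to_f >= 0
          condition.wait(cv_timeout)
        end
        if condition_holds?
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end

  private

  attr_reader :client, :condition
end

# Hypothetical non-queue condition: holds once the gate is open.
class GateOpenCondition < LogicalCondition
  private

  def condition_holds?
    client.open?
  end
end

# Hypothetical client: threads block in #pass until another calls #open!.
class Gate
  def initialize
    @open   = false
    @lock   = Mutex.new
    @opened = GateOpenCondition.new(self, @lock, SketchCondition.new(@lock))
  end

  def open?
    @open
  end

  def open!
    @lock.synchronize do
      @open = true
      @opened.signal
    end
  end

  def pass(timeout = :never)
    @opened.wait(timeout, -> { :timed_out }) { :passed }
  end
end

gate   = Gate.new
early  = gate.pass(0.05)                        # gate is still closed
opener = Thread.new { sleep 0.01; gate.open! }
late   = gate.pass(1)
opener.join
```

The only gate-specific code is the one-line #condition_holds? predicate, which mirrors how SpaceAvailableCondition and ItemAvailableCondition differ from each other.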

Many refactoring efforts fall apart because after making a long series of changes, we realize that the code no longer works and we're not sure which change broke it. The easiest thing to do in that scenario is often just to throw all the changes away. We've avoided this situation today by making a series of tiny incremental changes, none of which broke the code. Some of the changes actually increased duplication, rather than reducing it, in order to avoid failing tests. The net result was a refactoring session in which we felt confident and in control at all times.

There are several directions we could go from here. For one thing, I think we could simplify the unit tests now, by separating the tests for Queue logic from the tests of LogicalCondition. But that's a task for another day. Happy hacking!
