Unit 1, Lesson 21

Lockstep Testing

Video transcript & code

In episode 154, we began adding tests to the Tapas::Queue class using a "checkpointing" technique. We kicked off a consumer thread which would attempt to pop an item off of an empty queue. We anticipated that this would cause the thread to block, so we then waited until the thread went to sleep. Once it was asleep, we pushed an item onto the queue and verified that the thread finished and returned the successfully popped item.

specify "waiting for an item" do
  q = Queue.new
  consumer = Thread.new do
    q.pop
  end
  wait_for { consumer.status == "sleep" }
  q.push "hello"
  expect(consumer.value).to eq("hello")
end
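
The wait_for helper used in this test is not shown here. As a rough sketch only, assuming it simply polls a block until the block returns a truthy value, it might look something like the following. The timeout: and interval: keyword names are hypothetical, and Ruby's built-in Queue stands in for Tapas::Queue so that the example runs on its own.

```ruby
require "timeout"

# Hypothetical helper (an assumption, not the episode's code): poll until
# the block returns a truthy value, raising Timeout::Error after
# `timeout` seconds.
def wait_for(timeout: 1, interval: 0.001)
  Timeout.timeout(timeout) do
    sleep(interval) until yield
  end
  true
end

# Ruby's built-in Queue stands in for Tapas::Queue here.
queue    = Queue.new
consumer = Thread.new { queue.pop }

wait_for { consumer.status == "sleep" }  # the consumer is now blocked
queue.push "hello"
popped = consumer.value                  # joins the thread, returns its result
```

The polling loop is exactly where the guesswork comes in: a status of "sleep" tells us the thread is blocked, but not what it is blocked on.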

This method of testing is very black-box. Or more accurately, it's a bit like putting on thick gloves, sticking our hands inside a black box, and trying to guess what we are touching. We know that the consumer thread should block when it tries to pop an item, so we assume that once it blocks, it is trying to pop. But it could be blocking for some other reason, and we would have no way of knowing.

Black box testing is fine for high-level acceptance tests. But this feels wrong for detailed unit tests. Whenever we've talked about unit testing, we've stressed the importance of separating our code's logic from how the code interacts with the underlying system. It would be nice if we could find a way to do that when it comes to multithreaded code as well.

We'll start the way we usually do when separating logic from system interfaces: by creating adapter classes to establish a level of indirection between our code and the rest of the system.

One way our queue code relies on the system is in its use of mutexes to protect critical sections. To detach our code from its direct dependence on Ruby mutexes, we add an adapter class called Lock. It has a single method, #synchronize, which it delegates to an internal Mutex object. We then update the Queue class to be able to accept an injected lock collaborator. We make this collaborator default to an instance of the Lock adapter class.

class Lock
  attr_reader :mutex

  def initialize
    @mutex = Mutex.new
  end

  def synchronize(&block)
    @mutex.synchronize(&block)
  end
end

class Queue
  def initialize(max_size = :infinite, options={})
    @items           = []
    @max_size        = max_size
    @lock            = options.fetch(:lock) { Lock.new }
    @space_available = ConditionVariable.new
    @item_available  = ConditionVariable.new
  end
  # ...
end
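
To see what the options.fetch(:lock) { Lock.new } idiom buys us, here is a small sketch. The Counter and CountingLock classes are hypothetical and exist only for illustration; Counter borrows the same constructor pattern as the queue. When no lock is injected, the real adapter is built; when one is injected, the object uses it without knowing the difference.

```ruby
# The Lock adapter, as defined in the episode.
class Lock
  attr_reader :mutex

  def initialize
    @mutex = Mutex.new
  end

  def synchronize(&block)
    @mutex.synchronize(&block)
  end
end

# Hypothetical stand-in lock that counts how often it is entered.
class CountingLock
  attr_reader :count

  def initialize
    @count = 0
  end

  def synchronize
    @count += 1
    yield
  end
end

# Hypothetical class, used only to demonstrate the injection idiom.
class Counter
  attr_reader :lock, :value

  def initialize(options = {})
    @value = 0
    # The block only runs when no :lock option was supplied.
    @lock = options.fetch(:lock) { Lock.new }
  end

  def increment
    @lock.synchronize { @value += 1 }
  end
end

default_counter = Counter.new            # falls back to the real adapter
default_counter.increment

counting_lock    = CountingLock.new
injected_counter = Counter.new(lock: counting_lock)
2.times { injected_counter.increment }
```

Because the stand-in only needs to respond to #synchronize, any object with that one method can play the role of the lock.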

Next, we tackle the ConditionVariable objects our code uses. Once again, we construct an adapter class we call Condition. This class requires a lock object to operate. It exposes two methods, #wait and #signal. Again, these methods delegate directly to an underlying ConditionVariable object.

We update the Queue initialization to accept optional injected collaborators for the @space_available and @item_available conditions. As before, we make these variables default to instances of the new adapter class instead of the core ConditionVariable class.

class Condition
  def initialize(lock)
    @lock = lock
    @cv   = ConditionVariable.new
  end

  def wait(timeout=nil)
    @cv.wait(@lock.mutex, timeout)
  end

  def signal
    @cv.signal
  end
end

class Queue
  def initialize(max_size = :infinite, options={})
    @items           = []
    @max_size        = max_size
    @lock            = options.fetch(:lock) { Lock.new }
    @space_available = options.fetch(:space_available_condition) {
      Condition.new(@lock)
    }
    @item_available  = options.fetch(:item_available_condition) {
      Condition.new(@lock)
    }
  end
  # ...
end

We need to make one more modification. Unlike the system ConditionVariable class, our Condition adapters remember the lock object they should use. So we no longer need to explicitly pass the lock into calls to #wait.

# ...
if !condition_predicate.call && cv_timeout.to_f >= 0
  cv.wait(cv_timeout)
end
# ...
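
As a quick sanity check that the two adapters cooperate the way the core classes do, here is a minimal sketch using real threads. The variable names are illustrative assumptions; the adapter classes are repeated from above so that the example runs on its own. Note that the waiting thread calls #wait with no lock argument.

```ruby
# The adapters, as defined in the episode.
class Lock
  attr_reader :mutex

  def initialize
    @mutex = Mutex.new
  end

  def synchronize(&block)
    @mutex.synchronize(&block)
  end
end

class Condition
  def initialize(lock)
    @lock = lock
    @cv   = ConditionVariable.new
  end

  def wait(timeout=nil)
    @cv.wait(@lock.mutex, timeout)
  end

  def signal
    @cv.signal
  end
end

lock           = Lock.new
item_available = Condition.new(lock)
items          = []

waiter = Thread.new do
  lock.synchronize do
    # The condition remembers its lock, so we pass nothing to #wait.
    # Re-checking in a loop guards against waking up with nothing to take.
    item_available.wait while items.empty?
    items.shift
  end
end

lock.synchronize do
  items << "hello"
  item_available.signal
end

received = waiter.value
```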

We've now made it possible to swap out the system mutex and condition variable collaborators for our own custom implementations when needed. As a useful side effect, our two new adapter classes give us a very clear summary of exactly which system calls the Queue class actually relies on. The Ruby ConditionVariable and Mutex classes both have many methods. But by narrowing the interface of these adapter classes down to just what the queue class actually uses, we can clearly see that the queue relies on mutex synchronization, and the condition #signal and #wait methods… and that's it.

Now that we've introduced this new level of indirection, let's write another test. This time, rather than using actual threads, we're going to use something I call "lockstep testing", using Ruby Fibers.

You might remember Fibers from episode 62. Fibers are similar to threads, but with one big difference.

Threads, by their nature, run concurrently. They may trade information back and forth, but a given thread can never know exactly what another thread is up to at any given time without carefully arranging a rendezvous using a mutex or some other thread-synchronization primitive.

Fibers, on the other hand, never run concurrently. A fiber can tell another fiber to resume, at which point the first fiber freezes, and the second fiber proceeds forwards. At some point, that second fiber can yield control back to the first fiber. The yielding fiber saves its place and halts, and the original fiber takes over.

In effect, fibers explicitly hand the CPU back and forth between themselves. Each one maintains its own stack, and remembers exactly where it left off. But only one executes at a time, and it will continue to execute until it deliberately hands the CPU over to another fiber.
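
Here is a small sketch of that hand-off using nothing but the core Fiber class. The log array and the messages in it are only there to make the order of execution visible.

```ruby
log = []

worker = Fiber.new do
  log << "worker: step 1"
  # Pause here and hand a value back to whoever resumed us.
  received = Fiber.yield(:paused)
  log << "worker: resumed with #{received}"
  :done
end

log << "main: starting worker"
first = worker.resume        # runs the worker up to Fiber.yield
log << "main: worker yielded #{first}"
last = worker.resume("go")   # continues the worker until its block ends
log << "main: worker returned #{last}"
```

Nothing in the worker runs until the main code calls #resume, and nothing in the main code runs while the worker has control. The interleaving is completely deterministic.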

I've written a tiny library to help us do "lockstep" testing with the help of Fibers. It has the unimaginative name "lockstep". Unfortunately, going over that code now would make this episode go overtime, but I recommend you take a look at it on your own. It's less than 100 lines of code to read.

In order to test our Queue using this library, we need some test double classes. We create a FakeLock which will stand in for the Lock adapter class. We implement #synchronize as a "pass-through" method: it simply yields to the block, without doing any actual synchronization. Since we won't be testing with actual threads, we don't need to worry about thread safety.

We also create a FakeCondition class. We implement both #wait and #signal, but instead of delegating to a ConditionVariable like the real Condition adapter, these methods call SyncThread.interrupt. Using the magic of Fibers, this method will freeze execution of the current simulated thread where it is, and return control back to the test code.
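
To give a feel for how an interrupt like this can work, here is a heavily simplified sketch built on Fiber.yield. This is not the lockstep library's code: the MiniSyncThread class and all of its method names are assumptions made up for this illustration, and the real library may be organized quite differently.

```ruby
# Hypothetical, simplified stand-in for a fiber-backed fake thread.
class MiniSyncThread
  attr_reader :last_interrupt, :last_return_value

  # Called from inside a running block: pause and report to the test code.
  def self.interrupt(source, message, *args)
    Fiber.yield([:interrupt, source, message, args])
  end

  def run(ignore: [], &block)
    @fiber = Fiber.new { [:finished, block.call] }
    resume(ignore: ignore)
  end

  # Run until the block finishes or hits an interrupt we are not ignoring.
  def resume(ignore: [])
    loop do
      status, *details = @fiber.resume
      if status == :finished
        @finished          = true
        @last_interrupt    = nil
        @last_return_value = details.first
        return self
      end
      source, message, args = details
      next if ignore.include?(message)  # skip ignored interrupts
      @finished       = false
      @last_interrupt = [source, message, args]
      return self
    end
  end

  def finished?
    @finished
  end

  def interrupted_by?(source, message)
    !finished? && @last_interrupt[0].equal?(source) && @last_interrupt[1] == message
  end
end

# A fake condition wired to the sketch above.
class FakeCondition
  def wait(timeout = nil)
    MiniSyncThread.interrupt(self, :wait, timeout)
  end

  def signal
    MiniSyncThread.interrupt(self, :signal)
  end
end

condition = FakeCondition.new
thread    = MiniSyncThread.new
steps     = []

thread.run(ignore: [:signal]) do
  steps << :before
  condition.signal  # ignored, so execution continues
  condition.wait    # pauses the fake thread here
  steps << :after
  :result
end

paused_on_wait = thread.interrupted_by?(condition, :wait)
steps_at_pause = steps.dup
thread.resume       # pick up right after the wait
```

The key point is that the test code regains control at the exact moment the fake condition is touched, so it can make assertions before letting the fake thread continue.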

We'll test that a bounded queue will block a thread from pushing a new item until space has been freed up in the queue. We start by setting up two fake threads, using the lockstep-provided SyncThread class. One is a producer, the other is a consumer. Next, we instantiate a Queue to test. We give it an upper bound of three items. Any more items than that, and pushing a new item will block until more space becomes available. We pass in instances of our FakeLock and FakeCondition. With its collaborators fully injected, we've now taken complete control of this object's interaction with the system.

We kick off the test by telling the fake producer thread to push three items onto the queue. As you may recall, pushing new items onto the queue may result in the item_available Condition being signaled, which, with our FakeCondition implementation, means that SyncThread.interrupt will be called. We pass an option to the #run message to tell the producer to ignore any :signal interrupts that might crop up during execution. Then we assert that the producer thread successfully ran this block to completion.

Next up, we tell the producer thread to push one more item. This time, we expect that the queue will block by sending #wait to the space_available Condition. We verify that this is in fact the case by asserting that the producer is interrupted by a space_available :wait. This corresponds to the SyncThread.interrupt call we wrote in the FakeCondition class.

Now that we know the producer is waiting for more space to be available, we have our simulated consumer pop an item off of the queue. We expect that this will trigger a space_available #signal, and we encode that expectation in the test.

We allow the consumer to finish its #pop after the signal. Then we allow the producer to resume where it left off, which as you'll recall was in the middle of a #push.

Finally, we #pop three more times in the consumer thread. We examine the results to check that we've popped off three more items, including the fourth item that caused the bounded queue to wait for more space.

class FakeCondition
  # Same signature as Condition#wait, so the fake is a drop-in replacement.
  def wait(timeout=nil)
    SyncThread.interrupt(self, :wait, timeout)
  end

  def signal
    SyncThread.interrupt(self, :signal)
  end
end

class FakeLock
  def synchronize
    yield
  end
end

specify "waiting to push" do
  producer = SyncThread.new
  consumer = SyncThread.new
  q = Queue.new(3,
    lock: FakeLock.new,
    space_available_condition: space_available = FakeCondition.new,
    item_available_condition:  item_available = FakeCondition.new)

  # Fill the queue to its bound, ignoring item_available signals.
  producer.run(ignore: [:signal]) do
    3.times do |n|
      q.push "item #{n+1}"
    end
  end
  expect(producer).to be_finished

  # A fourth push should pause on a space_available wait.
  producer.run(ignore: [:signal]) do
    q.push "item 4"
  end
  expect(producer).to be_interrupted_by(space_available, :wait)

  # Popping an item should signal that space is available.
  consumer.run do
    q.pop
  end
  expect(consumer).to be_interrupted_by(space_available, :signal)
  consumer.finish

  # The producer picks up where it left off and completes its push.
  expect(producer.resume(ignore: [:signal])).to be_finished

  # Drain the queue. "item 1" was already popped above.
  consumer.run(ignore: [:signal]) do
    3.times.map { q.pop }
  end
  expect(consumer.last_return_value).to eq(["item 2", "item 3", "item 4"])
end

We run this test, and it passes.

This test runs through a pretty lengthy scenario, perhaps more than we should really be testing in a single unit test. But the important thing to notice here is that we've been able to test that, with known inputs, our Queue class interacts with its adapter collaborators exactly the way we expect. And we've done it without starting up any actual threads. We didn't have to wait for a thread to rendezvous at a particular point in execution, or make guesses about what state a thread was in. There is no way this test could fail due to timing issues, and we don't have to rely on timeouts to catch mistakes. Instead, our fake threads ran in perfect, controlled lockstep. We've successfully separated testing system services, such as mutexes and condition variables, from testing the pure logic of the Queue class.

Which is exactly what we want in a unit test. As you might recall, what got us started on all this threaded testing was that we wanted to do some refactoring, but in order to do that we first needed some unit tests so that we could ensure that our refactoring didn't change the behavior at all. We now have two strong techniques for regression testing this class's behavior.

But bear in mind: testing that our logic makes the decisions we intended it to make does not prove that those were the right decisions to make. Neither our lockstep tests nor our carefully controlled checkpointing tests can give us assurance that this class is using threading primitives in a way that will guarantee consistency in the presence of multiple threads. For that, we'll need a different strategy.

But we can leave that to another day. For now, we'll continue to expand our unit tests in preparation for the upcoming refactoring. Happy hacking!
