Refactor Tapas::Queue
Video transcript & code
Several episodes back we wrote the Tapas::Queue class, a thread-safe queue class that supports both timeouts and size limits. Since then we've been shaving a whole barnyard full of yaks in order to be able to safely refactor that class. Finally, we have a couple of tests in place which, while not comprehensive, should at least give us a sanity check as we change the implementation of Tapas::Queue.
require "spec_helper"
require "tapas/queue"
require "timeout"
require "lockstep"
include Lockstep

module Tapas
  describe Queue do
    specify "waiting for an item" do
      q = Queue.new
      consumer = Thread.new do
        q.pop
      end
      wait_for { consumer.status == "sleep" }
      q.push "hello"
      expect(consumer.value).to eq("hello")
    end

    class FakeCondition
      def wait(timeout)
        SyncThread.interrupt(self, :wait, timeout)
      end

      def signal
        SyncThread.interrupt(self, :signal)
      end
    end

    class FakeLock
      def synchronize
        yield
      end
    end

    specify "waiting to push" do
      producer = SyncThread.new
      consumer = SyncThread.new
      q = Queue.new(3,
                    lock: FakeLock.new,
                    space_available_condition: space_available = FakeCondition.new,
                    item_available_condition: item_available = FakeCondition.new)
      producer.run(ignore: [:signal]) do
        3.times do |n|
          q.push "item #{n+1}"
        end
      end
      expect(producer).to be_finished
      producer.run(ignore: [:signal]) do
        q.push "item 4"
      end
      expect(producer).to be_interrupted_by(space_available, :wait)
      consumer.run do
        q.pop
      end
      expect(consumer).to be_interrupted_by(space_available, :signal)
      consumer.finish
      expect(producer.resume(ignore: [:signal])).to be_finished
      consumer.run(ignore: [:signal]) do
        3.times.map { q.pop }
      end
      expect(consumer.last_return_value).to eq(["item 2", "item 3", "item 4"])
    end

    def wait_for
      Timeout.timeout 1 do
        sleep 0.001 until yield
      end
    end
  end
end
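The "waiting to push" spec swaps in FakeLock and FakeCondition by passing them as options to Queue.new. The idiom underneath is Hash#fetch with a default block: the block runs, and so builds the real collaborator, only when the key is missing. Here's a tiny standalone illustration; Widget, RealLock and CountingLock are hypothetical names made up for this sketch, not part of Tapas.

```ruby
# Hypothetical "real" collaborator, used when nothing is injected.
class RealLock
  def synchronize
    yield
  end
end

# Hypothetical test double that counts how often it is used.
class CountingLock
  attr_reader :count

  def initialize
    @count = 0
  end

  def synchronize
    @count += 1
    yield
  end
end

class Widget
  attr_reader :lock

  def initialize(options = {})
    # The default block only runs when :lock is absent, so a test can
    # inject a fake without the real lock ever being constructed.
    @lock = options.fetch(:lock) { RealLock.new }
  end

  def touch
    @lock.synchronize { :touched }
  end
end

fake = CountingLock.new
widget = Widget.new(lock: fake)
widget.touch
widget.touch
puts fake.count            # prints 2
puts Widget.new.lock.class # prints RealLock
```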
Within the Tapas::Queue class, we have two calls to a method called #wait_for_condition. We've put all the logic necessary to robustly wait on a condition variable, including the ability to specify a timeout, into this method. Each time it is called, it is passed four arguments: the condition variable to wait on, a callable object used to determine whether the condition holds, a timeout, and a callable policy that specifies what to do in case of a timeout. In addition, it takes a block which specifies what to do if and when the condition becomes true.
class Queue
  def initialize(max_size = :infinite, options={})
    @items = []
    @max_size = max_size
    @lock = options.fetch(:lock) { Lock.new }
    @space_available = options.fetch(:space_available_condition) {
      Condition.new(@lock)
    }
    @item_available = options.fetch(:item_available_condition) {
      Condition.new(@lock)
    }
  end

  def push(obj, timeout=:never, &timeout_policy)
    timeout_policy ||= -> do
      raise "Push timed out"
    end
    wait_for_condition(
      @space_available,
      ->{!full?},
      timeout,
      timeout_policy) do
      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    timeout_policy ||= ->{nil}
    wait_for_condition(
      @item_available,
      ->{@items.any?},
      timeout,
      timeout_policy) do
      item = @items.shift
      @space_available.signal unless full?
      item
    end
  end

  private

  def full?
    return false if @max_size == :infinite
    @max_size <= @items.size
  end

  def wait_for_condition(
      cv, condition_predicate, timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_predicate.call && cv_timeout.to_f >= 0
          cv.wait(cv_timeout)
        end
        if condition_predicate.call
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end
end
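The interplay of deadline, remaining time, and re-checking the predicate is the subtle part of #wait_for_condition. As a rough standalone sketch of the same idea, here it is restructured to use Ruby's built-in Mutex and ConditionVariable directly, rather than the Tapas Lock and Condition adapters (which aren't shown here). The method name wait_with_deadline is hypothetical.

```ruby
# Hypothetical helper: wait on +cv+ until +predicate+ is true or the
# deadline passes. Runs the block on success, +timeout_policy+ on timeout.
def wait_with_deadline(mutex, cv, predicate, timeout = :never, timeout_policy = -> { nil })
  deadline = timeout == :never ? :never : Time.now + timeout
  mutex.synchronize do
    loop do
      # Re-check after every wakeup: a timed wait can return without a
      # signal, and another thread may have consumed the state first.
      return yield if predicate.call

      if deadline == :never
        cv.wait(mutex)
      else
        remaining = deadline - Time.now
        return timeout_policy.call if remaining <= 0
        cv.wait(mutex, remaining)
      end
    end
  end
end

mutex = Mutex.new
cv = ConditionVariable.new
items = []

# Nothing is ever pushed here, so this gives up after about 50ms.
timed_out = wait_with_deadline(mutex, cv, -> { items.any? }, 0.05, -> { :timed_out }) { items.shift }
p timed_out # prints :timed_out

producer = Thread.new do
  sleep 0.05
  mutex.synchronize do
    items << "hello"
    cv.signal
  end
end
received = wait_with_deadline(mutex, cv, -> { items.any? }, 1) { items.shift }
producer.join
p received # prints "hello"
```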
These calls have been bothering me. Counting the block, they involve five arguments, which is about two more than I'm usually comfortable with. Now that we have a good understanding of the logical conditions that this code may need to wait on, I'd like to capture this knowledge in a more formal, decomposed way. I'm also interested in taking this #wait_for_condition logic and generalizing it so that it can be used for more than just queues.
So today we're going to refactor. And we're going to try our best to do it right, which means making many very small, incremental changes, each of which leaves the code still passing its tests.
We start by copying the #wait_for_condition method into a new class called SpaceAvailableCondition. We also give this class a constructor which saves references to the queue, a mutex lock, and a condition variable. However, for the time being, we don't change the #wait_for_condition method to take advantage of these instance variables. We leave it exactly as we found it.
class SpaceAvailableCondition
  def initialize(queue, lock, condition)
    @queue = queue
    @lock = lock
    @condition = condition
  end

  def wait_for_condition(
      cv, condition_predicate, timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_predicate.call && cv_timeout.to_f >= 0
          cv.wait(cv_timeout)
        end
        if condition_predicate.call
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end
end
We update the Queue initializer to make an instance of this new class, passing in itself, its @lock, and the @space_available condition variable.
@space_available = options.fetch(:space_available_condition) {
  Condition.new(@lock)
}
@space_available_condition = SpaceAvailableCondition.new(
  self, @lock, @space_available)
We update the call to #wait_for_condition in the #push method so that it is sent to @space_available_condition instead of self. Then we run the tests and verify they still pass.
@space_available_condition.wait_for_condition(
  @space_available,
  ->{!full?},
  timeout,
  timeout_policy) do
  # ...
end
Next we look to see where else the @space_available variable is used. We find where we send it the #signal message, and change the receiver to be our new @space_available_condition. This means that we need to provide a #signal method on our new class, which we do by delegating the call to the @condition collaborator. We also expose this collaborator with a private reader method, for internal use.
The tests still pass after this change.
def pop(timeout = :never, &timeout_policy)
  # ...
  @space_available_condition.signal unless full?
  # ...
end

class Queue
  class SpaceAvailableCondition
    # ...
    def signal
      condition.signal
    end
    # ...

    private

    attr_reader :condition
  end
end
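Hand-writing a one-line delegating #signal method, as above, is perfectly clear. As an aside, Ruby's standard Forwardable module can generate the same method. A minimal sketch, in which RecordingCondition is a hypothetical stand-in for the low-level condition variable:

```ruby
require "forwardable"

# Hypothetical stand-in for the low-level condition variable.
class RecordingCondition
  attr_reader :signals

  def initialize
    @signals = 0
  end

  def signal
    @signals += 1
  end
end

class SpaceAvailableCondition
  extend Forwardable

  # Defines #signal, which forwards to @condition.signal, just like
  # the hand-written delegating method.
  def_delegator :@condition, :signal

  def initialize(queue, lock, condition)
    @queue = queue
    @lock = lock
    @condition = condition
  end
end

cv = RecordingCondition.new
SpaceAvailableCondition.new(nil, nil, cv).signal
puts cv.signals # prints 1
```

Which to prefer is largely a matter of taste; with only one delegated message, the explicit method arguably reads more plainly.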
So far we've made no changes to the copied #wait_for_condition method. But since our logical condition class has its own internal reference to the lower-level condition variable, passing the condition variable in as an argument is redundant. We remove the argument from the invocation as well as the definition, and we update the implementation to use the internal condition reference.
Our tests still pass.
def push(obj, timeout=:never, &timeout_policy)
  # ...
  @space_available_condition.wait_for_condition(
    ->{!full?},
    timeout,
    timeout_policy) do
    # ...
  end
end

def wait_for_condition(
    condition_predicate, timeout=:never, timeout_policy=->{nil})
  deadline = timeout == :never ? :never : Time.now + timeout
  @lock.synchronize do
    loop do
      cv_timeout = timeout == :never ? nil : deadline - Time.now
      if !condition_predicate.call && cv_timeout.to_f >= 0
        condition.wait(cv_timeout)
      end
      if condition_predicate.call
        return yield
      elsif deadline == :never || deadline > Time.now
        next
      else
        return timeout_policy.call
      end
    end
  end
end
The next argument to #wait_for_condition is a lambda that serves as a predicate to determine whether the condition presently holds. This argument is the next to go. We remove it from the send and from the definition. In place of calls to the condition_predicate, we put sends to a #condition_holds? predicate method. We implement this predicate with logic to determine whether there is, in fact, space available in the queue. To make this work, we have to expose a private queue reader and make the Queue#full? predicate public.
Our tests continue to pass.
def push(obj, timeout=:never, &timeout_policy)
  # ...
  @space_available_condition.wait_for_condition(timeout, timeout_policy) do
    # ...
  end
end

def wait_for_condition(
    timeout=:never, timeout_policy=->{nil})
  deadline = timeout == :never ? :never : Time.now + timeout
  @lock.synchronize do
    loop do
      cv_timeout = timeout == :never ? nil : deadline - Time.now
      if !condition_holds? && cv_timeout.to_f >= 0
        condition.wait(cv_timeout)
      end
      if condition_holds?
        return yield
      elsif deadline == :never || deadline > Time.now
        next
      else
        return timeout_policy.call
      end
    end
  end
end

class SpaceAvailableCondition
  # ...

  private

  def condition_holds?
    !queue.full?
  end

  attr_reader :queue, :condition
end

class Queue
  # ...
  def full?
    return false if @max_size == :infinite
    @max_size <= @items.size
  end

  private
  # ...
end
Now that we've slimmed down the method signature, we take a look at the message name itself. Since we are sending the message to a SpaceAvailableCondition, the method name is now a bit redundant. We shorten it to simply #wait.
We run the tests again.
def wait(timeout=:never, timeout_policy=->{nil})
  # ...
end

# In Queue#push:
@space_available_condition.wait(timeout, timeout_policy) do
  # ...
end
We've now updated all the Queue code that previously dealt with the space-available Condition adapter object to deal with our SpaceAvailableCondition instead. We no longer need an explicit reference to that object in the @space_available instance variable, so we get rid of the variable and move the acquisition of the condition object inside the SpaceAvailableCondition instantiation.
@space_available_condition = SpaceAvailableCondition.new(
  self,
  @lock,
  options.fetch(:space_available_condition) {Condition.new(@lock)} )
Next up, we set our sights on the other invocation of #wait_for_condition, which waits for items to be available. We duplicate our SpaceAvailableCondition, and rename the duplicate to ItemAvailableCondition. We change the #condition_holds? predicate to check if the queue is not empty.
We add an instance of this class to the queue, and replace the uses of the old @item_available condition variable with uses of the replacement. We also have to add the #empty? predicate referenced in the new condition class.
Again, we check that the tests still pass.
class ItemAvailableCondition
  def initialize(queue, lock, condition)
    @queue = queue
    @lock = lock
    @condition = condition
  end

  def signal
    condition.signal
  end

  def wait(timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_holds? && cv_timeout.to_f >= 0
          condition.wait(cv_timeout)
        end
        if condition_holds?
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end

  private

  def condition_holds?
    !queue.empty?
  end

  attr_reader :queue, :condition
end

class Queue
  # ...
  def initialize(max_size = :infinite, options={})
    @items = []
    @max_size = max_size
    @lock = options.fetch(:lock) { Lock.new }
    @space_available_condition = SpaceAvailableCondition.new(
      self,
      @lock,
      options.fetch(:space_available_condition) {Condition.new(@lock)} )
    @item_available_condition = ItemAvailableCondition.new(
      self,
      @lock,
      options.fetch(:item_available_condition) {Condition.new(@lock)} )
  end

  def push(obj, timeout=:never, &timeout_policy)
    timeout_policy ||= -> do
      raise "Push timed out"
    end
    @space_available_condition.wait(timeout, timeout_policy) do
      @items.push(obj)
      @item_available_condition.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    timeout_policy ||= ->{nil}
    @item_available_condition.wait(timeout, timeout_policy) do
      item = @items.shift
      @space_available_condition.signal unless full?
      item
    end
  end

  def full?
    return false if @max_size == :infinite
    @max_size <= @items.size
  end

  def empty?
    @items.empty?
  end
end
We now have two nearly identical condition classes. You can probably guess what's coming next. We pull all of the shared behavior between these two classes up into a new base class we call LogicalCondition. As it turns out, the only thing that differs between the two condition classes is their method for determining if the condition presently holds.
Once that's done, we run the tests again.
class LogicalCondition
  def initialize(queue, lock, condition)
    @queue = queue
    @lock = lock
    @condition = condition
  end

  def signal
    condition.signal
  end

  def wait(timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_holds? && cv_timeout.to_f >= 0
          condition.wait(cv_timeout)
        end
        if condition_holds?
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end

  private

  attr_reader :queue, :condition
end

class SpaceAvailableCondition < LogicalCondition
  private

  def condition_holds?
    !queue.full?
  end
end

class ItemAvailableCondition < LogicalCondition
  private

  def condition_holds?
    !queue.empty?
  end
end
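One small option at this point, not taken in the episode, is to have the base class declare the hook it expects subclasses to fill in. Here's a reduced sketch that keeps only the parts relevant to the hook: the constructor is trimmed to just the queue, and ForgetfulCondition is a hypothetical subclass that forgets to override it.

```ruby
class LogicalCondition
  def initialize(queue)
    @queue = queue
  end

  private

  attr_reader :queue

  # Hook method: subclasses override this. Declaring it here documents
  # the contract and gives a clearer error than NoMethodError.
  def condition_holds?
    raise NotImplementedError, "#{self.class} must implement #condition_holds?"
  end
end

class ItemAvailableCondition < LogicalCondition
  private

  def condition_holds?
    !queue.empty?
  end
end

# Hypothetical subclass that forgets to define the hook.
class ForgetfulCondition < LogicalCondition
end

# An Array responds to #empty?, so it can stand in for a queue here.
holds = ItemAvailableCondition.new([1]).send(:condition_holds?)
puts holds # prints true

error_message =
  begin
    ForgetfulCondition.new([]).send(:condition_holds?)
  rescue NotImplementedError => e
    e.message
  end
puts error_message # prints ForgetfulCondition must implement #condition_holds?
```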
Looking at this new LogicalCondition class, we can see that its relationship to the Queue is coincidental. In fact, what we now have is a general class for dealing with any kind of logical condition a thread might care to wait on. Accordingly, we remove references to queue and replace them with the more generic "client". Then we move this class out of the Queue class and into its own file. Once more, we run the tests and see that they pass.
module Tapas
  class LogicalCondition
    def initialize(client, lock, condition)
      @client = client
      @lock = lock
      @condition = condition
    end

    def signal
      condition.signal
    end

    def wait(timeout=:never, timeout_policy=->{nil})
      deadline = timeout == :never ? :never : Time.now + timeout
      @lock.synchronize do
        loop do
          cv_timeout = timeout == :never ? nil : deadline - Time.now
          if !condition_holds? && cv_timeout.to_f >= 0
            condition.wait(cv_timeout)
          end
          if condition_holds?
            return yield
          elsif deadline == :never || deadline > Time.now
            next
          else
            return timeout_policy.call
          end
        end
      end
    end

    private

    attr_reader :client, :condition
  end
end
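Because LogicalCondition now only asks its client a question through #condition_holds?, nothing ties it to queues. As a sketch of reuse, here is a one-shot gate that a thread can wait on. The Gate client and its GateOpened subclass are hypothetical. Since the episode doesn't show the Lock and Condition adapters, a plain Mutex stands in for the lock and the Condition class below is an assumed stand-in built on Ruby's ConditionVariable. LogicalCondition is copied unchanged from above so the example runs on its own.

```ruby
module Tapas
  # Assumed stand-in for the Condition adapter (not shown in the episode):
  # wraps a ConditionVariable bound to a Mutex; a nil timeout waits indefinitely.
  class Condition
    def initialize(mutex)
      @mutex = mutex
      @cv = ConditionVariable.new
    end

    def wait(timeout = nil)
      @cv.wait(@mutex, timeout)
    end

    def signal
      @cv.signal
    end
  end

  # Copied unchanged from above.
  class LogicalCondition
    def initialize(client, lock, condition)
      @client = client
      @lock = lock
      @condition = condition
    end

    def signal
      condition.signal
    end

    def wait(timeout=:never, timeout_policy=->{nil})
      deadline = timeout == :never ? :never : Time.now + timeout
      @lock.synchronize do
        loop do
          cv_timeout = timeout == :never ? nil : deadline - Time.now
          if !condition_holds? && cv_timeout.to_f >= 0
            condition.wait(cv_timeout)
          end
          if condition_holds?
            return yield
          elsif deadline == :never || deadline > Time.now
            next
          else
            return timeout_policy.call
          end
        end
      end
    end

    private

    attr_reader :client, :condition
  end
end

# Hypothetical non-queue client: a gate that opens once.
class Gate
  class GateOpened < Tapas::LogicalCondition
    private

    def condition_holds?
      client.open?
    end
  end

  def initialize
    @lock = Mutex.new
    @open = false
    @opened = GateOpened.new(self, @lock, Tapas::Condition.new(@lock))
  end

  def open?
    @open
  end

  def open
    @lock.synchronize do
      @open = true
      @opened.signal
    end
  end

  # Returns :passed once the gate is open, or :closed if the timeout expires first.
  def pass(timeout = :never)
    @opened.wait(timeout, -> { :closed }) { :passed }
  end
end

gate = Gate.new
closed = gate.pass(0.05) # nobody opens the gate in time
waiter = Thread.new { gate.pass(1) }
sleep 0.05
gate.open
passed = waiter.value
p closed # prints :closed
p passed # prints :passed
```

With a single waiter, #signal is enough. A gate meant to release many waiters at once would want a broadcast operation, which this LogicalCondition doesn't provide.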
At the end of this refactoring, we've slimmed down the Queue class by extracting the general condition-waiting logic. We've simplified the calling convention for waiting on space-available and item-available conditions. And we've given the two scenarios a more formal representation, in which the logic that differentiates them (the check for whether each condition currently holds) is front and center in its own method, rather than buried in one of five arguments.
class SpaceAvailableCondition < LogicalCondition
  private

  def condition_holds?
    !client.full?
  end
end

class ItemAvailableCondition < LogicalCondition
  private

  def condition_holds?
    !client.empty?
  end
end
Many refactoring efforts fall apart because after making a long series of changes, we realize that the code no longer works and we're not sure which change broke it. The easiest thing to do in that scenario is often just to throw all the changes away. We've avoided this situation today by making a series of tiny incremental changes, none of which broke the code. Some of the changes actually increased duplication, rather than reducing it, in order to avoid failing tests. The net result was a refactoring session in which we felt confident and in control at all times.
There are several directions we could go from here. For one thing, I think we could simplify the unit tests now, by separating the tests for Queue logic from the tests of LogicalCondition. But that's a task for another day. Happy hacking!
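Separating the LogicalCondition tests from the Queue tests might start out something like this rough sketch (not the episode's code), which exercises LogicalCondition with no real threads at all. FakeLock mirrors the one in the spec; ScriptedCondition, ReadyCondition and the Struct-based Client are hypothetical helpers. ScriptedCondition#wait runs a scripted side effect, standing in for another thread changing the client's state. LogicalCondition is copied from above so the sketch runs on its own.

```ruby
module Tapas
  # Copied unchanged from above so this sketch runs standalone.
  class LogicalCondition
    def initialize(client, lock, condition)
      @client = client
      @lock = lock
      @condition = condition
    end

    def signal
      condition.signal
    end

    def wait(timeout=:never, timeout_policy=->{nil})
      deadline = timeout == :never ? :never : Time.now + timeout
      @lock.synchronize do
        loop do
          cv_timeout = timeout == :never ? nil : deadline - Time.now
          if !condition_holds? && cv_timeout.to_f >= 0
            condition.wait(cv_timeout)
          end
          if condition_holds?
            return yield
          elsif deadline == :never || deadline > Time.now
            next
          else
            return timeout_policy.call
          end
        end
      end
    end

    private

    attr_reader :client, :condition
  end
end

class FakeLock
  def synchronize
    yield
  end
end

# Hypothetical fake: records each wait and runs a scripted side effect
# in place of "another thread" changing the client's state.
class ScriptedCondition
  attr_reader :waits

  def initialize(&on_wait)
    @on_wait = on_wait
    @waits = []
  end

  def wait(timeout)
    @waits << timeout
    @on_wait.call
  end

  def signal; end
end

Client = Struct.new(:ready)

class ReadyCondition < Tapas::LogicalCondition
  private

  def condition_holds?
    client.ready
  end
end

# Untimed wait: one wait with a nil timeout, then the block runs.
client = Client.new(false)
cv = ScriptedCondition.new { client.ready = true }
result = ReadyCondition.new(client, FakeLock.new, cv).wait { :done }
p result   # prints :done
p cv.waits # prints [nil]

# A zero timeout with nothing changing falls through to the policy.
idle = ScriptedCondition.new {}
expired = ReadyCondition.new(Client.new(false), FakeLock.new, idle).wait(0, -> { :timeout }) { :done }
p expired # prints :timeout
```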