A re-entrant thread-safe resource pool that generates new resources on demand. @private
Creates a new resource pool. @param [Proc, call] open a callable which allocates a new object for the
pool
@param [Proc, call] close a callable which is called with an
object before it is freed.
# File lib/innertube.rb, line 56 def initialize(open, close) @open = open @close = close @lock = Mutex.new @iterator = Mutex.new @element_released = ConditionVariable.new @pool = Set.new end
On each element of the pool, calls close(element) and removes it. @private
# File lib/innertube.rb, line 77 def clear each_element do |e| delete_element e end end
Locks each element in turn and closes/deletes elements for which the object passes the block. @yield [object] a block that should determine whether an element
should be deleted from the pool
@yieldparam [Object] object the resource
# File lib/innertube.rb, line 100 def delete_if raise ArgumentError, "block required" unless block_given? each_element do |e| if yield e.object delete_element e end end end
As #each_element, but yields objects, not wrapper elements. @yield [resource] a block that will do something with each
resource in the pool
@yieldparam [Object] resource the current resource in the
iteration
# File lib/innertube.rb, line 188 def each each_element do |e| yield e.object end end
Iterate over a snapshot of the pool. Yielded objects are locked for the duration of the block. This may block the current thread until elements in the snapshot are released by other threads. @yield [element] a block that will do something with each
element in the pool
@yieldparam [Element] element the current element in the
iteration
# File lib/innertube.rb, line 160 def each_element targets = @pool.to_a unlocked = [] @iterator.synchronize do until targets.empty? @lock.synchronize do @element_released.wait(@iterator) if targets.all? {|e| e.locked? } unlocked, targets = targets.partition {|e| e.unlocked? } unlocked.each {|e| e.lock } end unlocked.each do |e| begin yield e ensure e.unlock end end end end end
Populate the pool with existing, open resources. @param [Array] An array of resources.
# File lib/innertube.rb, line 67 def fill(resources) @lock.synchronize do resources.each do |r| @pool << Element.new(r) end end end
@return [Integer] the number of the resources in the pool
# File lib/innertube.rb, line 195 def size @lock.synchronize { @pool.size } end
Acquire an element of the pool. Yields the object. If all elements are claimed, it will create another one. @yield [resource] a block that will perform some action with the
element of the pool
@yieldparam [Object] resource a resource managed by the pool.
Locked for the duration of the block
@param [Proc, call] :filter a callable which receives objects and has
the opportunity to reject each in turn.
@param [Object] :default if no resources are available, use this object
instead of calling #open.
@private
# File lib/innertube.rb, line 121 def take(opts = {}) raise ArgumentError, "block required" unless block_given? result = nil element = nil opts[:filter] ||= proc {|_| true } @lock.synchronize do element = @pool.find { |e| e.unlocked? && opts[:filter].call(e.object) } unless element # No objects were acceptable resource = opts[:default] || @open.call element = Element.new(resource) @pool << element end element.lock end begin result = yield element.object rescue BadResource delete_element element raise ensure # Unlock if element element.unlock @element_released.signal end end result end
Deletes an element of the pool. Calls the close callback on its object. Not intended for external use. @param [Element] e the element to remove from the pool
# File lib/innertube.rb, line 87 def delete_element(e) @close.call(e.object) @lock.synchronize do @pool.delete e end end