new algorithm that differentiates between stuck processes and long running multiple processes
parent
920043c0a3
commit
961e45bf97
17
README.md
17
README.md
|
@ -12,12 +12,13 @@ Major Features
|
|||
|
||||
* Uses your own Redis or Redis::Namespace instance
|
||||
* Uses Redis `SET` using Lua transactions based on documentation from http://redis.io/commands/SET for a simple locking pattern.
|
||||
* It can either _pessimistically_ or _optimistically_ fail. In other words, it will raise an exception if you want and ignore the timeout and continue to run otherwise.
|
||||
* It uses a fixed number of ticks (or cycles) of wait that are calculated on initialization.
|
||||
* It sleeps a random amount less than one second until such time that the timed out time has been met.
|
||||
* When many processes are struggling to get a single lock, if a different process has taken control of the lock when the timeout is met, the timeout is automatically reset. This allows differentiation between a stuck lock and locks with lots of processes struggling to get the lock.
|
||||
* It pessimistically fails by raising an exception if the timeout has been met.
|
||||
* A lock will not overwrite a value in Redis if the value was changed from the lock's "secret" token.
|
||||
* It requires that objects to be locked respond to the method `id`.
|
||||
|
||||
I take many feathers from the cap of Martin Fowler when I wrote this gem. Once initialized, variables contents never change. Methods are not longer than 10 lines. Method names are very specific yet not too long. Methods are alphabetized in the class definition (except the initializer). Tests are included.
|
||||
I took many feathers from the cap of Martin Fowler when I wrote this gem. Once initialized, variables contents never change. Methods are not longer than 10 lines. Method names are very specific yet not too long. Methods are alphabetized in the class definition (except the initializer). Tests are included.
|
||||
|
||||
Usage
|
||||
=====
|
||||
|
@ -25,7 +26,7 @@ Usage
|
|||
A number of options are available on initialization:
|
||||
|
||||
|Option|Default value|Description|
|
||||
|fail_on_timeout|`false`|Raise an exception if mutex lock is not acquired in the requested time|
|
||||
|------|-------------|-----------|
|
||||
|redis|`Redis.new` Redis connection to use|
|
||||
|ticks|`100`|Number of times to wait during the timeout period|
|
||||
|timeout|`300`|Time, in seconds, to wait until lock is considered stale|
|
||||
|
@ -44,9 +45,9 @@ This pattern works very well in Sidekiq or Resque. Also, if you need to access t
|
|||
Locking Algorithm
|
||||
=================
|
||||
|
||||
* Set value in Redis if it does not exist
|
||||
* If it exists, wait up to 100 times until :timeout has been met
|
||||
* Pessimistically throw an exception if lock was not released and configured to do so
|
||||
* Overwrite existing token with new value and assume ownership if configured to do so
|
||||
* Set value in Redis if it does not exist.
|
||||
* If it exists, wait until :timeout has been met.
|
||||
* If a different process still holds the lock, update the secret token and reset the timeout.
|
||||
* If the original process still holds the lock, throw an exception.
|
||||
* Run block of code
|
||||
* Release lock if it contains the same value that it was set to
|
||||
|
|
|
@ -12,12 +12,10 @@ class UncomplicatedMutex
|
|||
def initialize(obj, opts = {})
|
||||
@verbose = opts[:verbose]
|
||||
@timeout = opts[:timeout] || 300
|
||||
@fail_on_timeout = opts[:fail_on_timeout]
|
||||
@ticks = opts[:ticks] || 100
|
||||
@wait_tick = @timeout.to_f / @ticks.to_f
|
||||
@redis = opts[:redis] || Redis.new
|
||||
@lock_name = "lock:#{obj.class.name}:#{obj.id}".squeeze(":")
|
||||
@token = Digest::MD5.new.hexdigest("#{@lock_name}_#{Time.now.to_f}")
|
||||
set_expiration_time
|
||||
end
|
||||
|
||||
def acquire_mutex
|
||||
|
@ -25,6 +23,10 @@ class UncomplicatedMutex
|
|||
@redis.eval(LUA_ACQUIRE, [ @lock_name ], [ @timeout, @token ]) == 1
|
||||
end
|
||||
|
||||
def current_token_value
|
||||
@redis.get(@lock_name)
|
||||
end
|
||||
|
||||
def destroy_mutex
|
||||
puts("Destroying the lock #{@lock_name}") if @verbose
|
||||
@redis.del(@lock_name)
|
||||
|
@ -39,13 +41,9 @@ class UncomplicatedMutex
|
|||
end
|
||||
end
|
||||
|
||||
def overwrite_mutex
|
||||
puts("Replacing the lock #{@lock_name} with #{@token}") if @verbose
|
||||
@redis.set(@lock_name, @token)
|
||||
end
|
||||
|
||||
def recurse_until_ready(depth = 1)
|
||||
return false if depth == @ticks
|
||||
return false if time_has_expired
|
||||
@initial_token = current_token_value if depth == 1
|
||||
wait_a_tick if depth > 1
|
||||
acquire_mutex || recurse_until_ready(depth + 1)
|
||||
end
|
||||
|
@ -55,9 +53,37 @@ class UncomplicatedMutex
|
|||
@redis.eval(LUA_RELEASE, [ @lock_name ], [ @token ])
|
||||
end
|
||||
|
||||
def same_token_as_before
|
||||
new_token = current_token_value
|
||||
if new_token == @initial_token
|
||||
true
|
||||
else
|
||||
@initial_token = new_token
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
def set_expiration_time
|
||||
@expiration_time = Time.now.to_i + @timeout
|
||||
end
|
||||
|
||||
def time_has_expired
|
||||
if Time.now.to_i > @expiration_time
|
||||
if same_token_as_before
|
||||
true
|
||||
else
|
||||
set_expiration_time
|
||||
false
|
||||
end
|
||||
else
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
def wait_a_tick
|
||||
puts("Sleeping #{@wait_tick} for the lock #{@lock_name} to become available") if @verbose
|
||||
sleep(@wait_tick)
|
||||
sleep_time = rand(100).to_f / 100.0
|
||||
puts("Sleeping #{sleep_time} for the lock #{@lock_name} to become available") if @verbose
|
||||
sleep(sleep_time)
|
||||
end
|
||||
|
||||
def wait_for_mutex
|
||||
|
@ -65,8 +91,7 @@ class UncomplicatedMutex
|
|||
puts("Acquired lock #{@lock_name}") if @verbose
|
||||
else
|
||||
puts("Failed to acquire the lock") if @verbose
|
||||
raise MutexTimeout.new("Failed to acquire the lock") if @fail_on_timeout
|
||||
overwrite_mutex
|
||||
raise MutexTimeout.new("Failed to acquire the lock")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -54,21 +54,6 @@ class TestUncomplicatedMutex < Minitest::Test
|
|||
end
|
||||
end
|
||||
|
||||
def test_exception_is_not_thrown
|
||||
begin
|
||||
@redis.set(@lock_name, 1)
|
||||
@mutex2.lock do
|
||||
sleep 1.05
|
||||
end
|
||||
rescue UncomplicatedMutex::MutexTimeout
|
||||
flunk "Exception thrown"
|
||||
else
|
||||
pass "Exception was not thrown"
|
||||
ensure
|
||||
@redis.del(@lock_name)
|
||||
end
|
||||
end
|
||||
|
||||
def test_lock_is_not_overwritten
|
||||
@mutex1.lock do
|
||||
@redis.set(@lock_name, 'abc123')
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
Gem::Specification.new do |s|
|
||||
s.name = 'uncomplicated_mutex'
|
||||
s.version = '1.0.1'
|
||||
s.date = '2014-12-22'
|
||||
s.version = '1.1.0'
|
||||
s.date = '2015-04-13'
|
||||
s.summary = 'Redis. Lua. Mutex.'
|
||||
s.description = 'A mutex that uses Redis that is also not complicated.'
|
||||
s.authors = [ 'Andrew Coleman' ]
|
||||
|
|
Loading…
Reference in New Issue