Skip to content

Commit a53dac2

Browse files
authored
Merge pull request #70 from kirides/issues/68/distributed_lock
fixes #68 & 69 Refactor DistributedLock
2 parents 1ee6a3f + 7c3c90c commit a53dac2

File tree

4 files changed

+193
-132
lines changed

4 files changed

+193
-132
lines changed

src/main/Hangfire.Storage.SQLite/ExpirationManager.cs

+2-4
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,8 @@ private int RemoveExpireRows(HangfireDbContext db, string table)
105105

106106
try
107107
{
108-
var _lock = new SQLiteDistributedLock(DistributedLockKey, DefaultLockTimeout,
109-
db, db.StorageOptions);
110-
111-
using (_lock)
108+
using (SQLiteDistributedLock.Acquire(DistributedLockKey, DefaultLockTimeout,
109+
db, db.StorageOptions))
112110
{
113111
rowsAffected = db.Database.Execute(deleteScript);
114112
}

src/main/Hangfire.Storage.SQLite/HangfireSQLiteConnection.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public override void Dispose()
4747
public override IDisposable AcquireDistributedLock(string resource, TimeSpan timeout)
4848
{
4949
return Retry.Twice((_) =>
50-
new SQLiteDistributedLock($"HangFire:{resource}", timeout, DbContext, _storageOptions)
50+
SQLiteDistributedLock.Acquire($"HangFire:{resource}", timeout, DbContext, _storageOptions)
5151
);
5252
}
5353

src/main/Hangfire.Storage.SQLite/SQLiteDistributedLock.cs

+85-103
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using Hangfire.Storage.SQLite.Entities;
33
using SQLite;
44
using System;
5-
using System.Collections.Generic;
5+
using System.Diagnostics;
66
using System.Threading;
77

88
namespace Hangfire.Storage.SQLite
@@ -14,9 +14,6 @@ public class SQLiteDistributedLock : IDisposable
1414
{
1515
private static readonly ILog Logger = LogProvider.For<SQLiteDistributedLock>();
1616

17-
private static readonly ThreadLocal<Dictionary<string, int>> AcquiredLocks
18-
= new ThreadLocal<Dictionary<string, int>>(() => new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase));
19-
2017
private readonly string _resource;
2118
private readonly string _resourceKey;
2219

@@ -30,15 +27,17 @@ private static readonly ThreadLocal<Dictionary<string, int>> AcquiredLocks
3027

3128
private string EventWaitHandleName => string.Intern($@"{GetType().FullName}.{_resource}");
3229

30+
public event Action<bool> Heartbeat;
31+
3332
/// <summary>
3433
/// Creates SQLite distributed lock
3534
/// </summary>
3635
/// <param name="resource">Lock resource</param>
37-
/// <param name="timeout">Lock timeout</param>
3836
/// <param name="database">Lock database</param>
3937
/// <param name="storageOptions">Database options</param>
4038
/// <exception cref="DistributedLockTimeoutException">Thrown if lock is not acuired within the timeout</exception>
41-
public SQLiteDistributedLock(string resource, TimeSpan timeout, HangfireDbContext database,
39+
private SQLiteDistributedLock(string resource,
40+
HangfireDbContext database,
4241
SQLiteStorageOptions storageOptions)
4342
{
4443
_resource = resource ?? throw new ArgumentNullException(nameof(resource));
@@ -50,22 +49,25 @@ public SQLiteDistributedLock(string resource, TimeSpan timeout, HangfireDbContex
5049
{
5150
throw new ArgumentException($@"The {nameof(resource)} cannot be empty", nameof(resource));
5251
}
52+
}
53+
54+
public static SQLiteDistributedLock Acquire(
55+
string resource,
56+
TimeSpan timeout,
57+
HangfireDbContext database,
58+
SQLiteStorageOptions storageOptions)
59+
{
5360
if (timeout.TotalSeconds > int.MaxValue)
5461
{
5562
throw new ArgumentException($"The timeout specified is too large. Please supply a timeout equal to or less than {int.MaxValue} seconds", nameof(timeout));
5663
}
5764

58-
if (!AcquiredLocks.Value.ContainsKey(_resource) || AcquiredLocks.Value[_resource] == 0)
59-
{
60-
Cleanup();
61-
Acquire(timeout);
62-
AcquiredLocks.Value[_resource] = 1;
63-
StartHeartBeat();
64-
}
65-
else
66-
{
67-
AcquiredLocks.Value[_resource]++;
68-
}
65+
var slock = new SQLiteDistributedLock(resource, database, storageOptions);
66+
67+
slock.Acquire(timeout);
68+
slock.StartHeartBeat();
69+
70+
return slock;
6971
}
7072

7173
/// <summary>
@@ -78,96 +80,52 @@ public void Dispose()
7880
{
7981
return;
8082
}
83+
8184
_completed = true;
85+
_heartbeatTimer?.Dispose();
86+
Release();
87+
}
8288

83-
if (!AcquiredLocks.Value.ContainsKey(_resource))
89+
private bool TryAcquireLock()
90+
{
91+
Cleanup();
92+
try
8493
{
85-
return;
86-
}
87-
88-
AcquiredLocks.Value[_resource]--;
94+
var distributedLock = new DistributedLock
95+
{
96+
Id = Guid.NewGuid().ToString(),
97+
Resource = _resource,
98+
ResourceKey = _resourceKey,
99+
ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime)
100+
};
89101

90-
if (AcquiredLocks.Value[_resource] > 0)
91-
{
92-
return;
102+
return _dbContext.Database.Insert(distributedLock) == 1;
93103
}
94-
95-
// Timer callback may be invoked after the Dispose method call,
96-
// but since we use the resource key, we will not disturb other owners.
97-
AcquiredLocks.Value.Remove(_resource);
98-
99-
if (_heartbeatTimer != null)
104+
catch (SQLiteException e) when (e.Result == SQLite3.Result.Constraint)
100105
{
101-
_heartbeatTimer.Dispose();
102-
_heartbeatTimer = null;
106+
return false;
103107
}
104-
105-
Release();
106-
107-
Cleanup();
108108
}
109109

110110
private void Acquire(TimeSpan timeout)
111111
{
112-
try
112+
var sw = Stopwatch.StartNew();
113+
do
113114
{
114-
var isLockAcquired = false;
115-
var now = DateTime.UtcNow;
116-
var lockTimeoutTime = now.Add(timeout);
117-
118-
while (lockTimeoutTime >= now)
115+
if (TryAcquireLock())
119116
{
120-
Cleanup();
121-
122-
lock (EventWaitHandleName)
123-
{
124-
var result = _dbContext.DistributedLockRepository.FirstOrDefault(_ => _.Resource == _resource);
125-
126-
if (result == null)
127-
{
128-
try
129-
{
130-
var distributedLock = new DistributedLock();
131-
distributedLock.Id = Guid.NewGuid().ToString();
132-
distributedLock.Resource = _resource;
133-
distributedLock.ResourceKey = _resourceKey;
134-
distributedLock.ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime);
135-
136-
_dbContext.Database.Insert(distributedLock);
137-
138-
// we were able to acquire the lock - break the loop
139-
isLockAcquired = true;
140-
break;
141-
}
142-
catch (SQLiteException e) when (e.Result == SQLite3.Result.Constraint)
143-
{
144-
// The lock already exists preventing us from inserting.
145-
continue;
146-
}
147-
}
148-
}
149-
150-
// we couldn't acquire the lock - wait a bit and try again
151-
var waitTime = (int)timeout.TotalMilliseconds / 10;
152-
lock (EventWaitHandleName)
153-
Monitor.Wait(EventWaitHandleName, waitTime);
154-
155-
now = DateTime.UtcNow;
117+
return;
156118
}
157119

158-
if (!isLockAcquired)
120+
var waitTime = (int) timeout.TotalMilliseconds / 10;
121+
// either wait for the event to be raised, or timeout
122+
lock (EventWaitHandleName)
159123
{
160-
throw new DistributedLockTimeoutException(_resource);
124+
Monitor.Wait(EventWaitHandleName, waitTime);
161125
}
162-
}
163-
catch (DistributedLockTimeoutException ex)
164-
{
165-
throw ex;
166-
}
167-
catch (Exception ex)
168-
{
169-
throw ex;
170-
}
126+
} while (sw.Elapsed <= timeout);
127+
128+
throw new DistributedLockTimeoutException(_resource);
171129
}
172130

173131
/// <summary>
@@ -179,9 +137,12 @@ private void Release()
179137
Retry.Twice((retry) => {
180138

181139
// Remove resource lock (if it's still ours)
182-
_dbContext.DistributedLockRepository.Delete(_ => _.Resource == _resource && _.ResourceKey == _resourceKey);
183-
lock (EventWaitHandleName)
184-
Monitor.Pulse(EventWaitHandleName);
140+
var count = _dbContext.DistributedLockRepository.Delete(_ => _.Resource == _resource && _.ResourceKey == _resourceKey);
141+
if (count != 0)
142+
{
143+
lock (EventWaitHandleName)
144+
Monitor.Pulse(EventWaitHandleName);
145+
}
185146
});
186147
}
187148

@@ -192,7 +153,7 @@ private void Cleanup()
192153
Retry.Twice((_) => {
193154
// Delete expired locks (of any owner)
194155
_dbContext.DistributedLockRepository.
195-
Delete(x => x.Resource == _resource && x.ExpireAt < DateTime.UtcNow);
156+
Delete(x => x.Resource == _resource && x.ExpireAt < DateTime.UtcNow);
196157
});
197158
}
198159
catch (Exception ex)
@@ -210,27 +171,48 @@ private void StartHeartBeat()
210171

211172
_heartbeatTimer = new Timer(state =>
212173
{
174+
// stop timer
175+
_heartbeatTimer?.Change(Timeout.Infinite, Timeout.Infinite);
213176
// Timer callback may be invoked after the Dispose method call,
214177
// but since we use the resource key, we will not disturb other owners.
215178
try
216179
{
217-
var distributedLock = _dbContext.DistributedLockRepository.FirstOrDefault(x => x.Resource == _resource && x.ResourceKey == _resourceKey);
218-
if (distributedLock != null)
219-
{
220-
distributedLock.ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime);
221-
222-
_dbContext.Database.Update(distributedLock);
223-
}
224-
else
180+
var didUpdate = UpdateExpiration(_dbContext.DistributedLockRepository, DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime));
181+
Heartbeat?.Invoke(didUpdate);
182+
if (!didUpdate)
225183
{
226184
Logger.ErrorFormat("Unable to update heartbeat on the resource '{0}'. The resource is not locked or is locked by another owner.", _resource);
185+
186+
// if we no longer have a lock, stop the heartbeat immediately
187+
_heartbeatTimer?.Dispose();
188+
return;
227189
}
228190
}
229191
catch (Exception ex)
230192
{
231193
Logger.ErrorFormat("Unable to update heartbeat on the resource '{0}'. {1}", _resource, ex);
232194
}
195+
// restart timer
196+
_heartbeatTimer?.Change(timerInterval, timerInterval);
233197
}, null, timerInterval, timerInterval);
234198
}
199+
200+
private bool UpdateExpiration(TableQuery<DistributedLock> tableQuery, DateTime expireAt)
201+
{
202+
var expireColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.ExpireAt)).Name;
203+
var resourceColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.Resource)).Name;
204+
var resourceKeyColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.ResourceKey)).Name;
205+
var table = tableQuery.Table.TableName;
206+
207+
var command = tableQuery.Connection.CreateCommand($@"UPDATE ""{table}""
208+
SET ""{expireColumn}"" = ?
209+
WHERE ""{resourceColumn}"" = ?
210+
AND ""{resourceKeyColumn}"" = ?",
211+
expireAt,
212+
_resource,
213+
_resourceKey);
214+
215+
return command.ExecuteNonQuery() != 0;
216+
}
235217
}
236-
}
218+
}

0 commit comments

Comments
 (0)