Skip to content

Commit bfc7dbd

Browse files
authored
Async backlog (#1612)
* alternative implementation along the lines of #1574: - make the backlog processor async - check for work *before* trying to get the lock * release notes
1 parent 18d89ae commit bfc7dbd

File tree

2 files changed

+18
-23
lines changed

2 files changed

+18
-23
lines changed

docs/ReleaseNotes.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Release Notes
22

3-
## 2.2.0
3+
## 2.2.2
44

55
- add .NET 5 target
66
- fix mutex race condition (#1585 via arsnyder16)
@@ -13,6 +13,7 @@
1313
- fix `Int64` parse scenario (#1568 via arsnyder16)
1414
- force replication check during failover (via joroda)
1515
- documentation tweaks (multiple)
16+
- fix backlog contention issue (#1612, see also #1574 via devbv)
1617

1718
## 2.1.58
1819

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Diagnostics;
4-
using System.IO;
5-
using System.IO.Pipelines;
64
using System.Runtime.CompilerServices;
75
using System.Text;
86
using System.Threading;
97
using System.Threading.Channels;
108
using System.Threading.Tasks;
11-
using Pipelines.Sockets.Unofficial;
129
using Pipelines.Sockets.Unofficial.Threading;
1310
using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
1411
using static StackExchange.Redis.ConnectionMultiplexer;
@@ -56,9 +53,7 @@ public PhysicalBridge(ServerEndPoint serverEndPoint, ConnectionType type, int ti
5653
Name = Format.ToString(serverEndPoint.EndPoint) + "/" + ConnectionType.ToString();
5754
TimeoutMilliseconds = timeoutMilliseconds;
5855
_singleWriterMutex = new MutexSlim(timeoutMilliseconds: timeoutMilliseconds);
59-
_weakRefThis = new WeakReference(this);
6056
}
61-
private readonly WeakReference _weakRefThis;
6257

6358
private readonly int TimeoutMilliseconds;
6459

@@ -771,22 +766,15 @@ private bool PushToBacklog(Message message, bool onlyIfExists)
771766
[MethodImpl(MethodImplOptions.AggressiveInlining)]
772767
private void StartBacklogProcessor()
773768
{
774-
var sched = Multiplexer.SocketManager?.Scheduler ?? PipeScheduler.ThreadPool;
775769
#if DEBUG
776770
_backlogProcessorRequestedTime = Environment.TickCount;
777771
#endif
778-
sched.Schedule(s_ProcessBacklog, _weakRefThis);
772+
Task.Run(ProcessBacklogAsync);
779773
}
780774
#if DEBUG
781775
private volatile int _backlogProcessorRequestedTime;
782776
#endif
783777

784-
private static readonly Action<object> s_ProcessBacklog = s =>
785-
{
786-
var wr = (WeakReference)s;
787-
if (wr.Target is PhysicalBridge bridge) bridge.ProcessBacklog();
788-
};
789-
790778
private void CheckBacklogForTimeouts() // check the head of the backlog queue, consuming anything that looks dead
791779
{
792780
lock (_backlog)
@@ -810,6 +798,7 @@ private void CheckBacklogForTimeouts() // check the head of the backlog queue, c
810798
internal enum BacklogStatus : byte
811799
{
812800
Inactive,
801+
Starting,
813802
Started,
814803
CheckingForWork,
815804
CheckingForTimeout,
@@ -823,7 +812,7 @@ internal enum BacklogStatus : byte
823812
Faulted,
824813
}
825814
private volatile BacklogStatus _backlogStatus;
826-
private void ProcessBacklog()
815+
private async Task ProcessBacklogAsync()
827816
{
828817
LockToken token = default;
829818
try
@@ -833,12 +822,19 @@ private void ProcessBacklog()
833822
var msToStartWorker = unchecked(tryToAcquireTime - _backlogProcessorRequestedTime);
834823
int failureCount = 0;
835824
#endif
836-
while(true)
825+
_backlogStatus = BacklogStatus.Starting;
826+
while (true)
837827
{
838-
// try and get the lock; if unsuccessful, check for termination
839-
token = _singleWriterMutex.TryWait();
840-
if (token) break; // got the lock
841-
lock (_backlog) { if (_backlog.Count == 0) return; }
828+
// check whether the backlog is empty *before* even trying to get the lock
829+
lock (_backlog)
830+
{
831+
if (_backlog.Count == 0) return; // nothing to do
832+
}
833+
834+
// try and get the lock; if unsuccessful, retry
835+
token = await _singleWriterMutex.TryWaitAsync().ConfigureAwait(false);
836+
if (token.Success) break; // got the lock; now go do something with it
837+
842838
#if DEBUG
843839
failureCount++;
844840
#endif
@@ -887,9 +883,7 @@ private void ProcessBacklog()
887883
if (result == WriteResult.Success)
888884
{
889885
_backlogStatus = BacklogStatus.Flushing;
890-
#pragma warning disable CS0618
891-
result = physical.FlushSync(false, timeout);
892-
#pragma warning restore CS0618
886+
result = await physical.FlushAsync(false).ConfigureAwait(false);
893887
}
894888

895889
_backlogStatus = BacklogStatus.MarkingInactive;

0 commit comments

Comments
 (0)