Skip to content

Commit fee5cce

Browse files
committed
expose IAsyncEnumerable on ChannelMessageQueue
fix #2400
1 parent 867b04d commit fee5cce

File tree

4 files changed

+58
-8
lines changed

4 files changed

+58
-8
lines changed

docs/ReleaseNotes.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ Current package versions:
88

99
## Unreleased
1010

11-
No pending changes.
11+
- Fix [#2400](https://github.com/StackExchange/StackExchange.Redis/issues/2400): Expose `ChannelMessageQueue` as `IAsyncEnumerable<ChannelMessage>` ([#???? by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/????))
1212

1313
## 2.6.96
1414

15-
- Fix [#2350](https://github.com/StackExchange/StackExchange.Redis/issues/2350): Properly parse lua script paramters in all cultures ([#2351 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2351))
15+
- Fix [#2350](https://github.com/StackExchange/StackExchange.Redis/issues/2350): Properly parse lua script parameters in all cultures ([#2351 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2351))
1616
- Fix [#2362](https://github.com/StackExchange/StackExchange.Redis/issues/2362): Set `RedisConnectionException.FailureType` to `AuthenticationFailure` on all authentication scenarios for better handling ([#2367 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2367))
1717
- Fix [#2368](https://github.com/StackExchange/StackExchange.Redis/issues/2368): Support `RedisValue.Length()` for all storage types ([#2370 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2370))
1818
- Fix [#2376](https://github.com/StackExchange/StackExchange.Redis/issues/2376): Avoid a (rare) deadlock scenario ([#2378 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2378))

src/StackExchange.Redis/ChannelMessageQueue.cs

+21-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Reflection;
4+
using System.Runtime.CompilerServices;
35
using System.Threading;
46
using System.Threading.Channels;
57
using System.Threading.Tasks;
@@ -66,7 +68,7 @@ internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in R
6668
/// To create a ChannelMessageQueue, use <see cref="ISubscriber.Subscribe(RedisChannel, CommandFlags)"/>
6769
/// or <see cref="ISubscriber.SubscribeAsync(RedisChannel, CommandFlags)"/>.
6870
/// </remarks>
69-
public sealed class ChannelMessageQueue
71+
public sealed class ChannelMessageQueue : IAsyncEnumerable<ChannelMessage>
7072
{
7173
private readonly Channel<ChannelMessage> _queue;
7274
/// <summary>
@@ -319,10 +321,7 @@ internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = Comm
319321
{
320322
var parent = _parent;
321323
_parent = null;
322-
if (parent != null)
323-
{
324-
parent.UnsubscribeAsync(Channel, null, this, flags);
325-
}
324+
parent?.UnsubscribeAsync(Channel, null, this, flags);
326325
_queue.Writer.TryComplete(error);
327326
}
328327

@@ -348,5 +347,22 @@ internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags f
348347
/// </summary>
349348
/// <param name="flags">The flags to use when unsubscribing.</param>
350349
public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags);
350+
351+
/// <inheritdoc cref="IAsyncEnumerable{ChannelMessage}.GetAsyncEnumerator(CancellationToken)"/>
352+
#if NETCOREAPP3_0_OR_GREATER
353+
public IAsyncEnumerator<ChannelMessage> GetAsyncEnumerator(CancellationToken cancellationToken = default)
354+
=> _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken);
355+
#else
356+
public async IAsyncEnumerator<ChannelMessage> GetAsyncEnumerator(CancellationToken cancellationToken = default)
357+
{
358+
while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
359+
{
360+
while (_queue.Reader.TryRead(out var item))
361+
{
362+
yield return item;
363+
}
364+
}
365+
}
366+
#endif
351367
}
352368
}
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-

1+
StackExchange.Redis.ChannelMessageQueue.GetAsyncEnumerator(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Collections.Generic.IAsyncEnumerator<StackExchange.Redis.ChannelMessage>!

tests/StackExchange.Redis.Tests/PubSubTests.cs

+34
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,40 @@ private void TestMassivePublish(ISubscriber sub, string channel, string caption)
308308
Assert.True(withFAF.ElapsedMilliseconds < withAsync.ElapsedMilliseconds + 3000, caption);
309309
}
310310

311+
[Fact]
312+
public async Task SubscribeAsyncEnumerable()
313+
{
314+
using var conn = Create(syncTimeout: 20000, shared: false, log: Writer);
315+
316+
var sub = conn.GetSubscriber();
317+
RedisChannel channel = Me();
318+
319+
const int TO_SEND = 5;
320+
var gotall = new TaskCompletionSource<int>();
321+
322+
var source = await sub.SubscribeAsync(channel);
323+
var op = Task.Run(async () => {
324+
int count = 0;
325+
await foreach (var item in source)
326+
{
327+
count++;
328+
if (count == TO_SEND) gotall.TrySetResult(count);
329+
}
330+
return count;
331+
});
332+
333+
for (int i = 0; i < TO_SEND; i++)
334+
{
335+
await sub.PublishAsync(channel, i);
336+
}
337+
await gotall.Task.WithTimeout(5000);
338+
339+
// check the enumerator exits cleanly
340+
sub.Unsubscribe(channel);
341+
var count = await op.WithTimeout(1000);
342+
Assert.Equal(5, count);
343+
}
344+
311345
[Fact]
312346
public async Task PubSubGetAllAnyOrder()
313347
{

0 commit comments

Comments
 (0)