Skip to content

Commit 854baa7

Browse files
authored
[Event Hubs] Partition Reciever Implementation (#11003)
These changes focus on implementing the `PartitionReceiver` and ensuring good unit test coverage. Live tests will be fleshed out as part of a new workstream.
1 parent 37ee687 commit 854baa7

File tree

6 files changed

+400
-18
lines changed

6 files changed

+400
-18
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,48 @@ protected EventProcessor(int eventBatchMaximumCount, string consumerGroup, strin
319319
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
320320
public override string ToString() { throw null; }
321321
}
322+
public partial class PartitionReceiver : System.IAsyncDisposable
323+
{
324+
protected PartitionReceiver() { }
325+
public PartitionReceiver(string consumerGroup, string partitionId, Azure.Messaging.EventHubs.Consumer.EventPosition eventPosition, Azure.Messaging.EventHubs.EventHubConnection connection, Azure.Messaging.EventHubs.Primitives.PartitionReceiverOptions options = null) { }
326+
public PartitionReceiver(string consumerGroup, string partitionId, Azure.Messaging.EventHubs.Consumer.EventPosition eventPosition, string connectionString, Azure.Messaging.EventHubs.Primitives.PartitionReceiverOptions options = null) { }
327+
public PartitionReceiver(string consumerGroup, string partitionId, Azure.Messaging.EventHubs.Consumer.EventPosition eventPosition, string fullyQualifiedNamespace, string eventHubName, Azure.Core.TokenCredential credential, Azure.Messaging.EventHubs.Primitives.PartitionReceiverOptions options = null) { }
328+
public PartitionReceiver(string consumerGroup, string partitionId, Azure.Messaging.EventHubs.Consumer.EventPosition eventPosition, string connectionString, string eventHubName, Azure.Messaging.EventHubs.Primitives.PartitionReceiverOptions options = null) { }
329+
public string ConsumerGroup { get { throw null; } }
330+
public string EventHubName { get { throw null; } }
331+
public string FullyQualifiedNamespace { get { throw null; } }
332+
public Azure.Messaging.EventHubs.Consumer.EventPosition InitialPosition { get { throw null; } }
333+
public bool IsClosed { get { throw null; } protected set { } }
334+
public string PartitionId { get { throw null; } }
335+
public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
336+
public virtual System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
337+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
338+
public override bool Equals(object obj) { throw null; }
339+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
340+
public override int GetHashCode() { throw null; }
341+
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventHubs.PartitionProperties> GetPartitionPropertiesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
342+
public virtual Azure.Messaging.EventHubs.Consumer.LastEnqueuedEventProperties ReadLastEnqueuedEventProperties() { throw null; }
343+
public virtual System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData>> ReceiveBatchAsync(int maximumEventCount, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
344+
public virtual System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData>> ReceiveBatchAsync(int maximumEventCount, System.TimeSpan maximumWaitTime, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
345+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
346+
public override string ToString() { throw null; }
347+
}
348+
public partial class PartitionReceiverOptions
349+
{
350+
public PartitionReceiverOptions() { }
351+
public Azure.Messaging.EventHubs.EventHubConnectionOptions ConnectionOptions { get { throw null; } set { } }
352+
public System.TimeSpan? DefaultMaximumReceiveWaitTime { get { throw null; } set { } }
353+
public long? OwnerLevel { get { throw null; } set { } }
354+
public int PrefetchCount { get { throw null; } set { } }
355+
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
356+
public bool TrackLastEnqueuedEventProperties { get { throw null; } set { } }
357+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
358+
public override bool Equals(object obj) { throw null; }
359+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
360+
public override int GetHashCode() { throw null; }
361+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
362+
public override string ToString() { throw null; }
363+
}
322364
}
323365
namespace Azure.Messaging.EventHubs.Processor
324366
{

sdk/eventhub/Azure.Messaging.EventHubs/design/event-processor{T}-proposal.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ public abstract class EventProcessor<TPartition> where TPartition : EventProcess
376376
public string ConsumerGroup { get; }
377377
public string Identifier { get; protected set; }
378378
public bool IsRunning { get; protected set; }
379+
protected EventHubsRetryPolicy RetryPolicy { get; }
379380

380381
protected EventProcessor(
381382
int eventBatchMaximumCount,
@@ -403,6 +404,7 @@ public abstract class EventProcessor<TPartition> where TPartition : EventProcess
403404
public virtual void StartProcessing(CancellationToken cancellationToken = default);
404405
public virtual Task StopProcessingAsync(CancellationToken cancellationToken = default);
405406
public virtual void StopProcessing(CancellationToken cancellationToken = default);
407+
protected virtual EventHubConnection CreateConnection();
406408

407409
protected virtual Task OnInitializingPartitionAsync(TPartition partition, CancellationToken cancellationToken);
408410
protected virtual Task OnPartitionProcessingStoppedAsync(TPartition partition, ProcessingStoppedReason reason, CancellationToken cancellationToken);

sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiver.cs

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace Azure.Messaging.EventHubs.Primitives
3030
/// <seealso cref="EventHubConsumerClient.ReadEventsFromPartitionAsync(string, EventPosition, CancellationToken)"/>
3131
/// <seealso cref="EventHubConsumerClient.ReadEventsFromPartitionAsync(string, EventPosition, ReadEventOptions, CancellationToken)"/>
3232
///
33-
internal class PartitionReceiver : IAsyncDisposable
33+
public class PartitionReceiver : IAsyncDisposable
3434
{
3535
/// <summary>
3636
/// The fully qualified Event Hubs namespace that the client is associated with. This is likely
@@ -89,6 +89,12 @@ internal class PartitionReceiver : IAsyncDisposable
8989
///
9090
private bool OwnsConnection { get; } = true;
9191

92+
/// <summary>
93+
/// The default maximum amount of time to wait to build up the requested message count for the batch.
94+
/// </summary>
95+
///
96+
private TimeSpan? DefaultMaximumWaitTime { get; }
97+
9298
/// <summary>
9399
/// The policy to use for determining retry behavior for when an operation fails.
94100
/// </summary>
@@ -174,6 +180,7 @@ public PartitionReceiver(string consumerGroup,
174180
ConsumerGroup = consumerGroup;
175181
PartitionId = partitionId;
176182
InitialPosition = eventPosition;
183+
DefaultMaximumWaitTime = options.DefaultMaximumReceiveWaitTime;
177184
RetryPolicy = options.RetryOptions.ToRetryPolicy();
178185
InnerConsumer = CreateTransportConsumer(consumerGroup, partitionId, eventPosition, RetryPolicy, options);
179186
}
@@ -210,6 +217,7 @@ public PartitionReceiver(string consumerGroup,
210217
ConsumerGroup = consumerGroup;
211218
PartitionId = partitionId;
212219
InitialPosition = eventPosition;
220+
DefaultMaximumWaitTime = options.DefaultMaximumReceiveWaitTime;
213221
RetryPolicy = options.RetryOptions.ToRetryPolicy();
214222
InnerConsumer = CreateTransportConsumer(consumerGroup, partitionId, eventPosition, RetryPolicy, options);
215223
}
@@ -241,6 +249,7 @@ public PartitionReceiver(string consumerGroup,
241249
ConsumerGroup = consumerGroup;
242250
PartitionId = partitionId;
243251
InitialPosition = eventPosition;
252+
DefaultMaximumWaitTime = options.DefaultMaximumReceiveWaitTime;
244253
RetryPolicy = options.RetryOptions.ToRetryPolicy();
245254
InnerConsumer = CreateTransportConsumer(consumerGroup, partitionId, eventPosition, RetryPolicy, options);
246255
}
@@ -265,34 +274,62 @@ protected PartitionReceiver()
265274
///
266275
public virtual async Task<PartitionProperties> GetPartitionPropertiesAsync(CancellationToken cancellationToken = default)
267276
{
277+
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
278+
268279
Argument.AssertNotClosed(IsClosed, nameof(PartitionReceiver));
269280
return await Connection.GetPartitionPropertiesAsync(PartitionId, RetryPolicy, cancellationToken).ConfigureAwait(false);
270281
}
271282

283+
/// <summary>
284+
/// A set of information about the last enqueued event of the partition associated with this receiver, observed as events
285+
/// are received from the Event Hubs service. This is only available if the receiver was created with <see cref="PartitionReceiverOptions.TrackLastEnqueuedEventProperties" />
286+
/// set. Otherwise, the properties will contain default values.
287+
/// </summary>
288+
///
289+
/// <returns>The set of properties for the last event that was enqueued to the partition. If no events were read or tracking was not set, the properties will be returned with default values.</returns>
290+
///
291+
/// <remarks>
292+
/// When information about the partition's last enqueued event is being tracked, each event received from the Event Hubs
293+
/// service will carry metadata about the partition that it otherwise would not. This results in a small amount of
294+
/// additional network bandwidth consumption that is generally a favorable trade-off when considered
295+
/// against periodically making requests for partition properties using an Event Hub client.
296+
/// </remarks>
297+
///
298+
/// <exception cref="EventHubsException">Occurs when the Event Hubs client needed to read this information is no longer available.</exception>
299+
///
300+
public virtual LastEnqueuedEventProperties ReadLastEnqueuedEventProperties()
301+
{
302+
Argument.AssertNotClosed(InnerConsumer.IsClosed, Resources.ClientNeededForThisInformationNotAvailable);
303+
return new LastEnqueuedEventProperties(InnerConsumer.LastReceivedEvent);
304+
}
305+
272306
/// <summary>
273307
/// Receives a batch of <see cref="EventData" /> from the Event Hub partition this client is associated with.
274308
/// </summary>
275309
///
276310
/// <param name="maximumEventCount">The maximum number of messages to receive in this batch.</param>
277-
/// <param name="maximumWaitTime">The maximum amount of time to wait to build up the requested message count for the batch; if not specified, the default wait time specified by the options when the client was created will be used.</param>
278311
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
279312
///
280313
/// <returns>The batch of <see cref="EventData" /> from the Event Hub partition this client is associated with. If no events are present, an empty enumerable is returned.</returns>
281314
///
282-
public virtual Task<IEnumerable<EventData>> ReceiveBatchAsync(int maximumEventCount,
283-
TimeSpan maximumWaitTime,
284-
CancellationToken cancellationToken = default)
285-
{
286-
Argument.AssertNotClosed(IsClosed, nameof(PartitionReceiver));
287-
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
288-
289-
Argument.AssertInRange(maximumEventCount, 1, int.MaxValue, nameof(maximumEventCount));
290-
Argument.AssertNotNegative(maximumWaitTime, nameof(maximumWaitTime));
315+
public virtual async Task<IEnumerable<EventData>> ReceiveBatchAsync(int maximumEventCount,
316+
CancellationToken cancellationToken = default) =>
317+
await ReceiveBatchInternalAsync(maximumEventCount, null, cancellationToken).ConfigureAwait(false);
291318

292-
// TODO: implement method.
293-
294-
return default;
295-
}
319+
/// <summary>
320+
/// Receives a batch of <see cref="EventData" /> from the Event Hub partition this client is associated with.
321+
/// </summary>
322+
///
323+
/// <param name="maximumEventCount">The maximum number of messages to receive in this batch.</param>
324+
/// <param name="maximumWaitTime">The maximum amount of time to wait to build up the requested message count for the batch.</param>
325+
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
326+
///
327+
/// <returns>The batch of <see cref="EventData" /> from the Event Hub partition this client is associated with. If no events are present, an empty enumerable is returned.</returns>
328+
///
329+
public virtual async Task<IEnumerable<EventData>> ReceiveBatchAsync(int maximumEventCount,
330+
TimeSpan maximumWaitTime,
331+
CancellationToken cancellationToken = default) =>
332+
await ReceiveBatchInternalAsync(maximumEventCount, maximumWaitTime, cancellationToken).ConfigureAwait(false);
296333

297334
/// <summary>
298335
/// Closes the client.
@@ -412,5 +449,29 @@ internal virtual TransportConsumer CreateTransportConsumer(string consumerGroup,
412449
EventHubsRetryPolicy retryPolicy,
413450
PartitionReceiverOptions options) =>
414451
Connection.CreateTransportConsumer(consumerGroup, partitionId, eventPosition, retryPolicy, options.TrackLastEnqueuedEventProperties, options.OwnerLevel, (uint?)options.PrefetchCount);
452+
453+
/// <summary>
454+
/// Receives a batch of <see cref="EventData" /> from the Event Hub partition this client is associated with.
455+
/// </summary>
456+
///
457+
/// <param name="maximumEventCount">The maximum number of messages to receive in this batch.</param>
458+
/// <param name="maximumWaitTime">The maximum amount of time to wait to build up the requested message count for the batch; if not specified, the default wait time specified by the options when the client was created will be used.</param>
459+
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
460+
///
461+
/// <returns>The batch of <see cref="EventData" /> from the Event Hub partition this client is associated with. If no events are present, an empty enumerable is returned.</returns>
462+
///
463+
private Task<IReadOnlyList<EventData>> ReceiveBatchInternalAsync(int maximumEventCount,
464+
TimeSpan? maximumWaitTime,
465+
CancellationToken cancellationToken = default)
466+
{
467+
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
468+
maximumWaitTime ??= DefaultMaximumWaitTime;
469+
470+
Argument.AssertNotClosed(IsClosed, nameof(PartitionReceiver));
471+
Argument.AssertInRange(maximumEventCount, 1, int.MaxValue, nameof(maximumEventCount));
472+
Argument.AssertNotNegative(maximumWaitTime ?? TimeSpan.Zero, nameof(maximumWaitTime));
473+
474+
return InnerConsumer.ReceiveAsync(maximumEventCount, maximumWaitTime, cancellationToken);
475+
}
415476
}
416477
}

sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiverOptions.cs

100644100755
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace Azure.Messaging.EventHubs.Primitives
1313
/// to configure its behavior.
1414
/// </summary>
1515
///
16-
internal class PartitionReceiverOptions
16+
public class PartitionReceiverOptions
1717
{
1818
/// <summary>The set of options to use for configuring the connection to the Event Hubs service.</summary>
1919
private EventHubConnectionOptions _connectionOptions = new EventHubConnectionOptions();

sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ public async Task BackgroundProcessingToleratesAnOwnershipClaimFailureWhenThePar
478478
public async Task BackgroundProcessingStartsProcessingForClaimedPartitions()
479479
{
480480
using var cancellationSource = new CancellationTokenSource();
481-
cancellationSource.CancelAfter(TimeSpan.FromSeconds(15));
481+
cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
482482

483483
var firstPartiton = "27";
484484
var secondPartition = "15";

0 commit comments

Comments
 (0)