Skip to content

Commit dbb0d4f

Browse files
committed
feat: implement activity tracing
1 parent 9ecad93 commit dbb0d4f

13 files changed

+320
-82
lines changed

projects/Directory.Packages.props

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
<PackageVersion Include="EasyNetQ.Management.Client" Version="3.0.0" />
88
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
99
<PackageVersion Include="Nullable" Version="1.3.1" />
10-
<PackageVersion Include="OpenTelemetry.Api" Version="1.9.0" />
10+
<PackageVersion Include="OpenTelemetry.Api" Version="1.11.2" />
1111
<PackageVersion Include="OpenTelemetry.Exporter.InMemory" Version="1.9.0" />
1212
<PackageVersion Include="System.Collections.Immutable" Version="8.0.0" />
13+
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.4" />
1314
<!--
1415
Note: do NOT upgrade the System.IO.Pipelines dependency unless necessary
1516
See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
@@ -33,7 +34,6 @@
3334
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
3435
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1594
3536
-->
36-
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
3737
<PackageVersion Include="System.Memory" Version="4.5.5" />
3838
<PackageVersion Include="System.Threading.Channels" Version="8.0.0" />
3939
<PackageVersion Include="System.Net.Http.Json" Version="8.0.1" />
@@ -44,4 +44,4 @@
4444
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
4545
<GlobalPackageReference Include="MinVer" Version="6.0.0" />
4646
</ItemGroup>
47-
</Project>
47+
</Project>

projects/RabbitMQ.Client.OpenTelemetry/RabbitMQ.Client.OpenTelemetry.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
<ItemGroup>
5252
<PackageReference Include="OpenTelemetry.Api" />
53+
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
5354
</ItemGroup>
5455

5556
<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">

projects/RabbitMQ.Client/ConnectionFactory.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics;
3435
using System.Diagnostics.CodeAnalysis;
3536
using System.Linq;
3637
using System.Net.Security;
@@ -544,24 +545,31 @@ public async Task<IConnection> CreateConnectionAsync(IEndpointResolver endpointR
544545
CancellationToken cancellationToken = default)
545546
{
546547
ConnectionConfig config = CreateConfig(clientProvidedName);
548+
using Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(false);
547549
try
548550
{
549551
if (AutomaticRecoveryEnabled)
550552
{
551-
return await AutorecoveringConnection.CreateAsync(config, endpointResolver, cancellationToken)
553+
connectionActivity?.SetTag("messaging.rabbitmq.connection.automatic_recovery", true);
554+
return await AutorecoveringConnection.CreateAsync(config, endpointResolver, connectionActivity, cancellationToken)
552555
.ConfigureAwait(false);
553556
}
554557
else
555558
{
559+
560+
connectionActivity?.SetTag("messaging.rabbitmq.connection.automatic_recovery", false);
556561
IFrameHandler frameHandler = await endpointResolver.SelectOneAsync(CreateFrameHandlerAsync, cancellationToken)
557562
.ConfigureAwait(false);
563+
connectionActivity.SetNetworkTags(frameHandler);
558564
var c = new Connection(config, frameHandler);
559565
return await c.OpenAsync(cancellationToken)
560566
.ConfigureAwait(false);
561567
}
562568
}
563569
catch (OperationCanceledException ex)
564570
{
571+
connectionActivity?.SetStatus(ActivityStatusCode.Error);
572+
connectionActivity?.AddException(ex);
565573
if (cancellationToken.IsCancellationRequested)
566574
{
567575
throw;
@@ -573,7 +581,10 @@ public async Task<IConnection> CreateConnectionAsync(IEndpointResolver endpointR
573581
}
574582
catch (Exception ex)
575583
{
576-
throw new BrokerUnreachableException(ex);
584+
var brokerUnreachableException = new BrokerUnreachableException(ex);
585+
connectionActivity?.SetStatus(ActivityStatusCode.Error);
586+
connectionActivity?.AddException(brokerUnreachableException);
587+
throw brokerUnreachableException;
577588
}
578589
}
579590

projects/RabbitMQ.Client/IEndpointResolverExtensions.cs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,52 +38,42 @@ namespace RabbitMQ.Client
3838
{
3939
public static class EndpointResolverExtensions
4040
{
41-
public static async Task<T> SelectOneAsync<T>(this IEndpointResolver resolver,
41+
public static async Task<T> SelectOneAsync<T>(this IEndpointResolver resolver,
4242
Func<AmqpTcpEndpoint, CancellationToken, Task<T>> selector, CancellationToken cancellationToken)
4343
{
44-
var t = default(T);
4544
var exceptions = new List<Exception>();
4645
foreach (AmqpTcpEndpoint ep in resolver.All())
4746
{
4847
cancellationToken.ThrowIfCancellationRequested();
48+
using var tcpConnection = RabbitMQActivitySource.OpenTcpConnection();
49+
tcpConnection?.SetServerTags(ep);
4950
try
5051
{
51-
t = await selector(ep, cancellationToken).ConfigureAwait(false);
52-
if (t!.Equals(default(T)) == false)
53-
{
54-
return t;
55-
}
52+
return await selector(ep, cancellationToken).ConfigureAwait(false);
5653
}
5754
catch (OperationCanceledException ex)
5855
{
56+
tcpConnection?.AddException(ex);
5957
if (cancellationToken.IsCancellationRequested)
6058
{
6159
throw;
6260
}
63-
else
64-
{
65-
exceptions.Add(ex);
66-
}
61+
62+
exceptions.Add(ex);
6763
}
6864
catch (Exception e)
6965
{
66+
tcpConnection?.AddException(e);
7067
exceptions.Add(e);
7168
}
7269
}
7370

74-
if (EqualityComparer<T>.Default.Equals(t!, default!))
71+
if (exceptions.Count > 0)
7572
{
76-
if (exceptions.Count > 0)
77-
{
78-
throw new AggregateException(exceptions);
79-
}
80-
else
81-
{
82-
throw new InvalidOperationException(InternalConstants.BugFound);
83-
}
73+
throw new AggregateException(exceptions);
8474
}
8575

86-
return t!;
76+
throw new InvalidOperationException(InternalConstants.BugFound);
8777
}
8878
}
8979
}

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics;
3435
using System.Linq;
3536
using System.Threading;
3637
using System.Threading.Tasks;
@@ -243,13 +244,13 @@ await _innerConnection.AbortAsync(Constants.InternalError, "FailedAutoRecovery",
243244
private async ValueTask<bool> TryRecoverConnectionDelegateAsync(CancellationToken cancellationToken)
244245
{
245246
Connection? maybeNewInnerConnection = null;
247+
using Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(true);
246248
try
247249
{
248250
Connection defunctConnection = _innerConnection;
249-
250251
IFrameHandler fh = await _endpoints.SelectOneAsync(_config.FrameHandlerFactoryAsync, cancellationToken)
251252
.ConfigureAwait(false);
252-
253+
connectionActivity?.SetNetworkTags(fh);
253254
maybeNewInnerConnection = new Connection(_config, fh);
254255

255256
await maybeNewInnerConnection.OpenAsync(cancellationToken)
@@ -267,6 +268,8 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken)
267268
}
268269
catch (Exception e)
269270
{
271+
connectionActivity?.AddException(e);
272+
connectionActivity?.SetStatus(ActivityStatusCode.Error);
270273
ESLog.Error("Connection recovery exception.", e);
271274
// Trigger recovery error events
272275
if (!_connectionRecoveryErrorAsyncWrapper.IsEmpty)

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics;
3435
using System.Diagnostics.CodeAnalysis;
3536
using System.Runtime.CompilerServices;
3637
using System.Threading;
@@ -89,11 +90,12 @@ Task onExceptionAsync(Exception exception, string context, CancellationToken can
8990
_innerConnection.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
9091
}
9192

92-
internal static async ValueTask<AutorecoveringConnection> CreateAsync(ConnectionConfig config, IEndpointResolver endpoints,
93+
internal static async ValueTask<AutorecoveringConnection> CreateAsync(ConnectionConfig config, IEndpointResolver endpoints, Activity? connectionActivity,
9394
CancellationToken cancellationToken)
9495
{
9596
IFrameHandler fh = await endpoints.SelectOneAsync(config.FrameHandlerFactoryAsync, cancellationToken)
9697
.ConfigureAwait(false);
98+
connectionActivity.SetNetworkTags(fh);
9799
Connection innerConnection = new(config, fh);
98100
AutorecoveringConnection connection = new(config, endpoints, innerConnection);
99101
await innerConnection.OpenAsync(cancellationToken)

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
6060

6161
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
6262

63-
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
64-
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
65-
: default;
63+
using Activity? sendActivity = RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length);
6664

6765
ulong publishSequenceNumber = 0;
6866
if (publisherConfirmationInfo is not null)
@@ -115,9 +113,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
115113

116114
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
117115

118-
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
119-
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
120-
: default;
116+
using Activity? sendActivity = RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length);
121117

122118
ulong publishSequenceNumber = 0;
123119
if (publisherConfirmationInfo is not null)

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,9 @@ internal void TakeOver(Connection other)
228228
internal async ValueTask<IConnection> OpenAsync(CancellationToken cancellationToken)
229229
{
230230
cancellationToken.ThrowIfCancellationRequested();
231-
232231
try
233232
{
234233
RabbitMqClientEventSource.Log.ConnectionOpened();
235-
236234
cancellationToken.ThrowIfCancellationRequested();
237235

238236
// Note: this must happen *after* the frame handler is started
@@ -250,7 +248,7 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken)
250248

251249
return this;
252250
}
253-
catch
251+
catch (Exception)
254252
{
255253
try
256254
{

0 commit comments

Comments
 (0)