Skip to content

Commit 0a2d604

Browse files
committed
feat: implement activity tracing
1 parent f3c34fb commit 0a2d604

9 files changed

+92
-62
lines changed

projects/RabbitMQ.Client/ConnectionFactory.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics;
3435
using System.Diagnostics.CodeAnalysis;
3536
using System.Linq;
3637
using System.Net.Security;
@@ -544,24 +545,31 @@ public async Task<IConnection> CreateConnectionAsync(IEndpointResolver endpointR
544545
CancellationToken cancellationToken = default)
545546
{
546547
ConnectionConfig config = CreateConfig(clientProvidedName);
548+
using Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(false);
547549
try
548550
{
549551
if (AutomaticRecoveryEnabled)
550552
{
551-
return await AutorecoveringConnection.CreateAsync(config, endpointResolver, cancellationToken)
553+
connectionActivity?.SetTag("messaging.rabbitmq.connection.automatic_recovery", true);
554+
return await AutorecoveringConnection.CreateAsync(config, endpointResolver, connectionActivity, cancellationToken)
552555
.ConfigureAwait(false);
553556
}
554557
else
555558
{
559+
560+
connectionActivity?.SetTag("messaging.rabbitmq.connection.automatic_recovery", false);
556561
IFrameHandler frameHandler = await endpointResolver.SelectOneAsync(CreateFrameHandlerAsync, cancellationToken)
557562
.ConfigureAwait(false);
563+
connectionActivity.SetNetworkTags(frameHandler);
558564
var c = new Connection(config, frameHandler);
559565
return await c.OpenAsync(cancellationToken)
560566
.ConfigureAwait(false);
561567
}
562568
}
563569
catch (OperationCanceledException ex)
564570
{
571+
connectionActivity?.SetStatus(ActivityStatusCode.Error);
572+
connectionActivity?.AddException(ex);
565573
if (cancellationToken.IsCancellationRequested)
566574
{
567575
throw;
@@ -573,7 +581,10 @@ public async Task<IConnection> CreateConnectionAsync(IEndpointResolver endpointR
573581
}
574582
catch (Exception ex)
575583
{
576-
throw new BrokerUnreachableException(ex);
584+
var brokerUnreachableException = new BrokerUnreachableException(ex);
585+
connectionActivity?.SetStatus(ActivityStatusCode.Error);
586+
connectionActivity?.AddException(brokerUnreachableException);
587+
throw brokerUnreachableException;
577588
}
578589
}
579590

projects/RabbitMQ.Client/IEndpointResolverExtensions.cs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,52 +38,42 @@ namespace RabbitMQ.Client
3838
{
3939
public static class EndpointResolverExtensions
4040
{
41-
public static async Task<T> SelectOneAsync<T>(this IEndpointResolver resolver,
41+
public static async Task<T> SelectOneAsync<T>(this IEndpointResolver resolver,
4242
Func<AmqpTcpEndpoint, CancellationToken, Task<T>> selector, CancellationToken cancellationToken)
4343
{
44-
var t = default(T);
4544
var exceptions = new List<Exception>();
4645
foreach (AmqpTcpEndpoint ep in resolver.All())
4746
{
4847
cancellationToken.ThrowIfCancellationRequested();
48+
using var tcpConnection = RabbitMQActivitySource.OpenTcpConnection();
49+
tcpConnection?.SetServerTags(ep);
4950
try
5051
{
51-
t = await selector(ep, cancellationToken).ConfigureAwait(false);
52-
if (t!.Equals(default(T)) == false)
53-
{
54-
return t;
55-
}
52+
return await selector(ep, cancellationToken).ConfigureAwait(false);
5653
}
5754
catch (OperationCanceledException ex)
5855
{
56+
tcpConnection?.AddException(ex);
5957
if (cancellationToken.IsCancellationRequested)
6058
{
6159
throw;
6260
}
63-
else
64-
{
65-
exceptions.Add(ex);
66-
}
61+
62+
exceptions.Add(ex);
6763
}
6864
catch (Exception e)
6965
{
66+
tcpConnection?.AddException(e);
7067
exceptions.Add(e);
7168
}
7269
}
7370

74-
if (EqualityComparer<T>.Default.Equals(t!, default!))
71+
if (exceptions.Count > 0)
7572
{
76-
if (exceptions.Count > 0)
77-
{
78-
throw new AggregateException(exceptions);
79-
}
80-
else
81-
{
82-
throw new InvalidOperationException(InternalConstants.BugFound);
83-
}
73+
throw new AggregateException(exceptions);
8474
}
8575

86-
return t!;
76+
throw new InvalidOperationException(InternalConstants.BugFound);
8777
}
8878
}
8979
}

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics;
3435
using System.Linq;
3536
using System.Threading;
3637
using System.Threading.Tasks;
@@ -243,13 +244,13 @@ await _innerConnection.AbortAsync(Constants.InternalError, "FailedAutoRecovery",
243244
private async ValueTask<bool> TryRecoverConnectionDelegateAsync(CancellationToken cancellationToken)
244245
{
245246
Connection? maybeNewInnerConnection = null;
247+
using Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(true);
246248
try
247249
{
248250
Connection defunctConnection = _innerConnection;
249-
250251
IFrameHandler fh = await _endpoints.SelectOneAsync(_config.FrameHandlerFactoryAsync, cancellationToken)
251252
.ConfigureAwait(false);
252-
253+
connectionActivity?.SetNetworkTags(fh);
253254
maybeNewInnerConnection = new Connection(_config, fh);
254255

255256
await maybeNewInnerConnection.OpenAsync(cancellationToken)
@@ -267,6 +268,8 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken)
267268
}
268269
catch (Exception e)
269270
{
271+
connectionActivity?.AddException(e);
272+
connectionActivity?.SetStatus(ActivityStatusCode.Error);
270273
ESLog.Error("Connection recovery exception.", e);
271274
// Trigger recovery error events
272275
if (!_connectionRecoveryErrorAsyncWrapper.IsEmpty)

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics;
3435
using System.Diagnostics.CodeAnalysis;
3536
using System.Runtime.CompilerServices;
3637
using System.Threading;
@@ -89,11 +90,12 @@ Task onExceptionAsync(Exception exception, string context, CancellationToken can
8990
_innerConnection.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
9091
}
9192

92-
internal static async ValueTask<AutorecoveringConnection> CreateAsync(ConnectionConfig config, IEndpointResolver endpoints,
93+
internal static async ValueTask<AutorecoveringConnection> CreateAsync(ConnectionConfig config, IEndpointResolver endpoints, Activity? connectionActivity,
9394
CancellationToken cancellationToken)
9495
{
9596
IFrameHandler fh = await endpoints.SelectOneAsync(config.FrameHandlerFactoryAsync, cancellationToken)
9697
.ConfigureAwait(false);
98+
connectionActivity.SetNetworkTags(fh);
9799
Connection innerConnection = new(config, fh);
98100
AutorecoveringConnection connection = new(config, endpoints, innerConnection);
99101
await innerConnection.OpenAsync(cancellationToken)

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,6 @@ internal void TakeOver(Connection other)
228228
internal async ValueTask<IConnection> OpenAsync(CancellationToken cancellationToken)
229229
{
230230
cancellationToken.ThrowIfCancellationRequested();
231-
using Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(_frameHandler);
232231
try
233232
{
234233
RabbitMqClientEventSource.Log.ConnectionOpened();
@@ -249,10 +248,8 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken)
249248

250249
return this;
251250
}
252-
catch (Exception ex)
251+
catch (Exception)
253252
{
254-
connectionActivity?.SetStatus(ActivityStatusCode.Error);
255-
connectionActivity?.AddException(ex);
256253
try
257254
{
258255
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen");

projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,18 @@ public static class RabbitMQActivitySource
6565
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
6666
};
6767

68-
internal static Activity? OpenConnection(IFrameHandler frameHandler)
68+
internal static Activity? OpenConnection(bool isReconnection)
6969
{
7070
Activity? connectionActivity =
7171
s_connectionSource.StartRabbitMQActivity("connection attempt", ActivityKind.Client);
72-
connectionActivity?
73-
.SetNetworkTags(frameHandler);
72+
connectionActivity?.SetTag("messaging.rabbitmq.connection.is_reconnection", isReconnection);
73+
return connectionActivity;
74+
}
75+
76+
internal static Activity? OpenTcpConnection()
77+
{
78+
Activity? connectionActivity =
79+
s_connectionSource.StartRabbitMQActivity("tcp connection attempt", ActivityKind.Client);
7480
return connectionActivity;
7581
}
7682

@@ -199,24 +205,7 @@ internal static void SetNetworkTags(this Activity? activity, IFrameHandler frame
199205
{
200206
if (activity?.IsAllDataRequested ?? false)
201207
{
202-
switch (frameHandler.RemoteEndPoint.AddressFamily)
203-
{
204-
case AddressFamily.InterNetworkV6:
205-
activity.SetTag("network.type", "ipv6");
206-
break;
207-
case AddressFamily.InterNetwork:
208-
activity.SetTag("network.type", "ipv4");
209-
break;
210-
}
211-
212-
if (!string.IsNullOrEmpty(frameHandler.Endpoint.HostName))
213-
{
214-
activity
215-
.SetTag("server.address", frameHandler.Endpoint.HostName);
216-
}
217-
218-
activity
219-
.SetTag("server.port", frameHandler.Endpoint.Port);
208+
activity.SetServerTags(frameHandler.Endpoint);
220209

221210
if (frameHandler.RemoteEndPoint is IPEndPoint ipEndpoint)
222211
{
@@ -244,6 +233,28 @@ internal static void SetNetworkTags(this Activity? activity, IFrameHandler frame
244233
}
245234
}
246235

236+
internal static void SetServerTags(this Activity activity, AmqpTcpEndpoint endpoint)
237+
{
238+
switch (endpoint.AddressFamily)
239+
{
240+
case AddressFamily.InterNetworkV6:
241+
activity.SetTag("network.type", "ipv6");
242+
break;
243+
case AddressFamily.InterNetwork:
244+
activity.SetTag("network.type", "ipv4");
245+
break;
246+
}
247+
248+
if (!string.IsNullOrEmpty(endpoint.HostName))
249+
{
250+
activity
251+
.SetTag("server.address", endpoint.HostName);
252+
}
253+
254+
activity
255+
.SetTag("server.port", endpoint.Port);
256+
}
257+
247258
private static void DefaultContextInjector(Activity sendActivity, IDictionary<string, object?> props)
248259
{
249260
DistributedContextPropagator.Current.Inject(sendActivity, props, DefaultContextSetter);
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
const RabbitMQ.Client.RabbitMQActivitySource.ConnectionSourceName = "RabbitMQ.Client.Connection" -> string!
1+
const RabbitMQ.Client.RabbitMQActivitySource.ConnectionSourceName = "RabbitMQ.Client.Connection" -> string!
2+
static RabbitMQ.Client.EndpointResolverExtensions.SelectOneAsync<T>(this RabbitMQ.Client.IEndpointResolver! resolver, System.Func<RabbitMQ.Client.AmqpTcpEndpoint!, System.Threading.CancellationToken, System.Threading.Tasks.Task<T>!>! selector, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<(T, System.Diagnostics.Activity?)>!

projects/Test/Common/ActivityRecorder.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,15 @@ public static void HasTag<T>(this Activity activity, string name, T expectedValu
141141
}
142142

143143
public static void HasRecordedException(this Activity activity, Exception exception)
144+
{
145+
activity.HasRecordedException(exception.GetType().ToString());
146+
}
147+
148+
public static void HasRecordedException(this Activity activity, string exceptionTypeName)
144149
{
145150
var exceptionEvent = activity.Events.First();
146-
Assert.Equal("exception", activity.Events.First().Name);
147-
Assert.Equal(exception.GetType().ToString(),
151+
Assert.Equal("exception", exceptionEvent.Name);
152+
Assert.Equal(exceptionTypeName,
148153
exceptionEvent.Tags.SingleOrDefault(t => t.Key == "exception.type").Value);
149154
}
150155

projects/Test/Integration/TestConnectionFactory.cs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -439,30 +439,40 @@ public async Task TestCreateConnectionAsync_TruncatesWhenClientNameIsLong_GH980(
439439
[Fact]
440440
public async Task TestCreateConnectionRegisterAnActivity()
441441
{
442-
using ActivityRecorder recorder =
443-
new ActivityRecorder(RabbitMQActivitySource.ConnectionSourceName, "connection attempt");
442+
using ActivityRecorder connectionRecorder =
443+
new (RabbitMQActivitySource.ConnectionSourceName, "connection attempt");
444+
using ActivityRecorder tcpConnectionRecorder =
445+
new (RabbitMQActivitySource.ConnectionSourceName, "tcp connection attempt");
446+
tcpConnectionRecorder.VerifyParent = false;
444447
ConnectionFactory cf = CreateConnectionFactory();
445448
await using IConnection conn = await cf.CreateConnectionAsync();
446-
recorder.VerifyActivityRecordedOnce();
449+
var connectionActivity = connectionRecorder.VerifyActivityRecordedOnce();
450+
var tcpConnectionActivity = tcpConnectionRecorder.VerifyActivityRecordedOnce();
451+
Assert.Equal(connectionActivity, tcpConnectionActivity.Parent);
447452
await conn.CloseAsync();
448453
}
449454

450455
[Fact]
451456
public async Task TestCreateConnectionWithFailureRecordException()
452457
{
453458
using ActivityRecorder recorder =
454-
new ActivityRecorder(RabbitMQActivitySource.ConnectionSourceName, "connection attempt");
459+
new (RabbitMQActivitySource.ConnectionSourceName, "connection attempt");
460+
using ActivityRecorder tcpConnectionRecorder =
461+
new (RabbitMQActivitySource.ConnectionSourceName, "tcp connection attempt");
462+
tcpConnectionRecorder.VerifyParent = false;
455463
ConnectionFactory cf = CreateConnectionFactory();
456464
cf.AutomaticRecoveryEnabled = true;
457465
var unreachablePort = 1234;
458466
var ep = new AmqpTcpEndpoint("localhost", unreachablePort);
459467
var exception = await Assert.ThrowsAsync<BrokerUnreachableException>(() =>
460468
{
461-
return cf.CreateConnectionAsync(new List<AmqpTcpEndpoint> { ep });
469+
return cf.CreateConnectionAsync(new List<AmqpTcpEndpoint> {ep});
462470
});
463-
Activity activity = recorder.VerifyActivityRecordedOnce();
464-
activity.HasRecordedException(exception);
465-
activity.IsInError();
471+
Activity connectionActivity = recorder.VerifyActivityRecordedOnce();
472+
connectionActivity.HasRecordedException(exception);
473+
connectionActivity.IsInError();
474+
Activity tcpConnectionActivity = tcpConnectionRecorder.VerifyActivityRecordedOnce();
475+
tcpConnectionActivity.HasRecordedException("RabbitMQ.Client.Exceptions.ConnectFailureException");
466476
}
467477
}
468478
}

0 commit comments

Comments
 (0)