Skip to content

feat: add activity on connection #1734

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions projects/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
<PackageVersion Include="EasyNetQ.Management.Client" Version="3.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageVersion Include="Nullable" Version="1.3.1" />
<PackageVersion Include="OpenTelemetry.Api" Version="1.9.0" />
<PackageVersion Include="OpenTelemetry.Api" Version="1.11.2" />
<PackageVersion Include="OpenTelemetry.Exporter.InMemory" Version="1.9.0" />
<PackageVersion Include="System.Collections.Immutable" Version="8.0.0" />
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.4" />
<!--
Note: do NOT upgrade the System.IO.Pipelines dependency unless necessary
See https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
Expand All @@ -33,7 +34,6 @@
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1594
-->
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
<PackageVersion Include="System.Memory" Version="4.5.5" />
<PackageVersion Include="System.Threading.Channels" Version="8.0.0" />
<PackageVersion Include="System.Net.Http.Json" Version="8.0.1" />
Expand All @@ -44,4 +44,4 @@
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<GlobalPackageReference Include="MinVer" Version="6.0.0" />
</ItemGroup>
</Project>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

<ItemGroup>
<PackageReference Include="OpenTelemetry.Api" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
</ItemGroup>

<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">
Expand Down
15 changes: 13 additions & 2 deletions projects/RabbitMQ.Client/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Net.Security;
Expand Down Expand Up @@ -544,24 +545,31 @@ public async Task<IConnection> CreateConnectionAsync(IEndpointResolver endpointR
CancellationToken cancellationToken = default)
{
ConnectionConfig config = CreateConfig(clientProvidedName);
using Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(false);
try
{
if (AutomaticRecoveryEnabled)
{
return await AutorecoveringConnection.CreateAsync(config, endpointResolver, cancellationToken)
connectionActivity?.SetTag("messaging.rabbitmq.connection.automatic_recovery", true);
return await AutorecoveringConnection.CreateAsync(config, endpointResolver, connectionActivity, cancellationToken)
.ConfigureAwait(false);
}
else
{

connectionActivity?.SetTag("messaging.rabbitmq.connection.automatic_recovery", false);
IFrameHandler frameHandler = await endpointResolver.SelectOneAsync(CreateFrameHandlerAsync, cancellationToken)
.ConfigureAwait(false);
connectionActivity.SetNetworkTags(frameHandler);
var c = new Connection(config, frameHandler);
return await c.OpenAsync(cancellationToken)
.ConfigureAwait(false);
}
}
catch (OperationCanceledException ex)
{
connectionActivity?.SetStatus(ActivityStatusCode.Error);
connectionActivity?.AddException(ex);
if (cancellationToken.IsCancellationRequested)
{
throw;
Expand All @@ -573,7 +581,10 @@ public async Task<IConnection> CreateConnectionAsync(IEndpointResolver endpointR
}
catch (Exception ex)
{
throw new BrokerUnreachableException(ex);
var brokerUnreachableException = new BrokerUnreachableException(ex);
connectionActivity?.SetStatus(ActivityStatusCode.Error);
connectionActivity?.AddException(brokerUnreachableException);
throw brokerUnreachableException;
}
}

Expand Down
30 changes: 10 additions & 20 deletions projects/RabbitMQ.Client/IEndpointResolverExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,49 +41,39 @@ public static class EndpointResolverExtensions
public static async Task<T> SelectOneAsync<T>(this IEndpointResolver resolver,
Func<AmqpTcpEndpoint, CancellationToken, Task<T>> selector, CancellationToken cancellationToken)
{
var t = default(T);
var exceptions = new List<Exception>();
foreach (AmqpTcpEndpoint ep in resolver.All())
{
cancellationToken.ThrowIfCancellationRequested();
using var tcpConnection = RabbitMQActivitySource.OpenTcpConnection();
tcpConnection?.SetServerTags(ep);
try
{
t = await selector(ep, cancellationToken).ConfigureAwait(false);
if (t!.Equals(default(T)) == false)
{
return t;
}
return await selector(ep, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException ex)
{
tcpConnection?.AddException(ex);
if (cancellationToken.IsCancellationRequested)
{
throw;
}
else
{
exceptions.Add(ex);
}

exceptions.Add(ex);
}
catch (Exception e)
{
tcpConnection?.AddException(e);
exceptions.Add(e);
}
}

if (EqualityComparer<T>.Default.Equals(t!, default!))
if (exceptions.Count > 0)
{
if (exceptions.Count > 0)
{
throw new AggregateException(exceptions);
}
else
{
throw new InvalidOperationException(InternalConstants.BugFound);
}
throw new AggregateException(exceptions);
}

return t!;
throw new InvalidOperationException(InternalConstants.BugFound);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -243,13 +244,13 @@ await _innerConnection.AbortAsync(Constants.InternalError, "FailedAutoRecovery",
private async ValueTask<bool> TryRecoverConnectionDelegateAsync(CancellationToken cancellationToken)
{
Connection? maybeNewInnerConnection = null;
using Activity? connectionActivity = RabbitMQActivitySource.OpenConnection(true);
try
{
Connection defunctConnection = _innerConnection;

IFrameHandler fh = await _endpoints.SelectOneAsync(_config.FrameHandlerFactoryAsync, cancellationToken)
.ConfigureAwait(false);

connectionActivity?.SetNetworkTags(fh);
maybeNewInnerConnection = new Connection(_config, fh);

await maybeNewInnerConnection.OpenAsync(cancellationToken)
Expand All @@ -267,6 +268,8 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken)
}
catch (Exception e)
{
connectionActivity?.AddException(e);
connectionActivity?.SetStatus(ActivityStatusCode.Error);
ESLog.Error("Connection recovery exception.", e);
// Trigger recovery error events
if (!_connectionRecoveryErrorAsyncWrapper.IsEmpty)
Expand Down
4 changes: 3 additions & 1 deletion projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
Expand Down Expand Up @@ -89,11 +90,12 @@ Task onExceptionAsync(Exception exception, string context, CancellationToken can
_innerConnection.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken));
}

internal static async ValueTask<AutorecoveringConnection> CreateAsync(ConnectionConfig config, IEndpointResolver endpoints,
internal static async ValueTask<AutorecoveringConnection> CreateAsync(ConnectionConfig config, IEndpointResolver endpoints, Activity? connectionActivity,
CancellationToken cancellationToken)
{
IFrameHandler fh = await endpoints.SelectOneAsync(config.FrameHandlerFactoryAsync, cancellationToken)
.ConfigureAwait(false);
connectionActivity.SetNetworkTags(fh);
Connection innerConnection = new(config, fh);
AutorecoveringConnection connection = new(config, endpoints, innerConnection);
await innerConnection.OpenAsync(cancellationToken)
Expand Down
8 changes: 2 additions & 6 deletions projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)

var cmd = new BasicPublish(exchange, routingKey, mandatory, default);

using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
: default;
using Activity? sendActivity = RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length);

ulong publishSequenceNumber = 0;
if (publisherConfirmationInfo is not null)
Expand Down Expand Up @@ -115,9 +113,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)

var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);

using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
: default;
using Activity? sendActivity = RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length);

ulong publishSequenceNumber = 0;
if (publisherConfirmationInfo is not null)
Expand Down
4 changes: 1 addition & 3 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,9 @@ internal void TakeOver(Connection other)
internal async ValueTask<IConnection> OpenAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

try
{
RabbitMqClientEventSource.Log.ConnectionOpened();

cancellationToken.ThrowIfCancellationRequested();

// Note: this must happen *after* the frame handler is started
Expand All @@ -250,7 +248,7 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken)

return this;
}
catch
catch (Exception)
{
try
{
Expand Down
70 changes: 36 additions & 34 deletions projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,20 @@ public static class RabbitMQActivitySource
private static readonly ActivitySource s_subscriberSource =
new ActivitySource(SubscriberSourceName, AssemblyVersion);

private static readonly ActivitySource s_connectionSource =
new ActivitySource(ConnectionSourceName, AssemblyVersion);

public const string PublisherSourceName = "RabbitMQ.Client.Publisher";
public const string SubscriberSourceName = "RabbitMQ.Client.Subscriber";
public const string ConnectionSourceName = "RabbitMQ.Client.Connection";

public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } = DefaultContextInjector;
public static Action<Activity, IDictionary<string, object?>> ContextInjector { get; set; } =
DefaultContextInjector;

public static Func<IReadOnlyBasicProperties, ActivityContext> ContextExtractor { get; set; } =
DefaultContextExtractor;

public static bool UseRoutingKeyAsOperationName { get; set; } = true;
internal static bool PublisherHasListeners => s_publisherSource.HasListeners();

internal static readonly IEnumerable<KeyValuePair<string, object?>> CreationTags = new[]
{
Expand All @@ -61,14 +65,24 @@ public static class RabbitMQActivitySource
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
};

internal static Activity? OpenConnection(bool isReconnection)
{
Activity? connectionActivity =
s_connectionSource.StartRabbitMQActivity("connection attempt", ActivityKind.Client);
connectionActivity?.SetTag("messaging.rabbitmq.connection.is_reconnection", isReconnection);
return connectionActivity;
}

internal static Activity? OpenTcpConnection()
{
Activity? connectionActivity =
s_connectionSource.StartRabbitMQActivity("tcp connection attempt", ActivityKind.Client);
return connectionActivity;
}

internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
ActivityContext linkedContext = default)
{
if (!s_publisherSource.HasListeners())
{
return null;
}

Activity? activity = linkedContext == default
? s_publisherSource.StartRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish,
Expand All @@ -82,16 +96,10 @@ public static class RabbitMQActivitySource
}

return activity;

}

internal static Activity? BasicGetEmpty(string queue)
{
if (!s_subscriberSource.HasListeners())
{
return null;
}

Activity? activity = s_subscriberSource.StartRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGetEmpty} {queue}" : MessagingOperationNameBasicGetEmpty,
ActivityKind.Consumer);
Expand All @@ -109,11 +117,6 @@ public static class RabbitMQActivitySource
internal static Activity? BasicGet(string routingKey, string exchange, ulong deliveryTag,
IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize)
{
if (!s_subscriberSource.HasListeners())
{
return null;
}

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGet} {routingKey}" : MessagingOperationNameBasicGet, ActivityKind.Consumer,
Expand All @@ -130,11 +133,6 @@ public static class RabbitMQActivitySource
internal static Activity? Deliver(string routingKey, string exchange, ulong deliveryTag,
IReadOnlyBasicProperties basicProperties, int bodySize)
{
if (!s_subscriberSource.HasListeners())
{
return null;
}

// Extract the PropagationContext of the upstream parent from the message headers.
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicDeliver} {routingKey}" : MessagingOperationNameBasicDeliver,
Expand Down Expand Up @@ -197,15 +195,15 @@ private static void PopulateMessagingTags(string operationType, string operation

internal static void PopulateMessageEnvelopeSize(Activity? activity, int size)
{
if (activity != null && activity.IsAllDataRequested && PublisherHasListeners)
if (activity?.IsAllDataRequested ?? false)
{
activity.SetTag(MessagingEnvelopeSize, size);
}
}

internal static void SetNetworkTags(this Activity? activity, IFrameHandler frameHandler)
{
if (PublisherHasListeners && activity != null && activity.IsAllDataRequested)
if (activity?.IsAllDataRequested ?? false)
{
switch (frameHandler.RemoteEndPoint.AddressFamily)
{
Expand All @@ -216,15 +214,7 @@ internal static void SetNetworkTags(this Activity? activity, IFrameHandler frame
activity.SetTag("network.type", "ipv4");
break;
}

if (!string.IsNullOrEmpty(frameHandler.Endpoint.HostName))
{
activity
.SetTag("server.address", frameHandler.Endpoint.HostName);
}

activity
.SetTag("server.port", frameHandler.Endpoint.Port);
activity.SetServerTags(frameHandler.Endpoint);

if (frameHandler.RemoteEndPoint is IPEndPoint ipEndpoint)
{
Expand Down Expand Up @@ -252,6 +242,18 @@ internal static void SetNetworkTags(this Activity? activity, IFrameHandler frame
}
}

internal static void SetServerTags(this Activity activity, AmqpTcpEndpoint endpoint)
{
if (!string.IsNullOrEmpty(endpoint.HostName))
{
activity
.SetTag("server.address", endpoint.HostName);
}

activity
.SetTag("server.port", endpoint.Port);
}

private static void DefaultContextInjector(Activity sendActivity, IDictionary<string, object?> props)
{
DistributedContextPropagator.Current.Inject(sendActivity, props, DefaultContextSetter);
Expand Down
Loading
Loading