Skip to content

Commit 9619453

Browse files
authored
Implement grpc.lb.backend_service optional label
This completes gRFC A89. Commits 7162d2d and fc86084 had already implemented the LB plumbing for the optional label on RPC metrics. This commit records the label's value in the OpenTelemetry per-attempt client metrics and also adds the label to the WRR metrics. See https://github.com/grpc/proposal/blob/master/A89-backend-service-metric-label.md
1 parent 53de8a7 commit 9619453

File tree

8 files changed

+212
-21
lines changed

8 files changed

+212
-21
lines changed

api/src/main/java/io/grpc/NameResolver.java

+5
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,11 @@ public Status onResult2(ResolutionResult resolutionResult) {
275275
@Documented
276276
public @interface ResolutionResultAttr {}
277277

278+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11989")
279+
@ResolutionResultAttr
280+
public static final Attributes.Key<String> ATTR_BACKEND_SERVICE =
281+
Attributes.Key.create("io.grpc.NameResolver.ATTR_BACKEND_SERVICE");
282+
278283
/**
279284
* Information that a {@link Factory} uses to create a {@link NameResolver}.
280285
*

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.grpc.opentelemetry;
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
20+
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
2021
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
2122
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
2223
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
@@ -70,7 +71,6 @@
7071
*/
7172
final class OpenTelemetryMetricsModule {
7273
private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
73-
private static final String LOCALITY_LABEL_NAME = "grpc.lb.locality";
7474
public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
7575
ImmutableSet.of(
7676
"grpc.client.attempt.started",
@@ -90,14 +90,16 @@ final class OpenTelemetryMetricsModule {
9090
private final OpenTelemetryMetricsResource resource;
9191
private final Supplier<Stopwatch> stopwatchSupplier;
9292
private final boolean localityEnabled;
93+
private final boolean backendServiceEnabled;
9394
private final ImmutableList<OpenTelemetryPlugin> plugins;
9495

9596
OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
9697
OpenTelemetryMetricsResource resource, Collection<String> optionalLabels,
9798
List<OpenTelemetryPlugin> plugins) {
9899
this.resource = checkNotNull(resource, "resource");
99100
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
100-
this.localityEnabled = optionalLabels.contains(LOCALITY_LABEL_NAME);
101+
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
102+
this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
101103
this.plugins = ImmutableList.copyOf(plugins);
102104
}
103105

@@ -162,6 +164,7 @@ private static final class ClientTracer extends ClientStreamTracer {
162164
volatile long outboundWireSize;
163165
volatile long inboundWireSize;
164166
volatile String locality;
167+
volatile String backendService;
165168
long attemptNanos;
166169
Code statusCode;
167170

@@ -206,9 +209,12 @@ public void inboundWireSize(long bytes) {
206209

207210
@Override
208211
public void addOptionalLabel(String key, String value) {
209-
if (LOCALITY_LABEL_NAME.equals(key)) {
212+
if ("grpc.lb.locality".equals(key)) {
210213
locality = value;
211214
}
215+
if ("grpc.lb.backend_service".equals(key)) {
216+
backendService = value;
217+
}
212218
}
213219

214220
@Override
@@ -248,6 +254,13 @@ void recordFinishedAttempt() {
248254
}
249255
builder.put(LOCALITY_KEY, savedLocality);
250256
}
257+
if (module.backendServiceEnabled) {
258+
String savedBackendService = backendService;
259+
if (savedBackendService == null) {
260+
savedBackendService = "";
261+
}
262+
builder.put(BACKEND_SERVICE_KEY, savedBackendService);
263+
}
251264
for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
252265
plugin.addLabels(builder);
253266
}

opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ public final class OpenTelemetryConstants {
3333
public static final AttributeKey<String> LOCALITY_KEY =
3434
AttributeKey.stringKey("grpc.lb.locality");
3535

36+
public static final AttributeKey<String> BACKEND_SERVICE_KEY =
37+
AttributeKey.stringKey("grpc.lb.backend_service");
38+
3639
public static final List<Double> LATENCY_BUCKETS =
3740
ImmutableList.of(
3841
0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d,

opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java

+135
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory;
5252
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
5353
import io.grpc.testing.GrpcServerRule;
54+
import io.opentelemetry.api.common.AttributeKey;
5455
import io.opentelemetry.api.metrics.Meter;
5556
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
5657
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
@@ -1070,6 +1071,140 @@ public void clientLocalityMetrics_missing() {
10701071
point -> point.hasAttributes(clientAttributes))));
10711072
}
10721073

1074+
@Test
1075+
public void clientBackendServiceMetrics_present() {
1076+
String target = "target:///";
1077+
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
1078+
enabledMetricsMap, disableDefaultMetrics);
1079+
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
1080+
fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"),
1081+
emptyList());
1082+
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
1083+
new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList());
1084+
1085+
ClientStreamTracer tracer =
1086+
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
1087+
tracer.addOptionalLabel("grpc.lb.foo", "unimportant");
1088+
tracer.addOptionalLabel("grpc.lb.backend_service", "should-be-overwritten");
1089+
tracer.addOptionalLabel("grpc.lb.backend_service", "the-moon");
1090+
tracer.addOptionalLabel("grpc.lb.foo", "thats-no-moon");
1091+
tracer.streamClosed(Status.OK);
1092+
callAttemptsTracerFactory.callEnded(Status.OK);
1093+
1094+
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
1095+
TARGET_KEY, target,
1096+
METHOD_KEY, method.getFullMethodName());
1097+
1098+
io.opentelemetry.api.common.Attributes clientAttributes
1099+
= io.opentelemetry.api.common.Attributes.of(
1100+
TARGET_KEY, target,
1101+
METHOD_KEY, method.getFullMethodName(),
1102+
STATUS_KEY, Status.Code.OK.toString());
1103+
1104+
io.opentelemetry.api.common.Attributes clientAttributesWithBackendService
1105+
= clientAttributes.toBuilder()
1106+
.put(AttributeKey.stringKey("grpc.lb.backend_service"), "the-moon")
1107+
.build();
1108+
1109+
assertThat(openTelemetryTesting.getMetrics())
1110+
.satisfiesExactlyInAnyOrder(
1111+
metric ->
1112+
assertThat(metric)
1113+
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
1114+
.hasLongSumSatisfying(
1115+
longSum -> longSum.hasPointsSatisfying(
1116+
point -> point.hasAttributes(attributes))),
1117+
metric ->
1118+
assertThat(metric)
1119+
.hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
1120+
.hasHistogramSatisfying(
1121+
histogram -> histogram.hasPointsSatisfying(
1122+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1123+
metric ->
1124+
assertThat(metric)
1125+
.hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
1126+
.hasHistogramSatisfying(
1127+
histogram -> histogram.hasPointsSatisfying(
1128+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1129+
metric ->
1130+
assertThat(metric)
1131+
.hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
1132+
.hasHistogramSatisfying(
1133+
histogram -> histogram.hasPointsSatisfying(
1134+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1135+
metric ->
1136+
assertThat(metric)
1137+
.hasName(CLIENT_CALL_DURATION)
1138+
.hasHistogramSatisfying(
1139+
histogram -> histogram.hasPointsSatisfying(
1140+
point -> point.hasAttributes(clientAttributes))));
1141+
}
1142+
1143+
@Test
1144+
public void clientBackendServiceMetrics_missing() {
1145+
String target = "target:///";
1146+
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,
1147+
enabledMetricsMap, disableDefaultMetrics);
1148+
OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule(
1149+
fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"),
1150+
emptyList());
1151+
OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory =
1152+
new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList());
1153+
1154+
ClientStreamTracer tracer =
1155+
callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
1156+
tracer.streamClosed(Status.OK);
1157+
callAttemptsTracerFactory.callEnded(Status.OK);
1158+
1159+
io.opentelemetry.api.common.Attributes attributes = io.opentelemetry.api.common.Attributes.of(
1160+
TARGET_KEY, target,
1161+
METHOD_KEY, method.getFullMethodName());
1162+
1163+
io.opentelemetry.api.common.Attributes clientAttributes
1164+
= io.opentelemetry.api.common.Attributes.of(
1165+
TARGET_KEY, target,
1166+
METHOD_KEY, method.getFullMethodName(),
1167+
STATUS_KEY, Status.Code.OK.toString());
1168+
1169+
io.opentelemetry.api.common.Attributes clientAttributesWithBackendService
1170+
= clientAttributes.toBuilder()
1171+
.put(AttributeKey.stringKey("grpc.lb.backend_service"), "")
1172+
.build();
1173+
1174+
assertThat(openTelemetryTesting.getMetrics())
1175+
.satisfiesExactlyInAnyOrder(
1176+
metric ->
1177+
assertThat(metric)
1178+
.hasName(CLIENT_ATTEMPT_COUNT_INSTRUMENT_NAME)
1179+
.hasLongSumSatisfying(
1180+
longSum -> longSum.hasPointsSatisfying(
1181+
point -> point.hasAttributes(attributes))),
1182+
metric ->
1183+
assertThat(metric)
1184+
.hasName(CLIENT_ATTEMPT_DURATION_INSTRUMENT_NAME)
1185+
.hasHistogramSatisfying(
1186+
histogram -> histogram.hasPointsSatisfying(
1187+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1188+
metric ->
1189+
assertThat(metric)
1190+
.hasName(CLIENT_ATTEMPT_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE)
1191+
.hasHistogramSatisfying(
1192+
histogram -> histogram.hasPointsSatisfying(
1193+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1194+
metric ->
1195+
assertThat(metric)
1196+
.hasName(CLIENT_ATTEMPT_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE)
1197+
.hasHistogramSatisfying(
1198+
histogram -> histogram.hasPointsSatisfying(
1199+
point -> point.hasAttributes(clientAttributesWithBackendService))),
1200+
metric ->
1201+
assertThat(metric)
1202+
.hasName(CLIENT_CALL_DURATION)
1203+
.hasHistogramSatisfying(
1204+
histogram -> histogram.hasPointsSatisfying(
1205+
point -> point.hasAttributes(clientAttributes))));
1206+
}
1207+
10731208
@Test
10741209
public void serverBasicMetrics() {
10751210
OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter,

xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.grpc.InternalLogId;
3333
import io.grpc.LoadBalancer;
3434
import io.grpc.Metadata;
35+
import io.grpc.NameResolver;
3536
import io.grpc.Status;
3637
import io.grpc.internal.ForwardingClientStreamTracer;
3738
import io.grpc.internal.GrpcUtil;
@@ -150,7 +151,9 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
150151

151152
childSwitchLb.handleResolvedAddresses(
152153
resolvedAddresses.toBuilder()
153-
.setAttributes(attributes)
154+
.setAttributes(attributes.toBuilder()
155+
.set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
156+
.build())
154157
.setLoadBalancingPolicyConfig(config.childConfig)
155158
.build());
156159
return Status.OK;

xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java

+33-13
Original file line numberDiff line numberDiff line change
@@ -102,32 +102,44 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
102102
private final long infTime;
103103
private final Ticker ticker;
104104
private String locality = "";
105+
private String backendService = "";
105106
private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
106107

107108
// The metric instruments are only registered once and shared by all instances of this LB.
108109
static {
109110
MetricInstrumentRegistry metricInstrumentRegistry
110111
= MetricInstrumentRegistry.getDefaultRegistry();
111-
RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.wrr.rr_fallback",
112+
RR_FALLBACK_COUNTER = metricInstrumentRegistry.registerLongCounter(
113+
"grpc.lb.wrr.rr_fallback",
112114
"EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints "
113115
+ "with valid weight, which caused the WRR policy to fall back to RR behavior",
114-
"{update}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"),
116+
"{update}",
117+
Lists.newArrayList("grpc.target"),
118+
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
115119
false);
116120
ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER = metricInstrumentRegistry.registerLongCounter(
117-
"grpc.lb.wrr.endpoint_weight_not_yet_usable", "EXPERIMENTAL. Number of endpoints "
118-
+ "from each scheduler update that don't yet have usable weight information",
119-
"{endpoint}", Lists.newArrayList("grpc.target"), Lists.newArrayList("grpc.lb.locality"),
121+
"grpc.lb.wrr.endpoint_weight_not_yet_usable",
122+
"EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable "
123+
+ "weight information",
124+
"{endpoint}",
125+
Lists.newArrayList("grpc.target"),
126+
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
120127
false);
121128
ENDPOINT_WEIGHT_STALE_COUNTER = metricInstrumentRegistry.registerLongCounter(
122129
"grpc.lb.wrr.endpoint_weight_stale",
123130
"EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is "
124-
+ "older than the expiration period", "{endpoint}", Lists.newArrayList("grpc.target"),
125-
Lists.newArrayList("grpc.lb.locality"), false);
131+
+ "older than the expiration period",
132+
"{endpoint}",
133+
Lists.newArrayList("grpc.target"),
134+
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
135+
false);
126136
ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry.registerDoubleHistogram(
127137
"grpc.lb.wrr.endpoint_weights",
128138
"EXPERIMENTAL. The histogram buckets will be endpoint weight ranges.",
129-
"{weight}", Lists.newArrayList(), Lists.newArrayList("grpc.target"),
130-
Lists.newArrayList("grpc.lb.locality"),
139+
"{weight}",
140+
Lists.newArrayList(),
141+
Lists.newArrayList("grpc.target"),
142+
Lists.newArrayList("grpc.lb.locality", "grpc.lb.backend_service"),
131143
false);
132144
}
133145

@@ -168,6 +180,13 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
168180
} else {
169181
this.locality = "";
170182
}
183+
String backendService
184+
= resolvedAddresses.getAttributes().get(NameResolver.ATTR_BACKEND_SERVICE);
185+
if (backendService != null) {
186+
this.backendService = backendService;
187+
} else {
188+
this.backendService = "";
189+
}
171190
config =
172191
(WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
173192

@@ -232,26 +251,27 @@ private void updateWeight(WeightedRoundRobinPicker picker) {
232251
helper.getMetricRecorder()
233252
.recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight,
234253
ImmutableList.of(helper.getChannelTarget()),
235-
ImmutableList.of(locality));
254+
ImmutableList.of(locality, backendService));
236255
newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
237256
}
238257

239258
if (staleEndpoints.get() > 0) {
240259
helper.getMetricRecorder()
241260
.addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(),
242261
ImmutableList.of(helper.getChannelTarget()),
243-
ImmutableList.of(locality));
262+
ImmutableList.of(locality, backendService));
244263
}
245264
if (notYetUsableEndpoints.get() > 0) {
246265
helper.getMetricRecorder()
247266
.addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(),
248-
ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality));
267+
ImmutableList.of(helper.getChannelTarget()),
268+
ImmutableList.of(locality, backendService));
249269
}
250270
boolean weightsEffective = picker.updateWeight(newWeights);
251271
if (!weightsEffective) {
252272
helper.getMetricRecorder()
253273
.addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()),
254-
ImmutableList.of(locality));
274+
ImmutableList.of(locality, backendService));
255275
}
256276
}
257277

xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import io.grpc.LoadBalancerRegistry;
5151
import io.grpc.ManagedChannel;
5252
import io.grpc.Metadata;
53+
import io.grpc.NameResolver;
5354
import io.grpc.Status;
5455
import io.grpc.Status.Code;
5556
import io.grpc.SynchronizationContext;
@@ -198,6 +199,7 @@ public void handleResolvedAddresses_propagateToChildPolicy() {
198199
assertThat(childBalancer.config).isSameInstanceAs(weightedTargetConfig);
199200
assertThat(childBalancer.attributes.get(XdsAttributes.XDS_CLIENT_POOL))
200201
.isSameInstanceAs(xdsClientPool);
202+
assertThat(childBalancer.attributes.get(NameResolver.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER);
201203
}
202204

203205
/**

0 commit comments

Comments
 (0)