Skip to content

feat: [Internal] client-side metrics for afe latency and connectivity error #3819

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,13 @@
<method>boolean isEnableGRPCBuiltInMetrics()</method>
</difference>

<!-- Added AFE Server Timing option -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/SpannerOptions$SpannerEnvironment</className>
<method>boolean isEnableAFEServerTiming()</method>
</difference>

<!-- Added Monitoring host option -->
<difference>
<differenceType>7012</differenceType>
Expand Down Expand Up @@ -899,7 +906,7 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>java.lang.String getDefaultSequenceKind()</method>
</difference>

<!-- Default isolation level -->
<difference>
<differenceType>7012</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -39,6 +40,9 @@ public class BuiltInMetricsConstant {
static final String SPANNER_METER_NAME = "spanner-java";
static final String GRPC_METER_NAME = "grpc-java";
static final String GFE_LATENCIES_NAME = "gfe_latencies";
static final String AFE_LATENCIES_NAME = "afe_latencies";
static final String GFE_CONNECTIVITY_ERROR_NAME = "gfe_connectivity_error_count";
static final String AFE_CONNECTIVITY_ERROR_NAME = "afe_connectivity_error_count";
static final String OPERATION_LATENCIES_NAME = "operation_latencies";
static final String ATTEMPT_LATENCIES_NAME = "attempt_latencies";
static final String OPERATION_LATENCY_NAME = "operation_latency";
Expand All @@ -52,7 +56,10 @@ public class BuiltInMetricsConstant {
ATTEMPT_LATENCIES_NAME,
OPERATION_COUNT_NAME,
ATTEMPT_COUNT_NAME,
GFE_LATENCIES_NAME)
GFE_LATENCIES_NAME,
AFE_LATENCIES_NAME,
GFE_CONNECTIVITY_ERROR_NAME,
AFE_CONNECTIVITY_ERROR_NAME)
.stream()
.map(m -> METER_NAME + '/' + m)
.collect(Collectors.toSet());
Expand Down Expand Up @@ -110,14 +117,14 @@ public class BuiltInMetricsConstant {
static final Set<String> GRPC_LB_RLS_ATTRIBUTES =
ImmutableSet.of("grpc.lb.rls.data_plane_target", "grpc.lb.pick_result");

static List<Double> BUCKET_BOUNDARIES =
ImmutableList.of(
0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0,
16.0, 17.0, 18.0, 19.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0,
200.0, 250.0, 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0,
50000.0, 100000.0, 200000.0, 400000.0, 800000.0, 1600000.0, 3200000.0);
static Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM =
Aggregation.explicitBucketHistogram(
ImmutableList.of(
0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0,
15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0,
160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0,
10000.0, 20000.0, 50000.0, 100000.0, 200000.0, 400000.0, 800000.0, 1600000.0,
3200000.0));
Aggregation.explicitBucketHistogram(BUCKET_BOUNDARIES);

static final Collection<String> GRPC_METRICS_ENABLED_BY_DEFAULT =
ImmutableList.of(
Expand Down Expand Up @@ -145,14 +152,6 @@ static Map<InstrumentSelector, View> getAllViews() {
BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms");
defineView(
views,
BuiltInMetricsConstant.SPANNER_METER_NAME,
BuiltInMetricsConstant.GFE_LATENCIES_NAME,
BuiltInMetricsConstant.GFE_LATENCIES_NAME,
BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms");
defineView(
views,
BuiltInMetricsConstant.GAX_METER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import java.util.Map;

Expand All @@ -35,6 +36,9 @@
class BuiltInMetricsRecorder extends OpenTelemetryMetricsRecorder {

private final DoubleHistogram gfeLatencyRecorder;
private final DoubleHistogram afeLatencyRecorder;
private final LongCounter gfeHeaderMissingCountRecorder;
private final LongCounter afeHeaderMissingCountRecorder;

/**
* Creates the following instruments for the following metrics:
Expand All @@ -59,6 +63,27 @@ class BuiltInMetricsRecorder extends OpenTelemetryMetricsRecorder {
.setDescription(
"Latency between Google's network receiving an RPC and reading back the first byte of the response")
.setUnit("ms")
.setExplicitBucketBoundariesAdvice(BuiltInMetricsConstant.BUCKET_BOUNDARIES)
.build();
this.afeLatencyRecorder =
meter
.histogramBuilder(serviceName + '/' + BuiltInMetricsConstant.AFE_LATENCIES_NAME)
.setDescription(
"Latency between Spanner API Frontend receiving an RPC and starting to write back the response.")
.setExplicitBucketBoundariesAdvice(BuiltInMetricsConstant.BUCKET_BOUNDARIES)
.setUnit("ms")
.build();
this.gfeHeaderMissingCountRecorder =
meter
.counterBuilder(serviceName + '/' + BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME)
.setDescription("Number of requests that failed to reach the Google network.")
.setUnit("1")
.build();
this.afeHeaderMissingCountRecorder =
meter
.counterBuilder(serviceName + '/' + BuiltInMetricsConstant.AFE_CONNECTIVITY_ERROR_NAME)
.setDescription("Number of requests that failed to reach the Spanner API Frontend.")
.setUnit("1")
.build();
}

Expand All @@ -69,8 +94,25 @@ class BuiltInMetricsRecorder extends OpenTelemetryMetricsRecorder {
* @param gfeLatency Attempt Latency in ms
* @param attributes Map of the attributes to store
*/
void recordGFELatency(double gfeLatency, Map<String, String> attributes) {
gfeLatencyRecorder.record(gfeLatency, toOtelAttributes(attributes));
void recordServerTimingHeaderMetrics(
Long gfeLatency,
Long afeLatency,
Long gfeHeaderMissingCount,
Long afeHeaderMissingCount,
Map<String, String> attributes) {
io.opentelemetry.api.common.Attributes otelAttributes = toOtelAttributes(attributes);
if (gfeLatency != null) {
gfeLatencyRecorder.record(gfeLatency, otelAttributes);
}
if (gfeHeaderMissingCount > 0) {
gfeHeaderMissingCountRecorder.add(gfeHeaderMissingCount, otelAttributes);
}
if (afeLatency != null) {
afeLatencyRecorder.record(afeLatency, otelAttributes);
}
if (afeHeaderMissingCount > 0) {
afeHeaderMissingCountRecorder.add(afeHeaderMissingCount, otelAttributes);
}
}

Attributes toOtelAttributes(Map<String, String> attributes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ class BuiltInMetricsTracer extends MetricsTracer implements ApiTracer {
private final BuiltInMetricsRecorder builtInOpenTelemetryMetricsRecorder;
// These are RPC specific attributes and pertain to a specific API Trace
private final Map<String, String> attributes = new HashMap<>();

private Long gfeLatency = null;
private Long afeLatency = null;
private long gfeHeaderMissingCount = 0;
private long afeHeaderMissingCount = 0;

BuiltInMetricsTracer(
MethodName methodName, BuiltInMetricsRecorder builtInOpenTelemetryMetricsRecorder) {
Expand All @@ -54,10 +56,9 @@ class BuiltInMetricsTracer extends MetricsTracer implements ApiTracer {
@Override
public void attemptSucceeded() {
  super.attemptSucceeded();
  // Tag the attempt with the OK status and forward all server-timing values exactly once.
  // The call is unconditional: the recorder itself skips null latencies and zero counts,
  // so header-missing counts are still reported when no GFE latency was captured.
  attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.OK.toString());
  builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
      gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}

/**
Expand All @@ -67,10 +68,9 @@ public void attemptSucceeded() {
@Override
public void attemptCancelled() {
  super.attemptCancelled();
  // Tag the attempt with the CANCELLED status and forward all server-timing values exactly
  // once. The call is unconditional: the recorder itself skips null latencies and zero
  // counts, so header-missing counts are still reported without a GFE latency.
  attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.CANCELLED.toString());
  builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
      gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}

/**
Expand All @@ -84,10 +84,9 @@ public void attemptCancelled() {
@Override
public void attemptFailedDuration(Throwable error, java.time.Duration delay) {
  super.attemptFailedDuration(error, delay);
  // Tag the attempt with the status extracted from the error and forward all
  // server-timing values exactly once. The call is unconditional: the recorder itself
  // skips null latencies and zero counts.
  attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
  builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
      gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}

/**
Expand All @@ -100,10 +99,9 @@ public void attemptFailedDuration(Throwable error, java.time.Duration delay) {
@Override
public void attemptFailedRetriesExhausted(Throwable error) {
  super.attemptFailedRetriesExhausted(error);
  // Tag the attempt with the status extracted from the error and forward all
  // server-timing values exactly once. The call is unconditional: the recorder itself
  // skips null latencies and zero counts.
  attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
  builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
      gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}

/**
Expand All @@ -116,16 +114,27 @@ public void attemptFailedRetriesExhausted(Throwable error) {
@Override
public void attemptPermanentFailure(Throwable error) {
  super.attemptPermanentFailure(error);
  // Tag the attempt with the status extracted from the error and forward all
  // server-timing values exactly once. The call is unconditional: the recorder itself
  // skips null latencies and zero counts.
  attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
  builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
      gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
}

// Stores the GFE latency on this tracer. The value is not emitted here; the attempt
// callbacks of this class pass the stored value on to the metrics recorder. May be null.
void recordGFELatency(Long gfeLatency) {
this.gfeLatency = gfeLatency;
}

// Stores the AFE latency on this tracer. The value is not emitted here; the attempt
// callbacks of this class pass the stored value on to the metrics recorder. May be null.
void recordAFELatency(Long afeLatency) {
this.afeLatency = afeLatency;
}

// Stores the number of responses that lacked the GFE server-timing entry. The value is
// emitted later by the attempt callbacks of this class.
void recordGfeHeaderMissingCount(Long value) {
  // The backing field is a primitive long: assigning a null Long directly would throw a
  // NullPointerException on unboxing, so null is treated as "nothing missing" (0).
  this.gfeHeaderMissingCount = value == null ? 0L : value;
}

// Stores the number of responses that lacked the AFE server-timing entry. The value is
// emitted later by the attempt callbacks of this class.
void recordAfeHeaderMissingCount(Long value) {
  // The backing field is a primitive long: assigning a null Long directly would throw a
  // NullPointerException on unboxing, so null is treated as "nothing missing" (0).
  this.afeHeaderMissingCount = value == null ? 0L : value;
}

@Override
public void addAttributes(Map<String, String> attributes) {
super.addAttributes(attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,28 @@ public void recordGFELatency(Long gfeLatency) {
}
}
}

/** Forwards the GFE header-missing count to every child that is a BuiltInMetricsTracer. */
public void recordGfeHeaderMissingCount(Long value) {
  for (ApiTracer tracer : children) {
    // Other tracer implementations do not track this metric.
    if (!(tracer instanceof BuiltInMetricsTracer)) {
      continue;
    }
    BuiltInMetricsTracer builtInTracer = (BuiltInMetricsTracer) tracer;
    builtInTracer.recordGfeHeaderMissingCount(value);
  }
}

/** Forwards the AFE latency to every child that is a BuiltInMetricsTracer. */
public void recordAFELatency(Long afeLatency) {
  for (ApiTracer tracer : children) {
    // Other tracer implementations do not track this metric.
    if (!(tracer instanceof BuiltInMetricsTracer)) {
      continue;
    }
    BuiltInMetricsTracer builtInTracer = (BuiltInMetricsTracer) tracer;
    builtInTracer.recordAFELatency(afeLatency);
  }
}

/** Forwards the AFE header-missing count to every child that is a BuiltInMetricsTracer. */
public void recordAfeHeaderMissingCount(Long value) {
  for (ApiTracer tracer : children) {
    // Other tracer implementations do not track this metric.
    if (!(tracer instanceof BuiltInMetricsTracer)) {
      continue;
    }
    BuiltInMetricsTracer builtInTracer = (BuiltInMetricsTracer) tracer;
    builtInTracer.recordAfeHeaderMissingCount(value);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ private static boolean isEmulatorEnabled(SpannerOptions options, String emulator
&& options.getHost().endsWith(emulatorHost);
}

public static boolean isEnableAFEServerTiming() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why are we not using SpannerOptions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature can only be disabled using an environment variable.

return "false".equalsIgnoreCase(System.getenv("SPANNER_DISABLE_AFE_SERVER_TIMING"));
}

private static final RetrySettings ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setInitialRetryDelayDuration(Duration.ofSeconds(5L))
Expand Down Expand Up @@ -1993,6 +1997,9 @@ private GrpcCallContext createBaseCallContext() {
if (endToEndTracingEnabled) {
context = context.withExtraHeaders(metadataProvider.newEndToEndTracingHeader());
}
if (isEnableAFEServerTiming()) {
context = context.withExtraHeaders(metadataProvider.newAfeServerTimingHeader());
}
return context
.withStreamWaitTimeoutDuration(waitTimeout)
.withStreamIdleTimeoutDuration(idleTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class HeaderInterceptor implements ClientInterceptor {
private static final Metadata.Key<String> SERVER_TIMING_HEADER_KEY =
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
private static final String GFE_TIMING_HEADER = "gfet4t7";
private static final String AFE_TIMING_HEADER = "afe";
private static final Metadata.Key<String> GOOGLE_CLOUD_RESOURCE_PREFIX_KEY =
Metadata.Key.of("google-cloud-resource-prefix", Metadata.ASCII_STRING_MARSHALLER);
private static final Pattern SERVER_TIMING_PATTERN =
Expand Down Expand Up @@ -174,13 +175,25 @@ private void processHeader(
if (compositeTracer != null) {
compositeTracer.recordGFELatency(gfeLatency);
}

if (span != null) {
span.setAttribute("gfe_latency", String.valueOf(gfeLatency));
}
} else {
measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 1L).record(tagContext);
spannerRpcMetrics.recordGfeHeaderMissingCount(1L, attributes);
if (compositeTracer != null) {
compositeTracer.recordGfeHeaderMissingCount(1L);
}
}

// Record AFE metrics
if (compositeTracer != null && GapicSpannerRpc.isEnableAFEServerTiming()) {
if (serverTimingMetrics.containsKey(AFE_TIMING_HEADER)) {
long afeLatency = serverTimingMetrics.get(AFE_TIMING_HEADER);
compositeTracer.recordAFELatency(afeLatency);
} else {
compositeTracer.recordAfeHeaderMissingCount(1L);
}
}
} catch (NumberFormatException e) {
LOGGER.log(LEVEL, "Invalid server-timing object in header: {}", serverTiming);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class SpannerMetadataProvider {
private final String resourceHeaderKey;
private static final String ROUTE_TO_LEADER_HEADER_KEY = "x-goog-spanner-route-to-leader";
private static final String END_TO_END_TRACING_HEADER_KEY = "x-goog-spanner-end-to-end-tracing";
private static final String AFE_SERVER_TIMING_HEADER_KEY =
"x-goog-spanner-enable-afe-server-timing";
private static final Pattern[] RESOURCE_TOKEN_PATTERNS = {
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"),
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*)(.*)?")
Expand All @@ -47,6 +49,8 @@ class SpannerMetadataProvider {
ImmutableMap.of(ROUTE_TO_LEADER_HEADER_KEY, Collections.singletonList("true"));
private static final Map<String, List<String>> END_TO_END_TRACING_HEADER_MAP =
ImmutableMap.of(END_TO_END_TRACING_HEADER_KEY, Collections.singletonList("true"));
private static final Map<String, List<String>> AFE_SERVER_TIMING_HEADER_MAP =
ImmutableMap.of(AFE_SERVER_TIMING_HEADER_KEY, Collections.singletonList("true"));

private SpannerMetadataProvider(Map<String, String> headers, String resourceHeaderKey) {
this.resourceHeaderKey = resourceHeaderKey;
Expand Down Expand Up @@ -96,6 +100,10 @@ Map<String, List<String>> newEndToEndTracingHeader() {
return END_TO_END_TRACING_HEADER_MAP;
}

// Returns the pre-built static header map that sets the
// "x-goog-spanner-enable-afe-server-timing" header to the single value "true".
Map<String, List<String>> newAfeServerTimingHeader() {
return AFE_SERVER_TIMING_HEADER_MAP;
}

private Map<Metadata.Key<String>, String> constructHeadersAsMetadata(
Map<String, String> headers) {
ImmutableMap.Builder<Metadata.Key<String>, String> headersAsMetadataBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ abstract class AbstractNettyMockServerTest {
protected static AtomicInteger fakeServerTiming =
new AtomicInteger(new Random().nextInt(1000) + 1);

protected static AtomicInteger fakeAFEServerTiming =
new AtomicInteger(new Random().nextInt(500) + 1);

protected Spanner spanner;

@BeforeClass
Expand All @@ -72,7 +75,9 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
public void sendHeaders(Metadata headers) {
headers.put(
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER),
String.format("gfet4t7; dur=%d", fakeServerTiming.get()));
String.format(
"afe; dur=%d, gfet4t7; dur=%d",
fakeAFEServerTiming.get(), fakeServerTiming.get()));
super.sendHeaders(headers);
}
},
Expand Down
Loading
Loading