Skip to content

PoC: Otel metrics #1807

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
package com.google.api.gax.grpc;

import com.google.api.core.InternalApi;
import com.google.api.gax.tracing.ClientMetricsTracer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -80,11 +81,27 @@ class ChannelPool extends ManagedChannel {
private final AtomicInteger indexTicker = new AtomicInteger();
private final String authority;

private ClientMetricsTracer clientMetricsTracer;

static ChannelPool create(ChannelPoolSettings settings, ChannelFactory channelFactory)
throws IOException {
return new ChannelPool(settings, channelFactory, Executors.newSingleThreadScheduledExecutor());
}

static ChannelPool create(
ChannelPoolSettings settings,
ChannelFactory channelFactory,
ClientMetricsTracer clientMetricsTracer)
throws IOException {
ChannelPool channelPool =
new ChannelPool(settings, channelFactory, Executors.newSingleThreadScheduledExecutor());
channelPool.clientMetricsTracer = clientMetricsTracer;
if (channelPool.clientMetricsTracer != null) {
channelPool.clientMetricsTracer.recordCurrentChannelSize(settings.getInitialChannelCount());
}
return channelPool;
}

/**
* Initializes the channel pool. Assumes that all channels have the same authority.
*
Expand Down Expand Up @@ -302,6 +319,7 @@ void resize() {

shrink(dampenedTarget);
}
clientMetricsTracer.recordCurrentChannelSize(entries.get().size());
}

/** Not threadsafe, must be called under the entryWriteLock monitor */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
*/
@BetaApi("Reference ApiCallContext instead - this class is likely to experience breaking changes")
public final class GrpcCallContext implements ApiCallContext {
static final CallOptions.Key<ApiTracer> TRACER_KEY = CallOptions.Key.create("gax.tracer");
public static final CallOptions.Key<ApiTracer> TRACER_KEY = CallOptions.Key.create("gax.tracer");

private final Channel channel;
private final CallOptions callOptions;
Expand Down Expand Up @@ -504,7 +504,10 @@ public ApiTracer getTracer() {
@Override
public GrpcCallContext withTracer(@Nonnull ApiTracer tracer) {
Preconditions.checkNotNull(tracer);
return withCallOptions(callOptions.withOption(TRACER_KEY, tracer));
return withCallOptions(
callOptions
.withOption(TRACER_KEY, tracer)
.withStreamTracerFactory(new GrpcStreamTracer.Factory(tracer)));
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,20 @@
package com.google.api.gax.grpc;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.ListenableFutureToApiFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracer;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* {@code GrpcDirectCallable} creates gRPC calls.
Expand All @@ -56,18 +63,71 @@ class GrpcDirectCallable<RequestT, ResponseT> extends UnaryCallable<RequestT, Re
public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext inputContext) {
Preconditions.checkNotNull(request);
Preconditions.checkNotNull(inputContext);

ClientCall<RequestT, ResponseT> clientCall = GrpcClientCalls.newCall(descriptor, inputContext);

final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
GrpcCallContext grpcCallContext = responseMetadata.addHandlers(inputContext);
ClientCall<RequestT, ResponseT> clientCall =
GrpcClientCalls.newCall(descriptor, grpcCallContext);
GfeUnaryCallback<ResponseT> callback =
new GfeUnaryCallback<ResponseT>(inputContext.getTracer(), responseMetadata);
ApiFuture<ResponseT> future;
if (awaitTrailers) {
return new ListenableFutureToApiFuture<>(ClientCalls.futureUnaryCall(clientCall, request));
future = new ListenableFutureToApiFuture<>(ClientCalls.futureUnaryCall(clientCall, request));
} else {
return GrpcClientCalls.eagerFutureUnaryCall(clientCall, request);
future = GrpcClientCalls.eagerFutureUnaryCall(clientCall, request);
}
ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor());
return future;
}

@Override
public String toString() {
return String.format("direct(%s)", descriptor);
}

private static final Metadata.Key<String> SERVER_TIMING_HEADER_KEY =
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);

private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?<dur>\\d+)");

static class GfeUnaryCallback<ResponseT> implements ApiFutureCallback<ResponseT> {

private final ApiTracer tracer;
private final GrpcResponseMetadata responseMetadata;

GfeUnaryCallback(ApiTracer tracer, GrpcResponseMetadata responseMetadata) {
this.tracer = tracer;
this.responseMetadata = responseMetadata;
}

@Override
public void onFailure(Throwable throwable) {
// Util.recordMetricsFromMetadata(responseMetadata, tracer, throwable);
}

@Override
public void onSuccess(ResponseT response) {
Metadata metadata = responseMetadata.getMetadata();
if (metadata == null) {
return;
}
String allKeys = metadata.keys().stream().reduce((a, b) -> a + ", " + b).get();
// System.out.println(
// "************************ metadata size: "
// + metadata.keys().size()
// + ", all keys: "
// + allKeys);
if (metadata.get(SERVER_TIMING_HEADER_KEY) == null) {
return;
}

String durMetadata = metadata.get(SERVER_TIMING_HEADER_KEY);
Matcher matcher = SERVER_TIMING_HEADER_PATTERN.matcher(durMetadata);
// this should always be true
if (matcher.find()) {
long latency = Long.valueOf(matcher.group("dur"));
tracer.recordGfeMetadata(latency);
}
// System.out.println("GFE metadata: " + metadata.get(SERVER_TIMING_HEADER_KEY));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.api.gax.grpc;

import com.google.api.gax.tracing.ApiTracer;
import com.google.common.base.Stopwatch;
import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import java.util.concurrent.TimeUnit;

/**
* Records the time a request is enqueued in a grpc channel queue. Its primary purpose is to measure
* the transition time between asking gRPC to start an RPC and gRPC actually serializing that RPC.
*/
class GrpcStreamTracer extends ClientStreamTracer {

private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private final ApiTracer tracer;

public GrpcStreamTracer(ApiTracer tracer) {
this.tracer = tracer;
stopwatch.start();
}

@Override
public void createPendingStream() {
tracer.grpcTargetResolutionDelay(stopwatch.elapsed(TimeUnit.NANOSECONDS));
stopwatch.reset();
stopwatch.start();
}

@Override
public void streamCreated(Attributes transportAttrs, Metadata headers) {
tracer.grpcChannelReadinessDelay(stopwatch.elapsed(TimeUnit.NANOSECONDS));
stopwatch.reset();
stopwatch.start();
}

@Override
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
tracer.grpcCallSendDelay(stopwatch.elapsed(TimeUnit.NANOSECONDS));
}

static class Factory extends ClientStreamTracer.Factory {

private final ApiTracer tracer;

Factory(ApiTracer tracer) {
this.tracer = tracer;
}

@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return new GrpcStreamTracer(tracer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.internal.EnvironmentProvider;
import com.google.api.gax.rpc.mtls.MtlsProvider;
import com.google.api.gax.tracing.ClientMetricsTracer;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ComputeEngineCredentials;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -117,6 +118,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
@Nullable private final Boolean allowNonDefaultServiceAccount;
@VisibleForTesting final ImmutableMap<String, ?> directPathServiceConfig;
@Nullable private final MtlsProvider mtlsProvider;
@Nullable private ClientMetricsTracer clientMetricsTracer;

@Nullable
private final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator;
Expand Down Expand Up @@ -184,6 +186,11 @@ public String getTransportName() {
return GrpcTransportChannel.getGrpcTransportName();
}

@Override
public void setClientMetricsTracer(ClientMetricsTracer clientMetricsTracer) {
this.clientMetricsTracer = clientMetricsTracer;
}

@Override
public boolean needsEndpoint() {
return endpoint == null;
Expand Down Expand Up @@ -241,7 +248,9 @@ public TransportChannel getTransportChannel() throws IOException {
private TransportChannel createChannel() throws IOException {
return GrpcTransportChannel.create(
ChannelPool.create(
channelPoolSettings, InstantiatingGrpcChannelProvider.this::createSingleChannel));
channelPoolSettings,
InstantiatingGrpcChannelProvider.this::createSingleChannel,
clientMetricsTracer));
}

private boolean isDirectPathEnabled() {
Expand Down
12 changes: 12 additions & 0 deletions gax-java/gax/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@
<artifactId>graal-sdk</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ void handleAttempt(Throwable throwable, ResponseT response) {
}
super.setException(throwable);
} else {
tracer.attemptSucceeded();
tracer.attemptSucceeded(response);
tracer.retryCount(attemptSettings.getAttemptCount());
super.set(response);
}
} catch (CancellationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.api.gax.rpc.internal.QuotaProjectIdHidingCredentials;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.BaseApiTracerFactory;
import com.google.api.gax.tracing.ClientMetricsTracer;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GdchCredentials;
import com.google.auto.value.AutoValue;
Expand Down Expand Up @@ -223,6 +224,9 @@ public static ClientContext create(StubSettings settings) throws IOException {
if (transportChannelProvider.needsEndpoint()) {
transportChannelProvider = transportChannelProvider.withEndpoint(endpoint);
}
ClientMetricsTracer clientMetricsTracer = settings.getTracerFactory().newClientMetricsTracer();
transportChannelProvider.setClientMetricsTracer(clientMetricsTracer);

TransportChannel transportChannel = transportChannelProvider.getTransportChannel();

ApiCallContext defaultCallContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.tracing.ClientMetricsTracer;
import com.google.auth.Credentials;
import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -143,6 +144,8 @@ public interface TransportChannelProvider {
*/
String getTransportName();

default void setClientMetricsTracer(ClientMetricsTracer clientMetricsTracer) {};

/**
* User set custom endpoint for the Transport Channel Provider
*
Expand Down
Loading