Skip to content

Commit b9e9734

Browse files
committed
chore(x-goog-spanner-request-id): plumb for BatchCreateSessions
This change plumbs x-goog-spanner-request-id into BatchCreateSessions and asserts that the header is present for that method. Updates #3537
1 parent e97b92e commit b9e9734

File tree

10 files changed

+517
-22
lines changed

10 files changed

+517
-22
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

+74-7
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@
2323
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
2424
import com.google.cloud.spanner.SpannerImpl.ClosedException;
2525
import com.google.common.annotations.VisibleForTesting;
26-
import com.google.common.base.Function;
2726
import com.google.common.util.concurrent.ListenableFuture;
2827
import com.google.spanner.v1.BatchWriteResponse;
2928
import io.opentelemetry.api.common.Attributes;
29+
import java.util.ArrayList;
30+
import java.util.Arrays;
31+
import java.util.Objects;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
import java.util.function.BiFunction;
3034
import javax.annotation.Nullable;
3135

3236
class DatabaseClientImpl implements DatabaseClient {
@@ -40,6 +44,8 @@ class DatabaseClientImpl implements DatabaseClient {
4044
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
4145
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
4246
@VisibleForTesting final boolean useMultiplexedSessionForRW;
47+
private final int dbId;
48+
private final AtomicInteger nthRequest;
4349

4450
final boolean useMultiplexedSessionBlindWrite;
4551

@@ -86,6 +92,18 @@ class DatabaseClientImpl implements DatabaseClient {
8692
this.tracer = tracer;
8793
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
8894
this.commonAttributes = commonAttributes;
95+
96+
this.dbId = this.dbIdFromClientId(this.clientId);
97+
this.nthRequest = new AtomicInteger(0);
98+
}
99+
100+
private int dbIdFromClientId(String clientId) {
101+
int i = clientId.indexOf("-");
102+
String strWithValue = clientId.substring(i + 1);
103+
if (Objects.equals(strWithValue, "")) {
104+
strWithValue = "0";
105+
}
106+
return Integer.parseInt(strWithValue);
89107
}
90108

91109
@VisibleForTesting
@@ -159,7 +177,11 @@ public CommitResponse writeWithOptions(
159177
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
160178
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
161179
}
162-
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));
180+
181+
return runWithSessionRetry(
182+
(session, reqId) -> {
183+
return session.writeWithOptions(mutations, withReqId(reqId, options));
184+
});
163185
} catch (RuntimeException e) {
164186
span.setStatus(e);
165187
throw e;
@@ -177,14 +199,23 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
177199
public CommitResponse writeAtLeastOnceWithOptions(
178200
final Iterable<Mutation> mutations, final TransactionOption... options)
179201
throws SpannerException {
202+
return doWriteAtLeastOnceWithOptions(mutations, options);
203+
}
204+
205+
private CommitResponse doWriteAtLeastOnceWithOptions(
206+
final Iterable<Mutation> mutations, final TransactionOption... options)
207+
throws SpannerException {
180208
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
181209
try (IScope s = tracer.withSpan(span)) {
182210
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
183211
return getMultiplexedSessionDatabaseClient()
184212
.writeAtLeastOnceWithOptions(mutations, options);
185213
}
214+
186215
return runWithSessionRetry(
187-
session -> session.writeAtLeastOnceWithOptions(mutations, options));
216+
(session, reqId) -> {
217+
return session.writeAtLeastOnceWithOptions(mutations, withReqId(reqId, options));
218+
});
188219
} catch (RuntimeException e) {
189220
span.setStatus(e);
190221
throw e;
@@ -193,6 +224,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
193224
}
194225
}
195226

227+
private int nextNthRequest() {
228+
return this.nthRequest.incrementAndGet();
229+
}
230+
196231
@Override
197232
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
198233
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
@@ -202,7 +237,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
202237
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
203238
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
204239
}
205-
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
240+
return runWithSessionRetry(
241+
(session, reqId) ->
242+
session.batchWriteAtLeastOnce(mutationGroups, withReqId(reqId, options)));
206243
} catch (RuntimeException e) {
207244
span.setStatus(e);
208245
throw e;
@@ -346,27 +383,57 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption...
346383
return executePartitionedUpdateWithPooledSession(stmt, options);
347384
}
348385

386+
private UpdateOption[] withReqId(
387+
final XGoogSpannerRequestId reqId, final UpdateOption... options) {
388+
if (reqId == null) {
389+
return options;
390+
}
391+
ArrayList<UpdateOption> allOptions = new ArrayList(Arrays.asList(options));
392+
allOptions.add(new Options.RequestIdOption(reqId));
393+
return allOptions.toArray(new UpdateOption[0]);
394+
}
395+
396+
private TransactionOption[] withReqId(
397+
final XGoogSpannerRequestId reqId, final TransactionOption... options) {
398+
if (reqId == null) {
399+
return options;
400+
}
401+
ArrayList<TransactionOption> allOptions = new ArrayList(Arrays.asList(options));
402+
allOptions.add(new Options.RequestIdOption(reqId));
403+
return allOptions.toArray(new TransactionOption[0]);
404+
}
405+
349406
private long executePartitionedUpdateWithPooledSession(
350407
final Statement stmt, final UpdateOption... options) {
351408
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
352409
try (IScope s = tracer.withSpan(span)) {
353-
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
410+
return runWithSessionRetry(
411+
(session, reqId) -> {
412+
return session.executePartitionedUpdate(stmt, withReqId(reqId, options));
413+
});
354414
} catch (RuntimeException e) {
355415
span.setStatus(e);
356416
span.end();
357417
throw e;
358418
}
359419
}
360420

361-
private <T> T runWithSessionRetry(Function<Session, T> callable) {
421+
private <T> T runWithSessionRetry(BiFunction<Session, XGoogSpannerRequestId, T> callable) {
362422
PooledSessionFuture session = getSession();
423+
XGoogSpannerRequestId reqId =
424+
XGoogSpannerRequestId.of(
425+
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0);
363426
while (true) {
364427
try {
365-
return callable.apply(session);
428+
reqId.incrementAttempt();
429+
return callable.apply(session, reqId);
366430
} catch (SessionNotFoundException e) {
367431
session =
368432
(PooledSessionFuture)
369433
pool.getPooledSessionReplacementHandler().replaceSession(e, session);
434+
reqId =
435+
XGoogSpannerRequestId.of(
436+
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0);
370437
}
371438
}
372439
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

+47-1
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ public static UpdateTransactionOption excludeTxnFromChangeStreams() {
177177
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
178178
}
179179

180+
public static RequestIdOption requestId(XGoogSpannerRequestId reqId) {
181+
return new RequestIdOption(reqId);
182+
}
183+
180184
/**
181185
* Specifying this will cause the read to yield at most this many rows. This should be greater
182186
* than 0.
@@ -535,6 +539,7 @@ void appendToOptions(Options options) {
535539
private RpcLockHint lockHint;
536540
private Boolean lastStatement;
537541
private IsolationLevel isolationLevel;
542+
private XGoogSpannerRequestId reqId;
538543

539544
// Construction is via factory methods below.
540545
private Options() {}
@@ -599,6 +604,14 @@ String filter() {
599604
return filter;
600605
}
601606

607+
boolean hasReqId() {
608+
return reqId != null;
609+
}
610+
611+
XGoogSpannerRequestId reqId() {
612+
return reqId;
613+
}
614+
602615
boolean hasPriority() {
603616
return priority != null;
604617
}
@@ -756,6 +769,9 @@ public String toString() {
756769
if (isolationLevel != null) {
757770
b.append("isolationLevel: ").append(isolationLevel).append(' ');
758771
}
772+
if (reqId != null) {
773+
b.append("requestId: ").append(reqId.toString());
774+
}
759775
return b.toString();
760776
}
761777

@@ -798,7 +814,8 @@ public boolean equals(Object o) {
798814
&& Objects.equals(orderBy(), that.orderBy())
799815
&& Objects.equals(isLastStatement(), that.isLastStatement())
800816
&& Objects.equals(lockHint(), that.lockHint())
801-
&& Objects.equals(isolationLevel(), that.isolationLevel());
817+
&& Objects.equals(isolationLevel(), that.isolationLevel())
818+
&& Objects.equals(reqId(), that.reqId());
802819
}
803820

804821
@Override
@@ -867,6 +884,9 @@ public int hashCode() {
867884
if (isolationLevel != null) {
868885
result = 31 * result + isolationLevel.hashCode();
869886
}
887+
if (reqId != null) {
888+
result = 31 * result + reqId.hashCode();
889+
}
870890
return result;
871891
}
872892

@@ -1052,4 +1072,30 @@ public boolean equals(Object o) {
10521072
return o instanceof LastStatementUpdateOption;
10531073
}
10541074
}
1075+
1076+
static final class RequestIdOption extends InternalOption
1077+
implements TransactionOption, UpdateOption {
1078+
private final XGoogSpannerRequestId reqId;
1079+
1080+
RequestIdOption(XGoogSpannerRequestId reqId) {
1081+
this.reqId = reqId;
1082+
}
1083+
1084+
@Override
1085+
void appendToOptions(Options options) {
1086+
options.reqId = this.reqId;
1087+
}
1088+
1089+
@Override
1090+
public int hashCode() {
1091+
return RequestIdOption.class.hashCode();
1092+
}
1093+
1094+
@Override
1095+
public boolean equals(Object o) {
1096+
// TODO: Examine why the precedent for LastStatementUpdateOption
1097+
// does not check against the actual value.
1098+
return o instanceof RequestIdOption;
1099+
}
1100+
}
10551101
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

+55-7
Original file line numberDiff line numberDiff line change
@@ -126,18 +126,31 @@ interface SessionTransaction {
126126
private final Clock clock;
127127
private final Map<SpannerRpc.Option, ?> options;
128128
private final ErrorHandler errorHandler;
129+
private XGoogSpannerRequestId.RequestIdCreator requestIdCreator;
129130

130131
SessionImpl(SpannerImpl spanner, SessionReference sessionReference) {
131132
this(spanner, sessionReference, NO_CHANNEL_HINT);
132133
}
133134

134135
SessionImpl(SpannerImpl spanner, SessionReference sessionReference, int channelHint) {
136+
this(spanner, sessionReference, channelHint, new XGoogSpannerRequestId.NoopRequestIdCreator());
137+
}
138+
139+
SessionImpl(
140+
SpannerImpl spanner,
141+
SessionReference sessionReference,
142+
int channelHint,
143+
XGoogSpannerRequestId.RequestIdCreator requestIdCreator) {
135144
this.spanner = spanner;
136145
this.tracer = spanner.getTracer();
137146
this.sessionReference = sessionReference;
138147
this.clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock();
139148
this.options = createOptions(sessionReference, channelHint);
140149
this.errorHandler = createErrorHandler(spanner.getOptions());
150+
this.requestIdCreator = requestIdCreator;
151+
if (this.requestIdCreator == null) {
152+
this.requestIdCreator = new XGoogSpannerRequestId.NoopRequestIdCreator();
153+
}
141154
}
142155

143156
static Map<SpannerRpc.Option, ?> createOptions(
@@ -287,9 +300,16 @@ public CommitResponse writeAtLeastOnceWithOptions(
287300
}
288301
CommitRequest request = requestBuilder.build();
289302
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
303+
final XGoogSpannerRequestId reqId = reqIdOrFresh(options);
304+
290305
try (IScope s = tracer.withSpan(span)) {
291306
return SpannerRetryHelper.runTxWithRetriesOnAborted(
292-
() -> new CommitResponse(spanner.getRpc().commit(request, getOptions())));
307+
() -> {
308+
// TODO: Detect an abort and then refresh the reqId.
309+
reqId.incrementAttempt();
310+
Map opts = reqId.withOptions(getOptions());
311+
return new CommitResponse(spanner.getRpc().commit(request, opts));
312+
});
293313
} catch (RuntimeException e) {
294314
span.setStatus(e);
295315
throw e;
@@ -298,6 +318,14 @@ public CommitResponse writeAtLeastOnceWithOptions(
298318
}
299319
}
300320

321+
private XGoogSpannerRequestId reqIdOrFresh(Options options) {
322+
XGoogSpannerRequestId reqId = options.reqId();
323+
if (reqId == null) {
324+
reqId = this.getRequestIdCreator().nextRequestId(1 /* TODO: channelId */, 0);
325+
}
326+
return reqId;
327+
}
328+
301329
private RequestOptions getRequestOptions(TransactionOption... transactionOptions) {
302330
Options requestOptions = Options.fromTransactionOptions(transactionOptions);
303331
if (requestOptions.hasPriority() || requestOptions.hasTag()) {
@@ -325,16 +353,19 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
325353
.setSession(getName())
326354
.addAllMutationGroups(mutationGroupsProto);
327355
RequestOptions batchWriteRequestOptions = getRequestOptions(transactionOptions);
356+
Options allOptions = Options.fromTransactionOptions(transactionOptions);
357+
final XGoogSpannerRequestId reqId = reqIdOrFresh(allOptions);
328358
if (batchWriteRequestOptions != null) {
329359
requestBuilder.setRequestOptions(batchWriteRequestOptions);
330360
}
331-
if (Options.fromTransactionOptions(transactionOptions).withExcludeTxnFromChangeStreams()
332-
== Boolean.TRUE) {
361+
if (allOptions.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
333362
requestBuilder.setExcludeTxnFromChangeStreams(true);
334363
}
335364
ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE);
336365
try (IScope s = tracer.withSpan(span)) {
337-
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), getOptions());
366+
return spanner
367+
.getRpc()
368+
.batchWriteAtLeastOnce(requestBuilder.build(), reqId.withOptions(getOptions()));
338369
} catch (Throwable e) {
339370
span.setStatus(e);
340371
throw SpannerExceptionFactory.newSpannerException(e);
@@ -435,14 +466,18 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...
435466

436467
@Override
437468
public ApiFuture<Empty> asyncClose() {
438-
return spanner.getRpc().asyncDeleteSession(getName(), getOptions());
469+
XGoogSpannerRequestId reqId =
470+
this.getRequestIdCreator().nextRequestId(1 /* TODO: channelId */, 0);
471+
return spanner.getRpc().asyncDeleteSession(getName(), reqId.withOptions(getOptions()));
439472
}
440473

441474
@Override
442475
public void close() {
443476
ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION);
444477
try (IScope s = tracer.withSpan(span)) {
445-
spanner.getRpc().deleteSession(getName(), getOptions());
478+
XGoogSpannerRequestId reqId =
479+
this.getRequestIdCreator().nextRequestId(1 /* TODO: channelId */, 0);
480+
spanner.getRpc().deleteSession(getName(), reqId.withOptions(getOptions()));
446481
} catch (RuntimeException e) {
447482
span.setStatus(e);
448483
throw e;
@@ -473,8 +508,13 @@ ApiFuture<Transaction> beginTransactionAsync(
473508
}
474509
final BeginTransactionRequest request = requestBuilder.build();
475510
final ApiFuture<Transaction> requestFuture;
511+
XGoogSpannerRequestId reqId =
512+
this.getRequestIdCreator().nextRequestId(1 /* TODO: channelId */, 1);
476513
try (IScope ignore = tracer.withSpan(span)) {
477-
requestFuture = spanner.getRpc().beginTransactionAsync(request, channelHint, routeToLeader);
514+
requestFuture =
515+
spanner
516+
.getRpc()
517+
.beginTransactionAsync(request, reqId.withOptions(channelHint), routeToLeader);
478518
}
479519
requestFuture.addListener(
480520
() -> {
@@ -552,4 +592,12 @@ void onTransactionDone() {}
552592
TraceWrapper getTracer() {
553593
return tracer;
554594
}
595+
596+
public void setRequestIdCreator(XGoogSpannerRequestId.RequestIdCreator creator) {
597+
this.requestIdCreator = creator;
598+
}
599+
600+
public XGoogSpannerRequestId.RequestIdCreator getRequestIdCreator() {
601+
return this.requestIdCreator;
602+
}
555603
}

0 commit comments

Comments
 (0)