23
23
import com .google .cloud .spanner .SessionPool .PooledSessionFuture ;
24
24
import com .google .cloud .spanner .SpannerImpl .ClosedException ;
25
25
import com .google .common .annotations .VisibleForTesting ;
26
- import com .google .common .base .Function ;
27
26
import com .google .common .util .concurrent .ListenableFuture ;
28
27
import com .google .spanner .v1 .BatchWriteResponse ;
29
28
import io .opentelemetry .api .common .Attributes ;
29
+ import java .util .ArrayList ;
30
+ import java .util .Arrays ;
31
+ import java .util .Objects ;
32
+ import java .util .concurrent .atomic .AtomicInteger ;
33
+ import java .util .function .BiFunction ;
30
34
import javax .annotation .Nullable ;
31
35
32
36
class DatabaseClientImpl implements DatabaseClient {
@@ -40,6 +44,8 @@ class DatabaseClientImpl implements DatabaseClient {
40
44
@ VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient ;
41
45
@ VisibleForTesting final boolean useMultiplexedSessionPartitionedOps ;
42
46
@ VisibleForTesting final boolean useMultiplexedSessionForRW ;
47
+ private final int dbId ;
48
+ private final AtomicInteger nthRequest ;
43
49
44
50
final boolean useMultiplexedSessionBlindWrite ;
45
51
@@ -86,6 +92,18 @@ class DatabaseClientImpl implements DatabaseClient {
86
92
this .tracer = tracer ;
87
93
this .useMultiplexedSessionForRW = useMultiplexedSessionForRW ;
88
94
this .commonAttributes = commonAttributes ;
95
+
96
+ this .dbId = this .dbIdFromClientId (this .clientId );
97
+ this .nthRequest = new AtomicInteger (0 );
98
+ }
99
+
100
+ private int dbIdFromClientId (String clientId ) {
101
+ int i = clientId .indexOf ("-" );
102
+ String strWithValue = clientId .substring (i + 1 );
103
+ if (Objects .equals (strWithValue , "" )) {
104
+ strWithValue = "0" ;
105
+ }
106
+ return Integer .parseInt (strWithValue );
89
107
}
90
108
91
109
@ VisibleForTesting
@@ -159,7 +177,11 @@ public CommitResponse writeWithOptions(
159
177
if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
160
178
return getMultiplexedSessionDatabaseClient ().writeWithOptions (mutations , options );
161
179
}
162
- return runWithSessionRetry (session -> session .writeWithOptions (mutations , options ));
180
+
181
+ return runWithSessionRetry (
182
+ (session , reqId ) -> {
183
+ return session .writeWithOptions (mutations , withReqId (reqId , options ));
184
+ });
163
185
} catch (RuntimeException e ) {
164
186
span .setStatus (e );
165
187
throw e ;
@@ -184,7 +206,8 @@ public CommitResponse writeAtLeastOnceWithOptions(
184
206
.writeAtLeastOnceWithOptions (mutations , options );
185
207
}
186
208
return runWithSessionRetry (
187
- session -> session .writeAtLeastOnceWithOptions (mutations , options ));
209
+ (session , reqId ) ->
210
+ session .writeAtLeastOnceWithOptions (mutations , withReqId (reqId , options )));
188
211
} catch (RuntimeException e ) {
189
212
span .setStatus (e );
190
213
throw e ;
@@ -193,6 +216,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
193
216
}
194
217
}
195
218
219
+ private int nextNthRequest () {
220
+ return this .nthRequest .incrementAndGet ();
221
+ }
222
+
196
223
@ Override
197
224
public ServerStream <BatchWriteResponse > batchWriteAtLeastOnce (
198
225
final Iterable <MutationGroup > mutationGroups , final TransactionOption ... options )
@@ -202,7 +229,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
202
229
if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
203
230
return getMultiplexedSessionDatabaseClient ().batchWriteAtLeastOnce (mutationGroups , options );
204
231
}
205
- return runWithSessionRetry (session -> session .batchWriteAtLeastOnce (mutationGroups , options ));
232
+ return runWithSessionRetry (
233
+ (session , reqId ) ->
234
+ session .batchWriteAtLeastOnce (mutationGroups , withReqId (reqId , options )));
206
235
} catch (RuntimeException e ) {
207
236
span .setStatus (e );
208
237
throw e ;
@@ -346,27 +375,57 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption...
346
375
return executePartitionedUpdateWithPooledSession (stmt , options );
347
376
}
348
377
378
+ private UpdateOption [] withReqId (
379
+ final XGoogSpannerRequestId reqId , final UpdateOption ... options ) {
380
+ if (reqId == null ) {
381
+ return options ;
382
+ }
383
+ ArrayList <UpdateOption > allOptions = new ArrayList (Arrays .asList (options ));
384
+ allOptions .add (new Options .RequestIdOption (reqId ));
385
+ return allOptions .toArray (new UpdateOption [0 ]);
386
+ }
387
+
388
+ private TransactionOption [] withReqId (
389
+ final XGoogSpannerRequestId reqId , final TransactionOption ... options ) {
390
+ if (reqId == null ) {
391
+ return options ;
392
+ }
393
+ ArrayList <TransactionOption > allOptions = new ArrayList (Arrays .asList (options ));
394
+ allOptions .add (new Options .RequestIdOption (reqId ));
395
+ return allOptions .toArray (new TransactionOption [0 ]);
396
+ }
397
+
349
398
private long executePartitionedUpdateWithPooledSession (
350
399
final Statement stmt , final UpdateOption ... options ) {
351
400
ISpan span = tracer .spanBuilder (PARTITION_DML_TRANSACTION , commonAttributes );
352
401
try (IScope s = tracer .withSpan (span )) {
353
- return runWithSessionRetry (session -> session .executePartitionedUpdate (stmt , options ));
402
+ return runWithSessionRetry (
403
+ (session , reqId ) -> {
404
+ return session .executePartitionedUpdate (stmt , withReqId (reqId , options ));
405
+ });
354
406
} catch (RuntimeException e ) {
355
407
span .setStatus (e );
356
408
span .end ();
357
409
throw e ;
358
410
}
359
411
}
360
412
361
- private <T > T runWithSessionRetry (Function <Session , T > callable ) {
413
+ private <T > T runWithSessionRetry (BiFunction <Session , XGoogSpannerRequestId , T > callable ) {
362
414
PooledSessionFuture session = getSession ();
415
+ XGoogSpannerRequestId reqId =
416
+ XGoogSpannerRequestId .of (
417
+ this .dbId , Long .valueOf (session .getChannel ()), this .nextNthRequest (), 0 );
363
418
while (true ) {
364
419
try {
365
- return callable .apply (session );
420
+ reqId .incrementAttempt ();
421
+ return callable .apply (session , reqId );
366
422
} catch (SessionNotFoundException e ) {
367
423
session =
368
424
(PooledSessionFuture )
369
425
pool .getPooledSessionReplacementHandler ().replaceSession (e , session );
426
+ reqId =
427
+ XGoogSpannerRequestId .of (
428
+ this .dbId , Long .valueOf (session .getChannel ()), this .nextNthRequest (), 0 );
370
429
}
371
430
}
372
431
}
0 commit comments