Skip to content

Commit c630e66

Browse files
authored
Merge pull request #3 from googleapis/x-goog-spanner-request-id-plumb-into-BatchCreateSessions
test: fix failing test cases
2 parents e7463fb + 5ccd1df commit c630e66

File tree

5 files changed

+47
-34
lines changed

5 files changed

+47
-34
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.pathtemplate.PathTemplate;
2323
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
2424
import com.google.cloud.spanner.spi.v1.SpannerRpc;
25+
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
2526
import com.google.common.base.Preconditions;
2627
import com.google.common.collect.ImmutableMap;
2728
import com.google.common.collect.Maps;
@@ -109,6 +110,13 @@ Object value() {
109110
return ImmutableMap.copyOf(tmp);
110111
}
111112

113+
static Map<SpannerRpc.Option, ?> createRequestOptions(
114+
long channelId, XGoogSpannerRequestId requestId) {
115+
return ImmutableMap.of(
116+
Option.CHANNEL_HINT, channelId,
117+
Option.REQUEST_ID, requestId);
118+
}
119+
112120
private final class BatchCreateSessionsRunnable implements Runnable {
113121
private final long channelHint;
114122
private final int sessionCount;
@@ -219,26 +227,28 @@ public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
219227
SessionImpl createSession() {
220228
// The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE,
221229
// which is also a valid channel hint.
222-
final Map<SpannerRpc.Option, ?> options;
223230
final long channelId;
224231
synchronized (this) {
225-
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
226232
channelId = sessionChannelCounter;
233+
sessionChannelCounter++;
227234
}
235+
XGoogSpannerRequestId reqId = nextRequestId(channelId, 1);
228236
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
229237
try (IScope s = spanner.getTracer().withSpan(span)) {
230-
XGoogSpannerRequestId reqId = this.nextRequestId(channelId, 1);
231238
com.google.spanner.v1.Session session =
232239
spanner
233240
.getRpc()
234241
.createSession(
235242
db.getName(),
236243
spanner.getOptions().getDatabaseRole(),
237244
spanner.getOptions().getSessionLabels(),
238-
reqId.withOptions(options));
245+
createRequestOptions(channelId, reqId));
239246
SessionReference sessionReference =
240247
new SessionReference(
241-
session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
248+
session.getName(),
249+
session.getCreateTime(),
250+
session.getMultiplexed(),
251+
optionMap(SessionOption.channelHint(channelId)));
242252
SessionImpl sessionImpl = new SessionImpl(spanner, sessionReference);
243253
sessionImpl.setRequestIdCreator(this);
244254
return sessionImpl;
@@ -399,7 +409,6 @@ void asyncBatchCreateSessions(
399409
*/
400410
private List<SessionImpl> internalBatchCreateSessions(
401411
final int sessionCount, final long channelHint) throws SpannerException {
402-
final Map<SpannerRpc.Option, ?> options = optionMap(SessionOption.channelHint(channelHint));
403412
ISpan parent = spanner.getTracer().getCurrentSpan();
404413
ISpan span =
405414
spanner
@@ -417,7 +426,7 @@ private List<SessionImpl> internalBatchCreateSessions(
417426
sessionCount,
418427
spanner.getOptions().getDatabaseRole(),
419428
spanner.getOptions().getSessionLabels(),
420-
reqId.withOptions(options));
429+
createRequestOptions(channelHint, reqId));
421430
span.addAnnotation(
422431
String.format(
423432
"Request for %d sessions returned %d sessions", sessionCount, sessions.size()));
@@ -428,7 +437,10 @@ private List<SessionImpl> internalBatchCreateSessions(
428437
new SessionImpl(
429438
spanner,
430439
new SessionReference(
431-
session.getName(), session.getCreateTime(), session.getMultiplexed(), options));
440+
session.getName(),
441+
session.getCreateTime(),
442+
session.getMultiplexed(),
443+
optionMap(SessionOption.channelHint(channelHint))));
432444
sessionImpl.setRequestIdCreator(this);
433445
res.add(sessionImpl);
434446
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,7 @@ public void incrementAttempt() {
124124
this.attempt++;
125125
}
126126

127-
@SuppressWarnings("unchecked")
128-
public Map withOptions(Map options) {
127+
Map<SpannerRpc.Option, ?> withOptions(Map<SpannerRpc.Option, ?> options) {
129128
Map copyOptions = new HashMap<>();
130129
if (options != null) {
131130
copyOptions.putAll(options);
@@ -139,11 +138,11 @@ public int hashCode() {
139138
return Objects.hash(this.nthClientId, this.nthChannelId, this.nthRequest, this.attempt);
140139
}
141140

142-
public interface RequestIdCreator {
141+
interface RequestIdCreator {
143142
XGoogSpannerRequestId nextRequestId(long channelId, int attempt);
144143
}
145144

146-
public static class NoopRequestIdCreator implements RequestIdCreator {
145+
static class NoopRequestIdCreator implements RequestIdCreator {
147146
NoopRequestIdCreator() {}
148147

149148
@Override
@@ -152,7 +151,7 @@ public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
152151
}
153152
}
154153

155-
public static void assertMonotonicityOfIds(String prefix, List<XGoogSpannerRequestId> reqIds) {
154+
static void assertMonotonicityOfIds(String prefix, List<XGoogSpannerRequestId> reqIds) {
156155
int size = reqIds.size();
157156

158157
List<String> violations = new ArrayList<>();
@@ -164,7 +163,7 @@ public static void assertMonotonicityOfIds(String prefix, List<XGoogSpannerReque
164163
}
165164
}
166165

167-
if (violations.size() == 0) {
166+
if (violations.isEmpty()) {
168167
return;
169168
}
170169

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -1657,7 +1657,7 @@ public Session createSession(
16571657
@Nullable Map<String, String> labels,
16581658
@Nullable Map<Option, ?> options)
16591659
throws SpannerException {
1660-
// By default sessions are not multiplexed
1660+
// By default, sessions are not multiplexed
16611661
return createSession(databaseName, databaseRole, labels, options, false);
16621662
}
16631663

@@ -2043,8 +2043,10 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20432043
context = context.withChannelAffinity(affinity.intValue());
20442044
}
20452045
}
2046-
String methodName = method.getFullMethodName();
2047-
context = withRequestId(context, options, methodName);
2046+
if (method != null) {
2047+
String methodName = method.getFullMethodName();
2048+
context = withRequestId(context, options, methodName);
2049+
}
20482050
}
20492051
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
20502052
if (routeToLeader && leaderAwareRoutingEnabled) {
@@ -2065,7 +2067,8 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20652067
return (GrpcCallContext) context.merge(apiCallContextFromContext);
20662068
}
20672069

2068-
GrpcCallContext withRequestId(GrpcCallContext context, Map options, String methodName) {
2070+
GrpcCallContext withRequestId(
2071+
GrpcCallContext context, Map<SpannerRpc.Option, ?> options, String methodName) {
20692072
XGoogSpannerRequestId reqId = (XGoogSpannerRequestId) options.get(Option.REQUEST_ID);
20702073
if (reqId == null) {
20712074
return context;

google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java

+8-10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.junit.Assert.assertNotNull;
2222
import static org.junit.Assert.assertThrows;
2323
import static org.junit.Assert.fail;
24+
import static org.mockito.ArgumentMatchers.anyMap;
2425
import static org.mockito.ArgumentMatchers.eq;
2526
import static org.mockito.Mockito.mock;
2627
import static org.mockito.Mockito.when;
@@ -223,7 +224,7 @@ public void writeAtLeastOnce() throws ParseException {
223224
ArgumentCaptor<CommitRequest> commit = ArgumentCaptor.forClass(CommitRequest.class);
224225
CommitResponse response =
225226
CommitResponse.newBuilder().setCommitTimestamp(Timestamps.parse(timestampString)).build();
226-
Mockito.when(rpc.commit(commit.capture(), Mockito.eq(options))).thenReturn(response);
227+
Mockito.when(rpc.commit(commit.capture(), anyMap())).thenReturn(response);
227228

228229
Timestamp timestamp =
229230
session.writeAtLeastOnce(
@@ -255,7 +256,7 @@ public void writeAtLeastOnceWithOptions() throws ParseException {
255256
ArgumentCaptor<CommitRequest> commit = ArgumentCaptor.forClass(CommitRequest.class);
256257
CommitResponse response =
257258
CommitResponse.newBuilder().setCommitTimestamp(Timestamps.parse(timestampString)).build();
258-
Mockito.when(rpc.commit(commit.capture(), Mockito.eq(options))).thenReturn(response);
259+
Mockito.when(rpc.commit(commit.capture(), anyMap())).thenReturn(response);
259260
session.writeAtLeastOnceWithOptions(
260261
Collections.singletonList(Mutation.newInsertBuilder("T").set("C").to("x").build()),
261262
Options.tag(tag));
@@ -340,7 +341,7 @@ public void newMultiUseReadOnlyTransactionContextClosesOldSingleUseContext() {
340341
public void writeClosesOldSingleUseContext() throws ParseException {
341342
ReadContext ctx = session.singleUse(TimestampBound.strong());
342343

343-
Mockito.when(rpc.commit(Mockito.any(), Mockito.eq(options)))
344+
Mockito.when(rpc.commit(Mockito.any(), anyMap()))
344345
.thenReturn(
345346
CommitResponse.newBuilder()
346347
.setCommitTimestamp(Timestamps.parse("2015-10-01T10:54:20.021Z"))
@@ -442,7 +443,7 @@ public void request(int numMessages) {}
442443
private void mockRead(final PartialResultSet myResultSet) {
443444
final ArgumentCaptor<SpannerRpc.ResultStreamConsumer> consumer =
444445
ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class);
445-
Mockito.when(rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options), eq(false)))
446+
Mockito.when(rpc.read(Mockito.any(), consumer.capture(), anyMap(), eq(false)))
446447
.then(
447448
invocation -> {
448449
consumer.getValue().onPartialResultSet(myResultSet);
@@ -458,8 +459,7 @@ public void multiUseReadOnlyTransactionReturnsEmptyTransactionMetadata() {
458459
PartialResultSet.newBuilder()
459460
.setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string()))))
460461
.build();
461-
Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false)))
462-
.thenReturn(txnMetadata);
462+
Mockito.when(rpc.beginTransaction(Mockito.any(), anyMap(), eq(false))).thenReturn(txnMetadata);
463463
mockRead(resultSet);
464464

465465
ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong());
@@ -477,8 +477,7 @@ public void multiUseReadOnlyTransactionReturnsMissingTimestamp() {
477477
PartialResultSet.newBuilder()
478478
.setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string()))))
479479
.build();
480-
Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false)))
481-
.thenReturn(txnMetadata);
480+
Mockito.when(rpc.beginTransaction(Mockito.any(), anyMap(), eq(false))).thenReturn(txnMetadata);
482481
mockRead(resultSet);
483482

484483
ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong());
@@ -497,8 +496,7 @@ public void multiUseReadOnlyTransactionReturnsMissingTransactionId() throws Pars
497496
PartialResultSet.newBuilder()
498497
.setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string()))))
499498
.build();
500-
Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false)))
501-
.thenReturn(txnMetadata);
499+
Mockito.when(rpc.beginTransaction(Mockito.any(), anyMap(), eq(false))).thenReturn(txnMetadata);
502500
mockRead(resultSet);
503501

504502
ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong());

google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -1647,13 +1647,13 @@ public void testSessionNotFoundWrite() {
16471647
SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName);
16481648
List<Mutation> mutations = Collections.singletonList(Mutation.newInsertBuilder("FOO").build());
16491649
final SessionImpl closedSession = mockSession();
1650-
when(closedSession.writeWithOptions(mutations)).thenThrow(sessionNotFound);
1650+
when(closedSession.writeWithOptions(eq(mutations), any())).thenThrow(sessionNotFound);
16511651

16521652
final SessionImpl openSession = mockSession();
16531653
com.google.cloud.spanner.CommitResponse response =
16541654
mock(com.google.cloud.spanner.CommitResponse.class);
16551655
when(response.getCommitTimestamp()).thenReturn(Timestamp.now());
1656-
when(openSession.writeWithOptions(mutations)).thenReturn(response);
1656+
when(openSession.writeWithOptions(eq(mutations), any())).thenReturn(response);
16571657
doAnswer(
16581658
invocation -> {
16591659
executor.submit(
@@ -1690,13 +1690,14 @@ public void testSessionNotFoundWriteAtLeastOnce() {
16901690
SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName);
16911691
List<Mutation> mutations = Collections.singletonList(Mutation.newInsertBuilder("FOO").build());
16921692
final SessionImpl closedSession = mockSession();
1693-
when(closedSession.writeAtLeastOnceWithOptions(mutations)).thenThrow(sessionNotFound);
1693+
when(closedSession.writeAtLeastOnceWithOptions(eq(mutations), any()))
1694+
.thenThrow(sessionNotFound);
16941695

16951696
final SessionImpl openSession = mockSession();
16961697
com.google.cloud.spanner.CommitResponse response =
16971698
mock(com.google.cloud.spanner.CommitResponse.class);
16981699
when(response.getCommitTimestamp()).thenReturn(Timestamp.now());
1699-
when(openSession.writeAtLeastOnceWithOptions(mutations)).thenReturn(response);
1700+
when(openSession.writeAtLeastOnceWithOptions(eq(mutations), any())).thenReturn(response);
17001701
doAnswer(
17011702
invocation -> {
17021703
executor.submit(
@@ -1732,10 +1733,10 @@ public void testSessionNotFoundPartitionedUpdate() {
17321733
SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName);
17331734
Statement statement = Statement.of("UPDATE FOO SET BAR=1 WHERE 1=1");
17341735
final SessionImpl closedSession = mockSession();
1735-
when(closedSession.executePartitionedUpdate(statement)).thenThrow(sessionNotFound);
1736+
when(closedSession.executePartitionedUpdate(eq(statement), any())).thenThrow(sessionNotFound);
17361737

17371738
final SessionImpl openSession = mockSession();
1738-
when(openSession.executePartitionedUpdate(statement)).thenReturn(1L);
1739+
when(openSession.executePartitionedUpdate(eq(statement), any())).thenReturn(1L);
17391740
doAnswer(
17401741
invocation -> {
17411742
executor.submit(

0 commit comments

Comments
 (0)