Skip to content

test: fix failing test cases #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -109,6 +110,13 @@ Object value() {
return ImmutableMap.copyOf(tmp);
}

static Map<SpannerRpc.Option, ?> createRequestOptions(
long channelId, XGoogSpannerRequestId requestId) {
return ImmutableMap.of(
Option.CHANNEL_HINT, channelId,
Option.REQUEST_ID, requestId);
}

private final class BatchCreateSessionsRunnable implements Runnable {
private final long channelHint;
private final int sessionCount;
Expand Down Expand Up @@ -219,26 +227,28 @@ public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
SessionImpl createSession() {
// The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE,
// which is also a valid channel hint.
final Map<SpannerRpc.Option, ?> options;
final long channelId;
synchronized (this) {
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
channelId = sessionChannelCounter;
sessionChannelCounter++;
}
XGoogSpannerRequestId reqId = nextRequestId(channelId, 1);
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
XGoogSpannerRequestId reqId = this.nextRequestId(channelId, 1);
com.google.spanner.v1.Session session =
spanner
.getRpc()
.createSession(
db.getName(),
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
reqId.withOptions(options));
createRequestOptions(channelId, reqId));
SessionReference sessionReference =
new SessionReference(
session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
session.getName(),
session.getCreateTime(),
session.getMultiplexed(),
optionMap(SessionOption.channelHint(channelId)));
SessionImpl sessionImpl = new SessionImpl(spanner, sessionReference);
sessionImpl.setRequestIdCreator(this);
return sessionImpl;
Expand Down Expand Up @@ -399,7 +409,6 @@ void asyncBatchCreateSessions(
*/
private List<SessionImpl> internalBatchCreateSessions(
final int sessionCount, final long channelHint) throws SpannerException {
final Map<SpannerRpc.Option, ?> options = optionMap(SessionOption.channelHint(channelHint));
ISpan parent = spanner.getTracer().getCurrentSpan();
ISpan span =
spanner
Expand All @@ -417,7 +426,7 @@ private List<SessionImpl> internalBatchCreateSessions(
sessionCount,
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
reqId.withOptions(options));
createRequestOptions(channelHint, reqId));
span.addAnnotation(
String.format(
"Request for %d sessions returned %d sessions", sessionCount, sessions.size()));
Expand All @@ -428,7 +437,10 @@ private List<SessionImpl> internalBatchCreateSessions(
new SessionImpl(
spanner,
new SessionReference(
session.getName(), session.getCreateTime(), session.getMultiplexed(), options));
session.getName(),
session.getCreateTime(),
session.getMultiplexed(),
optionMap(SessionOption.channelHint(channelHint))));
sessionImpl.setRequestIdCreator(this);
res.add(sessionImpl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ public void incrementAttempt() {
this.attempt++;
}

@SuppressWarnings("unchecked")
public Map withOptions(Map options) {
Map<SpannerRpc.Option, ?> withOptions(Map<SpannerRpc.Option, ?> options) {
Map copyOptions = new HashMap<>();
if (options != null) {
copyOptions.putAll(options);
Expand All @@ -139,11 +138,11 @@ public int hashCode() {
return Objects.hash(this.nthClientId, this.nthChannelId, this.nthRequest, this.attempt);
}

public interface RequestIdCreator {
interface RequestIdCreator {
XGoogSpannerRequestId nextRequestId(long channelId, int attempt);
}

public static class NoopRequestIdCreator implements RequestIdCreator {
static class NoopRequestIdCreator implements RequestIdCreator {
NoopRequestIdCreator() {}

@Override
Expand All @@ -152,7 +151,7 @@ public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
}
}

public static void assertMonotonicityOfIds(String prefix, List<XGoogSpannerRequestId> reqIds) {
static void assertMonotonicityOfIds(String prefix, List<XGoogSpannerRequestId> reqIds) {
int size = reqIds.size();

List<String> violations = new ArrayList<>();
Expand All @@ -164,7 +163,7 @@ public static void assertMonotonicityOfIds(String prefix, List<XGoogSpannerReque
}
}

if (violations.size() == 0) {
if (violations.isEmpty()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,7 @@ public Session createSession(
@Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options)
throws SpannerException {
// By default sessions are not multiplexed
// By default, sessions are not multiplexed
return createSession(databaseName, databaseRole, labels, options, false);
}

Expand Down Expand Up @@ -2043,8 +2043,10 @@ <ReqT, RespT> GrpcCallContext newCallContext(
context = context.withChannelAffinity(affinity.intValue());
}
}
String methodName = method.getFullMethodName();
context = withRequestId(context, options, methodName);
if (method != null) {
String methodName = method.getFullMethodName();
context = withRequestId(context, options, methodName);
}
}
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
if (routeToLeader && leaderAwareRoutingEnabled) {
Expand All @@ -2065,7 +2067,8 @@ <ReqT, RespT> GrpcCallContext newCallContext(
return (GrpcCallContext) context.merge(apiCallContextFromContext);
}

GrpcCallContext withRequestId(GrpcCallContext context, Map options, String methodName) {
GrpcCallContext withRequestId(
GrpcCallContext context, Map<SpannerRpc.Option, ?> options, String methodName) {
XGoogSpannerRequestId reqId = (XGoogSpannerRequestId) options.get(Option.REQUEST_ID);
if (reqId == null) {
return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -223,7 +224,7 @@ public void writeAtLeastOnce() throws ParseException {
ArgumentCaptor<CommitRequest> commit = ArgumentCaptor.forClass(CommitRequest.class);
CommitResponse response =
CommitResponse.newBuilder().setCommitTimestamp(Timestamps.parse(timestampString)).build();
Mockito.when(rpc.commit(commit.capture(), Mockito.eq(options))).thenReturn(response);
Mockito.when(rpc.commit(commit.capture(), anyMap())).thenReturn(response);

Timestamp timestamp =
session.writeAtLeastOnce(
Expand Down Expand Up @@ -255,7 +256,7 @@ public void writeAtLeastOnceWithOptions() throws ParseException {
ArgumentCaptor<CommitRequest> commit = ArgumentCaptor.forClass(CommitRequest.class);
CommitResponse response =
CommitResponse.newBuilder().setCommitTimestamp(Timestamps.parse(timestampString)).build();
Mockito.when(rpc.commit(commit.capture(), Mockito.eq(options))).thenReturn(response);
Mockito.when(rpc.commit(commit.capture(), anyMap())).thenReturn(response);
session.writeAtLeastOnceWithOptions(
Collections.singletonList(Mutation.newInsertBuilder("T").set("C").to("x").build()),
Options.tag(tag));
Expand Down Expand Up @@ -340,7 +341,7 @@ public void newMultiUseReadOnlyTransactionContextClosesOldSingleUseContext() {
public void writeClosesOldSingleUseContext() throws ParseException {
ReadContext ctx = session.singleUse(TimestampBound.strong());

Mockito.when(rpc.commit(Mockito.any(), Mockito.eq(options)))
Mockito.when(rpc.commit(Mockito.any(), anyMap()))
.thenReturn(
CommitResponse.newBuilder()
.setCommitTimestamp(Timestamps.parse("2015-10-01T10:54:20.021Z"))
Expand Down Expand Up @@ -442,7 +443,7 @@ public void request(int numMessages) {}
private void mockRead(final PartialResultSet myResultSet) {
final ArgumentCaptor<SpannerRpc.ResultStreamConsumer> consumer =
ArgumentCaptor.forClass(SpannerRpc.ResultStreamConsumer.class);
Mockito.when(rpc.read(Mockito.any(), consumer.capture(), Mockito.eq(options), eq(false)))
Mockito.when(rpc.read(Mockito.any(), consumer.capture(), anyMap(), eq(false)))
.then(
invocation -> {
consumer.getValue().onPartialResultSet(myResultSet);
Expand All @@ -458,8 +459,7 @@ public void multiUseReadOnlyTransactionReturnsEmptyTransactionMetadata() {
PartialResultSet.newBuilder()
.setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string()))))
.build();
Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false)))
.thenReturn(txnMetadata);
Mockito.when(rpc.beginTransaction(Mockito.any(), anyMap(), eq(false))).thenReturn(txnMetadata);
mockRead(resultSet);

ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong());
Expand All @@ -477,8 +477,7 @@ public void multiUseReadOnlyTransactionReturnsMissingTimestamp() {
PartialResultSet.newBuilder()
.setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string()))))
.build();
Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false)))
.thenReturn(txnMetadata);
Mockito.when(rpc.beginTransaction(Mockito.any(), anyMap(), eq(false))).thenReturn(txnMetadata);
mockRead(resultSet);

ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong());
Expand All @@ -497,8 +496,7 @@ public void multiUseReadOnlyTransactionReturnsMissingTransactionId() throws Pars
PartialResultSet.newBuilder()
.setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string()))))
.build();
Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false)))
.thenReturn(txnMetadata);
Mockito.when(rpc.beginTransaction(Mockito.any(), anyMap(), eq(false))).thenReturn(txnMetadata);
mockRead(resultSet);

ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1647,13 +1647,13 @@ public void testSessionNotFoundWrite() {
SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName);
List<Mutation> mutations = Collections.singletonList(Mutation.newInsertBuilder("FOO").build());
final SessionImpl closedSession = mockSession();
when(closedSession.writeWithOptions(mutations)).thenThrow(sessionNotFound);
when(closedSession.writeWithOptions(eq(mutations), any())).thenThrow(sessionNotFound);

final SessionImpl openSession = mockSession();
com.google.cloud.spanner.CommitResponse response =
mock(com.google.cloud.spanner.CommitResponse.class);
when(response.getCommitTimestamp()).thenReturn(Timestamp.now());
when(openSession.writeWithOptions(mutations)).thenReturn(response);
when(openSession.writeWithOptions(eq(mutations), any())).thenReturn(response);
doAnswer(
invocation -> {
executor.submit(
Expand Down Expand Up @@ -1690,13 +1690,14 @@ public void testSessionNotFoundWriteAtLeastOnce() {
SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName);
List<Mutation> mutations = Collections.singletonList(Mutation.newInsertBuilder("FOO").build());
final SessionImpl closedSession = mockSession();
when(closedSession.writeAtLeastOnceWithOptions(mutations)).thenThrow(sessionNotFound);
when(closedSession.writeAtLeastOnceWithOptions(eq(mutations), any()))
.thenThrow(sessionNotFound);

final SessionImpl openSession = mockSession();
com.google.cloud.spanner.CommitResponse response =
mock(com.google.cloud.spanner.CommitResponse.class);
when(response.getCommitTimestamp()).thenReturn(Timestamp.now());
when(openSession.writeAtLeastOnceWithOptions(mutations)).thenReturn(response);
when(openSession.writeAtLeastOnceWithOptions(eq(mutations), any())).thenReturn(response);
doAnswer(
invocation -> {
executor.submit(
Expand Down Expand Up @@ -1732,10 +1733,10 @@ public void testSessionNotFoundPartitionedUpdate() {
SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName);
Statement statement = Statement.of("UPDATE FOO SET BAR=1 WHERE 1=1");
final SessionImpl closedSession = mockSession();
when(closedSession.executePartitionedUpdate(statement)).thenThrow(sessionNotFound);
when(closedSession.executePartitionedUpdate(eq(statement), any())).thenThrow(sessionNotFound);

final SessionImpl openSession = mockSession();
when(openSession.executePartitionedUpdate(statement)).thenReturn(1L);
when(openSession.executePartitionedUpdate(eq(statement), any())).thenReturn(1L);
doAnswer(
invocation -> {
executor.submit(
Expand Down