Skip to content

Commit 40568de

Browse files
authored
chore(spanner): fix tests cases for mux rw (#3628)
* chore(spanner): add interceptor * chore(spanner): assert INVALID_ARGUMENT error code * chore(spanner): update interceptor * chore(spanner): revert precommit token check on mockspanner
1 parent 7c78ec9 commit 40568de

File tree

7 files changed

+126
-11
lines changed

7 files changed

+126
-11
lines changed

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

-8
Original file line numberDiff line numberDiff line change
@@ -2024,14 +2024,6 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
20242024
return;
20252025
}
20262026
sessionLastUsed.put(session.getName(), Instant.now());
2027-
if (session.getMultiplexed()
2028-
&& !request.hasPrecommitToken()
2029-
&& !request.hasSingleUseTransaction()) {
2030-
throw Status.INVALID_ARGUMENT
2031-
.withDescription(
2032-
"A Commit request for a read-write transaction on a multiplexed session must specify a precommit token.")
2033-
.asRuntimeException();
2034-
}
20352027
try {
20362028
commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
20372029
// Find or start a transaction

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ITAbstractSpannerTest.java

+11
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.spanner.GceTestEnvConfig;
2323
import com.google.cloud.spanner.IntegrationTestEnv;
2424
import com.google.cloud.spanner.ResultSet;
25+
import com.google.cloud.spanner.Spanner;
2526
import com.google.cloud.spanner.SpannerExceptionFactory;
2627
import com.google.cloud.spanner.SpannerOptions;
2728
import com.google.cloud.spanner.Statement;
@@ -146,6 +147,9 @@ public void intercept(
146147
if (usingMultiplexedsession) {
147148
Field stateField = cls.getDeclaredField("txnState");
148149
stateField.setAccessible(true);
150+
if (tx.getState() == null) {
151+
return;
152+
}
149153
tx.rollback();
150154
stateField.set(tx, TransactionState.ABORTED);
151155
} else {
@@ -368,4 +372,11 @@ protected boolean indexExists(Connection connection, String table, String index)
368372
}
369373
return false;
370374
}
375+
376+
protected boolean isMultiplexedSessionsEnabledForRW(Spanner spanner) {
377+
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
378+
return false;
379+
}
380+
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW();
381+
}
371382
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITAsyncTransactionRetryTest.java

+36
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ public void testCommitAborted() {
221221
AbortInterceptor interceptor = new AbortInterceptor(0);
222222
try (ITConnection connection =
223223
createConnection(interceptor, new CountTransactionRetryListener())) {
224+
interceptor.setUsingMultiplexedSession(
225+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
224226
ApiFuture<Long> count = getTestRecordCountAsync(connection);
225227
// do an insert
226228
ApiFuture<Long> updateCount =
@@ -253,6 +255,8 @@ public void testInsertAborted() {
253255
AbortInterceptor interceptor = new AbortInterceptor(0);
254256
try (ITConnection connection =
255257
createConnection(interceptor, new CountTransactionRetryListener())) {
258+
interceptor.setUsingMultiplexedSession(
259+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
256260
ApiFuture<Long> count = getTestRecordCountAsync(connection);
257261
// indicate that the next statement should abort
258262
interceptor.setProbability(1.0);
@@ -276,6 +280,8 @@ public void testUpdateAborted() {
276280
AbortInterceptor interceptor = new AbortInterceptor(0);
277281
try (ITConnection connection =
278282
createConnection(interceptor, new CountTransactionRetryListener())) {
283+
interceptor.setUsingMultiplexedSession(
284+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
279285
ApiFuture<Long> count = getTestRecordCountAsync(connection);
280286
// insert a test record
281287
connection.executeUpdateAsync(
@@ -309,6 +315,8 @@ public void testQueryAborted() {
309315
AbortInterceptor interceptor = new AbortInterceptor(0);
310316
try (ITConnection connection =
311317
createConnection(interceptor, new CountTransactionRetryListener())) {
318+
interceptor.setUsingMultiplexedSession(
319+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
312320
// insert a test record
313321
connection.executeUpdateAsync(
314322
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test aborted')"));
@@ -359,6 +367,8 @@ public void testNextCallAborted() {
359367
AbortInterceptor interceptor = new AbortInterceptor(0);
360368
try (ITConnection connection =
361369
createConnection(interceptor, new CountTransactionRetryListener())) {
370+
interceptor.setUsingMultiplexedSession(
371+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
362372
// insert two test records
363373
connection.executeUpdateAsync(
364374
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
@@ -392,6 +402,8 @@ public void testMultipleAborts() {
392402
AbortInterceptor interceptor = new AbortInterceptor(0);
393403
try (ITConnection connection =
394404
createConnection(interceptor, new CountTransactionRetryListener())) {
405+
interceptor.setUsingMultiplexedSession(
406+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
395407
ApiFuture<Long> count = getTestRecordCountAsync(connection);
396408
// do three inserts which all will abort and retry
397409
interceptor.setProbability(1.0);
@@ -428,6 +440,8 @@ public void testAbortAfterSelect() {
428440
AbortInterceptor interceptor = new AbortInterceptor(0);
429441
try (ITConnection connection =
430442
createConnection(interceptor, new CountTransactionRetryListener())) {
443+
interceptor.setUsingMultiplexedSession(
444+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
431445
ApiFuture<Long> count = getTestRecordCountAsync(connection);
432446
// insert a test record
433447
connection.executeUpdateAsync(
@@ -504,6 +518,8 @@ public void testAbortWithResultSetHalfway() {
504518
AbortInterceptor interceptor = new AbortInterceptor(0);
505519
try (ITConnection connection =
506520
createConnection(interceptor, new CountTransactionRetryListener())) {
521+
interceptor.setUsingMultiplexedSession(
522+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
507523
// insert two test records
508524
connection.executeUpdateAsync(
509525
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
@@ -539,6 +555,8 @@ public void testAbortWithResultSetFullyConsumed() {
539555
AbortInterceptor interceptor = new AbortInterceptor(0);
540556
try (ITConnection connection =
541557
createConnection(interceptor, new CountTransactionRetryListener())) {
558+
interceptor.setUsingMultiplexedSession(
559+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
542560
// insert two test records
543561
connection.executeUpdateAsync(
544562
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
@@ -581,6 +599,8 @@ public void testAbortWithConcurrentInsert() {
581599
AbortInterceptor interceptor = new AbortInterceptor(0);
582600
try (ITConnection connection =
583601
createConnection(interceptor, new CountTransactionRetryListener())) {
602+
interceptor.setUsingMultiplexedSession(
603+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
584604
// insert two test records
585605
connection.executeUpdateAsync(
586606
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
@@ -632,6 +652,8 @@ public void testAbortWithConcurrentDelete() {
632652
AbortInterceptor interceptor = new AbortInterceptor(0);
633653
// first insert two test records
634654
try (ITConnection connection = createConnection()) {
655+
interceptor.setUsingMultiplexedSession(
656+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
635657
connection.executeUpdateAsync(
636658
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
637659
connection.executeUpdateAsync(
@@ -641,6 +663,8 @@ public void testAbortWithConcurrentDelete() {
641663
// open a new connection and select the two test records
642664
try (ITConnection connection =
643665
createConnection(interceptor, new CountTransactionRetryListener())) {
666+
interceptor.setUsingMultiplexedSession(
667+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
644668
// select the test records and consume the entire result set
645669
try (AsyncResultSet rs =
646670
connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) {
@@ -694,6 +718,8 @@ public void testAbortWithConcurrentUpdate() {
694718
// open a new connection and select the two test records
695719
try (ITConnection connection =
696720
createConnection(interceptor, new CountTransactionRetryListener())) {
721+
interceptor.setUsingMultiplexedSession(
722+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
697723
// select the test records and consume the entire result set
698724
try (AsyncResultSet rs =
699725
connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) {
@@ -744,6 +770,8 @@ public void testAbortWithUnseenConcurrentInsert() throws InterruptedException {
744770
AbortInterceptor interceptor = new AbortInterceptor(0);
745771
try (ITConnection connection =
746772
createConnection(interceptor, new CountTransactionRetryListener())) {
773+
interceptor.setUsingMultiplexedSession(
774+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
747775
// insert three test records
748776
connection.executeUpdateAsync(
749777
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
@@ -833,6 +861,8 @@ public void testRetryLargeResultSet() {
833861
final long UPDATED_RECORDS = 1000L;
834862
AbortInterceptor interceptor = new AbortInterceptor(0);
835863
try (ITConnection connection = createConnection()) {
864+
interceptor.setUsingMultiplexedSession(
865+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
836866
// insert test records
837867
for (int i = 0; i < NUMBER_OF_TEST_RECORDS; i++) {
838868
connection.bufferedWrite(
@@ -845,6 +875,8 @@ public void testRetryLargeResultSet() {
845875
}
846876
try (ITConnection connection =
847877
createConnection(interceptor, new CountTransactionRetryListener())) {
878+
interceptor.setUsingMultiplexedSession(
879+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
848880
// select the test records and iterate over them
849881
try (AsyncResultSet rs =
850882
connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) {
@@ -867,6 +899,8 @@ public void testRetryLargeResultSet() {
867899
// Wait until the entire result set has been consumed.
868900
get(finished);
869901
}
902+
interceptor.setUsingMultiplexedSession(
903+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
870904
// Do an update that will abort and retry.
871905
interceptor.setProbability(1.0);
872906
interceptor.setOnlyInjectOnce(true);
@@ -898,6 +932,8 @@ public void testRetryHighAbortRate() {
898932
AbortInterceptor interceptor = new AbortInterceptor(0.25D);
899933
try (ITConnection connection =
900934
createConnection(interceptor, new CountTransactionRetryListener())) {
935+
interceptor.setUsingMultiplexedSession(
936+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
901937
// insert test records
902938
for (int i = 0; i < NUMBER_OF_TEST_RECORDS; i++) {
903939
connection.bufferedWrite(

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITSqlMusicScriptTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ public void test02_RunAbortedTest() {
7171
long numberOfSongs = 0L;
7272
AbortInterceptor interceptor = new AbortInterceptor(0.0D);
7373
try (ITConnection connection = createConnection(interceptor)) {
74+
interceptor.setUsingMultiplexedSession(
75+
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
7476
connection.setAutocommit(false);
7577
connection.setRetryAbortsInternally(true);
7678
// Read all data from the different music tables in the transaction

0 commit comments

Comments
 (0)