Skip to content

chore(spanner): fix tests cases for mux rw #3628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2024,14 +2024,6 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
return;
}
sessionLastUsed.put(session.getName(), Instant.now());
if (session.getMultiplexed()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this since this no longer will be the case in backend.

&& !request.hasPrecommitToken()
&& !request.hasSingleUseTransaction()) {
throw Status.INVALID_ARGUMENT
.withDescription(
"A Commit request for a read-write transaction on a multiplexed session must specify a precommit token.")
.asRuntimeException();
}
try {
commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
// Find or start a transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.spanner.GceTestEnvConfig;
import com.google.cloud.spanner.IntegrationTestEnv;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
Expand Down Expand Up @@ -146,6 +147,9 @@ public void intercept(
if (usingMultiplexedsession) {
Field stateField = cls.getDeclaredField("txnState");
stateField.setAccessible(true);
if (tx.getState() == null) {
return;
}
tx.rollback();
stateField.set(tx, TransactionState.ABORTED);
} else {
Expand Down Expand Up @@ -368,4 +372,11 @@ protected boolean indexExists(Connection connection, String table, String index)
}
return false;
}

protected boolean isMultiplexedSessionsEnabledForRW(Spanner spanner) {
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
return false;
}
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ public void testCommitAborted() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
ApiFuture<Long> count = getTestRecordCountAsync(connection);
// do an insert
ApiFuture<Long> updateCount =
Expand Down Expand Up @@ -253,6 +255,8 @@ public void testInsertAborted() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
ApiFuture<Long> count = getTestRecordCountAsync(connection);
// indicate that the next statement should abort
interceptor.setProbability(1.0);
Expand All @@ -276,6 +280,8 @@ public void testUpdateAborted() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
ApiFuture<Long> count = getTestRecordCountAsync(connection);
// insert a test record
connection.executeUpdateAsync(
Expand Down Expand Up @@ -309,6 +315,8 @@ public void testQueryAborted() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert a test record
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test aborted')"));
Expand Down Expand Up @@ -359,6 +367,8 @@ public void testNextCallAborted() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert two test records
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
Expand Down Expand Up @@ -392,6 +402,8 @@ public void testMultipleAborts() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
ApiFuture<Long> count = getTestRecordCountAsync(connection);
// do three inserts which all will abort and retry
interceptor.setProbability(1.0);
Expand Down Expand Up @@ -428,6 +440,8 @@ public void testAbortAfterSelect() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
ApiFuture<Long> count = getTestRecordCountAsync(connection);
// insert a test record
connection.executeUpdateAsync(
Expand Down Expand Up @@ -504,6 +518,8 @@ public void testAbortWithResultSetHalfway() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert two test records
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
Expand Down Expand Up @@ -539,6 +555,8 @@ public void testAbortWithResultSetFullyConsumed() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert two test records
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
Expand Down Expand Up @@ -581,6 +599,8 @@ public void testAbortWithConcurrentInsert() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert two test records
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
Expand Down Expand Up @@ -632,6 +652,8 @@ public void testAbortWithConcurrentDelete() {
AbortInterceptor interceptor = new AbortInterceptor(0);
// first insert two test records
try (ITConnection connection = createConnection()) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
connection.executeUpdateAsync(
Expand All @@ -641,6 +663,8 @@ public void testAbortWithConcurrentDelete() {
// open a new connection and select the two test records
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// select the test records and consume the entire result set
try (AsyncResultSet rs =
connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) {
Expand Down Expand Up @@ -694,6 +718,8 @@ public void testAbortWithConcurrentUpdate() {
// open a new connection and select the two test records
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// select the test records and consume the entire result set
try (AsyncResultSet rs =
connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) {
Expand Down Expand Up @@ -744,6 +770,8 @@ public void testAbortWithUnseenConcurrentInsert() throws InterruptedException {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert three test records
connection.executeUpdateAsync(
Statement.of("INSERT INTO TEST (ID, NAME) VALUES (1, 'test 1')"));
Expand Down Expand Up @@ -833,6 +861,8 @@ public void testRetryLargeResultSet() {
final long UPDATED_RECORDS = 1000L;
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection = createConnection()) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert test records
for (int i = 0; i < NUMBER_OF_TEST_RECORDS; i++) {
connection.bufferedWrite(
Expand All @@ -845,6 +875,8 @@ public void testRetryLargeResultSet() {
}
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// select the test records and iterate over them
try (AsyncResultSet rs =
connection.executeQueryAsync(Statement.of("SELECT * FROM TEST ORDER BY ID"))) {
Expand All @@ -867,6 +899,8 @@ public void testRetryLargeResultSet() {
// Wait until the entire result set has been consumed.
get(finished);
}
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// Do an update that will abort and retry.
interceptor.setProbability(1.0);
interceptor.setOnlyInjectOnce(true);
Expand Down Expand Up @@ -898,6 +932,8 @@ public void testRetryHighAbortRate() {
AbortInterceptor interceptor = new AbortInterceptor(0.25D);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// insert test records
for (int i = 0; i < NUMBER_OF_TEST_RECORDS; i++) {
connection.bufferedWrite(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public void test02_RunAbortedTest() {
long numberOfSongs = 0L;
AbortInterceptor interceptor = new AbortInterceptor(0.0D);
try (ITConnection connection = createConnection(interceptor)) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
connection.setAutocommit(false);
connection.setRetryAbortsInternally(true);
// Read all data from the different music tables in the transaction
Expand Down
Loading
Loading