Skip to content

Commit 5a1be3d

Browse files
authored
feat: add transaction runner for connections (#3559)
Adds a `runTransaction` method to `Connection` to allow applications to execute read/write transactions that are automatically retried in the same way as in the standard client library. This feature will be extended to the JDBC driver, so transaction retries can be defined using a runner there as well.
1 parent d9813a0 commit 5a1be3d

File tree

8 files changed

+362
-9
lines changed

8 files changed

+362
-9
lines changed

google-cloud-spanner/clirr-ignored-differences.xml

+7-1
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,12 @@
814814
<className>com/google/cloud/spanner/connection/TransactionRetryListener</className>
815815
<method>void retryDmlAsPartitionedDmlFailed(java.util.UUID, com.google.cloud.spanner.Statement, java.lang.Throwable)</method>
816816
</difference>
817-
817+
818+
<!-- Added transaction runner. -->
819+
<difference>
820+
<differenceType>7012</differenceType>
821+
<className>com/google/cloud/spanner/connection/Connection</className>
822+
<method>java.lang.Object runTransaction(com.google.cloud.spanner.connection.Connection$TransactionCallable)</method>
823+
</difference>
818824

819825
</differences>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void rollback() {
9999
public TransactionContext resetForRetry() {
100100
if (txn == null || !txn.isAborted() && txnState != TransactionState.ABORTED) {
101101
throw new IllegalStateException(
102-
"resetForRetry can only be called if the previous attempt" + " aborted");
102+
"resetForRetry can only be called if the previous attempt aborted");
103103
}
104104
try (IScope s = tracer.withSpan(span)) {
105105
boolean useInlinedBegin = txn.transactionId != null;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java

+15
Original file line numberDiff line numberDiff line change
@@ -835,6 +835,21 @@ default boolean isKeepTransactionAlive() {
835835
*/
836836
ApiFuture<Void> rollbackAsync();
837837

838+
/** Functional interface for the {@link #runTransaction(TransactionCallable)} method. */
839+
interface TransactionCallable<T> {
840+
/** This method is invoked with a fresh transaction on the connection. */
841+
T run(Connection transaction);
842+
}
843+
844+
/**
845+
* Runs the given callable in a transaction. The transaction type is determined by the current
846+
* state of the connection. That is; if the connection is in read/write mode, the transaction type
847+
* will be a read/write transaction. If the connection is in read-only mode, it will be a
848+
* read-only transaction. The transaction will automatically be retried if it is aborted by
849+
* Spanner.
850+
*/
851+
<T> T runTransaction(TransactionCallable<T> callable);
852+
838853
/** Returns the current savepoint support for this connection. */
839854
SavepointSupport getSavepointSupport();
840855

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java

+42-7
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,11 @@ private LeakedConnectionException() {
194194
*/
195195
private final ConnectionOptions options;
196196

197+
enum Caller {
198+
APPLICATION,
199+
TRANSACTION_RUNNER,
200+
}
201+
197202
/** The supported batch modes. */
198203
enum BatchMode {
199204
NONE,
@@ -267,6 +272,9 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
267272
*/
268273
private boolean transactionBeginMarked = false;
269274

275+
/** This field is set to true when a transaction runner is active for this connection. */
276+
private boolean transactionRunnerActive = false;
277+
270278
private BatchMode batchMode;
271279
private UnitOfWorkType unitOfWorkType;
272280
private final Stack<UnitOfWork> transactionStack = new Stack<>();
@@ -1164,16 +1172,19 @@ public void onFailure() {
11641172

11651173
@Override
11661174
public void commit() {
1167-
get(commitAsync(CallType.SYNC));
1175+
get(commitAsync(CallType.SYNC, Caller.APPLICATION));
11681176
}
11691177

11701178
@Override
11711179
public ApiFuture<Void> commitAsync() {
1172-
return commitAsync(CallType.ASYNC);
1180+
return commitAsync(CallType.ASYNC, Caller.APPLICATION);
11731181
}
11741182

1175-
private ApiFuture<Void> commitAsync(CallType callType) {
1183+
ApiFuture<Void> commitAsync(CallType callType, Caller caller) {
11761184
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
1185+
ConnectionPreconditions.checkState(
1186+
!transactionRunnerActive || caller == Caller.TRANSACTION_RUNNER,
1187+
"Cannot call commit when a transaction runner is active");
11771188
maybeAutoCommitOrFlushCurrentUnitOfWork(COMMIT_STATEMENT.getType(), COMMIT_STATEMENT);
11781189
return endCurrentTransactionAsync(callType, commit, COMMIT_STATEMENT);
11791190
}
@@ -1201,16 +1212,19 @@ public void onFailure() {
12011212

12021213
@Override
12031214
public void rollback() {
1204-
get(rollbackAsync(CallType.SYNC));
1215+
get(rollbackAsync(CallType.SYNC, Caller.APPLICATION));
12051216
}
12061217

12071218
@Override
12081219
public ApiFuture<Void> rollbackAsync() {
1209-
return rollbackAsync(CallType.ASYNC);
1220+
return rollbackAsync(CallType.ASYNC, Caller.APPLICATION);
12101221
}
12111222

1212-
private ApiFuture<Void> rollbackAsync(CallType callType) {
1223+
ApiFuture<Void> rollbackAsync(CallType callType, Caller caller) {
12131224
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
1225+
ConnectionPreconditions.checkState(
1226+
!transactionRunnerActive || caller == Caller.TRANSACTION_RUNNER,
1227+
"Cannot call rollback when a transaction runner is active");
12141228
maybeAutoCommitOrFlushCurrentUnitOfWork(ROLLBACK_STATEMENT.getType(), ROLLBACK_STATEMENT);
12151229
return endCurrentTransactionAsync(callType, rollback, ROLLBACK_STATEMENT);
12161230
}
@@ -1243,6 +1257,27 @@ private ApiFuture<Void> endCurrentTransactionAsync(
12431257
return res;
12441258
}
12451259

1260+
@Override
1261+
public <T> T runTransaction(TransactionCallable<T> callable) {
1262+
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
1263+
ConnectionPreconditions.checkState(!isBatchActive(), "Cannot run transaction while in a batch");
1264+
ConnectionPreconditions.checkState(
1265+
!isTransactionStarted(), "Cannot run transaction when a transaction is already active");
1266+
ConnectionPreconditions.checkState(
1267+
!transactionRunnerActive, "A transaction runner is already active for this connection");
1268+
this.transactionRunnerActive = true;
1269+
try {
1270+
return new TransactionRunnerImpl(this).run(callable);
1271+
} finally {
1272+
this.transactionRunnerActive = false;
1273+
}
1274+
}
1275+
1276+
void resetForRetry(UnitOfWork retryUnitOfWork) {
1277+
retryUnitOfWork.resetForRetry();
1278+
this.currentUnitOfWork = retryUnitOfWork;
1279+
}
1280+
12461281
@Override
12471282
public SavepointSupport getSavepointSupport() {
12481283
return getConnectionPropertyValue(SAVEPOINT_SUPPORT);
@@ -2000,7 +2035,7 @@ private UnitOfWork maybeStartAutoDmlBatch(UnitOfWork transaction) {
20002035
return transaction;
20012036
}
20022037

2003-
private UnitOfWork getCurrentUnitOfWorkOrStartNewUnitOfWork() {
2038+
UnitOfWork getCurrentUnitOfWorkOrStartNewUnitOfWork() {
20042039
return getCurrentUnitOfWorkOrStartNewUnitOfWork(
20052040
StatementType.UNKNOWN, /* parsedStatement = */ null, /* internalMetadataQuery = */ false);
20062041
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java

+5
Original file line numberDiff line numberDiff line change
@@ -1261,6 +1261,11 @@ private ApiFuture<Void> rollbackAsync(CallType callType, boolean updateStatusAnd
12611261
}
12621262
}
12631263

1264+
@Override
1265+
public void resetForRetry() {
1266+
txContextFuture = ApiFutures.immediateFuture(txManager.resetForRetry());
1267+
}
1268+
12641269
@Override
12651270
String getUnitOfWorkName() {
12661271
return "read/write transaction";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner.connection;
18+
19+
import static com.google.cloud.spanner.SpannerApiFutures.get;
20+
21+
import com.google.cloud.spanner.AbortedException;
22+
import com.google.cloud.spanner.SpannerExceptionFactory;
23+
import com.google.cloud.spanner.connection.Connection.TransactionCallable;
24+
import com.google.cloud.spanner.connection.ConnectionImpl.Caller;
25+
import com.google.cloud.spanner.connection.UnitOfWork.CallType;
26+
27+
class TransactionRunnerImpl {
28+
private final ConnectionImpl connection;
29+
30+
TransactionRunnerImpl(ConnectionImpl connection) {
31+
this.connection = connection;
32+
}
33+
34+
<T> T run(TransactionCallable<T> callable) {
35+
connection.beginTransaction();
36+
// Disable internal retries during this transaction.
37+
connection.setRetryAbortsInternally(/* retryAbortsInternally = */ false, /* local = */ true);
38+
UnitOfWork transaction = connection.getCurrentUnitOfWorkOrStartNewUnitOfWork();
39+
while (true) {
40+
try {
41+
T result = callable.run(connection);
42+
get(connection.commitAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER));
43+
return result;
44+
} catch (AbortedException abortedException) {
45+
try {
46+
//noinspection BusyWait
47+
Thread.sleep(abortedException.getRetryDelayInMillis());
48+
connection.resetForRetry(transaction);
49+
} catch (InterruptedException interruptedException) {
50+
connection.rollbackAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER);
51+
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
52+
} catch (Throwable t) {
53+
connection.rollbackAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER);
54+
throw t;
55+
}
56+
} catch (Throwable t) {
57+
connection.rollbackAsync(CallType.SYNC, Caller.TRANSACTION_RUNNER);
58+
throw t;
59+
}
60+
}
61+
}
62+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/UnitOfWork.java

+4
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,10 @@ interface EndTransactionCallback {
125125
ApiFuture<Void> rollbackAsync(
126126
@Nonnull CallType callType, @Nonnull EndTransactionCallback callback);
127127

128+
default void resetForRetry() {
129+
throw new UnsupportedOperationException();
130+
}
131+
128132
/** @see Connection#savepoint(String) */
129133
void savepoint(@Nonnull String name, @Nonnull Dialect dialect);
130134

0 commit comments

Comments
 (0)