Skip to content

Commit 46464fc

Browse files
authored
chore: add internal option for statement executor type (#3534)
The Connection API by default uses either a platform thread or a virtual thread for each connection to execute and control the statements of that connection. This is used to enable asynchronous execution of statements and allows a statement to be cancelled by just interrupting this thread. Both these use cases are however not (or only very rarely) used by the most common users of the Connection API; the JDBC driver and PGAdapter. PGAdapter uses the PostgreSQL wire-protocol, which by design is synchronous, and JDBC is also a synchronous API. The latter has a cancel() method that currently requires this threading model, but this can be modified in the JDBC driver. Using a direct executor instead of a single-threaded executor per connection can save one thread per connection. The option is intentionally made package-private, so the above-mentioned frameworks can set it by default without it becoming part of the public API.
1 parent 3f4873d commit 46464fc

File tree

8 files changed

+136
-10
lines changed

8 files changed

+136
-10
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import com.google.cloud.spanner.connection.AbstractStatementParser.StatementType;
7777
import com.google.cloud.spanner.connection.ConnectionProperty.Context;
7878
import com.google.cloud.spanner.connection.ConnectionState.Type;
79+
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
7980
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
8081
import com.google.cloud.spanner.connection.StatementResult.ResultType;
8182
import com.google.cloud.spanner.connection.UnitOfWork.CallType;
@@ -284,9 +285,17 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
284285
Preconditions.checkNotNull(options);
285286
this.leakedException =
286287
options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null;
288+
StatementExecutorType statementExecutorType;
289+
if (options.getStatementExecutorType() != null) {
290+
statementExecutorType = options.getStatementExecutorType();
291+
} else {
292+
statementExecutorType =
293+
options.isUseVirtualThreads()
294+
? StatementExecutorType.VIRTUAL_THREAD
295+
: StatementExecutorType.DIRECT_EXECUTOR;
296+
}
287297
this.statementExecutor =
288-
new StatementExecutor(
289-
options.isUseVirtualThreads(), options.getStatementExecutionInterceptors());
298+
new StatementExecutor(statementExecutorType, options.getStatementExecutionInterceptors());
290299
this.spannerPool = SpannerPool.INSTANCE;
291300
this.options = options;
292301
this.spanner = spannerPool.getSpanner(options, this);
@@ -330,7 +339,11 @@ && getDialect() == Dialect.POSTGRESQL
330339
this.leakedException =
331340
options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null;
332341
this.statementExecutor =
333-
new StatementExecutor(options.isUseVirtualThreads(), Collections.emptyList());
342+
new StatementExecutor(
343+
options.isUseVirtualThreads()
344+
? StatementExecutorType.VIRTUAL_THREAD
345+
: StatementExecutorType.DIRECT_EXECUTOR,
346+
Collections.emptyList());
334347
this.spannerPool = Preconditions.checkNotNull(spannerPool);
335348
this.options = Preconditions.checkNotNull(options);
336349
this.spanner = spannerPool.getSpanner(options, this);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java

+13
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import com.google.cloud.spanner.SpannerException;
7171
import com.google.cloud.spanner.SpannerExceptionFactory;
7272
import com.google.cloud.spanner.SpannerOptions;
73+
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
7374
import com.google.common.annotations.VisibleForTesting;
7475
import com.google.common.base.Preconditions;
7576
import com.google.common.base.Strings;
@@ -614,6 +615,7 @@ public static class Builder {
614615
new HashMap<>();
615616
private String uri;
616617
private Credentials credentials;
618+
private StatementExecutorType statementExecutorType;
617619
private SessionPoolOptions sessionPoolOptions;
618620
private List<StatementExecutionInterceptor> statementExecutionInterceptors =
619621
Collections.emptyList();
@@ -777,6 +779,11 @@ Builder setCredentials(Credentials credentials) {
777779
return this;
778780
}
779781

782+
Builder setStatementExecutorType(StatementExecutorType statementExecutorType) {
783+
this.statementExecutorType = statementExecutorType;
784+
return this;
785+
}
786+
780787
public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
781788
this.openTelemetry = openTelemetry;
782789
return this;
@@ -814,6 +821,7 @@ public static Builder newBuilder() {
814821
private final String instanceId;
815822
private final String databaseName;
816823
private final Credentials credentials;
824+
private final StatementExecutorType statementExecutorType;
817825
private final SessionPoolOptions sessionPoolOptions;
818826

819827
private final OpenTelemetry openTelemetry;
@@ -834,6 +842,7 @@ private ConnectionOptions(Builder builder) {
834842
ConnectionPropertyValue<Boolean> value = cast(connectionPropertyValues.get(LENIENT.getKey()));
835843
this.warnings = checkValidProperties(value != null && value.getValue(), uri);
836844
this.fixedCredentials = builder.credentials;
845+
this.statementExecutorType = builder.statementExecutorType;
837846

838847
this.openTelemetry = builder.openTelemetry;
839848
this.statementExecutionInterceptors =
@@ -1105,6 +1114,10 @@ CredentialsProvider getCredentialsProvider() {
11051114
return getInitialConnectionPropertyValue(CREDENTIALS_PROVIDER);
11061115
}
11071116

1117+
StatementExecutorType getStatementExecutorType() {
1118+
return this.statementExecutorType;
1119+
}
1120+
11081121
/** The {@link SessionPoolOptions} of this {@link ConnectionOptions}. */
11091122
public SessionPoolOptions getSessionPoolOptions() {
11101123
return sessionPoolOptions;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementExecutor.java

+18-5
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,10 @@ java.time.Duration asDuration() {
146146
ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory("connection-executor", false);
147147

148148
/** Creates an {@link ExecutorService} for a {@link StatementExecutor}. */
149-
private static ListeningExecutorService createExecutorService(boolean useVirtualThreads) {
149+
private static ListeningExecutorService createExecutorService(StatementExecutorType type) {
150+
if (type == StatementExecutorType.DIRECT_EXECUTOR) {
151+
return MoreExecutors.newDirectExecutorService();
152+
}
150153
return MoreExecutors.listeningDecorator(
151154
Context.taskWrapping(
152155
new ThreadPoolExecutor(
@@ -155,7 +158,7 @@ private static ListeningExecutorService createExecutorService(boolean useVirtual
155158
0L,
156159
TimeUnit.MILLISECONDS,
157160
new LinkedBlockingQueue<>(),
158-
useVirtualThreads
161+
type == StatementExecutorType.VIRTUAL_THREAD
159162
? DEFAULT_VIRTUAL_THREAD_FACTORY
160163
: DEFAULT_DAEMON_THREAD_FACTORY)));
161164
}
@@ -168,13 +171,23 @@ private static ListeningExecutorService createExecutorService(boolean useVirtual
168171
*/
169172
private final List<StatementExecutionInterceptor> interceptors;
170173

174+
enum StatementExecutorType {
175+
PLATFORM_THREAD,
176+
VIRTUAL_THREAD,
177+
DIRECT_EXECUTOR,
178+
}
179+
171180
@VisibleForTesting
172181
StatementExecutor() {
173-
this(DEFAULT_USE_VIRTUAL_THREADS, Collections.emptyList());
182+
this(
183+
DEFAULT_USE_VIRTUAL_THREADS
184+
? StatementExecutorType.VIRTUAL_THREAD
185+
: StatementExecutorType.PLATFORM_THREAD,
186+
Collections.emptyList());
174187
}
175188

176-
StatementExecutor(boolean useVirtualThreads, List<StatementExecutionInterceptor> interceptors) {
177-
this.executor = createExecutorService(useVirtualThreads);
189+
StatementExecutor(StatementExecutorType type, List<StatementExecutionInterceptor> interceptors) {
190+
this.executor = createExecutorService(type);
178191
this.interceptors = Collections.unmodifiableList(interceptors);
179192
}
180193

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java

+6
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ ITConnection createConnection(
283283
ConnectionOptions.newBuilder()
284284
.setUri(getBaseUrl() + additionalUrlOptions)
285285
.setStatementExecutionInterceptors(interceptors);
286+
configureConnectionOptions(builder);
286287
ConnectionOptions options = builder.build();
287288
ITConnection connection = createITConnection(options);
288289
for (TransactionRetryListener listener : transactionRetryListeners) {
@@ -291,6 +292,11 @@ ITConnection createConnection(
291292
return connection;
292293
}
293294

295+
protected ConnectionOptions.Builder configureConnectionOptions(
296+
ConnectionOptions.Builder builder) {
297+
return builder;
298+
}
299+
294300
protected String getBaseUrl() {
295301
return String.format(
296302
"cloudspanner://localhost:%d/projects/proj/instances/inst/databases/db?usePlainText=true;autocommit=false;retryAbortsInternally=true",

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiAbortedTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
import com.google.cloud.spanner.Options;
3333
import com.google.cloud.spanner.SpannerExceptionFactory;
3434
import com.google.cloud.spanner.Statement;
35+
import com.google.cloud.spanner.connection.ConnectionOptions.Builder;
3536
import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection;
37+
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
3638
import com.google.common.collect.Collections2;
3739
import com.google.common.collect.ImmutableList;
3840
import com.google.common.collect.Lists;
@@ -127,6 +129,11 @@ ITConnection createConnection(TransactionRetryListener listener) {
127129
return connection;
128130
}
129131

132+
@Override
133+
protected Builder configureConnectionOptions(Builder builder) {
134+
return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD);
135+
}
136+
130137
@Test
131138
public void testSingleQueryAborted() {
132139
RetryCounter counter = new RetryCounter();

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import com.google.cloud.spanner.SpannerApiFutures;
3737
import com.google.cloud.spanner.SpannerException;
3838
import com.google.cloud.spanner.Statement;
39+
import com.google.cloud.spanner.connection.ConnectionOptions.Builder;
3940
import com.google.cloud.spanner.connection.SpannerPool.CheckAndCloseSpannersMode;
41+
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
4042
import com.google.cloud.spanner.connection.StatementResult.ResultType;
4143
import com.google.common.base.Function;
4244
import com.google.common.collect.Collections2;
@@ -86,6 +88,11 @@ public void setup() {
8688
}
8789
}
8890

91+
@Override
92+
protected Builder configureConnectionOptions(Builder builder) {
93+
return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD);
94+
}
95+
8996
@After
9097
public void reset() {
9198
mockSpanner.removeAllExecutionTimes();

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import com.google.cloud.spanner.SpannerOptions;
3737
import com.google.cloud.spanner.Statement;
3838
import com.google.cloud.spanner.TimestampBound;
39+
import com.google.cloud.spanner.connection.ConnectionOptions.Builder;
40+
import com.google.cloud.spanner.connection.StatementExecutor.StatementExecutorType;
3941
import com.google.common.collect.ImmutableList;
4042
import com.google.spanner.v1.BatchCreateSessionsRequest;
4143
import com.google.spanner.v1.CommitRequest;
@@ -417,6 +419,11 @@ protected String getBaseUrl() {
417419
return super.getBaseUrl() + ";maxSessions=1";
418420
}
419421

422+
@Override
423+
protected Builder configureConnectionOptions(Builder builder) {
424+
return builder.setStatementExecutorType(StatementExecutorType.PLATFORM_THREAD);
425+
}
426+
420427
@Test
421428
public void testMaxSessions()
422429
throws InterruptedException, TimeoutException, ExecutionException {

0 commit comments

Comments
 (0)