Skip to content

Commit c09103c

Browse files
committed
Merge pull request #14760 from jasontedor/immutable-transport-options
Transport options should be immutable
2 parents 2172e60 + 3acd780 commit c09103c

File tree

16 files changed

+123
-105
lines changed

16 files changed

+123
-105
lines changed

core/src/main/java/org/elasticsearch/action/bulk/BulkAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ public BulkRequestBuilder newRequestBuilder(ElasticsearchClient client) {
4747

4848
@Override
4949
public TransportRequestOptions transportOptions(Settings settings) {
50-
return TransportRequestOptions.options()
50+
return TransportRequestOptions.builder()
5151
.withType(TransportRequestOptions.Type.BULK)
5252
.withCompress(settings.getAsBoolean("action.bulk.compress", true)
53-
);
53+
).build();
5454
}
5555
}

core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,11 @@ public void run() {
126126
});
127127
return;
128128
}
129-
TransportRequestOptions transportRequestOptions = TransportRequestOptions.options();
129+
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
130130
if (request.timeout() != null) {
131-
transportRequestOptions.withTimeout(request.timeout());
131+
builder.withTimeout(request.timeout());
132132
}
133-
transportRequestOptions.withCompress(transportCompress());
133+
builder.withCompress(transportCompress());
134134
for (int i = 0; i < nodesIds.length; i++) {
135135
final String nodeId = nodesIds[i];
136136
final int idx = i;
@@ -145,7 +145,7 @@ public void run() {
145145
onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node));
146146
} else {
147147
NodeRequest nodeRequest = newNodeRequest(nodeId, request);
148-
transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() {
148+
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler<NodeResponse>() {
149149
@Override
150150
public NodeResponse newInstance() {
151151
return newNodeResponse();

core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ protected void doSample() {
359359
try {
360360
LivenessResponse livenessResponse = transportService.submitRequest(listedNode, TransportLivenessAction.NAME,
361361
headers.applyTo(new LivenessRequest()),
362-
TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout),
362+
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
363363
new FutureTransportResponseHandler<LivenessResponse>() {
364364
@Override
365365
public LivenessResponse newInstance() {
@@ -430,7 +430,7 @@ public void run() {
430430
}
431431
transportService.sendRequest(listedNode, ClusterStateAction.NAME,
432432
headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)),
433-
TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout),
433+
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
434434
new BaseTransportResponseHandler<ClusterStateResponse>() {
435435

436436
@Override

core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@
3939
import java.util.concurrent.CopyOnWriteArrayList;
4040
import java.util.concurrent.atomic.AtomicBoolean;
4141

42-
import static org.elasticsearch.transport.TransportRequestOptions.options;
43-
4442
/**
4543
* A fault detection that pings the master periodically to see if its alive.
4644
*/
@@ -222,7 +220,7 @@ public void run() {
222220
return;
223221
}
224222
final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
225-
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
223+
final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout).build();
226224
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
227225

228226
@Override

core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.concurrent.CopyOnWriteArrayList;
3535

3636
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
37-
import static org.elasticsearch.transport.TransportRequestOptions.options;
3837

3938
/**
4039
* A fault detection of multiple nodes.
@@ -189,7 +188,7 @@ public void run() {
189188
return;
190189
}
191190
final PingRequest pingRequest = new PingRequest(node.id(), clusterName, localNode, clusterStateVersion);
192-
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
191+
final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout).build();
193192
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new BaseTransportResponseHandler<PingResponse>() {
194193
@Override
195194
public PingResponse newInstance() {

core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ public void run() {
437437

438438
private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
439439
logger.trace("[{}] sending to {}", id, nodeToSend);
440-
transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {
440+
transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.builder().withTimeout((long) (timeout.millis() * 1.25)).build(), new BaseTransportResponseHandler<UnicastPingResponse>() {
441441

442442
@Override
443443
public UnicastPingResponse newInstance() {

core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ private void sendClusterStateToNode(final ClusterState clusterState, BytesRefere
248248
// -> no need to put a timeout on the options here, because we want the response to eventually be received
249249
// and not log an error if it arrives after the timeout
250250
// -> no need to compress, we already compressed the bytes
251-
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
251+
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
252252
transportService.sendRequest(node, SEND_ACTION_NAME,
253253
new BytesTransportRequest(bytes, node.version()),
254254
options,
@@ -282,7 +282,7 @@ public void handleException(TransportException exp) {
282282
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {
283283
try {
284284
logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node);
285-
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE);
285+
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
286286
// no need to put a timeout on the options here, because we want the response to eventually be received
287287
// and not log an error if it arrives after the timeout
288288
transportService.sendRequest(node, COMMIT_ACTION_NAME,

core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,11 @@ public RecoverySourceHandler(final IndexShard shard, final StartRecoveryRequest
109109
this.shardId = this.request.shardId().id();
110110

111111
this.response = new RecoveryResponse();
112-
this.requestOptions = TransportRequestOptions.options()
112+
this.requestOptions = TransportRequestOptions.builder()
113113
.withCompress(recoverySettings.compress())
114114
.withType(TransportRequestOptions.Type.RECOVERY)
115-
.withTimeout(recoverySettings.internalActionTimeout());
115+
.withTimeout(recoverySettings.internalActionTimeout())
116+
.build();
116117

117118
}
118119

@@ -244,7 +245,7 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView)
244245
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
245246
translogView.totalOperations());
246247
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
247-
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
248+
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
248249
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
249250
});
250251
// How many bytes we've copied since we last called RateLimiter.pause
@@ -263,7 +264,7 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView)
263264
try {
264265
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
265266
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
266-
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
267+
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
267268
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
268269
} catch (RemoteTransportException remoteException) {
269270
final IOException corruptIndexException;
@@ -332,7 +333,7 @@ public void run() throws InterruptedException {
332333
// garbage collection (not the JVM's GC!) of tombstone deletes
333334
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
334335
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), translogView.totalOperations()),
335-
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
336+
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
336337
}
337338
});
338339

@@ -390,7 +391,7 @@ public void run() throws InterruptedException {
390391
// during this time
391392
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE,
392393
new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()),
393-
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionLongTimeout()),
394+
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
394395
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
395396
}
396397
});
@@ -431,10 +432,11 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) {
431432
throw new ElasticsearchException("failed to get next operation from translog", ex);
432433
}
433434

434-
final TransportRequestOptions recoveryOptions = TransportRequestOptions.options()
435+
final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder()
435436
.withCompress(recoverySettings.compress())
436437
.withType(TransportRequestOptions.Type.RECOVERY)
437-
.withTimeout(recoverySettings.internalActionLongTimeout());
438+
.withTimeout(recoverySettings.internalActionLongTimeout())
439+
.build();
438440

439441
if (operation == null) {
440442
logger.trace("[{}][{}] no translog operations to send to {}",

core/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java

Lines changed: 56 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,75 +21,80 @@
2121

2222
import org.elasticsearch.common.unit.TimeValue;
2323

24-
/**
25-
*
26-
*/
2724
public class TransportRequestOptions {
2825

29-
public static final TransportRequestOptions EMPTY = options();
26+
private final TimeValue timeout;
27+
private final boolean compress;
28+
private final Type type;
3029

31-
public static TransportRequestOptions options() {
32-
return new TransportRequestOptions();
30+
private TransportRequestOptions(TimeValue timeout, boolean compress, Type type) {
31+
this.timeout = timeout;
32+
this.compress = compress;
33+
this.type = type;
3334
}
3435

35-
public static enum Type {
36-
RECOVERY,
37-
BULK,
38-
REG,
39-
STATE,
40-
PING;
41-
42-
public static Type fromString(String type) {
43-
if ("bulk".equalsIgnoreCase(type)) {
44-
return BULK;
45-
} else if ("reg".equalsIgnoreCase(type)) {
46-
return REG;
47-
} else if ("state".equalsIgnoreCase(type)) {
48-
return STATE;
49-
} else if ("recovery".equalsIgnoreCase(type)) {
50-
return RECOVERY;
51-
} else if ("ping".equalsIgnoreCase(type)) {
52-
return PING;
53-
} else {
54-
throw new IllegalArgumentException("failed to match transport type for [" + type + "]");
55-
}
56-
}
36+
public TimeValue timeout() {
37+
return this.timeout;
5738
}
5839

59-
private TimeValue timeout;
40+
public boolean compress() {
41+
return this.compress;
42+
}
6043

61-
private boolean compress;
44+
public Type type() {
45+
return this.type;
46+
}
6247

63-
private Type type = Type.REG;
48+
public static final TransportRequestOptions EMPTY = new TransportRequestOptions.Builder().build();
6449

65-
public TransportRequestOptions withTimeout(long timeout) {
66-
return withTimeout(TimeValue.timeValueMillis(timeout));
50+
public enum Type {
51+
RECOVERY,
52+
BULK,
53+
REG,
54+
STATE,
55+
PING
6756
}
6857

69-
public TransportRequestOptions withTimeout(TimeValue timeout) {
70-
this.timeout = timeout;
71-
return this;
58+
public static Builder builder() {
59+
return new Builder();
7260
}
7361

74-
public TransportRequestOptions withCompress(boolean compress) {
75-
this.compress = compress;
76-
return this;
62+
public static Builder builder(TransportRequestOptions options) {
63+
return new Builder()
64+
.withTimeout(options.timeout)
65+
.withCompress(options.compress)
66+
.withType(options.type());
7767
}
7868

79-
public TransportRequestOptions withType(Type type) {
80-
this.type = type;
81-
return this;
82-
}
69+
public static class Builder {
70+
private TimeValue timeout;
71+
private boolean compress;
72+
private Type type = Type.REG;
8373

84-
public TimeValue timeout() {
85-
return this.timeout;
86-
}
74+
private Builder() {
75+
}
8776

88-
public boolean compress() {
89-
return this.compress;
90-
}
77+
public Builder withTimeout(long timeout) {
78+
return withTimeout(TimeValue.timeValueMillis(timeout));
79+
}
9180

92-
public Type type() {
93-
return this.type;
81+
public Builder withTimeout(TimeValue timeout) {
82+
this.timeout = timeout;
83+
return this;
84+
}
85+
86+
public Builder withCompress(boolean compress) {
87+
this.compress = compress;
88+
return this;
89+
}
90+
91+
public Builder withType(Type type) {
92+
this.type = type;
93+
return this;
94+
}
95+
96+
public TransportRequestOptions build() {
97+
return new TransportRequestOptions(timeout, compress, type);
98+
}
9499
}
95100
}

core/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,37 @@
2424
*/
2525
public class TransportResponseOptions {
2626

27-
public static final TransportResponseOptions EMPTY = options();
27+
private final boolean compress;
2828

29-
public static TransportResponseOptions options() {
30-
return new TransportResponseOptions();
31-
}
32-
33-
private boolean compress;
34-
35-
public TransportResponseOptions withCompress(boolean compress) {
29+
private TransportResponseOptions(boolean compress) {
3630
this.compress = compress;
37-
return this;
3831
}
3932

4033
public boolean compress() {
4134
return this.compress;
4235
}
36+
37+
public static final TransportResponseOptions EMPTY = TransportResponseOptions.builder().build();
38+
39+
public static Builder builder() {
40+
return new Builder();
41+
}
42+
43+
public static Builder builder(TransportResponseOptions options) {
44+
return new Builder()
45+
.withCompress(options.compress);
46+
}
47+
48+
public static class Builder {
49+
private boolean compress;
50+
51+
public Builder withCompress(boolean compress) {
52+
this.compress = compress;
53+
return this;
54+
}
55+
56+
public TransportResponseOptions build() {
57+
return new TransportResponseOptions(compress);
58+
}
59+
}
4360
}

core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ public void sendRequest(final DiscoveryNode node, final long requestId, final St
812812
Channel targetChannel = nodeChannel(node, options);
813813

814814
if (compress) {
815-
options.withCompress(true);
815+
options = TransportRequestOptions.builder(options).withCompress(true).build();
816816
}
817817

818818
byte status = 0;

core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void sendResponse(TransportResponse response) throws IOException {
7878
@Override
7979
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
8080
if (transport.compress) {
81-
options.withCompress(true);
81+
options = TransportResponseOptions.builder(options).withCompress(transport.compress).build();
8282
}
8383

8484
byte status = 0;

0 commit comments

Comments
 (0)