Skip to content

Commit 79d2d74

Browse files
committed
Use final fields in UnicastZenPing request/response objects (#28406)
Prevents a NullPointerException that can happen due to concurrency in UnicastZenPing, see #21658.
1 parent a0d5867 commit 79d2d74

File tree

2 files changed

+65
-78
lines changed

2 files changed

+65
-78
lines changed

server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@
9191
import static java.util.Collections.emptyMap;
9292
import static java.util.Collections.emptySet;
9393
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
94-
import static org.elasticsearch.discovery.zen.ZenPing.PingResponse.readPingResponse;
9594

9695
public class UnicastZenPing extends AbstractComponent implements ZenPing {
9796

@@ -162,7 +161,7 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService
162161
concurrentConnects,
163162
resolveTimeout);
164163

165-
transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME,
164+
transportService.registerRequestHandler(ACTION_NAME, ThreadPool.Names.SAME, UnicastPingRequest::new,
166165
new UnicastPingRequestHandler());
167166

168167
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
@@ -456,12 +455,8 @@ public ConnectionProfile getConnectionProfile() {
456455

457456

458457
protected void sendPings(final TimeValue timeout, final PingingRound pingingRound) {
459-
final UnicastPingRequest pingRequest = new UnicastPingRequest();
460-
pingRequest.id = pingingRound.id();
461-
pingRequest.timeout = timeout;
462-
ClusterState lastState = contextProvider.clusterState();
463-
464-
pingRequest.pingResponse = createPingResponse(lastState);
458+
final ClusterState lastState = contextProvider.clusterState();
459+
final UnicastPingRequest pingRequest = new UnicastPingRequest(pingingRound.id(), timeout, createPingResponse(lastState));
465460

466461
Set<DiscoveryNode> nodesFromResponses = temporalResponses.stream().map(pingResponse -> {
467462
assert clusterName.equals(pingResponse.clusterName()) :
@@ -553,8 +548,8 @@ protected TransportResponseHandler<UnicastPingResponse> getPingResponseHandler(f
553548
return new TransportResponseHandler<UnicastPingResponse>() {
554549

555550
@Override
556-
public UnicastPingResponse newInstance() {
557-
return new UnicastPingResponse();
551+
public UnicastPingResponse read(StreamInput in) throws IOException {
552+
return new UnicastPingResponse(in);
558553
}
559554

560555
@Override
@@ -599,11 +594,7 @@ private UnicastPingResponse handlePingRequest(final UnicastPingRequest request)
599594
List<PingResponse> pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses);
600595
pingResponses.add(createPingResponse(contextProvider.clusterState()));
601596

602-
UnicastPingResponse unicastPingResponse = new UnicastPingResponse();
603-
unicastPingResponse.id = request.id;
604-
unicastPingResponse.pingResponses = pingResponses.toArray(new PingResponse[pingResponses.size()]);
605-
606-
return unicastPingResponse;
597+
return new UnicastPingResponse(request.id, pingResponses.toArray(new PingResponse[pingResponses.size()]));
607598
}
608599

609600
class UnicastPingRequestHandler implements TransportRequestHandler<UnicastPingRequest> {
@@ -627,21 +618,28 @@ public void messageReceived(UnicastPingRequest request, TransportChannel channel
627618

628619
}
629620

630-
public static class UnicastPingRequest extends TransportRequest {
621+
static class UnicastPingRequest extends TransportRequest {
631622

632-
int id;
633-
TimeValue timeout;
634-
PingResponse pingResponse;
623+
final int id;
624+
final TimeValue timeout;
625+
final PingResponse pingResponse;
635626

636-
public UnicastPingRequest() {
627+
UnicastPingRequest(int id, TimeValue timeout, PingResponse pingResponse) {
628+
this.id = id;
629+
this.timeout = timeout;
630+
this.pingResponse = pingResponse;
637631
}
638632

639-
@Override
640-
public void readFrom(StreamInput in) throws IOException {
641-
super.readFrom(in);
633+
UnicastPingRequest(StreamInput in) throws IOException {
634+
super(in);
642635
id = in.readInt();
643636
timeout = new TimeValue(in);
644-
pingResponse = readPingResponse(in);
637+
pingResponse = new PingResponse(in);
638+
}
639+
640+
@Override
641+
public void readFrom(StreamInput in) throws IOException {
642+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
645643
}
646644

647645
@Override
@@ -660,23 +658,28 @@ private PingResponse createPingResponse(ClusterState clusterState) {
660658

661659
static class UnicastPingResponse extends TransportResponse {
662660

663-
int id;
661+
final int id;
664662

665-
PingResponse[] pingResponses;
663+
final PingResponse[] pingResponses;
666664

667-
UnicastPingResponse() {
665+
UnicastPingResponse(int id, PingResponse[] pingResponses) {
666+
this.id = id;
667+
this.pingResponses = pingResponses;
668668
}
669669

670-
@Override
671-
public void readFrom(StreamInput in) throws IOException {
672-
super.readFrom(in);
670+
UnicastPingResponse(StreamInput in) throws IOException {
673671
id = in.readInt();
674672
pingResponses = new PingResponse[in.readVInt()];
675673
for (int i = 0; i < pingResponses.length; i++) {
676-
pingResponses[i] = readPingResponse(in);
674+
pingResponses[i] = new PingResponse(in);
677675
}
678676
}
679677

678+
@Override
679+
public void readFrom(StreamInput in) throws IOException {
680+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
681+
}
682+
680683
@Override
681684
public void writeTo(StreamOutput out) throws IOException {
682685
super.writeTo(out);

server/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java

Lines changed: 31 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.elasticsearch.cluster.node.DiscoveryNode;
2525
import org.elasticsearch.common.io.stream.StreamInput;
2626
import org.elasticsearch.common.io.stream.StreamOutput;
27-
import org.elasticsearch.common.io.stream.Streamable;
27+
import org.elasticsearch.common.io.stream.Writeable;
2828
import org.elasticsearch.common.lease.Releasable;
2929
import org.elasticsearch.common.unit.TimeValue;
3030

@@ -44,26 +44,21 @@ public interface ZenPing extends Releasable {
4444

4545
void ping(Consumer<PingCollection> resultsConsumer, TimeValue timeout);
4646

47-
class PingResponse implements Streamable {
48-
49-
public static final PingResponse[] EMPTY = new PingResponse[0];
47+
class PingResponse implements Writeable {
5048

5149
private static final AtomicLong idGenerator = new AtomicLong();
5250

5351
// an always increasing unique identifier for this ping response.
5452
// lower values means older pings.
55-
private long id;
56-
57-
private ClusterName clusterName;
53+
private final long id;
5854

59-
private DiscoveryNode node;
55+
private final ClusterName clusterName;
6056

61-
private DiscoveryNode master;
57+
private final DiscoveryNode node;
6258

63-
private long clusterStateVersion;
59+
private final DiscoveryNode master;
6460

65-
private PingResponse() {
66-
}
61+
private final long clusterStateVersion;
6762

6863
/**
6964
* @param node the node which this ping describes
@@ -86,14 +81,34 @@ public PingResponse(DiscoveryNode node, DiscoveryNode master, ClusterState state
8681
ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION : state.version());
8782
}
8883

89-
/**
90-
* an always increasing unique identifier for this ping response.
91-
* lower values means older pings.
92-
*/
84+
PingResponse(StreamInput in) throws IOException {
85+
this.clusterName = new ClusterName(in);
86+
this.node = new DiscoveryNode(in);
87+
this.master = in.readOptionalWriteable(DiscoveryNode::new);
88+
this.clusterStateVersion = in.readLong();
89+
this.id = in.readLong();
90+
}
91+
92+
@Override
93+
public void writeTo(StreamOutput out) throws IOException {
94+
clusterName.writeTo(out);
95+
node.writeTo(out);
96+
out.writeOptionalWriteable(master);
97+
out.writeLong(clusterStateVersion);
98+
out.writeLong(id);
99+
}
100+
101+
/**
102+
* an always increasing unique identifier for this ping response.
103+
* lower values means older pings.
104+
*/
93105
public long id() {
94106
return this.id;
95107
}
96108

109+
/**
110+
* the name of the cluster this node belongs to
111+
*/
97112
public ClusterName clusterName() {
98113
return this.clusterName;
99114
}
@@ -115,37 +130,6 @@ public long getClusterStateVersion() {
115130
return clusterStateVersion;
116131
}
117132

118-
public static PingResponse readPingResponse(StreamInput in) throws IOException {
119-
PingResponse response = new PingResponse();
120-
response.readFrom(in);
121-
return response;
122-
}
123-
124-
@Override
125-
public void readFrom(StreamInput in) throws IOException {
126-
clusterName = new ClusterName(in);
127-
node = new DiscoveryNode(in);
128-
if (in.readBoolean()) {
129-
master = new DiscoveryNode(in);
130-
}
131-
this.clusterStateVersion = in.readLong();
132-
this.id = in.readLong();
133-
}
134-
135-
@Override
136-
public void writeTo(StreamOutput out) throws IOException {
137-
clusterName.writeTo(out);
138-
node.writeTo(out);
139-
if (master == null) {
140-
out.writeBoolean(false);
141-
} else {
142-
out.writeBoolean(true);
143-
master.writeTo(out);
144-
}
145-
out.writeLong(clusterStateVersion);
146-
out.writeLong(id);
147-
}
148-
149133
@Override
150134
public String toString() {
151135
return "ping_response{node [" + node + "], id[" + id + "], master [" + master + "]," +

0 commit comments

Comments
 (0)