diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java index 3332bfed0c31a..b242811b7be88 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -47,7 +48,7 @@ /** * A base class for all elasticsearch exceptions. */ -public class ElasticsearchException extends RuntimeException implements ToXContent { +public class ElasticsearchException extends RuntimeException implements ToXContent, Writeable { public static final String REST_EXCEPTION_SKIP_CAUSE = "rest.exception.cause.skip"; public static final String REST_EXCEPTION_SKIP_STACK_TRACE = "rest.exception.stacktrace.skip"; @@ -235,6 +236,7 @@ public boolean contains(Class exType) { } } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(this.getMessage()); out.writeThrowable(this.getCause()); diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsResponse.java index 22d4795fc9566..3136f2b6826ba 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/NodesHotThreadsResponse.java @@ -19,12 +19,14 @@ package org.elasticsearch.action.admin.cluster.node.hotthreads; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; +import java.util.List; /** */ @@ -33,26 +35,18 @@ public class NodesHotThreadsResponse extends BaseNodesResponse { NodesHotThreadsResponse() { } - public NodesHotThreadsResponse(ClusterName clusterName, NodeHotThreads[] nodes) { - super(clusterName, nodes); + public NodesHotThreadsResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - nodes = new NodeHotThreads[in.readVInt()]; - for (int i = 0; i < nodes.length; i++) { - nodes[i] = NodeHotThreads.readNodeHotThreads(in); - } + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeHotThreads::readNodeHotThreads); } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(nodes.length); - for (NodeHotThreads node : nodes) { - node.writeTo(out); - } + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeStreamableList(nodes); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java index d53f651da4531..7198851fd281d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/hotthreads/TransportNodesHotThreadsAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.node.hotthreads; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.nodes.BaseNodeRequest; import org.elasticsearch.action.support.nodes.TransportNodesAction; @@ -35,33 +36,28 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; /** * */ -public class TransportNodesHotThreadsAction extends TransportNodesAction { +public class TransportNodesHotThreadsAction extends TransportNodesAction { @Inject public TransportNodesHotThreadsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, NodesHotThreadsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters, - indexNameExpressionResolver, NodesHotThreadsRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC); + indexNameExpressionResolver, NodesHotThreadsRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeHotThreads.class); } @Override - protected NodesHotThreadsResponse newResponse(NodesHotThreadsRequest request, AtomicReferenceArray responses) { - final List nodes = new ArrayList<>(); - for (int i = 0; i < responses.length(); i++) { - Object resp = responses.get(i); - if (resp instanceof NodeHotThreads) { - nodes.add((NodeHotThreads) resp); - } - } - return new NodesHotThreadsResponse(clusterName, nodes.toArray(new NodeHotThreads[nodes.size()])); + protected NodesHotThreadsResponse newResponse(NodesHotThreadsRequest request, + List responses, List failures) { + return new NodesHotThreadsResponse(clusterName, responses, failures); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java index fdb15db8ffc54..d5a43eb030e55 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.node.info; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -30,6 +31,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import java.io.IOException; +import java.util.List; import java.util.Map; /** @@ -40,34 +42,24 @@ public class NodesInfoResponse extends BaseNodesResponse implements To public NodesInfoResponse() { } - public NodesInfoResponse(ClusterName clusterName, NodeInfo[] nodes) { - super(clusterName, nodes); + public NodesInfoResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - nodes = new NodeInfo[in.readVInt()]; - for (int i = 0; i < nodes.length; i++) { - nodes[i] = NodeInfo.readNodeInfo(in); - } + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeInfo::readNodeInfo); } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(nodes.length); - for (NodeInfo node : nodes) { - node.writeTo(out); - } + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeStreamableList(nodes); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field("cluster_name", getClusterName().value()); - builder.startObject("nodes"); - for (NodeInfo nodeInfo : this) { + for (NodeInfo nodeInfo : getNodes()) { builder.startObject(nodeInfo.getNode().getId()); builder.field("name", nodeInfo.getNode().getName()); diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index f52729faa4f22..f68e2d65903ec 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.node.info; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.nodes.BaseNodeRequest; import org.elasticsearch.action.support.nodes.TransportNodesAction; @@ -34,36 +35,32 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; /** * */ -public class TransportNodesInfoAction extends TransportNodesAction { +public class TransportNodesInfoAction extends TransportNodesAction { private final NodeService nodeService; @Inject public TransportNodesInfoAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, - NodeService nodeService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + NodeService nodeService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, NodesInfoAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters, - indexNameExpressionResolver, NodesInfoRequest::new, NodeInfoRequest::new, ThreadPool.Names.MANAGEMENT); + indexNameExpressionResolver, NodesInfoRequest::new, NodeInfoRequest::new, ThreadPool.Names.MANAGEMENT, NodeInfo.class); this.nodeService = nodeService; } @Override - protected NodesInfoResponse newResponse(NodesInfoRequest nodesInfoRequest, AtomicReferenceArray responses) { - final List nodesInfos = new ArrayList<>(); - for (int i = 0; i < responses.length(); i++) { - Object resp = responses.get(i); - if (resp instanceof NodeInfo) { - nodesInfos.add((NodeInfo) resp); - } - } - return new NodesInfoResponse(clusterName, nodesInfos.toArray(new NodeInfo[nodesInfos.size()])); + protected NodesInfoResponse newResponse(NodesInfoRequest nodesInfoRequest, + List responses, List failures) { + return new NodesInfoResponse(clusterName, responses, failures); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java index af28c1fb5d593..1a9023ab93cad 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.node.stats; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.io.stream.StreamInput; @@ -28,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import java.io.IOException; +import java.util.List; /** * @@ -37,34 +39,24 @@ public class NodesStatsResponse extends BaseNodesResponse implements NodesStatsResponse() { } - public NodesStatsResponse(ClusterName clusterName, NodeStats[] nodes) { - super(clusterName, nodes); + public NodesStatsResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - nodes = new NodeStats[in.readVInt()]; - for (int i = 0; i < nodes.length; i++) { - nodes[i] = NodeStats.readNodeStats(in); - } + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeStats::readNodeStats); } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(nodes.length); - for (NodeStats node : nodes) { - node.writeTo(out); - } + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeStreamableList(nodes); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field("cluster_name", getClusterName().value()); - builder.startObject("nodes"); - for (NodeStats nodeStats : this) { + for (NodeStats nodeStats : getNodes()) { builder.startObject(nodeStats.getNode().getId()); builder.field("timestamp", nodeStats.getTimestamp()); nodeStats.toXContent(builder, params); diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 8ba3d00558b3f..d61e3f1accef4 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.node.stats; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.nodes.BaseNodeRequest; import org.elasticsearch.action.support.nodes.TransportNodesAction; @@ -34,36 +35,31 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; /** * */ -public class TransportNodesStatsAction extends TransportNodesAction { +public class TransportNodesStatsAction extends TransportNodesAction { private final NodeService nodeService; @Inject public TransportNodesStatsAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, - NodeService nodeService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, NodesStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, - NodesStatsRequest::new, NodeStatsRequest::new, ThreadPool.Names.MANAGEMENT); + NodeService nodeService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, NodesStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters, + indexNameExpressionResolver, NodesStatsRequest::new, NodeStatsRequest::new, ThreadPool.Names.MANAGEMENT, NodeStats.class); this.nodeService = nodeService; } @Override - protected NodesStatsResponse newResponse(NodesStatsRequest nodesInfoRequest, AtomicReferenceArray responses) { - final List nodeStats = new ArrayList<>(); - for (int i = 0; i < responses.length(); i++) { - Object resp = responses.get(i); - if (resp instanceof NodeStats) { - nodeStats.add((NodeStats) resp); - } - } - return new NodesStatsResponse(clusterName, nodeStats.toArray(new NodeStats[nodeStats.size()])); + protected NodesStatsResponse newResponse(NodesStatsRequest request, List responses, List failures) { + return new NodesStatsResponse(clusterName, responses, failures); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java index fb6310a45bf79..bc1393894609a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java @@ -43,18 +43,20 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReferenceArray; import static java.util.Collections.unmodifiableMap; /** * Transport client that collects snapshot shard statuses from data nodes */ -public class TransportNodesSnapshotsStatus extends TransportNodesAction { +public class TransportNodesSnapshotsStatus extends TransportNodesAction { public static final String ACTION_NAME = SnapshotsStatusAction.NAME + "[nodes]"; @@ -66,7 +68,7 @@ public TransportNodesSnapshotsStatus(Settings settings, ClusterName clusterName, SnapshotShardsService snapshotShardsService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, - Request::new, NodeRequest::new, ThreadPool.Names.GENERIC); + Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeSnapshotStatus.class); this.snapshotShardsService = snapshotShardsService; } @@ -86,21 +88,8 @@ protected NodeSnapshotStatus newNodeResponse() { } @Override - protected NodesSnapshotStatus newResponse(Request request, AtomicReferenceArray responses) { - final List nodesList = new ArrayList<>(); - final List failures = new ArrayList<>(); - for (int i = 0; i < responses.length(); i++) { - Object resp = responses.get(i); - if (resp instanceof NodeSnapshotStatus) { // will also filter out null response for unallocated ones - nodesList.add((NodeSnapshotStatus) resp); - } else if (resp instanceof FailedNodeException) { - failures.add((FailedNodeException) resp); - } else { - logger.warn("unknown response type [{}], expected NodeSnapshotStatus or FailedNodeException", resp); - } - } - return new NodesSnapshotStatus(clusterName, nodesList.toArray(new NodeSnapshotStatus[nodesList.size()]), - failures.toArray(new FailedNodeException[failures.size()])); + protected NodesSnapshotStatus newResponse(Request request, List responses, List failures) { + return new NodesSnapshotStatus(clusterName, responses, failures); } @Override @@ -169,75 +158,47 @@ public void writeTo(StreamOutput out) throws IOException { public static class NodesSnapshotStatus extends BaseNodesResponse { - private FailedNodeException[] failures; - NodesSnapshotStatus() { } - public NodesSnapshotStatus(ClusterName clusterName, NodeSnapshotStatus[] nodes, FailedNodeException[] failures) { - super(clusterName, nodes); - this.failures = failures; + public NodesSnapshotStatus(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); } @Override - public FailedNodeException[] failures() { - return failures; + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readStreamableList(NodeSnapshotStatus::new); } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - nodes = new NodeSnapshotStatus[in.readVInt()]; - for (int i = 0; i < nodes.length; i++) { - nodes[i] = new NodeSnapshotStatus(); - nodes[i].readFrom(in); - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(nodes.length); - for (NodeSnapshotStatus response : nodes) { - response.writeTo(out); - } + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeStreamableList(nodes); } } public static class NodeRequest extends BaseNodeRequest { - private SnapshotId[] snapshotIds; + private List snapshotIds; public NodeRequest() { } NodeRequest(String nodeId, TransportNodesSnapshotsStatus.Request request) { super(nodeId); - snapshotIds = request.snapshotIds; + snapshotIds = Arrays.asList(request.snapshotIds); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - int n = in.readVInt(); - snapshotIds = new SnapshotId[n]; - for (int i = 0; i < n; i++) { - snapshotIds[i] = SnapshotId.readSnapshotId(in); - } + snapshotIds = in.readList(SnapshotId::readSnapshotId); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - if (snapshotIds != null) { - out.writeVInt(snapshotIds.length); - for (int i = 0; i < snapshotIds.length; i++) { - snapshotIds[i].writeTo(out); - } - } else { - out.writeVInt(0); - } + out.writeStreamableList(snapshotIds); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java index b23b646728801..8c0c427beeab5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIndices.java @@ -22,9 +22,6 @@ import com.carrotsearch.hppc.ObjectObjectHashMap; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.action.admin.indices.stats.CommonStats; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.cache.query.QueryCacheStats; @@ -36,8 +33,9 @@ import org.elasticsearch.search.suggest.completion.CompletionStats; import java.io.IOException; +import java.util.List; -public class ClusterStatsIndices implements ToXContent, Streamable { +public class ClusterStatsIndices implements ToXContent { private int indexCount; private ShardStats shards; @@ -49,10 +47,7 @@ public class ClusterStatsIndices implements ToXContent, Streamable { private SegmentsStats segments; private PercolatorQueryCacheStats percolatorCache; - private ClusterStatsIndices() { - } - - public ClusterStatsIndices(ClusterStatsNodeResponse[] nodeResponses) { + public ClusterStatsIndices(List nodeResponses) { ObjectObjectHashMap countsPerIndex = new ObjectObjectHashMap<>(); this.docs = new DocsStats(); @@ -131,38 +126,6 @@ public PercolatorQueryCacheStats getPercolatorCache() { return percolatorCache; } - @Override - public void readFrom(StreamInput in) throws IOException { - indexCount = in.readVInt(); - shards = ShardStats.readShardStats(in); - docs = DocsStats.readDocStats(in); - store = StoreStats.readStoreStats(in); - fieldData = FieldDataStats.readFieldDataStats(in); - queryCache = QueryCacheStats.readQueryCacheStats(in); - completion = CompletionStats.readCompletionStats(in); - segments = SegmentsStats.readSegmentsStats(in); - percolatorCache = PercolatorQueryCacheStats.readPercolateStats(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(indexCount); - shards.writeTo(out); - docs.writeTo(out); - store.writeTo(out); - fieldData.writeTo(out); - queryCache.writeTo(out); - completion.writeTo(out); - segments.writeTo(out); - percolatorCache.writeTo(out); - } - - public static ClusterStatsIndices readIndicesStats(StreamInput in) throws IOException { - ClusterStatsIndices indicesStats = new ClusterStatsIndices(); - indicesStats.readFrom(in); - return indicesStats; - } - static final class Fields { static final String COUNT = "count"; } @@ -181,7 +144,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - public static class ShardStats implements ToXContent, Streamable { + public static class ShardStats implements ToXContent { int indices; int total; @@ -326,40 +289,6 @@ public void addIndexShardCount(ShardStats indexShardCount) { } } - public static ShardStats readShardStats(StreamInput in) throws IOException { - ShardStats c = new ShardStats(); - c.readFrom(in); - return c; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - indices = in.readVInt(); - total = in.readVInt(); - primaries = in.readVInt(); - minIndexShards = in.readVInt(); - maxIndexShards = in.readVInt(); - minIndexPrimaryShards = in.readVInt(); - maxIndexPrimaryShards = in.readVInt(); - minIndexReplication = in.readDouble(); - totalIndexReplication = in.readDouble(); - maxIndexReplication = in.readDouble(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(indices); - out.writeVInt(total); - out.writeVInt(primaries); - out.writeVInt(minIndexShards); - out.writeVInt(maxIndexShards); - out.writeVInt(minIndexPrimaryShards); - out.writeVInt(maxIndexPrimaryShards); - out.writeDouble(minIndexReplication); - out.writeDouble(totalIndexReplication); - out.writeDouble(maxIndexReplication); - } - static final class Fields { static final String SHARDS = "shards"; static final String TOTAL = "total"; diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java index 05b7753ef3aaf..017b448124085 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -26,9 +26,6 @@ import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; @@ -48,7 +45,7 @@ import java.util.Map; import java.util.Set; -public class ClusterStatsNodes implements ToXContent, Writeable { +public class ClusterStatsNodes implements ToXContent { private final Counts counts; private final Set versions; @@ -58,33 +55,12 @@ public class ClusterStatsNodes implements ToXContent, Writeable { private final FsInfo.Path fs; private final Set plugins; - ClusterStatsNodes(StreamInput in) throws IOException { - this.counts = new Counts(in); - - int size = in.readVInt(); - this.versions = new HashSet<>(size); - for (int i = 0; i < size; i++) { - this.versions.add(Version.readVersion(in)); - } - - this.os = new OsStats(in); - this.process = new ProcessStats(in); - this.jvm = new JvmStats(in); - this.fs = new FsInfo.Path(in); - - size = in.readVInt(); - this.plugins = new HashSet<>(size); - for (int i = 0; i < size; i++) { - this.plugins.add(PluginInfo.readFromStream(in)); - } - } - - ClusterStatsNodes(ClusterStatsNodeResponse[] nodeResponses) { + ClusterStatsNodes(List nodeResponses) { this.versions = new HashSet<>(); this.fs = new FsInfo.Path(); this.plugins = new HashSet<>(); - Set seenAddresses = new HashSet<>(nodeResponses.length); + Set seenAddresses = new HashSet<>(nodeResponses.size()); List nodeInfos = new ArrayList<>(); List nodeStats = new ArrayList<>(); for (ClusterStatsNodeResponse nodeResponse : nodeResponses) { @@ -140,21 +116,6 @@ public Set getPlugins() { return plugins; } - @Override - public void writeTo(StreamOutput out) throws IOException { - counts.writeTo(out); - out.writeVInt(versions.size()); - for (Version v : versions) Version.writeVersion(v, out); - os.writeTo(out); - process.writeTo(out); - jvm.writeTo(out); - fs.writeTo(out); - out.writeVInt(plugins.size()); - for (PluginInfo p : plugins) { - p.writeTo(out); - } - } - static final class Fields { static final String COUNT = "count"; static final String VERSIONS = "versions"; @@ -200,18 +161,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - public static class Counts implements Writeable, ToXContent { + public static class Counts implements ToXContent { static final String COORDINATING_ONLY = "coordinating_only"; private final int total; private final Map roles; - @SuppressWarnings("unchecked") - private Counts(StreamInput in) throws IOException { - this.total = in.readVInt(); - this.roles = (Map)in.readGenericValue(); - } - private Counts(List nodeInfos) { this.roles = new HashMap<>(); for (DiscoveryNode.Role role : DiscoveryNode.Role.values()) { @@ -243,12 +198,6 @@ public Map getRoles() { return roles; } - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(total); - out.writeGenericValue(roles); - } - static final class Fields { static final String TOTAL = "total"; } @@ -263,7 +212,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } - public static class OsStats implements ToXContent, Writeable { + public static class OsStats implements ToXContent { final int availableProcessors; final int allocatedProcessors; final ObjectIntHashMap names; @@ -287,30 +236,6 @@ private OsStats(List nodeInfos) { this.allocatedProcessors = allocatedProcessors; } - /** - * Read from a stream. - */ - private OsStats(StreamInput in) throws IOException { - this.availableProcessors = in.readVInt(); - this.allocatedProcessors = in.readVInt(); - int size = in.readVInt(); - this.names = new ObjectIntHashMap<>(); - for (int i = 0; i < size; i++) { - names.addTo(in.readString(), in.readVInt()); - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(availableProcessors); - out.writeVInt(allocatedProcessors); - out.writeVInt(names.size()); - for (ObjectIntCursor name : names) { - out.writeString(name.key); - out.writeVInt(name.value); - } - } - public int getAvailableProcessors() { return availableProcessors; } @@ -343,7 +268,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } - public static class ProcessStats implements ToXContent, Writeable { + public static class ProcessStats implements ToXContent { final int count; final int cpuPercent; @@ -384,27 +309,6 @@ private ProcessStats(List nodeStatsList) { this.maxOpenFileDescriptors = maxOpenFileDescriptors; } - /** - * Read from a stream. - */ - private ProcessStats(StreamInput in) throws IOException { - this.count = in.readVInt(); - this.cpuPercent = in.readVInt(); - this.totalOpenFileDescriptors = in.readVLong(); - this.minOpenFileDescriptors = in.readLong(); - this.maxOpenFileDescriptors = in.readLong(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(count); - out.writeVInt(cpuPercent); - out.writeVLong(totalOpenFileDescriptors); - out.writeLong(minOpenFileDescriptors); - out.writeLong(maxOpenFileDescriptors); - } - - /** * Cpu usage in percentages - 100 is 1 core. */ @@ -456,7 +360,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } - public static class JvmStats implements Writeable, ToXContent { + public static class JvmStats implements ToXContent { private final ObjectIntHashMap versions; private final long threads; @@ -497,34 +401,6 @@ private JvmStats(List nodeInfos, List nodeStatsList) { this.heapMax = heapMax; } - /** - * Read from a stream. - */ - private JvmStats(StreamInput in) throws IOException { - int size = in.readVInt(); - this.versions = new ObjectIntHashMap<>(size); - for (int i = 0; i < size; i++) { - this.versions.addTo(new JvmVersion(in), in.readVInt()); - } - this.threads = in.readVLong(); - this.maxUptime = in.readVLong(); - this.heapUsed = in.readVLong(); - this.heapMax = in.readVLong(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(versions.size()); - for (ObjectIntCursor v : versions) { - v.key.writeTo(out); - out.writeVInt(v.value); - } - out.writeVLong(threads); - out.writeVLong(maxUptime); - out.writeVLong(heapUsed); - out.writeVLong(heapMax); - } - public ObjectIntHashMap getVersions() { return versions; } @@ -598,7 +474,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } - public static class JvmVersion implements Writeable { + public static class JvmVersion { String version; String vmName; String vmVersion; @@ -611,27 +487,6 @@ public static class JvmVersion implements Writeable { vmVendor = jvmInfo.getVmVendor(); } - /** - * Read from a stream. - */ - JvmVersion(StreamInput in) throws IOException { - version = in.readString(); - vmName = in.readString(); - vmVersion = in.readString(); - vmVendor = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(version); - out.writeString(vmName); - out.writeString(vmVersion); - out.writeString(vmVendor); - } - - JvmVersion() { - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java index c272e6d6fbecf..efc72d104f86e 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.stats; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -29,9 +30,8 @@ import org.elasticsearch.common.xcontent.XContentFactory; import java.io.IOException; -import java.util.Iterator; +import java.util.List; import java.util.Locale; -import java.util.Map; /** * @@ -48,8 +48,9 @@ public class ClusterStatsResponse extends BaseNodesResponse nodes, List failures) { + super(clusterName, nodes, failures); this.timestamp = timestamp; this.clusterUUID = clusterUUID; nodesStats = new ClusterStatsNodes(nodes); @@ -79,77 +80,53 @@ public ClusterStatsIndices getIndicesStats() { return indicesStats; } - @Override - public ClusterStatsNodeResponse[] getNodes() { - throw new UnsupportedOperationException(); - } - - @Override - public Map getNodesMap() { - throw new UnsupportedOperationException(); - } - - @Override - public ClusterStatsNodeResponse getAt(int position) { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); timestamp = in.readVLong(); - status = null; - if (in.readBoolean()) { - // it may be that the master switched on us while doing the operation. In this case the status may be null. - status = ClusterHealthStatus.fromValue(in.readByte()); - } clusterUUID = in.readString(); - nodesStats = new ClusterStatsNodes(in); - indicesStats = ClusterStatsIndices.readIndicesStats(in); + // it may be that the master switched on us while doing the operation. In this case the status may be null. + status = in.readOptionalWriteable(ClusterHealthStatus::readFrom); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeVLong(timestamp); - if (status == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeByte(status.value()); - } out.writeString(clusterUUID); - nodesStats.writeTo(out); - indicesStats.writeTo(out); + out.writeOptionalWriteable(status); } - static final class Fields { - static final String NODES = "nodes"; - static final String INDICES = "indices"; - static final String UUID = "uuid"; - static final String CLUSTER_NAME = "cluster_name"; - static final String STATUS = "status"; + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + List nodes = in.readList(ClusterStatsNodeResponse::readNodeResponse); + + // built from nodes rather than from the stream directly + nodesStats = new ClusterStatsNodes(nodes); + indicesStats = new ClusterStatsIndices(nodes); + + return nodes; + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + // nodeStats and indicesStats are rebuilt from nodes + out.writeStreamableList(nodes); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("timestamp", getTimestamp()); - builder.field(Fields.CLUSTER_NAME, getClusterName().value()); if (params.paramAsBoolean("output_uuid", false)) { - builder.field(Fields.UUID, clusterUUID); + builder.field("uuid", clusterUUID); } if (status != null) { - builder.field(Fields.STATUS, status.name().toLowerCase(Locale.ROOT)); + builder.field("status", status.name().toLowerCase(Locale.ROOT)); } - builder.startObject(Fields.INDICES); + builder.startObject("indices"); indicesStats.toXContent(builder, params); builder.endObject(); - builder.startObject(Fields.NODES); + builder.startObject("nodes"); nodesStats.toXContent(builder, params); builder.endObject(); return builder; diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 4a0eb33c0b596..bae7b20694d50 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.stats; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStats; @@ -46,7 +47,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; /** * @@ -68,22 +68,17 @@ public TransportClusterStatsAction(Settings settings, ClusterName clusterName, T NodeService nodeService, IndicesService indicesService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ClusterStatsAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters, - indexNameExpressionResolver, ClusterStatsRequest::new, ClusterStatsNodeRequest::new, ThreadPool.Names.MANAGEMENT); + indexNameExpressionResolver, ClusterStatsRequest::new, ClusterStatsNodeRequest::new, ThreadPool.Names.MANAGEMENT, + ClusterStatsNodeResponse.class); this.nodeService = nodeService; this.indicesService = indicesService; } @Override - protected ClusterStatsResponse newResponse(ClusterStatsRequest clusterStatsRequest, AtomicReferenceArray responses) { - final List nodeStats = new ArrayList<>(responses.length()); - for (int i = 0; i < responses.length(); i++) { - Object resp = responses.get(i); - if (resp instanceof ClusterStatsNodeResponse) { - nodeStats.add((ClusterStatsNodeResponse) resp); - } - } - return new ClusterStatsResponse(System.currentTimeMillis(), clusterName, - clusterService.state().metaData().clusterUUID(), nodeStats.toArray(new ClusterStatsNodeResponse[nodeStats.size()])); + protected ClusterStatsResponse newResponse(ClusterStatsRequest request, + List responses, List failures) { + return new ClusterStatsResponse(System.currentTimeMillis(), clusterName, clusterService.state().metaData().clusterUUID(), + responses, failures); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java index 5dbac12f694e2..250e4123bba8e 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java @@ -53,6 +53,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -152,7 +153,7 @@ private class InternalAsyncFetch extends AsyncShardFetch responses, List failures) { fetchResponses.add(new Response(shardId, responses, failures)); if (expectedOps.countDown()) { finish(); @@ -220,10 +221,10 @@ protected void reroute(ShardId shardId, String reason) { public class Response { private final ShardId shardId; - private final NodeGatewayStartedShards[] responses; - private final FailedNodeException[] failures; + private final List responses; + private final List failures; - public Response(ShardId shardId, NodeGatewayStartedShards[] responses, FailedNodeException[] failures) { + public Response(ShardId shardId, List responses, List failures) { this.shardId = shardId; this.responses = responses; this.failures = failures; diff --git a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index e1a34413e2cfc..bea5a2c8bc337 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -80,7 +80,7 @@ protected void masterOperation(PutPipelineRequest request, ClusterState state, A public void onResponse(NodesInfoResponse nodeInfos) { try { Map ingestInfos = new HashMap<>(); - for (NodeInfo nodeInfo : nodeInfos) { + for (NodeInfo nodeInfo : nodeInfos.getNodes()) { ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest()); } pipelineStore.put(clusterService, ingestInfos, request, listener); diff --git a/core/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesResponse.java b/core/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesResponse.java index 01401bc7c6ece..a49864154db30 100644 --- a/core/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesResponse.java +++ b/core/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesResponse.java @@ -22,61 +22,77 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; import java.util.HashMap; -import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.Objects; /** * */ -public abstract class BaseNodesResponse extends ActionResponse implements Iterable { +public abstract class BaseNodesResponse extends ActionResponse { private ClusterName clusterName; - protected TNodeResponse[] nodes; + private List failures; + private List nodes; private Map nodesMap; protected BaseNodesResponse() { } - protected BaseNodesResponse(ClusterName clusterName, TNodeResponse[] nodes) { - this.clusterName = clusterName; - this.nodes = nodes; + protected BaseNodesResponse(ClusterName clusterName, List nodes, List failures) { + this.clusterName = Objects.requireNonNull(clusterName); + this.failures = Objects.requireNonNull(failures); + this.nodes = Objects.requireNonNull(nodes); } /** - * The failed nodes, if set to be captured. + * Get the {@link ClusterName} associated with all of the nodes. + * + * @return Never {@code null}. */ - @Nullable - public FailedNodeException[] failures() { - return null; - } - public ClusterName getClusterName() { - return this.clusterName; - } - - public String getClusterNameAsString() { - return this.clusterName.value(); + return clusterName; } - public TNodeResponse[] getNodes() { - return nodes; + /** + * Get the failed node exceptions. + * + * @return Never {@code null}. Can be empty. + */ + public List failures() { + return failures; } - public TNodeResponse getAt(int position) { - return nodes[position]; + /** + * Determine if there are any node failures in {@link #failures}. + * + * @return {@code true} if {@link #failures} contains at least 1 {@link FailedNodeException}. + */ + public boolean hasFailures() { + return failures.isEmpty() == false; } - @Override - public Iterator iterator() { - return getNodesMap().values().iterator(); + /** + * Get the successful node responses. + * + * @return Never {@code null}. Can be empty. + * @see #hasFailures() + */ + public List getNodes() { + return nodes; } + /** + * Lazily build and get a map of Node ID to node response. + * + * @return Never {@code null}. Can be empty. + * @see #getNodes() + */ public Map getNodesMap() { if (nodesMap == null) { nodesMap = new HashMap<>(); @@ -91,11 +107,28 @@ public Map getNodesMap() { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); clusterName = ClusterName.readClusterName(in); + nodes = readNodesFrom(in); + failures = in.readList(FailedNodeException::new); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); clusterName.writeTo(out); + writeNodesTo(out, nodes); + out.writeList(failures); } + + /** + * Read the {@link #nodes} from the stream. + * + * @return Never {@code null}. + */ + protected abstract List readNodesFrom(StreamInput in) throws IOException; + + /** + * Write the {@link #nodes} to the stream. + */ + protected abstract void writeNodesTo(StreamOutput out, List nodes) throws IOException; + } diff --git a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index c996d530dcefa..2767bc80bf3df 100644 --- a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -43,6 +43,9 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Supplier; @@ -50,22 +53,30 @@ /** * */ -public abstract class TransportNodesAction, NodesResponse extends BaseNodesResponse, NodeRequest extends BaseNodeRequest, NodeResponse extends BaseNodeResponse> extends HandledTransportAction { +public abstract class TransportNodesAction, + NodesResponse extends BaseNodesResponse, + NodeRequest extends BaseNodeRequest, + NodeResponse extends BaseNodeResponse> + extends HandledTransportAction { protected final ClusterName clusterName; protected final ClusterService clusterService; protected final TransportService transportService; + protected final Class nodeResponseClass; final String transportNodeAction; protected TransportNodesAction(Settings settings, String actionName, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, Supplier nodeRequest, - String nodeExecutor) { + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier request, Supplier nodeRequest, + String nodeExecutor, + Class nodeResponseClass) { super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request); - this.clusterName = clusterName; - this.clusterService = clusterService; - this.transportService = transportService; + this.clusterName = Objects.requireNonNull(clusterName); + this.clusterService = Objects.requireNonNull(clusterService); + this.transportService = Objects.requireNonNull(transportService); + this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass); this.transportNodeAction = actionName + "[n]"; @@ -87,7 +98,46 @@ protected boolean transportCompress() { return false; } - protected abstract NodesResponse newResponse(NodesRequest request, AtomicReferenceArray nodesResponses); + /** + * Map the responses into {@code nodeResponseClass} responses and {@link FailedNodeException}s. + * + * @param request The associated request. + * @param nodesResponses All node-level responses + * @return Never {@code null}. + * @throws NullPointerException if {@code nodesResponses} is {@code null} + * @see #newResponse(BaseNodesRequest, List, List) + */ + protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray nodesResponses) { + final List responses = new ArrayList<>(); + final List failures = new ArrayList<>(); + + for (int i = 0; i < nodesResponses.length(); ++i) { + Object response = nodesResponses.get(i); + + if (nodeResponseClass.isInstance(response)) { + responses.add(nodeResponseClass.cast(response)); + } else if (response instanceof FailedNodeException) { + failures.add((FailedNodeException)response); + } else { + logger.warn("ignoring unexpected response [{}] of type [{}], expected [{}] or [{}]", + response, response != null ? response.getClass().getName() : null, + nodeResponseClass.getSimpleName(), FailedNodeException.class.getSimpleName()); + } + } + + return newResponse(request, responses, failures); + } + + /** + * Create a new {@link NodesResponse} (multi-node response). + * + * @param request The associated request. + * @param responses All successful node-level responses. + * @param failures All node-level failures. + * @return Never {@code null}. + * @throws NullPointerException if any parameter is {@code null}. + */ + protected abstract NodesResponse newResponse(NodesRequest request, List responses, List failures); protected abstract NodeRequest newNodeRequest(String nodeId, NodesRequest request); @@ -165,7 +215,8 @@ public void run() { taskManager.registerChildTask(task, node.getId()); } - transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler() { + transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), + new BaseTransportResponseHandler() { @Override public NodeResponse newInstance() { return newNodeResponse(); @@ -238,4 +289,5 @@ public void messageReceived(NodeRequest request, TransportChannel channel) throw } } + } diff --git a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index b1362538cb9fa..3101e700327bd 100644 --- a/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -404,7 +404,7 @@ static void buildShardLevelInfo(ESLogger logger, ShardStats[] stats, ImmutableOp } } - static void fillDiskUsagePerNode(ESLogger logger, NodeStats[] nodeStatsArray, + static void fillDiskUsagePerNode(ESLogger logger, List nodeStatsArray, ImmutableOpenMap.Builder newLeastAvaiableUsages, ImmutableOpenMap.Builder newMostAvaiableUsages) { for (NodeStats nodeStats : nodeStatsArray) { diff --git a/core/src/main/java/org/elasticsearch/cluster/health/ClusterHealthStatus.java b/core/src/main/java/org/elasticsearch/cluster/health/ClusterHealthStatus.java index 6d3e136eb1a8b..a261d28f5371f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/health/ClusterHealthStatus.java +++ b/core/src/main/java/org/elasticsearch/cluster/health/ClusterHealthStatus.java @@ -20,10 +20,16 @@ package org.elasticsearch.cluster.health; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + /** * */ -public enum ClusterHealthStatus { +public enum ClusterHealthStatus implements Writeable { GREEN((byte) 0), YELLOW((byte) 1), RED((byte) 2); @@ -38,7 +44,21 @@ public byte value() { return value; } - public static ClusterHealthStatus fromValue(byte value) { + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByte(value); + } + + /** + * Read from a stream. + * + * @throws IllegalArgumentException if the value is unrecognized + */ + public static ClusterHealthStatus readFrom(StreamInput in) throws IOException { + return fromValue(in.readByte()); + } + + public static ClusterHealthStatus fromValue(byte value) throws IOException { switch (value) { case 0: return GREEN; diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 8df43f687f6cd..ec56103e7a009 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -736,6 +736,29 @@ public C readOptionalNamedWriteable(Class category return null; } + /** + * Read a {@link List} of {@link Streamable} objects, using the {@code constructor} to instantiate each instance. + *

+ * This is expected to take the form: + * + * List<MyStreamableClass> list = in.readStreamList(MyStreamableClass::new); + * + * + * @param constructor Streamable instance creator + * @return Never {@code null}. + * @throws IOException if any step fails + */ + public List readStreamableList(Supplier constructor) throws IOException { + int count = readVInt(); + List builder = new ArrayList<>(count); + for (int i=0; i list) throws IOException { + writeVInt(list.size()); + for (Streamable obj: list) { + obj.writeTo(this); + } + } + /** * Writes a list of {@link Writeable} objects */ - public void writeList(List list) throws IOException { + public void writeList(List list) throws IOException { writeVInt(list.size()); - for (T obj: list) { + for (Writeable obj: list) { obj.writeTo(this); } } diff --git a/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java b/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java index cd6268d04ecec..b74507a4acced 100644 --- a/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java +++ b/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java @@ -35,9 +35,11 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.ReceiveTimeoutTransportException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; @@ -57,24 +59,24 @@ public abstract class AsyncShardFetch implements Rel /** * An action that lists the relevant shard data that needs to be fetched. */ - public interface List, NodeResponse extends BaseNodeResponse> { + public interface Lister, NodeResponse extends BaseNodeResponse> { void list(ShardId shardId, String[] nodesIds, ActionListener listener); } protected final ESLogger logger; protected final String type; private final ShardId shardId; - private final List, T> action; + private final Lister, T> action; private final Map> cache = new HashMap<>(); private final Set nodesToIgnore = new HashSet<>(); private boolean closed; @SuppressWarnings("unchecked") - protected AsyncShardFetch(ESLogger logger, String type, ShardId shardId, List, T> action) { + protected AsyncShardFetch(ESLogger logger, String type, ShardId shardId, Lister, T> action) { this.logger = logger; this.type = type; this.shardId = shardId; - this.action = (List, T>) action; + this.action = (Lister, T>) action; } @Override @@ -167,7 +169,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Set i * the shard (response + failures), issuing a reroute at the end of it to make sure there will be another round * of allocations taking this new data into account. */ - protected synchronized void processAsyncFetch(ShardId shardId, T[] responses, FailedNodeException[] failures) { + protected synchronized void processAsyncFetch(ShardId shardId, List responses, List failures) { if (closed) { // we are closed, no need to process this async fetch at all logger.trace("{} ignoring fetched [{}] results, already closed", shardId, type); @@ -276,9 +278,9 @@ public void onResponse(BaseNodesResponse response) { @Override public void onFailure(Throwable e) { - FailedNodeException[] failures = new FailedNodeException[nodesIds.length]; - for (int i = 0; i < failures.length; i++) { - failures[i] = new FailedNodeException(nodesIds[i], "total failure in fetching", e); + List failures = new ArrayList<>(nodesIds.length); + for (String nodeId : nodesIds) { + failures.add(new FailedNodeException(nodeId, "total failure in fetching", e)); } processAsyncFetch(shardId, null, failures); } diff --git a/core/src/main/java/org/elasticsearch/gateway/Gateway.java b/core/src/main/java/org/elasticsearch/gateway/Gateway.java index 4da789b43fcb2..0e6b19592796b 100644 --- a/core/src/main/java/org/elasticsearch/gateway/Gateway.java +++ b/core/src/main/java/org/elasticsearch/gateway/Gateway.java @@ -82,7 +82,7 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t int requiredAllocation = Math.max(1, minimumMasterNodesProvider.get()); - if (nodesState.failures().length > 0) { + if (nodesState.hasFailures()) { for (FailedNodeException failedNodeException : nodesState.failures()) { logger.warn("failed to fetch state from node", failedNodeException); } @@ -91,7 +91,7 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t ObjectFloatHashMap indices = new ObjectFloatHashMap<>(); MetaData electedGlobalState = null; int found = 0; - for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState) { + for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) { if (nodeState.metaData() == null) { continue; } @@ -119,7 +119,7 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t Index index = (Index) keys[i]; IndexMetaData electedIndexMetaData = null; int indexMetaDataCount = 0; - for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState) { + for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) { if (nodeState.metaData() == null) { continue; } diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index e76e8085e86c6..15cd0e2bf63d9 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -125,7 +125,7 @@ public boolean allocateUnassigned(final RoutingAllocation allocation) { class InternalAsyncFetch extends AsyncShardFetch { - public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List, T> action) { + public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, Lister, T> action) { super(logger, type, shardId, action); } diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java index 0fd1fd35809fd..2d0b894939de3 100644 --- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java +++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayMetaState.java @@ -43,14 +43,15 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; /** * */ -public class TransportNodesListGatewayMetaState extends TransportNodesAction { +public class TransportNodesListGatewayMetaState extends TransportNodesAction { public static final String ACTION_NAME = "internal:gateway/local/meta_state"; @@ -61,7 +62,7 @@ public TransportNodesListGatewayMetaState(Settings settings, ClusterName cluster ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters, - indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.GENERIC); + indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class); } TransportNodesListGatewayMetaState init(GatewayMetaState metaState) { @@ -80,7 +81,7 @@ protected boolean transportCompress() { @Override protected NodeRequest newNodeRequest(String nodeId, Request request) { - return new NodeRequest(nodeId, request); + return new NodeRequest(nodeId); } @Override @@ -89,21 +90,8 @@ protected NodeGatewayMetaState newNodeResponse() { } @Override - protected NodesGatewayMetaState newResponse(Request request, AtomicReferenceArray responses) { - final List nodesList = new ArrayList<>(); - final List failures = new ArrayList<>(); - for (int i = 0; i < responses.length(); i++) { - Object resp = responses.get(i); - if (resp instanceof NodeGatewayMetaState) { // will also filter out null response for unallocated ones - nodesList.add((NodeGatewayMetaState) resp); - } else if (resp instanceof FailedNodeException) { - failures.add((FailedNodeException) resp); - } else { - logger.warn("unknown response type [{}], expected NodeLocalGatewayMetaState or FailedNodeException", resp); - } - } - return new NodesGatewayMetaState(clusterName, nodesList.toArray(new NodeGatewayMetaState[nodesList.size()]), - failures.toArray(new FailedNodeException[failures.size()])); + protected NodesGatewayMetaState newResponse(Request request, List responses, List failures) { + return new NodesGatewayMetaState(clusterName, responses, failures); } @Override @@ -142,47 +130,30 @@ public void writeTo(StreamOutput out) throws IOException { public static class NodesGatewayMetaState extends BaseNodesResponse { - private FailedNodeException[] failures; - NodesGatewayMetaState() { } - public NodesGatewayMetaState(ClusterName clusterName, NodeGatewayMetaState[] nodes, FailedNodeException[] failures) { - super(clusterName, nodes); - this.failures = failures; - } - - public FailedNodeException[] failures() { - return failures; + public NodesGatewayMetaState(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - nodes = new NodeGatewayMetaState[in.readVInt()]; - for (int i = 0; i < nodes.length; i++) { - nodes[i] = new NodeGatewayMetaState(); - nodes[i].readFrom(in); - } + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readStreamableList(NodeGatewayMetaState::new); } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(nodes.length); - for (NodeGatewayMetaState response : nodes) { - response.writeTo(out); - } + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeStreamableList(nodes); } } - public static class NodeRequest extends BaseNodeRequest { public NodeRequest() { } - NodeRequest(String nodeId, TransportNodesListGatewayMetaState.Request request) { + NodeRequest(String nodeId) { super(nodeId); } diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index bdeb6d1660f08..675c00880824b 100644 --- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -48,9 +48,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; /** * This transport action is used to fetch the shard version from each node during primary allocation in {@link GatewayAllocator}. @@ -63,7 +61,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesListGatewayStartedShards.NodeRequest, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> implements - AsyncShardFetch.List { public static final String ACTION_NAME = "internal:gateway/local/started_shards"; @@ -77,7 +75,8 @@ public TransportNodesListGatewayStartedShards(Settings settings, ClusterName clu IndexNameExpressionResolver indexNameExpressionResolver, NodeEnvironment env) { super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters, - indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED); + indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED, + NodeGatewayStartedShards.class); this.nodeEnv = env; } @@ -110,23 +109,9 @@ protected NodeGatewayStartedShards newNodeResponse() { } @Override - protected NodesGatewayStartedShards newResponse(Request request, AtomicReferenceArray responses) { - final List nodesList = new ArrayList<>(); - final List failures = new ArrayList<>(); - for (int i = 0; i < responses.length(); i++) { - Object resp = responses.get(i); - if (resp instanceof NodeGatewayStartedShards) { // will also filter out null response for unallocated ones - nodesList.add((NodeGatewayStartedShards) resp); - } else if (resp instanceof FailedNodeException) { - failures.add((FailedNodeException) resp); - } else { - logger.warn("unknown response type [{}], expected NodeLocalGatewayStartedShards or FailedNodeException", - resp); - } - } - return new NodesGatewayStartedShards(clusterName, - nodesList.toArray(new NodeGatewayStartedShards[nodesList.size()]), - failures.toArray(new FailedNodeException[failures.size()])); + protected NodesGatewayStartedShards newResponse(Request request, + List responses, List failures) { + return new NodesGatewayStartedShards(clusterName, responses, failures); } @Override @@ -217,36 +202,19 @@ public void writeTo(StreamOutput out) throws IOException { public static class NodesGatewayStartedShards extends BaseNodesResponse { - private FailedNodeException[] failures; - - public NodesGatewayStartedShards(ClusterName clusterName, NodeGatewayStartedShards[] nodes, - FailedNodeException[] failures) { - super(clusterName, nodes); - this.failures = failures; + public NodesGatewayStartedShards(ClusterName clusterName, List nodes, + List failures) { + super(clusterName, nodes, failures); } @Override - public FailedNodeException[] failures() { - return failures; + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readStreamableList(NodeGatewayStartedShards::new); } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - nodes = new NodeGatewayStartedShards[in.readVInt()]; - for (int i = 0; i < nodes.length; i++) { - nodes[i] = new NodeGatewayStartedShards(); - nodes[i].readFrom(in); - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(nodes.length); - for (NodeGatewayStartedShards response : nodes) { - response.writeTo(out); - } + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeStreamableList(nodes); } } @@ -258,7 +226,7 @@ public static class NodeRequest extends BaseNodeRequest { public NodeRequest() { } - NodeRequest(String nodeId, Request request) { + public NodeRequest(String nodeId, Request request) { super(nodeId); this.shardId = request.shardId(); } diff --git a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index e4a1709db551b..92c9bd3b5752f 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -54,18 +54,20 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReferenceArray; /** * */ -public class TransportNodesListShardStoreMetaData extends TransportNodesAction - implements AsyncShardFetch.List { +public class TransportNodesListShardStoreMetaData extends TransportNodesAction + implements AsyncShardFetch.Lister { public static final String ACTION_NAME = "internal:cluster/nodes/indices/shard/store"; @@ -74,11 +76,12 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction nodeStoreFilesMetaDatas = new ArrayList<>(); - final List failures = new ArrayList<>(); - for (int i = 0; i < responses.length(); i++) { - Object resp = responses.get(i); - if (resp instanceof NodeStoreFilesMetaData) { // will also filter out null response for unallocated ones - nodeStoreFilesMetaDatas.add((NodeStoreFilesMetaData) resp); - } else if (resp instanceof FailedNodeException) { - failures.add((FailedNodeException) resp); - } else { - logger.warn("unknown response type [{}], expected NodeStoreFilesMetaData or FailedNodeException", resp); - } - } - return new NodesStoreFilesMetaData(clusterName, nodeStoreFilesMetaDatas.toArray(new NodeStoreFilesMetaData[nodeStoreFilesMetaDatas.size()]), - failures.toArray(new FailedNodeException[failures.size()])); + protected NodesStoreFilesMetaData newResponse(Request request, + List responses, List failures) { + return new NodesStoreFilesMetaData(clusterName, responses, failures); } @Override @@ -293,37 +284,21 @@ public void writeTo(StreamOutput out) throws IOException { public static class NodesStoreFilesMetaData extends BaseNodesResponse { - private FailedNodeException[] failures; - NodesStoreFilesMetaData() { } - public NodesStoreFilesMetaData(ClusterName clusterName, NodeStoreFilesMetaData[] nodes, FailedNodeException[] failures) { - super(clusterName, nodes); - this.failures = failures; + public NodesStoreFilesMetaData(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); } @Override - public FailedNodeException[] failures() { - return failures; + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeStoreFilesMetaData::readListShardStoreNodeOperationResponse); } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - nodes = new NodeStoreFilesMetaData[in.readVInt()]; - for (int i = 0; i < nodes.length; i++) { - nodes[i] = NodeStoreFilesMetaData.readListShardStoreNodeOperationResponse(in); - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(nodes.length); - for (NodeStoreFilesMetaData response : nodes) { - response.writeTo(out); - } + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeStreamableList(nodes); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/hotthreads/RestNodesHotThreadsAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/hotthreads/RestNodesHotThreadsAction.java index 53bec14f967f8..19bee19509a01 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/hotthreads/RestNodesHotThreadsAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/hotthreads/RestNodesHotThreadsAction.java @@ -69,7 +69,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel, @Override public RestResponse buildResponse(NodesHotThreadsResponse response) throws Exception { StringBuilder sb = new StringBuilder(); - for (NodeHotThreads node : response) { + for (NodeHotThreads node : response.getNodes()) { sb.append("::: ").append(node.getNode().toString()).append("\n"); Strings.spaceify(3, node.getHotThreads(), sb); sb.append('\n'); diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java index bd6637cb788a2..65b7715385f3f 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/info/RestNodesInfoAction.java @@ -20,22 +20,17 @@ package org.elasticsearch.rest.action.admin.cluster.node.info; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestResponse; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.action.support.RestBuilderListener; +import org.elasticsearch.rest.action.support.RestActions.NodesResponseRestListener; import java.util.Set; @@ -106,15 +101,6 @@ public void handleRequest(final RestRequest request, final RestChannel channel, settingsFilter.addFilterSettingParams(request); - client.admin().cluster().nodesInfo(nodesInfoRequest, new RestBuilderListener(channel) { - - @Override - public RestResponse buildResponse(NodesInfoResponse response, XContentBuilder builder) throws Exception { - builder.startObject(); - response.toXContent(builder, request); - builder.endObject(); - return new BytesRestResponse(RestStatus.OK, builder); - } - }); + client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java index 1e2aece1646ef..47cce5283a9f6 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.rest.action.admin.cluster.node.stats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; -import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; import org.elasticsearch.client.Client; @@ -31,7 +30,7 @@ import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.support.RestToXContentListener; +import org.elasticsearch.rest.action.support.RestActions.NodesResponseRestListener; import java.util.Set; @@ -114,6 +113,6 @@ public void handleRequest(final RestRequest request, final RestChannel channel, nodesStatsRequest.indices().includeSegmentFileSizes(true); } - client.admin().cluster().nodesStats(nodesStatsRequest, new RestToXContentListener<>(channel)); + client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/stats/RestClusterStatsAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/stats/RestClusterStatsAction.java index a02dcfccb98c4..29cc6377494eb 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/stats/RestClusterStatsAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/stats/RestClusterStatsAction.java @@ -27,8 +27,7 @@ import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.support.RestToXContentListener; - +import org.elasticsearch.rest.action.support.RestActions.NodesResponseRestListener; /** * @@ -46,6 +45,6 @@ public RestClusterStatsAction(Settings settings, RestController controller, Clie public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) { ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null)); clusterStatsRequest.timeout(request.param("timeout")); - client.admin().cluster().clusterStats(clusterStatsRequest, new RestToXContentListener<>(channel)); + client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/support/RestActions.java b/core/src/main/java/org/elasticsearch/rest/action/support/RestActions.java index 652cb8c61e902..51d5089ec6f6b 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/support/RestActions.java +++ b/core/src/main/java/org/elasticsearch/rest/action/support/RestActions.java @@ -21,13 +21,17 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; @@ -38,9 +42,14 @@ import org.elasticsearch.index.query.QueryParseContext; import org.elasticsearch.index.query.QueryStringQueryBuilder; import org.elasticsearch.indices.query.IndicesQueriesRegistry; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; +import java.util.List; /** * @@ -63,25 +72,21 @@ public static long parseVersion(RestRequest request, long defaultVersion) { return (version == Versions.MATCH_ANY) ? defaultVersion : version; } - static final class Fields { - static final String _SHARDS = "_shards"; - static final String TOTAL = "total"; - static final String SUCCESSFUL = "successful"; - static final String FAILED = "failed"; - static final String FAILURES = "failures"; + public static void buildBroadcastShardsHeader(XContentBuilder builder, Params params, BroadcastResponse response) throws IOException { + buildBroadcastShardsHeader(builder, params, + response.getTotalShards(), response.getSuccessfulShards(), response.getFailedShards(), + response.getShardFailures()); } - public static void buildBroadcastShardsHeader(XContentBuilder builder, ToXContent.Params params, BroadcastResponse response) throws IOException { - buildBroadcastShardsHeader(builder, params, response.getTotalShards(), response.getSuccessfulShards(), response.getFailedShards(), response.getShardFailures()); - } - - public static void buildBroadcastShardsHeader(XContentBuilder builder, ToXContent.Params params, int total, int successful, int failed, ShardOperationFailedException[] shardFailures) throws IOException { - builder.startObject(Fields._SHARDS); - builder.field(Fields.TOTAL, total); - builder.field(Fields.SUCCESSFUL, successful); - builder.field(Fields.FAILED, failed); + public static void buildBroadcastShardsHeader(XContentBuilder builder, Params params, + int total, int successful, int failed, + ShardOperationFailedException[] shardFailures) throws IOException { + builder.startObject("_shards"); + builder.field("total", total); + builder.field("successful", successful); + builder.field("failed", failed); if (shardFailures != null && shardFailures.length > 0) { - builder.startArray(Fields.FAILURES); + builder.startArray("failures"); final boolean group = params.paramAsBoolean("group_shard_failures", true); // we group by default for (ShardOperationFailedException shardFailure : group ? ExceptionsHelper.groupBy(shardFailures) : shardFailures) { builder.startObject(); @@ -92,6 +97,94 @@ public static void buildBroadcastShardsHeader(XContentBuilder builder, ToXConten } builder.endObject(); } + /** + * Create the XContent header for any {@link BaseNodesResponse}. + * + * @param builder XContent builder. + * @param params XContent parameters. + * @param response The response containing individual, node-level responses. + * @see #buildNodesHeader(XContentBuilder, Params, int, int, int, List) + */ + public static void buildNodesHeader(final XContentBuilder builder, final Params params, + final BaseNodesResponse response) + throws IOException { + final int successful = response.getNodes().size(); + final int failed = response.failures().size(); + + buildNodesHeader(builder, params, successful + failed, successful, failed, response.failures()); + } + + /** + * Create the XContent header for any {@link BaseNodesResponse}. This looks like: + * + * "_nodes" : { + * "total" : 3, + * "successful" : 1, + * "failed" : 2, + * "failures" : [ { ... }, { ... } ] + * } + * + * Prefer the overload that properly invokes this method to calling this directly. + * + * @param builder XContent builder. + * @param params XContent parameters. + * @param total The total number of nodes touched. + * @param successful The successful number of responses received. + * @param failed The number of failures (effectively {@code total - successful}). + * @param failures The failure exceptions related to {@code failed}. + * @see #buildNodesHeader(XContentBuilder, Params, BaseNodesResponse) + */ + public static void buildNodesHeader(final XContentBuilder builder, final Params params, + final int total, final int successful, final int failed, + final List failures) throws IOException { + builder.startObject("_nodes"); + builder.field("total", total); + builder.field("successful", successful); + builder.field("failed", failed); + + if (failures.isEmpty() == false) { + builder.startArray("failures"); + for (FailedNodeException failure : failures) { + builder.startObject(); + failure.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + } + + builder.endObject(); + } + + /** + * Automatically transform the {@link ToXContent}-compatible, nodes-level {@code response} into a a {@link BytesRestResponse}. + *

+ * This looks like: + * + * { + * "_nodes" : { ... }, + * "cluster_name" : "...", + * ... + * } + * + * + * @param builder XContent builder. + * @param params XContent parameters. + * @param response The nodes-level (plural) response. + * @return Never {@code null}. + * @throws IOException if building the response causes an issue + */ + public static BytesRestResponse nodesResponse(final XContentBuilder builder, + final Params params, + final NodesResponse response) + throws IOException { + builder.startObject(); + RestActions.buildNodesHeader(builder, params, response); + builder.field("cluster_name", response.getClusterName().value()); + response.toXContent(builder, params); + builder.endObject(); + + return new BytesRestResponse(RestStatus.OK, builder); + } public static QueryBuilder urlParamsToQueryBuilder(RestRequest request) { String queryString = request.param("q"); @@ -130,7 +223,8 @@ public static BytesReference getRestContent(RestRequest request) { return content; } - public static QueryBuilder getQueryContent(BytesReference source, IndicesQueriesRegistry indicesQueriesRegistry, ParseFieldMatcher parseFieldMatcher) { + public static QueryBuilder getQueryContent(BytesReference source, IndicesQueriesRegistry indicesQueriesRegistry, + ParseFieldMatcher parseFieldMatcher) { try (XContentParser requestParser = XContentFactory.xContent(source).createParser(source)) { QueryParseContext context = new QueryParseContext(indicesQueriesRegistry, requestParser, parseFieldMatcher); return context.parseTopLevelQueryBuilder(); @@ -158,4 +252,32 @@ public static XContentType guessBodyContentType(final RestRequest request) { public static boolean hasBodyContent(final RestRequest request) { return request.hasContent() || request.hasParam("source"); } + + /** + * {@code NodesResponseRestBuilderListener} automatically translates any {@link BaseNodesResponse} (multi-node) response that is + * {@link ToXContent}-compatible into a {@link RestResponse} with the necessary header info (e.g., "cluster_name"). + *

+ * This is meant to avoid a slew of anonymous classes doing (or worse): + * + * client.admin().cluster().request(nodesRequest, new RestBuilderListener<NodesResponse>(channel) { + * public RestResponse buildResponse(NodesResponse response, XContentBuilder builder) throws Exception { + * return RestActions.nodesResponse(builder, ToXContent.EMPTY_PARAMS, response); + * } + * }); + * + */ + public static class NodesResponseRestListener + extends RestBuilderListener { + + public NodesResponseRestListener(RestChannel channel) { + super(channel); + } + + @Override + public RestResponse buildResponse(NodesResponse response, XContentBuilder builder) throws Exception { + return RestActions.nodesResponse(builder, ToXContent.EMPTY_PARAMS, response); + } + + } + } diff --git a/core/src/test/java/org/elasticsearch/action/admin/HotThreadsIT.java b/core/src/test/java/org/elasticsearch/action/admin/HotThreadsIT.java index 47823307ffd23..480c103df81d5 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/HotThreadsIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/HotThreadsIT.java @@ -86,7 +86,7 @@ public void onResponse(NodesHotThreadsResponse nodeHotThreads) { assertThat(nodeHotThreads, notNullValue()); Map nodesMap = nodeHotThreads.getNodesMap(); assertThat(nodesMap.size(), equalTo(cluster().size())); - for (NodeHotThreads ht : nodeHotThreads) { + for (NodeHotThreads ht : nodeHotThreads.getNodes()) { assertNotNull(ht.getHotThreads()); //logger.info(ht.getHotThreads()); } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java index 8168d1a8819ce..628e26bc4cf90 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.ESSingleNodeTestCase; import java.util.HashMap; import java.util.List; @@ -37,7 +36,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; /** * Tests for the cluster allocation explanation @@ -54,7 +52,7 @@ public void testDelayShards() throws Exception { @Override public void run() { NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get(); - assertThat(resp.getNodes().length, equalTo(3)); + assertThat(resp.getNodes().size(), equalTo(3)); } }); diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index d96531b2f6cf8..9af2bb0741755 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -255,12 +255,12 @@ public void onFailure(Throwable e) { // Make sure that the request was successful assertNull(throwableReference.get()); assertNotNull(responseReference.get()); - assertEquals(nodesCount, responseReference.get().getNodes().length); + assertEquals(nodesCount, responseReference.get().getNodes().size()); assertEquals(0, responseReference.get().failureCount()); } else { // We canceled the request, in this case it should have fail, but we should get partial response assertNull(throwableReference.get()); - assertEquals(nodesCount, responseReference.get().failureCount() + responseReference.get().getNodes().length); + assertEquals(nodesCount, responseReference.get().failureCount() + responseReference.get().getNodes().size()); // and we should have at least as many failures as the number of blocked operations // (we might have cancelled some non-blocked operations before they even started and that's ok) assertThat(responseReference.get().failureCount(), greaterThanOrEqualTo(blockedNodesCount)); diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 9c0e2bfcafd9e..ffc19aa0dcd7b 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -50,11 +50,9 @@ import org.junit.BeforeClass; import java.io.IOException; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Supplier; import static java.util.Collections.emptyMap; @@ -88,7 +86,6 @@ public void setupTestNodes(Settings settings) { testNodes = new TestNode[nodesCount]; for (int i = 0; i < testNodes.length; i++) { testNodes[i] = new TestNode("node" + i, threadPool, settings); - ; } } @@ -113,27 +110,22 @@ protected NodeResponse(DiscoveryNode node) { static class NodesResponse extends BaseNodesResponse { - private int failureCount; - - protected NodesResponse(ClusterName clusterName, NodeResponse[] nodes, int failureCount) { - super(clusterName, nodes); - this.failureCount = failureCount; + protected NodesResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - failureCount = in.readVInt(); + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readStreamableList(NodeResponse::new); } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(failureCount); + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeStreamableList(nodes); } public int failureCount() { - return failureCount; + return failures().size(); } } @@ -148,24 +140,12 @@ abstract class AbstractTestNodesAction nodeRequest) { super(settings, actionName, clusterName, threadPool, clusterService, transportService, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), - request, nodeRequest, ThreadPool.Names.GENERIC); + request, nodeRequest, ThreadPool.Names.GENERIC, NodeResponse.class); } @Override - protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray responses) { - final List nodesList = new ArrayList<>(); - int failureCount = 0; - for (int i = 0; i < responses.length(); i++) { - Object resp = responses.get(i); - if (resp instanceof NodeResponse) { // will also filter out null response for unallocated ones - nodesList.add((NodeResponse) resp); - } else if (resp instanceof FailedNodeException) { - failureCount++; - } else { - logger.warn("unknown response type [{}], expected NodeLocalGatewayMetaState or FailedNodeException", resp); - } - } - return new NodesResponse(clusterName, nodesList.toArray(new NodeResponse[nodesList.size()]), failureCount); + protected NodesResponse newResponse(NodesRequest request, List responses, List failures) { + return new NodesResponse(clusterName, responses, failures); } @Override diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 7f1d6c8b835de..ec05ff9c98401 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -56,7 +56,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceArray; import static org.elasticsearch.test.ESTestCase.awaitBusy; @@ -111,31 +110,26 @@ public NodeResponse(DiscoveryNode node) { public static class NodesResponse extends BaseNodesResponse { - private int failureCount; - NodesResponse() { } - public NodesResponse(ClusterName clusterName, NodeResponse[] nodes, int failureCount) { - super(clusterName, nodes); - this.failureCount = failureCount; + public NodesResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - failureCount = in.readVInt(); + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readStreamableList(NodeResponse::new); } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(failureCount); + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeStreamableList(nodes); } public int failureCount() { - return failureCount; + return failures().size(); } } @@ -219,25 +213,13 @@ public static class TransportTestTaskAction extends TransportNodesAction()), new IndexNameExpressionResolver(Settings.EMPTY), - NodesRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC); + new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), + NodesRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeResponse.class); } @Override - protected NodesResponse newResponse(NodesRequest request, AtomicReferenceArray responses) { - final List nodesList = new ArrayList<>(); - int failureCount = 0; - for (int i = 0; i < responses.length(); i++) { - Object resp = responses.get(i); - if (resp instanceof NodeResponse) { // will also filter out null response for unallocated ones - nodesList.add((NodeResponse) resp); - } else if (resp instanceof FailedNodeException) { - failureCount++; - } else { - logger.warn("unknown response type [{}], expected NodeLocalGatewayMetaState or FailedNodeException", resp); - } - } - return new NodesResponse(clusterName, nodesList.toArray(new NodeResponse[nodesList.size()]), failureCount); + protected NodesResponse newResponse(NodesRequest request, List responses, List failures) { + return new NodesResponse(clusterName, responses, failures); } @Override diff --git a/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java b/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java index 6a7f7ac339828..a3335d87fd63f 100644 --- a/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/nodes/TransportNodesActionTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.support.nodes; import org.elasticsearch.Version; +import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeActionTests; @@ -28,6 +29,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.test.ESTestCase; @@ -39,6 +42,7 @@ import org.junit.Before; import org.junit.BeforeClass; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -53,6 +57,7 @@ import static org.elasticsearch.cluster.service.ClusterServiceUtils.createClusterService; import static org.elasticsearch.cluster.service.ClusterServiceUtils.setState; +import static org.mockito.Mockito.mock; public class TransportNodesActionTests extends ESTestCase { @@ -92,6 +97,42 @@ public void testNodesSelectors() { assertEquals(clusterService.state().nodes().resolveNodesIds(finalNodesIds).length, capturedRequests.size()); } + public void testNewResponseNullArray() { + expectThrows(NullPointerException.class, () -> action.newResponse(new TestNodesRequest(), null)); + } + + public void testNewResponse() { + TestNodesRequest request = new TestNodesRequest(); + List expectedNodeResponses = mockList(TestNodeResponse.class, randomIntBetween(0, 2)); + expectedNodeResponses.add(new TestNodeResponse()); + List nodeResponses = new ArrayList<>(expectedNodeResponses); + // This should be ignored: + nodeResponses.add(new OtherNodeResponse()); + List failures = mockList(FailedNodeException.class, randomIntBetween(0, 2)); + + List allResponses = new ArrayList<>(expectedNodeResponses); + allResponses.addAll(failures); + + Collections.shuffle(allResponses, random()); + + AtomicReferenceArray atomicArray = new AtomicReferenceArray<>(allResponses.toArray()); + + TestNodesResponse response = action.newResponse(request, atomicArray); + + assertSame(request, response.request); + // note: I shuffled the overall list, so it's not possible to guarantee that it's in the right order + assertTrue(expectedNodeResponses.containsAll(response.getNodes())); + assertTrue(failures.containsAll(response.failures())); + } + + private List mockList(Class clazz, int size) { + List failures = new ArrayList<>(size); + for (int i = 0; i < size; ++i) { + failures.add(mock(clazz)); + } + return failures; + } + private enum NodeSelector { LOCAL("_local"), ELECTED_MASTER("_master"), MASTER_ELIGIBLE("master:true"), DATA("data:true"), CUSTOM_ATTRIBUTE("attr:value"); @@ -165,26 +206,20 @@ private static DiscoveryNode newNode(int nodeId, Map attributes, return new DiscoveryNode(node, node, DummyTransportAddress.INSTANCE, attributes, roles, Version.CURRENT); } - private static class TestTransportNodesAction extends TransportNodesAction { + private static class TestTransportNodesAction + extends TransportNodesAction { TestTransportNodesAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, Supplier request, Supplier nodeRequest, String nodeExecutor) { super(settings, "indices:admin/test", CLUSTER_NAME, threadPool, clusterService, transportService, actionFilters, - null, request, nodeRequest, nodeExecutor); + null, request, nodeRequest, nodeExecutor, TestNodeResponse.class); } @Override - protected TestNodesResponse newResponse(TestNodesRequest request, AtomicReferenceArray nodesResponses) { - final List nodeResponses = new ArrayList<>(); - for (int i = 0; i < nodesResponses.length(); i++) { - Object resp = nodesResponses.get(i); - if (resp instanceof TestNodeResponse) { - nodeResponses.add((TestNodeResponse) resp); - } - } - return new TestNodesResponse(nodeResponses); + protected TestNodesResponse newResponse(TestNodesRequest request, + List responses, List failures) { + return new TestNodesResponse(request, responses, failures); } @Override @@ -216,16 +251,28 @@ private static class TestNodesRequest extends BaseNodesRequest private static class TestNodesResponse extends BaseNodesResponse { - private final List nodeResponses; + private final TestNodesRequest request; - TestNodesResponse(List nodeResponses) { - this.nodeResponses = nodeResponses; + TestNodesResponse(TestNodesRequest request, List nodeResponses, List failures) { + super(CLUSTER_NAME, nodeResponses, failures); + this.request = request; } - } - private static class TestNodeRequest extends BaseNodeRequest { - } + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readStreamableList(TestNodeResponse::new); + } - private static class TestNodeResponse extends BaseNodeResponse { + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeStreamableList(nodes); + } } + + private static class TestNodeRequest extends BaseNodeRequest { } + + private static class TestNodeResponse extends BaseNodeResponse { } + + private static class OtherNodeResponse extends BaseNodeResponse { } + } diff --git a/core/src/test/java/org/elasticsearch/bwcompat/ClusterStateBackwardsCompatIT.java b/core/src/test/java/org/elasticsearch/bwcompat/ClusterStateBackwardsCompatIT.java index 81d894e3349d4..a4427befea2f8 100644 --- a/core/src/test/java/org/elasticsearch/bwcompat/ClusterStateBackwardsCompatIT.java +++ b/core/src/test/java/org/elasticsearch/bwcompat/ClusterStateBackwardsCompatIT.java @@ -44,7 +44,7 @@ public void testClusterState() throws Exception { createIndex("test"); // connect to each node with a custom TransportClient, issue a ClusterStateRequest to test serialization - for (NodeInfo n : clusterNodes()) { + for (NodeInfo n : clusterNodes().getNodes()) { try (TransportClient tc = newTransportClient()) { tc.addTransportAddress(n.getNode().getAddress()); ClusterStateResponse response = tc.admin().cluster().prepareState().execute().actionGet(); @@ -68,7 +68,7 @@ public void testClusterStateWithBlocks() { try { enableIndexBlock("test-blocks", block.getKey()); - for (NodeInfo n : clusterNodes()) { + for (NodeInfo n : clusterNodes().getNodes()) { try (TransportClient tc = newTransportClient()) { tc.addTransportAddress(n.getNode().getAddress()); diff --git a/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index 1826ced24ea30..31c9c21b8e7c0 100644 --- a/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -41,6 +41,8 @@ import org.elasticsearch.test.ESTestCase; import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -196,14 +198,14 @@ public void testFillDiskUsage() { new FsInfo.Path("/least", "/dev/sda", 100, 90, 70), new FsInfo.Path("/most", "/dev/sda", 100, 90, 80), }; - NodeStats[] nodeStats = new NodeStats[] { + List nodeStats = Arrays.asList( new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null, null), new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null, null), new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null, null) - }; + ); InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages); DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1"); DiskUsage mostNode_1 = newMostAvaiableUsages.get("node_1"); @@ -237,14 +239,14 @@ public void testFillDiskUsageSomeInvalidValues() { new FsInfo.Path("/most", "/dev/sda", 100, 90, 70), new FsInfo.Path("/least", "/dev/sda", 10, -8, 0), }; - NodeStats[] nodeStats = new NodeStats[] { + List nodeStats = Arrays.asList( new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null, null), new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null, null), new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT), 0, null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null, null) - }; + ); InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages); DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1"); DiskUsage mostNode_1 = newMostAvailableUsages.get("node_1"); diff --git a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java index 8e2dea63d6aa0..8d13f04240e45 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java @@ -81,7 +81,7 @@ public void testClusterUpdateSettingsAcknowledgement() { NodesInfoResponse nodesInfo = client().admin().cluster().prepareNodesInfo().get(); String excludedNodeId = null; - for (NodeInfo nodeInfo : nodesInfo) { + for (NodeInfo nodeInfo : nodesInfo.getNodes()) { if (nodeInfo.getNode().isDataNode()) { excludedNodeId = nodeInfo.getNode().getId(); break; @@ -124,7 +124,7 @@ public void testClusterUpdateSettingsNoAcknowledgement() { NodesInfoResponse nodesInfo = client().admin().cluster().prepareNodesInfo().get(); String excludedNodeId = null; - for (NodeInfo nodeInfo : nodesInfo) { + for (NodeInfo nodeInfo : nodesInfo.getNodes()) { if (nodeInfo.getNode().isDataNode()) { excludedNodeId = nodeInfo.getNode().getId(); break; diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index 81a28adee2dc3..ef54c6fd6372c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -59,7 +59,7 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { @Override public void run() { NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get(); - assertThat(resp.getNodes().length, equalTo(3)); + assertThat(resp.getNodes().size(), equalTo(3)); } }); diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java index 381120fdc0f54..9fcbb708156b3 100644 --- a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java +++ b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java @@ -22,20 +22,20 @@ import org.apache.lucene.util.Constants; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.geo.GeoPoint; -import org.elasticsearch.common.joda.FormatDateTimeFormatter; -import org.elasticsearch.common.joda.Joda; import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.test.ESTestCase; import org.joda.time.DateTimeZone; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; @@ -431,6 +431,33 @@ public String getWriteableName() { endsWith(" claims to have a different name [intentionally-broken] than it was read from [test-named-writeable].")); } + public void testWriteStreamableList() throws IOException { + final int size = randomIntBetween(0, 5); + final List expected = new ArrayList<>(size); + + for (int i = 0; i < size; ++i) { + expected.add(new TestStreamable(randomBoolean())); + } + + final BytesStreamOutput out = new BytesStreamOutput(); + out.writeStreamableList(expected); + + final StreamInput in = StreamInput.wrap(out.bytes().toBytes()); + + List loaded = in.readStreamableList(TestStreamable::new); + + assertThat(loaded, hasSize(expected.size())); + + for (int i = 0; i < expected.size(); ++i) { + assertEquals(expected.get(i).value, loaded.get(i).value); + } + + assertEquals(0, in.available()); + + in.close(); + out.close(); + } + private static abstract class BaseNamedWriteable implements NamedWriteable { } @@ -544,4 +571,25 @@ public void testReadWriteGeoPoint() throws IOException { assertEquals(point, geoPoint); } } + + private static class TestStreamable implements Streamable { + + private boolean value; + + public TestStreamable() { } + + public TestStreamable(boolean value) { + this.value = value; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + value = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(value); + } + } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index 2de9d669511ec..d12a0ff55a36e 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -349,9 +349,9 @@ public void testDiscoveryStats() throws IOException { logger.info("--> request node discovery stats"); NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setDiscovery(true).get(); - assertThat(statsResponse.getNodes().length, equalTo(1)); + assertThat(statsResponse.getNodes().size(), equalTo(1)); - DiscoveryStats stats = statsResponse.getNodes()[0].getDiscoveryStats(); + DiscoveryStats stats = statsResponse.getNodes().get(0).getDiscoveryStats(); assertThat(stats.getQueueStats(), notNullValue()); assertThat(stats.getQueueStats().getTotal(), equalTo(0)); assertThat(stats.getQueueStats().getCommitted(), equalTo(0)); diff --git a/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java b/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java index 3ab15baf2e03a..b710aa50ee0e4 100644 --- a/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java @@ -284,10 +284,11 @@ public void run() { assert entry != null; entry.executeLatch.await(); if (entry.failure != null) { - processAsyncFetch(shardId, null, new FailedNodeException[]{new FailedNodeException(nodeId, - "unexpected", entry.failure)}); + processAsyncFetch(shardId, null, Collections.singletonList(new FailedNodeException(nodeId, + "unexpected", + entry.failure))); } else { - processAsyncFetch(shardId, new Response[]{entry.response}, null); + processAsyncFetch(shardId, Collections.singletonList(entry.response), null); } } catch (Throwable e) { logger.error("unexpected failure", e); diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 702e83e7d55b9..aaa29ad1970fe 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -63,9 +63,9 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -568,12 +568,12 @@ public Settings onNodeStopped(String nodeName) throws Exception { .execute(new TransportNodesListGatewayStartedShards.Request(shardId, new String[]{node.getId()})) .get(); - assertThat(response.getNodes(), arrayWithSize(1)); - assertThat(response.getNodes()[0].allocationId(), notNullValue()); + assertThat(response.getNodes(), hasSize(1)); + assertThat(response.getNodes().get(0).allocationId(), notNullValue()); if (corrupt) { - assertThat(response.getNodes()[0].storeException(), notNullValue()); + assertThat(response.getNodes().get(0).storeException(), notNullValue()); } else { - assertThat(response.getNodes()[0].storeException(), nullValue()); + assertThat(response.getNodes().get(0).storeException(), nullValue()); } // start another node so cluster consistency checks won't time out due to the lack of state diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java index 367ecf7676802..df8736b44efa5 100644 --- a/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -578,7 +578,7 @@ private Map> findFilesToCorruptForReplica() throws IOExceptio } assertTrue(shardRouting.assignedToNode()); NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(shardRouting.currentNodeId()).setFs(true).get(); - NodeStats nodeStats = nodeStatses.getNodes()[0]; + NodeStats nodeStats = nodeStatses.getNodes().get(0); List files = new ArrayList<>(); filesToNodes.put(nodeStats.getNode().getName(), files); for (FsInfo.Path info : nodeStats.getFs()) { @@ -615,7 +615,7 @@ private ShardRouting corruptRandomPrimaryFile(final boolean includePerCommitFile String nodeId = shardRouting.currentNodeId(); NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get(); Set files = new TreeSet<>(); // treeset makes sure iteration order is deterministic - for (FsInfo.Path info : nodeStatses.getNodes()[0].getFs()) { + for (FsInfo.Path info : nodeStatses.getNodes().get(0).getFs()) { String path = info.getPath(); Path file = PathUtils.get(path).resolve("indices").resolve(test.getUUID()).resolve(Integer.toString(shardRouting.getId())).resolve("index"); if (Files.exists(file)) { // multi data path might only have one path in use @@ -678,9 +678,9 @@ public List listShardFiles(ShardRouting routing) throws IOException { NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(routing.currentNodeId()).setFs(true).get(); ClusterState state = client().admin().cluster().prepareState().get().getState(); final Index test = state.metaData().index("test").getIndex(); - assertThat(routing.toString(), nodeStatses.getNodes().length, equalTo(1)); + assertThat(routing.toString(), nodeStatses.getNodes().size(), equalTo(1)); List files = new ArrayList<>(); - for (FsInfo.Path info : nodeStatses.getNodes()[0].getFs()) { + for (FsInfo.Path info : nodeStatses.getNodes().get(0).getFs()) { String path = info.getPath(); Path file = PathUtils.get(path).resolve("indices/" + test.getUUID() + "/" + Integer.toString(routing.getId()) + "/index"); if (Files.exists(file)) { // multi data path might only have one path in use diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java index 94ee490c729bb..018bce16566e7 100644 --- a/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java @@ -121,7 +121,7 @@ private void corruptRandomTranslogFiles() throws IOException { String nodeId = shardRouting.currentNodeId(); NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get(); Set files = new TreeSet<>(); // treeset makes sure iteration order is deterministic - for (FsInfo.Path fsPath : nodeStatses.getNodes()[0].getFs()) { + for (FsInfo.Path fsPath : nodeStatses.getNodes().get(0).getFs()) { String path = fsPath.getPath(); final String relativeDataLocationPath = "indices/"+ test.getUUID() +"/" + Integer.toString(shardRouting.getId()) + "/translog"; Path file = PathUtils.get(path).resolve(relativeDataLocationPath); diff --git a/core/src/test/java/org/elasticsearch/index/suggest/stats/SuggestStatsIT.java b/core/src/test/java/org/elasticsearch/index/suggest/stats/SuggestStatsIT.java index 0d85063239338..cb0397990ab73 100644 --- a/core/src/test/java/org/elasticsearch/index/suggest/stats/SuggestStatsIT.java +++ b/core/src/test/java/org/elasticsearch/index/suggest/stats/SuggestStatsIT.java @@ -119,10 +119,9 @@ public void testSimpleStats() throws Exception { assertThat(suggest.getSuggestTimeInMillis(), lessThanOrEqualTo(totalShards * (endTime - startTime))); NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); - NodeStats[] nodes = nodeStats.getNodes(); Set nodeIdsWithIndex = nodeIdsWithIndex("test1", "test2"); int num = 0; - for (NodeStats stat : nodes) { + for (NodeStats stat : nodeStats.getNodes()) { SearchStats.Stats suggestStats = stat.getIndices().getSearch().getTotal(); logger.info("evaluating {}", stat.getNode()); if (nodeIdsWithIndex.contains(stat.getNode().getId())) { diff --git a/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java b/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java index 2f039f6c47209..351742d971239 100644 --- a/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java +++ b/core/src/test/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java @@ -101,7 +101,7 @@ public void teardown() { /** Returns true if any of the nodes used a noop breaker */ private boolean noopBreakerUsed() { NodesStatsResponse stats = client().admin().cluster().prepareNodesStats().setBreaker(true).get(); - for (NodeStats nodeStats : stats) { + for (NodeStats nodeStats : stats.getNodes()) { if (nodeStats.getBreaker().getStats(CircuitBreaker.REQUEST).getLimit() == NoopCircuitBreaker.LIMIT) { return true; } @@ -230,7 +230,7 @@ public void testParentChecking() throws Exception { // We need the request limit beforehand, just from a single node because the limit should always be the same long beforeReqLimit = client.admin().cluster().prepareNodesStats().setBreaker(true).get() - .getNodes()[0].getBreaker().getStats(CircuitBreaker.REQUEST).getLimit(); + .getNodes().get(0).getBreaker().getStats(CircuitBreaker.REQUEST).getLimit(); Settings resetSettings = Settings.builder() .put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "10b") diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 17b1b8e7d7001..77cffc20ae191 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -69,10 +69,10 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -315,7 +315,7 @@ public void run() { @Override public void run() { NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); - assertThat(statsResponse.getNodes(), arrayWithSize(2)); + assertThat(statsResponse.getNodes(), hasSize(2)); for (NodeStats nodeStats : statsResponse.getNodes()) { final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); if (nodeStats.getNode().getName().equals(nodeA)) { @@ -344,7 +344,7 @@ public void run() { validateIndexRecoveryState(recoveryStates.get(0).getIndex()); statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); - assertThat(statsResponse.getNodes(), arrayWithSize(2)); + assertThat(statsResponse.getNodes(), hasSize(2)); for (NodeStats nodeStats : statsResponse.getNodes()) { final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); assertThat(recoveryStats.currentAsSource(), equalTo(0)); @@ -363,7 +363,7 @@ public void run() { ensureGreen(); statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); - assertThat(statsResponse.getNodes(), arrayWithSize(2)); + assertThat(statsResponse.getNodes(), hasSize(2)); for (NodeStats nodeStats : statsResponse.getNodes()) { final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); assertThat(recoveryStats.currentAsSource(), equalTo(0)); diff --git a/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index b1513908874d1..4716e7dba7835 100644 --- a/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/core/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -104,7 +104,7 @@ public void testFieldDataStats() { client().admin().indices().prepareRefresh().execute().actionGet(); NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0L)); @@ -113,7 +113,7 @@ public void testFieldDataStats() { client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet(); nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); @@ -123,9 +123,9 @@ public void testFieldDataStats() { // now check the per field stats nodesStats = client().admin().cluster().prepareNodesStats().setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.FieldData, true).fieldDataFields("*")).execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes()[1].getIndices().getFieldData().getFields().get("field"), greaterThan(0L)); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes()[1].getIndices().getFieldData().getFields().get("field"), lessThan(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes())); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes().get(1).getIndices().getFieldData().getFields().get("field"), greaterThan(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes().get(1).getIndices().getFieldData().getFields().get("field"), lessThan(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes())); indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).setFieldDataFields("*").execute().actionGet(); assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); @@ -134,7 +134,7 @@ public void testFieldDataStats() { client().admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet(); nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0L)); @@ -152,8 +152,8 @@ public void testClearAllCaches() throws Exception { NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true) .execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); - assertThat(nodesStats.getNodes()[0].getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L)); IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test") .clear().setFieldData(true).setQueryCache(true) @@ -173,8 +173,8 @@ public void testClearAllCaches() throws Exception { nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true) .execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); - assertThat(nodesStats.getNodes()[0].getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getQueryCache().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), greaterThan(0L)); indicesStats = client().admin().indices().prepareStats("test") .clear().setFieldData(true).setQueryCache(true) @@ -186,8 +186,8 @@ public void testClearAllCaches() throws Exception { Thread.sleep(100); // Make sure the filter cache entries have been removed... nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true) .execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); - assertThat(nodesStats.getNodes()[0].getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L)); + assertThat(nodesStats.getNodes().get(0).getIndices().getQueryCache().getMemorySizeInBytes() + nodesStats.getNodes().get(1).getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L)); indicesStats = client().admin().indices().prepareStats("test") .clear().setFieldData(true).setQueryCache(true) diff --git a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java index c38aeb967ef36..b7dcf2872e2de 100644 --- a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java +++ b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java @@ -54,29 +54,29 @@ public void testNodesInfos() throws Exception { logger.info("--> started nodes: {} and {}", server1NodeId, server2NodeId); NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet(); - assertThat(response.getNodes().length, is(2)); + assertThat(response.getNodes().size(), is(2)); assertThat(response.getNodesMap().get(server1NodeId), notNullValue()); assertThat(response.getNodesMap().get(server2NodeId), notNullValue()); response = client().admin().cluster().nodesInfo(nodesInfoRequest()).actionGet(); - assertThat(response.getNodes().length, is(2)); + assertThat(response.getNodes().size(), is(2)); assertThat(response.getNodesMap().get(server1NodeId), notNullValue()); assertThat(response.getNodesMap().get(server2NodeId), notNullValue()); response = client().admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet(); - assertThat(response.getNodes().length, is(1)); + assertThat(response.getNodes().size(), is(1)); assertThat(response.getNodesMap().get(server1NodeId), notNullValue()); response = client().admin().cluster().nodesInfo(nodesInfoRequest(server1NodeId)).actionGet(); - assertThat(response.getNodes().length, is(1)); + assertThat(response.getNodes().size(), is(1)); assertThat(response.getNodesMap().get(server1NodeId), notNullValue()); response = client().admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet(); - assertThat(response.getNodes().length, is(1)); + assertThat(response.getNodes().size(), is(1)); assertThat(response.getNodesMap().get(server2NodeId), notNullValue()); response = client().admin().cluster().nodesInfo(nodesInfoRequest(server2NodeId)).actionGet(); - assertThat(response.getNodes().length, is(1)); + assertThat(response.getNodes().size(), is(1)); assertThat(response.getNodesMap().get(server2NodeId), notNullValue()); } @@ -99,7 +99,7 @@ public void testAllocatedProcessors() throws Exception { NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet(); - assertThat(response.getNodes().length, is(2)); + assertThat(response.getNodes().size(), is(2)); assertThat(response.getNodesMap().get(server1NodeId), notNullValue()); assertThat(response.getNodesMap().get(server2NodeId), notNullValue()); diff --git a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index 95a931d1d5424..baf017bc7b91d 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -168,8 +168,8 @@ public void testUpdatingThreadPoolSettings() throws Exception { // Check that node info is correct NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().all().execute().actionGet(); - for (int i = 0; i < 2; i++) { - NodeInfo nodeInfo = nodesInfoResponse.getNodes()[i]; + assertEquals(2, nodesInfoResponse.getNodes().size()); + for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) { boolean found = false; for (ThreadPool.Info info : nodeInfo.getThreadPool()) { if (info.getName().equals(Names.SEARCH)) { diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportPublishAddressIT.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportPublishAddressIT.java index 0fceda316641c..75faa8c49b4c0 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportPublishAddressIT.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportPublishAddressIT.java @@ -67,7 +67,7 @@ public void testDifferentPorts() throws Exception { logger.info("--> checking if boundAddress matching publishAddress has same port"); NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().get(); - for (NodeInfo nodeInfo : nodesInfoResponse) { + for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) { BoundTransportAddress boundTransportAddress = nodeInfo.getTransport().getAddress(); if (nodeInfo.getNode().getName().equals(ipv4OnlyNode)) { assertThat(boundTransportAddress.boundAddresses().length, equalTo(1)); diff --git a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/SearchStatsTests.java b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/SearchStatsTests.java index 95a2691d1c4e0..52b2f5af797c2 100644 --- a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/SearchStatsTests.java +++ b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/SearchStatsTests.java @@ -130,10 +130,10 @@ public void testSimpleStats() throws Exception { assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getFetchCount(), greaterThan(0L)); assertThat(indicesStats.getTotal().getSearch().getGroupStats().get("group1").getFetchTimeInMillis(), greaterThan(0L)); NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); - NodeStats[] nodes = nodeStats.getNodes(); + Set nodeIdsWithIndex = nodeIdsWithIndex("test1", "test2"); int num = 0; - for (NodeStats stat : nodes) { + for (NodeStats stat : nodeStats.getNodes()) { Stats total = stat.getIndices().getSearch().getTotal(); if (nodeIdsWithIndex.contains(stat.getNode().getId())) { assertThat(total.getQueryCount(), greaterThan(0L)); diff --git a/plugins/discovery-azure/src/test/java/org/elasticsearch/cloud/azure/AbstractAzureComputeServiceTestCase.java b/plugins/discovery-azure/src/test/java/org/elasticsearch/cloud/azure/AbstractAzureComputeServiceTestCase.java index 3f78b1f668384..0c57ec3f16e57 100644 --- a/plugins/discovery-azure/src/test/java/org/elasticsearch/cloud/azure/AbstractAzureComputeServiceTestCase.java +++ b/plugins/discovery-azure/src/test/java/org/elasticsearch/cloud/azure/AbstractAzureComputeServiceTestCase.java @@ -65,6 +65,6 @@ protected void checkNumberOfNodes(int expected) { NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().execute().actionGet(); assertNotNull(nodeInfos); assertNotNull(nodeInfos.getNodes()); - assertEquals(expected, nodeInfos.getNodes().length); + assertEquals(expected, nodeInfos.getNodes().size()); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index d66b1f9fc9ee7..ef8857475b358 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -38,6 +38,8 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Arrays; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import static java.util.Collections.emptyMap; @@ -107,7 +109,7 @@ public void setN3Usage(String nodeName, DiskUsage newUsage) { @Override public CountDownLatch updateNodeStats(final ActionListener listener) { - NodesStatsResponse response = new NodesStatsResponse(clusterName, stats); + NodesStatsResponse response = new NodesStatsResponse(clusterName, Arrays.asList(stats), Collections.emptyList()); listener.onResponse(response); return new CountDownLatch(0); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index a31ef76272ec8..247a340d8c49b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -2046,8 +2046,8 @@ protected HttpRequestBuilder httpClient() { protected HttpRequestBuilder httpClient(CloseableHttpClient httpClient) { final NodesInfoResponse nodeInfos = client().admin().cluster().prepareNodesInfo().get(); - final NodeInfo[] nodes = nodeInfos.getNodes(); - assertTrue(nodes.length > 0); + final List nodes = nodeInfos.getNodes(); + assertFalse(nodeInfos.hasFailures()); TransportAddress publishAddress = randomFrom(nodes).getHttp().address().publishAddress(); assertEquals(1, publishAddress.uniqueAddressTypeId()); InetSocketAddress address = ((InetSocketTransportAddress) publishAddress).address(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ExternalNode.java b/test/framework/src/main/java/org/elasticsearch/test/ExternalNode.java index 296f34637dd14..4625aa77e25de 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ExternalNode.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ExternalNode.java @@ -154,8 +154,7 @@ synchronized void startInternal(Client client, Settings settings, String nodeNam static boolean waitForNode(final Client client, final String name) throws InterruptedException { return ESTestCase.awaitBusy(() -> { final NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().get(); - final NodeInfo[] nodes = nodeInfos.getNodes(); - for (NodeInfo info : nodes) { + for (NodeInfo info : nodeInfos.getNodes()) { if (name.equals(info.getNode().getName())) { return true; } @@ -166,8 +165,7 @@ static boolean waitForNode(final Client client, final String name) throws Interr static NodeInfo nodeInfo(final Client client, final String nodeName) { final NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().get(); - final NodeInfo[] nodes = nodeInfos.getNodes(); - for (NodeInfo info : nodes) { + for (NodeInfo info : nodeInfos.getNodes()) { if (nodeName.equals(info.getNode().getName())) { return info; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ExternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/ExternalTestCluster.java index 17c3e3d0805b5..5372c319daeef 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ExternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ExternalTestCluster.java @@ -87,12 +87,12 @@ public ExternalTestCluster(Path tempDir, Settings additionalSettings, Collection try { client.addTransportAddresses(transportAddresses); NodesInfoResponse nodeInfos = client.admin().cluster().prepareNodesInfo().clear().setSettings(true).setHttp(true).get(); - httpAddresses = new InetSocketAddress[nodeInfos.getNodes().length]; + httpAddresses = new InetSocketAddress[nodeInfos.getNodes().size()]; this.clusterName = nodeInfos.getClusterName().value(); int dataNodes = 0; int masterAndDataNodes = 0; - for (int i = 0; i < nodeInfos.getNodes().length; i++) { - NodeInfo nodeInfo = nodeInfos.getNodes()[i]; + for (int i = 0; i < nodeInfos.getNodes().size(); i++) { + NodeInfo nodeInfo = nodeInfos.getNodes().get(i); httpAddresses[i] = ((InetSocketTransportAddress) nodeInfo.getHttp().address().publishAddress()).address(); if (DiscoveryNode.isDataNode(nodeInfo.getSettings())) { dataNodes++;