Skip to content

Remove "Push back excessive requests for stats (#83832)" #87054

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions docs/changelog/83832.yaml

This file was deleted.

35 changes: 0 additions & 35 deletions docs/reference/cluster/nodes-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ using metrics.
`transport`::
Transport statistics about sent and received bytes in cluster
communication.

`stats_requests`::
Statistics about stats requests such as indices stats, nodes stats,
recovery stats etc.
--

`<index_metric>`::
Expand Down Expand Up @@ -2647,37 +2643,6 @@ search requests on the keyed node.
The rank of this node; used for shard selection when routing search
requests.
======
======

[[cluster-nodes-stats-api-response-body-stats-requests]]
`stats_requests`::
(object)
Contains statistics about the stats requests the node has received.
+
.Properties of `stats_requests`
[%collapsible%open]
======
`<stats_requests_name>`::
(object)
Contains statistics about a specific type of a stats request the node has received.
+
.Properties of `<stats_requests_name>`
[%collapsible%open]
=======
`current`::
(integer)
Number of stats requests currently in progress.

`completed`::
(integer)
Number of stats requests that have been completed by the node (successfully or
not).

`rejected`::
(integer)
Number of stats requests that were rejected by the node because it had reached
the limit of concurrent stats requests (`node.stats.max_concurrent_requests`).
=======
=====
====

Expand Down
18 changes: 0 additions & 18 deletions docs/reference/modules/cluster/misc.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,24 +101,6 @@ number of shards for each node, use the
setting.
--

[[stats-requests-limit]]
===== Stats request limit

A stats request might require information from all nodes to be aggregated before it returns to the user.
These requests can be heavy and they put extra pressure on the coordinating node (the node collecting the
responses from all the nodes), for this reason there is a limit on the concurrent requests that a node can coordinate.

--

[[node-stats-max-concurrent-requests]]
`node.stats.max_concurrent_requests`::
+
--
(<<dynamic-cluster-setting,Dynamic>>)
Limits the stats requests a coordinating node can concurrently handle. Defaults to `100`.



[[user-defined-data]]
===== User-defined cluster metadata

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.action.support.StatsRequestLimiter.MAX_CONCURRENT_STATS_REQUESTS_PER_NODE;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
Expand Down Expand Up @@ -1387,57 +1386,52 @@ public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierExcepti
}

// start threads that will get stats concurrently with indexing
try {
updateClusterSettings(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), numberOfStatsThreads + 1));
for (int i = 0; i < numberOfStatsThreads; i++) {
final Thread thread = new Thread(() -> {
for (int i = 0; i < numberOfStatsThreads; i++) {
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
failed.set(true);
executionFailures.get().add(e);
latch.countDown();
}
final IndicesStatsRequest request = new IndicesStatsRequest();
request.all();
request.indices(new String[0]);
while (stop.get() == false) {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
failed.set(true);
executionFailures.get().add(e);
latch.countDown();
}
final IndicesStatsRequest request = new IndicesStatsRequest();
request.all();
request.indices(new String[0]);
while (stop.get() == false) {
try {
final IndicesStatsResponse response = client().admin().indices().stats(request).get();
if (response.getFailedShards() > 0) {
failed.set(true);
shardFailures.get().addAll(Arrays.asList(response.getShardFailures()));
latch.countDown();
}
} catch (final ExecutionException | InterruptedException e) {
final IndicesStatsResponse response = client().admin().indices().stats(request).get();
if (response.getFailedShards() > 0) {
failed.set(true);
executionFailures.get().add(e);
shardFailures.get().addAll(Arrays.asList(response.getShardFailures()));
latch.countDown();
}
} catch (final ExecutionException | InterruptedException e) {
failed.set(true);
executionFailures.get().add(e);
latch.countDown();
}
});
thread.setName("stats-" + i);
threads.add(thread);
thread.start();
}

// release the hounds
barrier.await();
}
});
thread.setName("stats-" + i);
threads.add(thread);
thread.start();
}

// wait for a failure, or for fifteen seconds to elapse
latch.await(15, TimeUnit.SECONDS);
// release the hounds
barrier.await();

// stop all threads and wait for them to complete
stop.set(true);
for (final Thread thread : threads) {
thread.join();
}
// wait for a failure, or for fifteen seconds to elapse
latch.await(15, TimeUnit.SECONDS);

assertThat(shardFailures.get(), emptyCollectionOf(DefaultShardOperationFailedException.class));
assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
} finally {
updateClusterSettings(Settings.builder().putNull(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey()));
// stop all threads and wait for them to complete
stop.set(true);
for (final Thread thread : threads) {
thread.join();
}

assertThat(shardFailures.get(), emptyCollectionOf(DefaultShardOperationFailedException.class));
assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@

package org.elasticsearch.action.admin.cluster.node.info;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.StatsRequestLimiter;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -35,16 +33,14 @@ public class TransportNodesInfoAction extends TransportNodesAction<
NodeInfo> {

private final NodeService nodeService;
private final StatsRequestLimiter statsRequestLimiter;

@Inject
public TransportNodesInfoAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
NodeService nodeService,
ActionFilters actionFilters,
StatsRequestLimiter statsRequestLimiter
ActionFilters actionFilters
) {
super(
NodesInfoAction.NAME,
Expand All @@ -58,7 +54,6 @@ public TransportNodesInfoAction(
NodeInfo.class
);
this.nodeService = nodeService;
this.statsRequestLimiter = statsRequestLimiter;
}

@Override
Expand Down Expand Up @@ -99,11 +94,6 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest, Task task) {
);
}

@Override
protected void doExecute(Task task, NodesInfoRequest request, ActionListener<NodesInfoResponse> listener) {
statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute);
}

public static class NodeInfoRequest extends TransportRequest {

NodesInfoRequest request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.elasticsearch.action.admin.cluster.node.stats;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.StatsRequestStats;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
Expand Down Expand Up @@ -89,9 +87,6 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private IndexingPressureStats indexingPressureStats;

@Nullable
private StatsRequestStats statsRequestStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand All @@ -112,9 +107,6 @@ public NodeStats(StreamInput in) throws IOException {
ingestStats = in.readOptionalWriteable(IngestStats::new);
adaptiveSelectionStats = in.readOptionalWriteable(AdaptiveSelectionStats::new);
indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new);
if (in.getVersion().onOrAfter(Version.V_8_3_0)) {
statsRequestStats = in.readOptionalWriteable(StatsRequestStats::new);
}
}

public NodeStats(
Expand All @@ -134,8 +126,7 @@ public NodeStats(
@Nullable IngestStats ingestStats,
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable StatsRequestStats statsRequestStats
@Nullable IndexingPressureStats indexingPressureStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -154,7 +145,6 @@ public NodeStats(
this.adaptiveSelectionStats = adaptiveSelectionStats;
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.statsRequestStats = statsRequestStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -259,11 +249,6 @@ public IndexingPressureStats getIndexingPressureStats() {
return indexingPressureStats;
}

@Nullable
public StatsRequestStats getStatsRequestStats() {
return statsRequestStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -287,9 +272,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(ingestStats);
out.writeOptionalWriteable(adaptiveSelectionStats);
out.writeOptionalWriteable(indexingPressureStats);
if (out.getVersion().onOrAfter(Version.V_8_3_0)) {
out.writeOptionalWriteable(statsRequestStats);
}
}

@Override
Expand Down Expand Up @@ -359,9 +341,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getIndexingPressureStats() != null) {
getIndexingPressureStats().toXContent(builder, params);
}
if (getStatsRequestStats() != null) {
getStatsRequestStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ public enum Metric {
INGEST("ingest"),
ADAPTIVE_SELECTION("adaptive_selection"),
SCRIPT_CACHE("script_cache"),
INDEXING_PRESSURE("indexing_pressure"),
STATS_REQUESTS("stats_requests"),;
INDEXING_PRESSURE("indexing_pressure"),;

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@

package org.elasticsearch.action.admin.cluster.node.stats;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.StatsRequestLimiter;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -38,16 +36,14 @@ public class TransportNodesStatsAction extends TransportNodesAction<
NodeStats> {

private final NodeService nodeService;
private final StatsRequestLimiter statsRequestLimiter;

@Inject
public TransportNodesStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
NodeService nodeService,
ActionFilters actionFilters,
StatsRequestLimiter statsRequestLimiter
ActionFilters actionFilters
) {
super(
NodesStatsAction.NAME,
Expand All @@ -61,7 +57,6 @@ public TransportNodesStatsAction(
NodeStats.class
);
this.nodeService = nodeService;
this.statsRequestLimiter = statsRequestLimiter;
}

@Override
Expand Down Expand Up @@ -100,16 +95,10 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest, Task task)
NodesStatsRequest.Metric.INGEST.containedIn(metrics),
NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.STATS_REQUESTS.containedIn(metrics)
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics)
);
}

@Override
protected void doExecute(Task task, NodesStatsRequest request, ActionListener<NodesStatsResponse> listener) {
statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute);
}

public static class NodeStatsRequest extends TransportRequest {

NodesStatsRequest request;
Expand Down
Loading