Skip to content

Commit a94f058

Browse files
dnhatnjtibshirani
andauthored
Add node-level field caps requests (#79212)
Currently to gather field caps, the coordinator sends a separate transport request per index. When the original request targets many indices, the overhead of all these sub-requests can add up and hurt performance. This PR switches the execution strategy to reduce the number of transport requests: it groups together the index requests that target the same node, then sends only one request to each node. Relates #77047 Relates # #78647 Co-authored-by: Julie Tibshirani <[email protected]>
1 parent f2ccc24 commit a94f058

17 files changed

+2217
-381
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public void testGetFieldMappings() {
179179
}
180180

181181
public void testFieldCapabilities() {
182-
String fieldCapabilitiesShardAction = FieldCapabilitiesAction.NAME + "[index][s]";
182+
String fieldCapabilitiesShardAction = FieldCapabilitiesAction.NAME + "[n]";
183183
interceptTransportActions(fieldCapabilitiesShardAction);
184184

185185
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();

server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/CCSFieldCapabilitiesIT.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,7 @@ public void testFailuresFromRemote() {
102102
.filter(f -> Arrays.asList(f.getIndices()).contains("remote_cluster:" + remoteErrorIndex))
103103
.findFirst().get();
104104
ex = failure.getException();
105-
assertEquals(RemoteTransportException.class, ex.getClass());
106-
cause = ExceptionsHelper.unwrapCause(ex);
107-
assertEquals(IllegalArgumentException.class, cause.getClass());
108-
assertEquals("I throw because I choose to.", cause.getMessage());
105+
assertEquals(IllegalArgumentException.class, ex.getClass());
106+
assertEquals("I throw because I choose to.", ex.getMessage());
109107
}
110108
}

server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java

Lines changed: 192 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,35 @@
1010

1111
import org.apache.lucene.search.MatchAllDocsQuery;
1212
import org.apache.lucene.search.Query;
13+
import org.elasticsearch.ElasticsearchException;
1314
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
15+
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
16+
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
17+
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
1418
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
19+
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
1520
import org.elasticsearch.action.index.IndexRequestBuilder;
21+
import org.elasticsearch.action.support.ActiveShardCount;
22+
import org.elasticsearch.cluster.metadata.IndexMetadata;
23+
import org.elasticsearch.cluster.node.DiscoveryNode;
24+
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
25+
import org.elasticsearch.common.breaker.CircuitBreaker;
26+
import org.elasticsearch.common.breaker.CircuitBreakingException;
1627
import org.elasticsearch.common.io.stream.StreamInput;
1728
import org.elasticsearch.common.io.stream.StreamOutput;
18-
import org.elasticsearch.index.mapper.DocumentParserContext;
29+
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.index.IndexService;
31+
import org.elasticsearch.index.mapper.TimeSeriesParams;
32+
import org.elasticsearch.index.shard.IndexShard;
33+
import org.elasticsearch.index.shard.ShardId;
34+
import org.elasticsearch.indices.IndicesService;
35+
import org.elasticsearch.test.transport.MockTransportService;
36+
import org.elasticsearch.transport.TransportService;
37+
import org.elasticsearch.xcontent.XContentBuilder;
38+
import org.elasticsearch.xcontent.XContentFactory;
1939
import org.elasticsearch.index.mapper.KeywordFieldMapper;
2040
import org.elasticsearch.index.mapper.MetadataFieldMapper;
21-
import org.elasticsearch.index.mapper.TimeSeriesParams;
41+
import org.elasticsearch.index.mapper.DocumentParserContext;
2242
import org.elasticsearch.index.query.AbstractQueryBuilder;
2343
import org.elasticsearch.index.query.QueryBuilder;
2444
import org.elasticsearch.index.query.QueryBuilders;
@@ -28,9 +48,6 @@
2848
import org.elasticsearch.plugins.Plugin;
2949
import org.elasticsearch.plugins.SearchPlugin;
3050
import org.elasticsearch.test.ESIntegTestCase;
31-
import org.elasticsearch.transport.RemoteTransportException;
32-
import org.elasticsearch.xcontent.XContentBuilder;
33-
import org.elasticsearch.xcontent.XContentFactory;
3451
import org.junit.Before;
3552

3653
import java.io.IOException;
@@ -41,18 +58,29 @@
4158
import java.util.HashMap;
4259
import java.util.List;
4360
import java.util.Map;
61+
import java.util.concurrent.atomic.AtomicBoolean;
4462
import java.util.function.Function;
4563
import java.util.function.Predicate;
4664

4765
import static java.util.Collections.singletonList;
4866
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
67+
import static org.hamcrest.Matchers.aMapWithSize;
4968
import static org.hamcrest.Matchers.array;
5069
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
5170
import static org.hamcrest.Matchers.containsInAnyOrder;
5271
import static org.hamcrest.Matchers.equalTo;
72+
import static org.hamcrest.Matchers.hasKey;
73+
import static org.hamcrest.Matchers.hasSize;
5374

5475
public class FieldCapabilitiesIT extends ESIntegTestCase {
5576

77+
@Override
78+
protected Collection<Class<? extends Plugin>> getMockPlugins() {
79+
final Collection<Class<? extends Plugin>> plugins = new ArrayList<>(super.getMockPlugins());
80+
plugins.add(MockTransportService.TestPlugin.class);
81+
return plugins;
82+
}
83+
5684
@Override
5785
@Before
5886
public void setUp() throws Exception {
@@ -335,9 +363,8 @@ public void testFailures() throws InterruptedException {
335363
assertEquals(2, response.getFailedIndices().length);
336364
assertThat(response.getFailures().get(0).getIndices(), arrayContainingInAnyOrder("index1-error", "index2-error"));
337365
Exception failure = response.getFailures().get(0).getException();
338-
assertEquals(RemoteTransportException.class, failure.getClass());
339-
assertEquals(IllegalArgumentException.class, failure.getCause().getClass());
340-
assertEquals("I throw because I choose to.", failure.getCause().getMessage());
366+
assertEquals(IllegalArgumentException.class, failure.getClass());
367+
assertEquals("I throw because I choose to.", failure.getMessage());
341368

342369
// the "indices" section should not include failed ones
343370
assertThat(Arrays.asList(response.getIndices()), containsInAnyOrder("old_index", "new_index"));
@@ -352,6 +379,163 @@ public void testFailures() throws InterruptedException {
352379
assertEquals("I throw because I choose to.", ex.getMessage());
353380
}
354381

382+
private void populateTimeRangeIndices() throws Exception {
383+
internalCluster().ensureAtLeastNumDataNodes(2);
384+
assertAcked(prepareCreate("log-index-1")
385+
.setSettings(Settings.builder()
386+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
387+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
388+
.setMapping("timestamp", "type=date", "field1", "type=keyword"));
389+
assertAcked(prepareCreate("log-index-2")
390+
.setSettings(Settings.builder()
391+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
392+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
393+
.setMapping("timestamp", "type=date", "field1", "type=long"));
394+
List<IndexRequestBuilder> reqs = new ArrayList<>();
395+
reqs.add(client().prepareIndex("log-index-1").setSource("timestamp", "2015-07-08"));
396+
reqs.add(client().prepareIndex("log-index-1").setSource("timestamp", "2018-07-08"));
397+
reqs.add(client().prepareIndex("log-index-1").setSource("timestamp", "2020-03-03"));
398+
reqs.add(client().prepareIndex("log-index-1").setSource("timestamp", "2020-09-09"));
399+
reqs.add(client().prepareIndex("log-index-2").setSource("timestamp", "2019-10-12"));
400+
reqs.add(client().prepareIndex("log-index-2").setSource("timestamp", "2020-02-02"));
401+
reqs.add(client().prepareIndex("log-index-2").setSource("timestamp", "2020-10-10"));
402+
indexRandom(true, reqs);
403+
ensureGreen("log-index-1", "log-index-2");
404+
client().admin().indices().prepareRefresh("log-index-1", "log-index-2").get();
405+
}
406+
407+
public void testTargetNodeFails() throws Exception {
408+
populateTimeRangeIndices();
409+
try {
410+
final AtomicBoolean failedRequest = new AtomicBoolean();
411+
for (String node : internalCluster().getNodeNames()) {
412+
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
413+
transportService.addRequestHandlingBehavior(TransportFieldCapabilitiesAction.ACTION_NODE_NAME,
414+
(handler, request, channel, task) -> {
415+
if (failedRequest.compareAndSet(false, true)) {
416+
channel.sendResponse(new CircuitBreakingException("Simulated", CircuitBreaker.Durability.TRANSIENT));
417+
} else {
418+
handler.messageReceived(request, channel, task);
419+
}
420+
});
421+
}
422+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
423+
request.indices("log-index-*");
424+
request.fields("*");
425+
if (randomBoolean()) {
426+
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
427+
}
428+
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
429+
assertTrue(failedRequest.get());
430+
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
431+
assertThat(response.getField("field1"), aMapWithSize(2));
432+
assertThat(response.getField("field1"), hasKey("long"));
433+
assertThat(response.getField("field1"), hasKey("keyword"));
434+
} finally {
435+
for (String node : internalCluster().getNodeNames()) {
436+
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
437+
transportService.clearAllRules();
438+
}
439+
}
440+
}
441+
442+
public void testNoActiveCopy() throws Exception {
443+
assertAcked(prepareCreate("log-index-inactive")
444+
.setSettings(Settings.builder()
445+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
446+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
447+
.put("index.routing.allocation.require._id", "unknown"))
448+
.setWaitForActiveShards(ActiveShardCount.NONE)
449+
.setMapping("timestamp", "type=date", "field1", "type=keyword"));
450+
{
451+
final ElasticsearchException ex =
452+
expectThrows(ElasticsearchException.class, () -> client().prepareFieldCaps("log-index-*").setFields("*").get());
453+
assertThat(ex.getMessage(), equalTo("index [log-index-inactive] has no active shard copy"));
454+
}
455+
{
456+
populateTimeRangeIndices();
457+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
458+
request.indices("log-index-*");
459+
request.fields("*");
460+
if (randomBoolean()) {
461+
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
462+
}
463+
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
464+
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
465+
assertThat(response.getField("field1"), aMapWithSize(2));
466+
assertThat(response.getField("field1"), hasKey("long"));
467+
assertThat(response.getField("field1"), hasKey("long"));
468+
469+
assertThat(response.getFailures(), hasSize(1));
470+
final FieldCapabilitiesFailure failure = response.getFailures().get(0);
471+
assertThat(failure.getIndices(), arrayContainingInAnyOrder("log-index-inactive"));
472+
assertThat(failure.getException().getMessage(), equalTo("index [log-index-inactive] has no active shard copy"));
473+
}
474+
}
475+
476+
private void moveOrCloseShardsOnNodes(String nodeName) throws Exception {
477+
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
478+
for (IndexService indexService : indicesService) {
479+
for (IndexShard indexShard : indexService) {
480+
if (randomBoolean()) {
481+
indexShard.close("test", randomBoolean());
482+
} else if (randomBoolean()) {
483+
final ShardId shardId = indexShard.shardId();
484+
final String[] nodeNames = internalCluster().getNodeNames();
485+
final String newNodeName = randomValueOtherThanMany(n -> nodeName.equals(n) == false, () -> randomFrom(nodeNames));
486+
DiscoveryNode fromNode = null;
487+
DiscoveryNode toNode = null;
488+
for (DiscoveryNode node : clusterService().state().nodes()) {
489+
if (node.getName().equals(nodeName)) {
490+
fromNode = node;
491+
}
492+
if (node.getName().equals(newNodeName)) {
493+
toNode = node;
494+
}
495+
}
496+
assertNotNull(fromNode);
497+
assertNotNull(toNode);
498+
client().admin().cluster().prepareReroute()
499+
.add(new MoveAllocationCommand(shardId.getIndexName(), shardId.id(), fromNode.getId(), toNode.getId()))
500+
.execute().actionGet();
501+
}
502+
}
503+
}
504+
}
505+
506+
public void testRelocation() throws Exception {
507+
populateTimeRangeIndices();
508+
try {
509+
final AtomicBoolean relocated = new AtomicBoolean();
510+
for (String node : internalCluster().getNodeNames()) {
511+
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
512+
transportService.addRequestHandlingBehavior(TransportFieldCapabilitiesAction.ACTION_NODE_NAME,
513+
(handler, request, channel, task) -> {
514+
if (relocated.compareAndSet(false, true)) {
515+
moveOrCloseShardsOnNodes(node);
516+
}
517+
handler.messageReceived(request, channel, task);
518+
});
519+
}
520+
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
521+
request.indices("log-index-*");
522+
request.fields("*");
523+
if (randomBoolean()) {
524+
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
525+
}
526+
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
527+
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
528+
assertThat(response.getField("field1"), aMapWithSize(2));
529+
assertThat(response.getField("field1"), hasKey("long"));
530+
assertThat(response.getField("field1"), hasKey("long"));
531+
} finally {
532+
for (String node : internalCluster().getNodeNames()) {
533+
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
534+
transportService.clearAllRules();
535+
}
536+
}
537+
}
538+
355539
private void assertIndices(FieldCapabilitiesResponse response, String... indices) {
356540
assertNotNull(response.getIndices());
357541
Arrays.sort(indices);

server/src/main/java/org/elasticsearch/action/OriginalIndices.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import java.io.IOException;
1616
import java.util.Arrays;
17+
import java.util.Objects;
1718

1819
/**
1920
* Used to keep track of original indices within internal (e.g. shard level) requests
@@ -67,4 +68,19 @@ public String toString() {
6768
", indicesOptions=" + indicesOptions +
6869
'}';
6970
}
71+
72+
@Override
73+
public boolean equals(Object o) {
74+
if (this == o) return true;
75+
if (o == null || getClass() != o.getClass()) return false;
76+
OriginalIndices that = (OriginalIndices) o;
77+
return Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions);
78+
}
79+
80+
@Override
81+
public int hashCode() {
82+
int result = Objects.hash(indicesOptions);
83+
result = 31 * result + Arrays.hashCode(indices);
84+
return result;
85+
}
7086
}

0 commit comments

Comments
 (0)