Skip to content

Commit 69e898d

Browse files
authored
Group field caps response by index mapping hash (#83494)
This commit utilizes the index mapping hash to share the fields-caps for indices with the same index mapping to reduce the memory usage and the size of transport messages. Closes #78665 Closes #82879
1 parent acf9968 commit 69e898d

9 files changed

+302
-71
lines changed

docs/changelog/83494.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 83494
2+
summary: Group field caps response by index mapping hash
3+
area: Search
4+
type: enhancement
5+
issues:
6+
- 78665
7+
- 82879

server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.action.fieldcaps;
1010

11+
import org.elasticsearch.cluster.metadata.MappingMetadata;
1112
import org.elasticsearch.index.IndexService;
1213
import org.elasticsearch.index.engine.Engine;
1314
import org.elasticsearch.index.mapper.MappedFieldType;
@@ -37,6 +38,7 @@
3738
*/
3839
class FieldCapabilitiesFetcher {
3940
private final IndicesService indicesService;
41+
private final Map<String, Map<String, IndexFieldCapabilities>> indexMappingHashToResponses = new HashMap<>();
4042

4143
FieldCapabilitiesFetcher(IndicesService indicesService) {
4244
this.indicesService = indicesService;
@@ -65,17 +67,34 @@ FieldCapabilitiesIndexResponse fetch(
6567
);
6668

6769
if (canMatchShard(shardId, indexFilter, nowInMillis, searchExecutionContext) == false) {
68-
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), Collections.emptyMap(), false);
70+
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), null, Collections.emptyMap(), false);
6971
}
7072

71-
Predicate<String> fieldPredicate = indicesService.getFieldFilter().apply(shardId.getIndexName());
73+
final MappingMetadata mapping = indexService.getMetadata().mapping();
74+
final String indexMappingHash = mapping != null ? mapping.getSha256() : null;
75+
if (indexMappingHash != null) {
76+
final Map<String, IndexFieldCapabilities> existing = indexMappingHashToResponses.get(indexMappingHash);
77+
if (existing != null) {
78+
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), indexMappingHash, existing, true);
79+
}
80+
}
7281

73-
return retrieveFieldCaps(shardId.getIndexName(), searchExecutionContext, fieldPatterns, filters, fieldTypes, fieldPredicate);
82+
Predicate<String> fieldPredicate = indicesService.getFieldFilter().apply(shardId.getIndexName());
83+
final Map<String, IndexFieldCapabilities> responseMap = retrieveFieldCaps(
84+
searchExecutionContext,
85+
fieldPatterns,
86+
filters,
87+
fieldTypes,
88+
fieldPredicate
89+
);
90+
if (indexMappingHash != null) {
91+
indexMappingHashToResponses.put(indexMappingHash, responseMap);
92+
}
93+
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), indexMappingHash, responseMap, true);
7494
}
7595
}
7696

77-
public static FieldCapabilitiesIndexResponse retrieveFieldCaps(
78-
String indexName,
97+
static Map<String, IndexFieldCapabilities> retrieveFieldCaps(
7998
SearchExecutionContext context,
8099
String[] fieldPatterns,
81100
String[] filters,
@@ -141,7 +160,7 @@ public static FieldCapabilitiesIndexResponse retrieveFieldCaps(
141160
}
142161
}
143162
}
144-
return new FieldCapabilitiesIndexResponse(indexName, responseMap, true);
163+
return responseMap;
145164
}
146165

147166
private static boolean checkIncludeParents(String[] filters) {

server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java

Lines changed: 96 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,113 @@
99
package org.elasticsearch.action.fieldcaps;
1010

1111
import org.elasticsearch.Version;
12-
import org.elasticsearch.action.ActionResponse;
1312
import org.elasticsearch.common.io.stream.StreamInput;
1413
import org.elasticsearch.common.io.stream.StreamOutput;
1514
import org.elasticsearch.common.io.stream.Writeable;
15+
import org.elasticsearch.core.Nullable;
1616

1717
import java.io.IOException;
18+
import java.util.List;
1819
import java.util.Map;
1920
import java.util.Objects;
21+
import java.util.function.Predicate;
22+
import java.util.stream.Collectors;
23+
import java.util.stream.Stream;
24+
25+
final class FieldCapabilitiesIndexResponse implements Writeable {
26+
private static final Version MAPPING_HASH_VERSION = Version.V_8_2_0;
2027

21-
public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable {
2228
private final String indexName;
29+
@Nullable
30+
private final String indexMappingHash;
2331
private final Map<String, IndexFieldCapabilities> responseMap;
2432
private final boolean canMatch;
2533
private final transient Version originVersion;
2634

27-
FieldCapabilitiesIndexResponse(String indexName, Map<String, IndexFieldCapabilities> responseMap, boolean canMatch) {
35+
FieldCapabilitiesIndexResponse(
36+
String indexName,
37+
@Nullable String indexMappingHash,
38+
Map<String, IndexFieldCapabilities> responseMap,
39+
boolean canMatch
40+
) {
2841
this.indexName = indexName;
42+
this.indexMappingHash = indexMappingHash;
2943
this.responseMap = responseMap;
3044
this.canMatch = canMatch;
3145
this.originVersion = Version.CURRENT;
3246
}
3347

3448
FieldCapabilitiesIndexResponse(StreamInput in) throws IOException {
35-
super(in);
3649
this.indexName = in.readString();
3750
this.responseMap = in.readMap(StreamInput::readString, IndexFieldCapabilities::new);
3851
this.canMatch = in.readBoolean();
3952
this.originVersion = in.getVersion();
53+
if (in.getVersion().onOrAfter(MAPPING_HASH_VERSION)) {
54+
this.indexMappingHash = in.readOptionalString();
55+
} else {
56+
this.indexMappingHash = null;
57+
}
58+
}
59+
60+
@Override
61+
public void writeTo(StreamOutput out) throws IOException {
62+
out.writeString(indexName);
63+
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
64+
out.writeBoolean(canMatch);
65+
if (out.getVersion().onOrAfter(MAPPING_HASH_VERSION)) {
66+
out.writeOptionalString(indexMappingHash);
67+
}
68+
}
69+
70+
private record GroupByMappingHash(List<String> indices, String indexMappingHash, Map<String, IndexFieldCapabilities> responseMap)
71+
implements
72+
Writeable {
73+
GroupByMappingHash(StreamInput in) throws IOException {
74+
this(in.readStringList(), in.readString(), in.readMap(StreamInput::readString, IndexFieldCapabilities::new));
75+
}
76+
77+
@Override
78+
public void writeTo(StreamOutput out) throws IOException {
79+
out.writeStringCollection(indices);
80+
out.writeString(indexMappingHash);
81+
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
82+
}
83+
84+
List<FieldCapabilitiesIndexResponse> getResponses() {
85+
return indices.stream().map(index -> new FieldCapabilitiesIndexResponse(index, indexMappingHash, responseMap, true)).toList();
86+
}
87+
}
88+
89+
static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws IOException {
90+
if (input.getVersion().before(MAPPING_HASH_VERSION)) {
91+
return input.readList(FieldCapabilitiesIndexResponse::new);
92+
}
93+
final List<FieldCapabilitiesIndexResponse> ungroupedList = input.readList(FieldCapabilitiesIndexResponse::new);
94+
final List<GroupByMappingHash> groups = input.readList(GroupByMappingHash::new);
95+
return Stream.concat(ungroupedList.stream(), groups.stream().flatMap(g -> g.getResponses().stream())).toList();
96+
}
97+
98+
static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse> responses) throws IOException {
99+
if (output.getVersion().before(MAPPING_HASH_VERSION)) {
100+
output.writeCollection(responses);
101+
return;
102+
}
103+
final Predicate<FieldCapabilitiesIndexResponse> canGroup = r -> r.canMatch && r.indexMappingHash != null;
104+
final List<FieldCapabilitiesIndexResponse> ungroupedResponses = responses.stream().filter(r -> canGroup.test(r) == false).toList();
105+
final List<GroupByMappingHash> groupedResponses = responses.stream()
106+
.filter(canGroup)
107+
.collect(Collectors.groupingBy(r -> r.indexMappingHash))
108+
.values()
109+
.stream()
110+
.map(rs -> {
111+
final String indexMappingHash = rs.get(0).indexMappingHash;
112+
final Map<String, IndexFieldCapabilities> responseMap = rs.get(0).responseMap;
113+
final List<String> indices = rs.stream().map(r -> r.indexName).toList();
114+
return new GroupByMappingHash(indices, indexMappingHash, responseMap);
115+
})
116+
.toList();
117+
output.writeList(ungroupedResponses);
118+
output.writeList(groupedResponses);
40119
}
41120

42121
/**
@@ -46,6 +125,14 @@ public String getIndexName() {
46125
return indexName;
47126
}
48127

128+
/**
129+
* Returns the index mapping hash associated with this index if exists
130+
*/
131+
@Nullable
132+
public String getIndexMappingHash() {
133+
return indexMappingHash;
134+
}
135+
49136
public boolean canMatch() {
50137
return canMatch;
51138
}
@@ -69,23 +156,19 @@ Version getOriginVersion() {
69156
return originVersion;
70157
}
71158

72-
@Override
73-
public void writeTo(StreamOutput out) throws IOException {
74-
out.writeString(indexName);
75-
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
76-
out.writeBoolean(canMatch);
77-
}
78-
79159
@Override
80160
public boolean equals(Object o) {
81161
if (this == o) return true;
82162
if (o == null || getClass() != o.getClass()) return false;
83163
FieldCapabilitiesIndexResponse that = (FieldCapabilitiesIndexResponse) o;
84-
return canMatch == that.canMatch && Objects.equals(indexName, that.indexName) && Objects.equals(responseMap, that.responseMap);
164+
return canMatch == that.canMatch
165+
&& Objects.equals(indexName, that.indexName)
166+
&& Objects.equals(indexMappingHash, that.indexMappingHash)
167+
&& Objects.equals(responseMap, that.responseMap);
85168
}
86169

87170
@Override
88171
public int hashCode() {
89-
return Objects.hash(indexName, responseMap, canMatch);
172+
return Objects.hash(indexName, indexMappingHash, responseMap, canMatch);
90173
}
91174
}

server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ class FieldCapabilitiesNodeResponse extends ActionResponse implements Writeable
3737

3838
FieldCapabilitiesNodeResponse(StreamInput in) throws IOException {
3939
super(in);
40-
this.indexResponses = in.readList(FieldCapabilitiesIndexResponse::new);
40+
this.indexResponses = FieldCapabilitiesIndexResponse.readList(in);
4141
this.failures = in.readMap(ShardId::new, StreamInput::readException);
4242
this.unmatchedShardIds = in.readSet(ShardId::new);
4343
}
4444

4545
@Override
4646
public void writeTo(StreamOutput out) throws IOException {
47-
out.writeList(indexResponses);
47+
FieldCapabilitiesIndexResponse.writeList(out, indexResponses);
4848
out.writeMap(failures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
4949
out.writeCollection(unmatchedShardIds);
5050
}

server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public FieldCapabilitiesResponse(StreamInput in) throws IOException {
7575
super(in);
7676
indices = in.readStringArray();
7777
this.responseMap = in.readMap(StreamInput::readString, FieldCapabilitiesResponse::readField);
78-
indexResponses = in.readList(FieldCapabilitiesIndexResponse::new);
78+
this.indexResponses = FieldCapabilitiesIndexResponse.readList(in);
7979
this.failures = in.readList(FieldCapabilitiesFailure::new);
8080
}
8181

@@ -141,7 +141,7 @@ private static Map<String, FieldCapabilities> readField(StreamInput in) throws I
141141
public void writeTo(StreamOutput out) throws IOException {
142142
out.writeStringArray(indices);
143143
out.writeMap(responseMap, StreamOutput::writeString, FieldCapabilitiesResponse::writeField);
144-
out.writeList(indexResponses);
144+
FieldCapabilitiesIndexResponse.writeList(out, indexResponses);
145145
out.writeList(failures);
146146
}
147147

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.List;
4343
import java.util.Map;
4444
import java.util.Set;
45+
import java.util.function.Consumer;
4546
import java.util.function.Predicate;
4647
import java.util.stream.Collectors;
4748

@@ -55,8 +56,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
5556
private final ClusterService clusterService;
5657
private final IndexNameExpressionResolver indexNameExpressionResolver;
5758

58-
private final FieldCapabilitiesFetcher fieldCapabilitiesFetcher;
5959
private final Predicate<String> metadataFieldPred;
60+
private final IndicesService indicesService;
6061
private final boolean ccsCheckCompatibility;
6162

6263
@Inject
@@ -73,7 +74,7 @@ public TransportFieldCapabilitiesAction(
7374
this.transportService = transportService;
7475
this.clusterService = clusterService;
7576
this.indexNameExpressionResolver = indexNameExpressionResolver;
76-
this.fieldCapabilitiesFetcher = new FieldCapabilitiesFetcher(indicesService);
77+
this.indicesService = indicesService;
7778
final Set<String> metadataFields = indicesService.getAllMetadataFields();
7879
this.metadataFieldPred = metadataFields::contains;
7980
transportService.registerRequestHandler(
@@ -112,6 +113,17 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
112113
checkIndexBlocks(clusterState, concreteIndices);
113114

114115
final Map<String, FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedMap(new HashMap<>());
116+
// This map is used to share the index response for indices which have the same index mapping hash to reduce the memory usage.
117+
final Map<String, Map<String, IndexFieldCapabilities>> indexMappingHashToResponses = Collections.synchronizedMap(new HashMap<>());
118+
final Consumer<FieldCapabilitiesIndexResponse> handleIndexResponse = resp -> {
119+
if (resp.canMatch() && resp.getIndexMappingHash() != null) {
120+
Map<String, IndexFieldCapabilities> curr = indexMappingHashToResponses.putIfAbsent(resp.getIndexMappingHash(), resp.get());
121+
if (curr != null) {
122+
resp = new FieldCapabilitiesIndexResponse(resp.getIndexName(), resp.getIndexMappingHash(), curr, true);
123+
}
124+
}
125+
indexResponses.putIfAbsent(resp.getIndexName(), resp);
126+
};
115127
final FailureCollector indexFailures = new FailureCollector();
116128
// One for each cluster including the local cluster
117129
final CountDown completionCounter = new CountDown(1 + remoteClusterIndices.size());
@@ -125,7 +137,7 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
125137
nowInMillis,
126138
concreteIndices,
127139
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
128-
indexResponse -> indexResponses.putIfAbsent(indexResponse.getIndexName(), indexResponse),
140+
handleIndexResponse,
129141
indexFailures::collect,
130142
countDown
131143
);
@@ -141,7 +153,9 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
141153
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
142154
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
143155
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
144-
indexResponses.putIfAbsent(indexName, new FieldCapabilitiesIndexResponse(indexName, resp.get(), resp.canMatch()));
156+
handleIndexResponse.accept(
157+
new FieldCapabilitiesIndexResponse(indexName, resp.getIndexMappingHash(), resp.get(), resp.canMatch())
158+
);
145159
}
146160
for (FieldCapabilitiesFailure failure : response.getFailures()) {
147161
Exception ex = failure.getException();
@@ -347,12 +361,13 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann
347361
final Map<String, List<ShardId>> groupedShardIds = request.shardIds()
348362
.stream()
349363
.collect(Collectors.groupingBy(ShardId::getIndexName));
364+
final FieldCapabilitiesFetcher fetcher = new FieldCapabilitiesFetcher(indicesService);
350365
for (List<ShardId> shardIds : groupedShardIds.values()) {
351366
final Map<ShardId, Exception> failures = new HashMap<>();
352367
final Set<ShardId> unmatched = new HashSet<>();
353368
for (ShardId shardId : shardIds) {
354369
try {
355-
final FieldCapabilitiesIndexResponse response = fieldCapabilitiesFetcher.fetch(
370+
final FieldCapabilitiesIndexResponse response = fetcher.fetch(
356371
shardId,
357372
request.fields(),
358373
request.filters(),

0 commit comments

Comments
 (0)