Skip to content

Commit d752179

Browse files
committed
Cache completion stats between refreshes
Computing the stats for completion fields may involve a significant amount of work since it walks every field of every segment looking for completion fields. Innocuous-looking APIs like `GET _stats` or `GET _cluster/stats` do this for every shard in the cluster. This repeated work is unnecessary since these stats do not change between refreshes; in many indices they remain constant for a long time. This commit introduces a cache for these stats which is invalidated on a refresh, allowing most stats calls to bypass the work needed to compute them on most shards. Closes elastic#51915 Backport of elastic#51991
1 parent 14c21ae commit d752179

File tree

7 files changed

+484
-37
lines changed

7 files changed

+484
-37
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
---
2+
setup:
3+
4+
- do:
5+
indices.create:
6+
index: test1
7+
wait_for_active_shards: all
8+
body:
9+
settings:
10+
# Limit the number of shards so that shards are unlikely
11+
# to be relocated or being initialized between the test
12+
# set up and the test execution
13+
index.number_of_shards: 3
14+
index.number_of_replicas: 0
15+
mappings:
16+
properties:
17+
bar:
18+
type: text
19+
fielddata: true
20+
fields:
21+
completion:
22+
type: completion
23+
24+
- do:
25+
cluster.health:
26+
wait_for_no_relocating_shards: true
27+
wait_for_events: languid
28+
29+
- do:
30+
index:
31+
index: test1
32+
id: 1
33+
body: { "bar": "bar" }
34+
35+
- do:
36+
index:
37+
index: test1
38+
id: 2
39+
body: { "bar": "foo" }
40+
41+
- do:
42+
indices.refresh: {}
43+
44+
---
45+
"Completion stats":
46+
- do:
47+
indices.stats: { completion_fields: "*" }
48+
49+
- match: { _shards.failed: 0}
50+
- gt: { _all.total.completion.fields.bar\.completion.size_in_bytes: 0 }
51+
- gt: { _all.total.completion.size_in_bytes: 0 }
52+
- set: { _all.total.completion.size_in_bytes: original_size }
53+
54+
- do:
55+
index:
56+
index: test1
57+
id: 3
58+
body: { "bar": "foo", "baz": "foo" }
59+
60+
- do:
61+
indices.refresh: {}
62+
63+
- do:
64+
indices.stats: { completion_fields: "*" }
65+
66+
- match: { _shards.failed: 0}
67+
- gt: { _all.total.completion.size_in_bytes: $original_size }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.index.engine;
20+
21+
import com.carrotsearch.hppc.ObjectLongHashMap;
22+
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
23+
import org.apache.lucene.index.FieldInfo;
24+
import org.apache.lucene.index.LeafReader;
25+
import org.apache.lucene.index.LeafReaderContext;
26+
import org.apache.lucene.index.Terms;
27+
import org.apache.lucene.search.ReferenceManager;
28+
import org.apache.lucene.search.suggest.document.CompletionTerms;
29+
import org.elasticsearch.action.ActionListener;
30+
import org.elasticsearch.action.support.PlainActionFuture;
31+
import org.elasticsearch.common.FieldMemoryStats;
32+
import org.elasticsearch.common.Nullable;
33+
import org.elasticsearch.common.regex.Regex;
34+
import org.elasticsearch.search.suggest.completion.CompletionStats;
35+
36+
import java.util.function.Supplier;
37+
38+
class CompletionStatsCache implements ReferenceManager.RefreshListener {
39+
40+
private final Supplier<Engine.Searcher> searcherSupplier;
41+
42+
/**
43+
* Contains a future (i.e. non-null) if another thread is already computing stats, in which case wait for this computation to
44+
* complete. Contains null otherwise, in which case compute the stats ourselves and save them here for other threads to use.
45+
* Futures are eventually completed with stats that include all fields, requiring further filtering (see
46+
* {@link CompletionStatsCache#filterCompletionStatsByFieldName}).
47+
*/
48+
@Nullable
49+
private PlainActionFuture<CompletionStats> completionStatsFuture;
50+
51+
/**
52+
* Protects accesses to {@code completionStatsFuture} since we can't use {@link java.util.concurrent.atomic.AtomicReference} in JDK8.
53+
*/
54+
private final Object completionStatsFutureMutex = new Object();
55+
56+
CompletionStatsCache(Supplier<Engine.Searcher> searcherSupplier) {
57+
this.searcherSupplier = searcherSupplier;
58+
}
59+
60+
CompletionStats get(String... fieldNamePatterns) {
61+
final PlainActionFuture<CompletionStats> newFuture = new PlainActionFuture<>();
62+
63+
// final PlainActionFuture<CompletionStats> oldFuture = completionStatsFutureRef.compareAndExchange(null, newFuture);
64+
// except JDK8 doesn't have compareAndExchange so we emulate it:
65+
final PlainActionFuture<CompletionStats> oldFuture;
66+
synchronized (completionStatsFutureMutex) {
67+
if (completionStatsFuture == null) {
68+
completionStatsFuture = newFuture;
69+
oldFuture = null;
70+
} else {
71+
oldFuture = completionStatsFuture;
72+
}
73+
}
74+
75+
if (oldFuture != null) {
76+
// we lost the race, someone else is already computing stats, so we wait for that to finish
77+
return filterCompletionStatsByFieldName(fieldNamePatterns, oldFuture.actionGet());
78+
}
79+
80+
// we won the race, nobody else is already computing stats, so it's up to us
81+
ActionListener.completeWith(newFuture, () -> {
82+
long sizeInBytes = 0;
83+
final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>();
84+
85+
try (Engine.Searcher currentSearcher = searcherSupplier.get()) {
86+
for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
87+
LeafReader atomicReader = atomicReaderContext.reader();
88+
for (FieldInfo info : atomicReader.getFieldInfos()) {
89+
Terms terms = atomicReader.terms(info.name);
90+
if (terms instanceof CompletionTerms) {
91+
// TODO: currently we load up the suggester for reporting its size
92+
final long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
93+
completionFields.addTo(info.name, fstSize);
94+
sizeInBytes += fstSize;
95+
}
96+
}
97+
}
98+
}
99+
100+
return new CompletionStats(sizeInBytes, new FieldMemoryStats(completionFields));
101+
});
102+
103+
boolean success = false;
104+
final CompletionStats completionStats;
105+
try {
106+
completionStats = newFuture.actionGet();
107+
success = true;
108+
} finally {
109+
if (success == false) {
110+
// invalidate the cache (if not already invalidated) so that future calls will retry
111+
112+
// completionStatsFutureRef.compareAndSet(newFuture, null); except we're not using AtomicReference in JDK8
113+
synchronized (completionStatsFutureMutex) {
114+
if (completionStatsFuture == newFuture) {
115+
completionStatsFuture = null;
116+
}
117+
}
118+
}
119+
}
120+
121+
return filterCompletionStatsByFieldName(fieldNamePatterns, completionStats);
122+
}
123+
124+
private static CompletionStats filterCompletionStatsByFieldName(String[] fieldNamePatterns, CompletionStats fullCompletionStats) {
125+
final FieldMemoryStats fieldMemoryStats;
126+
if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
127+
final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
128+
for (ObjectLongCursor<String> fieldCursor : fullCompletionStats.getFields()) {
129+
if (Regex.simpleMatch(fieldNamePatterns, fieldCursor.key)) {
130+
completionFields.addTo(fieldCursor.key, fieldCursor.value);
131+
}
132+
}
133+
fieldMemoryStats = new FieldMemoryStats(completionFields);
134+
} else {
135+
fieldMemoryStats = null;
136+
}
137+
return new CompletionStats(fullCompletionStats.getSizeInBytes(), fieldMemoryStats);
138+
}
139+
140+
@Override
141+
public void beforeRefresh() {
142+
}
143+
144+
@Override
145+
public void afterRefresh(boolean didRefresh) {
146+
if (didRefresh) {
147+
// completionStatsFutureRef.set(null); except we're not using AtomicReference in JDK8
148+
synchronized (completionStatsFutureMutex) {
149+
completionStatsFuture = null;
150+
}
151+
}
152+
}
153+
}

server/src/main/java/org/elasticsearch/index/engine/Engine.java

+1-31
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,22 @@
1919

2020
package org.elasticsearch.index.engine;
2121

22-
import com.carrotsearch.hppc.ObjectLongHashMap;
2322
import org.apache.logging.log4j.Logger;
2423
import org.apache.logging.log4j.message.ParameterizedMessage;
2524
import org.apache.lucene.index.DirectoryReader;
26-
import org.apache.lucene.index.FieldInfo;
2725
import org.apache.lucene.index.IndexCommit;
2826
import org.apache.lucene.index.IndexFileNames;
2927
import org.apache.lucene.index.IndexReader;
30-
import org.apache.lucene.index.LeafReader;
3128
import org.apache.lucene.index.LeafReaderContext;
3229
import org.apache.lucene.index.SegmentCommitInfo;
3330
import org.apache.lucene.index.SegmentInfos;
3431
import org.apache.lucene.index.SegmentReader;
3532
import org.apache.lucene.index.Term;
36-
import org.apache.lucene.index.Terms;
3733
import org.apache.lucene.search.IndexSearcher;
3834
import org.apache.lucene.search.QueryCache;
3935
import org.apache.lucene.search.QueryCachingPolicy;
4036
import org.apache.lucene.search.ReferenceManager;
4137
import org.apache.lucene.search.similarities.Similarity;
42-
import org.apache.lucene.search.suggest.document.CompletionTerms;
4338
import org.apache.lucene.store.AlreadyClosedException;
4439
import org.apache.lucene.store.Directory;
4540
import org.apache.lucene.store.IOContext;
@@ -49,7 +44,6 @@
4944
import org.elasticsearch.ExceptionsHelper;
5045
import org.elasticsearch.action.index.IndexRequest;
5146
import org.elasticsearch.common.CheckedRunnable;
52-
import org.elasticsearch.common.FieldMemoryStats;
5347
import org.elasticsearch.common.Nullable;
5448
import org.elasticsearch.common.bytes.BytesReference;
5549
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -65,7 +59,6 @@
6559
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
6660
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
6761
import org.elasticsearch.common.metrics.CounterMetric;
68-
import org.elasticsearch.common.regex.Regex;
6962
import org.elasticsearch.common.unit.ByteSizeValue;
7063
import org.elasticsearch.common.unit.TimeValue;
7164
import org.elasticsearch.common.util.concurrent.ReleasableLock;
@@ -185,30 +178,7 @@ public MergeStats getMergeStats() {
185178
/**
186179
* Returns the {@link CompletionStats} for this engine
187180
*/
188-
public CompletionStats completionStats(String... fieldNamePatterns) throws IOException {
189-
try (Searcher currentSearcher = acquireSearcher("completion_stats", SearcherScope.INTERNAL)) {
190-
long sizeInBytes = 0;
191-
ObjectLongHashMap<String> completionFields = null;
192-
if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
193-
completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
194-
}
195-
for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
196-
LeafReader atomicReader = atomicReaderContext.reader();
197-
for (FieldInfo info : atomicReader.getFieldInfos()) {
198-
Terms terms = atomicReader.terms(info.name);
199-
if (terms instanceof CompletionTerms) {
200-
// TODO: currently we load up the suggester for reporting its size
201-
long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
202-
if (Regex.simpleMatch(fieldNamePatterns, info.name)) {
203-
completionFields.addTo(info.name, fstSize);
204-
}
205-
sizeInBytes += fstSize;
206-
}
207-
}
208-
}
209-
return new CompletionStats(sizeInBytes, completionFields == null ? null : new FieldMemoryStats(completionFields));
210-
}
211-
}
181+
public abstract CompletionStats completionStats(String... fieldNamePatterns);
212182

213183
/**
214184
* Returns the {@link DocsStats} for this engine

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+10
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
import org.elasticsearch.index.translog.TranslogCorruptedException;
102102
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
103103
import org.elasticsearch.index.translog.TranslogStats;
104+
import org.elasticsearch.search.suggest.completion.CompletionStats;
104105
import org.elasticsearch.threadpool.ThreadPool;
105106

106107
import java.io.Closeable;
@@ -180,6 +181,8 @@ public class InternalEngine extends Engine {
180181
private final SoftDeletesPolicy softDeletesPolicy;
181182
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
182183

184+
private final CompletionStatsCache completionStatsCache;
185+
183186
private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
184187
private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();
185188
private final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);
@@ -272,6 +275,8 @@ public InternalEngine(EngineConfig engineConfig) {
272275
"failed to restore version map and local checkpoint tracker", e);
273276
}
274277
}
278+
completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
279+
this.externalReaderManager.addListener(completionStatsCache);
275280
success = true;
276281
} finally {
277282
if (success == false) {
@@ -312,6 +317,11 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
312317
engineConfig.retentionLeasesSupplier());
313318
}
314319

320+
@Override
321+
public CompletionStats completionStats(String... fieldNamePatterns) {
322+
return completionStatsCache.get(fieldNamePatterns);
323+
}
324+
315325
/**
316326
* This reference manager delegates all it's refresh calls to another (internal) ReaderManager
317327
* The main purpose for this is that if we have external refreshes happening we don't issue extra

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

+11
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.index.translog.TranslogConfig;
4343
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
4444
import org.elasticsearch.index.translog.TranslogStats;
45+
import org.elasticsearch.search.suggest.completion.CompletionStats;
4546

4647
import java.io.Closeable;
4748
import java.io.IOException;
@@ -78,6 +79,7 @@ public class ReadOnlyEngine extends Engine {
7879
private final DocsStats docsStats;
7980
private final RamAccountingRefreshListener refreshListener;
8081
private final SafeCommitInfo safeCommitInfo;
82+
private final CompletionStatsCache completionStatsCache;
8183

8284
protected volatile TranslogStats translogStats;
8385

@@ -122,6 +124,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
122124
this.translogStats = translogStats != null ? translogStats : translogStats(config, lastCommittedSegmentInfos);
123125
this.indexWriterLock = indexWriterLock;
124126
this.safeCommitInfo = new SafeCommitInfo(seqNoStats.getLocalCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc());
127+
128+
completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
129+
// no need to register a refresh listener to invalidate completionStatsCache since this engine is readonly
130+
125131
success = true;
126132
} finally {
127133
if (success == false) {
@@ -542,4 +548,9 @@ protected static DirectoryReader openDirectory(Directory directory, boolean wrap
542548
return reader;
543549
}
544550
}
551+
552+
@Override
553+
public CompletionStats completionStats(String... fieldNamePatterns) {
554+
return completionStatsCache.get(fieldNamePatterns);
555+
}
545556
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@
154154
import java.io.Closeable;
155155
import java.io.IOException;
156156
import java.io.PrintStream;
157-
import java.io.UncheckedIOException;
158157
import java.nio.channels.ClosedByInterruptException;
159158
import java.nio.charset.StandardCharsets;
160159
import java.util.ArrayList;
@@ -1062,11 +1061,7 @@ public TranslogStats translogStats() {
10621061

10631062
public CompletionStats completionStats(String... fields) {
10641063
readAllowed();
1065-
try {
1066-
return getEngine().completionStats(fields);
1067-
} catch (IOException e) {
1068-
throw new UncheckedIOException(e);
1069-
}
1064+
return getEngine().completionStats(fields);
10701065
}
10711066

10721067
public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {

0 commit comments

Comments
 (0)