Skip to content

Commit c24e4ae

Browse files
authored
Enhancements to IndicesQueryCache. (#39099)
This commit adds the following: - more tests to IndicesServiceCloseTests, one of them found a bug in the order in which `IndicesQueryCache#onClose` and `IndicesService.indicesRefCount#decRef` are called. - made `IndicesQueryCache.stats2` a synchronized map. All writes to it are already protected by the lock of the Lucene cache, but the final read from an assertion in `IndicesQueryCache#close()` was not so this change should avoid any potential visibility issues. - human-readable `toString`s to make debugging easier. Relates #37117
1 parent 6afb8c7 commit c24e4ae

File tree

3 files changed

+195
-7
lines changed

3 files changed

+195
-7
lines changed

server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import java.io.Closeable;
4444
import java.io.IOException;
45+
import java.util.Collections;
4546
import java.util.HashMap;
4647
import java.util.IdentityHashMap;
4748
import java.util.Map;
@@ -71,7 +72,7 @@ public class IndicesQueryCache implements QueryCache, Closeable {
7172
// This is a hack for the fact that the close listener for the
7273
// ShardCoreKeyMap will be called before onDocIdSetEviction
7374
// See onDocIdSetEviction for more info
74-
private final Map<Object, StatsAndCount> stats2 = new IdentityHashMap<>();
75+
private final Map<Object, StatsAndCount> stats2 = Collections.synchronizedMap(new IdentityHashMap<>());
7576

7677
public IndicesQueryCache(Settings settings) {
7778
final ByteSizeValue size = INDICES_CACHE_QUERY_SIZE_SETTING.get(settings);
@@ -189,20 +190,35 @@ public void close() {
189190
assert shardKeyMap.size() == 0 : shardKeyMap.size();
190191
assert shardStats.isEmpty() : shardStats.keySet();
191192
assert stats2.isEmpty() : stats2;
193+
194+
// This cache stores two things: filters, and doc id sets. At this time
195+
// we only know that there are no more doc id sets, but we still track
196+
// recently used queries, which we want to reclaim.
192197
cache.clear();
193198
}
194199

195200
private static class Stats implements Cloneable {
196201

202+
final ShardId shardId;
197203
volatile long ramBytesUsed;
198204
volatile long hitCount;
199205
volatile long missCount;
200206
volatile long cacheCount;
201207
volatile long cacheSize;
202208

209+
Stats(ShardId shardId) {
210+
this.shardId = shardId;
211+
}
212+
203213
QueryCacheStats toQueryCacheStats() {
204214
return new QueryCacheStats(ramBytesUsed, hitCount, missCount, cacheCount, cacheSize);
205215
}
216+
217+
@Override
218+
public String toString() {
219+
return "{shardId=" + shardId + ", ramBytedUsed=" + ramBytesUsed + ", hitCount=" + hitCount + ", missCount=" + missCount +
220+
", cacheCount=" + cacheCount + ", cacheSize=" + cacheSize + "}";
221+
}
206222
}
207223

208224
private static class StatsAndCount {
@@ -213,6 +229,11 @@ private static class StatsAndCount {
213229
this.stats = stats;
214230
this.count = 0;
215231
}
232+
233+
@Override
234+
public String toString() {
235+
return "{stats=" + stats + " ,count=" + count + "}";
236+
}
216237
}
217238

218239
private boolean empty(Stats stats) {
@@ -249,7 +270,7 @@ private Stats getOrCreateStats(Object coreKey) {
249270
final ShardId shardId = shardKeyMap.getShardId(coreKey);
250271
Stats stats = shardStats.get(shardId);
251272
if (stats == null) {
252-
stats = new Stats();
273+
stats = new Stats(shardId);
253274
shardStats.put(shardId, stats);
254275
}
255276
return stats;
@@ -265,6 +286,7 @@ protected void onClear() {
265286
stats.cacheSize = 0;
266287
stats.ramBytesUsed = 0;
267288
}
289+
stats2.clear();
268290
sharedRamBytesUsed = 0;
269291
}
270292

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public class IndicesService extends AbstractLifecycleComponent
192192
private final NamedWriteableRegistry namedWriteableRegistry;
193193
private final IndexingMemoryController indexingMemoryController;
194194
private final TimeValue cleanInterval;
195-
private final IndicesRequestCache indicesRequestCache;
195+
final IndicesRequestCache indicesRequestCache; // pkg-private for testing
196196
private final IndicesQueryCache indicesQueryCache;
197197
private final MetaStateService metaStateService;
198198
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
@@ -481,9 +481,9 @@ public void onStoreCreated(ShardId shardId) {
481481
@Override
482482
public void onStoreClosed(ShardId shardId) {
483483
try {
484-
indicesRefCount.decRef();
485-
} finally {
486484
indicesQueryCache.onClose(shardId);
485+
} finally {
486+
indicesRefCount.decRef();
487487
}
488488
}
489489
};

server/src/test/java/org/elasticsearch/indices/IndicesServiceCloseTests.java

Lines changed: 168 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,37 @@
1919

2020
package org.elasticsearch.indices;
2121

22+
import org.apache.lucene.document.LongPoint;
23+
import org.apache.lucene.search.Query;
2224
import org.elasticsearch.cluster.ClusterName;
2325
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
26+
import org.elasticsearch.common.bytes.BytesArray;
27+
import org.elasticsearch.common.bytes.BytesReference;
28+
import org.elasticsearch.common.cache.RemovalNotification;
2429
import org.elasticsearch.common.settings.Settings;
2530
import org.elasticsearch.common.util.concurrent.EsExecutors;
2631
import org.elasticsearch.env.Environment;
2732
import org.elasticsearch.env.NodeEnvironment;
33+
import org.elasticsearch.index.IndexModule;
2834
import org.elasticsearch.index.IndexService;
35+
import org.elasticsearch.index.engine.Engine.Searcher;
2936
import org.elasticsearch.index.shard.IndexShard;
37+
import org.elasticsearch.indices.IndicesRequestCache.Key;
3038
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
3139
import org.elasticsearch.node.MockNode;
3240
import org.elasticsearch.node.Node;
3341
import org.elasticsearch.node.NodeValidationException;
3442
import org.elasticsearch.script.ScriptService;
3543
import org.elasticsearch.test.ESTestCase;
44+
import org.elasticsearch.test.InternalSettingsPlugin;
3645
import org.elasticsearch.test.InternalTestCluster;
3746
import org.elasticsearch.test.MockHttpTransport;
47+
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
3848
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
3949

4050
import java.nio.file.Path;
4151
import java.util.Arrays;
52+
import java.util.Collections;
4253

4354
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
4455
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@@ -71,9 +82,11 @@ private Node startNode() throws NodeValidationException {
7182
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
7283
.putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes
7384
.putList(INITIAL_MASTER_NODES_SETTING.getKey(), nodeName)
85+
.put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), true)
7486
.build();
7587

76-
Node node = new MockNode(settings, Arrays.asList(MockNioTransportPlugin.class, MockHttpTransport.TestPlugin.class), true);
88+
Node node = new MockNode(settings,
89+
Arrays.asList(MockNioTransportPlugin.class, MockHttpTransport.TestPlugin.class, InternalSettingsPlugin.class), true);
7790
node.start();
7891
return node;
7992
}
@@ -100,7 +113,7 @@ public void testCloseNonEmptyIndicesService() throws Exception {
100113
assertEquals(0, indicesService.indicesRefCount.refCount());
101114
}
102115

103-
public void testCloseWhileOngoingRequest() throws Exception {
116+
public void testCloseWithIncedRefStore() throws Exception {
104117
Node node = startNode();
105118
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
106119
assertEquals(1, indicesService.indicesRefCount.refCount());
@@ -121,4 +134,157 @@ public void testCloseWhileOngoingRequest() throws Exception {
121134
assertEquals(0, indicesService.indicesRefCount.refCount());
122135
}
123136

137+
public void testCloseWhileOngoingRequest() throws Exception {
138+
Node node = startNode();
139+
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
140+
assertEquals(1, indicesService.indicesRefCount.refCount());
141+
142+
assertAcked(node.client().admin().indices().prepareCreate("test")
143+
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)));
144+
node.client().prepareIndex("test", "_doc", "1").setSource(Collections.emptyMap()).get();
145+
ElasticsearchAssertions.assertAllSuccessful(node.client().admin().indices().prepareRefresh("test").get());
146+
147+
assertEquals(2, indicesService.indicesRefCount.refCount());
148+
149+
IndexService indexService = indicesService.iterator().next();
150+
IndexShard shard = indexService.getShard(0);
151+
Searcher searcher = shard.acquireSearcher("test");
152+
assertEquals(1, searcher.reader().maxDoc());
153+
154+
node.close();
155+
assertEquals(1, indicesService.indicesRefCount.refCount());
156+
157+
searcher.close();
158+
assertEquals(0, indicesService.indicesRefCount.refCount());
159+
}
160+
161+
public void testCloseAfterRequestHasUsedQueryCache() throws Exception {
162+
Node node = startNode();
163+
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
164+
assertEquals(1, indicesService.indicesRefCount.refCount());
165+
166+
assertAcked(node.client().admin().indices().prepareCreate("test")
167+
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1)
168+
.put(SETTING_NUMBER_OF_REPLICAS, 0)
169+
.put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true)));
170+
node.client().prepareIndex("test", "_doc", "1").setSource(Collections.singletonMap("foo", 3L)).get();
171+
ElasticsearchAssertions.assertAllSuccessful(node.client().admin().indices().prepareRefresh("test").get());
172+
173+
assertEquals(2, indicesService.indicesRefCount.refCount());
174+
175+
IndicesQueryCache cache = indicesService.getIndicesQueryCache();
176+
177+
IndexService indexService = indicesService.iterator().next();
178+
IndexShard shard = indexService.getShard(0);
179+
Searcher searcher = shard.acquireSearcher("test");
180+
assertEquals(1, searcher.reader().maxDoc());
181+
182+
Query query = LongPoint.newRangeQuery("foo", 0, 5);
183+
assertEquals(0L, cache.getStats(shard.shardId()).getCacheSize());
184+
searcher.searcher().count(query);
185+
assertEquals(1L, cache.getStats(shard.shardId()).getCacheSize());
186+
187+
searcher.close();
188+
assertEquals(2, indicesService.indicesRefCount.refCount());
189+
assertEquals(1L, cache.getStats(shard.shardId()).getCacheSize());
190+
191+
node.close();
192+
assertEquals(0, indicesService.indicesRefCount.refCount());
193+
assertEquals(0L, cache.getStats(shard.shardId()).getCacheSize());
194+
}
195+
196+
public void testCloseWhileOngoingRequestUsesQueryCache() throws Exception {
197+
Node node = startNode();
198+
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
199+
assertEquals(1, indicesService.indicesRefCount.refCount());
200+
201+
assertAcked(node.client().admin().indices().prepareCreate("test")
202+
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1)
203+
.put(SETTING_NUMBER_OF_REPLICAS, 0)
204+
.put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true)));
205+
node.client().prepareIndex("test", "_doc", "1").setSource(Collections.singletonMap("foo", 3L)).get();
206+
ElasticsearchAssertions.assertAllSuccessful(node.client().admin().indices().prepareRefresh("test").get());
207+
208+
assertEquals(2, indicesService.indicesRefCount.refCount());
209+
210+
IndicesQueryCache cache = indicesService.getIndicesQueryCache();
211+
212+
IndexService indexService = indicesService.iterator().next();
213+
IndexShard shard = indexService.getShard(0);
214+
Searcher searcher = shard.acquireSearcher("test");
215+
assertEquals(1, searcher.reader().maxDoc());
216+
217+
node.close();
218+
assertEquals(1, indicesService.indicesRefCount.refCount());
219+
220+
Query query = LongPoint.newRangeQuery("foo", 0, 5);
221+
assertEquals(0L, cache.getStats(shard.shardId()).getCacheSize());
222+
searcher.searcher().count(query);
223+
assertEquals(1L, cache.getStats(shard.shardId()).getCacheSize());
224+
225+
searcher.close();
226+
assertEquals(0, indicesService.indicesRefCount.refCount());
227+
assertEquals(0L, cache.getStats(shard.shardId()).getCacheSize());
228+
}
229+
230+
public void testCloseWhileOngoingRequestUsesRequestCache() throws Exception {
231+
Node node = startNode();
232+
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
233+
assertEquals(1, indicesService.indicesRefCount.refCount());
234+
235+
assertAcked(node.client().admin().indices().prepareCreate("test")
236+
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1)
237+
.put(SETTING_NUMBER_OF_REPLICAS, 0)
238+
.put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true)));
239+
node.client().prepareIndex("test", "_doc", "1").setSource(Collections.singletonMap("foo", 3L)).get();
240+
ElasticsearchAssertions.assertAllSuccessful(node.client().admin().indices().prepareRefresh("test").get());
241+
242+
assertEquals(2, indicesService.indicesRefCount.refCount());
243+
244+
IndicesRequestCache cache = indicesService.indicesRequestCache;
245+
246+
IndexService indexService = indicesService.iterator().next();
247+
IndexShard shard = indexService.getShard(0);
248+
Searcher searcher = shard.acquireSearcher("test");
249+
assertEquals(1, searcher.reader().maxDoc());
250+
251+
node.close();
252+
assertEquals(1, indicesService.indicesRefCount.refCount());
253+
254+
assertEquals(0L, cache.count());
255+
IndicesRequestCache.CacheEntity cacheEntity = new IndicesRequestCache.CacheEntity() {
256+
@Override
257+
public long ramBytesUsed() {
258+
return 42;
259+
}
260+
261+
@Override
262+
public void onCached(Key key, BytesReference value) {}
263+
264+
@Override
265+
public boolean isOpen() {
266+
return true;
267+
}
268+
269+
@Override
270+
public Object getCacheIdentity() {
271+
return this;
272+
}
273+
274+
@Override
275+
public void onHit() {}
276+
277+
@Override
278+
public void onMiss() {}
279+
280+
@Override
281+
public void onRemoval(RemovalNotification<Key, BytesReference> notification) {}
282+
};
283+
cache.getOrCompute(cacheEntity, () -> new BytesArray("bar"), searcher.getDirectoryReader(), new BytesArray("foo"), () -> "foo");
284+
assertEquals(1L, cache.count());
285+
286+
searcher.close();
287+
assertEquals(0, indicesService.indicesRefCount.refCount());
288+
assertEquals(0L, cache.count());
289+
}
124290
}

0 commit comments

Comments
 (0)