From 77f54f55ad5c760742d85feba85207ba42a10d62 Mon Sep 17 00:00:00 2001
From: Shay Banon
Date: Sat, 28 Jun 2014 16:54:32 +0200
Subject: [PATCH 1/4] Query Cache: Support shard level query response caching

The query cache allows caching the (binary serialized) response of the shard
level query phase execution, keyed by the actual request. The cache is fully
coherent with the semantics of NRT: a refresh that actually ended up
refreshing causes the previously cached entries on the relevant shard to be
invalidated and eventually evicted.

This change enables query caching through an opt-in, index-level setting,
`index.cache.query.enable`, which defaults to `false`. The setting can be
changed dynamically on an index. The cache is only used for search requests
with `search_type=count`.

The indices query cache is a node-level cache. `indices.cache.query.size`
controls how much memory (in bytes) the cache may use and defaults to `1%` of
the heap. Note that this cache is already very effective with small sizes.
There is also the advanced option `indices.cache.query.expire`, which evicts
entries after a given period of inactivity. A short usage sketch follows
below.

Note that the request takes the search "body" as is (bytes) and uses it as the
key. This means that the same JSON with a different key order constitutes a
different cache entry.

This change includes basic stats (shard level, index/indices level, and node
level) for the query cache, showing how much memory is used and the eviction
rate.

While this is a good first step, and the goal is to get it in, there are a few
things that would be great additions to this work, but they can be done as
additional pull requests:

- More stats, specifically cache hits and cache misses, per shard.
- A request-level flag, defaulting to "not set" (inheriting the index
  setting).
- Allowing the cache size to be changed using the cluster update settings API.
- Consider enabling the cache for the query phase also when hits are asked
  for; note that this would only include the "top docs", not the actual hits.
- See if there is a performant way to solve the "out of order" JSON keys
  problem.
- Maybe introduce a filter element, outside of the request, that is checked
  and, if it matches all docs in a shard, is not used as part of the key. This
  will help time-based indices and moving windows, letting shards that fall
  "inside" the window cache more effectively.
- Add more infrastructure-level support in the search context that allows any
  element to mark the search as non-deterministic (on top of the existing
  support for "now"), and use it to avoid caching such search responses.
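For illustration, a minimal usage sketch of the settings and APIs described
above; the index name, the concrete values, and the aggregation are
placeholders chosen for the example, not part of this change:

    # node level sizing, set in elasticsearch.yml (example values):
    #   indices.cache.query.size: 2%
    #   indices.cache.query.expire: 10m

    # enable the cache dynamically on an existing index
    curl -XPUT 'localhost:9200/logs/_settings' -d '{
      "index.cache.query.enable": true
    }'

    # only count searches are eligible for caching
    curl -XGET 'localhost:9200/logs/_search?search_type=count' -d '{
      "aggs": {
        "hours": { "date_histogram": { "field": "timestamp", "interval": "hour" } }
      }
    }'

    # the new stats are exposed through the indices stats API
    curl -XGET 'localhost:9200/_stats/query_cache?pretty'

The _cat/indices and _cat/nodes endpoints also gain `query_cache.memory_size`
and `query_cache.evictions` columns as part of this change.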
closes #7161 --- .../cache/clear/ClearIndicesCacheRequest.java | 19 +- .../ClearIndicesCacheRequestBuilder.java | 5 + .../clear/ShardClearIndicesCacheRequest.java | 13 + .../TransportClearIndicesCacheAction.java | 14 +- .../admin/indices/stats/CommonStats.java | 32 ++ .../admin/indices/stats/CommonStatsFlags.java | 3 +- .../indices/stats/IndicesStatsRequest.java | 9 + .../stats/IndicesStatsRequestBuilder.java | 5 + .../stats/TransportIndicesStatsAction.java | 3 + .../type/TransportSearchCountAction.java | 5 +- .../TransportSearchQueryThenFetchAction.java | 9 +- .../index/cache/query/QueryCacheStats.java | 91 ++++ .../index/cache/query/ShardQueryCache.java | 71 +++ .../cache/query/ShardQueryCacheModule.java | 32 ++ .../index/service/InternalIndexService.java | 2 + .../settings/IndexDynamicSettingsModule.java | 2 + .../index/shard/service/IndexShard.java | 3 + .../shard/service/InternalIndexShard.java | 10 +- .../elasticsearch/indices/IndicesModule.java | 2 + .../indices/NodeIndicesStats.java | 6 + .../cache/filter/IndicesFilterCache.java | 2 +- .../cache/query/IndicesQueryCache.java | 495 ++++++++++++++++++ .../percolator/PercolateContext.java | 2 +- .../indices/stats/RestIndicesStatsAction.java | 1 + .../rest/action/cat/RestIndicesAction.java | 12 + .../rest/action/cat/RestNodesAction.java | 6 + .../elasticsearch/search/SearchService.java | 29 +- .../action/SearchServiceTransportAction.java | 13 +- .../metrics/tophits/TopHitsContext.java | 2 +- .../search/fetch/QueryFetchSearchResult.java | 2 +- .../search/internal/DefaultSearchContext.java | 2 +- .../search/internal/SearchContext.java | 13 +- .../search/internal/ShardSearchRequest.java | 12 +- .../search/query/QuerySearchResult.java | 13 +- .../query/QuerySearchResultProvider.java | 7 +- .../indices/cache/CacheTests.java | 177 ------- ...exStatsTests.java => IndexStatsTests.java} | 206 +++++++- .../test/ElasticsearchIntegrationTest.java | 5 + .../elasticsearch/test/TestSearchContext.java | 2 +- 39 files changed, 1120 insertions(+), 217 deletions(-) create mode 100644 src/main/java/org/elasticsearch/index/cache/query/QueryCacheStats.java create mode 100644 src/main/java/org/elasticsearch/index/cache/query/ShardQueryCache.java create mode 100644 src/main/java/org/elasticsearch/index/cache/query/ShardQueryCacheModule.java create mode 100644 src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java delete mode 100644 src/test/java/org/elasticsearch/indices/cache/CacheTests.java rename src/test/java/org/elasticsearch/indices/stats/{SimpleIndexStatsTests.java => IndexStatsTests.java} (66%) diff --git a/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheRequest.java index 3aa7ddb1c9b72..381c3f33fe7e4 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.cache.clear; +import org.elasticsearch.Version; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -34,6 +35,7 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest listener) { client.stats(request, listener); diff --git 
a/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index a501a6a4d1e1a..6408a5ce6c0d6 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -192,6 +192,9 @@ protected ShardStats shardOperation(IndexShardStatsRequest request) throws Elast if (request.request.suggest()) { flags.set(CommonStatsFlags.Flag.Suggest); } + if (request.request.queryCache()) { + flags.set(CommonStatsFlags.Flag.QueryCache); + } return new ShardStats(indexShard, flags); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java index 9d1820df7bd42..3b2d818c3f88a 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchCountAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId; @@ -55,7 +56,7 @@ protected void doExecute(SearchRequest searchRequest, ActionListener { + private class AsyncAction extends BaseAsyncAction { private AsyncAction(SearchRequest request, ActionListener listener) { super(request, listener); @@ -67,7 +68,7 @@ protected String firstPhaseName() { } @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener listener) { + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener listener) { searchService.sendExecuteQuery(node, request, listener); } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index d07e6eaa07e52..022bf11af282c 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.atomic.AtomicInteger; @@ -60,7 +61,7 @@ protected void doExecute(SearchRequest searchRequest, ActionListener { + private class AsyncAction extends BaseAsyncAction { final AtomicArray fetchResults; final AtomicArray docIdsToLoad; @@ -77,7 +78,7 @@ protected String firstPhaseName() { } @Override - protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener listener) { + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener listener) { searchService.sendExecuteQuery(node, request, listener); } @@ -97,9 +98,9 @@ protected 
void moveToSecondPhase() throws Exception { ); final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); for (AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = firstResults.get(entry.index); + QuerySearchResultProvider queryResult = firstResults.get(entry.index); DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); + FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard); executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } diff --git a/src/main/java/org/elasticsearch/index/cache/query/QueryCacheStats.java b/src/main/java/org/elasticsearch/index/cache/query/QueryCacheStats.java new file mode 100644 index 0000000000000..7372781c28cf0 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/cache/query/QueryCacheStats.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.cache.query; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +/** + */ +public class QueryCacheStats implements Streamable, ToXContent { + + long memorySize; + long evictions; + + public QueryCacheStats() { + } + + public QueryCacheStats(long memorySize, long evictions) { + this.memorySize = memorySize; + this.evictions = evictions; + } + + public void add(QueryCacheStats stats) { + this.memorySize += stats.memorySize; + this.evictions += stats.evictions; + } + + public long getMemorySizeInBytes() { + return this.memorySize; + } + + public ByteSizeValue getMemorySize() { + return new ByteSizeValue(memorySize); + } + + public long getEvictions() { + return this.evictions; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + memorySize = in.readVLong(); + evictions = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(memorySize); + out.writeVLong(evictions); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.QueryCacheStats); + builder.byteSizeField(Fields.MEMORY_SIZE_IN_BYTES, Fields.MEMORY_SIZE, memorySize); + builder.field(Fields.EVICTIONS, getEvictions()); + builder.endObject(); + return builder; + } + + static final class Fields { + static final XContentBuilderString QueryCacheStats = new XContentBuilderString("query_cache"); + static final XContentBuilderString MEMORY_SIZE = new XContentBuilderString("memory_size"); + static final XContentBuilderString MEMORY_SIZE_IN_BYTES = new XContentBuilderString("memory_size_in_bytes"); + static final XContentBuilderString EVICTIONS = new XContentBuilderString("evictions"); + } +} diff --git a/src/main/java/org/elasticsearch/index/cache/query/ShardQueryCache.java b/src/main/java/org/elasticsearch/index/cache/query/ShardQueryCache.java new file mode 100644 index 0000000000000..819b31100839d --- /dev/null +++ b/src/main/java/org/elasticsearch/index/cache/query/ShardQueryCache.java @@ -0,0 +1,71 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.cache.query; + +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import org.apache.lucene.search.DocIdSet; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.lucene.docset.DocIdSets; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.cache.filter.FilterCacheStats; +import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.cache.query.IndicesQueryCache; + +/** + */ +public class ShardQueryCache extends AbstractIndexShardComponent implements RemovalListener { + + final CounterMetric evictionsMetric = new CounterMetric(); + final CounterMetric totalMetric = new CounterMetric(); + + @Inject + public ShardQueryCache(ShardId shardId, @IndexSettings Settings indexSettings) { + super(shardId, indexSettings); + } + + public QueryCacheStats stats() { + return new QueryCacheStats(totalMetric.count(), evictionsMetric.count()); + } + + public void onCached(IndicesQueryCache.Key key, BytesReference value) { + totalMetric.inc(key.ramBytesUsed() + value.length()); + } + + @Override + public void onRemoval(RemovalNotification removalNotification) { + if (removalNotification.wasEvicted()) { + evictionsMetric.inc(); + } + long dec = 0; + if (removalNotification.getKey() != null) { + dec += removalNotification.getKey().ramBytesUsed(); + } + if (removalNotification.getValue() != null) { + dec += removalNotification.getValue().length(); + } + totalMetric.dec(dec); + } +} diff --git a/src/main/java/org/elasticsearch/index/cache/query/ShardQueryCacheModule.java b/src/main/java/org/elasticsearch/index/cache/query/ShardQueryCacheModule.java new file mode 100644 index 0000000000000..938f016a8c35f --- /dev/null +++ b/src/main/java/org/elasticsearch/index/cache/query/ShardQueryCacheModule.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.cache.query; + +import org.elasticsearch.common.inject.AbstractModule; + +/** + */ +public class ShardQueryCacheModule extends AbstractModule { + + @Override + protected void configure() { + bind(ShardQueryCache.class).asEagerSingleton(); + } +} diff --git a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index 2f0358f43f565..9c7aba8028b4f 100644 --- a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.filter.ShardFilterCacheModule; +import org.elasticsearch.index.cache.query.ShardQueryCacheModule; import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineModule; @@ -329,6 +330,7 @@ public synchronized IndexShard createShard(int sShardId) throws ElasticsearchExc modules.add(new MergePolicyModule(indexSettings)); modules.add(new MergeSchedulerModule(indexSettings)); modules.add(new ShardFilterCacheModule()); + modules.add(new ShardQueryCacheModule()); modules.add(new ShardFieldDataModule()); modules.add(new TranslogModule(indexSettings)); modules.add(new EngineModule(indexSettings)); diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index 30ece8cad6faf..0b0ff4e2826bf 100644 --- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.store.support.AbstractIndexStore; import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.index.translog.fs.FsTranslog; +import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.indices.warmer.InternalIndicesWarmer; @@ -119,6 +120,7 @@ public IndexDynamicSettingsModule() { indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, Validator.TIME); indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH); indexDynamicSettings.addDynamicSetting(InternalIndicesWarmer.INDEX_WARMER_ENABLED); + indexDynamicSettings.addDynamicSetting(IndicesQueryCache.INDEX_QUERY_CACHE_ENABLED, Validator.BOOLEAN); } public void addDynamicSettings(String... 
settings) { diff --git a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index 4a5ea5bd620bc..a0d35430ee294 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.cache.filter.FilterCacheStats; import org.elasticsearch.index.cache.filter.ShardFilterCache; import org.elasticsearch.index.cache.id.IdCacheStats; +import org.elasticsearch.index.cache.query.ShardQueryCache; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; @@ -77,6 +78,8 @@ public interface IndexShard extends IndexShardComponent { ShardFilterCache filterCache(); + ShardQueryCache queryCache(); + ShardFieldData fieldData(); ShardRouting routingEntry(); diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index daa2bcd98b43b..6b24570bd7a68 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.cache.filter.FilterCacheStats; import org.elasticsearch.index.cache.filter.ShardFilterCache; import org.elasticsearch.index.cache.id.IdCacheStats; +import org.elasticsearch.index.cache.query.ShardQueryCache; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.*; @@ -121,6 +122,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private final ShardGetService getService; private final ShardIndexWarmerService shardWarmerService; private final ShardFilterCache shardFilterCache; + private final ShardQueryCache shardQueryCache; private final ShardFieldData shardFieldData; private final PercolatorQueriesRegistry percolatorQueriesRegistry; private final ShardPercolateService shardPercolateService; @@ -153,7 +155,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService, ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService, - ShardTermVectorService termVectorService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService) { + ShardTermVectorService termVectorService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache) { super(shardId, indexSettings); this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; this.indexSettingsService 
= indexSettingsService; @@ -172,6 +174,7 @@ public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings this.searchService = searchService; this.shardWarmerService = shardWarmerService; this.shardFilterCache = shardFilterCache; + this.shardQueryCache = shardQueryCache; this.shardFieldData = shardFieldData; this.percolatorQueriesRegistry = percolatorQueriesRegistry; this.shardPercolateService = shardPercolateService; @@ -256,6 +259,11 @@ public ShardFilterCache filterCache() { return this.shardFilterCache; } + @Override + public ShardQueryCache queryCache() { + return this.shardQueryCache; + } + @Override public ShardFieldData fieldData() { return this.shardFieldData; diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java index dad12f9a295bb..fa7513687c6e6 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -28,6 +28,7 @@ import org.elasticsearch.indices.analysis.IndicesAnalysisModule; import org.elasticsearch.indices.cache.filter.IndicesFilterCache; import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache; +import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; @@ -72,6 +73,7 @@ protected void configure() { bind(IndicesClusterStateService.class).asEagerSingleton(); bind(IndexingMemoryController.class).asEagerSingleton(); bind(IndicesFilterCache.class).asEagerSingleton(); + bind(IndicesQueryCache.class).asEagerSingleton(); bind(IndicesFieldDataCache.class).asEagerSingleton(); bind(IndicesTermsFilterCache.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java index 08bc41a504ae5..f790eb36345bc 100644 --- a/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java +++ b/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.cache.filter.FilterCacheStats; import org.elasticsearch.index.cache.id.IdCacheStats; +import org.elasticsearch.index.cache.query.QueryCacheStats; import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.flush.FlushStats; @@ -136,6 +137,11 @@ public FilterCacheStats getFilterCache() { return stats.getFilterCache(); } + @Nullable + public QueryCacheStats getQueryCache() { + return stats.getQueryCache(); + } + @Nullable public IdCacheStats getIdCache() { return stats.getIdCache(); diff --git a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java index 024a892bde36c..0b8f7856fc95d 100644 --- a/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java +++ b/src/main/java/org/elasticsearch/indices/cache/filter/IndicesFilterCache.java @@ -185,7 +185,6 @@ public void run() { keys.add(it.next()); it.remove(); } - cache.cleanUp(); if (!keys.isEmpty()) { for (Iterator it = cache.asMap().keySet().iterator(); it.hasNext(); ) { WeightedFilterCache.FilterCacheKey filterCacheKey = 
it.next(); @@ -195,6 +194,7 @@ public void run() { } } } + cache.cleanUp(); schedule(); keys.clear(); } diff --git a/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java b/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java new file mode 100644 index 0000000000000..11dc539a18dc7 --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java @@ -0,0 +1,495 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.cache.query; + +import com.carrotsearch.hppc.ObjectOpenHashSet; +import com.carrotsearch.hppc.ObjectSet; +import com.google.common.cache.*; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.MemorySizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.search.query.QueryPhase; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.common.Strings.hasLength; + +/** + * The indices query cache allows to cache a shard level query stage responses, helping with improving + * similar requests that are potentially expensive (because of aggs for example). 
The cache is fully coherent + * with the semantics of NRT (the index reader version is part of the cache key), and relies on size based + * eviction to evict old reader associated cache entries as well as scheduler reaper to clean readers that + * are no longer used or closed shards. + *

+ * Currently, the cache is only enabled for {@link SearchType#COUNT}, and can only be opted in on an index + * level setting that can be dynamically changed and defaults to false. + *

+ * There are still several TODOs left in this class, some easily addressable, some more complex, but the support + * is functional. + */ +public class IndicesQueryCache extends AbstractComponent implements RemovalListener { + + /** + * A setting to enable or disable query caching on an index level. Its dynamic by default + * since we are checking on the cluster state IndexMetaData always. + */ + public static final String INDEX_QUERY_CACHE_ENABLED = "index.cache.query.enable"; + + public static final String INDICES_CACHE_QUERY_SIZE = "indices.cache.query.size"; + public static final String INDICES_CACHE_QUERY_EXPIRE = "indices.cache.query.expire"; + + private final ThreadPool threadPool; + private final ClusterService clusterService; + + private final TimeValue cleanInterval; + private final Reaper reaper; + + final ConcurrentMap registeredClosedListeners = ConcurrentCollections.newConcurrentMap(); + final Set keysToClean = ConcurrentCollections.newConcurrentSet(); + + //TODO make these changes configurable on the cluster level + private volatile String size; + private volatile TimeValue expire; + //TODO expose this in our stats APIs + private volatile Cache cache; + + @Inject + public IndicesQueryCache(Settings settings, ClusterService clusterService, ThreadPool threadPool) { + super(settings); + this.clusterService = clusterService; + this.threadPool = threadPool; + this.cleanInterval = componentSettings.getAsTime("clean_interval", TimeValue.timeValueSeconds(60)); + // this cache can be very small yet still be very effective + this.size = settings.get(INDICES_CACHE_QUERY_SIZE, "1%"); + this.expire = settings.getAsTime(INDICES_CACHE_QUERY_EXPIRE, null); + buildCache(); + + this.reaper = new Reaper(); + threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, reaper); + } + + private void buildCache() { + long sizeInBytes = MemorySizeValue.parseBytesSizeValueOrHeapRatio(size).bytes(); + if (sizeInBytes > ByteSizeValue.MAX_GUAVA_CACHE_SIZE.bytes()) { + logger.warn("reducing requested query cache size of [{}] to the maximum allowed size of [{}]", new ByteSizeValue(sizeInBytes), ByteSizeValue.MAX_GUAVA_CACHE_SIZE); + sizeInBytes = ByteSizeValue.MAX_GUAVA_CACHE_SIZE.bytes(); + // Even though it feels wrong for size and sizeInBytes to get out of + // sync we don't update size here because it might cause the cache + // to be rebuilt every time new settings are applied. + } + + CacheBuilder cacheBuilder = CacheBuilder.newBuilder() + .maximumWeight(sizeInBytes).weigher(new QueryCacheWeigher()).removalListener(this); + + // defaults to 4, but this is a busy map for all indices, increase it a bit + cacheBuilder.concurrencyLevel(16); + + if (expire != null) { + cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS); + } + + cache = cacheBuilder.build(); + } + + private static class QueryCacheWeigher implements Weigher { + + @Override + public int weigh(Key key, BytesReference value) { + // TODO add sizeInBytes to BytesReference, since it might be paged.... 
(Accountable) + return (int) (key.ramBytesUsed() + value.length()); + } + } + + public void close() { + reaper.close(); + cache.invalidateAll(); + } + + public void clear(IndexShard shard) { + if (shard == null) { + return; + } + keysToClean.add(new CleanupKey(shard, -1)); + logger.trace("{} explicit cache clear", shard.shardId()); + reaper.reap(); + } + + @Override + public void onRemoval(RemovalNotification notification) { + if (notification.getKey() == null) { + return; + } + notification.getKey().shard.queryCache().onRemoval(notification); + } + + /** + * Can the shard request be cached at all? + */ + public boolean canCache(ShardSearchRequest request, SearchContext context) { + // TODO: for now, template is not supported, though we could use the generated bytes as the key + if (hasLength(request.templateSource())) { + return false; + } + // for now, only enable it for search type count + if (request.searchType() != SearchType.COUNT) { + return false; + } + IndexMetaData index = clusterService.state().getMetaData().index(request.index()); + if (index == null) { // in case we didn't yet have the cluster state, or it just got deleted + return false; + } + if (!index.settings().getAsBoolean(INDEX_QUERY_CACHE_ENABLED, Boolean.FALSE)) { + return false; + } + // if the reader is not a directory reader, we can't get the version from it + if (!(context.searcher().getIndexReader() instanceof DirectoryReader)) { + return false; + } + if (context.nowInMillisUsed()) { + return false; + } + // TODO allow to have a queryCache level flag on the request as well + return true; + } + + /** + * Loads the cache result, computing it if needed by executing the query phase. The combination of load + compute allows + * to have a single load operation that will cause other requests with the same key to wait till its loaded an reuse + * the same cache. + */ + public QuerySearchResultProvider load(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception { + assert canCache(request, context); + Key key = buildKey(request, context); + Loader loader = new Loader(queryPhase, context, key); + BytesReference value = cache.get(key, loader); + if (loader.isLoaded()) { + // see if its the first time we see this reader, and make sure to register a cleanup key + CleanupKey cleanupKey = new CleanupKey(context.indexShard(), ((DirectoryReader) context.searcher().getIndexReader()).getVersion()); + if (!registeredClosedListeners.containsKey(cleanupKey)) { + Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); + if (previous == null) { + context.searcher().getIndexReader().addReaderClosedListener(cleanupKey); + } + } + } + + // try and be smart, and reuse an already loaded and constructed QueryResult of in VM execution + return new BytesQuerySearchResult(context.id(), context.shardTarget(), value, loader.isLoaded() ? 
context.queryResult() : null); + } + + private static class Loader implements Callable { + + private final QueryPhase queryPhase; + private final SearchContext context; + private final IndicesQueryCache.Key key; + private boolean loaded; + + Loader(QueryPhase queryPhase, SearchContext context, IndicesQueryCache.Key key) { + this.queryPhase = queryPhase; + this.context = context; + this.key = key; + } + + public boolean isLoaded() { + return this.loaded; + } + + @Override + public BytesReference call() throws Exception { + queryPhase.execute(context); + BytesStreamOutput out = new BytesStreamOutput(); + context.queryResult().writeToNoId(out); + // for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep + // the memory properly paged instead of having varied sized bytes + BytesReference value = out.bytes(); + assert verifyCacheSerializationSameAsQueryResult(value, context, context.queryResult()); + loaded = true; + key.shard.queryCache().onCached(key, value); + return value; + } + } + + public static class Key implements Accountable { + public final IndexShard shard; // use as identity equality + public final long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped + public final BytesReference value; + + Key(IndexShard shard, long readerVersion, BytesReference value) { + this.shard = shard; + this.readerVersion = readerVersion; + this.value = value; + } + + @Override + public long ramBytesUsed() { + return RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_LONG + value.length(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + Key key = (Key) o; + if (readerVersion != key.readerVersion) return false; + if (!shard.equals(key.shard)) return false; + if (!value.equals(key.value)) return false; + return true; + } + + @Override + public int hashCode() { + int result = shard.hashCode(); + result = 31 * result + (int) (readerVersion ^ (readerVersion >>> 32)); + result = 31 * result + value.hashCode(); + return result; + } + } + + private class CleanupKey implements IndexReader.ReaderClosedListener { + IndexShard indexShard; + long readerVersion; // use the reader version to now keep a reference to a "short" lived reader until its reaped + + private CleanupKey(IndexShard indexShard, long readerVersion) { + this.indexShard = indexShard; + this.readerVersion = readerVersion; + } + + @Override + public void onClose(IndexReader reader) { + Boolean remove = registeredClosedListeners.remove(this); + if (remove != null) { + keysToClean.add(this); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + CleanupKey that = (CleanupKey) o; + if (readerVersion != that.readerVersion) return false; + if (!indexShard.equals(that.indexShard)) return false; + return true; + } + + @Override + public int hashCode() { + int result = indexShard.hashCode(); + result = 31 * result + (int) (readerVersion ^ (readerVersion >>> 32)); + return result; + } + } + + private class Reaper implements Runnable { + + private final ObjectSet currentKeysToClean = ObjectOpenHashSet.newInstance(); + private final ObjectSet currentFullClean = ObjectOpenHashSet.newInstance(); + + private volatile boolean closed; + + void close() { + closed = true; + } + + @Override + public void run() { + if (closed) { + return; + } + if (keysToClean.isEmpty()) { + schedule(); + return; + } + try { + threadPool.executor(ThreadPool.Names.GENERIC).execute(new 
Runnable() { + @Override + public void run() { + reap(); + schedule(); + } + }); + } catch (EsRejectedExecutionException ex) { + logger.debug("Can not run ReaderCleaner - execution rejected", ex); + } + } + + private void schedule() { + try { + threadPool.schedule(cleanInterval, ThreadPool.Names.SAME, this); + } catch (EsRejectedExecutionException ex) { + logger.debug("Can not schedule ReaderCleaner - execution rejected", ex); + } + } + + synchronized void reap() { + currentKeysToClean.clear(); + currentFullClean.clear(); + for (Iterator iterator = keysToClean.iterator(); iterator.hasNext(); ) { + CleanupKey cleanupKey = iterator.next(); + iterator.remove(); + if (cleanupKey.readerVersion == -1 || cleanupKey.indexShard.state() == IndexShardState.CLOSED) { + // -1 indicates full cleanup, as does a closed shard + currentFullClean.add(cleanupKey.indexShard); + } else { + currentKeysToClean.add(cleanupKey); + } + } + + if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { + CleanupKey lookupKey = new CleanupKey(null, -1); + for (Iterator iterator = cache.asMap().keySet().iterator(); iterator.hasNext(); ) { + Key key = iterator.next(); + if (currentFullClean.contains(key.shard)) { + iterator.remove(); + } else { + lookupKey.indexShard = key.shard; + lookupKey.readerVersion = key.readerVersion; + if (currentKeysToClean.contains(lookupKey)) { + iterator.remove(); + } + } + } + } + + cache.cleanUp(); + currentKeysToClean.clear(); + currentFullClean.clear(); + } + } + + private static boolean verifyCacheSerializationSameAsQueryResult(BytesReference cacheData, SearchContext context, QuerySearchResult result) throws Exception { + BytesStreamOutput out1 = new BytesStreamOutput(); + new BytesQuerySearchResult(context.id(), context.shardTarget(), cacheData).writeTo(out1); + BytesStreamOutput out2 = new BytesStreamOutput(); + result.writeTo(out2); + return out1.bytes().equals(out2.bytes()); + } + + private static Key buildKey(ShardSearchRequest request, SearchContext context) throws Exception { + // TODO: for now, this will create different keys for different JSON order + // TODO: tricky to get around this, need to parse and order all, which can be expensive + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out, true); + // copy it over, most requests are small, we might as well copy to make sure we are not sliced... + // we could potentially keep it without copying, but then pay the price of extra unused bytes up to a page + return new Key(context.indexShard(), + ((DirectoryReader) context.searcher().getIndexReader()).getVersion(), + out.bytes().copyBytesArray()); + } + + /** + * this class aim is to just provide an on the write *write* format that is the same as {@link QuerySearchResult} + * and also provide a nice wrapper for in node communication. 
+ */ + private static class BytesQuerySearchResult extends QuerySearchResultProvider { + + private long id; + private SearchShardTarget shardTarget; + private BytesReference data; + + private transient QuerySearchResult result; + + private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data) { + this(id, shardTarget, data, null); + } + + private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data, QuerySearchResult result) { + this.id = id; + this.shardTarget = shardTarget; + this.data = data; + this.result = result; + } + + @Override + public boolean includeFetch() { + return false; + } + + @Override + public QuerySearchResult queryResult() { + if (result == null) { + result = new QuerySearchResult(id, shardTarget); + try { + result.readFromWithId(id, data.streamInput()); + } catch (Exception e) { + throw new ElasticsearchParseException("failed to parse a cached query", e); + } + } + return result; + } + + @Override + public long id() { + return id; + } + + @Override + public SearchShardTarget shardTarget() { + return shardTarget; + } + + @Override + public void shardTarget(SearchShardTarget shardTarget) { + this.shardTarget = shardTarget; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new ElasticsearchIllegalStateException("readFrom should not be called"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(id); +// shardTarget.writeTo(out); not needed + data.writeTo(out); // we need to write teh bytes as is, to be the same as QuerySearchResult + } + } +} diff --git a/src/main/java/org/elasticsearch/percolator/PercolateContext.java b/src/main/java/org/elasticsearch/percolator/PercolateContext.java index 701e0648ffb2e..1eddbe726ee9b 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolateContext.java +++ b/src/main/java/org/elasticsearch/percolator/PercolateContext.java @@ -341,7 +341,7 @@ public SearchContext queryBoost(float queryBoost) { } @Override - public long nowInMillis() { + protected long nowInMillisImpl() { throw new UnsupportedOperationException(); } diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/stats/RestIndicesStatsAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/stats/RestIndicesStatsAction.java index b99af297bfbe2..508f7c0137f18 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/stats/RestIndicesStatsAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/stats/RestIndicesStatsAction.java @@ -80,6 +80,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel, indicesStatsRequest.fieldData(metrics.contains("fielddata")); indicesStatsRequest.completion(metrics.contains("completion")); indicesStatsRequest.suggest(metrics.contains("suggest")); + indicesStatsRequest.queryCache(metrics.contains("query_cache")); } if (request.hasParam("groups")) { diff --git a/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java index 2001a4b50b20c..a6e8b16fd4221 100644 --- a/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java +++ b/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java @@ -123,6 +123,12 @@ Table getTableWithHeader(final RestRequest request) { table.addCell("filter_cache.evictions", "sibling:pri;alias:fce,filterCacheEvictions;default:false;text-align:right;desc:filter 
cache evictions"); table.addCell("pri.filter_cache.evictions", "default:false;text-align:right;desc:filter cache evictions"); + table.addCell("query_cache.memory_size", "sibling:pri;alias:qcm,queryCacheMemory;default:false;text-align:right;desc:used query cache"); + table.addCell("pri.query_cache.memory_size", "default:false;text-align:right;desc:used query cache"); + + table.addCell("query_cache.evictions", "sibling:pri;alias:qce,queryCacheEvictions;default:false;text-align:right;desc:query cache evictions"); + table.addCell("pri.query_cache.evictions", "default:false;text-align:right;desc:query cache evictions"); + table.addCell("flush.total", "sibling:pri;alias:ft,flushTotal;default:false;text-align:right;desc:number of flushes"); table.addCell("pri.flush.total", "default:false;text-align:right;desc:number of flushes"); @@ -302,6 +308,12 @@ private Table buildTable(RestRequest request, String[] indices, ClusterHealthRes table.addCell(indexStats == null ? null : indexStats.getTotal().getFilterCache().getEvictions()); table.addCell(indexStats == null ? null : indexStats.getPrimaries().getFilterCache().getEvictions()); + table.addCell(indexStats == null ? null : indexStats.getTotal().getQueryCache().getMemorySize()); + table.addCell(indexStats == null ? null : indexStats.getPrimaries().getQueryCache().getMemorySize()); + + table.addCell(indexStats == null ? null : indexStats.getTotal().getQueryCache().getEvictions()); + table.addCell(indexStats == null ? null : indexStats.getPrimaries().getQueryCache().getEvictions()); + table.addCell(indexStats == null ? null : indexStats.getTotal().getFlush().getTotal()); table.addCell(indexStats == null ? null : indexStats.getPrimaries().getFlush().getTotal()); diff --git a/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java index 2baccaa4dc7e8..05593c104e189 100644 --- a/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java +++ b/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java @@ -122,6 +122,9 @@ Table getTableWithHeader(final RestRequest request) { table.addCell("filter_cache.memory_size", "alias:fcm,filterCacheMemory;default:false;text-align:right;desc:used filter cache"); table.addCell("filter_cache.evictions", "alias:fce,filterCacheEvictions;default:false;text-align:right;desc:filter cache evictions"); + table.addCell("query_cache.memory_size", "alias:qcm,queryCacheMemory;default:false;text-align:right;desc:used query cache"); + table.addCell("query_cache.evictions", "alias:qce,queryCacheEvictions;default:false;text-align:right;desc:query cache evictions"); + table.addCell("flush.total", "alias:ft,flushTotal;default:false;text-align:right;desc:number of flushes"); table.addCell("flush.total_time", "alias:ftt,flushTotalTime;default:false;text-align:right;desc:time spent in flush"); @@ -226,6 +229,9 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR table.addCell(stats == null ? null : stats.getIndices().getFilterCache().getMemorySize()); table.addCell(stats == null ? null : stats.getIndices().getFilterCache().getEvictions()); + table.addCell(stats == null ? null : stats.getIndices().getQueryCache().getMemorySize()); + table.addCell(stats == null ? null : stats.getIndices().getQueryCache().getEvictions()); + table.addCell(stats == null ? null : stats.getIndices().getFlush().getTotal()); table.addCell(stats == null ? 
null : stats.getIndices().getFlush().getTotalTime()); diff --git a/src/main/java/org/elasticsearch/search/SearchService.java b/src/main/java/org/elasticsearch/search/SearchService.java index f7829e21a0a84..8c08fa264eaf2 100644 --- a/src/main/java/org/elasticsearch/search/SearchService.java +++ b/src/main/java/org/elasticsearch/search/SearchService.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lease.Releasables; @@ -61,6 +62,7 @@ import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.warmer.IndicesWarmer; import org.elasticsearch.indices.warmer.IndicesWarmer.WarmerContext; import org.elasticsearch.script.ExecutableScript; @@ -84,6 +86,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicLong; @@ -125,6 +128,8 @@ public class SearchService extends AbstractLifecycleComponent { private final FetchPhase fetchPhase; + private final IndicesQueryCache indicesQueryCache; + private final long defaultKeepAlive; private final ScheduledFuture keepAliveReaper; @@ -137,7 +142,8 @@ public class SearchService extends AbstractLifecycleComponent { @Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, IndicesLifecycle indicesLifecycle, IndicesWarmer indicesWarmer, ThreadPool threadPool, - ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) { + ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, + IndicesQueryCache indicesQueryCache) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; @@ -150,6 +156,7 @@ public SearchService(Settings settings, ClusterService clusterService, IndicesSe this.dfsPhase = dfsPhase; this.queryPhase = queryPhase; this.fetchPhase = fetchPhase; + this.indicesQueryCache = indicesQueryCache; TimeValue keepAliveInterval = componentSettings.getAsTime(KEEPALIVE_INTERVAL_COMPONENENT_KEY, timeValueMinutes(1)); // we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes @@ -252,21 +259,35 @@ public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest requ } } - public QuerySearchResult executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException { + public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException { SearchContext context = createAndPutContext(request); try { context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); contextProcessing(context); - queryPhase.execute(context); + + QuerySearchResultProvider result; + boolean canCache = 
indicesQueryCache.canCache(request, context); + if (canCache) { + result = indicesQueryCache.load(request, context, queryPhase); + } else { + queryPhase.execute(context); + result = context.queryResult(); + } + if (context.searchType() == SearchType.COUNT) { freeContext(context.id()); } else { contextProcessedSuccessfully(context); } context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time); - return context.queryResult(); + + return result; } catch (Throwable e) { + // execution exception can happen while loading the cache, strip it + if (e instanceof ExecutionException) { + e = e.getCause(); + } context.indexShard().searchService().onFailedQueryPhase(context); logger.trace("Query phase failed", e); freeContext(context); diff --git a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index 602e4ecc3d91b..799c4f41e4517 100644 --- a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.query.ScrollQuerySearchResult; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -211,16 +212,16 @@ public String executor() { } } - public void sendExecuteQuery(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener listener) { + public void sendExecuteQuery(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener listener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - execute(new Callable() { + execute(new Callable() { @Override - public QuerySearchResult call() throws Exception { + public QuerySearchResultProvider call() throws Exception { return searchService.executeQueryPhase(request); } }, listener); } else { - transportService.sendRequest(node, QUERY_ACTION_NAME, request, new BaseTransportResponseHandler() { + transportService.sendRequest(node, QUERY_ACTION_NAME, request, new BaseTransportResponseHandler() { @Override public QuerySearchResult newInstance() { @@ -228,7 +229,7 @@ public QuerySearchResult newInstance() { } @Override - public void handleResponse(QuerySearchResult response) { + public void handleResponse(QuerySearchResultProvider response) { listener.onResult(response); } @@ -690,7 +691,7 @@ public ShardSearchRequest newInstance() { @Override public void messageReceived(ShardSearchRequest request, TransportChannel channel) throws Exception { - QuerySearchResult result = searchService.executeQueryPhase(request); + QuerySearchResultProvider result = searchService.executeQueryPhase(request); channel.sendResponse(result); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsContext.java b/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsContext.java index 68e75ff2ccd51..c7f407bb8fb78 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsContext.java +++ b/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsContext.java @@ -170,7 +170,7 @@ public SearchContext queryBoost(float queryBoost) { } @Override - public long nowInMillis() 
{ + protected long nowInMillisImpl() { return context.nowInMillis(); } diff --git a/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java b/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java index e43ec50b002cc..5bc477ede9182 100644 --- a/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java +++ b/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java @@ -34,7 +34,7 @@ /** * */ -public class QueryFetchSearchResult extends TransportResponse implements QuerySearchResultProvider, FetchSearchResultProvider { +public class QueryFetchSearchResult extends QuerySearchResultProvider implements FetchSearchResultProvider { private QuerySearchResult queryResult; private FetchSearchResult fetchResult; diff --git a/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java b/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java index 94dd0b7d81cb4..713df36f426d9 100644 --- a/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java +++ b/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java @@ -297,7 +297,7 @@ public SearchContext queryBoost(float queryBoost) { return this; } - public long nowInMillis() { + protected long nowInMillisImpl() { return request.nowInMillis(); } diff --git a/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/src/main/java/org/elasticsearch/search/internal/SearchContext.java index 0dd88cd9275f7..046c9fcc88e50 100644 --- a/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -98,6 +98,8 @@ public final void close() { } } + private boolean nowInMillisUsed; + protected abstract void doClose(); /** @@ -129,7 +131,16 @@ public final void close() { public abstract SearchContext queryBoost(float queryBoost); - public abstract long nowInMillis(); + public final long nowInMillis() { + nowInMillisUsed = true; + return nowInMillisImpl(); + } + + public final boolean nowInMillisUsed() { + return nowInMillisUsed; + } + + protected abstract long nowInMillisImpl(); public abstract Scroll scroll(); diff --git a/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 302aacd892136..89d9f63ce44e4 100644 --- a/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -259,11 +259,17 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { + writeTo(out, false); + } + + public void writeTo(StreamOutput out, boolean asKey) throws IOException { super.writeTo(out); out.writeString(index); out.writeVInt(shardId); out.writeByte(searchType.id()); - out.writeVInt(numberOfShards); + if (!asKey) { + out.writeVInt(numberOfShards); + } if (scroll == null) { out.writeBoolean(false); } else { @@ -274,7 +280,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBytesReference(extraSource); out.writeStringArray(types); out.writeStringArrayNullable(filteringAliases); - out.writeVLong(nowInMillis); + if (!asKey) { + out.writeVLong(nowInMillis); + } if (out.getVersion().onOrAfter(Version.V_1_1_0)) { out.writeBytesReference(templateSource); diff --git a/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java 
b/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index e6a385c044bdf..f11966746d222 100644 --- a/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -39,7 +39,7 @@ /** * */ -public class QuerySearchResult extends TransportResponse implements QuerySearchResultProvider { +public class QuerySearchResult extends QuerySearchResultProvider { private long id; private SearchShardTarget shardTarget; @@ -159,7 +159,12 @@ public static QuerySearchResult readQuerySearchResult(StreamInput in) throws IOE @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - id = in.readLong(); + long id = in.readLong(); + readFromWithId(id, in); + } + + public void readFromWithId(long id, StreamInput in) throws IOException { + this.id = id; // shardTarget = readSearchShardTarget(in); from = in.readVInt(); size = in.readVInt(); @@ -183,6 +188,10 @@ public void readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeLong(id); + writeToNoId(out); + } + + public void writeToNoId(StreamOutput out) throws IOException { // shardTarget.writeTo(out); out.writeVInt(from); out.writeVInt(size); diff --git a/src/main/java/org/elasticsearch/search/query/QuerySearchResultProvider.java b/src/main/java/org/elasticsearch/search/query/QuerySearchResultProvider.java index 61b85b673af89..1ae3157fa5355 100644 --- a/src/main/java/org/elasticsearch/search/query/QuerySearchResultProvider.java +++ b/src/main/java/org/elasticsearch/search/query/QuerySearchResultProvider.java @@ -20,16 +20,17 @@ package org.elasticsearch.search.query; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.transport.TransportResponse; /** * */ -public interface QuerySearchResultProvider extends SearchPhaseResult { +public abstract class QuerySearchResultProvider extends TransportResponse implements SearchPhaseResult { /** * If both query and fetch happened on the same call. */ - boolean includeFetch(); + public abstract boolean includeFetch(); - QuerySearchResult queryResult(); + public abstract QuerySearchResult queryResult(); } diff --git a/src/test/java/org/elasticsearch/indices/cache/CacheTests.java b/src/test/java/org/elasticsearch/indices/cache/CacheTests.java deleted file mode 100644 index 5764ac9d8144f..0000000000000 --- a/src/test/java/org/elasticsearch/indices/cache/CacheTests.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.indices.cache; - -import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.query.FilterBuilders; -import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; -import org.junit.Test; - -import static org.elasticsearch.index.query.QueryBuilders.filteredQuery; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.hamcrest.Matchers.*; - -/** - */ -@ClusterScope(scope= Scope.SUITE, numDataNodes =1, numClientNodes = 0, randomDynamicTemplates = false) -public class CacheTests extends ElasticsearchIntegrationTest { - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - //Filter cache is cleaned periodically, default is 60s, so make sure it runs often. Thread.sleep for 60s is bad - return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("indices.cache.filter.clean_interval", "1ms").build(); - } - - @Test - public void testClearCacheFilterKeys() { - client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); - client().prepareIndex("test", "type", "1").setSource("field", "value").execute().actionGet(); - client().admin().indices().prepareRefresh().execute().actionGet(); - - NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); - IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet(); - assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); - - SearchResponse searchResponse = client().prepareSearch().setQuery(filteredQuery(matchAllQuery(), FilterBuilders.termFilter("field", "value").cacheKey("test_key"))).execute().actionGet(); - assertThat(searchResponse.getHits().getHits().length, equalTo(1)); - nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L)); - indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet(); - assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? 
greaterThan(0l) : is(0L)); - - client().admin().indices().prepareClearCache().setFilterKeys("test_key").execute().actionGet(); - nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); - indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet(); - assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); - } - - @Test - public void testFieldDataStats() { - client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); - client().prepareIndex("test", "type", "1").setSource("field", "value1", "field2", "value1").execute().actionGet(); - client().prepareIndex("test", "type", "2").setSource("field", "value2", "field2", "value2").execute().actionGet(); - client().admin().indices().prepareRefresh().execute().actionGet(); - - NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l)); - IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); - assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l)); - - // sort to load it to field data... - client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet(); - client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet(); - - nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); - indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); - assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); - - // sort to load it to field data... 
- client().prepareSearch().addSort("field2", SortOrder.ASC).execute().actionGet(); - client().prepareSearch().addSort("field2", SortOrder.ASC).execute().actionGet(); - - // now check the per field stats - nodesStats = client().admin().cluster().prepareNodesStats().setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.FieldData, true).fieldDataFields("*")).execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field"), greaterThan(0l)); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field"), lessThan(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes())); - - indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).setFieldDataFields("*").execute().actionGet(); - assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); - assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), greaterThan(0l)); - assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), lessThan(indicesStats.getTotal().getFieldData().getMemorySizeInBytes())); - - client().admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet(); - nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l)); - indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); - assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l)); - - } - - @Test - public void testClearAllCaches() throws Exception { - client().admin().indices().prepareCreate("test") - .setSettings(ImmutableSettings.settingsBuilder() - .put("index.number_of_replicas", 0) - .put("index.number_of_shards", 1)) - .execute().actionGet(); - client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); - client().prepareIndex("test", "type", "1").setSource("field", "value1").execute().actionGet(); - client().prepareIndex("test", "type", "2").setSource("field", "value2").execute().actionGet(); - client().admin().indices().prepareRefresh().execute().actionGet(); - - NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true) - .execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l)); - assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); - - IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test") - .clear().setFieldData(true).setFilterCache(true) - .execute().actionGet(); - assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l)); - assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); - - // sort to load it to field data and filter to load filter cache - client().prepareSearch() - .setPostFilter(FilterBuilders.termFilter("field", "value1")) - .addSort("field", SortOrder.ASC) - .execute().actionGet(); - client().prepareSearch() - .setPostFilter(FilterBuilders.termFilter("field", "value2")) - .addSort("field", SortOrder.ASC) - .execute().actionGet(); - - nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true) - 
.execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); - assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L)); - - indicesStats = client().admin().indices().prepareStats("test") - .clear().setFieldData(true).setFilterCache(true) - .execute().actionGet(); - assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); - assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L)); - - client().admin().indices().prepareClearCache().execute().actionGet(); - Thread.sleep(100); // Make sure the filter cache entries have been removed... - nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true) - .execute().actionGet(); - assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l)); - assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); - - indicesStats = client().admin().indices().prepareStats("test") - .clear().setFieldData(true).setFilterCache(true) - .execute().actionGet(); - assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l)); - assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); - } - -} diff --git a/src/test/java/org/elasticsearch/indices/stats/SimpleIndexStatsTests.java b/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java similarity index 66% rename from src/test/java/org/elasticsearch/indices/stats/SimpleIndexStatsTests.java rename to src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java index 69460b71da467..6e06465bd6bd8 100644 --- a/src/test/java/org/elasticsearch/indices/stats/SimpleIndexStatsTests.java +++ b/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java @@ -19,13 +19,22 @@ package org.elasticsearch.indices.stats; +import com.carrotsearch.randomizedtesting.annotations.Seed; import org.apache.lucene.util.Version; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.stats.*; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.FilterBuilders; +import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; @@ -38,11 +47,197 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.filteredQuery; +import static 
org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.*; -@ClusterScope(scope = Scope.SUITE, numDataNodes = 2) -public class SimpleIndexStatsTests extends ElasticsearchIntegrationTest { +@ClusterScope(scope = Scope.SUITE, numDataNodes = 2, numClientNodes = 0, randomDynamicTemplates = false) +public class IndexStatsTests extends ElasticsearchIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + //Filter/Query cache is cleaned periodically, default is 60s, so make sure it runs often. Thread.sleep for 60s is bad + return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)) + .put("indices.cache.filter.clean_interval", "1ms") + .put("indices.cache.query.clean_interval", "1ms") + .build(); + } + + @Test + public void testClearCacheFilterKeys() { + client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 2)).execute().actionGet(); + client().prepareIndex("test", "type", "1").setSource("field", "value").execute().actionGet(); + client().admin().indices().prepareRefresh().execute().actionGet(); + + NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); + assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); + IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet(); + assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); + + SearchResponse searchResponse = client().prepareSearch().setQuery(filteredQuery(matchAllQuery(), FilterBuilders.termFilter("field", "value").cacheKey("test_key"))).execute().actionGet(); + assertThat(searchResponse.getHits().getHits().length, equalTo(1)); + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); + assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L)); + indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet(); + assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? 
greaterThan(0l) : is(0L)); + + client().admin().indices().prepareClearCache().setFilterKeys("test_key").execute().actionGet(); + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); + assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); + indicesStats = client().admin().indices().prepareStats("test").clear().setFilterCache(true).execute().actionGet(); + assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); + } + + @Test + public void testFieldDataStats() { + client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 2)).execute().actionGet(); + client().prepareIndex("test", "type", "1").setSource("field", "value1", "field2", "value1").execute().actionGet(); + client().prepareIndex("test", "type", "2").setSource("field", "value2", "field2", "value2").execute().actionGet(); + client().admin().indices().prepareRefresh().execute().actionGet(); + + NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); + assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l)); + IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); + assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l)); + + // sort to load it to field data... + client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet(); + client().prepareSearch().addSort("field", SortOrder.ASC).execute().actionGet(); + + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); + assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); + indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); + assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); + + // sort to load it to field data... 
+ client().prepareSearch().addSort("field2", SortOrder.ASC).execute().actionGet(); + client().prepareSearch().addSort("field2", SortOrder.ASC).execute().actionGet(); + + // now check the per field stats + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.FieldData, true).fieldDataFields("*")).execute().actionGet(); + assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); + assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes()[1].getIndices().getFieldData().getFields().get("field"), greaterThan(0l)); + assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getFields().get("field") + nodesStats.getNodes()[1].getIndices().getFieldData().getFields().get("field"), lessThan(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes())); + + indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).setFieldDataFields("*").execute().actionGet(); + assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); + assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), greaterThan(0l)); + assertThat(indicesStats.getTotal().getFieldData().getFields().get("field"), lessThan(indicesStats.getTotal().getFieldData().getMemorySizeInBytes())); + + client().admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet(); + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet(); + assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l)); + indicesStats = client().admin().indices().prepareStats("test").clear().setFieldData(true).execute().actionGet(); + assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l)); + + } + + @Test + public void testClearAllCaches() throws Exception { + client().admin().indices().prepareCreate("test") + .setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_replicas", 0).put("index.number_of_shards", 2)) + .execute().actionGet(); + client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + client().prepareIndex("test", "type", "1").setSource("field", "value1").execute().actionGet(); + client().prepareIndex("test", "type", "2").setSource("field", "value2").execute().actionGet(); + client().admin().indices().prepareRefresh().execute().actionGet(); + + NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true) + .execute().actionGet(); + assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l)); + assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); + + IndicesStatsResponse indicesStats = client().admin().indices().prepareStats("test") + .clear().setFieldData(true).setFilterCache(true) + .execute().actionGet(); + assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l)); + 
assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); + + // sort to load it to field data and filter to load filter cache + client().prepareSearch() + .setPostFilter(FilterBuilders.termFilter("field", "value1")) + .addSort("field", SortOrder.ASC) + .execute().actionGet(); + client().prepareSearch() + .setPostFilter(FilterBuilders.termFilter("field", "value2")) + .addSort("field", SortOrder.ASC) + .execute().actionGet(); + + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true) + .execute().actionGet(); + assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); + assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L)); + + indicesStats = client().admin().indices().prepareStats("test") + .clear().setFieldData(true).setFilterCache(true) + .execute().actionGet(); + assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), greaterThan(0l)); + assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), internalCluster().hasFilterCache() ? greaterThan(0l) : is(0L)); + + client().admin().indices().prepareClearCache().execute().actionGet(); + Thread.sleep(100); // Make sure the filter cache entries have been removed... + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true) + .execute().actionGet(); + assertThat(nodesStats.getNodes()[0].getIndices().getFieldData().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l)); + assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes() + nodesStats.getNodes()[1].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); + + indicesStats = client().admin().indices().prepareStats("test") + .clear().setFieldData(true).setFilterCache(true) + .execute().actionGet(); + assertThat(indicesStats.getTotal().getFieldData().getMemorySizeInBytes(), equalTo(0l)); + assertThat(indicesStats.getTotal().getFilterCache().getMemorySizeInBytes(), equalTo(0l)); + } + + @Test + public void testQueryCache() throws Exception { + assertAcked(client().admin().indices().prepareCreate("idx").setSettings(IndicesQueryCache.INDEX_QUERY_CACHE_ENABLED, true).get()); + ensureGreen(); + + int numDocs = randomIntBetween(2, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < numDocs; ++i) { + builders[i] = client().prepareIndex("idx", "type", Integer.toString(i)).setSource(jsonBuilder() + .startObject() + .field("common", "field") + .field("str_value", "s" + i) + .endObject()); + } + indexRandom(true, builders); + + assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), equalTo(0l)); + for (int i = 0; i < 10; i++) { + assertThat(client().prepareSearch("idx").setSearchType(SearchType.COUNT).get().getHits().getTotalHits(), equalTo((long) numDocs)); + assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l)); + } + + // index the data again... 
+ builders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < numDocs; ++i) { + builders[i] = client().prepareIndex("idx", "type", Integer.toString(i)).setSource(jsonBuilder() + .startObject() + .field("common", "field") + .field("str_value", "s" + i) + .endObject()); + } + indexRandom(true, builders); + refresh(); + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), equalTo(0l)); + } + }); + + for (int i = 0; i < 10; i++) { + assertThat(client().prepareSearch("idx").setSearchType(SearchType.COUNT).get().getHits().getTotalHits(), equalTo((long) numDocs)); + assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l)); + } + + client().admin().indices().prepareClearCache().setQueryCache(true).get(); // clean the cache + assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), equalTo(0l)); + } @Test public void simpleStats() throws Exception { @@ -318,7 +513,7 @@ public void testEncodeDecodeCommonStats() throws IOException { @Test public void testFlagOrdinalOrder() { Flag[] flags = new Flag[]{Flag.Store, Flag.Indexing, Flag.Get, Flag.Search, Flag.Merge, Flag.Flush, Flag.Refresh, - Flag.FilterCache, Flag.IdCache, Flag.FieldData, Flag.Docs, Flag.Warmer, Flag.Percolate, Flag.Completion, Flag.Segments, Flag.Translog, Flag.Suggest}; + Flag.FilterCache, Flag.IdCache, Flag.FieldData, Flag.Docs, Flag.Warmer, Flag.Percolate, Flag.Completion, Flag.Segments, Flag.Translog, Flag.Suggest, Flag.QueryCache}; assertThat(flags.length, equalTo(Flag.values().length)); for (int i = 0; i < flags.length; i++) { @@ -586,6 +781,9 @@ private static void set(Flag flag, IndicesStatsRequestBuilder builder, boolean s case Suggest: builder.setSuggest(set); break; + case QueryCache: + builder.setQueryCache(set); + break; default: fail("new flag? " + flag); break; @@ -628,6 +826,8 @@ private static boolean isSet(Flag flag, CommonStats response) { return response.getTranslog() != null; case Suggest: return response.getSuggest() != null; + case QueryCache: + return response.getQueryCache() != null; default: fail("new flag? 
" + flag); return false; diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 5aeb24fd29324..2d9a8053e39d0 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -90,6 +90,7 @@ import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.index.translog.fs.FsTranslogFile; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.rest.RestStatus; @@ -442,6 +443,10 @@ private static ImmutableSettings.Builder setRandomSettings(Random random, Immuta // Randomly load or don't load bloom filters: builder.put(CodecService.INDEX_CODEC_BLOOM_LOAD, random.nextBoolean()); + if (random.nextBoolean()) { + builder.put(IndicesQueryCache.INDEX_QUERY_CACHE_ENABLED, random.nextBoolean()); + } + return builder; } diff --git a/src/test/java/org/elasticsearch/test/TestSearchContext.java b/src/test/java/org/elasticsearch/test/TestSearchContext.java index e98d5a0b75d75..f33aa8e1f8af5 100644 --- a/src/test/java/org/elasticsearch/test/TestSearchContext.java +++ b/src/test/java/org/elasticsearch/test/TestSearchContext.java @@ -168,7 +168,7 @@ public SearchContext queryBoost(float queryBoost) { } @Override - public long nowInMillis() { + protected long nowInMillisImpl() { return 0; } From 96aa8032e5686d52b22818972ff513b7406de8ec Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 5 Aug 2014 15:33:27 +0200 Subject: [PATCH 2/4] [query_cache] address lee comments --- .../index/settings/IndexDynamicSettingsModule.java | 2 +- .../indices/cache/query/IndicesQueryCache.java | 13 ++++++++----- .../indices/stats/IndexStatsTests.java | 5 ++--- .../test/ElasticsearchIntegrationTest.java | 2 +- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index 0b0ff4e2826bf..99c7aa1b82c79 100644 --- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -120,7 +120,7 @@ public IndexDynamicSettingsModule() { indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, Validator.TIME); indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_DISABLE_FLUSH); indexDynamicSettings.addDynamicSetting(InternalIndicesWarmer.INDEX_WARMER_ENABLED); - indexDynamicSettings.addDynamicSetting(IndicesQueryCache.INDEX_QUERY_CACHE_ENABLED, Validator.BOOLEAN); + indexDynamicSettings.addDynamicSetting(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, Validator.BOOLEAN); } public void addDynamicSettings(String... 
settings) { diff --git a/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java b/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java index 11dc539a18dc7..d099cd140939d 100644 --- a/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java +++ b/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java @@ -81,7 +81,8 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe * A setting to enable or disable query caching on an index level. Its dynamic by default * since we are checking on the cluster state IndexMetaData always. */ - public static final String INDEX_QUERY_CACHE_ENABLED = "index.cache.query.enable"; + public static final String INDEX_CACHE_QUERY_ENABLED = "index.cache.query.enable"; + public static final String INDEX_CACHE_QUERY_CLEAN_INTERVAL = "index.cache.query.clean_interval"; public static final String INDICES_CACHE_QUERY_SIZE = "indices.cache.query.size"; public static final String INDICES_CACHE_QUERY_EXPIRE = "indices.cache.query.expire"; @@ -106,7 +107,7 @@ public IndicesQueryCache(Settings settings, ClusterService clusterService, Threa super(settings); this.clusterService = clusterService; this.threadPool = threadPool; - this.cleanInterval = componentSettings.getAsTime("clean_interval", TimeValue.timeValueSeconds(60)); + this.cleanInterval = settings.getAsTime(INDEX_CACHE_QUERY_CLEAN_INTERVAL, TimeValue.timeValueSeconds(60)); // this cache can be very small yet still be very effective this.size = settings.get(INDICES_CACHE_QUERY_SIZE, "1%"); this.expire = settings.getAsTime(INDICES_CACHE_QUERY_EXPIRE, null); @@ -186,13 +187,15 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) { if (index == null) { // in case we didn't yet have the cluster state, or it just got deleted return false; } - if (!index.settings().getAsBoolean(INDEX_QUERY_CACHE_ENABLED, Boolean.FALSE)) { + if (!index.settings().getAsBoolean(INDEX_CACHE_QUERY_ENABLED, Boolean.FALSE)) { return false; } // if the reader is not a directory reader, we can't get the version from it if (!(context.searcher().getIndexReader() instanceof DirectoryReader)) { return false; } + // if now in millis is used (or in the future, a more generic "isDeterministic" flag + // then we can't cache based on "now" key within the search request, as it is not deterministic if (context.nowInMillisUsed()) { return false; } @@ -424,8 +427,8 @@ private static Key buildKey(ShardSearchRequest request, SearchContext context) t } /** - * this class aim is to just provide an on the write *write* format that is the same as {@link QuerySearchResult} - * and also provide a nice wrapper for in node communication. + * this class aim is to just provide an on the wire *write* format that is the same as {@link QuerySearchResult} + * and also provide a nice wrapper for in node communication for an already constructed {@link QuerySearchResult}. 
*/ private static class BytesQuerySearchResult extends QuerySearchResultProvider { diff --git a/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java b/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java index 6e06465bd6bd8..842506db7e0e7 100644 --- a/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java +++ b/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.indices.stats; -import com.carrotsearch.randomizedtesting.annotations.Seed; import org.apache.lucene.util.Version; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.stats.*; @@ -61,7 +60,7 @@ protected Settings nodeSettings(int nodeOrdinal) { //Filter/Query cache is cleaned periodically, default is 60s, so make sure it runs often. Thread.sleep for 60s is bad return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)) .put("indices.cache.filter.clean_interval", "1ms") - .put("indices.cache.query.clean_interval", "1ms") + .put(IndicesQueryCache.INDEX_CACHE_QUERY_CLEAN_INTERVAL, "1ms") .build(); } @@ -192,7 +191,7 @@ public void testClearAllCaches() throws Exception { @Test public void testQueryCache() throws Exception { - assertAcked(client().admin().indices().prepareCreate("idx").setSettings(IndicesQueryCache.INDEX_QUERY_CACHE_ENABLED, true).get()); + assertAcked(client().admin().indices().prepareCreate("idx").setSettings(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, true).get()); ensureGreen(); int numDocs = randomIntBetween(2, 100); diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 2d9a8053e39d0..b5c4875d18283 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -444,7 +444,7 @@ private static ImmutableSettings.Builder setRandomSettings(Random random, Immuta builder.put(CodecService.INDEX_CODEC_BLOOM_LOAD, random.nextBoolean()); if (random.nextBoolean()) { - builder.put(IndicesQueryCache.INDEX_QUERY_CACHE_ENABLED, random.nextBoolean()); + builder.put(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, random.nextBoolean()); } return builder; From 1ecc0e349b91e61ddc6f4e77de2c95b462ef9f0f Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 5 Aug 2014 15:37:24 +0200 Subject: [PATCH 3/4] [query_cache] uppercase --- .../org/elasticsearch/index/cache/query/QueryCacheStats.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/cache/query/QueryCacheStats.java b/src/main/java/org/elasticsearch/index/cache/query/QueryCacheStats.java index 7372781c28cf0..f1be938a72777 100644 --- a/src/main/java/org/elasticsearch/index/cache/query/QueryCacheStats.java +++ b/src/main/java/org/elasticsearch/index/cache/query/QueryCacheStats.java @@ -75,7 +75,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(Fields.QueryCacheStats); + builder.startObject(Fields.QUERY_CACHE_STATS); builder.byteSizeField(Fields.MEMORY_SIZE_IN_BYTES, Fields.MEMORY_SIZE, memorySize); builder.field(Fields.EVICTIONS, getEvictions()); builder.endObject(); @@ -83,7 +83,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } static final class Fields { - 
static final XContentBuilderString QueryCacheStats = new XContentBuilderString("query_cache"); + static final XContentBuilderString QUERY_CACHE_STATS = new XContentBuilderString("query_cache"); static final XContentBuilderString MEMORY_SIZE = new XContentBuilderString("memory_size"); static final XContentBuilderString MEMORY_SIZE_IN_BYTES = new XContentBuilderString("memory_size_in_bytes"); static final XContentBuilderString EVICTIONS = new XContentBuilderString("evictions"); From 50a3214f9b3b4351ff45fd29712950388b4e5367 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Tue, 5 Aug 2014 17:44:48 +0200 Subject: [PATCH 4/4] change to indices query cache interval --- .../elasticsearch/indices/cache/query/IndicesQueryCache.java | 4 ++-- .../java/org/elasticsearch/indices/stats/IndexStatsTests.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java b/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java index d099cd140939d..9d8b856e90b64 100644 --- a/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java +++ b/src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java @@ -82,7 +82,7 @@ public class IndicesQueryCache extends AbstractComponent implements RemovalListe * since we are checking on the cluster state IndexMetaData always. */ public static final String INDEX_CACHE_QUERY_ENABLED = "index.cache.query.enable"; - public static final String INDEX_CACHE_QUERY_CLEAN_INTERVAL = "index.cache.query.clean_interval"; + public static final String INDICES_CACHE_QUERY_CLEAN_INTERVAL = "indices.cache.query.clean_interval"; public static final String INDICES_CACHE_QUERY_SIZE = "indices.cache.query.size"; public static final String INDICES_CACHE_QUERY_EXPIRE = "indices.cache.query.expire"; @@ -107,7 +107,7 @@ public IndicesQueryCache(Settings settings, ClusterService clusterService, Threa super(settings); this.clusterService = clusterService; this.threadPool = threadPool; - this.cleanInterval = settings.getAsTime(INDEX_CACHE_QUERY_CLEAN_INTERVAL, TimeValue.timeValueSeconds(60)); + this.cleanInterval = settings.getAsTime(INDICES_CACHE_QUERY_CLEAN_INTERVAL, TimeValue.timeValueSeconds(60)); // this cache can be very small yet still be very effective this.size = settings.get(INDICES_CACHE_QUERY_SIZE, "1%"); this.expire = settings.getAsTime(INDICES_CACHE_QUERY_EXPIRE, null); diff --git a/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java b/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java index 842506db7e0e7..d574a02071705 100644 --- a/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java +++ b/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java @@ -60,7 +60,7 @@ protected Settings nodeSettings(int nodeOrdinal) { //Filter/Query cache is cleaned periodically, default is 60s, so make sure it runs often. Thread.sleep for 60s is bad return ImmutableSettings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)) .put("indices.cache.filter.clean_interval", "1ms") - .put(IndicesQueryCache.INDEX_CACHE_QUERY_CLEAN_INTERVAL, "1ms") + .put(IndicesQueryCache.INDICES_CACHE_QUERY_CLEAN_INTERVAL, "1ms") .build(); }
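
A minimal usage sketch of the opt-in cache wired up by this series, written in the same integration-test style as the IndexStatsTests changes above. The index name "logs", type "event", field "msg", and the class/method names here are illustrative only; the client calls (prepareCreate with the IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED setting, SearchType.COUNT searches, and prepareStats(...).setQueryCache(true)) mirror the ones exercised in the tests in this patch, so treat this as a sketch under those assumptions rather than additional test coverage.

package org.elasticsearch.indices.stats;

import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;

public class QueryCacheUsageSketchTests extends ElasticsearchIntegrationTest {

    @Test
    public void queryCacheUsageSketch() throws Exception {
        // Opt the index into the shard query cache; the setting is registered as a
        // dynamic index setting, so it could also be flipped on an existing index.
        assertAcked(client().admin().indices().prepareCreate("logs")
                .setSettings(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, true).get());
        ensureGreen();

        client().prepareIndex("logs", "event", "1").setSource("msg", "hello").get();
        refresh();

        // Only search_type=count requests are cacheable in this change; the first
        // request populates the shard-level cache, identical repeats are served from it.
        client().prepareSearch("logs").setSearchType(SearchType.COUNT).get();
        client().prepareSearch("logs").setSearchType(SearchType.COUNT).get();

        // The stats added in this series expose the cache footprint per index.
        long bytes = client().admin().indices().prepareStats("logs").setQueryCache(true)
                .get().getTotal().getQueryCache().getMemorySizeInBytes();
        assertThat(bytes, greaterThan(0L));
    }
}

A clear-cache call (prepareClearCache().setQueryCache(true)) or a refresh that actually changed the reader drops the cached entries again, which is what the assertBusy block in testQueryCache above checks after reindexing the same documents.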