Skip to content

Query Cache: Support shard level query response caching #7161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.cache.clear;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -34,6 +35,7 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest<ClearInd
private boolean fieldDataCache = false;
private boolean idCache = false;
private boolean recycler = false;
private boolean queryCache = false;
private String[] fields = null;
private String[] filterKeys = null;

Expand All @@ -54,6 +56,15 @@ public ClearIndicesCacheRequest filterCache(boolean filterCache) {
return this;
}

public boolean queryCache() {
return this.queryCache;
}

public ClearIndicesCacheRequest queryCache(boolean queryCache) {
this.queryCache = queryCache;
return this;
}

public boolean fieldDataCache() {
return this.fieldDataCache;
}
Expand Down Expand Up @@ -107,6 +118,9 @@ public void readFrom(StreamInput in) throws IOException {
recycler = in.readBoolean();
fields = in.readStringArray();
filterKeys = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
queryCache = in.readBoolean();
}
}

public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -117,7 +131,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(recycler);
out.writeStringArrayNullable(fields);
out.writeStringArrayNullable(filterKeys);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(queryCache);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public ClearIndicesCacheRequestBuilder setFilterCache(boolean filterCache) {
return this;
}

public ClearIndicesCacheRequestBuilder setQueryCache(boolean queryCache) {
request.queryCache(queryCache);
return this;
}

public ClearIndicesCacheRequestBuilder setFieldDataCache(boolean fieldDataCache) {
request.fieldDataCache(fieldDataCache);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.cache.clear;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -34,6 +35,7 @@ class ShardClearIndicesCacheRequest extends BroadcastShardOperationRequest {
private boolean fieldDataCache = false;
private boolean idCache = false;
private boolean recycler;
private boolean queryCache = false;

private String[] fields = null;
private String[] filterKeys = null;
Expand All @@ -49,12 +51,17 @@ public ShardClearIndicesCacheRequest(String index, int shardId, ClearIndicesCach
fields = request.fields();
filterKeys = request.filterKeys();
recycler = request.recycler();
queryCache = request.queryCache();
}

public boolean filterCache() {
return filterCache;
}

public boolean queryCache() {
return queryCache;
}

public boolean fieldDataCache() {
return this.fieldDataCache;
}
Expand Down Expand Up @@ -89,6 +96,9 @@ public void readFrom(StreamInput in) throws IOException {
recycler = in.readBoolean();
fields = in.readStringArray();
filterKeys = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
queryCache = in.readBoolean();
}
}

@Override
Expand All @@ -100,5 +110,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(recycler);
out.writeStringArrayNullable(fields);
out.writeStringArrayNullable(filterKeys);
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(queryCache);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -53,16 +55,16 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio

private final IndicesService indicesService;
private final IndicesTermsFilterCache termsFilterCache;
private final CacheRecycler cacheRecycler;
private final IndicesQueryCache indicesQueryCache;

@Inject
public TransportClearIndicesCacheAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, IndicesTermsFilterCache termsFilterCache,
CacheRecycler cacheRecycler, ActionFilters actionFilters) {
IndicesQueryCache indicesQueryCache, ActionFilters actionFilters) {
super(settings, ClearIndicesCacheAction.NAME, threadPool, clusterService, transportService, actionFilters);
this.indicesService = indicesService;
this.termsFilterCache = termsFilterCache;
this.cacheRecycler = cacheRecycler;
this.indicesQueryCache = indicesQueryCache;
}

@Override
Expand Down Expand Up @@ -116,6 +118,7 @@ protected ShardClearIndicesCacheResponse newShardResponse() {
protected ShardClearIndicesCacheResponse shardOperation(ShardClearIndicesCacheRequest request) throws ElasticsearchException {
IndexService service = indicesService.indexService(request.index());
if (service != null) {
IndexShard shard = service.shard(request.shardId());
// we always clear the query cache
service.cache().queryParserCache().clear();
boolean clearedAtLeastOne = false;
Expand All @@ -139,6 +142,10 @@ protected ShardClearIndicesCacheResponse shardOperation(ShardClearIndicesCacheRe
}
}
}
if (request.queryCache()) {
clearedAtLeastOne = true;
indicesQueryCache.clear(shard);
}
if (request.recycler()) {
logger.debug("Clear CacheRecycler on index [{}]", service.index());
clearedAtLeastOne = true;
Expand All @@ -158,6 +165,7 @@ protected ShardClearIndicesCacheResponse shardOperation(ShardClearIndicesCacheRe
service.cache().clear("api");
service.fieldData().clear();
termsFilterCache.clear("api");
indicesQueryCache.clear(shard);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
Expand Down Expand Up @@ -111,6 +112,9 @@ public CommonStats(CommonStatsFlags flags) {
case Suggest:
suggest = new SuggestStats();
break;
case QueryCache:
queryCache = new QueryCacheStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
Expand Down Expand Up @@ -174,6 +178,9 @@ public CommonStats(IndexShard indexShard, CommonStatsFlags flags) {
case Suggest:
suggest = indexShard.suggestStats();
break;
case QueryCache:
queryCache = indexShard.queryCache().stats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
Expand Down Expand Up @@ -231,6 +238,9 @@ public CommonStats(IndexShard indexShard, CommonStatsFlags flags) {
@Nullable
public SuggestStats suggest;

@Nullable
public QueryCacheStats queryCache;

public void add(CommonStats stats) {
if (docs == null) {
if (stats.getDocs() != null) {
Expand Down Expand Up @@ -370,6 +380,14 @@ public void add(CommonStats stats) {
} else {
suggest.add(stats.getSuggest());
}
if (queryCache == null) {
if (stats.getQueryCache() != null) {
queryCache = new QueryCacheStats();
queryCache.add(stats.getQueryCache());
}
} else {
queryCache.add(stats.getQueryCache());
}
}

@Nullable
Expand Down Expand Up @@ -457,6 +475,11 @@ public SuggestStats getSuggest() {
return suggest;
}

@Nullable
public QueryCacheStats getQueryCache() {
return queryCache;
}

public static CommonStats readCommonStats(StreamInput in) throws IOException {
CommonStats stats = new CommonStats();
stats.readFrom(in);
Expand Down Expand Up @@ -514,6 +537,9 @@ public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
suggest = in.readOptionalStreamable(new SuggestStats());
}
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
queryCache = in.readOptionalStreamable(new QueryCacheStats());
}
}

@Override
Expand Down Expand Up @@ -612,6 +638,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeOptionalStreamable(suggest);
}
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeOptionalStreamable(queryCache);
}
}

// note, requires a wrapping object
Expand Down Expand Up @@ -668,6 +697,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (suggest != null) {
suggest.toXContent(builder, params);
}
if (queryCache != null) {
queryCache.toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ public static enum Flag {
Completion("completion"),
Segments("segments"),
Translog("translog"),
Suggest("suggest");
Suggest("suggest"),
QueryCache("query_cache");

private final String restName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,15 @@ public boolean suggest() {
return flags.isSet(Flag.Suggest);
}

public IndicesStatsRequest queryCache(boolean queryCache) {
flags.set(Flag.QueryCache, queryCache);
return this;
}

public boolean queryCache() {
return flags.isSet(Flag.QueryCache);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ public IndicesStatsRequestBuilder setSuggest(boolean suggest) {
return this;
}

public IndicesStatsRequestBuilder setQueryCache(boolean queryCache) {
request.queryCache(queryCache);
return this;
}

@Override
protected void doExecute(ActionListener<IndicesStatsResponse> listener) {
client.stats(request, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ protected ShardStats shardOperation(IndexShardStatsRequest request) throws Elast
if (request.request.suggest()) {
flags.set(CommonStatsFlags.Flag.Suggest);
}
if (request.request.queryCache()) {
flags.set(CommonStatsFlags.Flag.QueryCache);
}

return new ShardStats(indexShard, flags);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;

import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId;
Expand All @@ -55,7 +56,7 @@ protected void doExecute(SearchRequest searchRequest, ActionListener<SearchRespo
new AsyncAction(searchRequest, listener).start();
}

private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {
private class AsyncAction extends BaseAsyncAction<QuerySearchResultProvider> {

private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
super(request, listener);
Expand All @@ -67,7 +68,7 @@ protected String firstPhaseName() {
}

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResult> listener) {
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -60,7 +61,7 @@ protected void doExecute(SearchRequest searchRequest, ActionListener<SearchRespo
new AsyncAction(searchRequest, listener).start();
}

private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {
private class AsyncAction extends BaseAsyncAction<QuerySearchResultProvider> {

final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<IntArrayList> docIdsToLoad;
Expand All @@ -77,7 +78,7 @@ protected String firstPhaseName() {
}

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResult> listener) {
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener);
}

Expand All @@ -97,9 +98,9 @@ protected void moveToSecondPhase() throws Exception {
);
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = firstResults.get(entry.index);
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}
Expand Down
Loading