diff --git a/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/clear_cache.yml b/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/clear_cache.yml new file mode 100644 index 0000000000000..76c1bc33c328d --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/clear_cache.yml @@ -0,0 +1,145 @@ +--- +setup: + + - do: + indices.create: + index: docs + body: + settings: + number_of_shards: 1 + number_of_replicas: 0 + + - do: + bulk: + body: + - index: + _index: docs + _id: 1 + - field: doc + - index: + _index: docs + _id: 2 + - field: doc + - index: + _index: docs + _id: 3 + - field: other + + - do: + snapshot.create_repository: + repository: repository-fs + body: + type: fs + settings: + location: "repository-fs" + + # Remove the snapshot if a previous test failed to delete it. + # Useful for third party tests that runs the test against a real external service. + - do: + snapshot.delete: + repository: repository-fs + snapshot: snapshot + ignore: 404 + + - do: + snapshot.create: + repository: repository-fs + snapshot: snapshot + wait_for_completion: true + + - do: + indices.delete: + index: docs + + - do: + snapshot.create_repository: + repository: repository-searchable-snapshots + body: + type: searchable + settings: + delegate_type: fs + location: "repository-fs" + +--- +teardown: + + - do: + snapshot.delete: + repository: repository-fs + snapshot: snapshot + ignore: 404 + + - do: + snapshot.delete_repository: + repository: repository-fs + + - do: + snapshot.delete_repository: + repository: repository-searchable-snapshots + +--- +"Clear searchable snapshots cache": + - skip: + version: " - 7.99.99" + reason: searchable snapshots introduced in 8.0 + + - do: + catch: missing + searchable_snapshots.clear_cache: {} + + - match: { error.root_cause.0.type: "resource_not_found_exception" } + - match: { error.root_cause.0.reason: "No searchable snapshots indices found" } + + - do: + catch: missing + searchable_snapshots.clear_cache: + index: _all + + - match: { error.root_cause.0.type: "resource_not_found_exception" } + - match: { error.root_cause.0.reason: "No searchable snapshots indices found" } + + - do: + catch: missing + searchable_snapshots.clear_cache: + index: "unknown" + + - do: + indices.create: + index: non_searchable_snapshot_index + + - do: + catch: missing + searchable_snapshots.clear_cache: + index: non_* + + - match: { error.root_cause.0.type: "resource_not_found_exception" } + - match: { error.root_cause.0.reason: "No searchable snapshots indices found" } + + - do: + snapshot.restore: + repository: repository-searchable-snapshots + snapshot: snapshot + wait_for_completion: true + + - match: { snapshot.snapshot: snapshot } + - match: { snapshot.shards.failed: 0 } + - match: { snapshot.shards.successful: 1 } + + - do: + search: + rest_total_hits_as_int: true + index: docs + body: + query: + match: + field: "doc" + + - match: { hits.total: 2 } + + - do: + searchable_snapshots.clear_cache: + index: "docs" + + - match: { _shards.total: 1 } + - match: { _shards.failed: 0 } + diff --git a/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml b/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml index 4a2167129c38c..7f5a02f35e611 100644 --- a/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml +++ b/x-pack/plugin/searchable-snapshots/qa/rest/src/test/resources/rest-api-spec/test/stats.yml @@ -33,6 +33,14 @@ setup: settings: location: "repository-fs" + # Remove the snapshot if a previous test failed to delete it. + # Useful for third party tests that runs the test against a real external service. + - do: + snapshot.delete: + repository: repository-fs + snapshot: snapshot + ignore: 404 + - do: snapshot.create: repository: repository-fs @@ -133,6 +141,9 @@ teardown: searchable_snapshots.stats: index: "d*" + - match: { _shards.total: 1 } + - match: { _shards.failed: 0 } + - length: { indices: 1 } - length: { indices.docs.shards: 1 } - length: { indices.docs.shards.0: 1 } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 9b5b88d7fd847..296c1fa96ca34 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -38,9 +38,12 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction; +import org.elasticsearch.xpack.searchablesnapshots.action.TransportClearSearchableSnapshotsCacheAction; import org.elasticsearch.xpack.searchablesnapshots.action.TransportSearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; +import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction; import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction; import java.util.Collection; @@ -126,14 +129,20 @@ public Map getRepositories(Environment env, NamedXCo @Override public List> getActions() { - return List.of(new ActionHandler<>(SearchableSnapshotsStatsAction.INSTANCE, TransportSearchableSnapshotsStatsAction.class)); + return List.of( + new ActionHandler<>(SearchableSnapshotsStatsAction.INSTANCE, TransportSearchableSnapshotsStatsAction.class), + new ActionHandler<>(ClearSearchableSnapshotsCacheAction.INSTANCE, TransportClearSearchableSnapshotsCacheAction.class) + ); } public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) { - return List.of(new RestSearchableSnapshotsStatsAction()); + return List.of( + new RestSearchableSnapshotsStatsAction(), + new RestClearSearchableSnapshotsCacheAction() + ); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java new file mode 100644 index 0000000000000..e9d5b9b20501d --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/AbstractTransportSearchableSnapshotsAction.java @@ -0,0 +1,118 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.action; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.searchablesnapshots.InMemoryNoOpCommitDirectory; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotRepository.SNAPSHOT_CACHE_ENABLED_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotRepository.SNAPSHOT_DIRECTORY_FACTORY_KEY; + +public abstract class AbstractTransportSearchableSnapshotsAction + , Response extends BroadcastResponse, ShardOperationResult extends Writeable> + extends TransportBroadcastByNodeAction { + + private final IndicesService indicesService; + + AbstractTransportSearchableSnapshotsAction(String actionName, ClusterService clusterService, TransportService transportService, + ActionFilters actionFilters, IndexNameExpressionResolver resolver, + Writeable.Reader request, String executor, IndicesService indicesService) { + super(actionName, clusterService, transportService, actionFilters, resolver, request, executor); + this.indicesService = indicesService; + } + + AbstractTransportSearchableSnapshotsAction(String actionName, ClusterService clusterService, TransportService transportService, + ActionFilters actionFilters, IndexNameExpressionResolver resolver, + Writeable.Reader request, String executor, IndicesService indicesService, + boolean canTripCircuitBreaker) { + super(actionName, clusterService, transportService, actionFilters, resolver, request, executor, canTripCircuitBreaker); + this.indicesService = indicesService; + } + + @Override + protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] indices) { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, indices); + } + + @Override + protected ShardsIterator shards(ClusterState state, Request request, String[] concreteIndices) { + final List searchableSnapshotIndices = new ArrayList<>(); + for (String concreteIndex : concreteIndices) { + IndexMetaData indexMetaData = state.metaData().index(concreteIndex); + if (indexMetaData != null) { + Settings indexSettings = indexMetaData.getSettings(); + if (INDEX_STORE_TYPE_SETTING.get(indexSettings).equals(SNAPSHOT_DIRECTORY_FACTORY_KEY)) { + if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings)) { + searchableSnapshotIndices.add(concreteIndex); + } + } + } + } + if (searchableSnapshotIndices.isEmpty()) { + throw new ResourceNotFoundException("No searchable snapshots indices found"); + } + return state.routingTable().allShards(searchableSnapshotIndices.toArray(new String[0])); + } + + @Override + protected ShardOperationResult shardOperation(Request request, ShardRouting shardRouting) throws IOException { + final IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.index()).getShard(shardRouting.id()); + final CacheDirectory cacheDirectory = unwrapCacheDirectory(indexShard.store().directory()); + assert cacheDirectory != null; + assert cacheDirectory.getShardId().equals(shardRouting.shardId()); + return executeShardOperation(request, shardRouting, cacheDirectory); + } + + protected abstract ShardOperationResult executeShardOperation(Request request, ShardRouting shardRouting, + CacheDirectory cacheDirectory) throws IOException; + + @Nullable + private static CacheDirectory unwrapCacheDirectory(Directory dir) { + while (dir != null) { + if (dir instanceof CacheDirectory) { + return (CacheDirectory) dir; + } else if (dir instanceof InMemoryNoOpCommitDirectory) { + dir = ((InMemoryNoOpCommitDirectory) dir).getRealDirectory(); + } else if (dir instanceof FilterDirectory) { + dir = ((FilterDirectory) dir).getDelegate(); + } else { + dir = null; + } + } + return null; + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/ClearSearchableSnapshotsCacheAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/ClearSearchableSnapshotsCacheAction.java new file mode 100644 index 0000000000000..4d8130227c5c7 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/ClearSearchableSnapshotsCacheAction.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.action; + +import org.elasticsearch.action.ActionType; + +public class ClearSearchableSnapshotsCacheAction extends ActionType { + + public static final ClearSearchableSnapshotsCacheAction INSTANCE = new ClearSearchableSnapshotsCacheAction(); + static final String NAME = "cluster:admin/xpack/searchable_snapshots/cache/clear"; + + private ClearSearchableSnapshotsCacheAction() { + super(NAME, ClearSearchableSnapshotsCacheResponse::new); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/ClearSearchableSnapshotsCacheRequest.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/ClearSearchableSnapshotsCacheRequest.java new file mode 100644 index 0000000000000..1b86962e6cca1 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/ClearSearchableSnapshotsCacheRequest.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.action; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +public class ClearSearchableSnapshotsCacheRequest extends BroadcastRequest { + + public ClearSearchableSnapshotsCacheRequest(StreamInput in) throws IOException { + super(in); + } + + public ClearSearchableSnapshotsCacheRequest(String... indices) { + super(indices); + } + + protected ClearSearchableSnapshotsCacheRequest(String[] indices, IndicesOptions indicesOptions) { + super(indices, indicesOptions); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/ClearSearchableSnapshotsCacheResponse.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/ClearSearchableSnapshotsCacheResponse.java new file mode 100644 index 0000000000000..18d5936d34911 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/ClearSearchableSnapshotsCacheResponse.java @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.action; + +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; +import java.util.List; + +public class ClearSearchableSnapshotsCacheResponse extends BroadcastResponse { + + ClearSearchableSnapshotsCacheResponse(StreamInput in) throws IOException { + super(in); + } + + ClearSearchableSnapshotsCacheResponse(int totalShards, int successfulShards, int failedShards, + List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportClearSearchableSnapshotsCacheAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportClearSearchableSnapshotsCacheAction.java new file mode 100644 index 0000000000000..5a8ceab0fb7bf --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportClearSearchableSnapshotsCacheAction.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.action; + +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction.EmptyResult; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory; + +import java.io.IOException; +import java.util.List; + +public class TransportClearSearchableSnapshotsCacheAction extends AbstractTransportSearchableSnapshotsAction + { + + @Inject + public TransportClearSearchableSnapshotsCacheAction(ClusterService clusterService, TransportService transportService, + IndicesService indicesService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(ClearSearchableSnapshotsCacheAction.NAME, clusterService, transportService, actionFilters, + indexNameExpressionResolver, ClearSearchableSnapshotsCacheRequest::new, ThreadPool.Names.MANAGEMENT, indicesService, false); + } + + @Override + protected EmptyResult readShardResult(StreamInput in) { + return EmptyResult.readEmptyResultFrom(in); + } + + @Override + protected ClearSearchableSnapshotsCacheResponse newResponse(ClearSearchableSnapshotsCacheRequest request, + int totalShards, int successfulShards, int failedShards, + List responses, + List shardFailures, + ClusterState clusterState) { + return new ClearSearchableSnapshotsCacheResponse(totalShards, successfulShards, failedShards, shardFailures); + } + + @Override + protected ClearSearchableSnapshotsCacheRequest readRequestFrom(StreamInput in) throws IOException { + return new ClearSearchableSnapshotsCacheRequest(in); + } + + @Override + protected EmptyResult executeShardOperation(ClearSearchableSnapshotsCacheRequest request, ShardRouting shardRouting, + CacheDirectory cacheDirectory) { + cacheDirectory.clearCache(); + return EmptyResult.INSTANCE; + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java index c5b0019cf661b..34cdf668536ac 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportSearchableSnapshotsStatsAction.java @@ -5,25 +5,14 @@ */ package org.elasticsearch.xpack.searchablesnapshots.action; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -31,41 +20,22 @@ import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.CacheIndexInputStats; import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.Counter; import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats.TimedCounter; -import org.elasticsearch.xpack.searchablesnapshots.InMemoryNoOpCommitDirectory; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory; import org.elasticsearch.xpack.searchablesnapshots.cache.IndexInputStats; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotRepository.SNAPSHOT_CACHE_ENABLED_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotRepository.SNAPSHOT_DIRECTORY_FACTORY_KEY; - -public class TransportSearchableSnapshotsStatsAction extends TransportBroadcastByNodeAction { - private final IndicesService indicesService; - +public class TransportSearchableSnapshotsStatsAction extends AbstractTransportSearchableSnapshotsAction { @Inject public TransportSearchableSnapshotsStatsAction(ClusterService clusterService, TransportService transportService, IndicesService indicesService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(SearchableSnapshotsStatsAction.NAME, clusterService, transportService, actionFilters, indexNameExpressionResolver, - SearchableSnapshotsStatsRequest::new, ThreadPool.Names.MANAGEMENT); - this.indicesService = indicesService; - } - - @Override - protected ClusterBlockException checkGlobalBlock(ClusterState state, SearchableSnapshotsStatsRequest request) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); - } - - @Override - protected ClusterBlockException checkRequestBlock(ClusterState state, SearchableSnapshotsStatsRequest request, String[] indices) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, indices); + SearchableSnapshotsStatsRequest::new, ThreadPool.Names.MANAGEMENT, indicesService); } @Override @@ -88,32 +58,8 @@ protected SearchableSnapshotsStatsRequest readRequestFrom(StreamInput in) throws } @Override - protected ShardsIterator shards(ClusterState state, SearchableSnapshotsStatsRequest request, String[] concreteIndices) { - final List searchableSnapshotIndices = new ArrayList<>(); - for (String concreteIndex : concreteIndices) { - IndexMetaData indexMetaData = state.metaData().index(concreteIndex); - if (indexMetaData != null) { - Settings indexSettings = indexMetaData.getSettings(); - if (INDEX_STORE_TYPE_SETTING.get(indexSettings).equals(SNAPSHOT_DIRECTORY_FACTORY_KEY)) { - if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings)) { - searchableSnapshotIndices.add(concreteIndex); - } - } - } - } - if (searchableSnapshotIndices.isEmpty()) { - throw new ResourceNotFoundException("No searchable snapshots indices found"); - } - return state.routingTable().allShards(searchableSnapshotIndices.toArray(new String[0])); - } - - @Override - protected SearchableSnapshotShardStats shardOperation(SearchableSnapshotsStatsRequest request, ShardRouting shardRouting) { - final IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.index()).getShard(shardRouting.id()); - final CacheDirectory cacheDirectory = unwrap(indexShard.store().directory()); - assert cacheDirectory != null; - assert cacheDirectory.getShardId().equals(shardRouting.shardId()); - + protected SearchableSnapshotShardStats executeShardOperation(SearchableSnapshotsStatsRequest request, ShardRouting shardRouting, + CacheDirectory cacheDirectory) { return new SearchableSnapshotShardStats(shardRouting, cacheDirectory.getSnapshotId(), cacheDirectory.getIndexId(), cacheDirectory.getStats().entrySet().stream() .map(entry -> toCacheIndexInputStats(entry.getKey(), entry.getValue())) @@ -137,20 +83,4 @@ private static Counter toCounter(final IndexInputStats.Counter counter) { private static TimedCounter toTimedCounter(final IndexInputStats.TimedCounter counter) { return new TimedCounter(counter.count(), counter.total(), counter.min(), counter.max(), counter.totalNanoseconds()); } - - @Nullable - private static CacheDirectory unwrap(Directory dir) { - while (dir != null) { - if (dir instanceof CacheDirectory) { - return (CacheDirectory) dir; - } else if (dir instanceof InMemoryNoOpCommitDirectory) { - dir = ((InMemoryNoOpCommitDirectory) dir).getRealDirectory(); - } else if (dir instanceof FilterDirectory) { - dir = ((FilterDirectory) dir).getDelegate(); - } else { - dir = null; - } - } - return null; - } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java index 99733bcbfc379..b1aa6bc6836a7 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java @@ -100,6 +100,10 @@ public void close() throws IOException { super.close(); // Ideally we could let the cache evict/remove cached files by itself after the // directory has been closed. + clearCache(); + } + + public void clearCache() { cacheService.removeFromCache(cacheKey -> cacheKey.belongsTo(snapshotId, indexId, shardId)); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/rest/RestClearSearchableSnapshotsCacheAction.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/rest/RestClearSearchableSnapshotsCacheAction.java new file mode 100644 index 0000000000000..0f829e88e90dd --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/rest/RestClearSearchableSnapshotsCacheAction.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.rest; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction; +import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheRequest; + +import java.util.List; + +public class RestClearSearchableSnapshotsCacheAction extends BaseRestHandler { + + @Override + public List routes() { + return List.of( + new Route(RestRequest.Method.POST, "/_searchable_snapshots/cache/clear"), + new Route(RestRequest.Method.POST, "/{index}/_searchable_snapshots/cache/clear") + ); + } + + @Override + public String getName() { + return "clear_indices_searchable_snapshots_cache_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + final ClearSearchableSnapshotsCacheRequest request = new ClearSearchableSnapshotsCacheRequest(); + request.indices(Strings.splitStringByCommaToArray(restRequest.param("index"))); + request.indicesOptions(IndicesOptions.fromRequest(restRequest, request.indicesOptions())); + return channel -> client.execute(ClearSearchableSnapshotsCacheAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java index 3449abcab1102..c14d6e5512e04 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java @@ -33,8 +33,10 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.notNullValue; public abstract class AbstractSearchableSnapshotsRestTestCase extends ESRestTestCase { @@ -121,10 +123,6 @@ private void runSearchableSnapshotsTest(SearchableSnapshotsTestCaseBody testCase logger.info("deleting snapshot [{}]", snapshot); deleteSnapshot(repository, snapshot, false); - - final Map searchableSnapshotStats = searchableSnapshotStats(restoredIndexName); - assertThat("Expected searchable snapshots stats for " + numberOfShards + " shards but got " + searchableSnapshotStats, - searchableSnapshotStats.size(), equalTo(numberOfShards)); } public void testSearchResults() throws Exception { @@ -162,6 +160,54 @@ public void testCloseAndReopen() throws Exception { }); } + public void testStats() throws Exception { + runSearchableSnapshotsTest((restoredIndexName, numDocs) -> { + final Map stats = searchableSnapshotStats(restoredIndexName); + assertThat("Expected searchable snapshots stats for [" + restoredIndexName + ']', stats.size(), greaterThan(0)); + + final int nbShards = Integer.valueOf(extractValue(indexSettings(restoredIndexName), IndexMetaData.SETTING_NUMBER_OF_SHARDS)); + assertThat("Expected searchable snapshots stats for " + nbShards + " shards but got " + stats, stats.size(), equalTo(nbShards)); + }); + } + + public void testClearCache() throws Exception { + @SuppressWarnings("unchecked") + final Function, Long> sumCachedBytesWritten = stats -> stats.values().stream() + .filter(o -> o instanceof List) + .flatMap(o -> ((List) o).stream()) + .filter(o -> o instanceof Map) + .map(o -> ((Map)o).get("files")) + .filter(o -> o instanceof List) + .flatMap(o -> ((List) o).stream()) + .filter(o -> o instanceof Map) + .map(o -> ((Map)o).get("cached_bytes_written")) + .filter(o -> o instanceof Map) + .map(o -> ((Map)o).get("sum")) + .mapToLong(o -> ((Number) o).longValue()) + .sum(); + + runSearchableSnapshotsTest((restoredIndexName, numDocs) -> { + + Map searchResults = search(restoredIndexName, QueryBuilders.matchAllQuery(), Boolean.TRUE); + assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs)); + + final long bytesInCacheBeforeClear = sumCachedBytesWritten.apply(searchableSnapshotStats(restoredIndexName)); + assertThat(bytesInCacheBeforeClear, greaterThan(0L)); + + final Request request = new Request(HttpPost.METHOD_NAME, restoredIndexName + "/_searchable_snapshots/cache/clear"); + assertOK(client().performRequest(request)); + + final long bytesInCacheAfterClear = sumCachedBytesWritten.apply(searchableSnapshotStats(restoredIndexName)); + assertThat(bytesInCacheAfterClear, equalTo(bytesInCacheBeforeClear)); + + searchResults = search(restoredIndexName, QueryBuilders.matchAllQuery(), Boolean.TRUE); + assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs)); + + final long bytesInCacheAfterSearch = sumCachedBytesWritten.apply(searchableSnapshotStats(restoredIndexName)); + assertThat(bytesInCacheAfterSearch, greaterThan(bytesInCacheBeforeClear)); + }); + } + public void assertSearchResults(String indexName, int numDocs, Boolean ignoreThrottled) throws IOException { final int randomTieBreaker = randomIntBetween(1, numDocs - 1); Map searchResults; @@ -284,6 +330,13 @@ protected static Map searchableSnapshotStats(String index) throw return extractValue(responseAsMap, "indices." + index + ".shards"); } + protected static Map indexSettings(String index) throws IOException { + final Response response = client().performRequest(new Request(HttpGet.METHOD_NAME, '/' + index)); + assertThat("Failed to get settings on index [" + index + "]: " + response, + response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + return extractValue(responseAsMap(response), index + ".settings"); + } + protected static Map responseAsMap(Response response) throws IOException { final XContentType xContentType = XContentType.fromMediaTypeOrFormat(response.getEntity().getContentType().getValue()); assertThat("Unknown XContentType", xContentType, notNullValue()); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectoryTests.java new file mode 100644 index 0000000000000..57b428dd2aa2f --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectoryTests.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matcher; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class CacheDirectoryTests extends ESTestCase { + + public void testClearCache() throws Exception { + try (CacheService cacheService = new CacheService(Settings.EMPTY)) { + cacheService.start(); + try (Directory directory = newDirectory()) { + final int nbRandomFiles = randomIntBetween(3, 10); + final Map randomFiles = new HashMap<>(nbRandomFiles); + + for (int i = 0; i < nbRandomFiles; i++) { + final String fileName = randomAlphaOfLength(10); + final byte[] fileContent = randomUnicodeOfLength(randomIntBetween(1, 100_000)).getBytes(StandardCharsets.UTF_8); + + final IndexOutput indexOutput = directory.createOutput(fileName, newIOContext(random())); + indexOutput.writeBytes(fileContent, fileContent.length); + indexOutput.close(); + randomFiles.put(fileName, fileContent.length); + } + + final Path cacheDir = createTempDir(); + try (CacheDirectory cacheDirectory = newCacheDirectory(directory, cacheService, cacheDir)) { + final byte[] buffer = new byte[1024]; + for (int i = 0; i < randomIntBetween(10, 50); i++) { + final String fileName = randomFrom(randomFiles.keySet()); + final int fileLength = randomFiles.get(fileName); + + try (IndexInput input = cacheDirectory.openInput(fileName, newIOContext(random()))) { + assertThat(input.length(), equalTo((long) fileLength)); + final int start = between(0, fileLength - 1); + final int end = between(start + 1, fileLength); + + input.seek(start); + while (input.getFilePointer() < end) { + input.readBytes(buffer, 0, Math.toIntExact(Math.min(buffer.length, end - input.getFilePointer()))); + } + } + assertListOfFiles(cacheDir, allOf(greaterThan(0), lessThanOrEqualTo(nbRandomFiles)), greaterThan(0L)); + if (randomBoolean()) { + cacheDirectory.clearCache(); + assertListOfFiles(cacheDir, equalTo(0), equalTo(0L)); + } + } + } + } + } + } + + private CacheDirectory newCacheDirectory(Directory directory, CacheService cacheService, Path cacheDir) throws IOException { + return new CacheDirectory(directory, cacheService, cacheDir, new SnapshotId("_na","_na"), new IndexId("_na", "_na"), + new ShardId("_na", "_na", 0), () -> 0L); + } + + private void assertListOfFiles(Path cacheDir, Matcher matchNumberOfFiles, Matcher matchSizeOfFiles) throws IOException { + final Map files = new HashMap<>(); + try (DirectoryStream stream = Files.newDirectoryStream(cacheDir)) { + for (Path file : stream) { + final String fileName = file.getFileName().toString(); + if (fileName.equals("write.lock") || fileName.startsWith("extra")) { + continue; + } + try { + if (Files.isRegularFile(file)) { + final BasicFileAttributes fileAttributes = Files.readAttributes(file, BasicFileAttributes.class); + files.put(fileName, fileAttributes.size()); + } + } catch (FileNotFoundException | NoSuchFileException e) { + // ignoring as the cache file might be evicted + } + } + } + assertThat("Number of files (" + files.size() + ") mismatch, got : " + files.keySet(), files.size(), matchNumberOfFiles); + assertThat("Sum of file sizes mismatch, got: " + files, files.values().stream().mapToLong(Long::longValue).sum(), matchSizeOfFiles); + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/searchable_snapshots.clear_cache.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/searchable_snapshots.clear_cache.json new file mode 100644 index 0000000000000..def34d9383bb7 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/searchable_snapshots.clear_cache.json @@ -0,0 +1,55 @@ +{ + "searchable_snapshots.clear_cache": { + "documentation": { + "url": "https://www.elastic.co/guide/en/elasticsearch/reference/current/searchable-snapshots-clear-indices.html //NORELEASE This API should be documented. We expect this API to be stable at the time it is merged in master, but in case it is not its stability should be documented appropriately." + }, + "stability": "experimental", + "url": { + "paths": [ + { + "path": "/_searchable_snapshots/cache/clear", + "methods": [ + "POST" + ] + }, + { + "path": "/{index}/_searchable_snapshots/cache/clear", + "methods": [ + "POST" + ], + "parts": { + "index": { + "type": "list", + "description": "A comma-separated list of index names" + } + } + } + ] + }, + "params": { + "ignore_unavailable": { + "type": "boolean", + "description": "Whether specified concrete indices should be ignored when unavailable (missing or closed)" + }, + "allow_no_indices": { + "type": "boolean", + "description": "Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)" + }, + "expand_wildcards": { + "type": "enum", + "options": [ + "open", + "closed", + "none", + "all" + ], + "default": "open", + "description": "Whether to expand wildcard expression to concrete indices that are open, closed or both." + }, + "index": { + "type": "list", + "description": "A comma-separated list of index name to limit the operation" + } + } + } +}