diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java index f806c210f0014..38201fc458fed 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java @@ -39,7 +39,6 @@ import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexingOperationListener; import org.elasticsearch.index.shard.SearchOperationListener; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.similarity.BM25SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityService; @@ -59,6 +58,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; @@ -108,8 +108,7 @@ public final class IndexModule { private final IndexSettings indexSettings; private final AnalysisRegistry analysisRegistry; - // pkg private so tests can mock - final SetOnce engineFactory = new SetOnce<>(); + private final EngineFactory engineFactory; private SetOnce indexSearcherWrapper = new SetOnce<>(); private final Set indexEventListeners = new HashSet<>(); private final Map similarities = new HashMap<>(); @@ -119,9 +118,18 @@ public final class IndexModule { private final List indexOperationListeners = new ArrayList<>(); private final AtomicBoolean frozen = new AtomicBoolean(false); - public IndexModule(IndexSettings indexSettings, AnalysisRegistry analysisRegistry) { + /** + * Construct the index module for the index with the specified index settings. The index module contains extension points for plugins + * via {@link org.elasticsearch.plugins.PluginsService#onIndexModule(IndexModule)}. + * + * @param indexSettings the index settings + * @param analysisRegistry the analysis registry + * @param engineFactory the engine factory + */ + public IndexModule(final IndexSettings indexSettings, final AnalysisRegistry analysisRegistry, final EngineFactory engineFactory) { this.indexSettings = indexSettings; this.analysisRegistry = analysisRegistry; + this.engineFactory = Objects.requireNonNull(engineFactory); this.searchOperationListeners.add(new SearchSlowLog(indexSettings)); this.indexOperationListeners.add(new IndexingSlowLog(indexSettings)); } @@ -162,6 +170,15 @@ public Index getIndex() { return indexSettings.getIndex(); } + /** + * The engine factory provided during construction of this index module. + * + * @return the engine factory + */ + EngineFactory getEngineFactory() { + return engineFactory; + } + /** * Adds an {@link IndexEventListener} for this index. All listeners added here * are maintained for the entire index lifecycle on this node. Once an index is closed or deleted these @@ -363,7 +380,7 @@ public IndexService newIndexService( } return new IndexService(indexSettings, environment, xContentRegistry, new SimilarityService(indexSettings, scriptService, similarities), - shardStoreDeleter, analysisRegistry, engineFactory.get(), circuitBreakerService, bigArrays, threadPool, scriptService, + shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry); } diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index d192e8781d6da..22aaaec1e72c0 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -137,7 +137,7 @@ public IndexService( SimilarityService similarityService, ShardStoreDeleter shardStoreDeleter, AnalysisRegistry registry, - @Nullable EngineFactory engineFactory, + EngineFactory engineFactory, CircuitBreakerService circuitBreakerService, BigArrays bigArrays, ThreadPool threadPool, @@ -185,7 +185,7 @@ public IndexService( this.warmer = new IndexWarmer(indexSettings.getSettings(), threadPool, indexFieldData, bitsetFilterCache.createListener(threadPool)); this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache); - this.engineFactory = engineFactory; + this.engineFactory = Objects.requireNonNull(engineFactory); // initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE this.searcherWrapper = wrapperFactory.newWrapper(this); this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners); @@ -657,9 +657,9 @@ public interface ShardStoreDeleter { void addPendingDelete(ShardId shardId, IndexSettings indexSettings); } - final EngineFactory getEngineFactory() { + public final EngineFactory getEngineFactory() { return engineFactory; - } // pkg private for testing + } final IndexSearcherWrapper getSearcherWrapper() { return searcherWrapper; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 9f0f6b84b459f..76ebba246d66c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -80,7 +80,6 @@ import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngine; -import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.SegmentsStats; @@ -195,7 +194,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected volatile IndexShardState state; protected volatile long primaryTerm; protected final AtomicReference currentEngineReference = new AtomicReference<>(); - protected final EngineFactory engineFactory; + final EngineFactory engineFactory; private final IndexingOperationListener indexingOperationListeners; private final Runnable globalCheckpointSyncer; @@ -265,7 +264,7 @@ public IndexShard( this.warmer = warmer; this.similarityService = similarityService; Objects.requireNonNull(store, "Store must be provided to the index shard"); - this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory; + this.engineFactory = Objects.requireNonNull(engineFactory); this.store = store; this.indexSortSupplier = indexSortSupplier; this.indexEventListener = indexEventListener; diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index caffa1b7befda..bfe69097b7bac 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -78,6 +79,8 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.cache.request.ShardRequestCache; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; @@ -101,6 +104,7 @@ import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.script.ScriptService; @@ -115,10 +119,14 @@ import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -169,6 +177,7 @@ public class IndicesService extends AbstractLifecycleComponent private final IndicesRequestCache indicesRequestCache; private final IndicesQueryCache indicesQueryCache; private final MetaStateService metaStateService; + private final Collection enginePlugins; @Override protected void doStart() { @@ -180,7 +189,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi AnalysisRegistry analysisRegistry, IndexNameExpressionResolver indexNameExpressionResolver, MapperRegistry mapperRegistry, NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadPool, IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays, - ScriptService scriptService, Client client, MetaStateService metaStateService) { + ScriptService scriptService, Client client, MetaStateService metaStateService, + Collection enginePlugins) { super(settings); this.threadPool = threadPool; this.pluginsService = pluginsService; @@ -211,6 +221,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings); this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval); this.metaStateService = metaStateService; + this.enginePlugins = enginePlugins; } @Override @@ -437,7 +448,7 @@ private synchronized IndexService createIndexService(final String reason, idxSettings.getNumberOfReplicas(), reason); - final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry); + final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings)); for (IndexingOperationListener operationListener : indexingOperationListeners) { indexModule.addIndexOperationListener(operationListener); } @@ -461,6 +472,34 @@ private synchronized IndexService createIndexService(final String reason, ); } + private EngineFactory getEngineFactory(final IndexSettings idxSettings) { + final List>> engineFactories = + enginePlugins + .stream() + .map(p -> Tuple.tuple(p, p.getEngineFactory(idxSettings))) + .filter(t -> Objects.requireNonNull(t.v2()).isPresent()) + .collect(Collectors.toList()); + if (engineFactories.isEmpty()) { + return new InternalEngineFactory(); + } else if (engineFactories.size() == 1) { + assert engineFactories.get(0).v2().isPresent(); + return engineFactories.get(0).v2().get(); + } else { + final String message = String.format( + Locale.ROOT, + "multiple plugins provided engine factories for %s: %s", + idxSettings.getIndex(), + engineFactories + .stream() + .map(t -> { + assert t.v2().isPresent(); + return "[" + t.v1().getClass().getName() + "/" + t.v2().get().getClass().getName() + "]"; + }) + .collect(Collectors.joining(","))); + throw new IllegalStateException(message); + } + } + /** * creates a new mapper service for the given index, in order to do administrative work like mapping updates. * This *should not* be used for document parsing. Doing so will result in an exception. @@ -469,7 +508,7 @@ private synchronized IndexService createIndexService(final String reason, */ public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException { final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopeSetting); - final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry); + final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings)); pluginsService.onIndexModule(indexModule); return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService); } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index a7669f3ff54d5..3664ddc62fe97 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -100,7 +100,6 @@ import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.cluster.IndicesClusterStateService; -import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.recovery.PeerRecoverySourceService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -112,6 +111,7 @@ import org.elasticsearch.plugins.AnalysisPlugin; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.DiscoveryPlugin; +import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.MetaDataUpgrader; @@ -385,11 +385,15 @@ protected Node(final Environment environment, Collection .flatMap(Function.identity()).collect(toList())); modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry)); final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry); - final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, - analysisModule.getAnalysisRegistry(), - clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry, - threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(), - client, metaStateService); + + // collect engine factory providers per plugin + final Collection enginePlugins = pluginsService.filterPlugins(EnginePlugin.class); + + final IndicesService indicesService = + new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(), + clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry, + threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, + scriptModule.getScriptService(), client, metaStateService, enginePlugins); Collection pluginComponents = pluginsService.filterPlugins(Plugin.class).stream() .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService, diff --git a/core/src/main/java/org/elasticsearch/plugins/EnginePlugin.java b/core/src/main/java/org/elasticsearch/plugins/EnginePlugin.java new file mode 100644 index 0000000000000..593129458717d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/plugins/EnginePlugin.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugins; + +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineFactory; + +import java.util.Optional; + +/** + * A plugin that provides alternative engine implementations. + */ +public interface EnginePlugin { + + /** + * When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the index settings to determine + * whether or not to provide an engine factory for the given index. A plugin that is not overriding the default engine should return + * {@link Optional#empty()}. If multiple plugins return an engine factory for a given index the index will not be created and an + * {@link IllegalStateException} will be thrown during index creation. + * + * @return an optional engine factory + */ + Optional getEngineFactory(IndexSettings indexSettings); + +} diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 609ed02eb2e89..1caf0fa135e60 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.mapper.ParsedDocument; @@ -144,13 +145,12 @@ private IndexService newIndexService(IndexModule module) throws IOException { } public void testWrapperIsBound() throws IOException { - IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry); + IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new MockEngineFactory(AssertingDirectoryReader.class)); module.setSearcherWrapper((s) -> new Wrapper()); - module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class)); IndexService indexService = newIndexService(module); assertTrue(indexService.getSearcherWrapper() instanceof Wrapper); - assertSame(indexService.getEngineFactory(), module.engineFactory.get()); + assertSame(indexService.getEngineFactory(), module.getEngineFactory()); indexService.close("simon says", false); } @@ -163,7 +163,7 @@ public void testRegisterIndexStore() throws IOException { .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "foo_store") .build(); IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings); - IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry); + IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory()); module.addIndexStore("foo_store", FooStore::new); try { module.addIndexStore("foo_store", FooStore::new); @@ -187,7 +187,7 @@ public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason rea } }; IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings); - IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry); + IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory()); module.addIndexEventListener(eventListener); IndexService indexService = newIndexService(module); IndexSettings x = indexService.getIndexSettings(); @@ -201,7 +201,8 @@ public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason rea public void testListener() throws IOException { Setting booleanSetting = Setting.boolSetting("index.foo.bar", false, Property.Dynamic, Property.IndexScope); - IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings, booleanSetting), emptyAnalysisRegistry); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings, booleanSetting); + IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory()); Setting booleanSetting2 = Setting.boolSetting("index.foo.bar.baz", false, Property.Dynamic, Property.IndexScope); AtomicBoolean atomicBoolean = new AtomicBoolean(false); module.addSettingsUpdateConsumer(booleanSetting, atomicBoolean::set); @@ -220,7 +221,8 @@ public void testListener() throws IOException { } public void testAddIndexOperationListener() throws IOException { - IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry); + IndexModule module = + new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry, new InternalEngineFactory()); AtomicBoolean executed = new AtomicBoolean(false); IndexingOperationListener listener = new IndexingOperationListener() { @Override @@ -250,7 +252,8 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { } public void testAddSearchOperationListener() throws IOException { - IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry); + IndexModule module = + new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry, new InternalEngineFactory()); AtomicBoolean executed = new AtomicBoolean(false); SearchOperationListener listener = new SearchOperationListener() { @@ -277,13 +280,14 @@ public void onNewContext(SearchContext context) { } public void testAddSimilarity() throws IOException { - Settings indexSettings = Settings.builder() + Settings settings = Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put("index.similarity.my_similarity.type", "test_similarity") .put("index.similarity.my_similarity.key", "there is a key") .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) .build(); - IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry); + IndexModule module = + new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory()); module.addSimilarity("test_similarity", (string, providerSettings, indexLevelSettings, scriptService) -> new SimilarityProvider() { @Override public String name() { @@ -306,7 +310,8 @@ public Similarity get() { } public void testFrozen() { - IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry); + IndexModule module = + new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry, new InternalEngineFactory()); module.freeze(); String msg = "Can't modify IndexModule once the index service has been created"; assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addSearchOperationListener(null)).getMessage()); @@ -319,32 +324,35 @@ public void testFrozen() { } public void testSetupUnknownSimilarity() throws IOException { - Settings indexSettings = Settings.builder() + Settings settings = Settings.builder() .put("index.similarity.my_similarity.type", "test_similarity") .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) .build(); - IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry); + IndexModule module = + new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory()); Exception ex = expectThrows(IllegalArgumentException.class, () -> newIndexService(module)); assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage()); } public void testSetupWithoutType() throws IOException { - Settings indexSettings = Settings.builder() + Settings settings = Settings.builder() .put("index.similarity.my_similarity.foo", "bar") .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .build(); - IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry); + IndexModule module = + new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory()); Exception ex = expectThrows(IllegalArgumentException.class, () -> newIndexService(module)); assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage()); } public void testForceCustomQueryCache() throws IOException { - Settings indexSettings = Settings.builder() + Settings settings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); - IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry); + IndexModule module = + new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory()); module.forceQueryCacheProvider((a, b) -> new CustomQueryCache()); expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> new CustomQueryCache())); IndexService indexService = newIndexService(module); @@ -353,21 +361,23 @@ public void testForceCustomQueryCache() throws IOException { } public void testDefaultQueryCacheImplIsSelected() throws IOException { - Settings indexSettings = Settings.builder() + Settings settings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); - IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry); + IndexModule module = + new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory()); IndexService indexService = newIndexService(module); assertTrue(indexService.cache().query() instanceof IndexQueryCache); indexService.close("simon says", false); } public void testDisableQueryCacheHasPrecedenceOverForceQueryCache() throws IOException { - Settings indexSettings = Settings.builder() + Settings settings = Settings.builder() .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); - IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry); + IndexModule module = + new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory()); module.forceQueryCacheProvider((a, b) -> new CustomQueryCache()); IndexService indexService = newIndexService(module); assertTrue(indexService.cache().query() instanceof DisabledQueryCache); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 7e8949cd15fbf..f6ca3c40a2a58 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -57,6 +57,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -156,7 +157,7 @@ private ShardRouting createShardRouting(String nodeId, boolean primary) { } protected EngineFactory getEngineFactory(ShardRouting routing) { - return null; + return new InternalEngineFactory(); } public int indexDocs(final int numOfDoc) throws Exception { diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 96f6aa6d47acb..d5ca1ac8974bd 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; @@ -336,7 +337,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { if (routing.primary()) { return primaryEngineFactory; } else { - return null; + return new InternalEngineFactory(); } } }) { @@ -427,7 +428,7 @@ public void testCheckpointsAndMarkingInSync() throws Exception { @Override protected EngineFactory getEngineFactory(final ShardRouting routing) { if (routing.primary()) { - return null; + return new InternalEngineFactory(); } else { return replicaEngineFactory; } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index a01f0230f22ca..58dc974018d5b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -72,6 +72,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldDataCache; @@ -775,7 +776,8 @@ public void testGlobalCheckpointSync() throws IOException { .build(); final IndexMetaData.Builder indexMetadata = IndexMetaData.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); final AtomicBoolean synced = new AtomicBoolean(); - final IndexShard primaryShard = newShard(shardRouting, indexMetadata.build(), null, null, () -> { synced.set(true); }); + final IndexShard primaryShard = + newShard(shardRouting, indexMetadata.build(), null, new InternalEngineFactory(), () -> synced.set(true)); // add a replica recoverShardFromStore(primaryShard); final IndexShard replicaShard = newShard(shardId, false); @@ -1784,7 +1786,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { closeShards(shard); IndexShard newShard = newShard( ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), - shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}); + shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, new InternalEngineFactory(), () -> {}); recoverShardFromStore(newShard); @@ -1930,7 +1932,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { closeShards(shard); IndexShard newShard = newShard( ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE), - shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}); + shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, new InternalEngineFactory(), () -> {}); recoverShardFromStore(newShard); diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java b/core/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java index 267253ff12a94..96181e76ba790 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.NodeEnvironment; @@ -41,6 +42,11 @@ import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperService; @@ -51,10 +57,12 @@ import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.similarity.BM25SimilarityProvider; import org.elasticsearch.indices.IndicesService.ShardDeletionCheckResult; +import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.test.hamcrest.RegexMatcher; import java.io.IOException; import java.util.ArrayList; @@ -63,14 +71,18 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; import static org.mockito.Mockito.mock; @@ -88,9 +100,70 @@ public NodeEnvironment getNodeEnvironment() { @Override protected Collection> getPlugins() { - ArrayList> plugins = new ArrayList<>(super.getPlugins()); - plugins.add(TestPlugin.class); - return plugins; + return Stream.concat( + super.getPlugins().stream(), + Stream.of(TestPlugin.class, FooEnginePlugin.class, BarEnginePlugin.class)) + .collect(Collectors.toList()); + } + + public static class FooEnginePlugin extends Plugin implements EnginePlugin { + + static class FooEngineFactory implements EngineFactory { + + @Override + public Engine newReadWriteEngine(final EngineConfig config) { + return new InternalEngine(config); + } + + } + + private static final Setting FOO_INDEX_SETTING = + Setting.boolSetting("index.foo_index", false, Setting.Property.IndexScope); + + @Override + public List> getSettings() { + return Collections.singletonList(FOO_INDEX_SETTING); + } + + @Override + public Optional getEngineFactory(final IndexSettings indexSettings) { + if (FOO_INDEX_SETTING.get(indexSettings.getSettings())) { + return Optional.of(new FooEngineFactory()); + } else { + return Optional.empty(); + } + } + + } + + public static class BarEnginePlugin extends Plugin implements EnginePlugin { + + static class BarEngineFactory implements EngineFactory { + + @Override + public Engine newReadWriteEngine(final EngineConfig config) { + return new InternalEngine(config); + } + + } + + private static final Setting BAR_INDEX_SETTING = + Setting.boolSetting("index.bar_index", false, Setting.Property.IndexScope); + + @Override + public List> getSettings() { + return Collections.singletonList(BAR_INDEX_SETTING); + } + + @Override + public Optional getEngineFactory(final IndexSettings indexSettings) { + if (BAR_INDEX_SETTING.get(indexSettings.getSettings())) { + return Optional.of(new BarEngineFactory()); + } else { + return Optional.empty(); + } + } + } public static class TestPlugin extends Plugin implements MapperPlugin { @@ -110,7 +183,6 @@ public void onIndexModule(IndexModule indexModule) { } } - @Override protected boolean resetNodeAfterTest() { return true; @@ -431,4 +503,57 @@ public void testStatsByShardDoesNotDieFromExpectedExceptions() { assertThat("index not defined", indexStats.containsKey(index), equalTo(true)); assertThat("unexpected shard stats", indexStats.get(index), equalTo(shardStats)); } + + public void testGetEngineFactory() throws IOException { + final IndicesService indicesService = getIndicesService(); + + final Boolean[] values = new Boolean[] { true, false, null }; + for (final Boolean value : values) { + final String indexName = "foo-" + value; + final Index index = new Index(indexName, UUIDs.randomBase64UUID()); + final Settings.Builder builder = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID()); + if (value != null) { + builder.put(FooEnginePlugin.FOO_INDEX_SETTING.getKey(), value); + } + + final IndexMetaData indexMetaData = new IndexMetaData.Builder(index.getName()) + .settings(builder.build()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + final IndexService indexService = indicesService.createIndex(indexMetaData, Collections.emptyList()); + if (value != null && value) { + assertThat(indexService.getEngineFactory(), instanceOf(FooEnginePlugin.FooEngineFactory.class)); + } else { + assertThat(indexService.getEngineFactory(), instanceOf(InternalEngineFactory.class)); + } + } + } + + public void testConflictingEngineFactories() throws IOException { + final String indexName = "foobar"; + final Index index = new Index(indexName, UUIDs.randomBase64UUID()); + final Settings settings = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID()) + .put(FooEnginePlugin.FOO_INDEX_SETTING.getKey(), true) + .put(BarEnginePlugin.BAR_INDEX_SETTING.getKey(), true) + .build(); + final IndexMetaData indexMetaData = new IndexMetaData.Builder(index.getName()) + .settings(settings) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + final IndicesService indicesService = getIndicesService(); + final IllegalStateException e = + expectThrows(IllegalStateException.class, () -> indicesService.createIndex(indexMetaData, Collections.emptyList())); + final String pattern = + ".*multiple plugins provided engine factories for \\[foobar/.*\\]: " + + "\\[.*FooEnginePlugin/.*FooEngineFactory\\],\\[.*BarEnginePlugin/.*BarEngineFactory\\].*"; + assertThat(e, hasToString(new RegexMatcher(pattern))); + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/index/MockEngineFactoryPlugin.java b/test/framework/src/main/java/org/elasticsearch/index/MockEngineFactoryPlugin.java index c6065f7e5831a..be00d9f5ada2d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/MockEngineFactoryPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/index/MockEngineFactoryPlugin.java @@ -20,25 +20,23 @@ import org.apache.lucene.index.AssertingDirectoryReader; import org.apache.lucene.index.FilterDirectoryReader; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.engine.MockEngineFactory; import org.elasticsearch.test.engine.MockEngineSupport; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.List; +import java.util.Optional; /** * A plugin to use {@link MockEngineFactory}. * * Subclasses may override the reader wrapper used. */ -public class MockEngineFactoryPlugin extends Plugin { +public class MockEngineFactoryPlugin extends Plugin implements EnginePlugin { @Override public List> getSettings() { @@ -46,8 +44,8 @@ public List> getSettings() { } @Override - public void onIndexModule(IndexModule module) { - module.engineFactory.set(new MockEngineFactory(getReaderWrapperClass())); + public Optional getEngineFactory(final IndexSettings indexSettings) { + return Optional.of(new MockEngineFactory(getReaderWrapperClass())); } protected Class getReaderWrapperClass() { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index d463fdbd17bdd..89850547508cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -53,6 +53,7 @@ import org.elasticsearch.index.cache.query.DisabledQueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; @@ -214,7 +215,7 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I @Nullable IndexSearcherWrapper searcherWrapper, Runnable globalCheckpointSyncer) throws IOException { ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING, primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); - return newShard(shardRouting, indexMetaData, searcherWrapper, null, globalCheckpointSyncer); + return newShard(shardRouting, indexMetaData, searcherWrapper, new InternalEngineFactory(), globalCheckpointSyncer); } @@ -228,7 +229,7 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I */ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners) throws IOException { - return newShard(routing, indexMetaData, null, null, () -> {}, listeners); + return newShard(routing, indexMetaData, null, new InternalEngineFactory(), () -> {}, listeners); } /**