Skip to content

Enable engine factory to be pluggable #26827

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions core/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.BM25SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
Expand All @@ -59,6 +58,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -108,8 +108,7 @@ public final class IndexModule {

private final IndexSettings indexSettings;
private final AnalysisRegistry analysisRegistry;
// pkg private so tests can mock
final SetOnce<EngineFactory> engineFactory = new SetOnce<>();
private final EngineFactory engineFactory;
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private final Map<String, SimilarityProvider.Factory> similarities = new HashMap<>();
Expand All @@ -119,9 +118,18 @@ public final class IndexModule {
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
private final AtomicBoolean frozen = new AtomicBoolean(false);

public IndexModule(IndexSettings indexSettings, AnalysisRegistry analysisRegistry) {
/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
* via {@link org.elasticsearch.plugins.PluginsService#onIndexModule(IndexModule)}.
*
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
*/
public IndexModule(final IndexSettings indexSettings, final AnalysisRegistry analysisRegistry, final EngineFactory engineFactory) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
}
Expand Down Expand Up @@ -162,6 +170,15 @@ public Index getIndex() {
return indexSettings.getIndex();
}

/**
* The engine factory provided during construction of this index module.
*
* @return the engine factory
*/
EngineFactory getEngineFactory() {
return engineFactory;
}

/**
* Adds an {@link IndexEventListener} for this index. All listeners added here
* are maintained for the entire index lifecycle on this node. Once an index is closed or deleted these
Expand Down Expand Up @@ -363,7 +380,7 @@ public IndexService newIndexService(
}
return new IndexService(indexSettings, environment, xContentRegistry,
new SimilarityService(indexSettings, scriptService, similarities),
shardStoreDeleter, analysisRegistry, engineFactory.get(), circuitBreakerService, bigArrays, threadPool, scriptService,
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry);
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public IndexService(
SimilarityService similarityService,
ShardStoreDeleter shardStoreDeleter,
AnalysisRegistry registry,
@Nullable EngineFactory engineFactory,
EngineFactory engineFactory,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
Expand Down Expand Up @@ -185,7 +185,7 @@ public IndexService(
this.warmer = new IndexWarmer(indexSettings.getSettings(), threadPool, indexFieldData,
bitsetFilterCache.createListener(threadPool));
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
this.engineFactory = engineFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
Expand Down Expand Up @@ -657,9 +657,9 @@ public interface ShardStoreDeleter {
void addPendingDelete(ShardId shardId, IndexSettings indexSettings);
}

final EngineFactory getEngineFactory() {
public final EngineFactory getEngineFactory() {
return engineFactory;
} // pkg private for testing
}

final IndexSearcherWrapper getSearcherWrapper() {
return searcherWrapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.engine.SegmentsStats;
Expand Down Expand Up @@ -195,7 +194,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected volatile IndexShardState state;
protected volatile long primaryTerm;
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
protected final EngineFactory engineFactory;
final EngineFactory engineFactory;

private final IndexingOperationListener indexingOperationListeners;
private final Runnable globalCheckpointSyncer;
Expand Down Expand Up @@ -265,7 +264,7 @@ public IndexShard(
this.warmer = warmer;
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.store = store;
this.indexSortSupplier = indexSortSupplier;
this.indexEventListener = indexEventListener;
Expand Down
45 changes: 42 additions & 3 deletions core/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand Down Expand Up @@ -78,6 +79,8 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
Expand All @@ -101,6 +104,7 @@
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.script.ScriptService;
Expand All @@ -115,10 +119,14 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -169,6 +177,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final IndicesRequestCache indicesRequestCache;
private final IndicesQueryCache indicesQueryCache;
private final MetaStateService metaStateService;
private final Collection<EnginePlugin> enginePlugins;

@Override
protected void doStart() {
Expand All @@ -180,7 +189,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi
AnalysisRegistry analysisRegistry, IndexNameExpressionResolver indexNameExpressionResolver,
MapperRegistry mapperRegistry, NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadPool,
IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
ScriptService scriptService, Client client, MetaStateService metaStateService) {
ScriptService scriptService, Client client, MetaStateService metaStateService,
Collection<EnginePlugin> enginePlugins) {
super(settings);
this.threadPool = threadPool;
this.pluginsService = pluginsService;
Expand Down Expand Up @@ -211,6 +221,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
this.metaStateService = metaStateService;
this.enginePlugins = enginePlugins;
}

@Override
Expand Down Expand Up @@ -437,7 +448,7 @@ private synchronized IndexService createIndexService(final String reason,
idxSettings.getNumberOfReplicas(),
reason);

final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
Expand All @@ -461,6 +472,34 @@ private synchronized IndexService createIndexService(final String reason,
);
}

private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
final List<Tuple<EnginePlugin, Optional<EngineFactory>>> engineFactories =
enginePlugins
.stream()
.map(p -> Tuple.tuple(p, p.getEngineFactory(idxSettings)))
.filter(t -> Objects.requireNonNull(t.v2()).isPresent())
.collect(Collectors.toList());
if (engineFactories.isEmpty()) {
return new InternalEngineFactory();
} else if (engineFactories.size() == 1) {
assert engineFactories.get(0).v2().isPresent();
return engineFactories.get(0).v2().get();
} else {
final String message = String.format(
Locale.ROOT,
"multiple plugins provided engine factories for %s: %s",
idxSettings.getIndex(),
engineFactories
.stream()
.map(t -> {
assert t.v2().isPresent();
return "[" + t.v1().getClass().getName() + "/" + t.v2().get().getClass().getName() + "]";
})
.collect(Collectors.joining(",")));
throw new IllegalStateException(message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an idea and not sure how feasible it is, but besides failing here, it would also be great if we can fail more directly in the create index api? (for example in MetaDataCreateIndexService#validateIndexSettings(...) method). This way the error is directly visible and an index with illegal settings will not be created.

}
}

/**
* creates a new mapper service for the given index, in order to do administrative work like mapping updates.
* This *should not* be used for document parsing. Doing so will result in an exception.
Expand All @@ -469,7 +508,7 @@ private synchronized IndexService createIndexService(final String reason,
*/
public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopeSetting);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
pluginsService.onIndexModule(indexModule);
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
}
Expand Down
16 changes: 10 additions & 6 deletions core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoverySettings;
Expand All @@ -112,6 +111,7 @@
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.MetaDataUpgrader;
Expand Down Expand Up @@ -385,11 +385,15 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
.flatMap(Function.identity()).collect(toList()));
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),
client, metaStateService);

// collect engine factory providers per plugin
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);

final IndicesService indicesService =
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
scriptModule.getScriptService(), client, metaStateService, enginePlugins);

Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
Expand Down
42 changes: 42 additions & 0 deletions core/src/main/java/org/elasticsearch/plugins/EnginePlugin.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.plugins;

import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;

import java.util.Optional;

/**
* A plugin that provides alternative engine implementations.
*/
public interface EnginePlugin {

/**
* When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the index settings to determine
* whether or not to provide an engine factory for the given index. A plugin that is not overriding the default engine should return
* {@link Optional#empty()}. If multiple plugins return an engine factory for a given index the index will not be created and an
* {@link IllegalStateException} will be thrown during index creation.
*
* @return an optional engine factory
*/
Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings);

}
Loading