Skip to content

Enable engine factory to be pluggable #31183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.index;

import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
Expand Down Expand Up @@ -60,6 +60,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -104,8 +105,7 @@ public final class IndexModule {

private final IndexSettings indexSettings;
private final AnalysisRegistry analysisRegistry;
// pkg private so tests can mock
final SetOnce<EngineFactory> engineFactory = new SetOnce<>();
private final EngineFactory engineFactory;
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private final Map<String, TriFunction<Settings, Version, ScriptService, Similarity>> similarities = new HashMap<>();
Expand All @@ -115,9 +115,18 @@ public final class IndexModule {
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
private final AtomicBoolean frozen = new AtomicBoolean(false);

public IndexModule(IndexSettings indexSettings, AnalysisRegistry analysisRegistry) {
/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
* via {@link org.elasticsearch.plugins.PluginsService#onIndexModule(IndexModule)}.
*
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
*/
public IndexModule(final IndexSettings indexSettings, final AnalysisRegistry analysisRegistry, final EngineFactory engineFactory) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
}
Expand Down Expand Up @@ -158,6 +167,15 @@ public Index getIndex() {
return indexSettings.getIndex();
}

/**
* The engine factory provided during construction of this index module.
*
* @return the engine factory
*/
EngineFactory getEngineFactory() {
return engineFactory;
}

/**
* Adds an {@link IndexEventListener} for this index. All listeners added here
* are maintained for the entire index lifecycle on this node. Once an index is closed or deleted these
Expand Down Expand Up @@ -364,7 +382,7 @@ public IndexService newIndexService(
}
return new IndexService(indexSettings, environment, xContentRegistry,
new SimilarityService(indexSettings, scriptService, similarities),
shardStoreDeleter, analysisRegistry, engineFactory.get(), circuitBreakerService, bigArrays, threadPool, scriptService,
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public IndexService(
SimilarityService similarityService,
ShardStoreDeleter shardStoreDeleter,
AnalysisRegistry registry,
@Nullable EngineFactory engineFactory,
EngineFactory engineFactory,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
Expand Down Expand Up @@ -188,7 +188,7 @@ public IndexService(
this.warmer = new IndexWarmer(indexSettings.getSettings(), threadPool, indexFieldData,
bitsetFilterCache.createListener(threadPool));
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
this.engineFactory = engineFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
Expand Down Expand Up @@ -681,9 +681,9 @@ public interface ShardStoreDeleter {
void addPendingDelete(ShardId shardId, IndexSettings indexSettings);
}

final EngineFactory getEngineFactory() {
public final EngineFactory getEngineFactory() {
return engineFactory;
} // pkg private for testing
}

final IndexSearcherWrapper getSearcherWrapper() {
return searcherWrapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.engine.SegmentsStats;
Expand Down Expand Up @@ -194,7 +193,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected volatile IndexShardState state;
protected volatile long primaryTerm;
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
protected final EngineFactory engineFactory;
final EngineFactory engineFactory;

private final IndexingOperationListener indexingOperationListeners;
private final Runnable globalCheckpointSyncer;
Expand Down Expand Up @@ -267,7 +266,7 @@ public IndexShard(
this.warmer = warmer;
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.store = store;
this.indexSortSupplier = indexSortSupplier;
this.indexEventListener = indexEventListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.common.geo.ShapesAvailability;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.mapper.BinaryFieldMapper;
import org.elasticsearch.index.mapper.BooleanFieldMapper;
import org.elasticsearch.index.mapper.CompletionFieldMapper;
Expand Down Expand Up @@ -60,10 +62,12 @@
import org.elasticsearch.plugins.MapperPlugin;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -234,4 +238,9 @@ protected void configure() {
public MapperRegistry getMapperRegistry() {
return mapperRegistry;
}

public Collection<Function<IndexSettings, Optional<EngineFactory>>> getEngineFactories() {
return Collections.emptyList();
}

}
46 changes: 42 additions & 4 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceAlreadyExistsException;
Expand All @@ -45,6 +44,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
Expand All @@ -67,6 +67,7 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
Expand All @@ -79,6 +80,8 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
Expand Down Expand Up @@ -116,10 +119,14 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -172,6 +179,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final IndicesRequestCache indicesRequestCache;
private final IndicesQueryCache indicesQueryCache;
private final MetaStateService metaStateService;
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;

@Override
protected void doStart() {
Expand All @@ -183,7 +191,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi
AnalysisRegistry analysisRegistry, IndexNameExpressionResolver indexNameExpressionResolver,
MapperRegistry mapperRegistry, NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadPool,
IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
ScriptService scriptService, Client client, MetaStateService metaStateService) {
ScriptService scriptService, Client client, MetaStateService metaStateService,
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders) {
super(settings);
this.threadPool = threadPool;
this.pluginsService = pluginsService;
Expand Down Expand Up @@ -214,6 +223,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
this.metaStateService = metaStateService;
this.engineFactoryProviders = engineFactoryProviders;
}

@Override
Expand Down Expand Up @@ -442,7 +452,7 @@ private synchronized IndexService createIndexService(final String reason,
idxSettings.getNumberOfReplicas(),
reason);

final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
Expand All @@ -466,6 +476,34 @@ private synchronized IndexService createIndexService(final String reason,
);
}

private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
final List<Optional<EngineFactory>> engineFactories =
engineFactoryProviders
.stream()
.map(engineFactoryProvider -> engineFactoryProvider.apply(idxSettings))
.filter(maybe -> Objects.requireNonNull(maybe).isPresent())
.collect(Collectors.toList());
if (engineFactories.isEmpty()) {
return new InternalEngineFactory();
} else if (engineFactories.size() == 1) {
assert engineFactories.get(0).isPresent();
return engineFactories.get(0).get();
} else {
final String message = String.format(
Locale.ROOT,
"multiple engine factories provided for %s: %s",
idxSettings.getIndex(),
engineFactories
.stream()
.map(t -> {
assert t.isPresent();
return "[" + t.get().getClass().getName() + "]";
})
.collect(Collectors.joining(",")));
throw new IllegalStateException(message);
}
}

/**
* creates a new mapper service for the given index, in order to do administrative work like mapping updates.
* This *should not* be used for document parsing. Doing so will result in an exception.
Expand All @@ -474,7 +512,7 @@ private synchronized IndexService createIndexService(final String reason,
*/
public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopedSettings);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
pluginsService.onIndexModule(indexModule);
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
}
Expand Down
38 changes: 25 additions & 13 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -58,12 +57,10 @@
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Binder;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Key;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.DeprecationLogger;
Expand All @@ -82,6 +79,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
Expand All @@ -93,8 +91,10 @@
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
Expand All @@ -109,10 +109,15 @@
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.MetaDataUpgrader;
Expand Down Expand Up @@ -140,10 +145,6 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
import org.elasticsearch.persistent.PersistentTasksService;

import java.io.BufferedWriter;
import java.io.Closeable;
Expand All @@ -161,6 +162,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -395,11 +397,21 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
.flatMap(Function.identity()).collect(toList()));
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),
client, metaStateService);

// collect engine factory providers from server and from plugins
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders =
Stream.concat(
indicesModule.getEngineFactories().stream(),
enginePlugins.stream().map(plugin -> plugin::getEngineFactory))
.collect(Collectors.toList());

final IndicesService indicesService =
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders);


Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
Expand Down
Loading