Skip to content

Commit e481b86

Browse files
authored
Enable engine factory to be pluggable (#31183)
This commit enables the engine factory to be pluggable based on index settings used when creating the index service for an index.
1 parent d8c0a39 commit e481b86

File tree

16 files changed

+350
-76
lines changed

16 files changed

+350
-76
lines changed

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
package org.elasticsearch.index;
2121

22-
import org.apache.lucene.search.similarities.Similarity;
2322
import org.apache.lucene.search.similarities.BM25Similarity;
23+
import org.apache.lucene.search.similarities.Similarity;
2424
import org.apache.lucene.util.SetOnce;
2525
import org.elasticsearch.Version;
2626
import org.elasticsearch.client.Client;
@@ -60,6 +60,7 @@
6060
import java.util.List;
6161
import java.util.Locale;
6262
import java.util.Map;
63+
import java.util.Objects;
6364
import java.util.Set;
6465
import java.util.concurrent.atomic.AtomicBoolean;
6566
import java.util.function.BiFunction;
@@ -104,8 +105,7 @@ public final class IndexModule {
104105

105106
private final IndexSettings indexSettings;
106107
private final AnalysisRegistry analysisRegistry;
107-
// pkg private so tests can mock
108-
final SetOnce<EngineFactory> engineFactory = new SetOnce<>();
108+
private final EngineFactory engineFactory;
109109
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
110110
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
111111
private final Map<String, TriFunction<Settings, Version, ScriptService, Similarity>> similarities = new HashMap<>();
@@ -115,9 +115,18 @@ public final class IndexModule {
115115
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
116116
private final AtomicBoolean frozen = new AtomicBoolean(false);
117117

118-
public IndexModule(IndexSettings indexSettings, AnalysisRegistry analysisRegistry) {
118+
/**
119+
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
120+
* via {@link org.elasticsearch.plugins.PluginsService#onIndexModule(IndexModule)}.
121+
*
122+
* @param indexSettings the index settings
123+
* @param analysisRegistry the analysis registry
124+
* @param engineFactory the engine factory
125+
*/
126+
public IndexModule(final IndexSettings indexSettings, final AnalysisRegistry analysisRegistry, final EngineFactory engineFactory) {
119127
this.indexSettings = indexSettings;
120128
this.analysisRegistry = analysisRegistry;
129+
this.engineFactory = Objects.requireNonNull(engineFactory);
121130
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
122131
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
123132
}
@@ -158,6 +167,15 @@ public Index getIndex() {
158167
return indexSettings.getIndex();
159168
}
160169

170+
/**
171+
* The engine factory provided during construction of this index module.
172+
*
173+
* @return the engine factory
174+
*/
175+
EngineFactory getEngineFactory() {
176+
return engineFactory;
177+
}
178+
161179
/**
162180
* Adds an {@link IndexEventListener} for this index. All listeners added here
163181
* are maintained for the entire index lifecycle on this node. Once an index is closed or deleted these
@@ -364,7 +382,7 @@ public IndexService newIndexService(
364382
}
365383
return new IndexService(indexSettings, environment, xContentRegistry,
366384
new SimilarityService(indexSettings, scriptService, similarities),
367-
shardStoreDeleter, analysisRegistry, engineFactory.get(), circuitBreakerService, bigArrays, threadPool, scriptService,
385+
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
368386
client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
369387
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry);
370388
}

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public IndexService(
139139
SimilarityService similarityService,
140140
ShardStoreDeleter shardStoreDeleter,
141141
AnalysisRegistry registry,
142-
@Nullable EngineFactory engineFactory,
142+
EngineFactory engineFactory,
143143
CircuitBreakerService circuitBreakerService,
144144
BigArrays bigArrays,
145145
ThreadPool threadPool,
@@ -188,7 +188,7 @@ public IndexService(
188188
this.warmer = new IndexWarmer(indexSettings.getSettings(), threadPool, indexFieldData,
189189
bitsetFilterCache.createListener(threadPool));
190190
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
191-
this.engineFactory = engineFactory;
191+
this.engineFactory = Objects.requireNonNull(engineFactory);
192192
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
193193
this.searcherWrapper = wrapperFactory.newWrapper(this);
194194
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
@@ -681,9 +681,9 @@ public interface ShardStoreDeleter {
681681
void addPendingDelete(ShardId shardId, IndexSettings indexSettings);
682682
}
683683

684-
final EngineFactory getEngineFactory() {
684+
public final EngineFactory getEngineFactory() {
685685
return engineFactory;
686-
} // pkg private for testing
686+
}
687687

688688
final IndexSearcherWrapper getSearcherWrapper() {
689689
return searcherWrapper;

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@
8282
import org.elasticsearch.index.engine.EngineException;
8383
import org.elasticsearch.index.engine.EngineFactory;
8484
import org.elasticsearch.index.engine.InternalEngine;
85-
import org.elasticsearch.index.engine.InternalEngineFactory;
8685
import org.elasticsearch.index.engine.RefreshFailedEngineException;
8786
import org.elasticsearch.index.engine.Segment;
8887
import org.elasticsearch.index.engine.SegmentsStats;
@@ -194,7 +193,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
194193
protected volatile IndexShardState state;
195194
protected volatile long primaryTerm;
196195
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
197-
protected final EngineFactory engineFactory;
196+
final EngineFactory engineFactory;
198197

199198
private final IndexingOperationListener indexingOperationListeners;
200199
private final Runnable globalCheckpointSyncer;
@@ -267,7 +266,7 @@ public IndexShard(
267266
this.warmer = warmer;
268267
this.similarityService = similarityService;
269268
Objects.requireNonNull(store, "Store must be provided to the index shard");
270-
this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
269+
this.engineFactory = Objects.requireNonNull(engineFactory);
271270
this.store = store;
272271
this.indexSortSupplier = indexSortSupplier;
273272
this.indexEventListener = indexEventListener;

server/src/main/java/org/elasticsearch/indices/IndicesModule.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.elasticsearch.common.geo.ShapesAvailability;
2828
import org.elasticsearch.common.inject.AbstractModule;
2929
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
30+
import org.elasticsearch.index.IndexSettings;
31+
import org.elasticsearch.index.engine.EngineFactory;
3032
import org.elasticsearch.index.mapper.BinaryFieldMapper;
3133
import org.elasticsearch.index.mapper.BooleanFieldMapper;
3234
import org.elasticsearch.index.mapper.CompletionFieldMapper;
@@ -60,10 +62,12 @@
6062
import org.elasticsearch.plugins.MapperPlugin;
6163

6264
import java.util.ArrayList;
65+
import java.util.Collection;
6366
import java.util.Collections;
6467
import java.util.LinkedHashMap;
6568
import java.util.List;
6669
import java.util.Map;
70+
import java.util.Optional;
6771
import java.util.Set;
6872
import java.util.function.Function;
6973
import java.util.function.Predicate;
@@ -234,4 +238,9 @@ protected void configure() {
234238
public MapperRegistry getMapperRegistry() {
235239
return mapperRegistry;
236240
}
241+
242+
public Collection<Function<IndexSettings, Optional<EngineFactory>>> getEngineFactories() {
243+
return Collections.emptyList();
244+
}
245+
237246
}

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.lucene.store.AlreadyClosedException;
2626
import org.apache.lucene.store.LockObtainFailedException;
2727
import org.apache.lucene.util.CollectionUtil;
28-
import org.elasticsearch.core.internal.io.IOUtils;
2928
import org.apache.lucene.util.RamUsageEstimator;
3029
import org.elasticsearch.ElasticsearchException;
3130
import org.elasticsearch.ResourceAlreadyExistsException;
@@ -45,6 +44,7 @@
4544
import org.elasticsearch.common.Nullable;
4645
import org.elasticsearch.common.breaker.CircuitBreaker;
4746
import org.elasticsearch.common.bytes.BytesReference;
47+
import org.elasticsearch.common.collect.Tuple;
4848
import org.elasticsearch.common.component.AbstractLifecycleComponent;
4949
import org.elasticsearch.common.io.FileSystemUtils;
5050
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.common.xcontent.XContentFactory;
6868
import org.elasticsearch.common.xcontent.XContentParser;
6969
import org.elasticsearch.common.xcontent.XContentType;
70+
import org.elasticsearch.core.internal.io.IOUtils;
7071
import org.elasticsearch.env.NodeEnvironment;
7172
import org.elasticsearch.env.ShardLock;
7273
import org.elasticsearch.env.ShardLockObtainFailedException;
@@ -79,6 +80,8 @@
7980
import org.elasticsearch.index.IndexSettings;
8081
import org.elasticsearch.index.analysis.AnalysisRegistry;
8182
import org.elasticsearch.index.cache.request.ShardRequestCache;
83+
import org.elasticsearch.index.engine.EngineFactory;
84+
import org.elasticsearch.index.engine.InternalEngineFactory;
8285
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
8386
import org.elasticsearch.index.flush.FlushStats;
8487
import org.elasticsearch.index.get.GetStats;
@@ -116,10 +119,14 @@
116119
import java.io.IOException;
117120
import java.nio.file.Files;
118121
import java.util.ArrayList;
122+
import java.util.Collection;
119123
import java.util.HashMap;
120124
import java.util.Iterator;
121125
import java.util.List;
126+
import java.util.Locale;
122127
import java.util.Map;
128+
import java.util.Objects;
129+
import java.util.Optional;
123130
import java.util.Set;
124131
import java.util.concurrent.CountDownLatch;
125132
import java.util.concurrent.ExecutorService;
@@ -172,6 +179,7 @@ public class IndicesService extends AbstractLifecycleComponent
172179
private final IndicesRequestCache indicesRequestCache;
173180
private final IndicesQueryCache indicesQueryCache;
174181
private final MetaStateService metaStateService;
182+
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
175183

176184
@Override
177185
protected void doStart() {
@@ -183,7 +191,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi
183191
AnalysisRegistry analysisRegistry, IndexNameExpressionResolver indexNameExpressionResolver,
184192
MapperRegistry mapperRegistry, NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadPool,
185193
IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
186-
ScriptService scriptService, Client client, MetaStateService metaStateService) {
194+
ScriptService scriptService, Client client, MetaStateService metaStateService,
195+
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders) {
187196
super(settings);
188197
this.threadPool = threadPool;
189198
this.pluginsService = pluginsService;
@@ -214,6 +223,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
214223
this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
215224
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
216225
this.metaStateService = metaStateService;
226+
this.engineFactoryProviders = engineFactoryProviders;
217227
}
218228

219229
@Override
@@ -442,7 +452,7 @@ private synchronized IndexService createIndexService(final String reason,
442452
idxSettings.getNumberOfReplicas(),
443453
reason);
444454

445-
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
455+
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
446456
for (IndexingOperationListener operationListener : indexingOperationListeners) {
447457
indexModule.addIndexOperationListener(operationListener);
448458
}
@@ -466,6 +476,34 @@ private synchronized IndexService createIndexService(final String reason,
466476
);
467477
}
468478

479+
private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
480+
final List<Optional<EngineFactory>> engineFactories =
481+
engineFactoryProviders
482+
.stream()
483+
.map(engineFactoryProvider -> engineFactoryProvider.apply(idxSettings))
484+
.filter(maybe -> Objects.requireNonNull(maybe).isPresent())
485+
.collect(Collectors.toList());
486+
if (engineFactories.isEmpty()) {
487+
return new InternalEngineFactory();
488+
} else if (engineFactories.size() == 1) {
489+
assert engineFactories.get(0).isPresent();
490+
return engineFactories.get(0).get();
491+
} else {
492+
final String message = String.format(
493+
Locale.ROOT,
494+
"multiple engine factories provided for %s: %s",
495+
idxSettings.getIndex(),
496+
engineFactories
497+
.stream()
498+
.map(t -> {
499+
assert t.isPresent();
500+
return "[" + t.get().getClass().getName() + "]";
501+
})
502+
.collect(Collectors.joining(",")));
503+
throw new IllegalStateException(message);
504+
}
505+
}
506+
469507
/**
470508
* creates a new mapper service for the given index, in order to do administrative work like mapping updates.
471509
* This *should not* be used for document parsing. Doing so will result in an exception.
@@ -474,7 +512,7 @@ private synchronized IndexService createIndexService(final String reason,
474512
*/
475513
public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException {
476514
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopedSettings);
477-
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
515+
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
478516
pluginsService.onIndexModule(indexModule);
479517
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
480518
}

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.lucene.util.Constants;
24-
import org.elasticsearch.core.internal.io.IOUtils;
2524
import org.apache.lucene.util.SetOnce;
2625
import org.elasticsearch.Build;
2726
import org.elasticsearch.ElasticsearchException;
@@ -58,12 +57,10 @@
5857
import org.elasticsearch.common.StopWatch;
5958
import org.elasticsearch.common.component.Lifecycle;
6059
import org.elasticsearch.common.component.LifecycleComponent;
61-
import org.elasticsearch.common.inject.Binder;
6260
import org.elasticsearch.common.inject.Injector;
6361
import org.elasticsearch.common.inject.Key;
6462
import org.elasticsearch.common.inject.Module;
6563
import org.elasticsearch.common.inject.ModulesBuilder;
66-
import org.elasticsearch.common.inject.util.Providers;
6764
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
6865
import org.elasticsearch.common.lease.Releasables;
6966
import org.elasticsearch.common.logging.DeprecationLogger;
@@ -82,6 +79,7 @@
8279
import org.elasticsearch.common.util.BigArrays;
8380
import org.elasticsearch.common.util.PageCacheRecycler;
8481
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
82+
import org.elasticsearch.core.internal.io.IOUtils;
8583
import org.elasticsearch.discovery.Discovery;
8684
import org.elasticsearch.discovery.DiscoveryModule;
8785
import org.elasticsearch.discovery.DiscoverySettings;
@@ -93,8 +91,10 @@
9391
import org.elasticsearch.gateway.GatewayService;
9492
import org.elasticsearch.gateway.MetaStateService;
9593
import org.elasticsearch.http.HttpServerTransport;
96-
import org.elasticsearch.http.HttpTransportSettings;
94+
import org.elasticsearch.index.IndexSettings;
9795
import org.elasticsearch.index.analysis.AnalysisRegistry;
96+
import org.elasticsearch.index.engine.Engine;
97+
import org.elasticsearch.index.engine.EngineFactory;
9898
import org.elasticsearch.indices.IndicesModule;
9999
import org.elasticsearch.indices.IndicesService;
100100
import org.elasticsearch.indices.analysis.AnalysisModule;
@@ -109,10 +109,15 @@
109109
import org.elasticsearch.ingest.IngestService;
110110
import org.elasticsearch.monitor.MonitorService;
111111
import org.elasticsearch.monitor.jvm.JvmInfo;
112+
import org.elasticsearch.persistent.PersistentTasksClusterService;
113+
import org.elasticsearch.persistent.PersistentTasksExecutor;
114+
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
115+
import org.elasticsearch.persistent.PersistentTasksService;
112116
import org.elasticsearch.plugins.ActionPlugin;
113117
import org.elasticsearch.plugins.AnalysisPlugin;
114118
import org.elasticsearch.plugins.ClusterPlugin;
115119
import org.elasticsearch.plugins.DiscoveryPlugin;
120+
import org.elasticsearch.plugins.EnginePlugin;
116121
import org.elasticsearch.plugins.IngestPlugin;
117122
import org.elasticsearch.plugins.MapperPlugin;
118123
import org.elasticsearch.plugins.MetaDataUpgrader;
@@ -140,10 +145,6 @@
140145
import org.elasticsearch.transport.TransportService;
141146
import org.elasticsearch.usage.UsageService;
142147
import org.elasticsearch.watcher.ResourceWatcherService;
143-
import org.elasticsearch.persistent.PersistentTasksClusterService;
144-
import org.elasticsearch.persistent.PersistentTasksExecutor;
145-
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
146-
import org.elasticsearch.persistent.PersistentTasksService;
147148

148149
import java.io.BufferedWriter;
149150
import java.io.Closeable;
@@ -161,6 +162,7 @@
161162
import java.util.Collections;
162163
import java.util.List;
163164
import java.util.Map;
165+
import java.util.Optional;
164166
import java.util.Set;
165167
import java.util.concurrent.CountDownLatch;
166168
import java.util.concurrent.TimeUnit;
@@ -395,11 +397,21 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
395397
.flatMap(Function.identity()).collect(toList()));
396398
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
397399
final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
398-
final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
399-
analysisModule.getAnalysisRegistry(),
400-
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
401-
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),
402-
client, metaStateService);
400+
401+
// collect engine factory providers from server and from plugins
402+
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
403+
final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders =
404+
Stream.concat(
405+
indicesModule.getEngineFactories().stream(),
406+
enginePlugins.stream().map(plugin -> plugin::getEngineFactory))
407+
.collect(Collectors.toList());
408+
409+
final IndicesService indicesService =
410+
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
411+
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
412+
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
413+
scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders);
414+
403415

404416
Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
405417
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,

0 commit comments

Comments
 (0)