Skip to content

Commit db49494

Browse files
committed
Enable engine factory to be pluggable
This commit enables the engine factory to be pluggable based on index settings used when creating the index service for an index. Relates #26827
1 parent edac71d commit db49494

File tree

13 files changed

+301
-62
lines changed

13 files changed

+301
-62
lines changed

core/src/main/java/org/elasticsearch/index/IndexModule.java

+22-5
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.elasticsearch.index.shard.IndexSearcherWrapper;
4040
import org.elasticsearch.index.shard.IndexingOperationListener;
4141
import org.elasticsearch.index.shard.SearchOperationListener;
42-
import org.elasticsearch.index.shard.ShardId;
4342
import org.elasticsearch.index.similarity.BM25SimilarityProvider;
4443
import org.elasticsearch.index.similarity.SimilarityProvider;
4544
import org.elasticsearch.index.similarity.SimilarityService;
@@ -59,6 +58,7 @@
5958
import java.util.List;
6059
import java.util.Locale;
6160
import java.util.Map;
61+
import java.util.Objects;
6262
import java.util.Set;
6363
import java.util.concurrent.atomic.AtomicBoolean;
6464
import java.util.function.BiFunction;
@@ -108,8 +108,7 @@ public final class IndexModule {
108108

109109
private final IndexSettings indexSettings;
110110
private final AnalysisRegistry analysisRegistry;
111-
// pkg private so tests can mock
112-
final SetOnce<EngineFactory> engineFactory = new SetOnce<>();
111+
private final EngineFactory engineFactory;
113112
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
114113
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
115114
private final Map<String, SimilarityProvider.Factory> similarities = new HashMap<>();
@@ -119,9 +118,18 @@ public final class IndexModule {
119118
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
120119
private final AtomicBoolean frozen = new AtomicBoolean(false);
121120

122-
public IndexModule(IndexSettings indexSettings, AnalysisRegistry analysisRegistry) {
121+
/**
122+
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
123+
* via {@link org.elasticsearch.plugins.PluginsService#onIndexModule(IndexModule)}.
124+
*
125+
* @param indexSettings the index settings
126+
* @param analysisRegistry the analysis registry
127+
* @param engineFactory the engine factory
128+
*/
129+
public IndexModule(final IndexSettings indexSettings, final AnalysisRegistry analysisRegistry, final EngineFactory engineFactory) {
123130
this.indexSettings = indexSettings;
124131
this.analysisRegistry = analysisRegistry;
132+
this.engineFactory = Objects.requireNonNull(engineFactory);
125133
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
126134
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
127135
}
@@ -162,6 +170,15 @@ public Index getIndex() {
162170
return indexSettings.getIndex();
163171
}
164172

173+
/**
174+
* The engine factory provided during construction of this index module.
175+
*
176+
* @return the engine factory
177+
*/
178+
EngineFactory getEngineFactory() {
179+
return engineFactory;
180+
}
181+
165182
/**
166183
* Adds an {@link IndexEventListener} for this index. All listeners added here
167184
* are maintained for the entire index lifecycle on this node. Once an index is closed or deleted these
@@ -363,7 +380,7 @@ public IndexService newIndexService(
363380
}
364381
return new IndexService(indexSettings, environment, xContentRegistry,
365382
new SimilarityService(indexSettings, scriptService, similarities),
366-
shardStoreDeleter, analysisRegistry, engineFactory.get(), circuitBreakerService, bigArrays, threadPool, scriptService,
383+
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
367384
client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
368385
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry);
369386
}

core/src/main/java/org/elasticsearch/index/IndexService.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public IndexService(
137137
SimilarityService similarityService,
138138
ShardStoreDeleter shardStoreDeleter,
139139
AnalysisRegistry registry,
140-
@Nullable EngineFactory engineFactory,
140+
EngineFactory engineFactory,
141141
CircuitBreakerService circuitBreakerService,
142142
BigArrays bigArrays,
143143
ThreadPool threadPool,
@@ -185,7 +185,7 @@ public IndexService(
185185
this.warmer = new IndexWarmer(indexSettings.getSettings(), threadPool, indexFieldData,
186186
bitsetFilterCache.createListener(threadPool));
187187
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
188-
this.engineFactory = engineFactory;
188+
this.engineFactory = Objects.requireNonNull(engineFactory);
189189
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
190190
this.searcherWrapper = wrapperFactory.newWrapper(this);
191191
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
@@ -657,9 +657,9 @@ public interface ShardStoreDeleter {
657657
void addPendingDelete(ShardId shardId, IndexSettings indexSettings);
658658
}
659659

660-
final EngineFactory getEngineFactory() {
660+
public final EngineFactory getEngineFactory() {
661661
return engineFactory;
662-
} // pkg private for testing
662+
}
663663

664664
final IndexSearcherWrapper getSearcherWrapper() {
665665
return searcherWrapper;

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import org.elasticsearch.index.engine.EngineException;
8181
import org.elasticsearch.index.engine.EngineFactory;
8282
import org.elasticsearch.index.engine.InternalEngine;
83-
import org.elasticsearch.index.engine.InternalEngineFactory;
8483
import org.elasticsearch.index.engine.RefreshFailedEngineException;
8584
import org.elasticsearch.index.engine.Segment;
8685
import org.elasticsearch.index.engine.SegmentsStats;
@@ -195,7 +194,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
195194
protected volatile IndexShardState state;
196195
protected volatile long primaryTerm;
197196
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
198-
protected final EngineFactory engineFactory;
197+
final EngineFactory engineFactory;
199198

200199
private final IndexingOperationListener indexingOperationListeners;
201200
private final Runnable globalCheckpointSyncer;
@@ -265,7 +264,7 @@ public IndexShard(
265264
this.warmer = warmer;
266265
this.similarityService = similarityService;
267266
Objects.requireNonNull(store, "Store must be provided to the index shard");
268-
this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
267+
this.engineFactory = Objects.requireNonNull(engineFactory);
269268
this.store = store;
270269
this.indexSortSupplier = indexSortSupplier;
271270
this.indexEventListener = indexEventListener;

core/src/main/java/org/elasticsearch/indices/IndicesService.java

+42-3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.common.Nullable;
4646
import org.elasticsearch.common.breaker.CircuitBreaker;
4747
import org.elasticsearch.common.bytes.BytesReference;
48+
import org.elasticsearch.common.collect.Tuple;
4849
import org.elasticsearch.common.component.AbstractLifecycleComponent;
4950
import org.elasticsearch.common.io.FileSystemUtils;
5051
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -78,6 +79,8 @@
7879
import org.elasticsearch.index.IndexSettings;
7980
import org.elasticsearch.index.analysis.AnalysisRegistry;
8081
import org.elasticsearch.index.cache.request.ShardRequestCache;
82+
import org.elasticsearch.index.engine.EngineFactory;
83+
import org.elasticsearch.index.engine.InternalEngineFactory;
8184
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
8285
import org.elasticsearch.index.flush.FlushStats;
8386
import org.elasticsearch.index.get.GetStats;
@@ -101,6 +104,7 @@
101104
import org.elasticsearch.indices.mapper.MapperRegistry;
102105
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
103106
import org.elasticsearch.indices.recovery.RecoveryState;
107+
import org.elasticsearch.plugins.EnginePlugin;
104108
import org.elasticsearch.plugins.PluginsService;
105109
import org.elasticsearch.repositories.RepositoriesService;
106110
import org.elasticsearch.script.ScriptService;
@@ -115,10 +119,14 @@
115119
import java.io.IOException;
116120
import java.nio.file.Files;
117121
import java.util.ArrayList;
122+
import java.util.Collection;
118123
import java.util.HashMap;
119124
import java.util.Iterator;
120125
import java.util.List;
126+
import java.util.Locale;
121127
import java.util.Map;
128+
import java.util.Objects;
129+
import java.util.Optional;
122130
import java.util.Set;
123131
import java.util.concurrent.CountDownLatch;
124132
import java.util.concurrent.ExecutorService;
@@ -169,6 +177,7 @@ public class IndicesService extends AbstractLifecycleComponent
169177
private final IndicesRequestCache indicesRequestCache;
170178
private final IndicesQueryCache indicesQueryCache;
171179
private final MetaStateService metaStateService;
180+
private final Collection<EnginePlugin> enginePlugins;
172181

173182
@Override
174183
protected void doStart() {
@@ -180,7 +189,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi
180189
AnalysisRegistry analysisRegistry, IndexNameExpressionResolver indexNameExpressionResolver,
181190
MapperRegistry mapperRegistry, NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadPool,
182191
IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
183-
ScriptService scriptService, Client client, MetaStateService metaStateService) {
192+
ScriptService scriptService, Client client, MetaStateService metaStateService,
193+
Collection<EnginePlugin> enginePlugins) {
184194
super(settings);
185195
this.threadPool = threadPool;
186196
this.pluginsService = pluginsService;
@@ -211,6 +221,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
211221
this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
212222
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
213223
this.metaStateService = metaStateService;
224+
this.enginePlugins = enginePlugins;
214225
}
215226

216227
@Override
@@ -437,7 +448,7 @@ private synchronized IndexService createIndexService(final String reason,
437448
idxSettings.getNumberOfReplicas(),
438449
reason);
439450

440-
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
451+
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
441452
for (IndexingOperationListener operationListener : indexingOperationListeners) {
442453
indexModule.addIndexOperationListener(operationListener);
443454
}
@@ -461,6 +472,34 @@ private synchronized IndexService createIndexService(final String reason,
461472
);
462473
}
463474

475+
private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
476+
final List<Tuple<EnginePlugin, Optional<EngineFactory>>> engineFactories =
477+
enginePlugins
478+
.stream()
479+
.map(p -> Tuple.tuple(p, p.getEngineFactory(idxSettings)))
480+
.filter(t -> Objects.requireNonNull(t.v2()).isPresent())
481+
.collect(Collectors.toList());
482+
if (engineFactories.isEmpty()) {
483+
return new InternalEngineFactory();
484+
} else if (engineFactories.size() == 1) {
485+
assert engineFactories.get(0).v2().isPresent();
486+
return engineFactories.get(0).v2().get();
487+
} else {
488+
final String message = String.format(
489+
Locale.ROOT,
490+
"multiple plugins provided engine factories for %s: %s",
491+
idxSettings.getIndex(),
492+
engineFactories
493+
.stream()
494+
.map(t -> {
495+
assert t.v2().isPresent();
496+
return "[" + t.v1().getClass().getName() + "/" + t.v2().get().getClass().getName() + "]";
497+
})
498+
.collect(Collectors.joining(",")));
499+
throw new IllegalStateException(message);
500+
}
501+
}
502+
464503
/**
465504
* creates a new mapper service for the given index, in order to do administrative work like mapping updates.
466505
* This *should not* be used for document parsing. Doing so will result in an exception.
@@ -469,7 +508,7 @@ private synchronized IndexService createIndexService(final String reason,
469508
*/
470509
public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException {
471510
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopeSetting);
472-
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
511+
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
473512
pluginsService.onIndexModule(indexModule);
474513
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
475514
}

core/src/main/java/org/elasticsearch/node/Node.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@
100100
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
101101
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
102102
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
103-
import org.elasticsearch.indices.mapper.MapperRegistry;
104103
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
105104
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
106105
import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -112,6 +111,7 @@
112111
import org.elasticsearch.plugins.AnalysisPlugin;
113112
import org.elasticsearch.plugins.ClusterPlugin;
114113
import org.elasticsearch.plugins.DiscoveryPlugin;
114+
import org.elasticsearch.plugins.EnginePlugin;
115115
import org.elasticsearch.plugins.IngestPlugin;
116116
import org.elasticsearch.plugins.MapperPlugin;
117117
import org.elasticsearch.plugins.MetaDataUpgrader;
@@ -385,11 +385,15 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
385385
.flatMap(Function.identity()).collect(toList()));
386386
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
387387
final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
388-
final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
389-
analysisModule.getAnalysisRegistry(),
390-
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
391-
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),
392-
client, metaStateService);
388+
389+
// collect engine factory providers per plugin
390+
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
391+
392+
final IndicesService indicesService =
393+
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
394+
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
395+
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
396+
scriptModule.getScriptService(), client, metaStateService, enginePlugins);
393397

394398
Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
395399
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.plugins;
21+
22+
import org.elasticsearch.index.IndexSettings;
23+
import org.elasticsearch.index.engine.EngineFactory;
24+
25+
import java.util.Optional;
26+
27+
/**
28+
* A plugin that provides alternative engine implementations.
29+
*/
30+
public interface EnginePlugin {
31+
32+
/**
33+
* When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the index settings to determine
34+
* whether or not to provide an engine factory for the given index. A plugin that is not overriding the default engine should return
35+
* {@link Optional#empty()}. If multiple plugins return an engine factory for a given index the index will not be created and an
36+
* {@link IllegalStateException} will be thrown during index creation.
37+
*
38+
* @return an optional engine factory
39+
*/
40+
Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings);
41+
42+
}

0 commit comments

Comments
 (0)