Skip to content

Commit cc824eb

Browse files
authored
[CCR] Added more validation to follow index api. (#31068)
1 parent 1ccb34a commit cc824eb

File tree

2 files changed

+251
-56
lines changed

2 files changed

+251
-56
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java

Lines changed: 156 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,37 @@
1919
import org.elasticsearch.cluster.ClusterState;
2020
import org.elasticsearch.cluster.metadata.IndexMetaData;
2121
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
22+
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
23+
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
2224
import org.elasticsearch.cluster.service.ClusterService;
2325
import org.elasticsearch.common.inject.Inject;
2426
import org.elasticsearch.common.io.stream.StreamInput;
2527
import org.elasticsearch.common.io.stream.StreamOutput;
28+
import org.elasticsearch.common.settings.Setting;
2629
import org.elasticsearch.common.settings.Settings;
2730
import org.elasticsearch.index.IndexSettings;
31+
import org.elasticsearch.index.IndexingSlowLog;
32+
import org.elasticsearch.index.SearchSlowLog;
33+
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
34+
import org.elasticsearch.index.mapper.MapperService;
2835
import org.elasticsearch.index.shard.ShardId;
36+
import org.elasticsearch.indices.IndicesRequestCache;
37+
import org.elasticsearch.indices.IndicesService;
2938
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
3039
import org.elasticsearch.persistent.PersistentTasksService;
3140
import org.elasticsearch.threadpool.ThreadPool;
3241
import org.elasticsearch.transport.RemoteClusterAware;
3342
import org.elasticsearch.transport.RemoteClusterService;
3443
import org.elasticsearch.transport.TransportService;
44+
import org.elasticsearch.xpack.ccr.CcrSettings;
3545

3646
import java.io.IOException;
47+
import java.util.Collections;
48+
import java.util.HashSet;
49+
import java.util.Iterator;
3750
import java.util.List;
3851
import java.util.Map;
52+
import java.util.Set;
3953
import java.util.concurrent.atomic.AtomicInteger;
4054
import java.util.concurrent.atomic.AtomicReferenceArray;
4155
import java.util.stream.Collectors;
@@ -151,16 +165,18 @@ public static class TransportAction extends HandledTransportAction<Request, Resp
151165
private final ClusterService clusterService;
152166
private final RemoteClusterService remoteClusterService;
153167
private final PersistentTasksService persistentTasksService;
168+
private final IndicesService indicesService;
154169

155170
@Inject
156171
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
157172
IndexNameExpressionResolver indexNameExpressionResolver, Client client, ClusterService clusterService,
158-
PersistentTasksService persistentTasksService) {
173+
PersistentTasksService persistentTasksService, IndicesService indicesService) {
159174
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
160175
this.client = client;
161176
this.clusterService = clusterService;
162177
this.remoteClusterService = transportService.getRemoteClusterService();
163178
this.persistentTasksService = persistentTasksService;
179+
this.indicesService = indicesService;
164180
}
165181

166182
@Override
@@ -173,7 +189,12 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
173189
if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
174190
// Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData:
175191
IndexMetaData leaderIndexMetadata = localClusterState.getMetaData().index(request.leaderIndex);
176-
start(request, null, leaderIndexMetadata, followIndexMetadata, listener);
192+
try {
193+
start(request, null, leaderIndexMetadata, followIndexMetadata, listener);
194+
} catch (IOException e) {
195+
listener.onFailure(e);
196+
return;
197+
}
177198
} else {
178199
// Following an index in remote cluster, so use remote client to fetch leader IndexMetaData:
179200
assert remoteClusterIndices.size() == 1;
@@ -206,81 +227,168 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
206227
* </ul>
207228
*/
208229
void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata,
209-
ActionListener<Response> handler) {
210-
validate (leaderIndexMetadata ,followIndexMetadata , request);
211-
final int numShards = followIndexMetadata.getNumberOfShards();
212-
final AtomicInteger counter = new AtomicInteger(numShards);
213-
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
214-
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
215-
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
216-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
217-
final int shardId = i;
218-
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
219-
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
220-
new ShardId(followIndexMetadata.getIndex(), shardId),
221-
new ShardId(leaderIndexMetadata.getIndex(), shardId),
222-
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
223-
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
224-
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
225-
@Override
226-
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
227-
responses.set(shardId, task);
228-
finalizeResponse();
229-
}
230-
230+
ActionListener<Response> handler) throws IOException {
231+
MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null;
232+
validate(request, leaderIndexMetadata, followIndexMetadata, mapperService);
233+
final int numShards = followIndexMetadata.getNumberOfShards();
234+
final AtomicInteger counter = new AtomicInteger(numShards);
235+
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
236+
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
237+
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
238+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
239+
final int shardId = i;
240+
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
241+
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
242+
new ShardId(followIndexMetadata.getIndex(), shardId),
243+
new ShardId(leaderIndexMetadata.getIndex(), shardId),
244+
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
245+
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
246+
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
231247
@Override
232-
public void onFailure(Exception e) {
233-
responses.set(shardId, e);
248+
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
249+
responses.set(shardId, task);
234250
finalizeResponse();
235251
}
236252

237-
void finalizeResponse() {
238-
Exception error = null;
239-
if (counter.decrementAndGet() == 0) {
240-
for (int j = 0; j < responses.length(); j++) {
241-
Object response = responses.get(j);
242-
if (response instanceof Exception) {
243-
if (error == null) {
244-
error = (Exception) response;
245-
} else {
246-
error.addSuppressed((Throwable) response);
247-
}
253+
@Override
254+
public void onFailure(Exception e) {
255+
responses.set(shardId, e);
256+
finalizeResponse();
257+
}
258+
259+
void finalizeResponse() {
260+
Exception error = null;
261+
if (counter.decrementAndGet() == 0) {
262+
for (int j = 0; j < responses.length(); j++) {
263+
Object response = responses.get(j);
264+
if (response instanceof Exception) {
265+
if (error == null) {
266+
error = (Exception) response;
267+
} else {
268+
error.addSuppressed((Throwable) response);
248269
}
249270
}
271+
}
250272

251-
if (error == null) {
252-
// include task ids?
253-
handler.onResponse(new Response(true));
254-
} else {
255-
// TODO: cancel all started tasks
256-
handler.onFailure(error);
257-
}
273+
if (error == null) {
274+
// include task ids?
275+
handler.onResponse(new Response(true));
276+
} else {
277+
// TODO: cancel all started tasks
278+
handler.onFailure(error);
258279
}
259280
}
260281
}
282+
}
261283
);
262284
}
263285
}
264286
}
265287

288+
private static final Set<Setting<?>> WHITELISTED_SETTINGS;
289+
290+
static {
291+
Set<Setting<?>> whiteListedSettings = new HashSet<>();
292+
whiteListedSettings.add(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING);
293+
whiteListedSettings.add(IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING);
294+
295+
whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING);
296+
whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING);
297+
whiteListedSettings.add(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING);
298+
whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING);
299+
whiteListedSettings.add(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING);
300+
whiteListedSettings.add(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING);
301+
302+
whiteListedSettings.add(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING);
303+
whiteListedSettings.add(IndexSettings.MAX_RESULT_WINDOW_SETTING);
304+
whiteListedSettings.add(IndexSettings.INDEX_WARMER_ENABLED_SETTING);
305+
whiteListedSettings.add(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING);
306+
whiteListedSettings.add(IndexSettings.MAX_RESCORE_WINDOW_SETTING);
307+
whiteListedSettings.add(IndexSettings.MAX_INNER_RESULT_WINDOW_SETTING);
308+
whiteListedSettings.add(IndexSettings.DEFAULT_FIELD_SETTING);
309+
whiteListedSettings.add(IndexSettings.QUERY_STRING_LENIENT_SETTING);
310+
whiteListedSettings.add(IndexSettings.QUERY_STRING_ANALYZE_WILDCARD);
311+
whiteListedSettings.add(IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD);
312+
whiteListedSettings.add(IndexSettings.ALLOW_UNMAPPED);
313+
whiteListedSettings.add(IndexSettings.INDEX_SEARCH_IDLE_AFTER);
314+
whiteListedSettings.add(BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
315+
316+
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING);
317+
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING);
318+
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING);
319+
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_TRACE_SETTING);
320+
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING);
321+
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_DEBUG_SETTING);
322+
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_INFO_SETTING);
323+
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_TRACE_SETTING);
324+
whiteListedSettings.add(SearchSlowLog.INDEX_SEARCH_SLOWLOG_LEVEL);
325+
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN_SETTING);
326+
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG_SETTING);
327+
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO_SETTING);
328+
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_TRACE_SETTING);
329+
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL_SETTING);
330+
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING);
331+
whiteListedSettings.add(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING);
332+
333+
whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_SETTING);
334+
whiteListedSettings.add(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
335+
336+
WHITELISTED_SETTINGS = Collections.unmodifiableSet(whiteListedSettings);
337+
}
266338

267-
static void validate(IndexMetaData leaderIndex, IndexMetaData followIndex, Request request) {
339+
static void validate(Request request, IndexMetaData leaderIndex, IndexMetaData followIndex, MapperService followerMapperService) {
268340
if (leaderIndex == null) {
269341
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist");
270342
}
271-
272343
if (followIndex == null) {
273344
throw new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist");
274345
}
275346
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
276347
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled");
277348
}
278-
279349
if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) {
280350
throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() +
281351
"] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]");
282352
}
283-
// TODO: other validation checks
353+
if (leaderIndex.getRoutingNumShards() != followIndex.getRoutingNumShards()) {
354+
throw new IllegalArgumentException("leader index number_of_routing_shards [" + leaderIndex.getRoutingNumShards() +
355+
"] does not match with the number_of_routing_shards of the follow index [" + followIndex.getRoutingNumShards() + "]");
356+
}
357+
if (leaderIndex.getState() != IndexMetaData.State.OPEN || followIndex.getState() != IndexMetaData.State.OPEN) {
358+
throw new IllegalArgumentException("leader and follow index must be open");
359+
}
360+
361+
// Make a copy, remove settings that are allowed to be different and then compare if the settings are equal.
362+
Settings leaderSettings = filter(leaderIndex.getSettings());
363+
Settings followerSettings = filter(followIndex.getSettings());
364+
if (leaderSettings.equals(followerSettings) == false) {
365+
throw new IllegalArgumentException("the leader and follower index settings must be identical");
366+
}
367+
368+
// Validates if the current follower mapping is mergable with the leader mapping.
369+
// This also validates for example whether specific mapper plugins have been installed
370+
followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY);
371+
}
372+
373+
private static Settings filter(Settings originalSettings) {
374+
Settings.Builder settings = Settings.builder().put(originalSettings);
375+
// Remove settings that are always going to be different between leader and follow index:
376+
settings.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey());
377+
settings.remove(IndexMetaData.SETTING_INDEX_UUID);
378+
settings.remove(IndexMetaData.SETTING_INDEX_PROVIDED_NAME);
379+
settings.remove(IndexMetaData.SETTING_CREATION_DATE);
380+
381+
Iterator<String> iterator = settings.keys().iterator();
382+
while (iterator.hasNext()) {
383+
String key = iterator.next();
384+
for (Setting<?> whitelistedSetting : WHITELISTED_SETTINGS) {
385+
if (whitelistedSetting.match(key)) {
386+
iterator.remove();
387+
break;
388+
}
389+
}
390+
}
391+
return settings.build();
284392
}
285393

286394
}

0 commit comments

Comments
 (0)