Skip to content

[CCR] Add validation checks that were left out of #30120 #30463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public void testFollowIndex() throws Exception {
final String indexName2 = "index2";
if (runningAgainstLeaderCluster) {
logger.info("Running against leader cluster");
Settings indexSettings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
createIndex(indexName1, indexSettings);
createIndex(indexName2, indexSettings);
for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i);
index(indexName1, Integer.toString(i), "field", i);
Expand Down Expand Up @@ -169,6 +174,10 @@ private static Map<String, Object> toMap(String response) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
}

protected static void createIndex(String name, Settings settings) throws IOException {
createIndex(name, settings, "");
}

protected static void createIndex(String name, Settings settings, String mapping) throws IOException {
assertOK(adminClient().performRequest(HttpPut.METHOD_NAME, name, Collections.emptyMap(),
new StringEntity("{ \"settings\": " + Strings.toString(settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
Expand Down Expand Up @@ -224,29 +225,13 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
*/
void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata,
ActionListener<Response> handler) {
if (leaderIndexMetadata == null) {
handler.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"));
return;
}

if (followIndexMetadata == null) {
handler.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist"));
return;
}

if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) {
handler.onFailure(new IllegalArgumentException("leader index primary shards [" +
leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " +
"shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]"));
// TODO: other validation checks
} else {
validate (leaderIndexMetadata ,followIndexMetadata , request);
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
for (int i = 0; i < numShards; i++) {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
Expand All @@ -261,39 +246,59 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowT
finalizeResponse();
}

@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
finalizeResponse();
}
@Override
public void onFailure(Exception e) {
responses.set(shardId, e);
finalizeResponse();
}

void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
void finalizeResponse() {
Exception error = null;
if (counter.decrementAndGet() == 0) {
for (int j = 0; j < responses.length(); j++) {
Object response = responses.get(j);
if (response instanceof Exception) {
if (error == null) {
error = (Exception) response;
} else {
error.addSuppressed((Throwable) response);
}
}
}

if (error == null) {
// include task ids?
handler.onResponse(new Response(true));
} else {
// TODO: cancel all started tasks
handler.onFailure(error);
}
if (error == null) {
// include task ids?
handler.onResponse(new Response(true));
} else {
// TODO: cancel all started tasks
handler.onFailure(error);
}
}
}
);
}
}
);
}
}
}


static void validate(IndexMetaData leaderIndex, IndexMetaData followIndex, Request request) {
if (leaderIndex == null) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist");
}

if (followIndex == null) {
throw new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist");
}
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled");
}

if (leaderIndex.getNumberOfShards() != followIndex.getNumberOfShards()) {
throw new IllegalArgumentException("leader index primary shards [" + leaderIndex.getNumberOfShards() +
"] does not match with the number of shards of the follow index [" + followIndex.getNumberOfShards() + "]");
}
// TODO: other validation checks
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
Expand Down Expand Up @@ -143,7 +144,8 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception {
public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);

final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, Collections.emptyMap());
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards,
Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));

final String followerIndexSettings =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

public class FollowExistingIndexActionTests extends ESTestCase {

public void testValidation() {
FollowExistingIndexAction.Request request = new FollowExistingIndexAction.Request();
request.setLeaderIndex("index1");
request.setFollowIndex("index2");

{
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowExistingIndexAction.validate(null, null, request));
assertThat(e.getMessage(), equalTo("leader index [index1] does not exist"));
}
{
IndexMetaData leaderIMD = createIMD("index1", 5);
Exception e = expectThrows(IllegalArgumentException.class, () -> FollowExistingIndexAction.validate(leaderIMD, null, request));
assertThat(e.getMessage(), equalTo("follow index [index2] does not exist"));
}
{
IndexMetaData leaderIMD = createIMD("index1", 5);
IndexMetaData followIMD = createIMD("index2", 5);
Exception e = expectThrows(IllegalArgumentException.class,
() -> FollowExistingIndexAction.validate(leaderIMD, followIMD, request));
assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled"));
}
{
IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
IndexMetaData followIMD = createIMD("index2", 4);
Exception e = expectThrows(IllegalArgumentException.class,
() -> FollowExistingIndexAction.validate(leaderIMD, followIMD, request));
assertThat(e.getMessage(),
equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]"));
}
{
IndexMetaData leaderIMD = createIMD("index1", 5, new Tuple<>(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
IndexMetaData followIMD = createIMD("index2", 5);
FollowExistingIndexAction.validate(leaderIMD, followIMD, request);
}
}

private static IndexMetaData createIMD(String index, int numShards, Tuple<?, ?>... settings) {
Settings.Builder settingsBuilder = settings(Version.CURRENT);
for (Tuple<?, ?> setting : settings) {
settingsBuilder.put((String) setting.v1(), (String) setting.v2());
}
return IndexMetaData.builder(index).settings(settingsBuilder)
.numberOfShards(numShards)
.numberOfReplicas(0)
.setRoutingNumShards(numShards).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
indices.create:
index: foo
body:
settings:
index:
soft_deletes:
enabled: true
mappings:
doc:
properties:
Expand Down