Skip to content

Commit 569ef55

Browse files
committed
Merge remote-tracking branch 'es/ccr' into ccr_more_unit_tests
2 parents daddefa + d88c76e commit 569ef55

File tree

6 files changed

+229
-14
lines changed

6 files changed

+229
-14
lines changed

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
9898

9999
protected final Index index = new Index("test", "uuid");
100100
private final ShardId shardId = new ShardId(index, 0);
101-
private final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }");
101+
protected final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }");
102102

103103
protected ReplicationGroup createGroup(int replicas) throws IOException {
104104
return createGroup(replicas, Settings.EMPTY);
@@ -157,7 +157,7 @@ protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard>
157157
}
158158
});
159159

160-
ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
160+
protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
161161
final ShardRouting primaryRouting = this.createShardRouting("s0", true);
162162
primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {});
163163
replicas = new CopyOnWriteArrayList<>();
@@ -451,15 +451,15 @@ private void updateAllocationIDsOnPrimary() throws IOException {
451451
}
452452
}
453453

454-
abstract class ReplicationAction<Request extends ReplicationRequest<Request>,
454+
protected abstract class ReplicationAction<Request extends ReplicationRequest<Request>,
455455
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
456456
Response extends ReplicationResponse> {
457457
private final Request request;
458458
private ActionListener<Response> listener;
459459
private final ReplicationGroup replicationGroup;
460460
private final String opType;
461461

462-
ReplicationAction(Request request, ActionListener<Response> listener, ReplicationGroup group, String opType) {
462+
protected ReplicationAction(Request request, ActionListener<Response> listener, ReplicationGroup group, String opType) {
463463
this.request = request;
464464
this.listener = listener;
465465
this.replicationGroup = group;
@@ -585,11 +585,11 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R
585585
}
586586
}
587587

588-
class PrimaryResult implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
588+
protected class PrimaryResult implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
589589
final ReplicaRequest replicaRequest;
590590
final Response finalResponse;
591591

592-
PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) {
592+
public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponse) {
593593
this.replicaRequest = replicaRequest;
594594
this.finalResponse = finalResponse;
595595
}

x-pack/plugin/ccr/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ archivesBaseName = 'x-pack-ccr'
1717
integTest.enabled = false
1818

1919
compileJava.options.compilerArgs << "-Xlint:-try"
20+
compileTestJava.options.compilerArgs << "-Xlint:-try"
2021

2122
// Instead we create a separate task to run the
2223
// tests based on ESIntegTestCase

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
7171
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
7272

7373
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
74-
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, TimeValue idleShardChangesRequestDelay,
75-
TimeValue retryTimeout) {
74+
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler) {
7675
super(id, type, action, description, parentTask, headers);
7776
this.params = params;
7877
this.scheduler = scheduler;
79-
this.retryTimeout = retryTimeout;
80-
this.idleShardChangesRequestDelay = idleShardChangesRequestDelay;
78+
this.retryTimeout = params.getRetryTimeout();
79+
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
8180
}
8281

8382
void start(long followerGlobalCheckpoint) {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
9090
Client followerClient = wrapClient(client, params);
9191
BiConsumer<TimeValue, Runnable> scheduler =
9292
(delay, command) -> threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command);
93-
return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params,
94-
scheduler, params.getIdleShardRetryDelay(), params.getRetryTimeout()) {
93+
return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, scheduler) {
9594

9695
@Override
9796
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResp
6262
return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger);
6363
}
6464

65-
static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
65+
// public for testing purposes only
66+
public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
6667
final ShardId shardId,
6768
final List<Translog.Operation> sourceOperations,
6869
final IndexShard primary,
@@ -115,7 +116,8 @@ protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica
115116
return new WriteReplicaResult<>(request, location, null, replica, logger);
116117
}
117118

118-
private static Translog.Location applyTranslogOperations(
119+
// public for testing purposes only
120+
public static Translog.Location applyTranslogOperations(
119121
final List<Translog.Operation> operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException {
120122
Translog.Location location = null;
121123
for (final Translog.Operation operation : operations) {
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ccr.action;
7+
8+
import org.elasticsearch.Version;
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.support.replication.TransportWriteAction;
11+
import org.elasticsearch.cluster.metadata.IndexMetaData;
12+
import org.elasticsearch.cluster.routing.ShardRouting;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.common.unit.TimeValue;
15+
import org.elasticsearch.index.IndexSettings;
16+
import org.elasticsearch.index.engine.Engine.Operation.Origin;
17+
import org.elasticsearch.index.engine.EngineFactory;
18+
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
19+
import org.elasticsearch.index.shard.IndexShard;
20+
import org.elasticsearch.index.shard.ShardId;
21+
import org.elasticsearch.index.translog.Translog;
22+
import org.elasticsearch.threadpool.ThreadPool;
23+
import org.elasticsearch.xpack.ccr.CcrSettings;
24+
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
25+
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
26+
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
27+
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
28+
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.Collections;
32+
import java.util.List;
33+
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.function.BiConsumer;
35+
import java.util.function.Consumer;
36+
import java.util.function.LongConsumer;
37+
38+
public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase {
39+
40+
public void testSimpleCcrReplication() throws Exception {
41+
try (ReplicationGroup leaderGroup = createGroup(randomInt(2));
42+
ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) {
43+
leaderGroup.startAll();
44+
int docCount = leaderGroup.appendDocs(randomInt(64));
45+
leaderGroup.assertAllEqual(docCount);
46+
followerGroup.startAll();
47+
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
48+
shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint());
49+
docCount += leaderGroup.appendDocs(randomInt(128));
50+
leaderGroup.syncGlobalCheckpoint();
51+
52+
leaderGroup.assertAllEqual(docCount);
53+
int expectedCount = docCount;
54+
assertBusy(() -> followerGroup.assertAllEqual(expectedCount));
55+
shardFollowTask.markAsCompleted();
56+
}
57+
}
58+
59+
public void testFailLeaderReplicaShard() throws Exception {
60+
try (ReplicationGroup leaderGroup = createGroup(1 + randomInt(1));
61+
ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) {
62+
leaderGroup.startAll();
63+
followerGroup.startAll();
64+
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
65+
shardFollowTask.start(followerGroup.getPrimary().getGlobalCheckpoint());
66+
int docCount = 256;
67+
leaderGroup.appendDocs(1);
68+
Runnable task = () -> {
69+
try {
70+
leaderGroup.appendDocs(docCount - 1);
71+
leaderGroup.syncGlobalCheckpoint();
72+
} catch (Exception e) {
73+
throw new AssertionError(e);
74+
}
75+
};
76+
Thread thread = new Thread(task);
77+
thread.start();
78+
79+
// Remove and add a new replica
80+
IndexShard luckyReplica = randomFrom(leaderGroup.getReplicas());
81+
leaderGroup.removeReplica(luckyReplica);
82+
luckyReplica.close("stop replica", false);
83+
luckyReplica.store().close();
84+
leaderGroup.addReplica();
85+
leaderGroup.startReplicas(1);
86+
thread.join();
87+
88+
leaderGroup.assertAllEqual(docCount);
89+
assertBusy(() -> followerGroup.assertAllEqual(docCount));
90+
shardFollowTask.markAsCompleted();
91+
}
92+
}
93+
94+
@Override
95+
protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
96+
Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
97+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, replicas)
98+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
99+
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
100+
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000)
101+
.put(settings)
102+
.build();
103+
if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(newSettings)) {
104+
IndexMetaData metaData = buildIndexMetaData(replicas, newSettings, indexMapping);
105+
return new ReplicationGroup(metaData) {
106+
107+
@Override
108+
protected EngineFactory getEngineFactory(ShardRouting routing) {
109+
return new FollowingEngineFactory();
110+
}
111+
};
112+
} else {
113+
return super.createGroup(replicas, newSettings);
114+
}
115+
}
116+
117+
private ReplicationGroup createFollowGroup(int replicas) throws IOException {
118+
Settings.Builder settingsBuilder = Settings.builder();
119+
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
120+
return createGroup(replicas, settingsBuilder.build());
121+
}
122+
123+
private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) {
124+
ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0),
125+
new ShardId("leader_index", "", 0), 1024, 1, Long.MAX_VALUE, 1, 10240,
126+
TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap());
127+
128+
BiConsumer<TimeValue, Runnable> scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task);
129+
AtomicBoolean stopped = new AtomicBoolean(false);
130+
return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler) {
131+
@Override
132+
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
133+
// noop, as mapping updates are not tested
134+
handler.accept(1L);
135+
}
136+
137+
@Override
138+
protected void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, LongConsumer handler,
139+
Consumer<Exception> errorHandler) {
140+
Runnable task = () -> {
141+
BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations);
142+
ActionListener<BulkShardOperationsResponse> listener =
143+
ActionListener.wrap(r -> handler.accept(r.getGlobalCheckpoint()), errorHandler);
144+
new CCRAction(request, listener, followerGroup).execute();
145+
};
146+
threadPool.executor(ThreadPool.Names.GENERIC).execute(task);
147+
}
148+
149+
@Override
150+
protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
151+
Consumer<Exception> errorHandler) {
152+
Runnable task = () -> {
153+
List<IndexShard> indexShards = new ArrayList<>(leaderGroup.getReplicas());
154+
indexShards.add(leaderGroup.getPrimary());
155+
Collections.shuffle(indexShards, random());
156+
157+
Exception exception = null;
158+
for (IndexShard indexShard : indexShards) {
159+
long globalCheckpoint = indexShard.getGlobalCheckpoint();
160+
try {
161+
Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, globalCheckpoint, from,
162+
maxOperationCount, params.getMaxBatchSizeInBytes());
163+
// Hard code index metadata version, this is ok, as mapping updates are not tested here.
164+
handler.accept(new ShardChangesAction.Response(1L, globalCheckpoint, ops));
165+
return;
166+
} catch (Exception e) {
167+
exception = e;
168+
}
169+
}
170+
assert exception != null;
171+
errorHandler.accept(exception);
172+
};
173+
threadPool.executor(ThreadPool.Names.GENERIC).execute(task);
174+
}
175+
176+
@Override
177+
protected boolean isStopped() {
178+
return stopped.get();
179+
}
180+
181+
@Override
182+
public void markAsCompleted() {
183+
stopped.set(true);
184+
}
185+
186+
@Override
187+
public void markAsFailed(Exception e) {
188+
stopped.set(true);
189+
}
190+
191+
};
192+
}
193+
194+
class CCRAction extends ReplicationAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
195+
196+
CCRAction(BulkShardOperationsRequest request, ActionListener<BulkShardOperationsResponse> listener, ReplicationGroup group) {
197+
super(request, listener, group, "ccr");
198+
}
199+
200+
@Override
201+
protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardOperationsRequest request) throws Exception {
202+
TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
203+
TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getOperations(),
204+
primary, logger);
205+
return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful);
206+
}
207+
208+
@Override
209+
protected void performOnReplica(BulkShardOperationsRequest request, IndexShard replica) throws Exception {
210+
TransportBulkShardOperationsAction.applyTranslogOperations(request.getOperations(), replica, Origin.REPLICA);
211+
}
212+
}
213+
214+
}

0 commit comments

Comments
 (0)