Skip to content

Commit e73140e

Browse files
authored
Use CcrRepository to init follower index (#35719) (#37988)
This commit modifies the put follow index action to use a CcrRepository when creating a follower index. It routes the logic through the snapshot/restore process. A wait_for_active_shards parameter can be used to configure how long to wait before returning the response.
1 parent 0b794d4 commit e73140e

File tree

34 files changed

+668
-183
lines changed

34 files changed

+668
-183
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.http.client.methods.HttpGet;
2424
import org.apache.http.client.methods.HttpPost;
2525
import org.apache.http.client.methods.HttpPut;
26+
import org.elasticsearch.action.support.ActiveShardCount;
2627
import org.elasticsearch.client.ccr.CcrStatsRequest;
2728
import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
2829
import org.elasticsearch.client.ccr.FollowStatsRequest;
@@ -46,6 +47,8 @@ static Request putFollow(PutFollowRequest putFollowRequest) throws IOException {
4647
.addPathPartAsIs("_ccr", "follow")
4748
.build();
4849
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
50+
RequestConverters.Params parameters = new RequestConverters.Params(request);
51+
parameters.withWaitForActiveShards(putFollowRequest.waitForActiveShards(), ActiveShardCount.NONE);
4952
request.setEntity(createEntity(putFollowRequest, REQUEST_BODY_CONTENT_TYPE));
5053
return request;
5154
}

client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.client.ccr;
2121

22+
import org.elasticsearch.action.support.ActiveShardCount;
2223
import org.elasticsearch.client.Validatable;
2324
import org.elasticsearch.common.ParseField;
2425
import org.elasticsearch.common.xcontent.ToXContentObject;
@@ -36,11 +37,17 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,
3637
private final String remoteCluster;
3738
private final String leaderIndex;
3839
private final String followerIndex;
40+
private final ActiveShardCount waitForActiveShards;
3941

4042
public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex) {
43+
this(remoteCluster, leaderIndex, followerIndex, ActiveShardCount.NONE);
44+
}
45+
46+
public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex, ActiveShardCount waitForActiveShards) {
4147
this.remoteCluster = Objects.requireNonNull(remoteCluster, "remoteCluster");
4248
this.leaderIndex = Objects.requireNonNull(leaderIndex, "leaderIndex");
4349
this.followerIndex = Objects.requireNonNull(followerIndex, "followerIndex");
50+
this.waitForActiveShards = waitForActiveShards;
4451
}
4552

4653
@Override
@@ -66,13 +73,18 @@ public String getFollowerIndex() {
6673
return followerIndex;
6774
}
6875

76+
public ActiveShardCount waitForActiveShards() {
77+
return waitForActiveShards;
78+
}
79+
6980
@Override
7081
public boolean equals(Object o) {
7182
if (this == o) return true;
7283
if (o == null || getClass() != o.getClass()) return false;
7384
if (!super.equals(o)) return false;
7485
PutFollowRequest that = (PutFollowRequest) o;
75-
return Objects.equals(remoteCluster, that.remoteCluster) &&
86+
return Objects.equals(waitForActiveShards, that.waitForActiveShards) &&
87+
Objects.equals(remoteCluster, that.remoteCluster) &&
7688
Objects.equals(leaderIndex, that.leaderIndex) &&
7789
Objects.equals(followerIndex, that.followerIndex);
7890
}
@@ -83,7 +95,7 @@ public int hashCode() {
8395
super.hashCode(),
8496
remoteCluster,
8597
leaderIndex,
86-
followerIndex
87-
);
98+
followerIndex,
99+
waitForActiveShards);
88100
}
89101
}

client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.action.index.IndexRequest;
2727
import org.elasticsearch.action.search.SearchRequest;
2828
import org.elasticsearch.action.search.SearchResponse;
29+
import org.elasticsearch.action.support.ActiveShardCount;
2930
import org.elasticsearch.action.support.WriteRequest;
3031
import org.elasticsearch.client.ccr.CcrStatsRequest;
3132
import org.elasticsearch.client.ccr.CcrStatsResponse;
@@ -98,7 +99,7 @@ public void testIndexFollowing() throws Exception {
9899
CreateIndexResponse response = highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
99100
assertThat(response.isAcknowledged(), is(true));
100101

101-
PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower");
102+
PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower", ActiveShardCount.ONE);
102103
PutFollowResponse putFollowResponse = execute(putFollowRequest, ccrClient::putFollow, ccrClient::putFollowAsync);
103104
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
104105
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
2727
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
2828
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
29+
import org.elasticsearch.action.support.ActiveShardCount;
2930
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
3031
import org.elasticsearch.client.Request;
3132
import org.elasticsearch.client.RequestOptions;
@@ -97,7 +98,8 @@ public void testPutFollow() throws Exception {
9798
PutFollowRequest putFollowRequest = new PutFollowRequest(
9899
"local", // <1>
99100
"leader", // <2>
100-
"follower" // <3>
101+
"follower", // <3>
102+
ActiveShardCount.ONE // <4>
101103
);
102104
// end::ccr-put-follow-request
103105

@@ -175,7 +177,7 @@ public void testPauseFollow() throws Exception {
175177
String followIndex = "follower";
176178
// Follow index, so that it can be paused:
177179
{
178-
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
180+
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
179181
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
180182
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
181183
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
@@ -241,7 +243,7 @@ public void testResumeFollow() throws Exception {
241243
String followIndex = "follower";
242244
// Follow index, so that it can be paused:
243245
{
244-
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
246+
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
245247
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
246248
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
247249
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
@@ -317,7 +319,7 @@ public void testUnfollow() throws Exception {
317319
String followIndex = "follower";
318320
// Follow index, pause and close, so that it can be unfollowed:
319321
{
320-
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
322+
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
321323
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
322324
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
323325
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
@@ -349,7 +351,7 @@ public void testUnfollow() throws Exception {
349351
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(followIndex);
350352
assertThat(client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT).isAcknowledged(), is(true));
351353

352-
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
354+
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
353355
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
354356
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
355357
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
@@ -639,7 +641,7 @@ public void testGetFollowStats() throws Exception {
639641
}
640642
{
641643
// Follow index, so that we can query for follow stats:
642-
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower");
644+
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower", ActiveShardCount.ONE);
643645
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
644646
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
645647
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));

docs/java-rest/high-level/ccr/put_follow.asciidoc

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ include-tagged::{doc-tests-file}[{api}-request]
2020
<1> The name of the remote cluster alias.
2121
<2> The name of the leader in the remote cluster.
2222
<3> The name of the follower index that gets created as part of the put follow API call.
23+
<4> The number of active shard copies to wait for before the put follow API returns a
24+
response, as an `ActiveShardCount`
2325

2426
[id="{upid}-{api}-response"]
2527
==== Response

docs/reference/ccr/apis/follow/get-follow-info.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ replication options and whether the follower indices are active or paused.
2222
2323
[source,js]
2424
--------------------------------------------------
25-
PUT /follower_index/_ccr/follow
25+
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
2626
{
2727
"remote_cluster" : "remote_cluster",
2828
"leader_index" : "leader_index"

docs/reference/ccr/apis/follow/get-follow-stats.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ following tasks associated with each shard for the specified indices.
2121
2222
[source,js]
2323
--------------------------------------------------
24-
PUT /follower_index/_ccr/follow
24+
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
2525
{
2626
"remote_cluster" : "remote_cluster",
2727
"leader_index" : "leader_index"

docs/reference/ccr/apis/follow/post-pause-follow.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ following task.
2424
2525
[source,js]
2626
--------------------------------------------------
27-
PUT /follower_index/_ccr/follow
27+
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
2828
{
2929
"remote_cluster" : "remote_cluster",
3030
"leader_index" : "leader_index"

docs/reference/ccr/apis/follow/post-resume-follow.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ returns, the follower index will resume fetching operations from the leader inde
2323
2424
[source,js]
2525
--------------------------------------------------
26-
PUT /follower_index/_ccr/follow
26+
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
2727
{
2828
"remote_cluster" : "remote_cluster",
2929
"leader_index" : "leader_index"

docs/reference/ccr/apis/follow/post-unfollow.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ irreversible operation.
2727
2828
[source,js]
2929
--------------------------------------------------
30-
PUT /follower_index/_ccr/follow
30+
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
3131
{
3232
"remote_cluster" : "remote_cluster",
3333
"leader_index" : "leader_index"

docs/reference/ccr/apis/follow/put-follow.asciidoc

+7-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ POST /follower_index/_ccr/pause_follow
3131

3232
[source,js]
3333
--------------------------------------------------
34-
PUT /<follower_index>/_ccr/follow
34+
PUT /<follower_index>/_ccr/follow?wait_for_active_shards=1
3535
{
3636
"remote_cluster" : "<remote_cluster>",
3737
"leader_index" : "<leader_index>"
@@ -43,6 +43,11 @@ PUT /<follower_index>/_ccr/follow
4343
// TEST[s/<remote_cluster>/remote_cluster/]
4444
// TEST[s/<leader_index>/leader_index/]
4545

46+
The `wait_for_active_shards` parameter specifies the number of shards to wait on being active
47+
before responding. This defaults to waiting on none of the shards to be active. A shard must
48+
be restored from the leader index being active. Restoring a follower shard requires transferring
49+
all the remote Lucene segment files to the follower index.
50+
4651
==== Path Parameters
4752

4853
`follower_index` (required)::
@@ -73,7 +78,7 @@ This example creates a follower index named `follower_index`:
7378

7479
[source,js]
7580
--------------------------------------------------
76-
PUT /follower_index/_ccr/follow
81+
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
7782
{
7883
"remote_cluster" : "remote_cluster",
7984
"leader_index" : "leader_index",

docs/reference/ccr/apis/get-ccr-stats.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ shard-level stats as in the <<ccr-get-follow-stats,get follower stats API>>.
2222
2323
[source,js]
2424
--------------------------------------------------
25-
PUT /follower_index/_ccr/follow
25+
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
2626
{
2727
"remote_cluster" : "remote_cluster",
2828
"leader_index" : "leader_index"

docs/reference/ccr/getting-started.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ cluster.
238238

239239
[source,js]
240240
--------------------------------------------------
241-
PUT /server-metrics-copy/_ccr/follow
241+
PUT /server-metrics-copy/_ccr/follow?wait_for_active_shards=1
242242
{
243243
"remote_cluster" : "leader",
244244
"leader_index" : "server-metrics"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.admin.cluster.snapshots.restore;
21+
22+
import org.apache.logging.log4j.LogManager;
23+
import org.apache.logging.log4j.Logger;
24+
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.cluster.ClusterChangedEvent;
26+
import org.elasticsearch.cluster.ClusterStateListener;
27+
import org.elasticsearch.cluster.RestoreInProgress;
28+
import org.elasticsearch.cluster.service.ClusterService;
29+
import org.elasticsearch.common.collect.ImmutableOpenMap;
30+
import org.elasticsearch.index.shard.ShardId;
31+
import org.elasticsearch.snapshots.RestoreInfo;
32+
import org.elasticsearch.snapshots.RestoreService;
33+
34+
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
35+
36+
public class RestoreClusterStateListener implements ClusterStateListener {
37+
38+
private static final Logger logger = LogManager.getLogger(RestoreClusterStateListener.class);
39+
40+
private final ClusterService clusterService;
41+
private final String uuid;
42+
private final ActionListener<RestoreSnapshotResponse> listener;
43+
44+
45+
private RestoreClusterStateListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response,
46+
ActionListener<RestoreSnapshotResponse> listener) {
47+
this.clusterService = clusterService;
48+
this.uuid = response.getUuid();
49+
this.listener = listener;
50+
}
51+
52+
@Override
53+
public void clusterChanged(ClusterChangedEvent changedEvent) {
54+
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
55+
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
56+
if (prevEntry == null) {
57+
// When there is a master failure after a restore has been started, this listener might not be registered
58+
// on the current master and as such it might miss some intermediary cluster states due to batching.
59+
// Clean up listener in that case and acknowledge completion of restore operation to client.
60+
clusterService.removeListener(this);
61+
listener.onResponse(new RestoreSnapshotResponse(null));
62+
} else if (newEntry == null) {
63+
clusterService.removeListener(this);
64+
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
65+
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
66+
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
67+
RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(),
68+
prevEntry.indices(),
69+
shards.size(),
70+
shards.size() - RestoreService.failedShards(shards));
71+
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
72+
logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
73+
listener.onResponse(response);
74+
} else {
75+
// restore not completed yet, wait for next cluster state update
76+
}
77+
}
78+
79+
/**
80+
* Creates a cluster state listener and registers it with the cluster service. The listener passed as a
81+
* parameter will be called when the restore is complete.
82+
*/
83+
public static void createAndRegisterListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response,
84+
ActionListener<RestoreSnapshotResponse> listener) {
85+
clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener));
86+
}
87+
}

0 commit comments

Comments
 (0)