Skip to content

Commit 6a0e8ff

Browse files
committed
Add support for ccr follow info api to HLRC. (#39115)
This API was introduces after #33824 was closed.
1 parent 49dff52 commit 6a0e8ff

File tree

11 files changed

+595
-0
lines changed

11 files changed

+595
-0
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java

+44
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.elasticsearch.client.ccr.CcrStatsRequest;
2424
import org.elasticsearch.client.ccr.CcrStatsResponse;
2525
import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
26+
import org.elasticsearch.client.ccr.FollowInfoRequest;
27+
import org.elasticsearch.client.ccr.FollowInfoResponse;
2628
import org.elasticsearch.client.ccr.FollowStatsRequest;
2729
import org.elasticsearch.client.ccr.FollowStatsResponse;
2830
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
@@ -452,4 +454,46 @@ public void getFollowStatsAsync(FollowStatsRequest request,
452454
);
453455
}
454456

457+
/**
458+
* Gets follow info for specific indices.
459+
*
460+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-get-follow-info.html">
461+
* the docs</a> for more.
462+
*
463+
* @param request the request
464+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
465+
* @return the response
466+
* @throws IOException in case there is a problem sending the request or parsing back the response
467+
*/
468+
public FollowInfoResponse getFollowInfo(FollowInfoRequest request, RequestOptions options) throws IOException {
469+
return restHighLevelClient.performRequestAndParseEntity(
470+
request,
471+
CcrRequestConverters::getFollowInfo,
472+
options,
473+
FollowInfoResponse::fromXContent,
474+
Collections.emptySet()
475+
);
476+
}
477+
478+
/**
479+
* Asynchronously gets follow info for specific indices.
480+
*
481+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-get-follow-info.html">
482+
* the docs</a> for more.
483+
*
484+
* @param request the request
485+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
486+
*/
487+
public void getFollowInfoAsync(FollowInfoRequest request,
488+
RequestOptions options,
489+
ActionListener<FollowInfoResponse> listener) {
490+
restHighLevelClient.performRequestAsyncAndParseEntity(
491+
request,
492+
CcrRequestConverters::getFollowInfo,
493+
options,
494+
FollowInfoResponse::fromXContent,
495+
listener,
496+
Collections.emptySet()
497+
);
498+
}
455499
}

client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java

+9
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.http.client.methods.HttpPut;
2626
import org.elasticsearch.client.ccr.CcrStatsRequest;
2727
import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
28+
import org.elasticsearch.client.ccr.FollowInfoRequest;
2829
import org.elasticsearch.client.ccr.FollowStatsRequest;
2930
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
3031
import org.elasticsearch.client.ccr.PauseFollowRequest;
@@ -119,4 +120,12 @@ static Request getFollowStats(FollowStatsRequest followStatsRequest) {
119120
return new Request(HttpGet.METHOD_NAME, endpoint);
120121
}
121122

123+
static Request getFollowInfo(FollowInfoRequest followInfoRequest) {
124+
String endpoint = new RequestConverters.EndpointBuilder()
125+
.addPathPart(followInfoRequest.getFollowerIndex())
126+
.addPathPartAsIs("_ccr", "info")
127+
.build();
128+
return new Request(HttpGet.METHOD_NAME, endpoint);
129+
}
130+
122131
}

client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/FollowConfig.java

+40
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import org.elasticsearch.common.ParseField;
2323
import org.elasticsearch.common.unit.ByteSizeValue;
2424
import org.elasticsearch.common.unit.TimeValue;
25+
import org.elasticsearch.common.xcontent.ObjectParser;
2526
import org.elasticsearch.common.xcontent.ToXContent;
2627
import org.elasticsearch.common.xcontent.XContentBuilder;
28+
import org.elasticsearch.common.xcontent.XContentParser;
2729

2830
import java.io.IOException;
2931
import java.util.Objects;
@@ -41,6 +43,44 @@ public class FollowConfig {
4143
static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay");
4244
static final ParseField READ_POLL_TIMEOUT = new ParseField("read_poll_timeout");
4345

46+
private static final ObjectParser<FollowConfig, Void> PARSER = new ObjectParser<>(
47+
"follow_config",
48+
true,
49+
FollowConfig::new);
50+
51+
static {
52+
PARSER.declareInt(FollowConfig::setMaxReadRequestOperationCount, MAX_READ_REQUEST_OPERATION_COUNT);
53+
PARSER.declareInt(FollowConfig::setMaxOutstandingReadRequests, MAX_OUTSTANDING_READ_REQUESTS);
54+
PARSER.declareField(
55+
FollowConfig::setMaxReadRequestSize,
56+
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_READ_REQUEST_SIZE.getPreferredName()),
57+
MAX_READ_REQUEST_SIZE,
58+
ObjectParser.ValueType.STRING);
59+
PARSER.declareInt(FollowConfig::setMaxWriteRequestOperationCount, MAX_WRITE_REQUEST_OPERATION_COUNT);
60+
PARSER.declareField(
61+
FollowConfig::setMaxWriteRequestSize,
62+
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_REQUEST_SIZE.getPreferredName()),
63+
MAX_WRITE_REQUEST_SIZE,
64+
ObjectParser.ValueType.STRING);
65+
PARSER.declareInt(FollowConfig::setMaxOutstandingWriteRequests, MAX_OUTSTANDING_WRITE_REQUESTS);
66+
PARSER.declareInt(FollowConfig::setMaxWriteBufferCount, MAX_WRITE_BUFFER_COUNT);
67+
PARSER.declareField(
68+
FollowConfig::setMaxWriteBufferSize,
69+
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
70+
MAX_WRITE_BUFFER_SIZE,
71+
ObjectParser.ValueType.STRING);
72+
PARSER.declareField(FollowConfig::setMaxRetryDelay,
73+
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()),
74+
MAX_RETRY_DELAY_FIELD, ObjectParser.ValueType.STRING);
75+
PARSER.declareField(FollowConfig::setReadPollTimeout,
76+
(p, c) -> TimeValue.parseTimeValue(p.text(), READ_POLL_TIMEOUT.getPreferredName()),
77+
READ_POLL_TIMEOUT, ObjectParser.ValueType.STRING);
78+
}
79+
80+
static FollowConfig fromXContent(XContentParser parser) {
81+
return PARSER.apply(parser, null);
82+
}
83+
4484
private Integer maxReadRequestOperationCount;
4585
private Integer maxOutstandingReadRequests;
4686
private ByteSizeValue maxReadRequestSize;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client.ccr;
21+
22+
import org.elasticsearch.client.Validatable;
23+
24+
import java.util.Objects;
25+
26+
public final class FollowInfoRequest implements Validatable {
27+
28+
private final String followerIndex;
29+
30+
public FollowInfoRequest(String followerIndex) {
31+
this.followerIndex = Objects.requireNonNull(followerIndex);
32+
}
33+
34+
public String getFollowerIndex() {
35+
return followerIndex;
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client.ccr;
21+
22+
import org.elasticsearch.common.ParseField;
23+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
24+
import org.elasticsearch.common.xcontent.XContentParser;
25+
26+
import java.util.List;
27+
import java.util.Objects;
28+
29+
public final class FollowInfoResponse {
30+
31+
static final ParseField FOLLOWER_INDICES_FIELD = new ParseField("follower_indices");
32+
33+
private static final ConstructingObjectParser<FollowInfoResponse, Void> PARSER = new ConstructingObjectParser<>(
34+
"indices",
35+
true,
36+
args -> {
37+
@SuppressWarnings("unchecked")
38+
List<FollowerInfo> infos = (List<FollowerInfo>) args[0];
39+
return new FollowInfoResponse(infos);
40+
});
41+
42+
static {
43+
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FollowerInfo.PARSER, FOLLOWER_INDICES_FIELD);
44+
}
45+
46+
public static FollowInfoResponse fromXContent(XContentParser parser) {
47+
return PARSER.apply(parser, null);
48+
}
49+
50+
private final List<FollowerInfo> infos;
51+
52+
FollowInfoResponse(List<FollowerInfo> infos) {
53+
this.infos = infos;
54+
}
55+
56+
public List<FollowerInfo> getInfos() {
57+
return infos;
58+
}
59+
60+
@Override
61+
public boolean equals(Object o) {
62+
if (this == o) return true;
63+
if (o == null || getClass() != o.getClass()) return false;
64+
FollowInfoResponse that = (FollowInfoResponse) o;
65+
return infos.equals(that.infos);
66+
}
67+
68+
@Override
69+
public int hashCode() {
70+
return Objects.hash(infos);
71+
}
72+
73+
public static final class FollowerInfo {
74+
75+
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
76+
static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
77+
static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
78+
static final ParseField STATUS_FIELD = new ParseField("status");
79+
static final ParseField PARAMETERS_FIELD = new ParseField("parameters");
80+
81+
private static final ConstructingObjectParser<FollowerInfo, Void> PARSER = new ConstructingObjectParser<>(
82+
"follower_info",
83+
true,
84+
args -> {
85+
return new FollowerInfo((String) args[0], (String) args[1], (String) args[2],
86+
Status.fromString((String) args[3]), (FollowConfig) args[4]);
87+
});
88+
89+
static {
90+
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOWER_INDEX_FIELD);
91+
PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD);
92+
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX_FIELD);
93+
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD);
94+
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
95+
(p, c) -> FollowConfig.fromXContent(p), PARAMETERS_FIELD);
96+
}
97+
98+
private final String followerIndex;
99+
private final String remoteCluster;
100+
private final String leaderIndex;
101+
private final Status status;
102+
private final FollowConfig parameters;
103+
104+
FollowerInfo(String followerIndex, String remoteCluster, String leaderIndex, Status status,
105+
FollowConfig parameters) {
106+
this.followerIndex = followerIndex;
107+
this.remoteCluster = remoteCluster;
108+
this.leaderIndex = leaderIndex;
109+
this.status = status;
110+
this.parameters = parameters;
111+
}
112+
113+
public String getFollowerIndex() {
114+
return followerIndex;
115+
}
116+
117+
public String getRemoteCluster() {
118+
return remoteCluster;
119+
}
120+
121+
public String getLeaderIndex() {
122+
return leaderIndex;
123+
}
124+
125+
public Status getStatus() {
126+
return status;
127+
}
128+
129+
public FollowConfig getParameters() {
130+
return parameters;
131+
}
132+
133+
@Override
134+
public boolean equals(Object o) {
135+
if (this == o) return true;
136+
if (o == null || getClass() != o.getClass()) return false;
137+
FollowerInfo that = (FollowerInfo) o;
138+
return Objects.equals(followerIndex, that.followerIndex) &&
139+
Objects.equals(remoteCluster, that.remoteCluster) &&
140+
Objects.equals(leaderIndex, that.leaderIndex) &&
141+
status == that.status &&
142+
Objects.equals(parameters, that.parameters);
143+
}
144+
145+
@Override
146+
public int hashCode() {
147+
return Objects.hash(followerIndex, remoteCluster, leaderIndex, status, parameters);
148+
}
149+
150+
}
151+
152+
public enum Status {
153+
154+
ACTIVE("active"),
155+
PAUSED("paused");
156+
157+
private final String name;
158+
159+
Status(String name) {
160+
this.name = name;
161+
}
162+
163+
public String getName() {
164+
return name;
165+
}
166+
167+
public static Status fromString(String value) {
168+
switch (value) {
169+
case "active":
170+
return Status.ACTIVE;
171+
case "paused":
172+
return Status.PAUSED;
173+
default:
174+
throw new IllegalArgumentException("unexpected status value [" + value + "]");
175+
}
176+
}
177+
}
178+
}

client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java

+22
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.elasticsearch.client.ccr.CcrStatsRequest;
3333
import org.elasticsearch.client.ccr.CcrStatsResponse;
3434
import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
35+
import org.elasticsearch.client.ccr.FollowInfoRequest;
36+
import org.elasticsearch.client.ccr.FollowInfoResponse;
3537
import org.elasticsearch.client.ccr.FollowStatsRequest;
3638
import org.elasticsearch.client.ccr.FollowStatsResponse;
3739
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
@@ -113,6 +115,15 @@ public void testIndexFollowing() throws Exception {
113115

114116
try {
115117
assertBusy(() -> {
118+
FollowInfoRequest followInfoRequest = new FollowInfoRequest("follower");
119+
FollowInfoResponse followInfoResponse =
120+
execute(followInfoRequest, ccrClient::getFollowInfo, ccrClient::getFollowInfoAsync);
121+
assertThat(followInfoResponse.getInfos().size(), equalTo(1));
122+
assertThat(followInfoResponse.getInfos().get(0).getFollowerIndex(), equalTo("follower"));
123+
assertThat(followInfoResponse.getInfos().get(0).getLeaderIndex(), equalTo("leader"));
124+
assertThat(followInfoResponse.getInfos().get(0).getRemoteCluster(), equalTo("local_cluster"));
125+
assertThat(followInfoResponse.getInfos().get(0).getStatus(), equalTo(FollowInfoResponse.Status.ACTIVE));
126+
116127
FollowStatsRequest followStatsRequest = new FollowStatsRequest("follower");
117128
FollowStatsResponse followStatsResponse =
118129
execute(followStatsRequest, ccrClient::getFollowStats, ccrClient::getFollowStatsAsync);
@@ -170,6 +181,17 @@ public void testIndexFollowing() throws Exception {
170181
pauseFollowResponse = execute(pauseFollowRequest, ccrClient::pauseFollow, ccrClient::pauseFollowAsync);
171182
assertThat(pauseFollowResponse.isAcknowledged(), is(true));
172183

184+
assertBusy(() -> {
185+
FollowInfoRequest followInfoRequest = new FollowInfoRequest("follower");
186+
FollowInfoResponse followInfoResponse =
187+
execute(followInfoRequest, ccrClient::getFollowInfo, ccrClient::getFollowInfoAsync);
188+
assertThat(followInfoResponse.getInfos().size(), equalTo(1));
189+
assertThat(followInfoResponse.getInfos().get(0).getFollowerIndex(), equalTo("follower"));
190+
assertThat(followInfoResponse.getInfos().get(0).getLeaderIndex(), equalTo("leader"));
191+
assertThat(followInfoResponse.getInfos().get(0).getRemoteCluster(), equalTo("local_cluster"));
192+
assertThat(followInfoResponse.getInfos().get(0).getStatus(), equalTo(FollowInfoResponse.Status.PAUSED));
193+
});
194+
173195
// Need to close index prior to unfollowing it:
174196
CloseIndexRequest closeIndexRequest = new CloseIndexRequest("follower");
175197
org.elasticsearch.action.support.master.AcknowledgedResponse closeIndexReponse =

0 commit comments

Comments
 (0)