Skip to content

Commit e487f14

Browse files
authored
Add CcrRestoreSourceService to track sessions (elastic#36578) (elastic#36911)
This commit is related to elastic#36127. It adds a CcrRestoreSourceService to track Engine.IndexCommitRef need for in-process file restores. When a follower starts restoring a shard through the CcrRepository it opens a session with the leader through the PutCcrRestoreSessionAction. The leader responds to the request by telling the follower what files it needs to fetch for a restore. This is not yet implemented. Once, the restore is complete, the follower closes the session with the DeleteCcrRestoreSessionAction action.
1 parent 3dda271 commit e487f14

File tree

12 files changed

+843
-5
lines changed

12 files changed

+843
-5
lines changed

server/src/main/java/org/elasticsearch/common/util/iterable/Iterables.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public Iterables() {
3434

3535
public static <T> Iterable<T> concat(Iterable<T>... inputs) {
3636
Objects.requireNonNull(inputs);
37-
return new ConcatenatedIterable(inputs);
37+
return new ConcatenatedIterable<>(inputs);
3838
}
3939

4040
static class ConcatenatedIterable<T> implements Iterable<T> {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2626
import org.elasticsearch.env.Environment;
2727
import org.elasticsearch.env.NodeEnvironment;
28+
import org.elasticsearch.index.IndexModule;
2829
import org.elasticsearch.index.IndexSettings;
2930
import org.elasticsearch.index.engine.EngineFactory;
3031
import org.elasticsearch.license.XPackLicenseState;
@@ -59,10 +60,13 @@
5960
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
6061
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
6162
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
63+
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
6264
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
65+
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
6366
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction;
6467
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
6568
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
69+
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
6670
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
6771
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
6872
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
@@ -113,7 +117,9 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
113117
private final boolean enabled;
114118
private final Settings settings;
115119
private final CcrLicenseChecker ccrLicenseChecker;
120+
116121
private final SetOnce<CcrRepositoryManager> repositoryManager = new SetOnce<>();
122+
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
117123
private Client client;
118124

119125
private final boolean tribeNode;
@@ -160,9 +166,12 @@ public Collection<Object> createComponents(
160166
}
161167

162168
this.repositoryManager.set(new CcrRepositoryManager(settings, clusterService, (NodeClient) client));
163-
169+
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(settings);
170+
this.restoreSourceService.set(restoreSourceService);
164171
return Arrays.asList(
165172
ccrLicenseChecker,
173+
restoreSourceService,
174+
repositoryManager.get(),
166175
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
167176
);
168177
}
@@ -189,6 +198,10 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
189198
PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class),
190199
new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE,
191200
DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class),
201+
new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE,
202+
PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class),
203+
new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE,
204+
ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class),
192205
// stats action
193206
new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),
194207
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
@@ -288,6 +301,11 @@ public Map<String, Repository.Factory> getInternalRepositories(Environment env,
288301
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
289302
}
290303

304+
@Override
305+
public void onIndexModule(IndexModule indexModule) {
306+
indexModule.addIndexEventListener(this.restoreSourceService.get());
307+
}
308+
291309
protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }
292310

293311
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action.repositories;
8+
9+
import org.elasticsearch.action.Action;
10+
import org.elasticsearch.action.FailedNodeException;
11+
import org.elasticsearch.action.support.ActionFilters;
12+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
13+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
14+
import org.elasticsearch.action.support.nodes.TransportNodesAction;
15+
import org.elasticsearch.client.ElasticsearchClient;
16+
import org.elasticsearch.cluster.ClusterName;
17+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
18+
import org.elasticsearch.cluster.node.DiscoveryNode;
19+
import org.elasticsearch.cluster.service.ClusterService;
20+
import org.elasticsearch.common.inject.Inject;
21+
import org.elasticsearch.common.io.stream.StreamInput;
22+
import org.elasticsearch.common.io.stream.StreamOutput;
23+
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.threadpool.ThreadPool;
25+
import org.elasticsearch.transport.TransportService;
26+
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
27+
28+
import java.io.IOException;
29+
import java.util.List;
30+
31+
public class ClearCcrRestoreSessionAction extends Action<ClearCcrRestoreSessionRequest,
32+
ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse, ClearCcrRestoreSessionRequestBuilder> {
33+
34+
public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction();
35+
private static final String NAME = "internal:admin/ccr/restore/session/clear";
36+
37+
private ClearCcrRestoreSessionAction() {
38+
super(NAME);
39+
}
40+
41+
@Override
42+
public ClearCcrRestoreSessionResponse newResponse() {
43+
return new ClearCcrRestoreSessionResponse();
44+
}
45+
46+
@Override
47+
public ClearCcrRestoreSessionRequestBuilder newRequestBuilder(ElasticsearchClient client) {
48+
return new ClearCcrRestoreSessionRequestBuilder(client);
49+
}
50+
51+
public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction<ClearCcrRestoreSessionRequest,
52+
ClearCcrRestoreSessionResponse, ClearCcrRestoreSessionRequest.Request, Response> {
53+
54+
private final CcrRestoreSourceService ccrRestoreService;
55+
56+
@Inject
57+
public TransportDeleteCcrRestoreSessionAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
58+
ActionFilters actionFilters, IndexNameExpressionResolver resolver,
59+
TransportService transportService, CcrRestoreSourceService ccrRestoreService) {
60+
super(settings, NAME, threadPool, clusterService, transportService, actionFilters, resolver,
61+
ClearCcrRestoreSessionRequest::new, ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class);
62+
this.ccrRestoreService = ccrRestoreService;
63+
}
64+
65+
@Override
66+
protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List<Response> responses,
67+
List<FailedNodeException> failures) {
68+
return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), responses, failures);
69+
}
70+
71+
@Override
72+
protected ClearCcrRestoreSessionRequest.Request newNodeRequest(String nodeId, ClearCcrRestoreSessionRequest request) {
73+
return request.getRequest();
74+
}
75+
76+
@Override
77+
protected Response newNodeResponse() {
78+
return new Response();
79+
}
80+
81+
@Override
82+
protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) {
83+
ccrRestoreService.closeSession(request.getSessionUUID());
84+
return new Response(clusterService.localNode());
85+
}
86+
}
87+
88+
public static class Response extends BaseNodeResponse {
89+
90+
private Response() {
91+
}
92+
93+
private Response(StreamInput in) throws IOException {
94+
readFrom(in);
95+
}
96+
97+
private Response(DiscoveryNode node) {
98+
super(node);
99+
}
100+
101+
@Override
102+
public void writeTo(StreamOutput out) throws IOException {
103+
super.writeTo(out);
104+
}
105+
106+
@Override
107+
public void readFrom(StreamInput in) throws IOException {
108+
super.readFrom(in);
109+
}
110+
}
111+
112+
public static class ClearCcrRestoreSessionResponse extends BaseNodesResponse<Response> {
113+
114+
ClearCcrRestoreSessionResponse() {
115+
}
116+
117+
ClearCcrRestoreSessionResponse(ClusterName clusterName, List<Response> chunkResponses, List<FailedNodeException> failures) {
118+
super(clusterName, chunkResponses, failures);
119+
}
120+
121+
@Override
122+
protected List<Response> readNodesFrom(StreamInput in) throws IOException {
123+
return in.readList(Response::new);
124+
}
125+
126+
@Override
127+
protected void writeNodesTo(StreamOutput out, List<Response> nodes) throws IOException {
128+
out.writeList(nodes);
129+
}
130+
}
131+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action.repositories;
8+
9+
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
10+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
14+
import java.io.IOException;
15+
16+
public class ClearCcrRestoreSessionRequest extends BaseNodesRequest<ClearCcrRestoreSessionRequest> {
17+
18+
private Request request;
19+
20+
ClearCcrRestoreSessionRequest() {
21+
}
22+
23+
public ClearCcrRestoreSessionRequest(String nodeId, Request request) {
24+
super(nodeId);
25+
this.request = request;
26+
}
27+
28+
@Override
29+
public void readFrom(StreamInput streamInput) throws IOException {
30+
super.readFrom(streamInput);
31+
request = new Request();
32+
request.readFrom(streamInput);
33+
}
34+
35+
@Override
36+
public void writeTo(StreamOutput streamOutput) throws IOException {
37+
super.writeTo(streamOutput);
38+
request.writeTo(streamOutput);
39+
}
40+
41+
public Request getRequest() {
42+
return request;
43+
}
44+
45+
public static class Request extends BaseNodeRequest {
46+
47+
private String sessionUUID;
48+
49+
Request() {
50+
}
51+
52+
public Request(String nodeId, String sessionUUID) {
53+
super(nodeId);
54+
this.sessionUUID = sessionUUID;
55+
}
56+
57+
@Override
58+
public void readFrom(StreamInput in) throws IOException {
59+
super.readFrom(in);
60+
sessionUUID = in.readString();
61+
}
62+
63+
@Override
64+
public void writeTo(StreamOutput out) throws IOException {
65+
super.writeTo(out);
66+
out.writeString(sessionUUID);
67+
}
68+
69+
public String getSessionUUID() {
70+
return sessionUUID;
71+
}
72+
}
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action.repositories;
8+
9+
import org.elasticsearch.action.ActionRequestBuilder;
10+
import org.elasticsearch.client.ElasticsearchClient;
11+
12+
class ClearCcrRestoreSessionRequestBuilder extends ActionRequestBuilder<ClearCcrRestoreSessionRequest,
13+
ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse, ClearCcrRestoreSessionRequestBuilder> {
14+
15+
ClearCcrRestoreSessionRequestBuilder(ElasticsearchClient client) {
16+
super(client, ClearCcrRestoreSessionAction.INSTANCE, new ClearCcrRestoreSessionRequest());
17+
}
18+
}

0 commit comments

Comments
 (0)