Skip to content

Commit 1f6bf50

Browse files
committed
Snapshot: Use TransportMasterNodeAction to update
Currently, we are using a plain TransportRequestHandler to post snapshot status messages to the master. However, it doesn't have a retry mechanism as robust as that of TransportMasterNodeAction. This change migrates from TransportRequestHandler to TransportMasterNodeAction. Most of the code in TransportSnapshotUpdateStatusAction is copied from SnapshotShardsService. Serializing a MasterNodeRequest requires 8 bytes more than a TransportRequest. In order to maintain BWC in a mixed cluster, we have to serialize/deserialize a MasterNodeRequest as a TransportRequest without the timeout. Closes elastic#27151
1 parent d01ad93 commit 1f6bf50

File tree

12 files changed

+596
-196
lines changed

12 files changed

+596
-196
lines changed

core/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@
181181
import org.elasticsearch.action.search.TransportMultiSearchAction;
182182
import org.elasticsearch.action.search.TransportSearchAction;
183183
import org.elasticsearch.action.search.TransportSearchScrollAction;
184-
import org.elasticsearch.action.support.ActionFilter;
185184
import org.elasticsearch.action.support.ActionFilters;
186185
import org.elasticsearch.action.support.AutoCreateIndex;
187186
import org.elasticsearch.action.support.DestructiveOperations;
@@ -199,7 +198,6 @@
199198
import org.elasticsearch.common.NamedRegistry;
200199
import org.elasticsearch.common.inject.AbstractModule;
201200
import org.elasticsearch.common.inject.multibindings.MapBinder;
202-
import org.elasticsearch.common.inject.multibindings.Multibinder;
203201
import org.elasticsearch.common.logging.ESLoggerFactory;
204202
import org.elasticsearch.common.settings.ClusterSettings;
205203
import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -311,6 +309,8 @@
311309
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
312310
import org.elasticsearch.rest.action.search.RestSearchAction;
313311
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
312+
import org.elasticsearch.snapshots.TransportSnapshotUpdateStatusAction;
313+
import org.elasticsearch.snapshots.UpdateSnapshotStatusAction;
314314
import org.elasticsearch.threadpool.ThreadPool;
315315
import org.elasticsearch.usage.UsageService;
316316

@@ -324,7 +324,6 @@
324324
import java.util.function.UnaryOperator;
325325
import java.util.stream.Collectors;
326326

327-
import static java.util.Collections.unmodifiableList;
328327
import static java.util.Collections.unmodifiableMap;
329328

330329
/**
@@ -432,6 +431,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
432431
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
433432
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
434433
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
434+
actions.register(UpdateSnapshotStatusAction.INSTANCE, TransportSnapshotUpdateStatusAction.class);
435435

436436
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
437437
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);

core/src/main/java/org/elasticsearch/action/support/master/MasterNodeRequest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,22 @@ public void readFrom(StreamInput in) throws IOException {
7676
super.readFrom(in);
7777
masterNodeTimeout = new TimeValue(in);
7878
}
79+
80+
/**
81+
* CAUTION: Use this method for the BWC purpose only.
82+
* This method deserializes a {@link MasterNodeRequest} from a {@link org.elasticsearch.transport.TransportRequest}
83+
* without timeout. The master will have to use the default timeout setting.
84+
*/
85+
protected final void readFromAsTransportRequest(StreamInput in) throws IOException {
86+
super.readFrom(in);
87+
}
88+
89+
/**
90+
* CAUTION: Use this method for the BWC purpose only.
91+
* This method serializes a {@link MasterNodeRequest} as a {@link org.elasticsearch.transport.TransportRequest}
92+
* without timeout. The master will have to use the default timeout setting.
93+
*/
94+
protected final void writeToAsTransportRequest(StreamOutput out) throws IOException {
95+
super.writeTo(out);
96+
}
7997
}

core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

Lines changed: 32 additions & 192 deletions
Large diffs are not rendered by default.

core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@
9797
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
9898
* start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method</li>
9999
* <li>Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link SnapshotShardsService#updateIndexShardSnapshotStatus} method</li>
100-
* <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot as completed</li>
100+
* <li>When last shard is completed master node in {@link TransportSnapshotUpdateStatusAction#innerUpdateSnapshotState} method marks the snapshot as completed</li>
101101
* <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository,
102102
* notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state</li>
103103
* </ul>
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.snapshots;
21+
22+
import org.apache.logging.log4j.Logger;
23+
import org.apache.logging.log4j.message.ParameterizedMessage;
24+
import org.apache.logging.log4j.util.Supplier;
25+
import org.elasticsearch.action.ActionListener;
26+
import org.elasticsearch.action.support.ActionFilters;
27+
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
28+
import org.elasticsearch.cluster.ClusterState;
29+
import org.elasticsearch.cluster.ClusterStateTaskConfig;
30+
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
31+
import org.elasticsearch.cluster.ClusterStateTaskListener;
32+
import org.elasticsearch.cluster.NotMasterException;
33+
import org.elasticsearch.cluster.SnapshotsInProgress;
34+
import org.elasticsearch.cluster.block.ClusterBlockException;
35+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
36+
import org.elasticsearch.cluster.service.ClusterService;
37+
import org.elasticsearch.common.Priority;
38+
import org.elasticsearch.common.collect.ImmutableOpenMap;
39+
import org.elasticsearch.common.inject.Inject;
40+
import org.elasticsearch.common.settings.Settings;
41+
import org.elasticsearch.index.shard.ShardId;
42+
import org.elasticsearch.threadpool.ThreadPool;
43+
import org.elasticsearch.transport.TransportService;
44+
45+
import java.util.ArrayList;
46+
import java.util.List;
47+
48+
import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
49+
50+
/**
51+
* A {@link TransportSnapshotUpdateStatusAction} receives snapshot state messages from {@link SnapshotShardsService},
52+
* then computes and updates the {@link ClusterState}.
53+
*/
54+
public class TransportSnapshotUpdateStatusAction extends TransportMasterNodeAction<UpdateSnapshotStatusRequest,
55+
UpdateSnapshotStatusResponse> {
56+
private final SnapshotUpdateStateExecutor snapshotStateExecutor;
57+
58+
@Inject
59+
public TransportSnapshotUpdateStatusAction(Settings settings, TransportService transportService, ClusterService clusterService,
60+
ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver,
61+
ActionFilters actionFilters, SnapshotsService snapshotsService) {
62+
super(settings, UpdateSnapshotStatusAction.NAME, transportService, clusterService,
63+
threadPool, actionFilters, indexNameExpressionResolver, UpdateSnapshotStatusRequest::new);
64+
this.snapshotStateExecutor = new SnapshotUpdateStateExecutor(snapshotsService, logger);
65+
}
66+
67+
@Override
68+
protected String executor() {
69+
return ThreadPool.Names.SAME;
70+
}
71+
72+
@Override
73+
protected UpdateSnapshotStatusResponse newResponse() {
74+
return new UpdateSnapshotStatusResponse();
75+
}
76+
77+
@Override
78+
protected void masterOperation(UpdateSnapshotStatusRequest request, ClusterState state,
79+
ActionListener<UpdateSnapshotStatusResponse> listener) throws Exception {
80+
innerUpdateSnapshotState(request, listener);
81+
}
82+
83+
@Override
84+
protected ClusterBlockException checkBlock(UpdateSnapshotStatusRequest request, ClusterState state) {
85+
return null;
86+
}
87+
88+
void innerUpdateSnapshotState(final UpdateSnapshotStatusRequest request, ActionListener<UpdateSnapshotStatusResponse> listener) {
89+
logger.trace((Supplier<?>) () -> new ParameterizedMessage("received updated snapshot restore status [{}]", request));
90+
clusterService.submitStateUpdateTask(
91+
"update snapshot state",
92+
request,
93+
ClusterStateTaskConfig.build(Priority.NORMAL),
94+
snapshotStateExecutor,
95+
new ClusterStateTaskListener() {
96+
@Override
97+
public void onFailure(String source, Exception e) {
98+
logger.error((Supplier<?>) () -> new ParameterizedMessage(
99+
"unexpected failure while updating snapshot status [{}]", request), e);
100+
try {
101+
listener.onFailure(e);
102+
} catch (Exception channelException) {
103+
channelException.addSuppressed(e);
104+
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
105+
"failed to send failure [{}] while updating snapshot status [{}]", e, request), channelException);
106+
}
107+
}
108+
109+
@Override
110+
public void onNoLongerMaster(String source) {
111+
logger.error("no longer master while updating snapshot status [{}]", request);
112+
try {
113+
listener.onFailure(new NotMasterException(source));
114+
} catch (Exception channelException) {
115+
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
116+
"{} failed to send no longer master updating snapshot[{}]", request.snapshot(), request), channelException);
117+
}
118+
}
119+
120+
@Override
121+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
122+
try {
123+
listener.onResponse(new UpdateSnapshotStatusResponse());
124+
} catch (Exception channelException) {
125+
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
126+
"failed to send response after updating snapshot status [{}]", request), channelException);
127+
}
128+
}
129+
}
130+
);
131+
}
132+
133+
// The client node sends the update message to the master, then the master node updates the ClusterState.
134+
static class SnapshotUpdateStateExecutor implements ClusterStateTaskExecutor<UpdateSnapshotStatusRequest> {
135+
private final SnapshotsService snapshotsService;
136+
private final Logger logger;
137+
138+
SnapshotUpdateStateExecutor(SnapshotsService snapshotsService, Logger logger) {
139+
this.snapshotsService = snapshotsService;
140+
this.logger = logger;
141+
}
142+
143+
@Override
144+
public ClusterTasksResult<UpdateSnapshotStatusRequest> execute(ClusterState currentState,
145+
List<UpdateSnapshotStatusRequest> tasks) throws Exception {
146+
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
147+
if (snapshots != null) {
148+
int changedCount = 0;
149+
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
150+
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
151+
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
152+
boolean updated = false;
153+
154+
for (UpdateSnapshotStatusRequest updateSnapshotState : tasks) {
155+
if (entry.snapshot().equals(updateSnapshotState.snapshot())) {
156+
if (logger.isTraceEnabled()) {
157+
logger.trace("[{}] Updating shard [{}] with status [{}]",
158+
updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state());
159+
}
160+
if (updated == false) {
161+
shards.putAll(entry.shards());
162+
updated = true;
163+
}
164+
shards.put(updateSnapshotState.shardId(), updateSnapshotState.status());
165+
changedCount++;
166+
}
167+
}
168+
169+
if (updated) {
170+
if (completed(shards.values()) == false) {
171+
entries.add(new SnapshotsInProgress.Entry(entry, shards.build()));
172+
} else {
173+
// Snapshot is finished - mark it as done
174+
// TODO: Add PARTIAL_SUCCESS status?
175+
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry,
176+
SnapshotsInProgress.State.SUCCESS, shards.build());
177+
entries.add(updatedEntry);
178+
// Finalize snapshot in the repository
179+
snapshotsService.endSnapshot(updatedEntry);
180+
logger.info("snapshot [{}] is done", updatedEntry.snapshot());
181+
}
182+
} else {
183+
entries.add(entry);
184+
}
185+
}
186+
if (changedCount > 0) {
187+
if (logger.isTraceEnabled()) {
188+
logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount);
189+
}
190+
final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress(
191+
entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
192+
return ClusterTasksResult.<UpdateSnapshotStatusRequest>builder().successes(tasks).build(
193+
ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build());
194+
}
195+
}
196+
return ClusterTasksResult.<UpdateSnapshotStatusRequest>builder().successes(tasks).build(currentState);
197+
}
198+
}
199+
200+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.snapshots;
21+
22+
import org.elasticsearch.action.Action;
23+
import org.elasticsearch.client.ElasticsearchClient;
24+
25+
public class UpdateSnapshotStatusAction extends Action<UpdateSnapshotStatusRequest, UpdateSnapshotStatusResponse,
26+
UpdateSnapshotStatusRequestBuilder> {
27+
public static final UpdateSnapshotStatusAction INSTANCE = new UpdateSnapshotStatusAction();
28+
public static final String NAME = "internal:cluster/snapshot/update_snapshot";
29+
30+
public UpdateSnapshotStatusAction() {
31+
super(NAME);
32+
}
33+
34+
@Override
35+
public UpdateSnapshotStatusRequestBuilder newRequestBuilder(ElasticsearchClient client) {
36+
return new UpdateSnapshotStatusRequestBuilder(client, UpdateSnapshotStatusAction.INSTANCE);
37+
}
38+
39+
@Override
40+
public UpdateSnapshotStatusResponse newResponse() {
41+
return new UpdateSnapshotStatusResponse();
42+
}
43+
}

0 commit comments

Comments
 (0)