Skip to content

Commit deafb59

Browse files
Make Timestamps Returned by Snapshot APIs Consistent (#43148)
* We don't have to calculate the start and end times form the shards for the status API, we have the start time available from the CS or the `SnapshotInfo` in the repo and can either take the end time form the `SnapshotInfo` or take the most recent time from the shard stats for in progress snapshots * Closes #43074
1 parent 399d53e commit deafb59

File tree

8 files changed

+120
-22
lines changed

8 files changed

+120
-22
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexStatus.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class SnapshotIndexStatus implements Iterable<SnapshotIndexShardStatus>,
5858
stats = new SnapshotStats();
5959
for (SnapshotIndexShardStatus shard : shards) {
6060
indexShards.put(shard.getShardId().getId(), shard);
61-
stats.add(shard.getStats());
61+
stats.add(shard.getStats(), true);
6262
}
6363
shardsStats = new SnapshotShardsStats(shards);
6464
this.indexShards = unmodifiableMap(indexShards);

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,12 @@ public static SnapshotStats fromXContent(XContentParser parser) throws IOExcepti
296296
processedSize);
297297
}
298298

299-
void add(SnapshotStats stats) {
299+
/**
300+
* Add stats instance to the total
301+
* @param stats Stats instance to add
302+
* @param updateTimestamps Whether or not start time and duration should be updated
303+
*/
304+
void add(SnapshotStats stats, boolean updateTimestamps) {
300305
incrementalFileCount += stats.incrementalFileCount;
301306
totalFileCount += stats.totalFileCount;
302307
processedFileCount += stats.processedFileCount;
@@ -309,7 +314,7 @@ void add(SnapshotStats stats) {
309314
// First time here
310315
startTime = stats.startTime;
311316
time = stats.time;
312-
} else {
317+
} else if (updateTimestamps) {
313318
// The time the last snapshot ends
314319
long endTime = Math.max(startTime + time, stats.startTime + stats.time);
315320

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java

+23-8
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@
1919

2020
package org.elasticsearch.action.admin.cluster.snapshots.status;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.cluster.SnapshotsInProgress;
2324
import org.elasticsearch.cluster.SnapshotsInProgress.State;
25+
import org.elasticsearch.common.Nullable;
2426
import org.elasticsearch.common.ParseField;
2527
import org.elasticsearch.common.Strings;
26-
import org.elasticsearch.common.Nullable;
2728
import org.elasticsearch.common.io.stream.StreamInput;
2829
import org.elasticsearch.common.io.stream.StreamOutput;
2930
import org.elasticsearch.common.io.stream.Streamable;
@@ -71,14 +72,14 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
7172
@Nullable
7273
private Boolean includeGlobalState;
7374

74-
SnapshotStatus(final Snapshot snapshot, final State state, final List<SnapshotIndexShardStatus> shards,
75-
final Boolean includeGlobalState) {
75+
SnapshotStatus(Snapshot snapshot, State state, List<SnapshotIndexShardStatus> shards, Boolean includeGlobalState,
76+
long startTime, long time) {
7677
this.snapshot = Objects.requireNonNull(snapshot);
7778
this.state = Objects.requireNonNull(state);
7879
this.shards = Objects.requireNonNull(shards);
7980
this.includeGlobalState = includeGlobalState;
8081
shardsStats = new SnapshotShardsStats(shards);
81-
updateShardStats();
82+
updateShardStats(startTime, time);
8283
}
8384

8485
private SnapshotStatus(Snapshot snapshot, State state, List<SnapshotIndexShardStatus> shards,
@@ -169,7 +170,16 @@ public void readFrom(StreamInput in) throws IOException {
169170
}
170171
shards = Collections.unmodifiableList(builder);
171172
includeGlobalState = in.readOptionalBoolean();
172-
updateShardStats();
173+
final long startTime;
174+
final long time;
175+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
176+
startTime = in.readLong();
177+
time = in.readLong();
178+
} else {
179+
startTime = 0L;
180+
time = 0L;
181+
}
182+
updateShardStats(startTime, time);
173183
}
174184

175185
@Override
@@ -181,6 +191,10 @@ public void writeTo(StreamOutput out) throws IOException {
181191
shard.writeTo(out);
182192
}
183193
out.writeOptionalBoolean(includeGlobalState);
194+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
195+
out.writeLong(stats.getStartTime());
196+
out.writeLong(stats.getTime());
197+
}
184198
}
185199

186200
/**
@@ -281,11 +295,12 @@ public static SnapshotStatus fromXContent(XContentParser parser) throws IOExcept
281295
return PARSER.parse(parser, null);
282296
}
283297

284-
private void updateShardStats() {
285-
stats = new SnapshotStats();
298+
private void updateShardStats(long startTime, long time) {
299+
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0);
286300
shardsStats = new SnapshotShardsStats(shards);
287301
for (SnapshotIndexShardStatus shard : shards) {
288-
stats.add(shard.getStats());
302+
// BWC: only update timestamps when we did not get a start time from an old node
303+
stats.add(shard.getStats(), startTime == 0L);
289304
}
290305
}
291306

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li
187187
shardStatusBuilder.add(shardStatus);
188188
}
189189
builder.add(new SnapshotStatus(entry.snapshot(), entry.state(),
190-
Collections.unmodifiableList(shardStatusBuilder), entry.includeGlobalState()));
190+
Collections.unmodifiableList(shardStatusBuilder), entry.includeGlobalState(), entry.startTime(),
191+
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L)));
191192
}
192193
}
193194
// Now add snapshots on disk that are not currently running
@@ -240,8 +241,10 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li
240241
default:
241242
throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state());
242243
}
244+
final long startTime = snapshotInfo.startTime();
243245
builder.add(new SnapshotStatus(new Snapshot(repositoryName, snapshotId), state,
244-
Collections.unmodifiableList(shardStatusBuilder), snapshotInfo.includeGlobalState()));
246+
Collections.unmodifiableList(shardStatusBuilder), snapshotInfo.includeGlobalState(),
247+
startTime, snapshotInfo.endTime() - startTime));
245248
}
246249
}
247250
}

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
561561
final Map<String, Object> userMetadata) {
562562
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
563563
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
564-
startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
564+
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
565565
includeGlobalState, userMetadata);
566566
try {
567567
final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
@@ -853,7 +853,7 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b
853853
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
854854
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
855855
final ShardId shardId = store.shardId();
856-
final long startTime = threadPool.relativeTimeInMillis();
856+
final long startTime = threadPool.absoluteTimeInMillis();
857857
try {
858858
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
859859

@@ -953,7 +953,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
953953
lastSnapshotStatus.getStartTime(),
954954
// snapshotStatus.startTime() is assigned on the same machine,
955955
// so it's safe to use the relative time in millis
956-
threadPool.relativeTimeInMillis() - lastSnapshotStatus.getStartTime(),
956+
threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
957957
lastSnapshotStatus.getIncrementalFileCount(),
958958
lastSnapshotStatus.getIncrementalSize()
959959
);
@@ -976,9 +976,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
976976
// finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
977977
finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot creation [" + snapshotId + "]", shardContainer,
978978
shardId, snapshotId);
979-
snapshotStatus.moveToDone(threadPool.relativeTimeInMillis());
979+
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis());
980980
} catch (Exception e) {
981-
snapshotStatus.moveToFailed(threadPool.relativeTimeInMillis(), ExceptionsHelper.detailedMessage(e));
981+
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e));
982982
if (e instanceof IndexShardSnapshotFailedException) {
983983
throw (IndexShardSnapshotFailedException) e;
984984
} else {

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ public ClusterState execute(ClusterState currentState) {
285285
request.partial(),
286286
State.INIT,
287287
snapshotIndices,
288-
System.currentTimeMillis(),
288+
threadPool.absoluteTimeInMillis(),
289289
repositoryData.getGenId(),
290290
null,
291291
request.userMetadata());
@@ -1169,7 +1169,7 @@ public ClusterState execute(ClusterState currentState) {
11691169
// add the snapshot deletion to the cluster state
11701170
SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry(
11711171
snapshot,
1172-
System.currentTimeMillis(),
1172+
threadPool.absoluteTimeInMillis(),
11731173
repositoryStateId
11741174
);
11751175
if (deletionsInProgress != null) {

server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatusTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void testToString() throws Exception {
5050
List<SnapshotIndexShardStatus> snapshotIndexShardStatuses = new ArrayList<>();
5151
snapshotIndexShardStatuses.add(snapshotIndexShardStatus);
5252
boolean includeGlobalState = randomBoolean();
53-
SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState);
53+
SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L);
5454

5555
int initializingShards = 0;
5656
int startedShards = 0;
@@ -166,7 +166,7 @@ protected SnapshotStatus createTestInstance() {
166166
snapshotIndexShardStatuses.add(snapshotIndexShardStatus);
167167
}
168168
boolean includeGlobalState = randomBoolean();
169-
return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState);
169+
return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L);
170170
}
171171

172172
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.snapshots;
20+
21+
import org.elasticsearch.Version;
22+
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
23+
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
24+
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
25+
import org.elasticsearch.client.Client;
26+
import org.elasticsearch.common.settings.Settings;
27+
28+
import java.util.List;
29+
30+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
31+
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.greaterThan;
33+
34+
public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
35+
36+
public void testStatusApiConsistency() {
37+
Client client = client();
38+
39+
logger.info("--> creating repository");
40+
assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(
41+
Settings.builder().put("location", randomRepoPath()).build()));
42+
43+
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
44+
ensureGreen();
45+
46+
logger.info("--> indexing some data");
47+
for (int i = 0; i < 100; i++) {
48+
index("test-idx-1", "_doc", Integer.toString(i), "foo", "bar" + i);
49+
index("test-idx-2", "_doc", Integer.toString(i), "foo", "baz" + i);
50+
index("test-idx-3", "_doc", Integer.toString(i), "foo", "baz" + i);
51+
}
52+
refresh();
53+
54+
logger.info("--> snapshot");
55+
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
56+
.setWaitForCompletion(true).get();
57+
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
58+
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
59+
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
60+
61+
List<SnapshotInfo> snapshotInfos =
62+
client.admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots("test-repo");
63+
assertThat(snapshotInfos.size(), equalTo(1));
64+
SnapshotInfo snapshotInfo = snapshotInfos.get(0);
65+
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
66+
assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));
67+
68+
final List<SnapshotStatus> snapshotStatus = client.admin().cluster().snapshotsStatus(
69+
new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots();
70+
assertThat(snapshotStatus.size(), equalTo(1));
71+
final SnapshotStatus snStatus = snapshotStatus.get(0);
72+
assertEquals(snStatus.getStats().getStartTime(), snapshotInfo.startTime());
73+
assertEquals(snStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime());
74+
}
75+
}

0 commit comments

Comments
 (0)