Skip to content

Commit 7e82230

Browse files
committed
Apply feedback
1 parent b08ca07 commit 7e82230

File tree

3 files changed

+58
-163
lines changed

3 files changed

+58
-163
lines changed

core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@
113113
import java.util.Collection;
114114
import java.util.Collections;
115115
import java.util.HashMap;
116-
import java.util.HashSet;
117116
import java.util.List;
118117
import java.util.Map;
119118
import java.util.Set;
@@ -1516,16 +1515,9 @@ public void restore() throws IOException {
15161515
logger.trace("no files to recover, all exists within the local store");
15171516
}
15181517

1519-
if (logger.isTraceEnabled()) {
1520-
logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId,
1521-
index.totalRecoverFiles(), new ByteSizeValue(index.totalRecoverBytes()), index.reusedFileCount(), new ByteSizeValue(index.reusedFileCount()));
1522-
}
15231518
try {
1524-
// list of all existing store files without the identical ones
1525-
final Set<String> deleteIfExistFiles = Sets.difference(
1526-
new HashSet<>(Arrays.asList(store.directory().listAll())),
1527-
diff.identical.stream().map(StoreFileMetaData::name).collect(Collectors.toSet())
1528-
);
1519+
// list of all existing store files
1520+
final List<String> deleteIfExistFiles = Arrays.asList(store.directory().listAll());
15291521

15301522
// restore the files from the snapshot to the Lucene store
15311523
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
@@ -1622,5 +1614,4 @@ private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, fi
16221614
}
16231615
}
16241616
}
1625-
16261617
}

core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

Lines changed: 18 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -22,50 +22,32 @@
2222
import org.apache.lucene.store.Directory;
2323
import org.apache.lucene.util.IOUtils;
2424
import org.apache.lucene.util.TestUtil;
25-
import org.elasticsearch.Version;
2625
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
27-
import org.elasticsearch.cluster.node.DiscoveryNode;
28-
import org.elasticsearch.cluster.routing.RecoverySource;
2926
import org.elasticsearch.cluster.routing.ShardRouting;
3027
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
31-
import org.elasticsearch.cluster.routing.ShardRoutingState;
3228
import org.elasticsearch.common.UUIDs;
33-
import org.elasticsearch.common.blobstore.BlobContainer;
34-
import org.elasticsearch.common.blobstore.BlobMetaData;
35-
import org.elasticsearch.common.blobstore.BlobPath;
36-
import org.elasticsearch.common.blobstore.BlobStore;
37-
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
38-
import org.elasticsearch.common.io.Streams;
3929
import org.elasticsearch.common.settings.Settings;
40-
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
41-
import org.elasticsearch.index.Index;
42-
import org.elasticsearch.index.engine.Engine;
30+
import org.elasticsearch.env.Environment;
31+
import org.elasticsearch.env.TestEnvironment;
4332
import org.elasticsearch.index.shard.IndexShard;
4433
import org.elasticsearch.index.shard.IndexShardState;
4534
import org.elasticsearch.index.shard.IndexShardTestCase;
4635
import org.elasticsearch.index.shard.ShardId;
47-
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
4836
import org.elasticsearch.index.store.Store;
4937
import org.elasticsearch.index.store.StoreFileMetaData;
50-
import org.elasticsearch.indices.recovery.RecoveryState;
5138
import org.elasticsearch.repositories.IndexId;
39+
import org.elasticsearch.repositories.Repository;
40+
import org.elasticsearch.repositories.fs.FsRepository;
5241
import org.elasticsearch.snapshots.Snapshot;
5342
import org.elasticsearch.snapshots.SnapshotId;
5443

55-
import java.io.ByteArrayInputStream;
56-
import java.io.ByteArrayOutputStream;
57-
import java.io.FileNotFoundException;
5844
import java.io.IOException;
59-
import java.io.InputStream;
6045
import java.nio.file.Files;
46+
import java.nio.file.Path;
6147
import java.util.Arrays;
62-
import java.util.HashMap;
6348
import java.util.List;
64-
import java.util.Map;
65-
import java.util.stream.Collectors;
6649

6750
import static org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE;
68-
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
6951

7052
/**
7153
* This class tests the behavior of {@link BlobStoreRepository} when it
@@ -96,7 +78,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException {
9678
assertDocCount(shard, numDocs);
9779

9880
// snapshot the shard
99-
final BlobStoreRepository repository = createRepository();
81+
final Repository repository = createRepository();
10082
final Snapshot snapshot = new Snapshot(repository.getMetadata().name(), new SnapshotId(randomAlphaOfLength(10), "_uuid"));
10183
snapshotShard(shard, snapshot, repository);
10284

@@ -143,135 +125,19 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException {
143125
}
144126
}
145127

146-
/** Recover a shard from a snapshot using a given repository **/
147-
private void recoverShardFromSnapshot(final IndexShard shard,
148-
final Snapshot snapshot,
149-
final BlobStoreRepository repository) throws IOException {
150-
final Version version = Version.CURRENT;
151-
final ShardId shardId = shard.shardId();
152-
final String index = shard.shardId().getIndexName();
153-
final IndexId indexId = new IndexId(index, UUIDs.randomBase64UUID());
154-
final DiscoveryNode node = new DiscoveryNode(randomAlphaOfLength(25), buildNewFakeTransportAddress(), version);
155-
final RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource(snapshot, version, index);
156-
final ShardRouting shardRouting = newShardRouting(shardId, node.getId(), true, recoverySource, ShardRoutingState.INITIALIZING);
157-
158-
shard.markAsRecovering("from snapshot", new RecoveryState(shardRouting, node, null));
159-
repository.restoreShard(shard, snapshot.getSnapshotId(), version, indexId, shard.shardId(), shard.recoveryState());
160-
}
161-
162-
/** Snapshot a shard using a given repository **/
163-
private void snapshotShard(final IndexShard shard,
164-
final Snapshot snapshot,
165-
final BlobStoreRepository repository) throws IOException {
166-
final IndexShardSnapshotStatus snapshotStatus = new IndexShardSnapshotStatus();
167-
try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(true)) {
168-
Index index = shard.shardId().getIndex();
169-
IndexId indexId = new IndexId(index.getName(), index.getUUID());
170-
171-
repository.snapshotShard(shard, snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), snapshotStatus);
172-
}
173-
assertEquals(IndexShardSnapshotStatus.Stage.DONE, snapshotStatus.stage());
174-
assertEquals(shard.snapshotStoreMetadata().size(), snapshotStatus.numberOfFiles());
175-
assertNull(snapshotStatus.failure());
176-
}
177-
178-
179-
/**
180-
* A {@link BlobStoreRepository} implementation that works in memory.
181-
*
182-
* It implements only the methods required by the tests and is not thread safe.
183-
*/
184-
class MemoryBlobStoreRepository extends BlobStoreRepository {
185-
186-
private final Map<String, byte[]> files = new HashMap<>();
187-
188-
MemoryBlobStoreRepository(final RepositoryMetaData metadata, final Settings settings, final NamedXContentRegistry registry) {
189-
super(metadata, settings, registry);
190-
}
191-
192-
@Override
193-
protected BlobStore blobStore() {
194-
return new BlobStore() {
195-
@Override
196-
public BlobContainer blobContainer(BlobPath path) {
197-
return new BlobContainer() {
198-
@Override
199-
public BlobPath path() {
200-
return new BlobPath();
201-
}
202-
203-
@Override
204-
public boolean blobExists(String blobName) {
205-
return files.containsKey(blobName);
206-
}
207-
208-
@Override
209-
public InputStream readBlob(String blobName) throws IOException {
210-
if (blobExists(blobName) == false) {
211-
throw new FileNotFoundException(blobName);
212-
}
213-
return new ByteArrayInputStream(files.get(blobName));
214-
}
215-
216-
@Override
217-
public void writeBlob(String blobName, InputStream in, long blobSize) throws IOException {
218-
try (ByteArrayOutputStream out = new ByteArrayOutputStream((int) blobSize)) {
219-
Streams.copy(in, out);
220-
files.put(blobName, out.toByteArray());
221-
}
222-
}
223-
224-
@Override
225-
public void deleteBlob(String blobName) throws IOException {
226-
files.remove(blobName);
227-
}
228-
229-
@Override
230-
public Map<String, BlobMetaData> listBlobs() throws IOException {
231-
final Map<String, BlobMetaData> blobs = new HashMap<>(files.size());
232-
files.forEach((key, value) -> blobs.put(key, new PlainBlobMetaData(key, value.length)));
233-
return blobs;
234-
}
235-
236-
@Override
237-
public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException {
238-
return listBlobs().entrySet().stream()
239-
.filter(e -> e.getKey().startsWith(blobNamePrefix))
240-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
241-
}
242-
243-
@Override
244-
public void move(String sourceBlobName, String targetBlobName) throws IOException {
245-
byte[] bytes = files.remove(sourceBlobName);
246-
if (bytes == null) {
247-
throw new FileNotFoundException(sourceBlobName);
248-
}
249-
files.put(targetBlobName, bytes);
250-
}
251-
};
252-
}
253-
254-
@Override
255-
public void delete(BlobPath path) throws IOException {
256-
throw new UnsupportedOperationException("MemoryBlobStoreRepository does not support this method");
257-
}
258-
259-
@Override
260-
public void close() throws IOException {
261-
files.clear();
262-
}
263-
};
264-
}
265-
266-
@Override
267-
protected BlobPath basePath() {
268-
return new BlobPath();
269-
}
128+
/** Create a {@link Repository} with a random name **/
129+
private Repository createRepository() throws IOException {
130+
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
131+
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
132+
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry());
270133
}
271134

272-
/** Create a {@link BlobStoreRepository} with a random name **/
273-
private BlobStoreRepository createRepository() {
274-
String name = randomAlphaOfLength(10);
275-
return new MemoryBlobStoreRepository(new RepositoryMetaData(name, "in-memory", Settings.EMPTY), Settings.EMPTY, xContentRegistry());
135+
/** Create a {@link Environment} with random path.home and path.repo **/
136+
private Environment createEnvironment() {
137+
Path home = createTempDir();
138+
return TestEnvironment.newEnvironment(Settings.builder()
139+
.put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath())
140+
.put(Environment.PATH_REPO_SETTING.getKey(), home.resolve("repo").toAbsolutePath())
141+
.build());
276142
}
277143
}

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.common.util.BigArrays;
4747
import org.elasticsearch.common.xcontent.XContentType;
4848
import org.elasticsearch.env.NodeEnvironment;
49+
import org.elasticsearch.index.Index;
4950
import org.elasticsearch.index.IndexSettings;
5051
import org.elasticsearch.index.MapperTestUtils;
5152
import org.elasticsearch.index.VersionType;
@@ -60,6 +61,7 @@
6061
import org.elasticsearch.index.mapper.Uid;
6162
import org.elasticsearch.index.seqno.SequenceNumbers;
6263
import org.elasticsearch.index.similarity.SimilarityService;
64+
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
6365
import org.elasticsearch.index.store.DirectoryService;
6466
import org.elasticsearch.index.store.Store;
6567
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
@@ -69,6 +71,9 @@
6971
import org.elasticsearch.indices.recovery.RecoveryTarget;
7072
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
7173
import org.elasticsearch.node.Node;
74+
import org.elasticsearch.repositories.IndexId;
75+
import org.elasticsearch.repositories.Repository;
76+
import org.elasticsearch.snapshots.Snapshot;
7277
import org.elasticsearch.test.DummyShardLock;
7378
import org.elasticsearch.test.ESTestCase;
7479
import org.elasticsearch.threadpool.TestThreadPool;
@@ -85,6 +90,7 @@
8590
import java.util.function.BiFunction;
8691
import java.util.function.Consumer;
8792

93+
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
8894
import static org.hamcrest.Matchers.contains;
8995
import static org.hamcrest.Matchers.hasSize;
9096

@@ -583,6 +589,38 @@ protected void flushShard(IndexShard shard, boolean force) {
583589
shard.flush(new FlushRequest(shard.shardId().getIndexName()).force(force));
584590
}
585591

592+
/** Recover a shard from a snapshot using a given repository **/
593+
protected void recoverShardFromSnapshot(final IndexShard shard,
594+
final Snapshot snapshot,
595+
final Repository repository) throws IOException {
596+
final Version version = Version.CURRENT;
597+
final ShardId shardId = shard.shardId();
598+
final String index = shardId.getIndexName();
599+
final IndexId indexId = new IndexId(shardId.getIndex().getName(), shardId.getIndex().getUUID());
600+
final DiscoveryNode node = getFakeDiscoNode(shard.routingEntry().currentNodeId());
601+
final RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource(snapshot, version, index);
602+
final ShardRouting shardRouting = newShardRouting(shardId, node.getId(), true, recoverySource, ShardRoutingState.INITIALIZING);
603+
604+
shard.markAsRecovering("from snapshot", new RecoveryState(shardRouting, node, null));
605+
repository.restoreShard(shard, snapshot.getSnapshotId(), version, indexId, shard.shardId(), shard.recoveryState());
606+
}
607+
608+
/** Snapshot a shard using a given repository **/
609+
protected void snapshotShard(final IndexShard shard,
610+
final Snapshot snapshot,
611+
final Repository repository) throws IOException {
612+
final IndexShardSnapshotStatus snapshotStatus = new IndexShardSnapshotStatus();
613+
try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(true)) {
614+
Index index = shard.shardId().getIndex();
615+
IndexId indexId = new IndexId(index.getName(), index.getUUID());
616+
617+
repository.snapshotShard(shard, snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), snapshotStatus);
618+
}
619+
assertEquals(IndexShardSnapshotStatus.Stage.DONE, snapshotStatus.stage());
620+
assertEquals(shard.snapshotStoreMetadata().size(), snapshotStatus.numberOfFiles());
621+
assertNull(snapshotStatus.failure());
622+
}
623+
586624
/**
587625
* Helper method to access (package-protected) engine from tests
588626
*/

0 commit comments

Comments
 (0)