51
51
import org .elasticsearch .common .component .AbstractLifecycleComponent ;
52
52
import org .elasticsearch .common .compress .NotXContentException ;
53
53
import org .elasticsearch .common .io .stream .BytesStreamOutput ;
54
- import org .elasticsearch .common .io .stream .OutputStreamStreamOutput ;
55
- import org .elasticsearch .common .io .stream .StreamOutput ;
56
54
import org .elasticsearch .common .lucene .Lucene ;
57
55
import org .elasticsearch .common .lucene .store .InputStreamIndexInput ;
58
56
import org .elasticsearch .common .metrics .CounterMetric ;
62
60
import org .elasticsearch .common .util .set .Sets ;
63
61
import org .elasticsearch .common .xcontent .LoggingDeprecationHandler ;
64
62
import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
65
- import org .elasticsearch .common .xcontent .ToXContent ;
66
- import org .elasticsearch .common .xcontent .XContentBuilder ;
67
63
import org .elasticsearch .common .xcontent .XContentFactory ;
68
64
import org .elasticsearch .common .xcontent .XContentHelper ;
69
65
import org .elasticsearch .common .xcontent .XContentParser ;
@@ -401,10 +397,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
401
397
402
398
// write the index metadata for each index in the snapshot
403
399
for (IndexId index : indices ) {
404
- final IndexMetaData indexMetaData = clusterMetaData .index (index .getName ());
405
- final BlobPath indexPath = basePath ().add ("indices" ).add (index .getId ());
406
- final BlobContainer indexMetaDataBlobContainer = blobStore ().blobContainer (indexPath );
407
- indexMetaDataFormat .write (indexMetaData , indexMetaDataBlobContainer , snapshotId .getUUID ());
400
+ indexMetaDataFormat .write (clusterMetaData .index (index .getName ()), indexContainer (index ), snapshotId .getUUID ());
408
401
}
409
402
} catch (IOException ex ) {
410
403
throw new SnapshotCreationException (metadata .name (), snapshotId , ex );
@@ -452,7 +445,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action
452
445
snapshotId ,
453
446
ActionListener .map (listener , v -> {
454
447
try {
455
- blobStore ().blobContainer (basePath (). add ( "indices" )).deleteBlobsIgnoringIfNotExists (
448
+ blobStore ().blobContainer (indicesPath ( )).deleteBlobsIgnoringIfNotExists (
456
449
unreferencedIndices .stream ().map (IndexId ::getId ).collect (Collectors .toList ()));
457
450
} catch (IOException e ) {
458
451
logger .warn (() ->
@@ -504,9 +497,8 @@ protected void doRun() {
504
497
}
505
498
506
499
private void deleteIndexMetaDataBlobIgnoringErrors (SnapshotId snapshotId , IndexId indexId ) {
507
- BlobContainer indexMetaDataBlobContainer = blobStore ().blobContainer (basePath ().add ("indices" ).add (indexId .getId ()));
508
500
try {
509
- indexMetaDataFormat .delete (indexMetaDataBlobContainer , snapshotId .getUUID ());
501
+ indexMetaDataFormat .delete (indexContainer ( indexId ) , snapshotId .getUUID ());
510
502
} catch (IOException ex ) {
511
503
logger .warn (() -> new ParameterizedMessage ("[{}] failed to delete metadata for index [{}]" ,
512
504
snapshotId , indexId .getName ()), ex );
@@ -570,8 +562,19 @@ public MetaData getSnapshotGlobalMetaData(final SnapshotId snapshotId) {
570
562
571
563
@ Override
572
564
public IndexMetaData getSnapshotIndexMetaData (final SnapshotId snapshotId , final IndexId index ) throws IOException {
573
- final BlobPath indexPath = basePath ().add ("indices" ).add (index .getId ());
574
- return indexMetaDataFormat .read (blobStore ().blobContainer (indexPath ), snapshotId .getUUID ());
565
+ return indexMetaDataFormat .read (indexContainer (index ), snapshotId .getUUID ());
566
+ }
567
+
568
+ private BlobPath indicesPath () {
569
+ return basePath ().add ("indices" );
570
+ }
571
+
572
+ private BlobContainer indexContainer (IndexId indexId ) {
573
+ return blobStore ().blobContainer (indicesPath ().add (indexId .getId ()));
574
+ }
575
+
576
+ private BlobContainer shardContainer (IndexId indexId , ShardId shardId ) {
577
+ return blobStore ().blobContainer (indicesPath ().add (indexId .getId ()).add (Integer .toString (shardId .getId ())));
575
578
}
576
579
577
580
/**
@@ -619,10 +622,9 @@ public String startVerification() {
619
622
String seed = UUIDs .randomBase64UUID ();
620
623
byte [] testBytes = Strings .toUTF8Bytes (seed );
621
624
BlobContainer testContainer = blobStore ().blobContainer (basePath ().add (testBlobPrefix (seed )));
622
- String blobName = "master.dat" ;
623
625
BytesArray bytes = new BytesArray (testBytes );
624
626
try (InputStream stream = bytes .streamInput ()) {
625
- testContainer .writeBlobAtomic (blobName , stream , bytes .length (), true );
627
+ testContainer .writeBlobAtomic ("master.dat" , stream , bytes .length (), true );
626
628
}
627
629
return seed ;
628
630
}
@@ -695,7 +697,7 @@ public RepositoryData getRepositoryData() {
695
697
}
696
698
}
697
699
698
- public static String testBlobPrefix (String seed ) {
700
+ private static String testBlobPrefix (String seed ) {
699
701
return TESTS_FILE + seed ;
700
702
}
701
703
@@ -715,19 +717,10 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep
715
717
"] - possibly due to simultaneous snapshot deletion requests" );
716
718
}
717
719
final long newGen = currentGen + 1 ;
718
- final BytesReference snapshotsBytes ;
719
- try (BytesStreamOutput bStream = new BytesStreamOutput ()) {
720
- try (StreamOutput stream = new OutputStreamStreamOutput (bStream )) {
721
- XContentBuilder builder = XContentFactory .contentBuilder (XContentType .JSON , stream );
722
- repositoryData .snapshotsToXContent (builder , ToXContent .EMPTY_PARAMS );
723
- builder .close ();
724
- }
725
- snapshotsBytes = bStream .bytes ();
726
- }
727
720
// write the index file
728
721
final String indexBlob = INDEX_FILE_PREFIX + Long .toString (newGen );
729
722
logger .debug ("Repository [{}] writing new index generational blob [{}]" , metadata .name (), indexBlob );
730
- writeAtomic (indexBlob , snapshotsBytes , true );
723
+ writeAtomic (indexBlob , BytesReference . bytes ( repositoryData . snapshotsToXContent ( XContentFactory . jsonBuilder ())) , true );
731
724
// write the current generation to the index-latest file
732
725
final BytesReference genBytes ;
733
726
try (BytesStreamOutput bStream = new BytesStreamOutput ()) {
@@ -754,17 +747,9 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep
754
747
*/
755
748
void writeIncompatibleSnapshots (RepositoryData repositoryData ) throws IOException {
756
749
assert isReadOnly () == false ; // can not write to a read only repository
757
- final BytesReference bytes ;
758
- try (BytesStreamOutput bStream = new BytesStreamOutput ()) {
759
- try (StreamOutput stream = new OutputStreamStreamOutput (bStream )) {
760
- XContentBuilder builder = XContentFactory .contentBuilder (XContentType .JSON , stream );
761
- repositoryData .incompatibleSnapshotsToXContent (builder , ToXContent .EMPTY_PARAMS );
762
- builder .close ();
763
- }
764
- bytes = bStream .bytes ();
765
- }
766
750
// write the incompatible snapshots blob
767
- writeAtomic (INCOMPATIBLE_SNAPSHOTS_BLOB , bytes , false );
751
+ writeAtomic (INCOMPATIBLE_SNAPSHOTS_BLOB ,
752
+ BytesReference .bytes (repositoryData .incompatibleSnapshotsToXContent (XContentFactory .jsonBuilder ())), false );
768
753
}
769
754
770
755
/**
@@ -857,9 +842,8 @@ public void restoreShard(Store store, SnapshotId snapshotId,
857
842
Version version , IndexId indexId , ShardId snapshotShardId , RecoveryState recoveryState ) {
858
843
ShardId shardId = store .shardId ();
859
844
final Context context = new Context (snapshotId , indexId , shardId , snapshotShardId );
860
- BlobPath path = basePath ().add ("indices" ).add (indexId .getId ()).add (Integer .toString (snapshotShardId .getId ()));
861
- BlobContainer blobContainer = blobStore ().blobContainer (path );
862
- final RestoreContext snapshotContext = new RestoreContext (shardId , snapshotId , recoveryState , blobContainer );
845
+ final RestoreContext snapshotContext =
846
+ new RestoreContext (shardId , snapshotId , recoveryState , shardContainer (indexId , snapshotShardId ));
863
847
try {
864
848
BlobStoreIndexShardSnapshot snapshot = context .loadSnapshot ();
865
849
SnapshotFiles snapshotFiles = new SnapshotFiles (snapshot .snapshot (), snapshot .indexFiles ());
@@ -935,8 +919,7 @@ private class Context {
935
919
Context (SnapshotId snapshotId , IndexId indexId , ShardId shardId , ShardId snapshotShardId ) {
936
920
this .snapshotId = snapshotId ;
937
921
this .shardId = shardId ;
938
- blobContainer = blobStore ().blobContainer (basePath ().add ("indices" ).add (indexId .getId ())
939
- .add (Integer .toString (snapshotShardId .getId ())));
922
+ blobContainer = shardContainer (indexId , snapshotShardId );
940
923
}
941
924
942
925
/**
0 commit comments