@@ -64,9 +64,11 @@
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.compress.NotXContentException;
 import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
 import org.elasticsearch.common.metrics.CounterMetric;
@@ -77,6 +79,7 @@
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -130,6 +133,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -208,8 +212,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     public static final Setting<Boolean> ALLOW_CONCURRENT_MODIFICATION =
         Setting.boolSetting("allow_concurrent_modifications", false, Setting.Property.Deprecated);
 
+    /**
+     * Setting to disable caching of the latest repository data.
+     */
+    public static final Setting<Boolean> CACHE_REPOSITORY_DATA =
+        Setting.boolSetting("cache_repository_data", true, Setting.Property.Deprecated);
+
     private final boolean compress;
 
+    private final boolean cacheRepositoryData;
+
     private final RateLimiter snapshotRateLimiter;
 
     private final RateLimiter restoreRateLimiter;
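The new cache_repository_data setting is read per repository, so operators can opt out for repositories whose metadata changes too often to be worth caching. As a rough illustration (not part of this change, and the fs location value is hypothetical), a test or plugin could build such settings with the standard Settings builder:

    Settings repoSettings = Settings.builder()
        .put("location", "/mnt/backups/my-repo")   // hypothetical filesystem repository path
        .put("cache_repository_data", false)       // opt out of RepositoryData caching
        .build();
    boolean cacheEnabled = BlobStoreRepository.CACHE_REPOSITORY_DATA.get(repoSettings); // false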
@@ -284,8 +296,7 @@ protected BlobStoreRepository(
         snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
         restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
         readOnly = metadata.settings().getAsBoolean("readonly", false);
-
-
+        cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
         indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
             BlobStoreIndexShardSnapshot::fromXContent, namedXContentRegistry, compress);
         indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT,
@@ -521,13 +532,16 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
      * @param rootBlobs Blobs at the repository root
      * @return RepositoryData
      */
-    private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
+    private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) throws IOException {
         final long generation = latestGeneration(rootBlobs.keySet());
         final long genToLoad;
+        final Tuple<Long, BytesReference> cached;
         if (bestEffortConsistency) {
             genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
+            cached = null;
         } else {
             genToLoad = latestKnownRepoGen.get();
+            cached = latestKnownRepositoryData.get();
         }
         if (genToLoad > generation) {
             // It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
@@ -540,6 +554,9 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
             throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
                 repositoryStateId + "], actual current generation [" + genToLoad + "]");
         }
+        if (cached != null && cached.v1() == genToLoad) {
+            return repositoryDataFromCachedEntry(cached);
+        }
         return getRepositoryData(genToLoad);
     }
 
@@ -1067,6 +1084,9 @@ public void endVerification(String seed) {
     // and concurrent modifications.
     private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);
 
+    // Best effort cache of the latest known repository data and its generation, serialized as compressed JSON
+    private final AtomicReference<Tuple<Long, BytesReference>> latestKnownRepositoryData = new AtomicReference<>();
+
     @Override
     public void getRepositoryData(ActionListener<RepositoryData> listener) {
         if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
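Keeping the generation and the serialized bytes together in one immutable Tuple behind a single AtomicReference means a reader obtains a mutually consistent pair from one volatile read, with no locking. A minimal sketch of the read side (expectedGen is an illustrative local, not from the PR):

    final Tuple<Long, BytesReference> entry = latestKnownRepositoryData.get();
    if (entry != null && entry.v1() == expectedGen) {
        // entry.v2() holds the compressed JSON for exactly generation entry.v1();
        // the pair cannot be torn by a concurrent writer because the Tuple is immutable
    }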
@@ -1100,7 +1120,16 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
                 genToLoad = latestKnownRepoGen.get();
             }
             try {
-                listener.onResponse(getRepositoryData(genToLoad));
+                final Tuple<Long, BytesReference> cached = latestKnownRepositoryData.get();
+                final RepositoryData loaded;
+                // Caching is not used with #bestEffortConsistency, see docs on #cacheRepositoryData for details
+                if (bestEffortConsistency == false && cached != null && cached.v1() == genToLoad) {
+                    loaded = repositoryDataFromCachedEntry(cached);
+                } else {
+                    loaded = getRepositoryData(genToLoad);
+                    cacheRepositoryData(loaded);
+                }
+                listener.onResponse(loaded);
                 return;
             } catch (RepositoryException e) {
                 // If the generation to load changed concurrently and we didn't just try loading the same generation before we retry
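The block above is a read-through cache keyed by generation: serve from the cache only on an exact generation match, otherwise load from the blob store and repopulate. Stripped of the Elasticsearch-specific types, the pattern looks roughly like this (names are illustrative, not from the PR):

    // generic read-through cache with an exact version check
    static <K, V> V readThrough(AtomicReference<Tuple<K, V>> cache, K wantedVersion, Function<K, V> load) {
        final Tuple<K, V> entry = cache.get();
        if (entry != null && entry.v1().equals(wantedVersion)) {
            return entry.v2();                        // hit: exact version match only
        }
        final V loaded = load.apply(wantedVersion);   // miss: go to the source of truth
        cache.set(new Tuple<>(wantedVersion, loaded));
        return loaded;
    }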
@@ -1126,6 +1155,59 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
         }
     }
 
+    /**
+     * Puts the given {@link RepositoryData} into the cache if it is of a newer generation and only if the repository is not using
+     * {@link #bestEffortConsistency}. When using {@link #bestEffortConsistency} the repository uses listing to find the latest
+     * {@code index-N} blob and there are no hard guarantees that a given repository generation won't be reused since an external
+     * modification can lead to moving from a higher {@code N} to a lower {@code N} value, which means we can't safely assume that a
+     * given generation will always contain the same {@link RepositoryData}.
+     *
+     * @param updated RepositoryData to cache if newer than the cache contents
+     */
+    private void cacheRepositoryData(RepositoryData updated) {
+        if (cacheRepositoryData && bestEffortConsistency == false) {
+            final BytesReference serialized;
+            BytesStreamOutput out = new BytesStreamOutput();
+            try {
+                try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out);
+                     XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) {
+                    updated.snapshotsToXContent(builder, true);
+                }
+                serialized = out.bytes();
+                final int len = serialized.length();
+                if (len > ByteSizeUnit.KB.toBytes(500)) {
+                    logger.debug("Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in" +
+                        " serialized size", len, metadata.name());
+                    if (len > ByteSizeUnit.MB.toBytes(5)) {
+                        logger.warn("Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh" +
+                            " repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable" +
+                            " repository behavior going forward.", metadata.name());
+                    }
+                    // Clear the cached value so we don't waste heap on an outdated entry
+                    latestKnownRepositoryData.set(null);
+                    return;
+                }
+            } catch (IOException e) {
+                assert false : new AssertionError("Impossible, no IO happens here", e);
+                logger.warn("Failed to serialize repository data", e);
+                return;
+            }
+            latestKnownRepositoryData.updateAndGet(known -> {
+                if (known != null && known.v1() > updated.getGenId()) {
+                    return known;
+                }
+                return new Tuple<>(updated.getGenId(), serialized);
+            });
+        }
+    }
+
+    private RepositoryData repositoryDataFromCachedEntry(Tuple<Long, BytesReference> cacheEntry) throws IOException {
+        return RepositoryData.snapshotsFromXContent(
+            XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
+                LoggingDeprecationHandler.INSTANCE,
+                CompressorFactory.COMPRESSOR.streamInput(cacheEntry.v2().streamInput())), cacheEntry.v1());
+    }
+
     private RepositoryException corruptedStateException(@Nullable Exception cause) {
         return new RepositoryException(metadata.name(),
             "Could not read repository data because the contents of the repository do not match its " +
@@ -1372,6 +1454,7 @@ public void onFailure(String source, Exception e) {
 
                 @Override
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    cacheRepositoryData(filteredRepositoryData.withGenId(newGen));
                     threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
                         // Delete all now outdated index files up to 1000 blobs back from the new generation.
                         // If there are more than 1000, the dangling index-N cleanup functionality on repo delete will take care of them.