64
64
import org .elasticsearch .common .bytes .BytesReference ;
65
65
import org .elasticsearch .common .collect .Tuple ;
66
66
import org .elasticsearch .common .component .AbstractLifecycleComponent ;
67
+ import org .elasticsearch .common .compress .CompressorFactory ;
67
68
import org .elasticsearch .common .compress .NotXContentException ;
68
69
import org .elasticsearch .common .io .Streams ;
69
70
import org .elasticsearch .common .io .stream .BytesStreamOutput ;
71
+ import org .elasticsearch .common .io .stream .StreamOutput ;
70
72
import org .elasticsearch .common .lucene .Lucene ;
71
73
import org .elasticsearch .common .lucene .store .InputStreamIndexInput ;
72
74
import org .elasticsearch .common .metrics .CounterMetric ;
77
79
import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
78
80
import org .elasticsearch .common .xcontent .LoggingDeprecationHandler ;
79
81
import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
82
+ import org .elasticsearch .common .xcontent .XContentBuilder ;
80
83
import org .elasticsearch .common .xcontent .XContentFactory ;
81
84
import org .elasticsearch .common .xcontent .XContentParser ;
82
85
import org .elasticsearch .common .xcontent .XContentType ;
129
132
import java .util .concurrent .LinkedBlockingQueue ;
130
133
import java .util .concurrent .TimeUnit ;
131
134
import java .util .concurrent .atomic .AtomicLong ;
135
+ import java .util .concurrent .atomic .AtomicReference ;
132
136
import java .util .function .Consumer ;
133
137
import java .util .stream .Collectors ;
134
138
import java .util .stream .LongStream ;
@@ -205,8 +209,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
205
209
public static final Setting <Boolean > ALLOW_CONCURRENT_MODIFICATION =
206
210
Setting .boolSetting ("allow_concurrent_modifications" , false , Setting .Property .Deprecated );
207
211
212
+ /**
213
+ * Setting to disable caching of the latest repository data.
214
+ */
215
+ public static final Setting <Boolean > CACHE_REPOSITORY_DATA =
216
+ Setting .boolSetting ("cache_repository_data" , true , Setting .Property .Deprecated );
217
+
208
218
private final boolean compress ;
209
219
220
+ private final boolean cacheRepositoryData ;
221
+
210
222
private final RateLimiter snapshotRateLimiter ;
211
223
212
224
private final RateLimiter restoreRateLimiter ;
@@ -282,6 +294,7 @@ protected BlobStoreRepository(
282
294
snapshotRateLimiter = getRateLimiter (metadata .settings (), "max_snapshot_bytes_per_sec" , new ByteSizeValue (40 , ByteSizeUnit .MB ));
283
295
restoreRateLimiter = getRateLimiter (metadata .settings (), "max_restore_bytes_per_sec" , new ByteSizeValue (40 , ByteSizeUnit .MB ));
284
296
readOnly = metadata .settings ().getAsBoolean ("readonly" , false );
297
+ cacheRepositoryData = CACHE_REPOSITORY_DATA .get (metadata .settings ());
285
298
this .basePath = basePath ;
286
299
287
300
indexShardSnapshotFormat = new ChecksumBlobStoreFormat <>(SNAPSHOT_CODEC , SNAPSHOT_NAME_FORMAT ,
@@ -510,13 +523,16 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Versio
510
523
* @param rootBlobs Blobs at the repository root
511
524
* @return RepositoryData
512
525
*/
513
- private RepositoryData safeRepositoryData (long repositoryStateId , Map <String , BlobMetaData > rootBlobs ) {
526
+ private RepositoryData safeRepositoryData (long repositoryStateId , Map <String , BlobMetaData > rootBlobs ) throws IOException {
514
527
final long generation = latestGeneration (rootBlobs .keySet ());
515
528
final long genToLoad ;
529
+ final Tuple <Long , BytesReference > cached ;
516
530
if (bestEffortConsistency ) {
517
531
genToLoad = latestKnownRepoGen .updateAndGet (known -> Math .max (known , repositoryStateId ));
532
+ cached = null ;
518
533
} else {
519
534
genToLoad = latestKnownRepoGen .get ();
535
+ cached = latestKnownRepositoryData .get ();
520
536
}
521
537
if (genToLoad > generation ) {
522
538
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
@@ -529,6 +545,9 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
529
545
throw new RepositoryException (metadata .name (), "concurrent modification of the index-N file, expected current generation [" +
530
546
repositoryStateId + "], actual current generation [" + genToLoad + "]" );
531
547
}
548
+ if (cached != null && cached .v1 () == genToLoad ) {
549
+ return repositoryDataFromCachedEntry (cached );
550
+ }
532
551
return getRepositoryData (genToLoad );
533
552
}
534
553
@@ -1057,6 +1076,9 @@ public void endVerification(String seed) {
1057
1076
// and concurrent modifications.
1058
1077
private final AtomicLong latestKnownRepoGen = new AtomicLong (RepositoryData .UNKNOWN_REPO_GEN );
1059
1078
1079
+ // Best effort cache of the latest known repository data and its generation, cached serialized as compressed json
1080
+ private final AtomicReference <Tuple <Long , BytesReference >> latestKnownRepositoryData = new AtomicReference <>();
1081
+
1060
1082
@ Override
1061
1083
public void getRepositoryData (ActionListener <RepositoryData > listener ) {
1062
1084
if (latestKnownRepoGen .get () == RepositoryData .CORRUPTED_REPO_GEN ) {
@@ -1090,7 +1112,16 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
1090
1112
genToLoad = latestKnownRepoGen .get ();
1091
1113
}
1092
1114
try {
1093
- listener .onResponse (getRepositoryData (genToLoad ));
1115
+ final Tuple <Long , BytesReference > cached = latestKnownRepositoryData .get ();
1116
+ final RepositoryData loaded ;
1117
+ // Caching is not used with #bestEffortConsistency see docs on #cacheRepositoryData for details
1118
+ if (bestEffortConsistency == false && cached != null && cached .v1 () == genToLoad ) {
1119
+ loaded = repositoryDataFromCachedEntry (cached );
1120
+ } else {
1121
+ loaded = getRepositoryData (genToLoad );
1122
+ cacheRepositoryData (loaded );
1123
+ }
1124
+ listener .onResponse (loaded );
1094
1125
return ;
1095
1126
} catch (RepositoryException e ) {
1096
1127
// If the generation to load changed concurrently and we didn't just try loading the same generation before we retry
@@ -1116,6 +1147,59 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
1116
1147
}
1117
1148
}
1118
1149
1150
+ /**
1151
+ * Puts the given {@link RepositoryData} into the cache if it is of a newer generation and only if the repository is not using
1152
+ * {@link #bestEffortConsistency}. When using {@link #bestEffortConsistency} the repository is using listing to find the latest
1153
+ * {@code index-N} blob and there are no hard guarantees that a given repository generation won't be reused since an external
1154
+ * modification can lead to moving from a higher {@code N} to a lower {@code N} value which mean we can't safely assume that a given
1155
+ * generation will always contain the same {@link RepositoryData}.
1156
+ *
1157
+ * @param updated RepositoryData to cache if newer than the cache contents
1158
+ */
1159
+ private void cacheRepositoryData (RepositoryData updated ) {
1160
+ if (cacheRepositoryData && bestEffortConsistency == false ) {
1161
+ final BytesReference serialized ;
1162
+ BytesStreamOutput out = new BytesStreamOutput ();
1163
+ try {
1164
+ try (StreamOutput tmp = CompressorFactory .COMPRESSOR .streamOutput (out );
1165
+ XContentBuilder builder = XContentFactory .jsonBuilder (tmp )) {
1166
+ updated .snapshotsToXContent (builder , true );
1167
+ }
1168
+ serialized = out .bytes ();
1169
+ final int len = serialized .length ();
1170
+ if (len > ByteSizeUnit .KB .toBytes (500 )) {
1171
+ logger .debug ("Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in" +
1172
+ " serialized size" , len , metadata .name ());
1173
+ if (len > ByteSizeUnit .MB .toBytes (5 )) {
1174
+ logger .warn ("Your repository metadata blob for repository [{}] is larger than 5MB. Consider moving to a fresh" +
1175
+ " repository for new snapshots or deleting unneeded snapshots from your repository to ensure stable" +
1176
+ " repository behavior going forward." , metadata .name ());
1177
+ }
1178
+ // Set empty repository data to not waste heap for an outdated cached value
1179
+ latestKnownRepositoryData .set (null );
1180
+ return ;
1181
+ }
1182
+ } catch (IOException e ) {
1183
+ assert false : new AssertionError ("Impossible, no IO happens here" , e );
1184
+ logger .warn ("Failed to serialize repository data" , e );
1185
+ return ;
1186
+ }
1187
+ latestKnownRepositoryData .updateAndGet (known -> {
1188
+ if (known != null && known .v1 () > updated .getGenId ()) {
1189
+ return known ;
1190
+ }
1191
+ return new Tuple <>(updated .getGenId (), serialized );
1192
+ });
1193
+ }
1194
+ }
1195
+
1196
+ private RepositoryData repositoryDataFromCachedEntry (Tuple <Long , BytesReference > cacheEntry ) throws IOException {
1197
+ return RepositoryData .snapshotsFromXContent (
1198
+ XContentType .JSON .xContent ().createParser (NamedXContentRegistry .EMPTY ,
1199
+ LoggingDeprecationHandler .INSTANCE ,
1200
+ CompressorFactory .COMPRESSOR .streamInput (cacheEntry .v2 ().streamInput ())), cacheEntry .v1 ());
1201
+ }
1202
+
1119
1203
private RepositoryException corruptedStateException (@ Nullable Exception cause ) {
1120
1204
return new RepositoryException (metadata .name (),
1121
1205
"Could not read repository data because the contents of the repository do not match its " +
@@ -1362,6 +1446,7 @@ public void onFailure(String source, Exception e) {
1362
1446
1363
1447
@ Override
1364
1448
public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
1449
+ cacheRepositoryData (filteredRepositoryData .withGenId (newGen ));
1365
1450
threadPool .executor (ThreadPool .Names .SNAPSHOT ).execute (ActionRunnable .run (listener , () -> {
1366
1451
// Delete all now outdated index files up to 1000 blobs back from the new generation.
1367
1452
// If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
0 commit comments