7
7
package org .elasticsearch .xpack .ccr .repository ;
8
8
9
9
import org .apache .lucene .index .IndexCommit ;
10
+ import org .apache .lucene .store .RateLimiter ;
10
11
import org .elasticsearch .Version ;
11
12
import org .elasticsearch .action .admin .cluster .state .ClusterStateRequest ;
12
13
import org .elasticsearch .action .admin .cluster .state .ClusterStateResponse ;
25
26
import org .elasticsearch .common .component .AbstractLifecycleComponent ;
26
27
import org .elasticsearch .common .io .stream .StreamInput ;
27
28
import org .elasticsearch .common .settings .Settings ;
29
+ import org .elasticsearch .common .unit .ByteSizeUnit ;
28
30
import org .elasticsearch .common .unit .ByteSizeValue ;
29
31
import org .elasticsearch .index .Index ;
30
32
import org .elasticsearch .index .IndexSettings ;
35
37
import org .elasticsearch .index .snapshots .IndexShardRestoreFailedException ;
36
38
import org .elasticsearch .index .snapshots .IndexShardSnapshotStatus ;
37
39
import org .elasticsearch .index .snapshots .blobstore .BlobStoreIndexShardSnapshot ;
40
+ import org .elasticsearch .index .snapshots .blobstore .RateLimitingInputStream ;
38
41
import org .elasticsearch .index .snapshots .blobstore .SnapshotFiles ;
39
42
import org .elasticsearch .index .store .Store ;
40
43
import org .elasticsearch .index .store .StoreFileMetaData ;
@@ -82,6 +85,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
82
85
private final String remoteClusterAlias ;
83
86
private final Client client ;
84
87
private final CcrLicenseChecker ccrLicenseChecker ;
88
+ private final RateLimiter .SimpleRateLimiter rateLimiter ;
85
89
86
90
public CcrRepository (RepositoryMetaData metadata , Client client , CcrLicenseChecker ccrLicenseChecker , Settings settings ) {
87
91
super (settings );
@@ -90,6 +94,7 @@ public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseCheck
90
94
this .remoteClusterAlias = Strings .split (metadata .name (), NAME_PREFIX )[1 ];
91
95
this .ccrLicenseChecker = ccrLicenseChecker ;
92
96
this .client = client ;
97
+ this .rateLimiter = new RateLimiter .SimpleRateLimiter (new ByteSizeValue (40 , ByteSizeUnit .MB ).getMbFrac ());
93
98
}
94
99
95
100
@ Override
@@ -258,7 +263,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
258
263
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
259
264
// response, we should be able to retry by creating a new session.
260
265
String name = metadata .name ();
261
- try (RestoreSession restoreSession = RestoreSession . openSession (name , remoteClient , leaderShardId , indexShard , recoveryState )) {
266
+ try (RestoreSession restoreSession = openSession (name , remoteClient , leaderShardId , indexShard , recoveryState )) {
262
267
restoreSession .restoreFiles ();
263
268
} catch (Exception e ) {
264
269
throw new IndexShardRestoreFailedException (indexShard .shardId (), "failed to restore snapshot [" + snapshotId + "]" , e );
@@ -286,7 +291,16 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index
286
291
}
287
292
}
288
293
289
- private static class RestoreSession extends FileRestoreContext implements Closeable {
294
+ private RestoreSession openSession (String repositoryName , Client remoteClient , ShardId leaderShardId , IndexShard indexShard ,
295
+ RecoveryState recoveryState ) {
296
+ String sessionUUID = UUIDs .randomBase64UUID ();
297
+ PutCcrRestoreSessionAction .PutCcrRestoreSessionResponse response = remoteClient .execute (PutCcrRestoreSessionAction .INSTANCE ,
298
+ new PutCcrRestoreSessionRequest (sessionUUID , leaderShardId )).actionGet ();
299
+ return new RestoreSession (repositoryName , remoteClient , sessionUUID , response .getNode (), indexShard , recoveryState ,
300
+ response .getStoreFileMetaData ());
301
+ }
302
+
303
+ private class RestoreSession extends FileRestoreContext implements Closeable {
290
304
291
305
private static final int BUFFER_SIZE = 1 << 16 ;
292
306
@@ -304,15 +318,6 @@ private static class RestoreSession extends FileRestoreContext implements Closea
304
318
this .sourceMetaData = sourceMetaData ;
305
319
}
306
320
307
- static RestoreSession openSession (String repositoryName , Client remoteClient , ShardId leaderShardId , IndexShard indexShard ,
308
- RecoveryState recoveryState ) {
309
- String sessionUUID = UUIDs .randomBase64UUID ();
310
- PutCcrRestoreSessionAction .PutCcrRestoreSessionResponse response = remoteClient .execute (PutCcrRestoreSessionAction .INSTANCE ,
311
- new PutCcrRestoreSessionRequest (sessionUUID , leaderShardId )).actionGet ();
312
- return new RestoreSession (repositoryName , remoteClient , sessionUUID , response .getNode (), indexShard , recoveryState ,
313
- response .getStoreFileMetaData ());
314
- }
315
-
316
321
void restoreFiles () throws IOException {
317
322
ArrayList <BlobStoreIndexShardSnapshot .FileInfo > fileInfos = new ArrayList <>();
318
323
for (StoreFileMetaData fileMetaData : sourceMetaData ) {
@@ -325,7 +330,8 @@ void restoreFiles() throws IOException {
325
330
326
331
@ Override
327
332
protected InputStream fileInputStream (BlobStoreIndexShardSnapshot .FileInfo fileInfo ) {
328
- return new RestoreFileInputStream (remoteClient , sessionUUID , node , fileInfo .metadata ());
333
+ RestoreFileInputStream restoreInputStream = new RestoreFileInputStream (remoteClient , sessionUUID , node , fileInfo .metadata ());
334
+ return new RateLimitingInputStream (restoreInputStream , rateLimiter , (n ) -> {});
329
335
}
330
336
331
337
@ Override
@@ -336,7 +342,7 @@ public void close() {
336
342
}
337
343
}
338
344
339
- private static class RestoreFileInputStream extends InputStream {
345
+ private class RestoreFileInputStream extends InputStream {
340
346
341
347
private final Client remoteClient ;
342
348
private final String sessionUUID ;
0 commit comments