Skip to content

Commit 216dda3

Browse files
committed
Throttling of recovery (both gateway recovery and peer node recovery), closes #176.
1 parent 2301b0b commit 216dda3

File tree

16 files changed

+436
-179
lines changed

16 files changed

+436
-179
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class ClusterHealthRequest extends MasterNodeOperationRequest {
4444

4545
private int waitForRelocatingShards = -1;
4646

47+
private int waitForActiveShards = -1;
48+
4749
ClusterHealthRequest() {
4850
}
4951

@@ -95,6 +97,15 @@ public ClusterHealthRequest waitForRelocatingShards(int waitForRelocatingShards)
9597
return this;
9698
}
9799

100+
public int waitForActiveShards() {
101+
return waitForActiveShards;
102+
}
103+
104+
public ClusterHealthRequest waitForActiveShards(int waitForActiveShards) {
105+
this.waitForActiveShards = waitForActiveShards;
106+
return this;
107+
}
108+
98109
@Override public ActionRequestValidationException validate() {
99110
return null;
100111
}
@@ -115,6 +126,7 @@ public ClusterHealthRequest waitForRelocatingShards(int waitForRelocatingShards)
115126
waitForStatus = ClusterHealthStatus.fromValue(in.readByte());
116127
}
117128
waitForRelocatingShards = in.readInt();
129+
waitForActiveShards = in.readInt();
118130
}
119131

120132
@Override public void writeTo(StreamOutput out) throws IOException {
@@ -135,5 +147,6 @@ public ClusterHealthRequest waitForRelocatingShards(int waitForRelocatingShards)
135147
out.writeByte(waitForStatus.value());
136148
}
137149
out.writeInt(waitForRelocatingShards);
150+
out.writeInt(waitForActiveShards);
138151
}
139152
}

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,16 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
6565
}
6666

6767
@Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request) throws ElasticSearchException {
68-
int waitFor = 2;
68+
int waitFor = 3;
6969
if (request.waitForStatus() == null) {
7070
waitFor--;
7171
}
7272
if (request.waitForRelocatingShards() == -1) {
7373
waitFor--;
7474
}
75+
if (request.waitForActiveShards() == -1) {
76+
waitFor--;
77+
}
7578
if (waitFor == 0) {
7679
// no need to wait for anything
7780
return clusterHealth(request);
@@ -86,6 +89,9 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
8689
if (request.waitForRelocatingShards() != -1 && response.relocatingShards() <= request.waitForRelocatingShards()) {
8790
waitForCounter++;
8891
}
92+
if (request.waitForActiveShards() != -1 && response.activeShards() >= request.waitForActiveShards()) {
93+
waitForCounter++;
94+
}
8995
if (waitForCounter == waitFor) {
9096
return response;
9197
}

modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,11 +235,13 @@ public static class Index {
235235
private long version;
236236
private int numberOfFiles;
237237
private SizeValue totalSize;
238+
private TimeValue throttlingWaitTime;
238239

239-
public Index(long version, int numberOfFiles, SizeValue totalSize) {
240+
public Index(long version, int numberOfFiles, SizeValue totalSize, TimeValue throttlingWaitTime) {
240241
this.version = version;
241242
this.numberOfFiles = numberOfFiles;
242243
this.totalSize = totalSize;
244+
this.throttlingWaitTime = throttlingWaitTime;
243245
}
244246

245247
public long version() {
@@ -253,6 +255,10 @@ public int numberOfFiles() {
253255
public SizeValue totalSize() {
254256
return totalSize;
255257
}
258+
259+
public TimeValue throttlingWaitTime() {
260+
return throttlingWaitTime;
261+
}
256262
}
257263
}
258264
}

modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.index.shard.service.InternalIndexShard;
3333
import org.elasticsearch.index.store.Store;
3434
import org.elasticsearch.index.translog.Translog;
35+
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
3536
import org.elasticsearch.threadpool.ThreadPool;
3637
import org.elasticsearch.util.StopWatch;
3738
import org.elasticsearch.util.TimeValue;
@@ -58,6 +59,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
5859

5960
private final Store store;
6061

62+
private final RecoveryThrottler recoveryThrottler;
63+
6164

6265
private volatile long lastIndexVersion;
6366

@@ -73,12 +76,13 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
7376

7477
@Inject public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings,
7578
ThreadPool threadPool, IndexShard indexShard, IndexShardGateway shardGateway,
76-
Store store) {
79+
Store store, RecoveryThrottler recoveryThrottler) {
7780
super(shardId, indexSettings);
7881
this.threadPool = threadPool;
7982
this.indexShard = (InternalIndexShard) indexShard;
8083
this.shardGateway = shardGateway;
8184
this.store = store;
85+
this.recoveryThrottler = recoveryThrottler;
8286

8387
this.snapshotOnClose = componentSettings.getAsBoolean("snapshot_on_close", true);
8488
this.snapshotInterval = componentSettings.getAsTime("snapshot_interval", TimeValue.timeValueSeconds(10));
@@ -99,36 +103,57 @@ public synchronized void recover() throws IndexShardGatewayRecoveryException, Ig
99103
if (!indexShard.routingEntry().primary()) {
100104
throw new ElasticSearchIllegalStateException("Trying to recover when the shard is in backup state");
101105
}
106+
102107
// clear the store, we are going to recover into it
103108
try {
104109
store.deleteContent();
105110
} catch (IOException e) {
106111
logger.debug("Failed to delete store before recovery from gateway", e);
107112
}
108113
indexShard.recovering();
109-
logger.debug("Starting recovery from {}", shardGateway);
110-
StopWatch stopWatch = new StopWatch().start();
111-
IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover();
112114

113-
lastIndexVersion = recoveryStatus.index().version();
114-
lastTranslogId = recoveryStatus.translog().translogId();
115-
lastTranslogSize = recoveryStatus.translog().numberOfOperations();
116-
117-
// start the shard if the gateway has not started it already
118-
if (indexShard.state() != IndexShardState.STARTED) {
119-
indexShard.start();
115+
StopWatch throttlingWaitTime = new StopWatch().start();
116+
// we know we are on a thread, we can spin till we can engage in recovery
117+
while (!recoveryThrottler.tryRecovery(shardId, "gateway")) {
118+
try {
119+
Thread.sleep(recoveryThrottler.throttleInterval().millis());
120+
} catch (InterruptedException e) {
121+
if (indexShard.ignoreRecoveryAttempt()) {
122+
throw new IgnoreGatewayRecoveryException(shardId, "Interrupted while waiting for recovery, but we should ignore ...");
123+
}
124+
// we got interrupted, mark it as failed
125+
throw new IndexShardGatewayRecoveryException(shardId, "Interrupted while waiting to recovery", e);
126+
}
120127
}
121-
stopWatch.stop();
122-
if (logger.isDebugEnabled()) {
123-
StringBuilder sb = new StringBuilder();
124-
sb.append("Recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("]\n");
125-
sb.append(" Index : number_of_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("]\n");
126-
sb.append(" Translog : translog_id [").append(recoveryStatus.translog().translogId()).append("], number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]");
127-
logger.debug(sb.toString());
128+
throttlingWaitTime.stop();
129+
130+
try {
131+
logger.debug("Starting recovery from {}", shardGateway);
132+
StopWatch stopWatch = new StopWatch().start();
133+
IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover();
134+
135+
lastIndexVersion = recoveryStatus.index().version();
136+
lastTranslogId = recoveryStatus.translog().translogId();
137+
lastTranslogSize = recoveryStatus.translog().numberOfOperations();
138+
139+
// start the shard if the gateway has not started it already
140+
if (indexShard.state() != IndexShardState.STARTED) {
141+
indexShard.start();
142+
}
143+
stopWatch.stop();
144+
if (logger.isDebugEnabled()) {
145+
StringBuilder sb = new StringBuilder();
146+
sb.append("Recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
147+
sb.append(" Index : number_of_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
148+
sb.append(" Translog : translog_id [").append(recoveryStatus.translog().translogId()).append("], number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]");
149+
logger.debug(sb.toString());
150+
}
151+
// refresh the shard
152+
indexShard.refresh(new Engine.Refresh(false));
153+
scheduleSnapshotIfNeeded();
154+
} finally {
155+
recoveryThrottler.recoveryDone(shardId, "gateway");
128156
}
129-
// refresh the shard
130-
indexShard.refresh(new Engine.Refresh(false));
131-
scheduleSnapshotIfNeeded();
132157
} else {
133158
throw new IgnoreGatewayRecoveryException(shardId, "Already recovered");
134159
}

modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/fs/FsIndexShardGateway.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.index.shard.service.InternalIndexShard;
3333
import org.elasticsearch.index.store.Store;
3434
import org.elasticsearch.index.translog.Translog;
35+
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
3536
import org.elasticsearch.threadpool.ThreadPool;
3637
import org.elasticsearch.util.SizeUnit;
3738
import org.elasticsearch.util.SizeValue;
@@ -48,6 +49,7 @@
4849
import java.io.RandomAccessFile;
4950
import java.util.ArrayList;
5051
import java.util.concurrent.CountDownLatch;
52+
import java.util.concurrent.atomic.AtomicLong;
5153
import java.util.concurrent.atomic.AtomicReference;
5254

5355
import static org.elasticsearch.index.translog.TranslogStreams.*;
@@ -64,6 +66,8 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
6466

6567
private final ThreadPool threadPool;
6668

69+
private final RecoveryThrottler recoveryThrottler;
70+
6771
private final Store store;
6872

6973
private final File location;
@@ -72,11 +76,14 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
7276

7377
private final File locationTranslog;
7478

75-
@Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, FsIndexGateway fsIndexGateway, IndexShard indexShard, Store store) {
79+
@Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, FsIndexGateway fsIndexGateway,
80+
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
7681
super(shardId, indexSettings);
7782
this.threadPool = threadPool;
7883
this.indexShard = (InternalIndexShard) indexShard;
7984
this.store = store;
85+
this.recoveryThrottler = recoveryThrottler;
86+
8087
this.location = new File(fsIndexGateway.indexGatewayHome(), Integer.toString(shardId.id()));
8188
this.locationIndex = new File(location, "index");
8289
this.locationTranslog = new File(location, "translog");
@@ -157,10 +164,11 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
157164
}
158165
threadPool.execute(new Runnable() {
159166
@Override public void run() {
167+
File copyTo = new File(locationIndex, fileName);
160168
try {
161-
copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, new File(locationIndex, fileName));
169+
copyFromDirectory(snapshotIndexCommit.getDirectory(), fileName, copyTo);
162170
} catch (Exception e) {
163-
lastException.set(e);
171+
lastException.set(new IndexShardGatewaySnapshotFailedException(shardId, "Failed to copy to [" + copyTo + "], from dir [" + snapshotIndexCommit.getDirectory() + "] and file [" + fileName + "]", e));
164172
} finally {
165173
latch.countDown();
166174
}
@@ -200,7 +208,9 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
200208
translogTime = System.currentTimeMillis() - time;
201209
} catch (Exception e) {
202210
try {
203-
translogRaf.close();
211+
if (translogRaf != null) {
212+
translogRaf.close();
213+
}
204214
} catch (IOException e1) {
205215
// ignore
206216
}
@@ -303,15 +313,22 @@ private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryExce
303313
File[] files = locationIndex.listFiles();
304314
final CountDownLatch latch = new CountDownLatch(files.length);
305315
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
316+
final AtomicLong throttlingWaitTime = new AtomicLong();
306317
for (final File file : files) {
307318
threadPool.execute(new Runnable() {
308319
@Override public void run() {
309320
try {
321+
long throttlingStartTime = System.currentTimeMillis();
322+
while (!recoveryThrottler.tryStream(shardId, file.getName())) {
323+
Thread.sleep(recoveryThrottler.throttleInterval().millis());
324+
}
325+
throttlingWaitTime.addAndGet(System.currentTimeMillis() - throttlingStartTime);
310326
copyToDirectory(file, store.directory(), file.getName());
311327
} catch (Exception e) {
312328
logger.debug("Failed to read [" + file + "] into [" + store + "]", e);
313329
lastException.set(e);
314330
} finally {
331+
recoveryThrottler.streamDone(shardId, file.getName());
315332
latch.countDown();
316333
}
317334
}
@@ -339,7 +356,7 @@ private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryExce
339356
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
340357
}
341358

342-
return new RecoveryStatus.Index(version, files.length, new SizeValue(totalSize, SizeUnit.BYTES));
359+
return new RecoveryStatus.Index(version, files.length, new SizeValue(totalSize, SizeUnit.BYTES), TimeValue.timeValueMillis(throttlingWaitTime.get()));
343360
}
344361

345362
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {

modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/none/NoneIndexShardGateway.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.index.shard.service.InternalIndexShard;
2929
import org.elasticsearch.util.SizeUnit;
3030
import org.elasticsearch.util.SizeValue;
31+
import org.elasticsearch.util.TimeValue;
3132
import org.elasticsearch.util.inject.Inject;
3233
import org.elasticsearch.util.settings.Settings;
3334

@@ -46,7 +47,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
4647
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
4748
// in the none case, we simply start the shard
4849
indexShard.start();
49-
return new RecoveryStatus(new RecoveryStatus.Index(-1, 0, new SizeValue(0, SizeUnit.BYTES)), new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES)));
50+
return new RecoveryStatus(new RecoveryStatus.Index(-1, 0, new SizeValue(0, SizeUnit.BYTES), TimeValue.timeValueMillis(0)), new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES)));
5051
}
5152

5253
@Override public SnapshotStatus snapshot(Snapshot snapshot) {

modules/elasticsearch/src/main/java/org/elasticsearch/index/service/IndexService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard>, Clos
5252

5353
IndexShard createShard(int sShardId) throws ElasticSearchException;
5454

55-
void deleteShard(int shardId) throws ElasticSearchException;
55+
/**
56+
* Cleans the shard locally, does not touch the gateway!.
57+
*/
58+
void cleanShard(int shardId) throws ElasticSearchException;
5659

5760
int numberOfShards();
5861

modules/elasticsearch/src/main/java/org/elasticsearch/index/service/InternalIndexService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
171171
@Override public synchronized void close(boolean delete) {
172172
try {
173173
for (int shardId : shardIds()) {
174-
deleteShard(shardId, delete);
174+
deleteShard(shardId, delete, delete);
175175
}
176176
} finally {
177177
indicesLifecycle.removeListener(cleanCacheOnIndicesLifecycleListener);
@@ -234,11 +234,11 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
234234
return indexShard;
235235
}
236236

237-
@Override public synchronized void deleteShard(int shardId) throws ElasticSearchException {
238-
deleteShard(shardId, true);
237+
@Override public synchronized void cleanShard(int shardId) throws ElasticSearchException {
238+
deleteShard(shardId, true, false);
239239
}
240240

241-
private synchronized void deleteShard(int shardId, boolean delete) throws ElasticSearchException {
241+
private synchronized void deleteShard(int shardId, boolean delete, boolean deleteGateway) throws ElasticSearchException {
242242
Map<Integer, Injector> tmpShardInjectors = newHashMap(shardsInjectors);
243243
Injector shardInjector = tmpShardInjectors.remove(shardId);
244244
if (shardInjector == null) {
@@ -268,7 +268,7 @@ private synchronized void deleteShard(int shardId, boolean delete) throws Elasti
268268
RecoveryAction recoveryAction = shardInjector.getInstance(RecoveryAction.class);
269269
if (recoveryAction != null) recoveryAction.close();
270270

271-
shardInjector.getInstance(IndexShardGatewayService.class).close(delete);
271+
shardInjector.getInstance(IndexShardGatewayService.class).close(deleteGateway);
272272

273273
indexShard.close();
274274

0 commit comments

Comments
 (0)