Skip to content

Commit 34d4a03

Browse files
committed
#6018: buffer up bytes before calling RateLimiter
1 parent 204e165 commit 34d4a03

File tree

2 files changed

+21
-6
lines changed

2 files changed

+21
-6
lines changed

src/main/java/org/apache/lucene/store/RateLimitedFSDirectory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ static final class RateLimitedIndexOutput extends BufferedIndexOutput {
8181
private final BufferedIndexOutput bufferedDelegate;
8282
private final RateLimiter rateLimiter;
8383
private final StoreRateLimiting.Listener rateListener;
84+
private long bytesSinceLastRateLimit;
8485

8586
RateLimitedIndexOutput(final RateLimiter rateLimiter, final StoreRateLimiting.Listener rateListener, final IndexOutput delegate) {
8687
super(delegate instanceof BufferedIndexOutput ? ((BufferedIndexOutput) delegate).getBufferSize() : BufferedIndexOutput.DEFAULT_BUFFER_SIZE);
@@ -97,7 +98,11 @@ static final class RateLimitedIndexOutput extends BufferedIndexOutput {
9798

9899
@Override
99100
protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
100-
rateListener.onPause(rateLimiter.pause(len));
101+
bytesSinceLastRateLimit += len;
102+
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
103+
bytesSinceLastRateLimit = 0;
104+
rateListener.onPause(rateLimiter.pause(bytesSinceLastRateLimit));
105+
}
101106
if (bufferedDelegate != null) {
102107
bufferedDelegate.flushBuffer(b, offset, len);
103108
} else {

src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public class RateLimitingInputStream extends InputStream {
3535

3636
private final Listener listener;
3737

38+
private long bytesSinceLastRateLimit;
39+
3840
public interface Listener {
3941
void onPause(long nanos);
4042
}
@@ -45,13 +47,21 @@ public RateLimitingInputStream(InputStream delegate, RateLimiter rateLimiter, Li
4547
this.listener = listener;
4648
}
4749

50+
private void maybePause(int bytes) {
51+
bytesSinceLastRateLimit += bytes;
52+
if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
53+
long pause = rateLimiter.pause(bytesSinceLastRateLimit);
54+
bytesSinceLastRateLimit = 0;
55+
if (pause > 0) {
56+
listener.onPause(pause);
57+
}
58+
}
59+
}
60+
4861
@Override
4962
public int read() throws IOException {
5063
int b = delegate.read();
51-
long pause = rateLimiter.pause(1);
52-
if (pause > 0) {
53-
listener.onPause(pause);
54-
}
64+
maybePause(1);
5565
return b;
5666
}
5767

@@ -64,7 +74,7 @@ public int read(byte[] b) throws IOException {
6474
public int read(byte[] b, int off, int len) throws IOException {
6575
int n = delegate.read(b, off, len);
6676
if (n > 0) {
67-
listener.onPause(rateLimiter.pause(n));
77+
maybePause(n);
6878
}
6979
return n;
7080
}

0 commit comments

Comments
 (0)