Skip to content

Commit d3ad55c

Browse files
Reindex negative TimeValue fix
Reindex would use timeValueNanos(System.nanoTime()). The intended use for TimeValue is as a duration, not as absolute time. In particular, this could result in negative TimeValue's, being unsupported in elastic#53913.
1 parent d046489 commit d3ad55c

File tree

3 files changed

+24
-25
lines changed

3 files changed

+24
-25
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -247,16 +247,16 @@ public void start() {
247247
void onScrollResponse(ScrollableHitSource.AsyncResponse asyncResponse) {
248248
// lastBatchStartTime is essentially unused (see WorkerBulkByScrollTaskState.throttleWaitTime. Leaving it for now, since it seems
249249
// like a bug?
250-
onScrollResponse(new TimeValue(System.nanoTime()), this.lastBatchSize, asyncResponse);
250+
onScrollResponse(System.nanoTime(), this.lastBatchSize, asyncResponse);
251251
}
252252

253253
/**
254254
* Process a scroll response.
255-
* @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay.
255+
* @param lastBatchStartTimeNS the time when the last batch started. Used to calculate the throttling delay.
256256
* @param lastBatchSize the size of the last batch. Used to calculate the throttling delay.
257257
* @param asyncResponse the response to process from ScrollableHitSource
258258
*/
259-
void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
259+
void onScrollResponse(long lastBatchStartTimeNS, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
260260
ScrollableHitSource.Response response = asyncResponse.response();
261261
logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size());
262262
if (task.isCancelled()) {
@@ -284,7 +284,7 @@ protected void doRun() throws Exception {
284284
* It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time
285285
* waiting on the scroll doesn't count against this batch in the throttle.
286286
*/
287-
prepareBulkRequest(timeValueNanos(System.nanoTime()), asyncResponse);
287+
prepareBulkRequest(System.nanoTime(), asyncResponse);
288288
}
289289

290290
@Override
@@ -293,15 +293,15 @@ public void onFailure(Exception e) {
293293
}
294294
};
295295
prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable);
296-
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTime, lastBatchSize, prepareBulkRequestRunnable);
296+
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTimeNS, lastBatchSize, prepareBulkRequestRunnable);
297297
}
298298

299299
/**
300300
* Prepare the bulk request. Called on the generic thread pool after some preflight checks have been done one the SearchResponse and any
301301
* delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
302302
* thread may be blocked by the user script.
303303
*/
304-
void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse) {
304+
void prepareBulkRequest(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse) {
305305
ScrollableHitSource.Response response = asyncResponse.response();
306306
logger.debug("[{}]: preparing bulk request", task.getId());
307307
if (task.isCancelled()) {
@@ -327,12 +327,12 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncR
327327
/*
328328
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
329329
*/
330-
notifyDone(thisBatchStartTime, asyncResponse, 0);
330+
notifyDone(thisBatchStartTimeNS, asyncResponse, 0);
331331
return;
332332
}
333333
request.timeout(mainRequest.getTimeout());
334334
request.waitForActiveShards(mainRequest.getWaitForActiveShards());
335-
sendBulkRequest(request, () -> notifyDone(thisBatchStartTime, asyncResponse, request.requests().size()));
335+
sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request.requests().size()));
336336
}
337337

338338
/**
@@ -418,14 +418,14 @@ void onBulkResponse(BulkResponse response, Runnable onSuccess) {
418418
}
419419
}
420420

421-
void notifyDone(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
421+
void notifyDone(long thisBatchStartTimeNS, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
422422
if (task.isCancelled()) {
423423
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
424424
finishHim(null);
425425
return;
426426
}
427427
this.lastBatchSize = batchSize;
428-
asyncResponse.done(worker.throttleWaitTime(thisBatchStartTime, timeValueNanos(System.nanoTime()), batchSize));
428+
asyncResponse.done(worker.throttleWaitTime(thisBatchStartTimeNS, System.nanoTime(), batchSize));
429429
}
430430

431431
private void recordFailure(Failure failure, List<Failure> failures) {

modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

+10-11
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@
111111
import static org.apache.lucene.util.TestUtil.randomSimpleString;
112112
import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
113113
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
114-
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
115114
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
116115
import static org.hamcrest.Matchers.contains;
117116
import static org.hamcrest.Matchers.containsString;
@@ -255,7 +254,7 @@ public void testScrollResponseSetsTotal() {
255254

256255
long total = randomIntBetween(0, Integer.MAX_VALUE);
257256
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
258-
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response);
257+
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), 0, 0, response);
259258
assertEquals(total, testTask.getStatus().getTotal());
260259
}
261260

@@ -268,7 +267,7 @@ public void testScrollResponseBatchingBehavior() throws Exception {
268267
Hit hit = new ScrollableHitSource.BasicHit("index", "id", 0);
269268
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
270269
DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
271-
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
270+
simulateScrollResponse(action, System.nanoTime(), 0, response);
272271

273272
// Use assert busy because the update happens on another thread
274273
final int expectedBatches = batches;
@@ -354,7 +353,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
354353
}
355354
});
356355
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
357-
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
356+
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 10, response);
358357
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
359358
assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class));
360359
assertThat(e.getCause(), hasToString(containsString("test")));
@@ -372,7 +371,7 @@ public void testShardFailuresAbortRequest() throws Exception {
372371
SearchFailure shardFailure = new SearchFailure(new RuntimeException("test"));
373372
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0,
374373
emptyList(), null);
375-
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
374+
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
376375
BulkByScrollResponse response = listener.get();
377376
assertThat(response.getBulkFailures(), empty());
378377
assertThat(response.getSearchFailures(), contains(shardFailure));
@@ -386,7 +385,7 @@ public void testShardFailuresAbortRequest() throws Exception {
386385
*/
387386
public void testSearchTimeoutsAbortRequest() throws Exception {
388387
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null);
389-
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
388+
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
390389
BulkByScrollResponse response = listener.get();
391390
assertThat(response.getBulkFailures(), empty());
392391
assertThat(response.getSearchFailures(), empty());
@@ -423,7 +422,7 @@ protected AbstractAsyncBulkByScrollAction.RequestWrapper<?> buildRequest(Hit doc
423422
ScrollableHitSource.BasicHit hit = new ScrollableHitSource.BasicHit("index", "id", 0);
424423
hit.setSource(new BytesArray("{}"), XContentType.JSON);
425424
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
426-
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
425+
simulateScrollResponse(action, System.nanoTime(), 0, response);
427426
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
428427
assertThat(e.getCause(), instanceOf(RuntimeException.class));
429428
assertThat(e.getCause().getMessage(), equalTo("surprise"));
@@ -619,7 +618,7 @@ public void testCancelBeforeInitialSearch() throws Exception {
619618
}
620619

621620
public void testCancelBeforeScrollResponse() throws Exception {
622-
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1,
621+
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, System.nanoTime(), 1,
623622
new ScrollableHitSource.Response(false, emptyList(), between(1, 100000), emptyList(), null)));
624623
}
625624

@@ -634,7 +633,7 @@ public void testCancelBeforeOnBulkResponse() throws Exception {
634633
}
635634

636635
public void testCancelBeforeStartNextScroll() throws Exception {
637-
TimeValue now = timeValueNanos(System.nanoTime());
636+
long now = System.nanoTime();
638637
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, 0));
639638
}
640639

@@ -683,7 +682,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
683682
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
684683
// Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task
685684
worker.rethrottle(1);
686-
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1000, response);
685+
simulateScrollResponse(action, System.nanoTime(), 1000, response);
687686

688687
// Now that we've got our cancel we'll just verify that it all came through all right
689688
assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled());
@@ -712,7 +711,7 @@ private void cancelTaskCase(Consumer<DummyAsyncBulkByScrollAction> testMe) throw
712711
/**
713712
* Simulate a scroll response by setting the scroll id and firing the onScrollResponse method.
714713
*/
715-
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize,
714+
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, long lastBatchTime, int lastBatchSize,
716715
ScrollableHitSource.Response response) {
717716
action.setScroll(scrollId());
718717
action.onScrollResponse(lastBatchTime, lastBatchSize, new ScrollableHitSource.AsyncResponse() {

server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,11 @@ TimeValue throttledUntil() {
182182
* Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be
183183
* rescheduled over and over again.
184184
*/
185-
public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchStartTime, int lastBatchSize,
185+
public void delayPrepareBulkRequest(ThreadPool threadPool, long lastBatchStartTimeNS, int lastBatchSize,
186186
AbstractRunnable prepareBulkRequestRunnable) {
187187
// Synchronize so we are less likely to schedule the same request twice.
188188
synchronized (delayedPrepareBulkRequestReference) {
189-
TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
189+
TimeValue delay = throttleWaitTime(lastBatchStartTimeNS, System.nanoTime(), lastBatchSize);
190190
logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay);
191191
try {
192192
delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
@@ -197,8 +197,8 @@ public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchSt
197197
}
198198
}
199199

200-
public TimeValue throttleWaitTime(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
201-
long earliestNextBatchStartTime = now.nanos() + (long) perfectlyThrottledBatchTime(lastBatchSize);
200+
public TimeValue throttleWaitTime(long lastBatchStartTimeNS, long nowNS, int lastBatchSize) {
201+
long earliestNextBatchStartTime = nowNS + (long) perfectlyThrottledBatchTime(lastBatchSize);
202202
long waitTime = min(MAX_THROTTLE_WAIT_TIME.nanos(), max(0, earliestNextBatchStartTime - System.nanoTime()));
203203
return timeValueNanos(waitTime);
204204
}

0 commit comments

Comments
 (0)