Skip to content

Commit 21fd158

Browse files
committed
Core: Remove ability to run optimize and upgrade async
This has been very trappy. Rather than continue to allow buggy behavior of having upgrade/optimize requests sidestep the single shard per node limits optimize is supposed to be subject to, this removes the ability to run the upgrade/optimize async. closes #9638
1 parent a536f10 commit 21fd158

File tree

19 files changed

+41
-145
lines changed

19 files changed

+41
-145
lines changed

docs/reference/indices/optimize.asciidoc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ operations (and relates to the number of segments a Lucene index holds
77
within each shard). The optimize operation allows to reduce the number
88
of segments by merging them.
99

10+
This call will block until the optimize is complete. If the http connection
11+
is lost, the request will continue in the background, and
12+
any new requests will block until the previous optimize is complete.
13+
1014
[source,js]
1115
--------------------------------------------------
1216
$ curl -XPOST 'http://localhost:9200/twitter/_optimize'
@@ -33,10 +37,6 @@ deletes. Defaults to `false`. Note that this won't override the
3337
`flush`:: Should a flush be performed after the optimize. Defaults to
3438
`true`.
3539

36-
`wait_for_merge`:: Should the request wait for the merge to end. Defaults
37-
to `true`. Note, a merge can potentially be a very heavy operation, so
38-
it might make sense to run it set to `false`.
39-
4040
[float]
4141
[[optimize-multi-index]]
4242
=== Multi Index

docs/reference/indices/upgrade.asciidoc

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,9 @@ NOTE: Upgrading is an I/O intensive operation, and is limited to processing a
1717
single shard per node at a time. It also is not allowed to run at the same
1818
time as optimize.
1919

20-
[float]
21-
[[upgrade-parameters]]
22-
==== Request Parameters
23-
24-
The `upgrade` API accepts the following request parameters:
25-
26-
[horizontal]
27-
`wait_for_completion`:: Should the request wait for the upgrade to complete. Defaults
28-
to `false`.
20+
This call will block until the upgrade is complete. If the http connection
21+
is lost, the request will continue in the background, and
22+
any new requests will block until the previous upgrade is complete.
2923

3024
[float]
3125
=== Check upgrade status

src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@
3030
* A request to optimize one or more indices. In order to optimize on all the indices, pass an empty array or
3131
* <tt>null</tt> for the indices.
3232
* <p/>
33-
* <p>{@link #waitForMerge(boolean)} allows to control if the call will block until the optimize completes and
34-
* defaults to <tt>true</tt>.
35-
* <p/>
3633
* <p>{@link #maxNumSegments(int)} allows to control the number of segments to optimize down to. By default, will
3734
* cause the optimize process to optimize down to half the configured number of segments.
3835
*
@@ -43,14 +40,12 @@
4340
public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest> {
4441

4542
public static final class Defaults {
46-
public static final boolean WAIT_FOR_MERGE = true;
4743
public static final int MAX_NUM_SEGMENTS = -1;
4844
public static final boolean ONLY_EXPUNGE_DELETES = false;
4945
public static final boolean FLUSH = true;
5046
public static final boolean UPGRADE = false;
5147
}
52-
53-
private boolean waitForMerge = Defaults.WAIT_FOR_MERGE;
48+
5449
private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
5550
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
5651
private boolean flush = Defaults.FLUSH;
@@ -69,21 +64,6 @@ public OptimizeRequest() {
6964

7065
}
7166

72-
/**
73-
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
74-
*/
75-
public boolean waitForMerge() {
76-
return waitForMerge;
77-
}
78-
79-
/**
80-
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
81-
*/
82-
public OptimizeRequest waitForMerge(boolean waitForMerge) {
83-
this.waitForMerge = waitForMerge;
84-
return this;
85-
}
86-
8767
/**
8868
* Will optimize the index down to <= maxNumSegments. By default, will cause the optimize
8969
* process to optimize down to half the configured number of segments.
@@ -168,7 +148,6 @@ public OptimizeRequest upgrade(boolean upgrade) {
168148

169149
public void readFrom(StreamInput in) throws IOException {
170150
super.readFrom(in);
171-
waitForMerge = in.readBoolean();
172151
maxNumSegments = in.readInt();
173152
onlyExpungeDeletes = in.readBoolean();
174153
flush = in.readBoolean();
@@ -179,7 +158,6 @@ public void readFrom(StreamInput in) throws IOException {
179158

180159
public void writeTo(StreamOutput out) throws IOException {
181160
super.writeTo(out);
182-
out.writeBoolean(waitForMerge);
183161
out.writeInt(maxNumSegments);
184162
out.writeBoolean(onlyExpungeDeletes);
185163
out.writeBoolean(flush);
@@ -191,8 +169,7 @@ public void writeTo(StreamOutput out) throws IOException {
191169
@Override
192170
public String toString() {
193171
return "OptimizeRequest{" +
194-
"waitForMerge=" + waitForMerge +
195-
", maxNumSegments=" + maxNumSegments +
172+
"maxNumSegments=" + maxNumSegments +
196173
", onlyExpungeDeletes=" + onlyExpungeDeletes +
197174
", flush=" + flush +
198175
", upgrade=" + upgrade +

src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequestBuilder.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@
2727
* A request to optimize one or more indices. In order to optimize on all the indices, pass an empty array or
2828
* <tt>null</tt> for the indices.
2929
* <p/>
30-
* <p>{@link #setWaitForMerge(boolean)} allows to control if the call will block until the optimize completes and
31-
* defaults to <tt>true</tt>.
32-
* <p/>
3330
* <p>{@link #setMaxNumSegments(int)} allows to control the number of segments to optimize down to. By default, will
3431
* cause the optimize process to optimize down to half the configured number of segments.
3532
*/
@@ -39,14 +36,6 @@ public OptimizeRequestBuilder(IndicesAdminClient indicesClient) {
3936
super(indicesClient, new OptimizeRequest());
4037
}
4138

42-
/**
43-
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
44-
*/
45-
public OptimizeRequestBuilder setWaitForMerge(boolean waitForMerge) {
46-
request.waitForMerge(waitForMerge);
47-
return this;
48-
}
49-
5039
/**
5140
* Will optimize the index down to <= maxNumSegments. By default, will cause the optimize
5241
* process to optimize down to half the configured number of segments.

src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@
3232
*
3333
*/
3434
class ShardOptimizeRequest extends BroadcastShardOperationRequest {
35-
36-
private boolean waitForMerge = OptimizeRequest.Defaults.WAIT_FOR_MERGE;
35+
3736
private int maxNumSegments = OptimizeRequest.Defaults.MAX_NUM_SEGMENTS;
3837
private boolean onlyExpungeDeletes = OptimizeRequest.Defaults.ONLY_EXPUNGE_DELETES;
3938
private boolean flush = OptimizeRequest.Defaults.FLUSH;
@@ -44,17 +43,12 @@ class ShardOptimizeRequest extends BroadcastShardOperationRequest {
4443

4544
ShardOptimizeRequest(ShardId shardId, OptimizeRequest request) {
4645
super(shardId, request);
47-
waitForMerge = request.waitForMerge();
4846
maxNumSegments = request.maxNumSegments();
4947
onlyExpungeDeletes = request.onlyExpungeDeletes();
5048
flush = request.flush();
5149
upgrade = request.force() || request.upgrade();
5250
}
5351

54-
boolean waitForMerge() {
55-
return waitForMerge;
56-
}
57-
5852
int maxNumSegments() {
5953
return maxNumSegments;
6054
}
@@ -74,7 +68,9 @@ public boolean upgrade() {
7468
@Override
7569
public void readFrom(StreamInput in) throws IOException {
7670
super.readFrom(in);
77-
waitForMerge = in.readBoolean();
71+
if (in.getVersion().before(Version.V_1_5_0)) {
72+
in.readBoolean(); // backcompat for waitForMerges, no longer exists
73+
}
7874
maxNumSegments = in.readInt();
7975
onlyExpungeDeletes = in.readBoolean();
8076
flush = in.readBoolean();
@@ -86,7 +82,9 @@ public void readFrom(StreamInput in) throws IOException {
8682
@Override
8783
public void writeTo(StreamOutput out) throws IOException {
8884
super.writeTo(out);
89-
out.writeBoolean(waitForMerge);
85+
if (out.getVersion().before(Version.V_1_5_0)) {
86+
out.writeBoolean(true);
87+
}
9088
out.writeInt(maxNumSegments);
9189
out.writeBoolean(onlyExpungeDeletes);
9290
out.writeBoolean(flush);

src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ protected ShardOptimizeResponse newShardResponse() {
108108
protected ShardOptimizeResponse shardOperation(ShardOptimizeRequest request) throws ElasticsearchException {
109109
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
110110
indexShard.optimize(new OptimizeRequest().flush(request.flush()).onlyExpungeDeletes(request.onlyExpungeDeletes())
111-
.maxNumSegments(request.maxNumSegments()).waitForMerge(request.waitForMerge()).upgrade(request.upgrade()));
111+
.maxNumSegments(request.maxNumSegments()).upgrade(request.upgrade()));
112112
return new ShardOptimizeResponse(request.shardId());
113113
}
114114

src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,12 +227,12 @@ public Condition newCondition() {
227227
/**
228228
* Optimizes to 1 segment
229229
*/
230-
abstract void forceMerge(boolean flush, boolean waitForMerge);
230+
abstract void forceMerge(boolean flush);
231231

232232
/**
233233
* Triggers a forced merge on this engine
234234
*/
235-
public abstract void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
235+
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
236236

237237
/**
238238
* Snapshots the index and returns a handle to it. Will always try and "commit" the

src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -843,12 +843,12 @@ private void waitForMerges(boolean flushAfter, boolean upgrade) {
843843
}
844844

845845
@Override
846-
public void forceMerge(boolean flush, boolean waitForMerge) {
847-
forceMerge(flush, waitForMerge, 1, false, false);
846+
public void forceMerge(boolean flush) {
847+
forceMerge(flush, 1, false, false);
848848
}
849849

850850
@Override
851-
public void forceMerge(final boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
851+
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
852852
if (optimizeMutex.compareAndSet(false, true)) {
853853
try (ReleasableLock _ = readLock.acquire()) {
854854
ensureOpen();
@@ -881,23 +881,7 @@ public void forceMerge(final boolean flush, boolean waitForMerge, int maxNumSegm
881881
}
882882
}
883883

884-
// wait for the merges outside of the read lock
885-
if (waitForMerge) {
886-
waitForMerges(flush, upgrade);
887-
} else if (flush || upgrade) {
888-
// we only need to monitor merges for async calls if we are going to flush
889-
engineConfig.getThreadPool().executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
890-
@Override
891-
public void onFailure(Throwable t) {
892-
logger.error("Exception while waiting for merges asynchronously after optimize", t);
893-
}
894-
895-
@Override
896-
protected void doRun() throws Exception {
897-
waitForMerges(flush, upgrade);
898-
}
899-
});
900-
}
884+
waitForMerges(flush, upgrade);
901885
}
902886

903887

src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -616,8 +616,7 @@ public void optimize(OptimizeRequest optimize) throws ElasticsearchException {
616616
if (logger.isTraceEnabled()) {
617617
logger.trace("optimize with {}", optimize);
618618
}
619-
engine().forceMerge(optimize.flush(), optimize.waitForMerge(), optimize
620-
.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
619+
engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
621620
}
622621

623622
public SnapshotIndexCommit snapshotIndex() throws EngineException {

src/main/java/org/elasticsearch/rest/action/admin/indices/optimize/RestOptimizeAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
5555
OptimizeRequest optimizeRequest = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
5656
optimizeRequest.listenerThreaded(false);
5757
optimizeRequest.indicesOptions(IndicesOptions.fromRequest(request, optimizeRequest.indicesOptions()));
58-
optimizeRequest.waitForMerge(request.paramAsBoolean("wait_for_merge", optimizeRequest.waitForMerge()));
5958
optimizeRequest.maxNumSegments(request.paramAsInt("max_num_segments", optimizeRequest.maxNumSegments()));
6059
optimizeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", optimizeRequest.onlyExpungeDeletes()));
6160
optimizeRequest.flush(request.paramAsBoolean("flush", optimizeRequest.flush()));

src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ public RestResponse buildResponse(IndicesSegmentResponse response, XContentBuild
9090

9191
void handlePost(RestRequest request, RestChannel channel, Client client) {
9292
OptimizeRequest optimizeReq = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
93-
optimizeReq.waitForMerge(request.paramAsBoolean("wait_for_completion", false));
9493
optimizeReq.flush(true);
9594
optimizeReq.upgrade(true);
9695
optimizeReq.maxNumSegments(Integer.MAX_VALUE); // we just want to upgrade the segments, not actually optimize to a single segment

src/test/java/org/elasticsearch/gateway/local/SimpleRecoveryLocalGatewayTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ public void testReusePeerRecovery() throws Exception {
365365
}
366366
logger.info("Running Cluster Health");
367367
ensureGreen();
368-
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).setMaxNumSegments(100).get(); // just wait for merges
368+
client().admin().indices().prepareOptimize("test").setMaxNumSegments(100).get(); // just wait for merges
369369
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
370370

371371
logger.info("--> shutting down the nodes");

src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 6 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -385,31 +385,10 @@ public void testSegments() throws Exception {
385385
public void testSegmentsWithMergeFlag() throws Exception {
386386

387387
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
388-
final AtomicReference<CountDownLatch> waitTillMerge = new AtomicReference<>();
389-
final AtomicReference<CountDownLatch> waitForMerge = new AtomicReference<>();
390-
mergeSchedulerProvider.addListener(new MergeSchedulerProvider.Listener() {
391-
@Override
392-
public void beforeMerge(OnGoingMerge merge) {
393-
try {
394-
if (waitTillMerge.get() != null) {
395-
waitTillMerge.get().countDown();
396-
}
397-
if (waitForMerge.get() != null) {
398-
waitForMerge.get().await();
399-
}
400-
} catch (InterruptedException e) {
401-
throw ExceptionsHelper.convertToRuntime(e);
402-
}
403-
}
404-
405-
@Override
406-
public void afterMerge(OnGoingMerge merge) {
407-
}
408-
});
409-
410-
final Store store = createStore();
411388
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
389+
final Store store = createStore();
412390
final Engine engine = createEngine(indexSettingsService, store, createTranslog(), mergeSchedulerProvider);
391+
413392
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
414393
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
415394
engine.index(index);
@@ -430,23 +409,12 @@ public void afterMerge(OnGoingMerge merge) {
430409
assertThat(segment.getMergeId(), nullValue());
431410
}
432411

433-
waitTillMerge.set(new CountDownLatch(1));
434-
waitForMerge.set(new CountDownLatch(1));
435-
engine.forceMerge(false, false);
436-
waitTillMerge.get().await();
437-
438-
for (Segment segment : engine.segments()) {
439-
assertThat(segment.getMergeId(), notNullValue());
440-
}
441-
442-
waitForMerge.get().countDown();
443-
444412
index = new Engine.Index(null, newUid("4"), doc);
445413
engine.index(index);
446414
engine.flush();
447415
final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
448416
// now, optimize and wait for merges, see that we have no merge flag
449-
engine.forceMerge(true, true);
417+
engine.forceMerge(true);
450418

451419
for (Segment segment : engine.segments()) {
452420
assertThat(segment.getMergeId(), nullValue());
@@ -456,25 +424,14 @@ public void afterMerge(OnGoingMerge merge) {
456424

457425
final boolean flush = randomBoolean();
458426
final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
459-
engine.forceMerge(flush, false);
460-
waitTillMerge.get().await();
427+
engine.forceMerge(flush);
461428
for (Segment segment : engine.segments()) {
462429
assertThat(segment.getMergeId(), nullValue());
463430
}
464-
waitForMerge.get().countDown();
465431

466432
if (flush) {
467-
awaitBusy(new Predicate<Object>() {
468-
@Override
469-
public boolean apply(Object o) {
470-
try {
471-
// we should have had just 1 merge, so last generation should be exact
472-
return store.readLastCommittedSegmentsInfo().getLastGeneration() == gen2;
473-
} catch (IOException e) {
474-
throw ExceptionsHelper.convertToRuntime(e);
475-
}
476-
}
477-
});
433+
// we should have had just 1 merge, so last generation should be exact
434+
assertEquals(gen2 + 1, store.readLastCommittedSegmentsInfo().getLastGeneration());
478435
}
479436

480437
engine.close();

0 commit comments

Comments
 (0)