Skip to content

Commit 45dc3ef

Browse files
committed
[ENGINE] Remove full flush / FlushType.NEW_WRITER
The `full` option and `FlushType.NEW_WRITER` only exists to allow realtime changes to two settings (`index.codec` and `index.concurrency`). Those settings are very expert and don't really need to be updateable in realtime. Conflicts: src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java src/main/java/org/elasticsearch/index/shard/IndexShard.java src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java
1 parent aaa040a commit 45dc3ef

File tree

16 files changed

+48
-155
lines changed

16 files changed

+48
-155
lines changed

docs/reference/indices/flush.asciidoc

-6
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,6 @@ flush can be executed if another flush operation is already executing.
2525
The default is `false` and will cause an exception to be thrown on
2626
the shard level if another flush operation is already running.
2727

28-
`full`:: If set to `true` a new index writer is created and settings that have
29-
been changed related to the index writer will be refreshed. Note: if a full flush
30-
is required for a setting to take effect this will be part of the settings update
31-
process and it not required to be executed by the user.
32-
(This setting can be considered as internal)
33-
3428
`force`:: Whether a flush should be forced even if it is not necessarily needed ie.
3529
if no changes will be committed to the index. This is useful if transaction log IDs
3630
should be incremented even if no uncommitted changes are present.

rest-api-spec/api/indices.flush.json

-4
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@
1616
"type" : "boolean",
1717
"description" : "Whether a flush should be forced even if it is not necessarily needed ie. if no changes will be committed to the index. This is useful if transaction log IDs should be incremented even if no uncommitted changes are present. (This setting can be considered as internal)"
1818
},
19-
"full": {
20-
"type" : "boolean",
21-
"description" : "If set to true a new index writer is created and settings that have been changed related to the index writer will be refreshed. Note: if a full flush is required for a setting to take effect this will be part of the settings update process and it not required to be executed by the user. (This setting can be considered as internal)"
22-
},
2319
"wait_if_ongoing": {
2420
"type" : "boolean",
2521
"description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is false and will cause an exception to be thrown on the shard level if another flush operation is already running."

src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java

-18
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {
4242

4343
private boolean force = false;
44-
private boolean full = false;
4544
private boolean waitIfOngoing = false;
4645

4746
FlushRequest() {
@@ -63,21 +62,6 @@ public FlushRequest(String... indices) {
6362
super(indices);
6463
}
6564

66-
/**
67-
* Should a "full" flush be performed.
68-
*/
69-
public boolean full() {
70-
return this.full;
71-
}
72-
73-
/**
74-
* Should a "full" flush be performed.
75-
*/
76-
public FlushRequest full(boolean full) {
77-
this.full = full;
78-
return this;
79-
}
80-
8165
/**
8266
* Returns <tt>true</tt> iff a flush should block
8367
* if a another flush operation is already running. Otherwise <tt>false</tt>
@@ -113,7 +97,6 @@ public FlushRequest force(boolean force) {
11397
@Override
11498
public void writeTo(StreamOutput out) throws IOException {
11599
super.writeTo(out);
116-
out.writeBoolean(full);
117100
out.writeBoolean(force);
118101
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
119102
out.writeBoolean(waitIfOngoing);
@@ -123,7 +106,6 @@ public void writeTo(StreamOutput out) throws IOException {
123106
@Override
124107
public void readFrom(StreamInput in) throws IOException {
125108
super.readFrom(in);
126-
full = in.readBoolean();
127109
force = in.readBoolean();
128110
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
129111
waitIfOngoing = in.readBoolean();

src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequestBuilder.java

-5
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,6 @@ public FlushRequestBuilder(IndicesAdminClient indicesClient) {
3232
super(indicesClient, new FlushRequest());
3333
}
3434

35-
public FlushRequestBuilder setFull(boolean full) {
36-
request.full(full);
37-
return this;
38-
}
39-
4035
public FlushRequestBuilder setForce(boolean force) {
4136
request.force(force);
4237
return this;

src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
*/
3333
class ShardFlushRequest extends BroadcastShardOperationRequest {
3434

35-
private boolean full;
3635
private boolean force;
3736
private boolean waitIfOngoing;
3837

@@ -41,15 +40,10 @@ class ShardFlushRequest extends BroadcastShardOperationRequest {
4140

4241
ShardFlushRequest(ShardId shardId, FlushRequest request) {
4342
super(shardId, request);
44-
this.full = request.full();
4543
this.force = request.force();
4644
this.waitIfOngoing = request.waitIfOngoing();
4745
}
4846

49-
public boolean full() {
50-
return this.full;
51-
}
52-
5347
public boolean force() {
5448
return this.force;
5549
}
@@ -61,7 +55,7 @@ public boolean waitIfOngoing() {
6155
@Override
6256
public void readFrom(StreamInput in) throws IOException {
6357
super.readFrom(in);
64-
full = in.readBoolean();
58+
in.readBoolean();
6559
force = in.readBoolean();
6660
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
6761
waitIfOngoing = in.readBoolean();
@@ -73,7 +67,7 @@ public void readFrom(StreamInput in) throws IOException {
7367
@Override
7468
public void writeTo(StreamOutput out) throws IOException {
7569
super.writeTo(out);
76-
out.writeBoolean(full);
70+
out.writeBoolean(false);
7771
out.writeBoolean(force);
7872
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
7973
out.writeBoolean(waitIfOngoing);

src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ protected ShardFlushResponse newShardResponse() {
106106
@Override
107107
protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws ElasticsearchException {
108108
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
109-
110-
indexShard.flush(new FlushRequest().force(request.force()).full(request.full()).waitIfOngoing(request.waitIfOngoing()));
109+
indexShard.flush(new FlushRequest().force(request.force()).waitIfOngoing(request.waitIfOngoing()));
111110
return new ShardFlushResponse(request.shardId());
112111
}
113112

src/main/java/org/elasticsearch/index/engine/Engine.java

-4
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,6 @@ public void close() throws ElasticsearchException {
197197
}
198198

199199
public static enum FlushType {
200-
/**
201-
* A flush that causes a new writer to be created.
202-
*/
203-
NEW_WRITER,
204200
/**
205201
* A flush that just commits the writer, without cleaning the translog.
206202
*/

src/main/java/org/elasticsearch/index/engine/EngineConfig.java

+5-30
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ public final class EngineConfig {
5353
private volatile boolean failOnMergeFailure = true;
5454
private volatile boolean failEngineOnCorruption = true;
5555
private volatile ByteSizeValue indexingBufferSize;
56-
private volatile int indexConcurrency = IndexWriterConfig.DEFAULT_MAX_THREAD_STATES;
56+
private final int indexConcurrency;
5757
private volatile boolean compoundOnFlush = true;
5858
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
5959
private volatile boolean enableGcDeletes = true;
60-
private volatile String codecName = DEFAULT_CODEC_NAME;
60+
private final String codecName;
6161
private final boolean optimizeAutoGenerateId;
6262
private volatile boolean checksumOnMerge;
6363
private final ThreadPool threadPool;
@@ -78,7 +78,7 @@ public final class EngineConfig {
7878
/**
7979
* Index setting for index concurrency / number of threadstates in the indexwriter.
8080
* The default is depending on the number of CPUs in the system. We use a 0.65 the number of CPUs or at least {@value org.apache.lucene.index.IndexWriterConfig#DEFAULT_MAX_THREAD_STATES}
81-
* This setting is realtime updateable
81+
* This setting is <b>not</b> realtime updateable
8282
*/
8383
public static final String INDEX_CONCURRENCY_SETTING = "index.index_concurrency";
8484

@@ -119,7 +119,7 @@ public final class EngineConfig {
119119

120120
/**
121121
* Index setting to change the low level lucene codec used for writing new segments.
122-
* This setting is realtime updateable.
122+
* This setting is <b>not</b> realtime updateable.
123123
*/
124124
public static final String INDEX_CODEC_SETTING = "index.codec";
125125

@@ -175,15 +175,6 @@ public void setIndexingBufferSize(ByteSizeValue indexingBufferSize) {
175175
this.indexingBufferSize = indexingBufferSize;
176176
}
177177

178-
/**
179-
* Sets the index concurrency
180-
* @see #getIndexConcurrency()
181-
*/
182-
public void setIndexConcurrency(int indexConcurrency) {
183-
this.indexConcurrency = indexConcurrency;
184-
}
185-
186-
187178
/**
188179
* Enables / disables gc deletes
189180
*
@@ -254,9 +245,7 @@ public boolean isEnableGcDeletes() {
254245
/**
255246
* Returns the {@link Codec} used in the engines {@link org.apache.lucene.index.IndexWriter}
256247
* <p>
257-
* Note: this settings is only read on startup and if a new writer is created. This happens either due to a
258-
* settings change in the {@link org.elasticsearch.index.engine.EngineConfig.EngineSettingsListener} or if
259-
* {@link Engine#flush(org.elasticsearch.index.engine.Engine.FlushType, boolean, boolean)} with {@link org.elasticsearch.index.engine.Engine.FlushType#NEW_WRITER} is executed.
248+
* Note: this settings is only read on startup.
260249
* </p>
261250
*/
262251
public Codec getCodec() {
@@ -425,20 +414,6 @@ public final void onRefreshSettings(Settings settings) {
425414
config.failEngineOnCorruption = failEngineOnCorruption;
426415
change = true;
427416
}
428-
int indexConcurrency = settings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, config.getIndexConcurrency());
429-
if (indexConcurrency != config.getIndexConcurrency()) {
430-
logger.info("updating index.index_concurrency from [{}] to [{}]", config.getIndexConcurrency(), indexConcurrency);
431-
config.setIndexConcurrency(indexConcurrency);
432-
// we have to flush in this case, since it only applies on a new index writer
433-
change = true;
434-
}
435-
final String codecName = settings.get(EngineConfig.INDEX_CODEC_SETTING, config.codecName);
436-
if (!codecName.equals(config.codecName)) {
437-
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_CODEC_SETTING, config.codecName, codecName);
438-
config.codecName = codecName;
439-
// we want to flush in this case, so the new codec will be reflected right away...
440-
change = true;
441-
}
442417
final boolean failOnMergeFailure = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure());
443418
if (failOnMergeFailure != config.isFailOnMergeFailure()) {
444419
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure(), failOnMergeFailure);

src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java

+2-48
Original file line numberDiff line numberDiff line change
@@ -279,11 +279,6 @@ private void updateSettings() {
279279
final LiveIndexWriterConfig iwc = indexWriter.getConfig();
280280
iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush());
281281
iwc.setCheckIntegrityAtMerge(engineConfig.isChecksumOnMerge());
282-
final boolean concurrencyNeedsUpdate = iwc.getMaxThreadStates() != engineConfig.getIndexConcurrency();
283-
final boolean codecNeedsUpdate = iwc.getCodec().equals(engineConfig.getCodec()) == false;
284-
if (codecNeedsUpdate || concurrencyNeedsUpdate) {
285-
flush(FlushType.NEW_WRITER, false, false);
286-
}
287282
}
288283
}
289284

@@ -761,7 +756,7 @@ public void refresh(String source, boolean force) throws EngineException {
761756
@Override
762757
public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException {
763758
ensureOpen();
764-
if (type == FlushType.NEW_WRITER || type == FlushType.COMMIT_TRANSLOG) {
759+
if (type == FlushType.COMMIT_TRANSLOG) {
765760
// check outside the lock as well so we can check without blocking on the write lock
766761
if (onGoingRecoveries.get() > 0) {
767762
throw new FlushNotAllowedEngineException(shardId, "recovery is in progress, flush [" + type + "] is not allowed");
@@ -775,48 +770,7 @@ public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws E
775770

776771
flushLock.lock();
777772
try {
778-
if (type == FlushType.NEW_WRITER) {
779-
try (InternalLock _ = writeLock.acquire()) {
780-
if (onGoingRecoveries.get() > 0) {
781-
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
782-
}
783-
// disable refreshing, not dirty
784-
dirty = false;
785-
try {
786-
{ // commit and close the current writer - we write the current tanslog ID just in case
787-
final long translogId = translog.currentId();
788-
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
789-
indexWriter.commit();
790-
indexWriter.rollback();
791-
}
792-
indexWriter = createWriter();
793-
// commit on a just opened writer will commit even if there are no changes done to it
794-
// we rely on that for the commit data translog id key
795-
if (flushNeeded || force) {
796-
flushNeeded = false;
797-
long translogId = translogIdGenerator.incrementAndGet();
798-
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
799-
indexWriter.commit();
800-
translog.newTranslog(translogId);
801-
}
802-
803-
SearcherManager current = this.searcherManager;
804-
this.searcherManager = buildSearchManager(indexWriter);
805-
versionMap.setManager(searcherManager);
806-
807-
try {
808-
IOUtils.close(current);
809-
} catch (Throwable t) {
810-
logger.warn("Failed to close current SearcherManager", t);
811-
}
812-
813-
maybePruneDeletedTombstones();
814-
815-
} catch (Throwable t) {
816-
throw new FlushFailedEngineException(shardId, t);
817-
}
818-
}
819-
} else if (type == FlushType.COMMIT_TRANSLOG) {
773+
if (type == FlushType.COMMIT_TRANSLOG) {
820774
try (InternalLock _ = readLock.acquire()) {
821775
final IndexWriter indexWriter = currentIndexWriter();
822776
if (onGoingRecoveries.get() > 0) {

src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java

-2
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,8 @@ public IndexDynamicSettingsModule() {
8181
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, Validator.POSITIVE_INTEGER);
8282
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, Validator.INTEGER_GTE_2);
8383
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_COMPOUND_FORMAT);
84-
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_CONCURRENCY_SETTING, Validator.NON_NEGATIVE_INTEGER);
8584
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
8685
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
87-
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_CODEC_SETTING);
8886
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING);
8987
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING);
9088
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_CHECKSUM_ON_MERGE, Validator.BOOLEAN);

src/main/java/org/elasticsearch/index/shard/IndexShard.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ public void flush(FlushRequest request) throws ElasticsearchException {
610610
logger.trace("flush with {}", request);
611611
}
612612
long time = System.nanoTime();
613-
engineSafe().flush(request.full() ? Engine.FlushType.NEW_WRITER : Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing());
613+
engineSafe().flush(Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing());
614614
flushMetric.inc(System.nanoTime() - time);
615615
}
616616

src/main/java/org/elasticsearch/rest/action/admin/indices/flush/RestFlushAction.java

-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
5555
FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index")));
5656
flushRequest.listenerThreaded(false);
5757
flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions()));
58-
flushRequest.full(request.paramAsBoolean("full", flushRequest.full()));
5958
flushRequest.force(request.paramAsBoolean("force", flushRequest.force()));
6059
flushRequest.waitIfOngoing(request.paramAsBoolean("wait_if_ongoing", flushRequest.waitIfOngoing()));
6160
client.admin().indices().flush(flushRequest, new RestBuilderListener<FlushResponse>(channel) {

src/test/java/org/elasticsearch/benchmark/scripts/expression/ScriptComparisonBenchmark.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ static void indexDocs(Client client, int numDocs) {
137137
}
138138
bulkRequest.execute().actionGet();
139139
client.admin().indices().prepareRefresh("test").execute().actionGet();
140-
client.admin().indices().prepareFlush("test").setFull(true).execute().actionGet();
140+
client.admin().indices().prepareFlush("test").execute().actionGet();
141141
System.out.println("done");
142142
}
143143

src/test/java/org/elasticsearch/benchmark/scripts/score/BasicScriptBenchmark.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ static void indexData(long numDocs, Client client, boolean randomizeTerms) throw
244244
}
245245
bulkRequest.execute().actionGet();
246246
client.admin().indices().prepareRefresh("test").execute().actionGet();
247-
client.admin().indices().prepareFlush("test").setFull(true).execute().actionGet();
247+
client.admin().indices().prepareFlush("test").execute().actionGet();
248248
System.out.println("Done indexing " + numDocs + " documents");
249249

250250
}

0 commit comments

Comments
 (0)