Skip to content

Remove full flush / FlushType.NEW_WRITER #9559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions docs/reference/indices/flush.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@ flush can be executed if another flush operation is already executing.
The default is `false` and will cause an exception to be thrown on
the shard level if another flush operation is already running.

`full`:: If set to `true` a new index writer is created and settings that have
been changed related to the index writer will be refreshed. Note: if a full flush
is required for a setting to take effect this will be part of the settings update
process and is not required to be executed by the user.
(This setting can be considered as internal)

`force`:: Whether a flush should be forced even if it is not necessarily needed, i.e.
if no changes will be committed to the index. This is useful if transaction log IDs
should be incremented even if no uncommitted changes are present.
Expand Down
4 changes: 0 additions & 4 deletions rest-api-spec/api/indices.flush.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
"type" : "boolean",
"description" : "Whether a flush should be forced even if it is not necessarily needed ie. if no changes will be committed to the index. This is useful if transaction log IDs should be incremented even if no uncommitted changes are present. (This setting can be considered as internal)"
},
"full": {
"type" : "boolean",
"description" : "If set to true a new index writer is created and settings that have been changed related to the index writer will be refreshed. Note: if a full flush is required for a setting to take effect this will be part of the settings update process and is not required to be executed by the user. (This setting can be considered as internal)"
},
"wait_if_ongoing": {
"type" : "boolean",
"description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is false and will cause an exception to be thrown on the shard level if another flush operation is already running."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {

private boolean force = false;
private boolean full = false;
private boolean waitIfOngoing = false;

FlushRequest() {
Expand All @@ -63,21 +62,6 @@ public FlushRequest(String... indices) {
super(indices);
}

/**
* Should a "full" flush be performed.
*/
public boolean full() {
return this.full;
}

/**
* Should a "full" flush be performed.
*/
public FlushRequest full(boolean full) {
this.full = full;
return this;
}

/**
* Returns <tt>true</tt> iff a flush should block
* if another flush operation is already running. Otherwise <tt>false</tt>
Expand Down Expand Up @@ -113,15 +97,13 @@ public FlushRequest force(boolean force) {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(full);
out.writeBoolean(force);
out.writeBoolean(waitIfOngoing);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
full = in.readBoolean();
force = in.readBoolean();
waitIfOngoing = in.readBoolean();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ public FlushRequestBuilder(IndicesAdminClient indicesClient) {
super(indicesClient, new FlushRequest());
}

public FlushRequestBuilder setFull(boolean full) {
request.full(full);
return this;
}

public FlushRequestBuilder setForce(boolean force) {
request.force(force);
return this;
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,6 @@ public void close() throws ElasticsearchException {
}

public static enum FlushType {
/**
* A flush that causes a new writer to be created.
*/
NEW_WRITER,
/**
* A flush that just commits the writer, without cleaning the translog.
*/
Expand Down
35 changes: 5 additions & 30 deletions src/main/java/org/elasticsearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ public final class EngineConfig {
private volatile boolean failOnMergeFailure = true;
private volatile boolean failEngineOnCorruption = true;
private volatile ByteSizeValue indexingBufferSize;
private volatile int indexConcurrency = IndexWriterConfig.DEFAULT_MAX_THREAD_STATES;
private final int indexConcurrency;
private volatile boolean compoundOnFlush = true;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private volatile boolean enableGcDeletes = true;
private volatile String codecName = DEFAULT_CODEC_NAME;
private final String codecName;
private final boolean optimizeAutoGenerateId;
private final ThreadPool threadPool;
private final ShardIndexingService indexingService;
Expand All @@ -77,7 +77,7 @@ public final class EngineConfig {
/**
* Index setting for index concurrency / number of threadstates in the indexwriter.
* The default depends on the number of CPUs in the system. We use 0.65 times the number of CPUs, or at least {@value org.apache.lucene.index.IndexWriterConfig#DEFAULT_MAX_THREAD_STATES}
* This setting is realtime updateable
* This setting is <b>not</b> realtime updateable
*/
public static final String INDEX_CONCURRENCY_SETTING = "index.index_concurrency";

Expand Down Expand Up @@ -118,7 +118,7 @@ public final class EngineConfig {

/**
* Index setting to change the low level lucene codec used for writing new segments.
* This setting is realtime updateable.
* This setting is <b>not</b> realtime updateable.
*/
public static final String INDEX_CODEC_SETTING = "index.codec";

Expand Down Expand Up @@ -166,15 +166,6 @@ public void setIndexingBufferSize(ByteSizeValue indexingBufferSize) {
this.indexingBufferSize = indexingBufferSize;
}

/**
* Sets the index concurrency
* @see #getIndexConcurrency()
*/
public void setIndexConcurrency(int indexConcurrency) {
this.indexConcurrency = indexConcurrency;
}


/**
* Enables / disables gc deletes
*
Expand Down Expand Up @@ -245,9 +236,7 @@ public boolean isEnableGcDeletes() {
/**
* Returns the {@link Codec} used in the engines {@link org.apache.lucene.index.IndexWriter}
* <p>
* Note: this settings is only read on startup and if a new writer is created. This happens either due to a
* settings change in the {@link org.elasticsearch.index.engine.EngineConfig.EngineSettingsListener} or if
* {@link Engine#flush(org.elasticsearch.index.engine.Engine.FlushType, boolean, boolean)} with {@link org.elasticsearch.index.engine.Engine.FlushType#NEW_WRITER} is executed.
* Note: this setting is only read on startup.
* </p>
*/
public Codec getCodec() {
Expand Down Expand Up @@ -412,20 +401,6 @@ public final void onRefreshSettings(Settings settings) {
config.failEngineOnCorruption = failEngineOnCorruption;
change = true;
}
int indexConcurrency = settings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, config.getIndexConcurrency());
if (indexConcurrency != config.getIndexConcurrency()) {
logger.info("updating index.index_concurrency from [{}] to [{}]", config.getIndexConcurrency(), indexConcurrency);
config.setIndexConcurrency(indexConcurrency);
// we have to flush in this case, since it only applies on a new index writer
change = true;
}
final String codecName = settings.get(EngineConfig.INDEX_CODEC_SETTING, config.codecName);
if (!codecName.equals(config.codecName)) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_CODEC_SETTING, config.codecName, codecName);
config.codecName = codecName;
// we want to flush in this case, so the new codec will be reflected right away...
change = true;
}
final boolean failOnMergeFailure = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure());
if (failOnMergeFailure != config.isFailOnMergeFailure()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure(), failOnMergeFailure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,6 @@ private void updateSettings() {
if (closedOrFailed == false) {
final LiveIndexWriterConfig iwc = indexWriter.getConfig();
iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush());
final boolean concurrencyNeedsUpdate = iwc.getMaxThreadStates() != engineConfig.getIndexConcurrency();
final boolean codecNeedsUpdate = iwc.getCodec().equals(engineConfig.getCodec()) == false;
if (codecNeedsUpdate || concurrencyNeedsUpdate) {
flush(FlushType.NEW_WRITER, false, false);
}
}
}

Expand Down Expand Up @@ -720,7 +715,7 @@ public void refresh(String source) throws EngineException {
@Override
public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
if (type == FlushType.NEW_WRITER || type == FlushType.COMMIT_TRANSLOG) {
if (type == FlushType.COMMIT_TRANSLOG) {
// check outside the lock as well so we can check without blocking on the write lock
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "recovery is in progress, flush [" + type + "] is not allowed");
Expand All @@ -734,46 +729,7 @@ public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws E

flushLock.lock();
try {
if (type == FlushType.NEW_WRITER) {
try (InternalLock _ = writeLock.acquire()) {
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
try {
{ // commit and close the current writer - we write the current translog ID just in case
final long translogId = translog.currentId();
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit();
indexWriter.rollback();
}
indexWriter = createWriter();
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
if (flushNeeded || force) {
flushNeeded = false;
long translogId = translogIdGenerator.incrementAndGet();
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit();
translog.newTranslog(translogId);
}

SearcherManager current = this.searcherManager;
this.searcherManager = buildSearchManager(indexWriter);
versionMap.setManager(searcherManager);

try {
IOUtils.close(current);
} catch (Throwable t) {
logger.warn("Failed to close current SearcherManager", t);
}

maybePruneDeletedTombstones();

} catch (Throwable t) {
throw new FlushFailedEngineException(shardId, t);
}
}
} else if (type == FlushType.COMMIT_TRANSLOG) {
if (type == FlushType.COMMIT_TRANSLOG) {
try (InternalLock _ = readLock.acquire()) {
final IndexWriter indexWriter = currentIndexWriter();
if (onGoingRecoveries.get() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ public IndexDynamicSettingsModule() {
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, Validator.POSITIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, Validator.INTEGER_GTE_2);
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_COMPOUND_FORMAT);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_CONCURRENCY_SETTING, Validator.NON_NEGATIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_CODEC_SETTING);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING);
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ public void flush(FlushRequest request) throws ElasticsearchException {
logger.trace("flush with {}", request);
}
long time = System.nanoTime();
engine().flush(request.full() ? Engine.FlushType.NEW_WRITER : Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing());
engine().flush(Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing());
flushMetric.inc(System.nanoTime() - time);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index")));
flushRequest.listenerThreaded(false);
flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions()));
flushRequest.full(request.paramAsBoolean("full", flushRequest.full()));
flushRequest.force(request.paramAsBoolean("force", flushRequest.force()));
flushRequest.waitIfOngoing(request.paramAsBoolean("wait_if_ongoing", flushRequest.waitIfOngoing()));
client.admin().indices().flush(flushRequest, new RestBuilderListener<FlushResponse>(channel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ static void indexDocs(Client client, int numDocs) {
}
bulkRequest.execute().actionGet();
client.admin().indices().prepareRefresh("test").execute().actionGet();
client.admin().indices().prepareFlush("test").setFull(true).execute().actionGet();
client.admin().indices().prepareFlush("test").execute().actionGet();
System.out.println("done");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ static void indexData(long numDocs, Client client, boolean randomizeTerms) throw
}
bulkRequest.execute().actionGet();
client.admin().indices().prepareRefresh("test").execute().actionGet();
client.admin().indices().prepareFlush("test").setFull(true).execute().actionGet();
client.admin().indices().prepareFlush("test").execute().actionGet();
System.out.println("Done indexing " + numDocs + " documents");

}
Expand Down
Loading