Skip to content

Commit db78d30

Browse files
authored
Warn on slow metadata persistence (#47005)
Today if metadata persistence is excessively slow on a master-ineligible node then the `ClusterApplierService` emits a warning indicating that the `GatewayMetaState` applier was slow, but gives no further details. If it is excessively slow on a master-eligible node then we do not see any warning at all, although we might see other consequences such as a lagging node or a master failure. With this commit we emit a warning if metadata persistence takes longer than a configurable threshold, which defaults to `10s`. We also emit statistics that record how much index metadata was persisted and how much was skipped since this can help distinguish cases where IO was slow from cases where there are simply too many indices involved.
1 parent c363d27 commit db78d30

File tree

5 files changed

+186
-8
lines changed

5 files changed

+186
-8
lines changed

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.elasticsearch.env.Environment;
7272
import org.elasticsearch.env.NodeEnvironment;
7373
import org.elasticsearch.gateway.GatewayService;
74+
import org.elasticsearch.gateway.IncrementalClusterStateWriter;
7475
import org.elasticsearch.http.HttpTransportSettings;
7576
import org.elasticsearch.index.IndexModule;
7677
import org.elasticsearch.index.IndexSettings;
@@ -226,6 +227,7 @@ public void apply(Settings value, Settings current, Settings previous) {
226227
GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING,
227228
GatewayService.RECOVER_AFTER_NODES_SETTING,
228229
GatewayService.RECOVER_AFTER_TIME_SETTING,
230+
IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD,
229231
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
230232
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
231233
NetworkModule.HTTP_TYPE_SETTING,

server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,10 @@ public void start(Settings settings, TransportService transportService, ClusterS
9292
}
9393

9494
final IncrementalClusterStateWriter incrementalClusterStateWriter
95-
= new IncrementalClusterStateWriter(metaStateService, manifestClusterStateTuple.v1(),
96-
prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()));
95+
= new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService,
96+
manifestClusterStateTuple.v1(),
97+
prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()),
98+
transportService.getThreadPool()::relativeTimeInMillis);
9799
if (DiscoveryNode.isMasterNode(settings) == false) {
98100
if (DiscoveryNode.isDataNode(settings)) {
99101
// Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's

server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,18 @@
1818
*/
1919
package org.elasticsearch.gateway;
2020

21+
import org.apache.logging.log4j.LogManager;
22+
import org.apache.logging.log4j.Logger;
2123
import org.elasticsearch.cluster.ClusterState;
2224
import org.elasticsearch.cluster.metadata.IndexMetaData;
2325
import org.elasticsearch.cluster.metadata.Manifest;
2426
import org.elasticsearch.cluster.metadata.MetaData;
2527
import org.elasticsearch.cluster.routing.RoutingNode;
2628
import org.elasticsearch.cluster.routing.ShardRouting;
29+
import org.elasticsearch.common.settings.ClusterSettings;
30+
import org.elasticsearch.common.settings.Setting;
31+
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.common.unit.TimeValue;
2733
import org.elasticsearch.index.Index;
2834

2935
import java.util.ArrayList;
@@ -33,11 +39,17 @@
3339
import java.util.List;
3440
import java.util.Map;
3541
import java.util.Set;
42+
import java.util.function.LongSupplier;
3643

3744
/**
3845
* Tracks the metadata written to disk, allowing updated metadata to be written incrementally (i.e. only writing out the changed metadata).
3946
*/
40-
class IncrementalClusterStateWriter {
47+
public class IncrementalClusterStateWriter {
48+
49+
private static final Logger logger = LogManager.getLogger(IncrementalClusterStateWriter.class);
50+
51+
public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
52+
TimeValue.timeValueSeconds(10), TimeValue.ZERO, Setting.Property.NodeScope, Setting.Property.Dynamic);
4153

4254
private final MetaStateService metaStateService;
4355

@@ -46,13 +58,24 @@ class IncrementalClusterStateWriter {
4658
// no need to synchronize access to these fields.
4759
private Manifest previousManifest;
4860
private ClusterState previousClusterState;
61+
private final LongSupplier relativeTimeMillisSupplier;
4962
private boolean incrementalWrite;
5063

51-
IncrementalClusterStateWriter(MetaStateService metaStateService, Manifest manifest, ClusterState clusterState) {
64+
private volatile TimeValue slowWriteLoggingThreshold;
65+
66+
IncrementalClusterStateWriter(Settings settings, ClusterSettings clusterSettings, MetaStateService metaStateService, Manifest manifest,
67+
ClusterState clusterState, LongSupplier relativeTimeMillisSupplier) {
5268
this.metaStateService = metaStateService;
5369
this.previousManifest = manifest;
5470
this.previousClusterState = clusterState;
71+
this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
5572
this.incrementalWrite = false;
73+
this.slowWriteLoggingThreshold = SLOW_WRITE_LOGGING_THRESHOLD.get(settings);
74+
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
75+
}
76+
77+
private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
78+
this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
5679
}
5780

5881
void setCurrentTerm(long currentTerm) throws WriteStateException {
@@ -85,14 +108,26 @@ void setIncrementalWrite(boolean incrementalWrite) {
85108
void updateClusterState(ClusterState newState, ClusterState previousState) throws WriteStateException {
86109
MetaData newMetaData = newState.metaData();
87110

111+
final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
112+
88113
final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);
89114
long globalStateGeneration = writeGlobalState(writer, newMetaData);
90115
Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState, previousState);
91116
Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);
92117
writeManifest(writer, manifest);
93-
94118
previousManifest = manifest;
95119
previousClusterState = newState;
120+
121+
final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
122+
final TimeValue finalSlowWriteLoggingThreshold = this.slowWriteLoggingThreshold;
123+
if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
124+
logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " +
125+
"wrote metadata for [{}] indices and skipped [{}] unchanged indices",
126+
durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped());
127+
} else {
128+
logger.debug("writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices",
129+
durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped());
130+
}
96131
}
97132

98133
private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws WriteStateException {
@@ -256,6 +291,9 @@ static class AtomicClusterStateWriter {
256291
private final MetaStateService metaStateService;
257292
private boolean finished;
258293

294+
private int indicesWritten;
295+
private int indicesSkipped;
296+
259297
AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) {
260298
this.metaStateService = metaStateService;
261299
assert previousManifest != null;
@@ -320,6 +358,22 @@ void rollback() {
320358
rollbackCleanupActions.forEach(Runnable::run);
321359
finished = true;
322360
}
361+
362+
void incrementIndicesWritten() {
363+
indicesWritten++;
364+
}
365+
366+
void incrementIndicesSkipped() {
367+
indicesSkipped++;
368+
}
369+
370+
int getIndicesWritten() {
371+
return indicesWritten;
372+
}
373+
374+
int getIndicesSkipped() {
375+
return indicesSkipped;
376+
}
323377
}
324378

325379
static class KeepPreviousGeneration implements IndexMetaDataAction {
@@ -338,6 +392,7 @@ public Index getIndex() {
338392

339393
@Override
340394
public long execute(AtomicClusterStateWriter writer) {
395+
writer.incrementIndicesSkipped();
341396
return generation;
342397
}
343398
}
@@ -356,6 +411,7 @@ public Index getIndex() {
356411

357412
@Override
358413
public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
414+
writer.incrementIndicesWritten();
359415
return writer.writeIndex("freshly created", indexMetaData);
360416
}
361417
}
@@ -376,6 +432,7 @@ public Index getIndex() {
376432

377433
@Override
378434
public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
435+
writer.incrementIndicesWritten();
379436
return writer.writeIndex(
380437
"version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]",
381438
newIndexMetaData);

server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,35 @@
1818
*/
1919
package org.elasticsearch.gateway;
2020

21+
import org.apache.logging.log4j.Level;
22+
import org.apache.logging.log4j.LogManager;
23+
import org.apache.logging.log4j.Logger;
2124
import org.apache.lucene.store.Directory;
2225
import org.apache.lucene.store.MockDirectoryWrapper;
2326
import org.elasticsearch.Version;
27+
import org.elasticsearch.cluster.ClusterName;
2428
import org.elasticsearch.cluster.ClusterState;
2529
import org.elasticsearch.cluster.ESAllocationTestCase;
2630
import org.elasticsearch.cluster.metadata.IndexMetaData;
2731
import org.elasticsearch.cluster.metadata.Manifest;
2832
import org.elasticsearch.cluster.metadata.MetaData;
33+
import org.elasticsearch.cluster.node.DiscoveryNode;
2934
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
3035
import org.elasticsearch.cluster.node.DiscoveryNodes;
3136
import org.elasticsearch.cluster.routing.RoutingTable;
3237
import org.elasticsearch.cluster.routing.allocation.AllocationService;
3338
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
3439
import org.elasticsearch.common.collect.Tuple;
40+
import org.elasticsearch.common.logging.Loggers;
41+
import org.elasticsearch.common.settings.ClusterSettings;
3542
import org.elasticsearch.common.settings.Settings;
3643
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3744
import org.elasticsearch.common.xcontent.XContentBuilder;
3845
import org.elasticsearch.common.xcontent.XContentParser;
3946
import org.elasticsearch.env.NodeEnvironment;
4047
import org.elasticsearch.index.Index;
48+
import org.elasticsearch.test.MockLogAppender;
49+
import org.elasticsearch.test.junit.annotations.TestLogging;
4150
import org.mockito.ArgumentCaptor;
4251

4352
import java.io.IOException;
@@ -48,15 +57,18 @@
4857
import java.util.List;
4958
import java.util.Map;
5059
import java.util.Set;
60+
import java.util.concurrent.atomic.AtomicLong;
5161

5262
import static org.hamcrest.Matchers.containsString;
5363
import static org.hamcrest.Matchers.equalTo;
5464
import static org.hamcrest.Matchers.hasSize;
65+
import static org.hamcrest.Matchers.lessThan;
5566
import static org.mockito.Matchers.anyString;
5667
import static org.mockito.Matchers.eq;
5768
import static org.mockito.Mockito.mock;
69+
import static org.mockito.Mockito.times;
5870
import static org.mockito.Mockito.verify;
59-
import static org.mockito.Mockito.verifyZeroInteractions;
71+
import static org.mockito.Mockito.verifyNoMoreInteractions;
6072
import static org.mockito.Mockito.when;
6173

6274
public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
@@ -250,20 +262,28 @@ public void testResolveStatesToBeWritten() throws WriteStateException {
250262

251263
assertThat(actions, hasSize(3));
252264

265+
boolean keptPreviousGeneration = false;
266+
boolean wroteNewIndex = false;
267+
boolean wroteChangedIndex = false;
268+
253269
for (IncrementalClusterStateWriter.IndexMetaDataAction action : actions) {
254270
if (action instanceof IncrementalClusterStateWriter.KeepPreviousGeneration) {
255271
assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex()));
256272
IncrementalClusterStateWriter.AtomicClusterStateWriter writer
257273
= mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
258274
assertThat(action.execute(writer), equalTo(3L));
259-
verifyZeroInteractions(writer);
275+
verify(writer, times(1)).incrementIndicesSkipped();
276+
verifyNoMoreInteractions(writer);
277+
keptPreviousGeneration = true;
260278
}
261279
if (action instanceof IncrementalClusterStateWriter.WriteNewIndexMetaData) {
262280
assertThat(action.getIndex(), equalTo(newIndex.getIndex()));
263281
IncrementalClusterStateWriter.AtomicClusterStateWriter writer
264282
= mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
265283
when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L);
266284
assertThat(action.execute(writer), equalTo(0L));
285+
verify(writer, times(1)).incrementIndicesWritten();
286+
wroteNewIndex = true;
267287
}
268288
if (action instanceof IncrementalClusterStateWriter.WriteChangedIndexMetaData) {
269289
assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex()));
@@ -273,10 +293,16 @@ public void testResolveStatesToBeWritten() throws WriteStateException {
273293
assertThat(action.execute(writer), equalTo(3L));
274294
ArgumentCaptor<String> reason = ArgumentCaptor.forClass(String.class);
275295
verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex));
296+
verify(writer, times(1)).incrementIndicesWritten();
276297
assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion())));
277298
assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion())));
299+
wroteChangedIndex = true;
278300
}
279301
}
302+
303+
assertTrue(keptPreviousGeneration);
304+
assertTrue(wroteNewIndex);
305+
assertTrue(wroteChangedIndex);
280306
}
281307

282308
private static class MetaStateServiceWithFailures extends MetaStateService {
@@ -426,4 +452,84 @@ public void testAtomicityWithFailures() throws IOException {
426452
assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData)));
427453
}
428454
}
455+
456+
@TestLogging(value = "org.elasticsearch.gateway:WARN", reason = "to ensure that we log gateway events on WARN level")
457+
public void testSlowLogging() throws WriteStateException, IllegalAccessException {
458+
final long slowWriteLoggingThresholdMillis;
459+
final Settings settings;
460+
if (randomBoolean()) {
461+
slowWriteLoggingThresholdMillis = IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis();
462+
settings = Settings.EMPTY;
463+
} else {
464+
slowWriteLoggingThresholdMillis = randomLongBetween(2, 100000);
465+
settings = Settings.builder()
466+
.put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms")
467+
.build();
468+
}
469+
470+
final DiscoveryNode localNode = newNode("node");
471+
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
472+
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();
473+
474+
final long startTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - slowWriteLoggingThresholdMillis * 10);
475+
final AtomicLong currentTime = new AtomicLong(startTimeMillis);
476+
final AtomicLong writeDurationMillis = new AtomicLong(slowWriteLoggingThresholdMillis);
477+
478+
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
479+
final IncrementalClusterStateWriter incrementalClusterStateWriter
480+
= new IncrementalClusterStateWriter(settings, clusterSettings, mock(MetaStateService.class),
481+
new Manifest(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), Collections.emptyMap()),
482+
clusterState, () -> currentTime.getAndAdd(writeDurationMillis.get()));
483+
484+
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
485+
"should see warning at threshold",
486+
IncrementalClusterStateWriter.class.getCanonicalName(),
487+
Level.WARN,
488+
"writing cluster state took [*] which is above the warn threshold of [*]; " +
489+
"wrote metadata for [0] indices and skipped [0] unchanged indices"));
490+
491+
writeDurationMillis.set(randomLongBetween(slowWriteLoggingThresholdMillis, slowWriteLoggingThresholdMillis * 2));
492+
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
493+
"should see warning above threshold",
494+
IncrementalClusterStateWriter.class.getCanonicalName(),
495+
Level.WARN,
496+
"writing cluster state took [*] which is above the warn threshold of [*]; " +
497+
"wrote metadata for [0] indices and skipped [0] unchanged indices"));
498+
499+
writeDurationMillis.set(randomLongBetween(1, slowWriteLoggingThresholdMillis - 1));
500+
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.UnseenEventExpectation(
501+
"should not see warning below threshold",
502+
IncrementalClusterStateWriter.class.getCanonicalName(),
503+
Level.WARN,
504+
"*"));
505+
506+
clusterSettings.applySettings(Settings.builder()
507+
.put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms")
508+
.build());
509+
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
510+
"should see warning at reduced threshold",
511+
IncrementalClusterStateWriter.class.getCanonicalName(),
512+
Level.WARN,
513+
"writing cluster state took [*] which is above the warn threshold of [*]; " +
514+
"wrote metadata for [0] indices and skipped [0] unchanged indices"));
515+
516+
assertThat(currentTime.get(), lessThan(startTimeMillis + 10 * slowWriteLoggingThresholdMillis)); // ensure no overflow
517+
}
518+
519+
private void assertExpectedLogs(ClusterState clusterState, IncrementalClusterStateWriter incrementalClusterStateWriter,
520+
MockLogAppender.LoggingExpectation expectation) throws IllegalAccessException, WriteStateException {
521+
MockLogAppender mockAppender = new MockLogAppender();
522+
mockAppender.start();
523+
mockAppender.addExpectation(expectation);
524+
Logger classLogger = LogManager.getLogger(IncrementalClusterStateWriter.class);
525+
Loggers.addAppender(classLogger, mockAppender);
526+
527+
try {
528+
incrementalClusterStateWriter.updateClusterState(clusterState, clusterState);
529+
} finally {
530+
Loggers.removeAppender(classLogger, mockAppender);
531+
mockAppender.stop();
532+
}
533+
mockAppender.assertAllExpectationsMatched();
534+
}
429535
}

0 commit comments

Comments
 (0)