Skip to content

Commit 611fb03

Browse files
authored
Implement rejections in WriteMemoryLimits (#58885)
This commit adds rejections when the indexing memory limits are exceeded for primary or coordinating operations. The number of bytes allowed for indexing is controlled by a new setting, indexing_limits.memory.limit.
1 parent 51438c3 commit 611fb03

File tree

14 files changed

+310
-74
lines changed

14 files changed

+310
-74
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java

Lines changed: 235 additions & 47 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,70 @@
2020
package org.elasticsearch.action.bulk;
2121

2222
import org.elasticsearch.common.lease.Releasable;
23+
import org.elasticsearch.common.settings.ClusterSettings;
24+
import org.elasticsearch.common.settings.Setting;
25+
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.common.unit.ByteSizeValue;
27+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2328

2429
import java.util.concurrent.atomic.AtomicLong;
2530

2631
public class WriteMemoryLimits {
2732

33+
public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES =
34+
Setting.memorySizeSetting("indexing_limits.memory.limit", "10%", Setting.Property.NodeScope);
35+
2836
private final AtomicLong writeBytes = new AtomicLong(0);
2937
private final AtomicLong replicaWriteBytes = new AtomicLong(0);
38+
private final long writeLimits;
39+
40+
public WriteMemoryLimits(Settings settings) {
41+
this.writeLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
42+
}
43+
44+
/**
 * Creates the tracker and resolves the write byte limit from the given settings.
 *
 * @param settings        settings from which {@link #MAX_INDEXING_BYTES} is read
 * @param clusterSettings accepted but not read by this constructor
 */
public WriteMemoryLimits(Settings settings, ClusterSettings clusterSettings) {
    // The limit is derived from the node settings only, so this is equivalent to the one-argument constructor.
    this(settings);
}
3047

3148
public Releasable markWriteOperationStarted(long bytes) {
32-
writeBytes.addAndGet(bytes);
33-
return () -> writeBytes.getAndAdd(-bytes);
49+
return markWriteOperationStarted(bytes, false);
50+
}
51+
52+
public Releasable markWriteOperationStarted(long bytes, boolean forceExecution) {
53+
long currentWriteLimits = this.writeLimits;
54+
long writeBytes = this.writeBytes.addAndGet(bytes);
55+
long replicaWriteBytes = this.replicaWriteBytes.get();
56+
long totalBytes = writeBytes + replicaWriteBytes;
57+
if (forceExecution == false && totalBytes > currentWriteLimits) {
58+
long bytesWithoutOperation = writeBytes - bytes;
59+
long totalBytesWithoutOperation = totalBytes - bytes;
60+
this.writeBytes.getAndAdd(-bytes);
61+
throw new EsRejectedExecutionException("rejected execution of write operation [" +
62+
"write_bytes=" + bytesWithoutOperation + ", " +
63+
"replica_write_bytes=" + replicaWriteBytes + ", " +
64+
"total_write_bytes=" + totalBytesWithoutOperation + ", " +
65+
"current_operation_bytes=" + bytes + ", " +
66+
"max_write_bytes=" + currentWriteLimits + "]", false);
67+
}
68+
return () -> this.writeBytes.getAndAdd(-bytes);
3469
}
3570

3671
public long getWriteBytes() {
3772
return writeBytes.get();
3873
}
3974

40-
public Releasable markReplicaWriteStarted(long bytes) {
41-
replicaWriteBytes.getAndAdd(bytes);
42-
return () -> replicaWriteBytes.getAndAdd(-bytes);
75+
public Releasable markReplicaWriteStarted(long bytes, boolean forceExecution) {
76+
long currentReplicaWriteLimits = (long) (this.writeLimits * 1.5);
77+
long replicaWriteBytes = this.replicaWriteBytes.getAndAdd(bytes);
78+
if (forceExecution == false && replicaWriteBytes > currentReplicaWriteLimits) {
79+
long replicaBytesWithoutOperation = replicaWriteBytes - bytes;
80+
this.replicaWriteBytes.getAndAdd(-bytes);
81+
throw new EsRejectedExecutionException("rejected execution of replica write operation [" +
82+
"replica_write_bytes=" + replicaBytesWithoutOperation + ", " +
83+
"current_replica_operation_bytes=" + bytes + ", " +
84+
"max_replica_write_bytes=" + currentReplicaWriteLimits + "]", false);
85+
}
86+
return () -> this.replicaWriteBytes.getAndAdd(-bytes);
4387
}
4488

4589
public long getReplicaWriteBytes() {

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,6 @@ public long getAutoGeneratedTimestamp() {
701701

702702
@Override
703703
public long ramBytesUsed() {
704-
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.ramBytesUsed());
704+
return SHALLOW_SIZE + RamUsageEstimator.sizeOf(id) + (source == null ? 0 : source.length());
705705
}
706706
}

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public abstract class TransportWriteAction<
6060
Response extends ReplicationResponse & WriteResponse
6161
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
6262

63-
private final boolean forceExecutionOnPrimary;
63+
private final boolean forceExecution;
6464
private final WriteMemoryLimits writeMemoryLimits;
6565
private final String executor;
6666

@@ -74,13 +74,13 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe
7474
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
7575
request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
7676
this.executor = executor;
77-
this.forceExecutionOnPrimary = forceExecutionOnPrimary;
77+
this.forceExecution = forceExecutionOnPrimary;
7878
this.writeMemoryLimits = writeMemoryLimits;
7979
}
8080

8181
@Override
8282
protected Releasable checkOperationLimits(Request request) {
83-
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
83+
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution);
8484
}
8585

8686
@Override
@@ -90,7 +90,7 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal
9090
if (rerouteWasLocal) {
9191
return () -> {};
9292
} else {
93-
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
93+
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request), forceExecution);
9494
}
9595
}
9696

@@ -100,7 +100,7 @@ protected long primaryOperationSize(Request request) {
100100

101101
@Override
102102
protected Releasable checkReplicaLimits(ReplicaRequest request) {
103-
return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request));
103+
return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request), forceExecution);
104104
}
105105

106106
protected long replicaOperationSize(ReplicaRequest request) {
@@ -156,7 +156,7 @@ protected void doRun() {
156156

157157
@Override
158158
public boolean isForceExecution() {
159-
return forceExecutionOnPrimary;
159+
return forceExecution;
160160
}
161161
});
162162
}

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.logging.log4j.LogManager;
2222
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
2323
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
24+
import org.elasticsearch.action.bulk.WriteMemoryLimits;
2425
import org.elasticsearch.action.search.TransportSearchAction;
2526
import org.elasticsearch.action.support.AutoCreateIndex;
2627
import org.elasticsearch.action.support.DestructiveOperations;
@@ -488,7 +489,8 @@ public void apply(Settings value, Settings current, Settings previous) {
488489
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING,
489490
FsHealthService.ENABLED_SETTING,
490491
FsHealthService.REFRESH_INTERVAL_SETTING,
491-
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING);
492+
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
493+
WriteMemoryLimits.MAX_INDEXING_BYTES);
492494

493495
static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();
494496

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ protected Node(final Environment initialEnvironment,
584584
new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
585585
resourcesToClose.add(persistentTasksClusterService);
586586
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
587-
final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits();
587+
final WriteMemoryLimits bulkIndexingLimits = new WriteMemoryLimits(settings);
588588

589589
modules.add(b -> {
590590
b.bind(Node.class).toInstance(this);

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.cluster.node.DiscoveryNode;
3232
import org.elasticsearch.cluster.node.DiscoveryNodes;
3333
import org.elasticsearch.cluster.service.ClusterService;
34+
import org.elasticsearch.common.settings.Settings;
3435
import org.elasticsearch.common.unit.TimeValue;
3536
import org.elasticsearch.common.util.concurrent.AtomicArray;
3637
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -120,7 +121,8 @@ private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
120121
final ExecutorService direct = EsExecutors.newDirectExecutorService();
121122
when(threadPool.executor(anyString())).thenReturn(direct);
122123
TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService,
123-
null, null, mock(ActionFilters.class), null, null, new WriteMemoryLimits()) {
124+
null, null, mock(ActionFilters.class), null, null,
125+
new WriteMemoryLimits(Settings.EMPTY)) {
124126
@Override
125127
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
126128
AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ null, new ActionFilters(Collections.emptySet()), null,
143143
new AutoCreateIndex(
144144
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
145145
new IndexNameExpressionResolver()
146-
), new WriteMemoryLimits()
146+
), new WriteMemoryLimits(SETTINGS)
147147
);
148148
}
149149
@Override

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class TestTransportBulkAction extends TransportBulkAction {
8282
super(TransportBulkActionTests.this.threadPool, transportService, clusterService, null,
8383
null, new ActionFilters(Collections.emptySet()), new Resolver(),
8484
new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver()),
85-
new WriteMemoryLimits());
85+
new WriteMemoryLimits(Settings.EMPTY));
8686
}
8787

8888
@Override

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ static class TestTransportBulkAction extends TransportBulkAction {
233233
actionFilters,
234234
indexNameExpressionResolver,
235235
autoCreateIndex,
236-
new WriteMemoryLimits(),
236+
new WriteMemoryLimits(Settings.EMPTY),
237237
relativeTimeProvider);
238238
}
239239

server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception {
145145

146146
final TransportResyncReplicationAction action = new TransportResyncReplicationAction(Settings.EMPTY, transportService,
147147
clusterService, indexServices, threadPool, shardStateAction, new ActionFilters(new HashSet<>()),
148-
new WriteMemoryLimits());
148+
new WriteMemoryLimits(Settings.EMPTY));
149149

150150
assertThat(action.globalBlockLevel(), nullValue());
151151
assertThat(action.indexBlockLevel(), nullValue());

server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF
367367
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
368368
x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null,
369369
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
370-
new WriteMemoryLimits());
370+
new WriteMemoryLimits(Settings.EMPTY));
371371
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
372372
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
373373
}
@@ -377,7 +377,7 @@ protected TestAction(Settings settings, String actionName, TransportService tran
377377
super(settings, actionName, transportService, clusterService,
378378
mockIndicesService(clusterService), threadPool, shardStateAction,
379379
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false,
380-
new WriteMemoryLimits());
380+
new WriteMemoryLimits(settings));
381381
this.withDocumentFailureOnPrimary = false;
382382
this.withDocumentFailureOnReplica = false;
383383
}

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void testRetentionLeaseSyncActionOnPrimary() {
106106
threadPool,
107107
shardStateAction,
108108
new ActionFilters(Collections.emptySet()),
109-
new WriteMemoryLimits());
109+
new WriteMemoryLimits(Settings.EMPTY));
110110
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
111111
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
112112
action.dispatchedShardOperationOnPrimary(request, indexShard,
@@ -143,7 +143,7 @@ public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException {
143143
threadPool,
144144
shardStateAction,
145145
new ActionFilters(Collections.emptySet()),
146-
new WriteMemoryLimits());
146+
new WriteMemoryLimits(Settings.EMPTY));
147147
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
148148
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
149149

@@ -182,7 +182,7 @@ public void testBlocks() {
182182
threadPool,
183183
shardStateAction,
184184
new ActionFilters(Collections.emptySet()),
185-
new WriteMemoryLimits());
185+
new WriteMemoryLimits(Settings.EMPTY));
186186

187187
assertNull(action.indexBlockLevel());
188188
}

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,7 +1498,7 @@ public void onFailure(final Exception e) {
14981498
threadPool,
14991499
shardStateAction,
15001500
actionFilters,
1501-
new WriteMemoryLimits())),
1501+
new WriteMemoryLimits(settings))),
15021502
RetentionLeaseSyncer.EMPTY,
15031503
client);
15041504
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
@@ -1513,7 +1513,7 @@ allocationService, new AliasValidator(), shardLimitValidator, environment, index
15131513
actionFilters, indexNameExpressionResolver
15141514
));
15151515
final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings, clusterService);
1516-
final WriteMemoryLimits indexingMemoryLimits = new WriteMemoryLimits();
1516+
final WriteMemoryLimits indexingMemoryLimits = new WriteMemoryLimits(settings);
15171517
mappingUpdatedAction.setClient(client);
15181518
actions.put(BulkAction.INSTANCE,
15191519
new TransportBulkAction(threadPool, transportService, clusterService,
@@ -1523,7 +1523,7 @@ allocationService, new AliasValidator(), shardLimitValidator, environment, index
15231523
Collections.emptyList(), client),
15241524
client, actionFilters, indexNameExpressionResolver,
15251525
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver),
1526-
new WriteMemoryLimits()
1526+
new WriteMemoryLimits(settings)
15271527
));
15281528
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
15291529
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),

0 commit comments

Comments
 (0)