Skip to content

Commit c2e722f

Browse files
committed
Backport Add coordinating object to track bytes (elastic#122460) and Simplify check to split bulk request (elastic#124035)
1 parent 150843f commit c2e722f

File tree

5 files changed

+236
-113
lines changed

5 files changed

+236
-113
lines changed

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.ArrayList;
2727
import java.util.Collections;
2828
import java.util.List;
29+
import java.util.Optional;
2930
import java.util.concurrent.atomic.AtomicBoolean;
3031
import java.util.function.Supplier;
3132

@@ -91,19 +92,18 @@ public static class Handler implements Releasable {
9192
public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);
9293

9394
private final Client client;
94-
private final IndexingPressure indexingPressure;
9595
private final ActiveShardCount waitForActiveShards;
9696
private final TimeValue timeout;
9797
private final String refresh;
9898

9999
private final ArrayList<Releasable> releasables = new ArrayList<>(4);
100100
private final ArrayList<BulkResponse> responses = new ArrayList<>(2);
101+
private final IndexingPressure.Incremental incrementalOperation;
101102
private boolean closed = false;
102103
private boolean globalFailure = false;
103104
private boolean incrementalRequestSubmitted = false;
104105
private boolean bulkInProgress = false;
105106
private Exception bulkActionLevelFailure = null;
106-
private long currentBulkSize = 0L;
107107
private BulkRequest bulkRequest = null;
108108

109109
protected Handler(
@@ -114,13 +114,17 @@ protected Handler(
114114
@Nullable String refresh
115115
) {
116116
this.client = client;
117-
this.indexingPressure = indexingPressure;
118117
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
119118
this.timeout = timeout;
120119
this.refresh = refresh;
120+
this.incrementalOperation = indexingPressure.startIncrementalCoordinating(0, 0, false);
121121
createNewBulkRequest(EMPTY_STATE);
122122
}
123123

124+
/**
 * Returns the {@link IndexingPressure.Incremental} tracker that this handler created in its
 * constructor and uses to account for the items added to the in-flight bulk request.
 *
 * @return the handler's incremental coordinating tracker
 */
public IndexingPressure.Incremental getIncrementalOperation() {
125+
return incrementalOperation;
126+
}
127+
124128
public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
125129
assert closed == false;
126130
assert bulkInProgress == false;
@@ -130,7 +134,9 @@ public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runn
130134
} else {
131135
assert bulkRequest != null;
132136
if (internalAddItems(items, releasable)) {
133-
if (shouldBackOff()) {
137+
Optional<Releasable> maybeSplit = incrementalOperation.maybeSplit();
138+
if (maybeSplit.isPresent()) {
139+
Releasable coordinating = maybeSplit.get();
134140
final boolean isFirstRequest = incrementalRequestSubmitted == false;
135141
incrementalRequestSubmitted = true;
136142
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
@@ -153,6 +159,7 @@ public void onFailure(Exception e) {
153159
}, () -> {
154160
bulkInProgress = false;
155161
toRelease.forEach(Releasable::close);
162+
coordinating.close();
156163
nextItems.run();
157164
}));
158165
} else {
@@ -164,10 +171,6 @@ public void onFailure(Exception e) {
164171
}
165172
}
166173

167-
private boolean shouldBackOff() {
168-
return indexingPressure.shouldSplitBulk(currentBulkSize);
169-
}
170-
171174
public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
172175
assert bulkInProgress == false;
173176
if (bulkActionLevelFailure != null) {
@@ -176,6 +179,7 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
176179
} else {
177180
assert bulkRequest != null;
178181
if (internalAddItems(items, releasable)) {
182+
Releasable coordinating = incrementalOperation.split();
179183
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
180184
releasables.clear();
181185
// We do not need to set this back to false as this will be the last request.
@@ -195,7 +199,10 @@ public void onFailure(Exception e) {
195199
handleBulkFailure(isFirstRequest, e);
196200
errorResponse(listener);
197201
}
198-
}, () -> toRelease.forEach(Releasable::close)));
202+
}, () -> {
203+
toRelease.forEach(Releasable::close);
204+
coordinating.close();
205+
}));
199206
} else {
200207
errorResponse(listener);
201208
}
@@ -204,13 +211,17 @@ public void onFailure(Exception e) {
204211

205212
@Override
206213
public void close() {
207-
closed = true;
208-
releasables.forEach(Releasable::close);
209-
releasables.clear();
214+
if (closed == false) {
215+
closed = true;
216+
incrementalOperation.close();
217+
releasables.forEach(Releasable::close);
218+
releasables.clear();
219+
}
210220
}
211221

212222
private void shortCircuitDueToTopLevelFailure(List<DocWriteRequest<?>> items, Releasable releasable) {
213223
assert releasables.isEmpty();
224+
assert incrementalOperation.currentOperationsSize() == 0;
214225
assert bulkRequest == null;
215226
if (globalFailure == false) {
216227
addItemLevelFailures(items);
@@ -228,7 +239,6 @@ private void errorResponse(ActionListener<BulkResponse> listener) {
228239

229240
private void handleBulkSuccess(BulkResponse bulkResponse) {
230241
responses.add(bulkResponse);
231-
currentBulkSize = 0L;
232242
bulkRequest = null;
233243
}
234244

@@ -237,7 +247,6 @@ private void handleBulkFailure(boolean isFirstRequest, Exception e) {
237247
globalFailure = isFirstRequest;
238248
bulkActionLevelFailure = e;
239249
addItemLevelFailures(bulkRequest.requests());
240-
currentBulkSize = 0;
241250
bulkRequest = null;
242251
}
243252

@@ -257,19 +266,18 @@ private boolean internalAddItems(List<DocWriteRequest<?>> items, Releasable rele
257266
bulkRequest.add(items);
258267
releasables.add(releasable);
259268
long size = items.stream().mapToLong(Accountable::ramBytesUsed).sum();
260-
releasables.add(indexingPressure.markCoordinatingOperationStarted(items.size(), size, false));
261-
currentBulkSize += size;
269+
incrementalOperation.increment(items.size(), size);
262270
return true;
263271
} catch (EsRejectedExecutionException e) {
264272
handleBulkFailure(incrementalRequestSubmitted == false, e);
273+
incrementalOperation.split().close();
265274
releasables.forEach(Releasable::close);
266275
releasables.clear();
267276
return false;
268277
}
269278
}
270279

271280
private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) {
272-
assert currentBulkSize == 0L;
273281
assert bulkRequest == null;
274282
bulkRequest = new BulkRequest();
275283
bulkRequest.incrementalState(incrementalState);

server/src/main/java/org/elasticsearch/index/IndexingPressure.java

Lines changed: 161 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.core.Releasable;
2020
import org.elasticsearch.index.stats.IndexingPressureStats;
2121

22+
import java.util.Optional;
2223
import java.util.concurrent.atomic.AtomicBoolean;
2324
import java.util.concurrent.atomic.AtomicLong;
2425

@@ -138,48 +139,167 @@ private static Releasable wrapReleasable(Releasable releasable) {
138139
};
139140
}
140141

141-
public Releasable markCoordinatingOperationStarted(int operations, long bytes, boolean forceExecution) {
142-
long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
143-
long replicaWriteBytes = this.currentReplicaBytes.get();
144-
long totalBytes = combinedBytes + replicaWriteBytes;
145-
if (forceExecution == false && totalBytes > coordinatingLimit) {
146-
long bytesWithoutOperation = combinedBytes - bytes;
147-
long totalBytesWithoutOperation = totalBytes - bytes;
148-
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
149-
this.coordinatingRejections.getAndIncrement();
150-
throw new EsRejectedExecutionException(
151-
"rejected execution of coordinating operation ["
152-
+ "coordinating_and_primary_bytes="
153-
+ bytesWithoutOperation
154-
+ ", "
155-
+ "replica_bytes="
156-
+ replicaWriteBytes
157-
+ ", "
158-
+ "all_bytes="
159-
+ totalBytesWithoutOperation
160-
+ ", "
161-
+ "coordinating_operation_bytes="
162-
+ bytes
163-
+ ", "
164-
+ "max_coordinating_bytes="
165-
+ coordinatingLimit
166-
+ "]",
167-
false
168-
);
142+
/**
 * Creates an {@link Incremental} tracker and immediately accounts the given initial operations and bytes
 * against its first {@link Coordinating} object.
 *
 * @param operations     number of operations to account for up front
 * @param bytes          number of bytes to account for up front
 * @param forceExecution when {@code true}, the coordinating limit check is bypassed for this tracker
 * @return the new incremental tracker; the caller closes it to release what is still accounted
 * @throws EsRejectedExecutionException if the initial increment is rejected by the coordinating limit
 */
public Incremental startIncrementalCoordinating(int operations, long bytes, boolean forceExecution) {
143+
Incremental coordinating = new Incremental(forceExecution);
144+
// Goes straight to the inner Coordinating object rather than through Incremental.increment.
coordinating.coordinating.increment(operations, bytes);
145+
return coordinating;
146+
}
147+
148+
/**
 * Accounts a single coordinating operation of the given size and returns the {@link Coordinating}
 * object that holds the accounted amounts until it is closed.
 *
 * @param operations     number of operations to account for
 * @param bytes          number of bytes to account for
 * @param forceExecution when {@code true}, the coordinating limit check is bypassed
 * @return the coordinating object; closing it subtracts the accounted bytes and operations again
 * @throws EsRejectedExecutionException if the increment is rejected by the coordinating limit
 */
public Coordinating markCoordinatingOperationStarted(int operations, long bytes, boolean forceExecution) {
149+
Coordinating coordinating = new Coordinating(forceExecution);
150+
coordinating.increment(operations, bytes);
151+
return coordinating;
152+
}
153+
154+
/**
 * Tracks coordinating bytes and operations for a request that is built up incrementally.
 * <p>
 * All accounting is delegated to a "current" {@link Coordinating} object. {@link #split()} hands that object
 * to the caller and replaces it with a fresh, empty one, so each split-off part can be released independently.
 * <p>
 * The plain {@code long} fields are updated without synchronization.
 * NOTE(review): presumably instances are confined to one thread at a time - confirm against the callers.
 */
public class Incremental implements Releasable {
155+
156+
// NOTE(review): within this class the flag is only read by assertions and never set, not even in close() - confirm intent.
private final AtomicBoolean closed = new AtomicBoolean();
157+
// Passed on to every Coordinating object this tracker creates.
private final boolean forceExecution;
158+
// Bytes reported via incrementUnparsedBytes and not yet transferred to parsed.
private long currentUnparsedSize = 0;
159+
// Running total of bytes moved from unparsed to parsed.
private long totalParsedBytes = 0;
160+
// The current accounting object; replaced on every split().
private Coordinating coordinating;
161+
162+
/**
 * Creates a tracker with an empty current {@link Coordinating} object.
 *
 * @param forceExecution forwarded to each {@link Coordinating} object created by this tracker
 */
public Incremental(boolean forceExecution) {
163+
this.forceExecution = forceExecution;
164+
this.coordinating = new Coordinating(forceExecution);
165+
}
166+
167+
/**
 * @return the total number of bytes passed to {@link #transferUnparsedBytesToParsed(long)} so far
 */
public long totalParsedBytes() {
168+
return totalParsedBytes;
169+
}
170+
171+
/**
 * Records additional unparsed bytes. As the TODO below states, this only updates the local counter and
 * is not yet reflected in the indexing pressure counters.
 *
 * @param bytes number of unparsed bytes to add
 */
public void incrementUnparsedBytes(long bytes) {
172+
assert closed.get() == false;
173+
// TODO: Implement integration with IndexingPressure for unparsed bytes
174+
currentUnparsedSize += bytes;
175+
}
176+
177+
/**
 * Moves {@code bytes} from the unparsed counter to the parsed total.
 *
 * @param bytes number of bytes to transfer; asserted not to exceed the current unparsed size
 */
public void transferUnparsedBytesToParsed(long bytes) {
178+
assert closed.get() == false;
179+
assert currentUnparsedSize >= bytes;
180+
currentUnparsedSize -= bytes;
181+
totalParsedBytes += bytes;
182+
}
183+
184+
/**
 * Accounts additional operations and bytes against the current {@link Coordinating} object.
 *
 * @param operations number of operations to add
 * @param bytes      number of bytes to add
 * @throws EsRejectedExecutionException if the underlying increment is rejected by the coordinating limit
 */
public void increment(int operations, long bytes) {
185+
// TODO: Eventually most of the memory will already be accounted for in unparsed.
186+
coordinating.increment(operations, bytes);
187+
}
188+
189+
/**
 * @return the bytes currently accounted by the current {@link Coordinating} object, i.e. since the last split
 */
public long currentOperationsSize() {
190+
return coordinating.currentOperationsSize;
191+
}
192+
193+
/**
 * Splits off the current accounting object if the watermark conditions are met.
 * <p>
 * A split happens when the combined coordinating/primary plus replica bytes reach a watermark and the
 * bytes accounted since the last split reach the matching size threshold. The high watermark pair is
 * checked first, then the low watermark pair; the corresponding split counter is incremented.
 *
 * @return the split-off releasable if a split happened, otherwise {@link Optional#empty()}
 */
public Optional<Releasable> maybeSplit() {
194+
long currentUsage = (currentCombinedCoordinatingAndPrimaryBytes.get() + currentReplicaBytes.get());
195+
long currentOperationsSize = coordinating.currentOperationsSize;
196+
if (currentUsage >= highWatermark && currentOperationsSize >= highWatermarkSize) {
197+
highWaterMarkSplits.getAndIncrement();
198+
logger.trace(
199+
() -> Strings.format(
200+
"Split bulk due to high watermark: current bytes [%d] and size [%d]",
201+
currentUsage,
202+
currentOperationsSize
203+
)
204+
);
205+
return Optional.of(split());
206+
}
207+
if (currentUsage >= lowWatermark && currentOperationsSize >= lowWatermarkSize) {
208+
lowWaterMarkSplits.getAndIncrement();
209+
logger.trace(
210+
() -> Strings.format(
211+
"Split bulk due to low watermark: current bytes [%d] and size [%d]",
212+
currentUsage,
213+
currentOperationsSize
214+
)
215+
);
216+
return Optional.of(split());
217+
}
218+
return Optional.empty();
219+
}
220+
221+
/**
 * Unconditionally hands the current {@link Coordinating} object to the caller and starts a new, empty one.
 * The returned object still holds everything accounted so far; it is released only when the caller closes it.
 *
 * @return the previous accounting object, to be closed by the caller
 */
public Releasable split() {
222+
Coordinating toReturn = coordinating;
223+
coordinating = new Coordinating(forceExecution);
224+
return toReturn;
225+
}
226+
227+
/**
 * Closes the current {@link Coordinating} object only. Objects previously returned by {@link #split()} are
 * not touched. Because this simply delegates, a second call closes the same object again and reaches
 * the "adjusted twice" branch of {@link Coordinating#close()}.
 */
@Override
228+
public void close() {
229+
coordinating.close();
230+
}
231+
}
232+
233+
// TODO: Maybe this should be re-named and used for primary operations too. Eventually we will need to account for: ingest pipeline
234+
// expansions, reading updates, etc. This could just be a generic OP that could be expanded as appropriate
235+
public class Coordinating implements Releasable {
236+
237+
private final AtomicBoolean closed = new AtomicBoolean();
238+
private final boolean forceExecution;
239+
private int currentOperations = 0;
240+
private long currentOperationsSize = 0;
241+
242+
/**
 * Creates an empty accounting object; nothing is added to the shared counters until {@code increment} is called.
 *
 * @param forceExecution when {@code true}, {@code increment} skips the coordinating limit check
 */
public Coordinating(boolean forceExecution) {
243+
this.forceExecution = forceExecution;
244+
}
245+
246+
/**
 * Adds the given operations and bytes to this object and to the shared indexing pressure counters.
 * <p>
 * The bytes are first added to the combined coordinating/primary counter and the limit is checked afterwards.
 * If the limit is exceeded and execution is not forced, the addition is rolled back, the rejection counter
 * is incremented and the request is rejected; this object's own counters are left unchanged in that case.
 *
 * @param operations number of operations to add
 * @param bytes      number of bytes to add
 * @throws EsRejectedExecutionException if the total would exceed {@code coordinatingLimit} and
 *                                      {@code forceExecution} is {@code false}
 */
private void increment(int operations, long bytes) {
247+
assert closed.get() == false;
248+
// Add first, then check: the shared counter already includes this operation when the limit is evaluated.
long combinedBytes = currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
249+
long replicaWriteBytes = currentReplicaBytes.get();
250+
long totalBytes = combinedBytes + replicaWriteBytes;
251+
if (forceExecution == false && totalBytes > coordinatingLimit) {
252+
// The values reported in the message exclude the rejected operation itself.
long bytesWithoutOperation = combinedBytes - bytes;
253+
long totalBytesWithoutOperation = totalBytes - bytes;
254+
// Roll back the addition made above before rejecting.
currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
255+
coordinatingRejections.getAndIncrement();
256+
throw new EsRejectedExecutionException(
257+
"rejected execution of coordinating operation ["
258+
+ "coordinating_and_primary_bytes="
259+
+ bytesWithoutOperation
260+
+ ", "
261+
+ "replica_bytes="
262+
+ replicaWriteBytes
263+
+ ", "
264+
+ "all_bytes="
265+
+ totalBytesWithoutOperation
266+
+ ", "
267+
+ "coordinating_operation_bytes="
268+
+ bytes
269+
+ ", "
270+
+ "max_coordinating_bytes="
271+
+ coordinatingLimit
272+
+ "]",
273+
false
274+
);
275+
}
276+
// Accepted: remember what this object holds so close() can subtract exactly that amount.
currentOperations += operations;
277+
currentOperationsSize += bytes;
278+
logger.trace(() -> Strings.format("adding [%d] coordinating operations and [%d] bytes", operations, bytes));
279+
currentCoordinatingBytes.getAndAdd(bytes);
280+
currentCoordinatingOps.getAndAdd(operations);
281+
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
282+
totalCoordinatingBytes.getAndAdd(bytes);
283+
totalCoordinatingOps.getAndAdd(operations);
284+
// Incremented once per accepted call, regardless of how many operations the call carried.
totalCoordinatingRequests.getAndIncrement();
285+
}
286+
287+
/**
 * Releases everything this object has accounted: subtracts its bytes and operations from the current
 * (in-flight) counters and resets its own counters to zero. The cumulative {@code total*} counters are
 * not changed here.
 * <p>
 * Only the first call has an effect. Any later call logs an error and trips an assertion instead of
 * adjusting the counters a second time.
 */
@Override
288+
public void close() {
289+
// compareAndSet guarantees the subtraction below runs at most once per object.
if (closed.compareAndSet(false, true)) {
290+
logger.trace(
291+
() -> Strings.format("removing [%d] coordinating operations and [%d] bytes", currentOperations, currentOperationsSize)
292+
);
293+
currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-currentOperationsSize);
294+
currentCoordinatingBytes.getAndAdd(-currentOperationsSize);
295+
currentCoordinatingOps.getAndAdd(-currentOperations);
296+
currentOperationsSize = 0;
297+
currentOperations = 0;
298+
} else {
299+
logger.error("IndexingPressure memory is adjusted twice", new IllegalStateException("Releasable is called twice"));
300+
assert false : "IndexingPressure is adjusted twice";
301+
}
169302
}
170-
logger.trace(() -> Strings.format("adding [%d] coordinating operations and [%d] bytes", operations, bytes));
171-
currentCoordinatingBytes.getAndAdd(bytes);
172-
currentCoordinatingOps.getAndAdd(operations);
173-
totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
174-
totalCoordinatingBytes.getAndAdd(bytes);
175-
totalCoordinatingOps.getAndAdd(operations);
176-
totalCoordinatingRequests.getAndIncrement();
177-
return wrapReleasable(() -> {
178-
logger.trace(() -> Strings.format("removing [%d] coordinating operations and [%d] bytes", operations, bytes));
179-
this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
180-
this.currentCoordinatingBytes.getAndAdd(-bytes);
181-
this.currentCoordinatingOps.getAndAdd(-operations);
182-
});
183303
}
184304

185305
public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(int operations, long bytes) {
@@ -266,21 +386,6 @@ public Releasable markReplicaOperationStarted(int operations, long bytes, boolea
266386
});
267387
}
268388

269-
public boolean shouldSplitBulk(long size) {
270-
long currentUsage = (currentCombinedCoordinatingAndPrimaryBytes.get() + currentReplicaBytes.get());
271-
if (currentUsage >= highWatermark && size >= highWatermarkSize) {
272-
highWaterMarkSplits.getAndIncrement();
273-
logger.trace(() -> Strings.format("Split bulk due to high watermark: current bytes [%d] and size [%d]", currentUsage, size));
274-
return (true);
275-
}
276-
if (currentUsage >= lowWatermark && size >= lowWatermarkSize) {
277-
lowWaterMarkSplits.getAndIncrement();
278-
logger.trace(() -> Strings.format("Split bulk due to low watermark: current bytes [%d] and size [%d]", currentUsage, size));
279-
return (true);
280-
}
281-
return (false);
282-
}
283-
284389
public IndexingPressureStats stats() {
285390
return new IndexingPressureStats(
286391
totalCombinedCoordinatingAndPrimaryBytes.get(),

0 commit comments

Comments
 (0)