Skip to content

Commit b46dd1c

Browse files
authored
Simplify check to split bulk request (#124035)
Use an optional to check if bulk operation should be split.
1 parent d61b864 commit b46dd1c

File tree

3 files changed

+37
-29
lines changed

3 files changed

+37
-29
lines changed

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.ArrayList;
2727
import java.util.Collections;
2828
import java.util.List;
29+
import java.util.Optional;
2930
import java.util.concurrent.atomic.AtomicBoolean;
3031
import java.util.function.Supplier;
3132

@@ -133,8 +134,9 @@ public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runn
133134
} else {
134135
assert bulkRequest != null;
135136
if (internalAddItems(items, releasable)) {
136-
if (incrementalOperation.shouldSplit()) {
137-
IndexingPressure.Coordinating coordinating = incrementalOperation.split();
137+
Optional<Releasable> maybeSplit = incrementalOperation.maybeSplit();
138+
if (maybeSplit.isPresent()) {
139+
Releasable coordinating = maybeSplit.get();
138140
final boolean isFirstRequest = incrementalRequestSubmitted == false;
139141
incrementalRequestSubmitted = true;
140142
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
@@ -156,8 +158,8 @@ public void onFailure(Exception e) {
156158
}
157159
}, () -> {
158160
bulkInProgress = false;
159-
coordinating.close();
160161
toRelease.forEach(Releasable::close);
162+
coordinating.close();
161163
nextItems.run();
162164
}));
163165
} else {
@@ -177,7 +179,7 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
177179
} else {
178180
assert bulkRequest != null;
179181
if (internalAddItems(items, releasable)) {
180-
IndexingPressure.Coordinating coordinating = incrementalOperation.split();
182+
Releasable coordinating = incrementalOperation.split();
181183
final ArrayList<Releasable> toRelease = new ArrayList<>(releasables);
182184
releasables.clear();
183185
// We do not need to set this back to false as this will be the last request.
@@ -198,8 +200,8 @@ public void onFailure(Exception e) {
198200
errorResponse(listener);
199201
}
200202
}, () -> {
201-
coordinating.close();
202203
toRelease.forEach(Releasable::close);
204+
coordinating.close();
203205
}));
204206
} else {
205207
errorResponse(listener);

server/src/main/java/org/elasticsearch/index/IndexingPressure.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.core.Releasable;
2020
import org.elasticsearch.index.stats.IndexingPressureStats;
2121

22+
import java.util.Optional;
2223
import java.util.concurrent.atomic.AtomicBoolean;
2324
import java.util.concurrent.atomic.AtomicLong;
2425

@@ -189,7 +190,7 @@ public long currentOperationsSize() {
189190
return coordinating.currentOperationsSize;
190191
}
191192

192-
public boolean shouldSplit() {
193+
public Optional<Releasable> maybeSplit() {
193194
long currentUsage = (currentCombinedCoordinatingAndPrimaryBytes.get() + currentReplicaBytes.get());
194195
long currentOperationsSize = coordinating.currentOperationsSize;
195196
if (currentUsage >= highWatermark && currentOperationsSize >= highWatermarkSize) {
@@ -201,7 +202,7 @@ public boolean shouldSplit() {
201202
currentOperationsSize
202203
)
203204
);
204-
return true;
205+
return Optional.of(split());
205206
}
206207
if (currentUsage >= lowWatermark && currentOperationsSize >= lowWatermarkSize) {
207208
lowWaterMarkSplits.getAndIncrement();
@@ -212,12 +213,12 @@ public boolean shouldSplit() {
212213
currentOperationsSize
213214
)
214215
);
215-
return true;
216+
return Optional.of(split());
216217
}
217-
return false;
218+
return Optional.empty();
218219
}
219220

220-
public Coordinating split() {
221+
public Releasable split() {
221222
Coordinating toReturn = coordinating;
222223
coordinating = new Coordinating(forceExecution);
223224
return toReturn;

server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.elasticsearch.test.ESTestCase;
1818
import org.hamcrest.Matchers;
1919

20+
import java.util.Optional;
21+
2022
public class IndexingPressureTests extends ESTestCase {
2123

2224
private final Settings settings = Settings.builder()
@@ -58,28 +60,31 @@ public void testHighAndLowWatermarkSplits() {
5860
false
5961
)
6062
) {
61-
assertFalse(coordinating1.shouldSplit());
62-
assertFalse(coordinating2.shouldSplit());
63+
assertFalse(coordinating1.maybeSplit().isPresent());
64+
assertFalse(coordinating2.maybeSplit().isPresent());
6365
assertEquals(indexingPressure.stats().getHighWaterMarkSplits(), 0L);
6466
assertEquals(indexingPressure.stats().getLowWaterMarkSplits(), 0L);
65-
assertTrue(coordinating3.shouldSplit());
66-
assertEquals(indexingPressure.stats().getHighWaterMarkSplits(), 0L);
67-
assertEquals(indexingPressure.stats().getLowWaterMarkSplits(), 1L);
68-
69-
try (
70-
Releasable ignored2 = indexingPressure.markCoordinatingOperationStarted(
71-
10,
72-
1 + (9 * 1024) - indexingPressure.stats().getCurrentCoordinatingBytes(),
73-
false
74-
)
75-
) {
76-
assertFalse(coordinating1.shouldSplit());
77-
assertTrue(coordinating2.shouldSplit());
78-
assertEquals(indexingPressure.stats().getHighWaterMarkSplits(), 1L);
67+
Optional<Releasable> split1 = coordinating3.maybeSplit();
68+
assertTrue(split1.isPresent());
69+
try (Releasable ignored2 = split1.get()) {
70+
assertEquals(indexingPressure.stats().getHighWaterMarkSplits(), 0L);
7971
assertEquals(indexingPressure.stats().getLowWaterMarkSplits(), 1L);
80-
assertTrue(coordinating3.shouldSplit());
81-
assertEquals(indexingPressure.stats().getLowWaterMarkSplits(), 1L);
82-
assertEquals(indexingPressure.stats().getHighWaterMarkSplits(), 2L);
72+
73+
try (
74+
Releasable ignored3 = indexingPressure.markCoordinatingOperationStarted(
75+
10,
76+
1 + (9 * 1024) - indexingPressure.stats().getCurrentCoordinatingBytes(),
77+
false
78+
)
79+
) {
80+
assertFalse(coordinating1.maybeSplit().isPresent());
81+
Optional<Releasable> split2 = coordinating2.maybeSplit();
82+
assertTrue(split2.isPresent());
83+
try (Releasable ignored4 = split2.get()) {
84+
assertEquals(indexingPressure.stats().getHighWaterMarkSplits(), 1L);
85+
assertEquals(indexingPressure.stats().getLowWaterMarkSplits(), 1L);
86+
}
87+
}
8388
}
8489
}
8590
}

0 commit comments

Comments
 (0)