Skip to content

Commit 0420300

Browse files
DaveCTurner authored and albertzaharovits committed
Further reduce allocations in TransportGetSnapshotsAction (#110817)
Collecting the list of snapshot IDs over which to iterate within each repository today involves several other potentially-large intermediate collections and a bunch of other unnecessary allocations. This commit replaces those temporary collections with an iterator which saves all this temporary memory usage. Relates ES-8906
1 parent 134ff6f commit 0420300

File tree

3 files changed

+176
-57
lines changed

3 files changed

+176
-57
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java

Lines changed: 99 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.Collections;
5858
import java.util.HashMap;
5959
import java.util.HashSet;
60+
import java.util.Iterator;
6061
import java.util.List;
6162
import java.util.Map;
6263
import java.util.Queue;
@@ -248,18 +249,8 @@ void getMultipleReposSnapshotInfo(ActionListener<GetSnapshotsResponse> listener)
248249
return;
249250
}
250251

251-
SubscribableListener
252-
253-
.<RepositoryData>newForked(repositoryDataListener -> {
254-
if (snapshotNamePredicate == SnapshotNamePredicate.MATCH_CURRENT_ONLY) {
255-
repositoryDataListener.onResponse(null);
256-
} else {
257-
repositoriesService.repository(repoName).getRepositoryData(executor, repositoryDataListener);
258-
}
259-
})
260-
252+
SubscribableListener.<RepositoryData>newForked(l -> maybeGetRepositoryData(repoName, l))
261253
.<Void>andThen((l, repositoryData) -> loadSnapshotInfos(repoName, repositoryData, l))
262-
263254
.addListener(listeners.acquire());
264255
}
265256
}
@@ -268,6 +259,14 @@ void getMultipleReposSnapshotInfo(ActionListener<GetSnapshotsResponse> listener)
268259
.addListener(listener.map(ignored -> buildResponse()), executor, threadPool.getThreadContext());
269260
}
270261

262+
private void maybeGetRepositoryData(String repositoryName, ActionListener<RepositoryData> listener) {
263+
if (snapshotNamePredicate == SnapshotNamePredicate.MATCH_CURRENT_ONLY) {
264+
listener.onResponse(null);
265+
} else {
266+
repositoriesService.repository(repositoryName).getRepositoryData(executor, listener);
267+
}
268+
}
269+
271270
private boolean skipRepository(String repositoryName) {
272271
if (sortBy == SnapshotSortKey.REPOSITORY && fromSortValue != null) {
273272
// If we are sorting by repository name with an offset given by fromSortValue, skip earlier repositories
@@ -277,61 +276,101 @@ private boolean skipRepository(String repositoryName) {
277276
}
278277
}
279278

280-
private void loadSnapshotInfos(String repo, @Nullable RepositoryData repositoryData, ActionListener<Void> listener) {
279+
private void loadSnapshotInfos(String repositoryName, @Nullable RepositoryData repositoryData, ActionListener<Void> listener) {
281280
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
282281

283-
if (cancellableTask.notifyIfCancelled(listener)) {
284-
return;
285-
}
286-
287-
final Set<String> unmatchedRequiredNames = new HashSet<>(snapshotNamePredicate.requiredNames());
288-
final Set<Snapshot> toResolve = new HashSet<>();
289-
290-
for (final var snapshotInProgress : snapshotsInProgress.forRepo(repo)) {
291-
final var snapshotName = snapshotInProgress.snapshot().getSnapshotId().getName();
292-
unmatchedRequiredNames.remove(snapshotName);
293-
if (snapshotNamePredicate.test(snapshotName, true)) {
294-
toResolve.add(snapshotInProgress.snapshot());
295-
}
296-
}
297-
298-
if (repositoryData != null) {
299-
for (final var snapshotId : repositoryData.getSnapshotIds()) {
300-
final var snapshotName = snapshotId.getName();
301-
unmatchedRequiredNames.remove(snapshotName);
302-
if (snapshotNamePredicate.test(snapshotName, false) && matchesPredicates(snapshotId, repositoryData)) {
303-
toResolve.add(new Snapshot(repo, snapshotId));
304-
}
305-
}
306-
}
307-
308-
if (unmatchedRequiredNames.isEmpty() == false) {
309-
throw new SnapshotMissingException(repo, unmatchedRequiredNames.iterator().next());
310-
}
282+
cancellableTask.ensureNotCancelled();
283+
ensureRequiredNamesPresent(repositoryName, repositoryData);
311284

312285
if (verbose) {
313-
loadSnapshotInfos(repo, toResolve.stream().map(Snapshot::getSnapshotId).toList(), listener);
286+
loadSnapshotInfos(repositoryName, getSnapshotIdIterator(repositoryName, repositoryData), listener);
314287
} else {
315288
assert fromSortValuePredicates.isMatchAll() : "filtering is not supported in non-verbose mode";
316289
assert slmPolicyPredicate == SlmPolicyPredicate.MATCH_ALL_POLICIES : "filtering is not supported in non-verbose mode";
317290

318291
addSimpleSnapshotInfos(
319-
toResolve,
320-
repo,
292+
getSnapshotIdIterator(repositoryName, repositoryData),
293+
repositoryName,
321294
repositoryData,
322-
snapshotsInProgress.forRepo(repo).stream().map(entry -> SnapshotInfo.inProgress(entry).basic()).toList()
295+
snapshotsInProgress.forRepo(repositoryName).stream().map(entry -> SnapshotInfo.inProgress(entry).basic()).toList()
323296
);
324297
listener.onResponse(null);
325298
}
326299
}
327300

328-
private void loadSnapshotInfos(String repositoryName, Collection<SnapshotId> snapshotIds, ActionListener<Void> listener) {
301+
/**
302+
* Check that the repository contains every <i>required</i> name according to {@link #snapshotNamePredicate}.
303+
*
304+
* @throws SnapshotMissingException if one or more required names are missing.
305+
*/
306+
private void ensureRequiredNamesPresent(String repositoryName, @Nullable RepositoryData repositoryData) {
307+
if (snapshotNamePredicate.requiredNames().isEmpty()) {
308+
return;
309+
}
310+
311+
final var unmatchedRequiredNames = new HashSet<>(snapshotNamePredicate.requiredNames());
312+
for (final var snapshotInProgress : snapshotsInProgress.forRepo(repositoryName)) {
313+
unmatchedRequiredNames.remove(snapshotInProgress.snapshot().getSnapshotId().getName());
314+
}
315+
if (unmatchedRequiredNames.isEmpty()) {
316+
return;
317+
}
318+
if (repositoryData != null) {
319+
for (final var snapshotId : repositoryData.getSnapshotIds()) {
320+
unmatchedRequiredNames.remove(snapshotId.getName());
321+
}
322+
if (unmatchedRequiredNames.isEmpty()) {
323+
return;
324+
}
325+
}
326+
throw new SnapshotMissingException(repositoryName, unmatchedRequiredNames.iterator().next());
327+
}
328+
329+
/**
330+
* @return an iterator over the snapshot IDs in the given repository which match {@link #snapshotNamePredicate}.
331+
*/
332+
private Iterator<SnapshotId> getSnapshotIdIterator(String repositoryName, @Nullable RepositoryData repositoryData) {
333+
334+
// now iterate through the snapshots again, returning only the matching IDs
335+
final Set<SnapshotId> matchingInProgressSnapshots = new HashSet<>();
336+
return Iterators.concat(
337+
// matching in-progress snapshots first
338+
Iterators.filter(
339+
Iterators.map(
340+
snapshotsInProgress.forRepo(repositoryName).iterator(),
341+
snapshotInProgress -> snapshotInProgress.snapshot().getSnapshotId()
342+
),
343+
snapshotId -> {
344+
if (snapshotNamePredicate.test(snapshotId.getName(), true)) {
345+
matchingInProgressSnapshots.add(snapshotId);
346+
return true;
347+
} else {
348+
return false;
349+
}
350+
}
351+
),
352+
repositoryData == null
353+
// only returning in-progress snapshots
354+
? Collections.emptyIterator()
355+
// also return matching completed snapshots (except any ones that were also found to be in-progress)
356+
: Iterators.filter(
357+
repositoryData.getSnapshotIds().iterator(),
358+
snapshotId -> matchingInProgressSnapshots.contains(snapshotId) == false
359+
&& snapshotNamePredicate.test(snapshotId.getName(), false)
360+
&& matchesPredicates(snapshotId, repositoryData)
361+
)
362+
);
363+
}
364+
365+
private void loadSnapshotInfos(String repositoryName, Iterator<SnapshotId> snapshotIdIterator, ActionListener<Void> listener) {
329366
if (cancellableTask.notifyIfCancelled(listener)) {
330367
return;
331368
}
332369
final AtomicInteger repositoryTotalCount = new AtomicInteger();
333-
final List<SnapshotInfo> snapshots = new ArrayList<>(snapshotIds.size());
334-
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
370+
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>();
371+
snapshotIdIterator.forEachRemaining(snapshotIdsToIterate::add);
372+
373+
final List<SnapshotInfo> snapshots = new ArrayList<>(snapshotIdsToIterate.size());
335374
// first, look at the snapshots in progress
336375
final List<SnapshotsInProgress.Entry> entries = SnapshotsService.currentSnapshots(
337376
snapshotsInProgress,
@@ -409,7 +448,7 @@ public void onFailure(Exception e) {
409448
}
410449
})
411450

412-
// no need to synchronize access to snapshots: Repository#getSnapshotInfo fails fast but we're on the success path here
451+
// no need to synchronize access to snapshots: all writes happen-before this read
413452
.andThenAccept(ignored -> addResults(repositoryTotalCount.get(), snapshots))
414453

415454
.addListener(listener);
@@ -422,9 +461,9 @@ private void addResults(int repositoryTotalCount, List<SnapshotInfo> snapshots)
422461
}
423462

424463
private void addSimpleSnapshotInfos(
425-
final Set<Snapshot> toResolve,
426-
final String repoName,
427-
final RepositoryData repositoryData,
464+
final Iterator<SnapshotId> snapshotIdIterator,
465+
final String repositoryName,
466+
@Nullable final RepositoryData repositoryData,
428467
final List<SnapshotInfo> currentSnapshots
429468
) {
430469
if (repositoryData == null) {
@@ -433,11 +472,14 @@ private void addSimpleSnapshotInfos(
433472
return;
434473
} // else want non-current snapshots as well, which are found in the repository data
435474

475+
final Set<SnapshotId> toResolve = new HashSet<>();
476+
snapshotIdIterator.forEachRemaining(toResolve::add);
477+
436478
List<SnapshotInfo> snapshotInfos = new ArrayList<>(currentSnapshots.size() + toResolve.size());
437479
int repositoryTotalCount = 0;
438480
for (SnapshotInfo snapshotInfo : currentSnapshots) {
439481
assert snapshotInfo.startTime() == 0L && snapshotInfo.endTime() == 0L && snapshotInfo.totalShards() == 0L : snapshotInfo;
440-
if (toResolve.remove(snapshotInfo.snapshot())) {
482+
if (toResolve.remove(snapshotInfo.snapshot().getSnapshotId())) {
441483
repositoryTotalCount += 1;
442484
if (afterPredicate.test(snapshotInfo)) {
443485
snapshotInfos.add(snapshotInfo);
@@ -448,19 +490,19 @@ private void addSimpleSnapshotInfos(
448490
if (indices) {
449491
for (IndexId indexId : repositoryData.getIndices().values()) {
450492
for (SnapshotId snapshotId : repositoryData.getSnapshots(indexId)) {
451-
if (toResolve.contains(new Snapshot(repoName, snapshotId))) {
493+
if (toResolve.contains(snapshotId)) {
452494
snapshotsToIndices.computeIfAbsent(snapshotId, (k) -> new ArrayList<>()).add(indexId.getName());
453495
}
454496
}
455497
}
456498
}
457-
for (Snapshot snapshot : toResolve) {
499+
for (SnapshotId snapshotId : toResolve) {
458500
final var snapshotInfo = new SnapshotInfo(
459-
snapshot,
460-
snapshotsToIndices.getOrDefault(snapshot.getSnapshotId(), Collections.emptyList()),
501+
new Snapshot(repositoryName, snapshotId),
502+
snapshotsToIndices.getOrDefault(snapshotId, Collections.emptyList()),
461503
Collections.emptyList(),
462504
Collections.emptyList(),
463-
repositoryData.getSnapshotState(snapshot.getSnapshotId())
505+
repositoryData.getSnapshotState(snapshotId)
464506
);
465507
repositoryTotalCount += 1;
466508
if (afterPredicate.test(snapshotInfo)) {

server/src/main/java/org/elasticsearch/common/collect/Iterators.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.function.Consumer;
2222
import java.util.function.Function;
2323
import java.util.function.IntFunction;
24+
import java.util.function.Predicate;
2425
import java.util.function.Supplier;
2526
import java.util.function.ToIntFunction;
2627

@@ -179,6 +180,59 @@ public void forEachRemaining(Consumer<? super U> action) {
179180
}
180181
}
181182

183+
/**
184+
* @param input An iterator over <i>non-null</i> values.
185+
* @param predicate The predicate with which to filter the input.
186+
* @return an iterator which returns the values from {@code input} which match {@code predicate}.
187+
*/
188+
public static <T> Iterator<T> filter(Iterator<? extends T> input, Predicate<T> predicate) {
189+
while (input.hasNext()) {
190+
final var value = input.next();
191+
assert value != null;
192+
if (predicate.test(value)) {
193+
return new FilterIterator<>(value, input, predicate);
194+
}
195+
}
196+
return Collections.emptyIterator();
197+
}
198+
199+
private static final class FilterIterator<T> implements Iterator<T> {
200+
private final Iterator<? extends T> input;
201+
private final Predicate<T> predicate;
202+
private T next;
203+
204+
FilterIterator(T value, Iterator<? extends T> input, Predicate<T> predicate) {
205+
this.next = value;
206+
this.input = input;
207+
this.predicate = predicate;
208+
assert next != null;
209+
assert predicate.test(next);
210+
}
211+
212+
@Override
213+
public boolean hasNext() {
214+
return next != null;
215+
}
216+
217+
@Override
218+
public T next() {
219+
if (hasNext() == false) {
220+
throw new NoSuchElementException();
221+
}
222+
final var value = next;
223+
while (input.hasNext()) {
224+
final var laterValue = input.next();
225+
assert laterValue != null;
226+
if (predicate.test(laterValue)) {
227+
next = laterValue;
228+
return value;
229+
}
230+
}
231+
next = null;
232+
return value;
233+
}
234+
}
235+
182236
public static <T, U> Iterator<U> flatMap(Iterator<? extends T> input, Function<T, Iterator<? extends U>> fn) {
183237
while (input.hasNext()) {
184238
final var value = fn.apply(input.next());

server/src/test/java/org/elasticsearch/common/collect/IteratorsTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.common.collect;
1010

1111
import org.elasticsearch.common.Randomness;
12+
import org.elasticsearch.core.Assertions;
1213
import org.elasticsearch.core.Tuple;
1314
import org.elasticsearch.test.ESTestCase;
1415

@@ -23,6 +24,7 @@
2324
import java.util.concurrent.atomic.AtomicBoolean;
2425
import java.util.concurrent.atomic.AtomicInteger;
2526
import java.util.function.BiPredicate;
27+
import java.util.function.Predicate;
2628
import java.util.function.ToIntFunction;
2729
import java.util.stream.IntStream;
2830

@@ -219,6 +221,27 @@ public void testMap() {
219221
assertEquals(array.length, index.get());
220222
}
221223

224+
public void testFilter() {
225+
assertSame(Collections.emptyIterator(), Iterators.filter(Collections.emptyIterator(), i -> fail(null, "not called")));
226+
227+
final var array = randomIntegerArray();
228+
assertSame(Collections.emptyIterator(), Iterators.filter(Iterators.forArray(array), i -> false));
229+
230+
final var threshold = array.length > 0 && randomBoolean() ? randomFrom(array) : randomIntBetween(0, 1000);
231+
final Predicate<Integer> predicate = i -> i <= threshold;
232+
final var expectedResults = Arrays.stream(array).filter(predicate).toList();
233+
final var index = new AtomicInteger();
234+
Iterators.filter(Iterators.forArray(array), predicate)
235+
.forEachRemaining(i -> assertEquals(expectedResults.get(index.getAndIncrement()), i));
236+
237+
if (Assertions.ENABLED) {
238+
final var predicateCalled = new AtomicBoolean();
239+
final var inputIterator = Iterators.forArray(new Object[] { null });
240+
expectThrows(AssertionError.class, () -> Iterators.filter(inputIterator, i -> predicateCalled.compareAndSet(false, true)));
241+
assertFalse(predicateCalled.get());
242+
}
243+
}
244+
222245
public void testFailFast() {
223246
final var array = randomIntegerArray();
224247
assertEmptyIterator(Iterators.failFast(Iterators.forArray(array), () -> true));

0 commit comments

Comments
 (0)