53
53
import org .elasticsearch .transport .TransportService ;
54
54
55
55
import java .util .ArrayList ;
56
- import java .util .Collection ;
57
56
import java .util .Collections ;
58
57
import java .util .HashMap ;
59
58
import java .util .HashSet ;
60
59
import java .util .Iterator ;
61
60
import java .util .List ;
62
61
import java .util .Map ;
63
- import java .util .Queue ;
64
62
import java .util .Set ;
65
63
import java .util .concurrent .atomic .AtomicInteger ;
66
64
import java .util .function .BiPredicate ;
@@ -182,19 +180,13 @@ private class GetSnapshotsOperation {
182
180
private final GetSnapshotInfoExecutor getSnapshotInfoExecutor ;
183
181
184
182
// results
185
- private final Queue < List <SnapshotInfo >> allSnapshotInfos = ConcurrentCollections . newQueue ( );
183
+ private final List <SnapshotInfo > allSnapshotInfos = Collections . synchronizedList ( new ArrayList <>() );
186
184
187
185
/**
188
186
* Accumulates number of snapshots that match the name/fromSortValue/slmPolicy predicates, to be returned in the response.
189
187
*/
190
188
private final AtomicInteger totalCount = new AtomicInteger ();
191
189
192
- /**
193
- * Accumulates the number of snapshots that match the name/fromSortValue/slmPolicy/after predicates, for sizing the final result
194
- * list.
195
- */
196
- private final AtomicInteger resultsCount = new AtomicInteger ();
197
-
198
190
GetSnapshotsOperation (
199
191
CancellableTask cancellableTask ,
200
192
List <RepositoryMetadata > repositories ,
@@ -454,18 +446,7 @@ private void loadSnapshotInfos(Iterator<AsyncSnapshotInfo> asyncSnapshotInfoIter
454
446
if (cancellableTask .notifyIfCancelled (listener )) {
455
447
return ;
456
448
}
457
- final var repositoryTotalCount = new AtomicInteger ();
458
-
459
- final List <SnapshotInfo > snapshots = new ArrayList <>();
460
- final List <SnapshotInfo > syncSnapshots = Collections .synchronizedList (snapshots );
461
-
462
449
try (var listeners = new RefCountingListener (listener )) {
463
- final var iterationCompleteListener = listeners .acquire (ignored -> {
464
- totalCount .addAndGet (repositoryTotalCount .get ());
465
- // no need to synchronize access to snapshots: all writes happen-before this read
466
- resultsCount .addAndGet (snapshots .size ());
467
- allSnapshotInfos .add (snapshots );
468
- });
469
450
ThrottledIterator .run (
470
451
Iterators .failFast (asyncSnapshotInfoIterator , () -> cancellableTask .isCancelled () || listeners .isFailing ()),
471
452
(ref , asyncSnapshotInfo ) -> {
@@ -474,9 +455,9 @@ private void loadSnapshotInfos(Iterator<AsyncSnapshotInfo> asyncSnapshotInfoIter
474
455
@ Override
475
456
public void onResponse (SnapshotInfo snapshotInfo ) {
476
457
if (matchesPredicates (snapshotInfo )) {
477
- repositoryTotalCount .incrementAndGet ();
458
+ totalCount .incrementAndGet ();
478
459
if (afterPredicate .test (snapshotInfo )) {
479
- syncSnapshots .add (snapshotInfo .maybeWithoutIndices (indices ));
460
+ allSnapshotInfos .add (snapshotInfo .maybeWithoutIndices (indices ));
480
461
}
481
462
}
482
463
refListener .onResponse (null );
@@ -495,7 +476,7 @@ public void onFailure(Exception e) {
495
476
},
496
477
getSnapshotInfoExecutor .getMaxRunningTasks (),
497
478
() -> {},
498
- () -> iterationCompleteListener . onResponse ( null )
479
+ () -> {}
499
480
);
500
481
}
501
482
}
@@ -505,12 +486,11 @@ private GetSnapshotsResponse buildResponse() {
505
486
cancellableTask .ensureNotCancelled ();
506
487
int remaining = 0 ;
507
488
final var resultsStream = allSnapshotInfos .stream ()
508
- .flatMap (Collection ::stream )
509
489
.peek (this ::assertSatisfiesAllPredicates )
510
490
.sorted (sortBy .getSnapshotInfoComparator (order ))
511
491
.skip (offset );
512
492
final List <SnapshotInfo > snapshotInfos ;
513
- if (size == GetSnapshotsRequest .NO_LIMIT || resultsCount . get () <= size ) {
493
+ if (size == GetSnapshotsRequest .NO_LIMIT || allSnapshotInfos . size () <= size ) {
514
494
snapshotInfos = resultsStream .toList ();
515
495
} else {
516
496
snapshotInfos = new ArrayList <>(size );
0 commit comments