Skip to content

Commit 77dd711

Browse files
committed
Tidy up GroupedActionListener (#39633)
Today the `GroupedActionListener` accepts a `defaults` parameter but all callers pass an empty list. Also it is permitted to pass an empty group but this is trappy because the delegated listener is never be called in that case. This commit removes the `defaults` parameter and forbids an empty group.
1 parent c91dcbd commit 77dd711

File tree

9 files changed

+23
-36
lines changed

9 files changed

+23
-36
lines changed

server/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import java.util.concurrent.atomic.AtomicReference;
3030

3131
/**
32-
* An action listener that delegates it's results to another listener once
33-
* it has received one or more failures or N results. This allows synchronous
32+
* An action listener that delegates its results to another listener once
33+
* it has received N results (either successes or failures). This allows synchronous
3434
* tasks to be forked off in a loop with the same listener and respond to a
3535
* higher level listener once all tasks responded.
3636
*/
@@ -39,20 +39,20 @@ public final class GroupedActionListener<T> implements ActionListener<T> {
3939
private final AtomicInteger pos = new AtomicInteger();
4040
private final AtomicArray<T> results;
4141
private final ActionListener<Collection<T>> delegate;
42-
private final Collection<T> defaults;
4342
private final AtomicReference<Exception> failure = new AtomicReference<>();
4443

4544
/**
4645
* Creates a new listener
4746
* @param delegate the delegate listener
4847
* @param groupSize the group size
4948
*/
50-
public GroupedActionListener(ActionListener<Collection<T>> delegate, int groupSize,
51-
Collection<T> defaults) {
49+
public GroupedActionListener(ActionListener<Collection<T>> delegate, int groupSize) {
50+
if (groupSize <= 0) {
51+
throw new IllegalArgumentException("groupSize must be greater than 0 but was " + groupSize);
52+
}
5253
results = new AtomicArray<>(groupSize);
5354
countDown = new CountDown(groupSize);
5455
this.delegate = delegate;
55-
this.defaults = defaults;
5656
}
5757

5858
@Override
@@ -63,7 +63,6 @@ public void onResponse(T element) {
6363
delegate.onFailure(failure.get());
6464
} else {
6565
List<T> collect = this.results.asList();
66-
collect.addAll(defaults);
6766
delegate.onResponse(Collections.unmodifiableList(collect));
6867
}
6968
}

server/src/test/java/org/elasticsearch/action/support/GroupedActionListenerTests.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,7 @@ public void onFailure(Exception e) {
5151
};
5252
final int groupSize = randomIntBetween(10, 1000);
5353
AtomicInteger count = new AtomicInteger();
54-
Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) :
55-
Collections.emptyList();
56-
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, groupSize,
57-
defaults);
54+
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, groupSize);
5855
int numThreads = randomIntBetween(2, 5);
5956
Thread[] threads = new Thread[numThreads];
6057
CyclicBarrier barrier = new CyclicBarrier(numThreads);
@@ -65,7 +62,7 @@ public void onFailure(Exception e) {
6562
} catch (Exception e) {
6663
throw new AssertionError(e);
6764
}
68-
int c = 0;
65+
int c;
6966
while((c = count.incrementAndGet()) <= groupSize) {
7067
listener.onResponse(c-1);
7168
}
@@ -78,10 +75,9 @@ public void onFailure(Exception e) {
7875
assertNotNull(resRef.get());
7976
ArrayList<Integer> list = new ArrayList<>(resRef.get());
8077
Collections.sort(list);
81-
int expectedSize = groupSize + defaults.size();
82-
assertEquals(expectedSize, resRef.get().size());
83-
int expectedValue = defaults.isEmpty() ? 0 : -1;
84-
for (int i = 0; i < expectedSize; i++) {
78+
assertEquals(groupSize, resRef.get().size());
79+
int expectedValue = 0;
80+
for (int i = 0; i < groupSize; i++) {
8581
assertEquals(Integer.valueOf(expectedValue++), list.get(i));
8682
}
8783
}
@@ -101,9 +97,8 @@ public void onFailure(Exception e) {
10197
excRef.set(e);
10298
}
10399
};
104-
Collection<Integer> defaults = randomBoolean() ? Collections.singletonList(-1) : Collections.emptyList();
105100
int size = randomIntBetween(3, 4);
106-
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, size, defaults);
101+
GroupedActionListener<Integer> listener = new GroupedActionListener<>(result, size);
107102
listener.onResponse(0);
108103
IOException ioException = new IOException();
109104
RuntimeException rtException = new RuntimeException();
@@ -124,8 +119,7 @@ public void onFailure(Exception e) {
124119
public void testConcurrentFailures() throws InterruptedException {
125120
AtomicReference<Exception> finalException = new AtomicReference<>();
126121
int numGroups = randomIntBetween(10, 100);
127-
GroupedActionListener<Void> listener = new GroupedActionListener<>(
128-
ActionListener.wrap(r -> {}, finalException::set), numGroups, Collections.emptyList());
122+
GroupedActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(r -> {}, finalException::set), numGroups);
129123
ExecutorService executorService = Executors.newFixedThreadPool(numGroups);
130124
for (int i = 0; i < numGroups; i++) {
131125
executorService.submit(() -> listener.onFailure(new IOException()));

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
433433
ActionListener.wrap(
434434
rs -> resultHandler.accept(new AutoFollowResult(autoFollowPattenName, new ArrayList<>(rs))),
435435
e -> { throw new AssertionError("must never happen", e); }),
436-
leaderIndicesToFollow.size(), Collections.emptyList());
436+
leaderIndicesToFollow.size());
437437

438438
for (final Index indexToFollow : leaderIndicesToFollow) {
439439
List<String> otherMatchingPatterns = patternsForTheSameRemoteCluster.stream()

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
4242

4343
import java.util.Collection;
44-
import java.util.Collections;
4544
import java.util.Map;
4645
import java.util.Objects;
4746

@@ -137,8 +136,8 @@ public void onFailure(final Exception e) {
137136
}
138137

139138
},
140-
numberOfShards,
141-
Collections.emptyList());
139+
numberOfShards
140+
);
142141
for (int i = 0; i < numberOfShards; i++) {
143142
final ShardId followerShardId = new ShardId(indexMetaData.getIndex(), i);
144143
final ShardId leaderShardId = new ShardId(leaderIndex, i);

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionAction.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.xpack.security.authc.saml.SamlUtils;
2929
import org.opensaml.saml.saml2.core.LogoutResponse;
3030

31-
import java.util.Collections;
3231
import java.util.List;
3332
import java.util.Map;
3433
import java.util.Objects;
@@ -98,9 +97,7 @@ private void findAndInvalidateTokens(SamlRealm realm, SamlLogoutRequestHandler.R
9897
listener.onResponse(0);
9998
} else {
10099
GroupedActionListener<TokensInvalidationResult> groupedListener = new GroupedActionListener<>(
101-
ActionListener.wrap(collection -> listener.onResponse(collection.size()), listener::onFailure),
102-
tokens.size(), Collections.emptyList()
103-
);
100+
ActionListener.wrap(collection -> listener.onResponse(collection.size()), listener::onFailure), tokens.size());
104101
tokens.forEach(tuple -> invalidateTokenPair(tuple, groupedListener));
105102
}
106103
}, listener::onFailure

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ protected void doExecute(Task task, final GetUsersRequest request, final ActionL
7070
listener.onResponse(new GetUsersResponse(users));
7171
}, listener::onFailure);
7272
final GroupedActionListener<Collection<User>> groupListener =
73-
new GroupedActionListener<>(sendingListener, 2, Collections.emptyList());
73+
new GroupedActionListener<>(sendingListener, 2);
7474
// We have two sources for the users object, the reservedRealm and the usersStore, we query both at the same time with a
7575
// GroupedActionListener
7676
if (realmLookup.isEmpty()) {
@@ -84,8 +84,7 @@ protected void doExecute(Task task, final GetUsersRequest request, final ActionL
8484
} else {
8585
// nested group listener action here - for each of the users we got and fetch it concurrently - once we are done we notify
8686
// the "global" group listener.
87-
GroupedActionListener<User> realmGroupListener = new GroupedActionListener<>(groupListener, realmLookup.size(),
88-
Collections.emptyList());
87+
GroupedActionListener<User> realmGroupListener = new GroupedActionListener<>(groupListener, realmLookup.size());
8988
for (String user : realmLookup) {
9089
reservedRealm.lookupUser(user, realmGroupListener);
9190
}

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/mapper/CompositeRoleMapper.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import java.util.ArrayList;
99
import java.util.Arrays;
10-
import java.util.Collections;
1110
import java.util.List;
1211
import java.util.Set;
1312
import java.util.stream.Collectors;
@@ -43,7 +42,7 @@ private CompositeRoleMapper(UserRoleMapper... delegates) {
4342
public void resolveRoles(UserData user, ActionListener<Set<String>> listener) {
4443
GroupedActionListener<Set<String>> groupListener = new GroupedActionListener<>(ActionListener.wrap(
4544
composite -> listener.onResponse(composite.stream().flatMap(Set::stream).collect(Collectors.toSet())), listener::onFailure
46-
), delegates.size(), Collections.emptyList());
45+
), delegates.size());
4746
this.delegates.forEach(mapper -> mapper.resolveRoles(user, groupListener));
4847
}
4948

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ private void authorizeBulkItems(RequestInfo requestInfo, AuthorizationInfo authz
506506
listener.onResponse(null);
507507
}, listener::onFailure);
508508
final ActionListener<Tuple<String, IndexAuthorizationResult>> groupedActionListener = wrapPreservingContext(
509-
new GroupedActionListener<>(bulkAuthzListener, actionToIndicesMap.size(), Collections.emptyList()), threadContext);
509+
new GroupedActionListener<>(bulkAuthzListener, actionToIndicesMap.size()), threadContext);
510510

511511
actionToIndicesMap.forEach((bulkItemAction, indices) -> {
512512
final RequestInfo bulkItemInfo =

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public void putPrivileges(Collection<ApplicationPrivilegeDescriptor> privileges,
191191
.map(NativePrivilegeStore::nameFromDocId)
192192
.collect(TUPLES_TO_MAP);
193193
clearRolesCache(listener, createdNames);
194-
}, listener::onFailure), privileges.size(), Collections.emptyList());
194+
}, listener::onFailure), privileges.size());
195195
for (ApplicationPrivilegeDescriptor privilege : privileges) {
196196
innerPutPrivilege(privilege, refreshPolicy, groupListener);
197197
}
@@ -232,7 +232,7 @@ public void deletePrivileges(String application, Collection<String> names, Write
232232
.map(NativePrivilegeStore::nameFromDocId)
233233
.collect(TUPLES_TO_MAP);
234234
clearRolesCache(listener, deletedNames);
235-
}, listener::onFailure), names.size(), Collections.emptyList());
235+
}, listener::onFailure), names.size());
236236
for (String name : names) {
237237
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN,
238238
client.prepareDelete(SECURITY_INDEX_NAME, SINGLE_MAPPING_NAME, toDocId(application, name))

0 commit comments

Comments
 (0)