Skip to content

Commit 0cd03d3

Browse files
authored
Use RoleRetrievalResult for better caching (#34197)
Security caches the result of role lookups and negative lookups are cached indefinitely. In the case of transient failures this leads to a bad experience as the roles could truly exist. The CompositeRolesStore needs to know if a failure occurred in one of the roles stores in order to make the appropriate decision as it relates to caching. In order to provide this information to the CompositeRolesStore, the return type of methods to retrieve roles has changed to a new class, RoleRetrievalResult. This class provides the ability to pass back an exception to the roles store. This exception does not mean that a request should be failed but instead serves as a signal to the roles store that missing roles should not be cached and neither should the combined role if there are missing roles. As part of this, the negative lookup cache was also changed from an unbounded cache to a cache with a configurable limit. Relates #33205
1 parent a6d1cc6 commit 0cd03d3

File tree

16 files changed

+554
-216
lines changed

16 files changed

+554
-216
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportXPackUsageAction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,7 @@ protected XPackUsageResponse newResponse() {
5353
}
5454

5555
@Override
56-
protected void masterOperation(XPackUsageRequest request, ClusterState state, ActionListener<XPackUsageResponse> listener)
57-
throws Exception {
56+
protected void masterOperation(XPackUsageRequest request, ClusterState state, ActionListener<XPackUsageResponse> listener) {
5857
final ActionListener<List<XPackFeatureSet.Usage>> usageActionListener = new ActionListener<List<Usage>>() {
5958
@Override
6059
public void onResponse(List<Usage> usages) {
@@ -73,7 +72,8 @@ public void onFailure(Exception e) {
7372
@Override
7473
public void onResponse(Usage usage) {
7574
featureSetUsages.set(position.getAndIncrement(), usage);
76-
iteratingListener.onResponse(null); // just send null back and keep iterating
75+
// the value sent back doesn't matter since our predicate keeps iterating
76+
iteratingListener.onResponse(Collections.emptyList());
7777
}
7878

7979
@Override
@@ -84,13 +84,13 @@ public void onFailure(Exception e) {
8484
};
8585
IteratingActionListener<List<XPackFeatureSet.Usage>, XPackFeatureSet> iteratingActionListener =
8686
new IteratingActionListener<>(usageActionListener, consumer, featureSets,
87-
threadPool.getThreadContext(), () -> {
87+
threadPool.getThreadContext(), (ignore) -> {
8888
final List<Usage> usageList = new ArrayList<>(featureSetUsages.length());
8989
for (int i = 0; i < featureSetUsages.length(); i++) {
9090
usageList.add(featureSetUsages.get(i));
9191
}
9292
return usageList;
93-
});
93+
}, (ignore) -> true);
9494
iteratingActionListener.run();
9595
}
9696

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/IteratingActionListener.java

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
package org.elasticsearch.xpack.core.common;
77

88
import org.elasticsearch.action.ActionListener;
9-
import org.elasticsearch.common.Nullable;
109
import org.elasticsearch.common.util.concurrent.ThreadContext;
1110

1211
import java.util.Collections;
1312
import java.util.List;
13+
import java.util.Objects;
1414
import java.util.function.BiConsumer;
15+
import java.util.function.Function;
16+
import java.util.function.Predicate;
1517
import java.util.function.Supplier;
1618

1719
/**
@@ -32,7 +34,8 @@ public final class IteratingActionListener<T, U> implements ActionListener<T>, R
3234
private final ActionListener<T> delegate;
3335
private final BiConsumer<U, ActionListener<T>> consumer;
3436
private final ThreadContext threadContext;
35-
private final Supplier<T> consumablesFinishedResponse;
37+
private final Function<T, T> finalResultFunction;
38+
private final Predicate<T> iterationPredicate;
3639

3740
private int position = 0;
3841

@@ -46,7 +49,7 @@ public final class IteratingActionListener<T, U> implements ActionListener<T>, R
4649
*/
4750
public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionListener<T>> consumer, List<U> consumables,
4851
ThreadContext threadContext) {
49-
this(delegate, consumer, consumables, threadContext, null);
52+
this(delegate, consumer, consumables, threadContext, Function.identity());
5053
}
5154

5255
/**
@@ -56,18 +59,36 @@ public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionL
5659
* @param consumer the consumer that is executed for each consumable instance
5760
* @param consumables the instances that can be consumed to produce a response which is ultimately sent on the delegate listener
5861
* @param threadContext the thread context for the thread pool that created the listener
59-
* @param consumablesFinishedResponse a supplier that maps the last consumable's response to a response
60-
* to be sent on the delegate listener, in case the last consumable returns a
61-
* {@code null} value, but the delegate listener should respond with some other value
62-
* (perhaps a concatenation of the results of all the consumables).
62+
* @param finalResultFunction a function that maps the response which terminated iteration to a response that will be sent to the
63+
* delegate listener. This is useful if the delegate listener should receive some other value (perhaps
64+
* a concatenation of the results of all the called consumables).
6365
*/
6466
public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionListener<T>> consumer, List<U> consumables,
65-
ThreadContext threadContext, @Nullable Supplier<T> consumablesFinishedResponse) {
67+
ThreadContext threadContext, Function<T, T> finalResultFunction) {
68+
this(delegate, consumer, consumables, threadContext, finalResultFunction, Objects::isNull);
69+
}
70+
71+
/**
72+
* Constructs an {@link IteratingActionListener}.
73+
*
74+
* @param delegate the delegate listener to call when all consumables have finished executing
75+
* @param consumer the consumer that is executed for each consumable instance
76+
* @param consumables the instances that can be consumed to produce a response which is ultimately sent on the delegate listener
77+
* @param threadContext the thread context for the thread pool that created the listener
78+
* @param finalResultFunction a function that maps the response which terminated iteration to a response that will be sent to the
79+
* delegate listener. This is useful if the delegate listener should receive some other value (perhaps
80+
* a concatenation of the results of all the called consumables).
81+
* @param iterationPredicate a {@link Predicate} that checks if iteration should continue based on the returned result
82+
*/
83+
public IteratingActionListener(ActionListener<T> delegate, BiConsumer<U, ActionListener<T>> consumer, List<U> consumables,
84+
ThreadContext threadContext, Function<T, T> finalResultFunction,
85+
Predicate<T> iterationPredicate) {
6686
this.delegate = delegate;
6787
this.consumer = consumer;
6888
this.consumables = Collections.unmodifiableList(consumables);
6989
this.threadContext = threadContext;
70-
this.consumablesFinishedResponse = consumablesFinishedResponse;
90+
this.finalResultFunction = finalResultFunction;
91+
this.iterationPredicate = iterationPredicate;
7192
}
7293

7394
@Override
@@ -88,18 +109,15 @@ public void onResponse(T response) {
88109
// we need to store the context here as there is a chance that this method is called from a thread outside of the ThreadPool
89110
// like a LDAP connection reader thread and we can pollute the context in certain cases
90111
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext(false)) {
91-
if (response == null) {
112+
final boolean continueIteration = iterationPredicate.test(response);
113+
if (continueIteration) {
92114
if (position == consumables.size()) {
93-
if (consumablesFinishedResponse != null) {
94-
delegate.onResponse(consumablesFinishedResponse.get());
95-
} else {
96-
delegate.onResponse(null);
97-
}
115+
delegate.onResponse(finalResultFunction.apply(response));
98116
} else {
99117
consumer.accept(consumables.get(position++), this);
100118
}
101119
} else {
102-
delegate.onResponse(response);
120+
delegate.onResponse(finalResultFunction.apply(response));
103121
}
104122
}
105123
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityExtension.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.xpack.core.security.authc.Realm;
1515
import org.elasticsearch.xpack.core.security.authc.RealmConfig;
1616
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
17+
import org.elasticsearch.xpack.core.security.authz.store.RoleRetrievalResult;
1718

1819
import java.util.ArrayList;
1920
import java.util.Collections;
@@ -72,16 +73,20 @@ default AuthenticationFailureHandler getAuthenticationFailureHandler() {
7273
* should be asynchronous if the computation is lengthy or any disk and/or network
7374
* I/O is involved. The implementation is responsible for resolving whatever roles
7475
* it can into a set of {@link RoleDescriptor} instances. If successful, the
75-
* implementation must invoke {@link ActionListener#onResponse(Object)} to pass along
76-
* the resolved set of role descriptors. If a failure was encountered, the
77-
* implementation must invoke {@link ActionListener#onFailure(Exception)}.
76+
* implementation must wrap the set of {@link RoleDescriptor} instances in a
77+
* {@link RoleRetrievalResult} using {@link RoleRetrievalResult#success(Set)} and then invoke
78+
* {@link ActionListener#onResponse(Object)}. If a failure was encountered, the
79+
* implementation should wrap the failure in a {@link RoleRetrievalResult} using
80+
* {@link RoleRetrievalResult#failure(Exception)} and then invoke
81+
* {@link ActionListener#onResponse(Object)} unless the failure needs to terminate the request,
82+
* in which case the implementation should invoke {@link ActionListener#onFailure(Exception)}.
7883
*
7984
* By default, an empty list is returned.
8085
*
8186
* @param settings The configured settings for the node
8287
* @param resourceWatcherService Use to watch configuration files for changes
8388
*/
84-
default List<BiConsumer<Set<String>, ActionListener<Set<RoleDescriptor>>>>
89+
default List<BiConsumer<Set<String>, ActionListener<RoleRetrievalResult>>>
8590
getRolesProviders(Settings settings, ResourceWatcherService resourceWatcherService) {
8691
return Collections.emptyList();
8792
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStore.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.core.security.authz.store;
77

8+
import org.elasticsearch.action.ActionListener;
89
import org.elasticsearch.common.collect.MapBuilder;
910
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction;
1011
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
@@ -21,9 +22,12 @@
2122
import java.util.Collection;
2223
import java.util.Collections;
2324
import java.util.Map;
25+
import java.util.Objects;
2426
import java.util.Set;
27+
import java.util.function.BiConsumer;
28+
import java.util.stream.Collectors;
2529

26-
public class ReservedRolesStore {
30+
public class ReservedRolesStore implements BiConsumer<Set<String>, ActionListener<RoleRetrievalResult>> {
2731

2832
public static final RoleDescriptor SUPERUSER_ROLE_DESCRIPTOR = new RoleDescriptor("superuser",
2933
new String[] { "all" },
@@ -165,4 +169,18 @@ public Collection<RoleDescriptor> roleDescriptors() {
165169
public static Set<String> names() {
166170
return RESERVED_ROLES.keySet();
167171
}
172+
173+
@Override
174+
public void accept(Set<String> roleNames, ActionListener<RoleRetrievalResult> listener) {
175+
final Set<RoleDescriptor> descriptors = roleNames.stream()
176+
.map(RESERVED_ROLES::get)
177+
.filter(Objects::nonNull)
178+
.collect(Collectors.toSet());
179+
listener.onResponse(RoleRetrievalResult.success(descriptors));
180+
}
181+
182+
@Override
183+
public String toString() {
184+
return "reserved roles store";
185+
}
168186
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.security.authz.store;
8+
9+
import org.elasticsearch.common.Nullable;
10+
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
11+
12+
import java.util.Objects;
13+
import java.util.Set;
14+
15+
/**
16+
* The result of attempting to retrieve roles from a roles provider. The result can either be
17+
* successful or a failure. A successful result indicates that no errors occurred while retrieving
18+
* roles, even if none of the requested roles could be found. A failure indicates an error
19+
* occurred while retrieving the results but the error is not fatal and the request may be able
20+
* to continue.
21+
*/
22+
public final class RoleRetrievalResult {
23+
24+
private final Set<RoleDescriptor> descriptors;
25+
26+
@Nullable
27+
private final Exception failure;
28+
29+
private RoleRetrievalResult(Set<RoleDescriptor> descriptors, @Nullable Exception failure) {
30+
if (descriptors != null && failure != null) {
31+
throw new IllegalArgumentException("either descriptors or failure must be null");
32+
}
33+
this.descriptors = descriptors;
34+
this.failure = failure;
35+
}
36+
37+
/**
38+
* @return the resolved descriptors or {@code null} if there was a failure
39+
*/
40+
public Set<RoleDescriptor> getDescriptors() {
41+
return descriptors;
42+
}
43+
44+
/**
45+
* @return the failure or {@code null} if retrieval succeeded
46+
*/
47+
@Nullable
48+
public Exception getFailure() {
49+
return failure;
50+
}
51+
52+
/**
53+
* @return true if the retrieval succeeded
54+
*/
55+
public boolean isSuccess() {
56+
return descriptors != null;
57+
}
58+
59+
/**
60+
* Creates a successful result with the provided {@link RoleDescriptor} set,
61+
* which must be non-null
62+
*/
63+
public static RoleRetrievalResult success(Set<RoleDescriptor> descriptors) {
64+
Objects.requireNonNull(descriptors, "descriptors must not be null if successful");
65+
return new RoleRetrievalResult(descriptors, null);
66+
}
67+
68+
/**
69+
* Creates a failed result with the provided non-null exception
70+
*/
71+
public static RoleRetrievalResult failure(Exception e) {
72+
Objects.requireNonNull(e, "Exception must be provided");
73+
return new RoleRetrievalResult(null, e);
74+
}
75+
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/IteratingActionListenerTests.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import org.elasticsearch.ElasticsearchException;
99
import org.elasticsearch.action.ActionListener;
10-
import org.elasticsearch.common.collect.HppcMaps.Object;
1110
import org.elasticsearch.common.settings.Settings;
1211
import org.elasticsearch.common.util.concurrent.ThreadContext;
1312
import org.elasticsearch.test.ESTestCase;
@@ -18,8 +17,12 @@
1817
import java.util.List;
1918
import java.util.concurrent.atomic.AtomicBoolean;
2019
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.concurrent.atomic.AtomicReference;
2121
import java.util.function.BiConsumer;
22+
import java.util.function.Function;
23+
import java.util.function.Predicate;
2224

25+
import static org.hamcrest.Matchers.not;
2326
import static org.hamcrest.Matchers.sameInstance;
2427

2528
public class IteratingActionListenerTests extends ESTestCase {
@@ -136,4 +139,49 @@ public void testFailure() {
136139
assertEquals(numberOfIterations, iterations.get());
137140
assertTrue(onFailureCalled.get());
138141
}
142+
143+
public void testFunctionApplied() {
144+
final int numberOfItems = scaledRandomIntBetween(2, 32);
145+
final int numberOfIterations = scaledRandomIntBetween(1, numberOfItems);
146+
List<Object> items = new ArrayList<>(numberOfItems);
147+
for (int i = 0; i < numberOfItems; i++) {
148+
items.add(new Object());
149+
}
150+
151+
final AtomicInteger iterations = new AtomicInteger(0);
152+
final Predicate<Object> iterationPredicate = object -> {
153+
final int current = iterations.incrementAndGet();
154+
return current != numberOfIterations;
155+
};
156+
final BiConsumer<Object, ActionListener<Object>> consumer = (listValue, listener) -> {
157+
listener.onResponse(items.get(iterations.get()));
158+
};
159+
160+
final AtomicReference<Object> originalObject = new AtomicReference<>();
161+
final AtomicReference<Object> result = new AtomicReference<>();
162+
final Function<Object, Object> responseFunction = object -> {
163+
originalObject.set(object);
164+
Object randomResult;
165+
do {
166+
randomResult = randomFrom(items);
167+
} while (randomResult == object);
168+
result.set(randomResult);
169+
return randomResult;
170+
};
171+
172+
IteratingActionListener<Object, Object> iteratingListener = new IteratingActionListener<>(ActionListener.wrap((object) -> {
173+
assertNotNull(object);
174+
assertNotNull(originalObject.get());
175+
assertThat(object, sameInstance(result.get()));
176+
assertThat(object, not(sameInstance(originalObject.get())));
177+
assertThat(originalObject.get(), sameInstance(items.get(iterations.get() - 1)));
178+
}, (e) -> {
179+
logger.error("unexpected exception", e);
180+
fail("exception should not have been thrown");
181+
}), consumer, items, new ThreadContext(Settings.EMPTY), responseFunction, iterationPredicate);
182+
iteratingListener.run();
183+
184+
// we never really went async, its all chained together so verify this for sanity
185+
assertEquals(numberOfIterations, iterations.get());
186+
}
139187
}

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@
116116
import org.elasticsearch.xpack.core.security.authc.RealmSettings;
117117
import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken;
118118
import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
119-
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
120119
import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
121120
import org.elasticsearch.xpack.core.security.authz.accesscontrol.SecurityIndexSearcherWrapper;
122121
import org.elasticsearch.xpack.core.security.authz.permission.FieldPermissions;
@@ -184,6 +183,7 @@
184183
import org.elasticsearch.xpack.security.authz.store.FileRolesStore;
185184
import org.elasticsearch.xpack.security.authz.store.NativePrivilegeStore;
186185
import org.elasticsearch.xpack.security.authz.store.NativeRolesStore;
186+
import org.elasticsearch.xpack.core.security.authz.store.RoleRetrievalResult;
187187
import org.elasticsearch.xpack.security.ingest.SetSecurityUserProcessor;
188188
import org.elasticsearch.xpack.security.rest.SecurityRestFilter;
189189
import org.elasticsearch.xpack.security.rest.action.RestAuthenticateAction;
@@ -458,7 +458,7 @@ Collection<Object> createComponents(Client client, ThreadPool threadPool, Cluste
458458
final FileRolesStore fileRolesStore = new FileRolesStore(settings, env, resourceWatcherService, getLicenseState());
459459
final NativeRolesStore nativeRolesStore = new NativeRolesStore(settings, client, getLicenseState(), securityIndex.get());
460460
final ReservedRolesStore reservedRolesStore = new ReservedRolesStore();
461-
List<BiConsumer<Set<String>, ActionListener<Set<RoleDescriptor>>>> rolesProviders = new ArrayList<>();
461+
List<BiConsumer<Set<String>, ActionListener<RoleRetrievalResult>>> rolesProviders = new ArrayList<>();
462462
for (SecurityExtension extension : securityExtensions) {
463463
rolesProviders.addAll(extension.getRolesProviders(settings, resourceWatcherService));
464464
}
@@ -610,7 +610,7 @@ public static List<Setting<?>> getSettings(boolean transportClientMode, List<Sec
610610
AuthenticationService.addSettings(settingsList);
611611
AuthorizationService.addSettings(settingsList);
612612
Automatons.addSettings(settingsList);
613-
settingsList.add(CompositeRolesStore.CACHE_SIZE_SETTING);
613+
settingsList.addAll(CompositeRolesStore.getSettings());
614614
settingsList.add(FieldPermissionsCache.CACHE_SIZE_SETTING);
615615
settingsList.add(TokenService.TOKEN_EXPIRATION);
616616
settingsList.add(TokenService.DELETE_INTERVAL);

0 commit comments

Comments
 (0)