Skip to content

Commit 36d940f

Browse files
committed
Limit user to single concurrent auth per realm (#30794)
This commit reworks the way our realms perform caching in order to limit each principal to a single ongoing authentication per realm. In other words, this means that multiple requests made by the same user will not trigger more that one authentication attempt at a time if no entry has been stored in the cache. If an entry is present in our cache, there is no restriction on the number of concurrent authentications performed for this user. This change enables us to limit the load we place on an external system like an LDAP server and also preserve resources such as CPU on expensive operations such as BCrypt authentication. Closes #30355
1 parent d1c0d41 commit 36d940f

File tree

18 files changed

+713
-228
lines changed

18 files changed

+713
-228
lines changed

server/src/main/java/org/elasticsearch/common/cache/Cache.java

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
* @param <V> The type of the values
6969
*/
7070
public class Cache<K, V> {
71+
7172
// positive if entries have an expiration
7273
private long expireAfterAccessNanos = -1;
7374

@@ -282,6 +283,39 @@ void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
282283
}
283284
}
284285

286+
/**
287+
* remove an entry from the segment iff the future is done and the value is equal to the
288+
* expected value
289+
*
290+
* @param key the key of the entry to remove from the cache
291+
* @param value the value expected to be associated with the key
292+
* @param onRemoval a callback for the removed entry
293+
*/
294+
void remove(K key, V value, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
295+
CompletableFuture<Entry<K, V>> future;
296+
boolean removed = false;
297+
try (ReleasableLock ignored = writeLock.acquire()) {
298+
future = map.get(key);
299+
try {
300+
if (future != null) {
301+
if (future.isDone()) {
302+
Entry<K, V> entry = future.get();
303+
if (Objects.equals(value, entry.value)) {
304+
removed = map.remove(key, future);
305+
}
306+
}
307+
}
308+
} catch (ExecutionException | InterruptedException e) {
309+
throw new IllegalStateException(e);
310+
}
311+
}
312+
313+
if (future != null && removed) {
314+
segmentStats.eviction();
315+
onRemoval.accept(future);
316+
}
317+
}
318+
285319
private static class SegmentStats {
286320
private final LongAdder hits = new LongAdder();
287321
private final LongAdder misses = new LongAdder();
@@ -314,7 +348,7 @@ void eviction() {
314348
Entry<K, V> tail;
315349

316350
// lock protecting mutations to the LRU list
317-
private ReleasableLock lruLock = new ReleasableLock(new ReentrantLock());
351+
private final ReleasableLock lruLock = new ReleasableLock(new ReentrantLock());
318352

319353
/**
320354
* Returns the value to which the specified key is mapped, or null if this map contains no mapping for the key.
@@ -455,6 +489,19 @@ private void put(K key, V value, long now) {
455489
}
456490
}
457491

492+
private final Consumer<CompletableFuture<Entry<K, V>>> invalidationConsumer = f -> {
493+
try {
494+
Entry<K, V> entry = f.get();
495+
try (ReleasableLock ignored = lruLock.acquire()) {
496+
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
497+
}
498+
} catch (ExecutionException e) {
499+
// ok
500+
} catch (InterruptedException e) {
501+
throw new IllegalStateException(e);
502+
}
503+
};
504+
458505
/**
459506
* Invalidate the association for the specified key. A removal notification will be issued for invalidated
460507
* entries with {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
@@ -463,18 +510,20 @@ private void put(K key, V value, long now) {
463510
*/
464511
public void invalidate(K key) {
465512
CacheSegment<K, V> segment = getCacheSegment(key);
466-
segment.remove(key, f -> {
467-
try {
468-
Entry<K, V> entry = f.get();
469-
try (ReleasableLock ignored = lruLock.acquire()) {
470-
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
471-
}
472-
} catch (ExecutionException e) {
473-
// ok
474-
} catch (InterruptedException e) {
475-
throw new IllegalStateException(e);
476-
}
477-
});
513+
segment.remove(key, invalidationConsumer);
514+
}
515+
516+
/**
517+
* Invalidate the entry for the specified key and value. If the value provided is not equal to the value in
518+
* the cache, no removal will occur. A removal notification will be issued for invalidated
519+
* entries with {@link org.elasticsearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
520+
*
521+
* @param key the key whose mapping is to be invalidated from the cache
522+
* @param value the expected value that should be associated with the key
523+
*/
524+
public void invalidate(K key, V value) {
525+
CacheSegment<K, V> segment = getCacheSegment(key);
526+
segment.remove(key, value, invalidationConsumer);
478527
}
479528

480529
/**
@@ -625,7 +674,7 @@ public void remove() {
625674
Entry<K, V> entry = current;
626675
if (entry != null) {
627676
CacheSegment<K, V> segment = getCacheSegment(entry.key);
628-
segment.remove(entry.key, f -> {});
677+
segment.remove(entry.key, entry.value, f -> {});
629678
try (ReleasableLock ignored = lruLock.acquire()) {
630679
current = null;
631680
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
@@ -710,7 +759,7 @@ private void evictEntry(Entry<K, V> entry) {
710759

711760
CacheSegment<K, V> segment = getCacheSegment(entry.key);
712761
if (segment != null) {
713-
segment.remove(entry.key, f -> {});
762+
segment.remove(entry.key, entry.value, f -> {});
714763
}
715764
delete(entry, RemovalNotification.RemovalReason.EVICTED);
716765
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common.util.concurrent;
21+
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.common.collect.Tuple;
24+
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.TimeUnit;
29+
30+
/**
31+
* A future implementation that allows for the result to be passed to listeners waiting for
32+
* notification. This is useful for cases where a computation is requested many times
33+
* concurrently, but really only needs to be performed a single time. Once the computation
34+
* has been performed the registered listeners will be notified by submitting a runnable
35+
* for execution in the provided {@link ExecutorService}. If the computation has already
36+
* been performed, a request to add a listener will simply result in execution of the listener
37+
* on the calling thread.
38+
*/
39+
public final class ListenableFuture<V> extends BaseFuture<V> implements ActionListener<V> {
40+
41+
private volatile boolean done = false;
42+
private final List<Tuple<ActionListener<V>, ExecutorService>> listeners = new ArrayList<>();
43+
44+
/**
45+
* Adds a listener to this future. If the future has not yet completed, the listener will be
46+
* notified of a response or exception in a runnable submitted to the ExecutorService provided.
47+
* If the future has completed, the listener will be notified immediately without forking to
48+
* a different thread.
49+
*/
50+
public void addListener(ActionListener<V> listener, ExecutorService executor) {
51+
if (done) {
52+
// run the callback directly, we don't hold the lock and don't need to fork!
53+
notifyListener(listener, EsExecutors.newDirectExecutorService());
54+
} else {
55+
final boolean run;
56+
// check done under lock since it could have been modified and protect modifications
57+
// to the list under lock
58+
synchronized (this) {
59+
if (done) {
60+
run = true;
61+
} else {
62+
listeners.add(new Tuple<>(listener, executor));
63+
run = false;
64+
}
65+
}
66+
67+
if (run) {
68+
// run the callback directly, we don't hold the lock and don't need to fork!
69+
notifyListener(listener, EsExecutors.newDirectExecutorService());
70+
}
71+
}
72+
}
73+
74+
@Override
75+
protected synchronized void done() {
76+
done = true;
77+
listeners.forEach(t -> notifyListener(t.v1(), t.v2()));
78+
// release references to any listeners as we no longer need them and will live
79+
// much longer than the listeners in most cases
80+
listeners.clear();
81+
}
82+
83+
private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
84+
try {
85+
executorService.submit(() -> {
86+
try {
87+
// call get in a non-blocking fashion as we could be on a network thread
88+
// or another thread like the scheduler, which we should never block!
89+
V value = FutureUtils.get(this, 0L, TimeUnit.NANOSECONDS);
90+
listener.onResponse(value);
91+
} catch (Exception e) {
92+
listener.onFailure(e);
93+
}
94+
});
95+
} catch (Exception e) {
96+
listener.onFailure(e);
97+
}
98+
}
99+
100+
@Override
101+
public void onResponse(V v) {
102+
final boolean set = set(v);
103+
if (set == false) {
104+
throw new IllegalStateException("did not set value, value or exception already set?");
105+
}
106+
}
107+
108+
@Override
109+
public void onFailure(Exception e) {
110+
final boolean set = setException(e);
111+
if (set == false) {
112+
throw new IllegalStateException("did not set exception, value already set or exception already set?");
113+
}
114+
}
115+
}

server/src/test/java/org/elasticsearch/common/cache/CacheTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,62 @@ public void testNotificationOnInvalidate() {
457457
assertEquals(notifications, invalidated);
458458
}
459459

460+
// randomly invalidate some cached entries, then check that a lookup for each of those and only those keys is null
461+
public void testInvalidateWithValue() {
462+
Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();
463+
for (int i = 0; i < numberOfEntries; i++) {
464+
cache.put(i, Integer.toString(i));
465+
}
466+
Set<Integer> keys = new HashSet<>();
467+
for (Integer key : cache.keys()) {
468+
if (rarely()) {
469+
if (randomBoolean()) {
470+
cache.invalidate(key, key.toString());
471+
keys.add(key);
472+
} else {
473+
// invalidate with incorrect value
474+
cache.invalidate(key, Integer.toString(key * randomIntBetween(2, 10)));
475+
}
476+
}
477+
}
478+
for (int i = 0; i < numberOfEntries; i++) {
479+
if (keys.contains(i)) {
480+
assertNull(cache.get(i));
481+
} else {
482+
assertNotNull(cache.get(i));
483+
}
484+
}
485+
}
486+
487+
// randomly invalidate some cached entries, then check that we receive invalidate notifications for those and only
488+
// those entries
489+
public void testNotificationOnInvalidateWithValue() {
490+
Set<Integer> notifications = new HashSet<>();
491+
Cache<Integer, String> cache =
492+
CacheBuilder.<Integer, String>builder()
493+
.removalListener(notification -> {
494+
assertEquals(RemovalNotification.RemovalReason.INVALIDATED, notification.getRemovalReason());
495+
notifications.add(notification.getKey());
496+
})
497+
.build();
498+
for (int i = 0; i < numberOfEntries; i++) {
499+
cache.put(i, Integer.toString(i));
500+
}
501+
Set<Integer> invalidated = new HashSet<>();
502+
for (int i = 0; i < numberOfEntries; i++) {
503+
if (rarely()) {
504+
if (randomBoolean()) {
505+
cache.invalidate(i, Integer.toString(i));
506+
invalidated.add(i);
507+
} else {
508+
// invalidate with incorrect value
509+
cache.invalidate(i, Integer.toString(i * randomIntBetween(2, 10)));
510+
}
511+
}
512+
}
513+
assertEquals(notifications, invalidated);
514+
}
515+
460516
// invalidate all cached entries, then check that the cache is empty
461517
public void testInvalidateAll() {
462518
Cache<Integer, String> cache = CacheBuilder.<Integer, String>builder().build();

0 commit comments

Comments
 (0)