Skip to content

Commit 0b4e8db

Browse files
authored
Security: don't call prepare index for reads (#34246)
The security native stores follow a pattern where `SecurityIndexManager#prepareIndexIfNeededThenExecute` wraps most calls made for the security index. The reasoning behind this was to check if the security index had been upgraded to the latest version in a consistent manner. However, this has the potential side effect that a read will trigger the creation of the security index or an updating of its mappings, which can lead to issues such as failures due to put mapping requests timing out even though we might have been able to read from the index and get the data necessary. This change introduces a new method, `checkIndexVersionThenExecute`, that provides the consistent checking of the security index to make sure it has been upgraded. That is the only check that this method performs prior to running the passed in operation, which removes the possible triggering of index creation and mapping updates for reads. Additionally, areas where we do reads now check the availability of the security index and can short circuit requests. Availability in this context means that the index exists and all primaries are active. Relates #33205
1 parent a93aefb commit 0b4e8db

File tree

12 files changed

+318
-179
lines changed

12 files changed

+318
-179
lines changed

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java

Lines changed: 64 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -361,30 +361,34 @@ void decodeToken(String token, ActionListener<UserToken> listener) throws IOExce
361361
final Cipher cipher = getDecryptionCipher(iv, decodeKey, version, decodedSalt);
362362
if (version.onOrAfter(Version.V_6_2_0)) {
363363
// we only have the id and need to get the token from the doc!
364-
decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId ->
365-
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
366-
final GetRequest getRequest =
364+
decryptTokenId(in, cipher, version, ActionListener.wrap(tokenId -> {
365+
if (securityIndex.isAvailable() == false) {
366+
logger.warn("failed to get token [{}] since index is not available", tokenId);
367+
listener.onResponse(null);
368+
} else {
369+
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
370+
final GetRequest getRequest =
367371
client.prepareGet(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE,
368-
getTokenDocumentId(tokenId)).request();
369-
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
372+
getTokenDocumentId(tokenId)).request();
373+
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest,
370374
ActionListener.<GetResponse>wrap(response -> {
371375
if (response.isExists()) {
372376
Map<String, Object> accessTokenSource =
373-
(Map<String, Object>) response.getSource().get("access_token");
377+
(Map<String, Object>) response.getSource().get("access_token");
374378
if (accessTokenSource == null) {
375379
listener.onFailure(new IllegalStateException("token document is missing " +
376-
"the access_token field"));
380+
"the access_token field"));
377381
} else if (accessTokenSource.containsKey("user_token") == false) {
378382
listener.onFailure(new IllegalStateException("token document is missing " +
379-
"the user_token field"));
383+
"the user_token field"));
380384
} else {
381385
Map<String, Object> userTokenSource =
382-
(Map<String, Object>) accessTokenSource.get("user_token");
386+
(Map<String, Object>) accessTokenSource.get("user_token");
383387
listener.onResponse(UserToken.fromSourceMap(userTokenSource));
384388
}
385389
} else {
386390
listener.onFailure(
387-
new IllegalStateException("token document is missing and must be present"));
391+
new IllegalStateException("token document is missing and must be present"));
388392
}
389393
}, e -> {
390394
// if the index or the shard is not there / available we assume that
@@ -397,7 +401,8 @@ void decodeToken(String token, ActionListener<UserToken> listener) throws IOExce
397401
listener.onFailure(e);
398402
}
399403
}), client::get);
400-
}), listener::onFailure));
404+
});
405+
}}, listener::onFailure));
401406
} else {
402407
decryptToken(in, cipher, version, listener);
403408
}
@@ -673,30 +678,36 @@ private void findTokenFromRefreshToken(String refreshToken, ActionListener<Tuple
673678
.setVersion(true)
674679
.request();
675680

676-
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
681+
if (securityIndex.isAvailable() == false) {
682+
logger.debug("security index is not available to find token from refresh token, retrying");
683+
attemptCount.incrementAndGet();
684+
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
685+
} else {
686+
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
677687
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
678-
ActionListener.<SearchResponse>wrap(searchResponse -> {
679-
if (searchResponse.isTimedOut()) {
680-
attemptCount.incrementAndGet();
681-
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
682-
} else if (searchResponse.getHits().getHits().length < 1) {
683-
logger.info("could not find token document with refresh_token [{}]", refreshToken);
684-
listener.onFailure(invalidGrantException("could not refresh the requested token"));
685-
} else if (searchResponse.getHits().getHits().length > 1) {
686-
listener.onFailure(new IllegalStateException("multiple tokens share the same refresh token"));
687-
} else {
688-
listener.onResponse(new Tuple<>(searchResponse, attemptCount));
689-
}
690-
}, e -> {
691-
if (isShardNotAvailableException(e)) {
692-
logger.debug("failed to search for token document, retrying", e);
693-
attemptCount.incrementAndGet();
694-
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
695-
} else {
696-
listener.onFailure(e);
697-
}
698-
}),
699-
client::search));
688+
ActionListener.<SearchResponse>wrap(searchResponse -> {
689+
if (searchResponse.isTimedOut()) {
690+
attemptCount.incrementAndGet();
691+
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
692+
} else if (searchResponse.getHits().getHits().length < 1) {
693+
logger.info("could not find token document with refresh_token [{}]", refreshToken);
694+
listener.onFailure(invalidGrantException("could not refresh the requested token"));
695+
} else if (searchResponse.getHits().getHits().length > 1) {
696+
listener.onFailure(new IllegalStateException("multiple tokens share the same refresh token"));
697+
} else {
698+
listener.onResponse(new Tuple<>(searchResponse, attemptCount));
699+
}
700+
}, e -> {
701+
if (isShardNotAvailableException(e)) {
702+
logger.debug("failed to search for token document, retrying", e);
703+
attemptCount.incrementAndGet();
704+
findTokenFromRefreshToken(refreshToken, listener, attemptCount);
705+
} else {
706+
listener.onFailure(e);
707+
}
708+
}),
709+
client::search));
710+
}
700711
}
701712
}
702713

@@ -831,32 +842,33 @@ public void findActiveTokensForRealm(String realmName, ActionListener<Collection
831842

832843
if (Strings.isNullOrEmpty(realmName)) {
833844
listener.onFailure(new IllegalArgumentException("Realm name is required"));
834-
return;
835-
}
836-
837-
final Instant now = clock.instant();
838-
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
845+
} else if (securityIndex.isAvailable() == false) {
846+
listener.onResponse(Collections.emptyList());
847+
} else {
848+
final Instant now = clock.instant();
849+
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
839850
.filter(QueryBuilders.termQuery("doc_type", "token"))
840851
.filter(QueryBuilders.termQuery("access_token.realm", realmName))
841852
.filter(QueryBuilders.boolQuery()
842-
.should(QueryBuilders.boolQuery()
843-
.must(QueryBuilders.termQuery("access_token.invalidated", false))
844-
.must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli()))
845-
)
846-
.should(QueryBuilders.termQuery("refresh_token.invalidated", false))
853+
.should(QueryBuilders.boolQuery()
854+
.must(QueryBuilders.termQuery("access_token.invalidated", false))
855+
.must(QueryBuilders.rangeQuery("access_token.user_token.expiration_time").gte(now.toEpochMilli()))
856+
)
857+
.should(QueryBuilders.termQuery("refresh_token.invalidated", false))
847858
);
848859

849-
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
860+
final SearchRequest request = client.prepareSearch(SecurityIndexManager.SECURITY_INDEX_NAME)
850861
.setScroll(DEFAULT_KEEPALIVE_SETTING.get(settings))
851862
.setQuery(boolQuery)
852863
.setVersion(false)
853864
.setSize(1000)
854865
.setFetchSource(true)
855866
.request();
856867

857-
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
858-
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () ->
859-
ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit));
868+
final Supplier<ThreadContext.StoredContext> supplier = client.threadPool().getThreadContext().newRestorableContext(false);
869+
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () ->
870+
ScrollHelper.fetchAllByEntity(client, request, new ContextPreservingActionListener<>(supplier, listener), this::parseHit));
871+
}
860872
}
861873

862874
private Tuple<UserToken, String> parseHit(SearchHit hit) {
@@ -923,10 +935,12 @@ private void ensureEnabled() {
923935
*/
924936
private void checkIfTokenIsRevoked(UserToken userToken, ActionListener<UserToken> listener) {
925937
if (securityIndex.indexExists() == false) {
926-
// index doesn't exist so the token is considered valid.
938+
// index doesn't exist so the token is considered valid. it is important to note that
939+
// we do not use isAvailable as the lack of a shard being available is not equivalent
940+
// to the index not existing in the case of revocation checking.
927941
listener.onResponse(userToken);
928942
} else {
929-
securityIndex.prepareIndexIfNeededThenExecute(listener::onFailure, () -> {
943+
securityIndex.checkIndexVersionThenExecute(listener::onFailure, () -> {
930944
MultiGetRequest mGetRequest = client.prepareMultiGet()
931945
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getInvalidatedTokenDocumentId(userToken))
932946
.add(SecurityIndexManager.SECURITY_INDEX_NAME, TYPE, getTokenDocumentId(userToken))

0 commit comments

Comments
 (0)