@@ -375,30 +375,34 @@ void decodeToken(String token, ActionListener<UserToken> listener) throws IOExce
375
375
final Cipher cipher = getDecryptionCipher (iv , decodeKey , version , decodedSalt );
376
376
if (version .onOrAfter (Version .V_6_2_0 )) {
377
377
// we only have the id and need to get the token from the doc!
378
- decryptTokenId (in , cipher , version , ActionListener .wrap (tokenId ->
379
- securityIndex .prepareIndexIfNeededThenExecute (listener ::onFailure , () -> {
380
- final GetRequest getRequest =
378
+ decryptTokenId (in , cipher , version , ActionListener .wrap (tokenId -> {
379
+ if (securityIndex .isAvailable () == false ) {
380
+ logger .warn ("failed to get token [{}] since index is not available" , tokenId );
381
+ listener .onResponse (null );
382
+ } else {
383
+ securityIndex .checkIndexVersionThenExecute (listener ::onFailure , () -> {
384
+ final GetRequest getRequest =
381
385
client .prepareGet (SecurityIndexManager .SECURITY_INDEX_NAME , TYPE ,
382
- getTokenDocumentId (tokenId )).request ();
383
- executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , getRequest ,
386
+ getTokenDocumentId (tokenId )).request ();
387
+ executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , getRequest ,
384
388
ActionListener .<GetResponse >wrap (response -> {
385
389
if (response .isExists ()) {
386
390
Map <String , Object > accessTokenSource =
387
- (Map <String , Object >) response .getSource ().get ("access_token" );
391
+ (Map <String , Object >) response .getSource ().get ("access_token" );
388
392
if (accessTokenSource == null ) {
389
393
listener .onFailure (new IllegalStateException ("token document is missing " +
390
- "the access_token field" ));
394
+ "the access_token field" ));
391
395
} else if (accessTokenSource .containsKey ("user_token" ) == false ) {
392
396
listener .onFailure (new IllegalStateException ("token document is missing " +
393
- "the user_token field" ));
397
+ "the user_token field" ));
394
398
} else {
395
399
Map <String , Object > userTokenSource =
396
- (Map <String , Object >) accessTokenSource .get ("user_token" );
400
+ (Map <String , Object >) accessTokenSource .get ("user_token" );
397
401
listener .onResponse (UserToken .fromSourceMap (userTokenSource ));
398
402
}
399
403
} else {
400
404
listener .onFailure (
401
- new IllegalStateException ("token document is missing and must be present" ));
405
+ new IllegalStateException ("token document is missing and must be present" ));
402
406
}
403
407
}, e -> {
404
408
// if the index or the shard is not there / available we assume that
@@ -411,7 +415,8 @@ void decodeToken(String token, ActionListener<UserToken> listener) throws IOExce
411
415
listener .onFailure (e );
412
416
}
413
417
}), client ::get );
414
- }), listener ::onFailure ));
418
+ });
419
+ }}, listener ::onFailure ));
415
420
} else {
416
421
decryptToken (in , cipher , version , listener );
417
422
}
@@ -687,30 +692,36 @@ private void findTokenFromRefreshToken(String refreshToken, ActionListener<Tuple
687
692
.setVersion (true )
688
693
.request ();
689
694
690
- securityIndex .prepareIndexIfNeededThenExecute (listener ::onFailure , () ->
695
+ if (securityIndex .isAvailable () == false ) {
696
+ logger .debug ("security index is not available to find token from refresh token, retrying" );
697
+ attemptCount .incrementAndGet ();
698
+ findTokenFromRefreshToken (refreshToken , listener , attemptCount );
699
+ } else {
700
+ securityIndex .checkIndexVersionThenExecute (listener ::onFailure , () ->
691
701
executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , request ,
692
- ActionListener .<SearchResponse >wrap (searchResponse -> {
693
- if (searchResponse .isTimedOut ()) {
694
- attemptCount .incrementAndGet ();
695
- findTokenFromRefreshToken (refreshToken , listener , attemptCount );
696
- } else if (searchResponse .getHits ().getHits ().length < 1 ) {
697
- logger .info ("could not find token document with refresh_token [{}]" , refreshToken );
698
- listener .onFailure (invalidGrantException ("could not refresh the requested token" ));
699
- } else if (searchResponse .getHits ().getHits ().length > 1 ) {
700
- listener .onFailure (new IllegalStateException ("multiple tokens share the same refresh token" ));
701
- } else {
702
- listener .onResponse (new Tuple <>(searchResponse , attemptCount ));
703
- }
704
- }, e -> {
705
- if (isShardNotAvailableException (e )) {
706
- logger .debug ("failed to search for token document, retrying" , e );
707
- attemptCount .incrementAndGet ();
708
- findTokenFromRefreshToken (refreshToken , listener , attemptCount );
709
- } else {
710
- listener .onFailure (e );
711
- }
712
- }),
713
- client ::search ));
702
+ ActionListener .<SearchResponse >wrap (searchResponse -> {
703
+ if (searchResponse .isTimedOut ()) {
704
+ attemptCount .incrementAndGet ();
705
+ findTokenFromRefreshToken (refreshToken , listener , attemptCount );
706
+ } else if (searchResponse .getHits ().getHits ().length < 1 ) {
707
+ logger .info ("could not find token document with refresh_token [{}]" , refreshToken );
708
+ listener .onFailure (invalidGrantException ("could not refresh the requested token" ));
709
+ } else if (searchResponse .getHits ().getHits ().length > 1 ) {
710
+ listener .onFailure (new IllegalStateException ("multiple tokens share the same refresh token" ));
711
+ } else {
712
+ listener .onResponse (new Tuple <>(searchResponse , attemptCount ));
713
+ }
714
+ }, e -> {
715
+ if (isShardNotAvailableException (e )) {
716
+ logger .debug ("failed to search for token document, retrying" , e );
717
+ attemptCount .incrementAndGet ();
718
+ findTokenFromRefreshToken (refreshToken , listener , attemptCount );
719
+ } else {
720
+ listener .onFailure (e );
721
+ }
722
+ }),
723
+ client ::search ));
724
+ }
714
725
}
715
726
}
716
727
@@ -845,32 +856,33 @@ public void findActiveTokensForRealm(String realmName, ActionListener<Collection
845
856
846
857
if (Strings .isNullOrEmpty (realmName )) {
847
858
listener .onFailure (new IllegalArgumentException ("Realm name is required" ));
848
- return ;
849
- }
850
-
851
- final Instant now = clock .instant ();
852
- final BoolQueryBuilder boolQuery = QueryBuilders .boolQuery ()
859
+ } else if ( securityIndex . isAvailable () == false ) {
860
+ listener . onResponse ( Collections . emptyList ());
861
+ } else {
862
+ final Instant now = clock .instant ();
863
+ final BoolQueryBuilder boolQuery = QueryBuilders .boolQuery ()
853
864
.filter (QueryBuilders .termQuery ("doc_type" , "token" ))
854
865
.filter (QueryBuilders .termQuery ("access_token.realm" , realmName ))
855
866
.filter (QueryBuilders .boolQuery ()
856
- .should (QueryBuilders .boolQuery ()
857
- .must (QueryBuilders .termQuery ("access_token.invalidated" , false ))
858
- .must (QueryBuilders .rangeQuery ("access_token.user_token.expiration_time" ).gte (now .toEpochMilli ()))
859
- )
860
- .should (QueryBuilders .termQuery ("refresh_token.invalidated" , false ))
867
+ .should (QueryBuilders .boolQuery ()
868
+ .must (QueryBuilders .termQuery ("access_token.invalidated" , false ))
869
+ .must (QueryBuilders .rangeQuery ("access_token.user_token.expiration_time" ).gte (now .toEpochMilli ()))
870
+ )
871
+ .should (QueryBuilders .termQuery ("refresh_token.invalidated" , false ))
861
872
);
862
873
863
- final SearchRequest request = client .prepareSearch (SecurityIndexManager .SECURITY_INDEX_NAME )
874
+ final SearchRequest request = client .prepareSearch (SecurityIndexManager .SECURITY_INDEX_NAME )
864
875
.setScroll (DEFAULT_KEEPALIVE_SETTING .get (settings ))
865
876
.setQuery (boolQuery )
866
877
.setVersion (false )
867
878
.setSize (1000 )
868
879
.setFetchSource (true )
869
880
.request ();
870
881
871
- final Supplier <ThreadContext .StoredContext > supplier = client .threadPool ().getThreadContext ().newRestorableContext (false );
872
- securityIndex .prepareIndexIfNeededThenExecute (listener ::onFailure , () ->
873
- ScrollHelper .fetchAllByEntity (client , request , new ContextPreservingActionListener <>(supplier , listener ), this ::parseHit ));
882
+ final Supplier <ThreadContext .StoredContext > supplier = client .threadPool ().getThreadContext ().newRestorableContext (false );
883
+ securityIndex .checkIndexVersionThenExecute (listener ::onFailure , () ->
884
+ ScrollHelper .fetchAllByEntity (client , request , new ContextPreservingActionListener <>(supplier , listener ), this ::parseHit ));
885
+ }
874
886
}
875
887
876
888
private Tuple <UserToken , String > parseHit (SearchHit hit ) {
@@ -937,10 +949,12 @@ private void ensureEnabled() {
937
949
*/
938
950
private void checkIfTokenIsRevoked (UserToken userToken , ActionListener <UserToken > listener ) {
939
951
if (securityIndex .indexExists () == false ) {
940
- // index doesn't exist so the token is considered valid.
952
+ // index doesn't exist so the token is considered valid. it is important to note that
953
+ // we do not use isAvailable as the lack of a shard being available is not equivalent
954
+ // to the index not existing in the case of revocation checking.
941
955
listener .onResponse (userToken );
942
956
} else {
943
- securityIndex .prepareIndexIfNeededThenExecute (listener ::onFailure , () -> {
957
+ securityIndex .checkIndexVersionThenExecute (listener ::onFailure , () -> {
944
958
MultiGetRequest mGetRequest = client .prepareMultiGet ()
945
959
.add (SecurityIndexManager .SECURITY_INDEX_NAME , TYPE , getInvalidatedTokenDocumentId (userToken ))
946
960
.add (SecurityIndexManager .SECURITY_INDEX_NAME , TYPE , getTokenDocumentId (userToken ))
0 commit comments