@@ -375,34 +375,30 @@ void decodeToken(String token, ActionListener<UserToken> listener) throws IOExce
375
375
final Cipher cipher = getDecryptionCipher (iv , decodeKey , version , decodedSalt );
376
376
if (version .onOrAfter (Version .V_6_2_0 )) {
377
377
// we only have the id and need to get the token from the doc!
378
- decryptTokenId (in , cipher , version , ActionListener .wrap (tokenId -> {
379
- if (securityIndex .isAvailable () == false ) {
380
- logger .warn ("failed to get token [{}] since index is not available" , tokenId );
381
- listener .onResponse (null );
382
- } else {
383
- securityIndex .checkIndexVersionThenExecute (listener ::onFailure , () -> {
384
- final GetRequest getRequest =
378
+ decryptTokenId (in , cipher , version , ActionListener .wrap (tokenId ->
379
+ securityIndex .prepareIndexIfNeededThenExecute (listener ::onFailure , () -> {
380
+ final GetRequest getRequest =
385
381
client .prepareGet (SecurityIndexManager .SECURITY_INDEX_NAME , TYPE ,
386
- getTokenDocumentId (tokenId )).request ();
387
- executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , getRequest ,
382
+ getTokenDocumentId (tokenId )).request ();
383
+ executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , getRequest ,
388
384
ActionListener .<GetResponse >wrap (response -> {
389
385
if (response .isExists ()) {
390
386
Map <String , Object > accessTokenSource =
391
- (Map <String , Object >) response .getSource ().get ("access_token" );
387
+ (Map <String , Object >) response .getSource ().get ("access_token" );
392
388
if (accessTokenSource == null ) {
393
389
listener .onFailure (new IllegalStateException ("token document is missing " +
394
- "the access_token field" ));
390
+ "the access_token field" ));
395
391
} else if (accessTokenSource .containsKey ("user_token" ) == false ) {
396
392
listener .onFailure (new IllegalStateException ("token document is missing " +
397
- "the user_token field" ));
393
+ "the user_token field" ));
398
394
} else {
399
395
Map <String , Object > userTokenSource =
400
- (Map <String , Object >) accessTokenSource .get ("user_token" );
396
+ (Map <String , Object >) accessTokenSource .get ("user_token" );
401
397
listener .onResponse (UserToken .fromSourceMap (userTokenSource ));
402
398
}
403
399
} else {
404
400
listener .onFailure (
405
- new IllegalStateException ("token document is missing and must be present" ));
401
+ new IllegalStateException ("token document is missing and must be present" ));
406
402
}
407
403
}, e -> {
408
404
// if the index or the shard is not there / available we assume that
@@ -415,8 +411,7 @@ void decodeToken(String token, ActionListener<UserToken> listener) throws IOExce
415
411
listener .onFailure (e );
416
412
}
417
413
}), client ::get );
418
- });
419
- }}, listener ::onFailure ));
414
+ }), listener ::onFailure ));
420
415
} else {
421
416
decryptToken (in , cipher , version , listener );
422
417
}
@@ -692,36 +687,30 @@ private void findTokenFromRefreshToken(String refreshToken, ActionListener<Tuple
692
687
.setVersion (true )
693
688
.request ();
694
689
695
- if (securityIndex .isAvailable () == false ) {
696
- logger .debug ("security index is not available to find token from refresh token, retrying" );
697
- attemptCount .incrementAndGet ();
698
- findTokenFromRefreshToken (refreshToken , listener , attemptCount );
699
- } else {
700
- securityIndex .checkIndexVersionThenExecute (listener ::onFailure , () ->
690
+ securityIndex .prepareIndexIfNeededThenExecute (listener ::onFailure , () ->
701
691
executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , request ,
702
- ActionListener .<SearchResponse >wrap (searchResponse -> {
703
- if (searchResponse .isTimedOut ()) {
704
- attemptCount .incrementAndGet ();
705
- findTokenFromRefreshToken (refreshToken , listener , attemptCount );
706
- } else if (searchResponse .getHits ().getHits ().length < 1 ) {
707
- logger .info ("could not find token document with refresh_token [{}]" , refreshToken );
708
- listener .onFailure (invalidGrantException ("could not refresh the requested token" ));
709
- } else if (searchResponse .getHits ().getHits ().length > 1 ) {
710
- listener .onFailure (new IllegalStateException ("multiple tokens share the same refresh token" ));
711
- } else {
712
- listener .onResponse (new Tuple <>(searchResponse , attemptCount ));
713
- }
714
- }, e -> {
715
- if (isShardNotAvailableException (e )) {
716
- logger .debug ("failed to search for token document, retrying" , e );
717
- attemptCount .incrementAndGet ();
718
- findTokenFromRefreshToken (refreshToken , listener , attemptCount );
719
- } else {
720
- listener .onFailure (e );
721
- }
722
- }),
723
- client ::search ));
724
- }
692
+ ActionListener .<SearchResponse >wrap (searchResponse -> {
693
+ if (searchResponse .isTimedOut ()) {
694
+ attemptCount .incrementAndGet ();
695
+ findTokenFromRefreshToken (refreshToken , listener , attemptCount );
696
+ } else if (searchResponse .getHits ().getHits ().length < 1 ) {
697
+ logger .info ("could not find token document with refresh_token [{}]" , refreshToken );
698
+ listener .onFailure (invalidGrantException ("could not refresh the requested token" ));
699
+ } else if (searchResponse .getHits ().getHits ().length > 1 ) {
700
+ listener .onFailure (new IllegalStateException ("multiple tokens share the same refresh token" ));
701
+ } else {
702
+ listener .onResponse (new Tuple <>(searchResponse , attemptCount ));
703
+ }
704
+ }, e -> {
705
+ if (isShardNotAvailableException (e )) {
706
+ logger .debug ("failed to search for token document, retrying" , e );
707
+ attemptCount .incrementAndGet ();
708
+ findTokenFromRefreshToken (refreshToken , listener , attemptCount );
709
+ } else {
710
+ listener .onFailure (e );
711
+ }
712
+ }),
713
+ client ::search ));
725
714
}
726
715
}
727
716
@@ -856,33 +845,32 @@ public void findActiveTokensForRealm(String realmName, ActionListener<Collection
856
845
857
846
if (Strings .isNullOrEmpty (realmName )) {
858
847
listener .onFailure (new IllegalArgumentException ("Realm name is required" ));
859
- } else if ( securityIndex . isAvailable () == false ) {
860
- listener . onResponse ( Collections . emptyList ());
861
- } else {
862
- final Instant now = clock .instant ();
863
- final BoolQueryBuilder boolQuery = QueryBuilders .boolQuery ()
848
+ return ;
849
+ }
850
+
851
+ final Instant now = clock .instant ();
852
+ final BoolQueryBuilder boolQuery = QueryBuilders .boolQuery ()
864
853
.filter (QueryBuilders .termQuery ("doc_type" , "token" ))
865
854
.filter (QueryBuilders .termQuery ("access_token.realm" , realmName ))
866
855
.filter (QueryBuilders .boolQuery ()
867
- .should (QueryBuilders .boolQuery ()
868
- .must (QueryBuilders .termQuery ("access_token.invalidated" , false ))
869
- .must (QueryBuilders .rangeQuery ("access_token.user_token.expiration_time" ).gte (now .toEpochMilli ()))
870
- )
871
- .should (QueryBuilders .termQuery ("refresh_token.invalidated" , false ))
856
+ .should (QueryBuilders .boolQuery ()
857
+ .must (QueryBuilders .termQuery ("access_token.invalidated" , false ))
858
+ .must (QueryBuilders .rangeQuery ("access_token.user_token.expiration_time" ).gte (now .toEpochMilli ()))
859
+ )
860
+ .should (QueryBuilders .termQuery ("refresh_token.invalidated" , false ))
872
861
);
873
862
874
- final SearchRequest request = client .prepareSearch (SecurityIndexManager .SECURITY_INDEX_NAME )
863
+ final SearchRequest request = client .prepareSearch (SecurityIndexManager .SECURITY_INDEX_NAME )
875
864
.setScroll (DEFAULT_KEEPALIVE_SETTING .get (settings ))
876
865
.setQuery (boolQuery )
877
866
.setVersion (false )
878
867
.setSize (1000 )
879
868
.setFetchSource (true )
880
869
.request ();
881
870
882
- final Supplier <ThreadContext .StoredContext > supplier = client .threadPool ().getThreadContext ().newRestorableContext (false );
883
- securityIndex .checkIndexVersionThenExecute (listener ::onFailure , () ->
884
- ScrollHelper .fetchAllByEntity (client , request , new ContextPreservingActionListener <>(supplier , listener ), this ::parseHit ));
885
- }
871
+ final Supplier <ThreadContext .StoredContext > supplier = client .threadPool ().getThreadContext ().newRestorableContext (false );
872
+ securityIndex .prepareIndexIfNeededThenExecute (listener ::onFailure , () ->
873
+ ScrollHelper .fetchAllByEntity (client , request , new ContextPreservingActionListener <>(supplier , listener ), this ::parseHit ));
886
874
}
887
875
888
876
private Tuple <UserToken , String > parseHit (SearchHit hit ) {
@@ -949,12 +937,10 @@ private void ensureEnabled() {
949
937
*/
950
938
private void checkIfTokenIsRevoked (UserToken userToken , ActionListener <UserToken > listener ) {
951
939
if (securityIndex .indexExists () == false ) {
952
- // index doesn't exist so the token is considered valid. it is important to note that
953
- // we do not use isAvailable as the lack of a shard being available is not equivalent
954
- // to the index not existing in the case of revocation checking.
940
+ // index doesn't exist so the token is considered valid.
955
941
listener .onResponse (userToken );
956
942
} else {
957
- securityIndex .checkIndexVersionThenExecute (listener ::onFailure , () -> {
943
+ securityIndex .prepareIndexIfNeededThenExecute (listener ::onFailure , () -> {
958
944
MultiGetRequest mGetRequest = client .prepareMultiGet ()
959
945
.add (SecurityIndexManager .SECURITY_INDEX_NAME , TYPE , getInvalidatedTokenDocumentId (userToken ))
960
946
.add (SecurityIndexManager .SECURITY_INDEX_NAME , TYPE , getTokenDocumentId (userToken ))
0 commit comments