@@ -347,13 +347,12 @@ private void getUserTokenFromId(String userTokenId, Version tokenVersion, Action
347
347
logger .warn ("failed to get access token [{}] because index [{}] is not available" , userTokenId , tokensIndex .aliasName ());
348
348
listener .onResponse (null );
349
349
} else {
350
+ final GetRequest getRequest = client .prepareGet (tokensIndex .aliasName (), SINGLE_MAPPING_NAME ,
351
+ getTokenDocumentId (userTokenId )).request ();
352
+ final Consumer <Exception > onFailure = ex -> listener .onFailure (traceLog ("decode token" , userTokenId , ex ));
350
353
tokensIndex .checkIndexVersionThenExecute (
351
354
ex -> listener .onFailure (traceLog ("prepare tokens index [" + tokensIndex .aliasName () +"]" , userTokenId , ex )),
352
- () -> {
353
- final GetRequest getRequest = client .prepareGet (tokensIndex .aliasName (), SINGLE_MAPPING_NAME ,
354
- getTokenDocumentId (userTokenId )).request ();
355
- Consumer <Exception > onFailure = ex -> listener .onFailure (traceLog ("decode token" , userTokenId , ex ));
356
- executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , getRequest ,
355
+ () -> executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , getRequest ,
357
356
ActionListener .<GetResponse >wrap (response -> {
358
357
if (response .isExists ()) {
359
358
Map <String , Object > accessTokenSource =
@@ -384,8 +383,8 @@ private void getUserTokenFromId(String userTokenId, Version tokenVersion, Action
384
383
logger .error (new ParameterizedMessage ("failed to get access token [{}]" , userTokenId ), e );
385
384
listener .onFailure (e );
386
385
}
387
- }), client ::get );
388
- } );
386
+ }), client ::get )
387
+ );
389
388
}
390
389
}
391
390
@@ -862,7 +861,9 @@ private void innerRefresh(String tokenDocId, Map<String, Object> source, long se
862
861
.setRefreshPolicy (RefreshPolicy .IMMEDIATE )
863
862
.setIfSeqNo (seqNo )
864
863
.setIfPrimaryTerm (primaryTerm );
865
- executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , updateRequest .request (),
864
+ refreshedTokenIndex .prepareIndexIfNeededThenExecute (
865
+ ex -> listener .onFailure (traceLog ("prepare index [" + refreshedTokenIndex .aliasName () + "]" , ex )),
866
+ () -> executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , updateRequest .request (),
866
867
ActionListener .<UpdateResponse >wrap (updateResponse -> {
867
868
if (updateResponse .getResult () == DocWriteResponse .Result .UPDATED ) {
868
869
logger .debug (() -> new ParameterizedMessage ("updated the original token document to {}" ,
@@ -931,7 +932,7 @@ public void onFailure(Exception e) {
931
932
} else {
932
933
onFailure .accept (e );
933
934
}
934
- }), client ::update );
935
+ }), client ::update )) ;
935
936
}
936
937
}
937
938
@@ -1005,7 +1006,9 @@ private void getSupersedingTokenDocAsync(RefreshTokenStatus refreshTokenStatus,
1005
1006
1006
1007
private void getTokenDocAsync (String tokenDocId , SecurityIndexManager tokensIndex , ActionListener <GetResponse > listener ) {
1007
1008
final GetRequest getRequest = client .prepareGet (tokensIndex .aliasName (), SINGLE_MAPPING_NAME , tokenDocId ).request ();
1008
- executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , getRequest , listener , client ::get );
1009
+ tokensIndex .checkIndexVersionThenExecute (
1010
+ ex -> listener .onFailure (traceLog ("prepare tokens index [" + tokensIndex .aliasName () + "]" , tokenDocId , ex )),
1011
+ () -> executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , getRequest , listener , client ::get ));
1009
1012
}
1010
1013
1011
1014
private Version getTokenVersionCompatibility () {
@@ -1392,10 +1395,10 @@ private void checkIfTokenIsValid(UserToken userToken, ActionListener<UserToken>
1392
1395
logger .warn ("failed to validate access token because the index [" + tokensIndex .aliasName () + "] doesn't exist" );
1393
1396
listener .onResponse (null );
1394
1397
} else {
1398
+ final GetRequest getRequest = client
1399
+ .prepareGet (tokensIndex .aliasName (), SINGLE_MAPPING_NAME , getTokenDocumentId (userToken )).request ();
1400
+ Consumer <Exception > onFailure = ex -> listener .onFailure (traceLog ("check token state" , userToken .getId (), ex ));
1395
1401
tokensIndex .checkIndexVersionThenExecute (listener ::onFailure , () -> {
1396
- final GetRequest getRequest = client
1397
- .prepareGet (tokensIndex .aliasName (), SINGLE_MAPPING_NAME , getTokenDocumentId (userToken )).request ();
1398
- Consumer <Exception > onFailure = ex -> listener .onFailure (traceLog ("check token state" , userToken .getId (), ex ));
1399
1402
executeAsyncWithOrigin (client .threadPool ().getThreadContext (), SECURITY_ORIGIN , getRequest ,
1400
1403
ActionListener .<GetResponse >wrap (response -> {
1401
1404
if (response .isExists ()) {
0 commit comments