20
20
package org .elasticsearch .client ;
21
21
22
22
import org .apache .http .util .EntityUtils ;
23
+ import org .apache .logging .log4j .message .ParameterizedMessage ;
23
24
import org .elasticsearch .action .admin .cluster .settings .ClusterUpdateSettingsRequest ;
24
25
import org .elasticsearch .action .admin .cluster .settings .ClusterUpdateSettingsResponse ;
25
26
import org .elasticsearch .action .admin .indices .close .CloseIndexRequest ;
36
37
import org .elasticsearch .client .ccr .FollowStatsResponse ;
37
38
import org .elasticsearch .client .ccr .GetAutoFollowPatternRequest ;
38
39
import org .elasticsearch .client .ccr .GetAutoFollowPatternResponse ;
40
+ import org .elasticsearch .client .ccr .IndicesFollowStats ;
39
41
import org .elasticsearch .client .ccr .IndicesFollowStats .ShardFollowStats ;
40
42
import org .elasticsearch .client .ccr .PauseFollowRequest ;
41
43
import org .elasticsearch .client .ccr .PutAutoFollowPatternRequest ;
@@ -85,7 +87,6 @@ public void setupRemoteClusterConfig() throws Exception {
85
87
});
86
88
}
87
89
88
- @ AwaitsFix (bugUrl ="https://github.com/elastic/elasticsearch/issues/36339" )
89
90
public void testIndexFollowing () throws Exception {
90
91
CcrClient ccrClient = highLevelClient ().ccr ();
91
92
@@ -109,21 +110,33 @@ public void testIndexFollowing() throws Exception {
109
110
SearchResponse leaderSearchResponse = highLevelClient ().search (leaderSearchRequest , RequestOptions .DEFAULT );
110
111
assertThat (leaderSearchResponse .getHits ().getTotalHits ().value , equalTo (1L ));
111
112
112
- assertBusy (() -> {
113
- FollowStatsRequest followStatsRequest = new FollowStatsRequest ("follower" );
114
- FollowStatsResponse followStatsResponse =
115
- execute (followStatsRequest , ccrClient ::getFollowStats , ccrClient ::getFollowStatsAsync );
116
- List <ShardFollowStats > shardFollowStats = followStatsResponse .getIndicesFollowStats ().getShardFollowStats ("follower" );
117
- long followerGlobalCheckpoint = shardFollowStats .stream ()
118
- .mapToLong (ShardFollowStats ::getFollowerGlobalCheckpoint )
119
- .max ()
120
- .getAsLong ();
121
- assertThat (followerGlobalCheckpoint , equalTo (0L ));
122
-
123
- SearchRequest followerSearchRequest = new SearchRequest ("follower" );
124
- SearchResponse followerSearchResponse = highLevelClient ().search (followerSearchRequest , RequestOptions .DEFAULT );
125
- assertThat (followerSearchResponse .getHits ().getTotalHits ().value , equalTo (1L ));
126
- });
113
+ try {
114
+ assertBusy (() -> {
115
+ FollowStatsRequest followStatsRequest = new FollowStatsRequest ("follower" );
116
+ FollowStatsResponse followStatsResponse =
117
+ execute (followStatsRequest , ccrClient ::getFollowStats , ccrClient ::getFollowStatsAsync );
118
+ List <ShardFollowStats > shardFollowStats = followStatsResponse .getIndicesFollowStats ().getShardFollowStats ("follower" );
119
+ long followerGlobalCheckpoint = shardFollowStats .stream ()
120
+ .mapToLong (ShardFollowStats ::getFollowerGlobalCheckpoint )
121
+ .max ()
122
+ .getAsLong ();
123
+ assertThat (followerGlobalCheckpoint , equalTo (0L ));
124
+
125
+ SearchRequest followerSearchRequest = new SearchRequest ("follower" );
126
+ SearchResponse followerSearchResponse = highLevelClient ().search (followerSearchRequest , RequestOptions .DEFAULT );
127
+ assertThat (followerSearchResponse .getHits ().getTotalHits ().value , equalTo (1L ));
128
+ });
129
+ } catch (Exception e ) {
130
+ IndicesFollowStats followStats = ccrClient .getCcrStats (new CcrStatsRequest (), RequestOptions .DEFAULT ).getIndicesFollowStats ();
131
+ for (Map .Entry <String , List <ShardFollowStats >> entry : followStats .getShardFollowStats ().entrySet ()) {
132
+ for (ShardFollowStats shardFollowStats : entry .getValue ()) {
133
+ if (shardFollowStats .getFatalException () != null ) {
134
+ logger .warn (new ParameterizedMessage ("fatal shard follow exception {}" , shardFollowStats .getShardId ()),
135
+ shardFollowStats .getFatalException ());
136
+ }
137
+ }
138
+ }
139
+ }
127
140
128
141
PauseFollowRequest pauseFollowRequest = new PauseFollowRequest ("follower" );
129
142
AcknowledgedResponse pauseFollowResponse = execute (pauseFollowRequest , ccrClient ::pauseFollow , ccrClient ::pauseFollowAsync );
0 commit comments