22
22
import org .elasticsearch .common .xcontent .XContentType ;
23
23
import org .elasticsearch .common .xcontent .support .XContentMapValues ;
24
24
import org .elasticsearch .index .IndexSettings ;
25
+ import org .elasticsearch .index .mapper .DocumentMapper ;
25
26
import org .elasticsearch .index .shard .IndexShard ;
26
27
import org .elasticsearch .index .shard .ShardId ;
27
28
import org .elasticsearch .indices .IndicesService ;
@@ -230,8 +231,7 @@ public void testAddNewReplicasOnFollower() throws Exception {
230
231
pauseFollow ("follower-index" );
231
232
}
232
233
233
- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/37807" )
234
- public void testReadRequestsReturnsLatestMappingVersion () throws Exception {
234
+ public void testReadRequestsReturnLatestMappingVersion () throws Exception {
235
235
InternalTestCluster leaderCluster = getLeaderCluster ();
236
236
Settings nodeAttributes = Settings .builder ().put ("node.attr.box" , "large" ).build ();
237
237
String dataNode = leaderCluster .startDataOnlyNode (nodeAttributes );
@@ -244,6 +244,9 @@ public void testReadRequestsReturnsLatestMappingVersion() throws Exception {
244
244
.put ("index.routing.allocation.require.box" , "large" ))
245
245
.get ()
246
246
);
247
+ getFollowerCluster ().startDataOnlyNode (nodeAttributes );
248
+ followerClient ().execute (PutFollowAction .INSTANCE , putFollow ("leader-index" , "follower-index" )).get ();
249
+ ensureFollowerGreen ("follower-index" );
247
250
ClusterService clusterService = leaderCluster .clusterService (dataNode );
248
251
ShardId shardId = clusterService .state ().routingTable ().index ("leader-index" ).shard (0 ).shardId ();
249
252
IndicesService indicesService = leaderCluster .getInstance (IndicesService .class , dataNode );
@@ -265,22 +268,30 @@ public void testReadRequestsReturnsLatestMappingVersion() throws Exception {
265
268
});
266
269
leaderCluster .client ().admin ().indices ().preparePutMapping ().setType ("doc" )
267
270
.setSource ("balance" , "type=long" ).setTimeout (TimeValue .ZERO ).get ();
268
- IndexResponse indexResp = leaderCluster .client (dataNode ).prepareIndex ("leader-index" , "doc" , "1" )
269
- .setSource ("{\" balance\" : 100}" , XContentType .JSON ).setTimeout (TimeValue .ZERO ).get ();
270
- assertThat (indexResp .getResult (), equalTo (DocWriteResponse .Result .CREATED ));
271
- assertThat (indexShard .getGlobalCheckpoint (), equalTo (0L ));
272
- getFollowerCluster ().startDataOnlyNode (nodeAttributes );
273
- followerClient ().execute (PutFollowAction .INSTANCE , putFollow ("leader-index" , "follower-index" )).get ();
274
- ensureFollowerGreen ("follower-index" );
275
- // Make sure at least one read-request which requires mapping sync is completed.
276
- assertBusy (() -> {
277
- CcrClient ccrClient = new CcrClient (followerClient ());
278
- FollowStatsAction .StatsResponses responses = ccrClient .followStats (new FollowStatsAction .StatsRequest ()).actionGet ();
279
- long bytesRead = responses .getStatsResponses ().stream ().mapToLong (r -> r .status ().bytesRead ()).sum ();
280
- assertThat (bytesRead , Matchers .greaterThan (0L ));
281
- }, 60 , TimeUnit .SECONDS );
282
- latch .countDown ();
283
- assertIndexFullyReplicatedToFollower ("leader-index" , "follower-index" );
284
- pauseFollow ("follower-index" );
271
+ try {
272
+ // Make sure the mapping is ready on the shard before we execute the index request; otherwise the index request
273
+ // will perform a dynamic mapping update which however will be blocked because the latch is remained closed.
274
+ assertBusy (() -> {
275
+ DocumentMapper mapper = indexShard .mapperService ().documentMapper ("doc" );
276
+ assertNotNull (mapper );
277
+ assertNotNull (mapper .mappers ().getMapper ("balance" ));
278
+ });
279
+ IndexResponse indexResp = leaderCluster .client ().prepareIndex ("leader-index" , "doc" , "1" )
280
+ .setSource ("{\" balance\" : 100}" , XContentType .JSON ).setTimeout (TimeValue .ZERO ).get ();
281
+ assertThat (indexResp .getResult (), equalTo (DocWriteResponse .Result .CREATED ));
282
+ assertThat (indexShard .getGlobalCheckpoint (), equalTo (0L ));
283
+ // Make sure at least one read-request which requires mapping sync is completed.
284
+ assertBusy (() -> {
285
+ CcrClient ccrClient = new CcrClient (followerClient ());
286
+ FollowStatsAction .StatsResponses responses = ccrClient .followStats (new FollowStatsAction .StatsRequest ()).actionGet ();
287
+ long bytesRead = responses .getStatsResponses ().stream ().mapToLong (r -> r .status ().bytesRead ()).sum ();
288
+ assertThat (bytesRead , Matchers .greaterThan (0L ));
289
+ }, 60 , TimeUnit .SECONDS );
290
+ latch .countDown ();
291
+ assertIndexFullyReplicatedToFollower ("leader-index" , "follower-index" );
292
+ } finally {
293
+ latch .countDown (); // no effect if latch was counted down - this makes sure teardown can make progress.
294
+ pauseFollow ("follower-index" );
295
+ }
285
296
}
286
297
}
0 commit comments