Skip to content

Commit 64adb5a

Browse files
authored
Set acking timeout to 0 on dynamic mapping update (#31140)
As acking can fail for any reason (unrelated node being too slow, node disconnecting), it should not be required for acking to succeed in order for index requests with dynamic mapping updates to successfully complete. Relates to #30672 and Closes #30844
1 parent b6936e3 commit 64adb5a

File tree

4 files changed

+56
-6
lines changed

4 files changed

+56
-6
lines changed

server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.cluster.action.index;
2121

22-
import org.elasticsearch.ElasticsearchTimeoutException;
2322
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
2423
import org.elasticsearch.client.Client;
2524
import org.elasticsearch.client.IndicesAdminClient;
@@ -67,7 +66,7 @@ private PutMappingRequestBuilder updateMappingRequest(Index index, String type,
6766
throw new IllegalArgumentException("_default_ mapping should not be updated");
6867
}
6968
return client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
70-
.setMasterNodeTimeout(timeout).setTimeout(timeout);
69+
.setMasterNodeTimeout(timeout).setTimeout(TimeValue.ZERO);
7170
}
7271

7372
/**
@@ -84,8 +83,6 @@ public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdat
8483
* been applied to the master node and propagated to data nodes.
8584
*/
8685
public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue timeout) {
87-
if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) {
88-
throw new ElasticsearchTimeoutException("Failed to acknowledge mapping update within [" + timeout + "]");
89-
}
86+
updateMappingRequest(index, type, mappingUpdate, timeout).get();
9087
}
9188
}

server/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,13 @@ protected Settings nodeSettings(int nodeOrdinal) {
6666
* This retry logic is implemented in TransportMasterNodeAction and tested by the following master failover scenario.
6767
*/
6868
@TestLogging("_root:DEBUG")
69-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30844")
7069
public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwable {
7170
logger.info("--> start 4 nodes, 3 master, 1 data");
7271

7372
final Settings sharedSettings = Settings.builder()
7473
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
7574
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
75+
.put(TestZenDiscovery.USE_ZEN2.getKey(), false)
7676
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
7777
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
7878
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)

server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
*/
2121

2222
import com.carrotsearch.hppc.cursors.IntObjectCursor;
23+
import org.elasticsearch.action.DocWriteResponse;
2324
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
2425
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
26+
import org.elasticsearch.action.bulk.BulkResponse;
2527
import org.elasticsearch.action.index.IndexResponse;
2628
import org.elasticsearch.action.support.ActiveShardCount;
2729
import org.elasticsearch.cluster.ClusterState;
@@ -30,6 +32,7 @@
3032
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
3133
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
3234
import org.elasticsearch.cluster.service.ClusterService;
35+
import org.elasticsearch.common.Strings;
3336
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
3437
import org.elasticsearch.common.settings.Settings;
3538
import org.elasticsearch.common.util.set.Sets;
@@ -91,6 +94,34 @@ protected Settings nodeSettings(int nodeOrdinal) {
9194
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
9295
}
9396

97+
public void testBulkWeirdScenario() throws Exception {
98+
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
99+
internalCluster().startDataOnlyNodes(2);
100+
101+
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
102+
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get());
103+
ensureGreen();
104+
105+
BulkResponse bulkResponse = client().prepareBulk()
106+
.add(client().prepareIndex().setIndex("test").setType("_doc").setId("1").setSource("field1", "value1"))
107+
.add(client().prepareUpdate().setIndex("test").setType("_doc").setId("1").setDoc("field2", "value2"))
108+
.execute().actionGet();
109+
110+
assertThat(bulkResponse.hasFailures(), equalTo(false));
111+
assertThat(bulkResponse.getItems().length, equalTo(2));
112+
113+
logger.info(Strings.toString(bulkResponse, true, true));
114+
115+
internalCluster().assertSeqNos();
116+
117+
assertThat(bulkResponse.getItems()[0].getResponse().getId(), equalTo("1"));
118+
assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L));
119+
assertThat(bulkResponse.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
120+
assertThat(bulkResponse.getItems()[1].getResponse().getId(), equalTo("1"));
121+
assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(2L));
122+
assertThat(bulkResponse.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.UPDATED));
123+
}
124+
94125
private void createStaleReplicaScenario(String master) throws Exception {
95126
client().prepareIndex("test", "type1").setSource(jsonBuilder()
96127
.startObject().field("field", "value1").endObject()).get();

server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.ElasticsearchParseException;
2323
import org.elasticsearch.Version;
24+
import org.elasticsearch.action.ActionFuture;
2425
import org.elasticsearch.action.ActionListener;
2526
import org.elasticsearch.action.index.IndexResponse;
2627
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -57,6 +58,7 @@
5758

5859
import static java.util.Collections.emptyMap;
5960
import static java.util.Collections.emptySet;
61+
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
6062
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6163
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
6264
import static org.hamcrest.Matchers.equalTo;
@@ -397,6 +399,24 @@ public void onFailure(Exception e) {
397399

398400
assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").get().isExists()));
399401

402+
// index another document, this time using dynamic mappings.
403+
// The ack timeout of 0 on dynamic mapping updates makes it possible for the document to be indexed on the primary, even
404+
// if the dynamic mapping update is not applied on the replica yet.
405+
ActionFuture<IndexResponse> dynamicMappingsFut = client().prepareIndex("index", "type", "2").setSource("field2", 42).execute();
406+
407+
// ...and wait for second mapping to be available on master
408+
assertBusy(() -> {
409+
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master);
410+
final IndexService indexService = indicesService.indexServiceSafe(index);
411+
assertNotNull(indexService);
412+
final MapperService mapperService = indexService.mapperService();
413+
DocumentMapper mapper = mapperService.documentMapper("type");
414+
assertNotNull(mapper);
415+
assertNotNull(mapper.mappers().getMapper("field2"));
416+
});
417+
418+
assertBusy(() -> assertTrue(client().prepareGet("index", "type", "2").get().isExists()));
419+
400420
// The mappings have not been propagated to the replica yet as a consequence the document count not be indexed
401421
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
402422
// and not just because it takes time to replicate the indexing request to the replica
@@ -415,6 +435,8 @@ public void onFailure(Exception e) {
415435
assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
416436
2, docResp.getShardInfo().getTotal()); // both shards should have succeeded
417437
});
438+
439+
assertThat(dynamicMappingsFut.get().getResult(), equalTo(CREATED));
418440
}
419441

420442
}

0 commit comments

Comments
 (0)