Skip to content

Commit 3d5afcb

Browse files
committed
fix double version increments + some improved docs
1 parent f5499d3 commit 3d5afcb

File tree

5 files changed

+38
-18
lines changed

5 files changed

+38
-18
lines changed

core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,9 @@ public long getVersion() {
285285

286286
/**
287287
* The term of the current selected primary. This is a non-negative number incremented when
288-
* a primary shard is assigned after a full cluster restart or a replica shard is promoted
289-
* to a primary (see {@link ShardRouting#moveToPrimary()})
290-
*/
288+
* a primary shard is assigned after a full cluster restart (see {@link ShardRouting#initialize(java.lang.String, long)}
289+
* or a replica shard is promoted to a primary (see {@link ShardRouting#moveToPrimary()}).
290+
**/
291291
public int primaryTerm(int shardId) {
292292
return this.primaryTerms[shardId];
293293
}
@@ -632,7 +632,6 @@ public Builder index(String index) {
632632

633633
public Builder numberOfShards(int numberOfShards) {
634634
settings = settingsBuilder().put(settings).put(SETTING_NUMBER_OF_SHARDS, numberOfShards).build();
635-
primaryTerms = new int[numberOfShards];
636635
return this;
637636
}
638637

@@ -736,13 +735,21 @@ public Builder version(long version) {
736735
return this;
737736
}
738737

738+
/**
739+
* returns the primary term for the given shard.
740+
* See {@link IndexMetaData#primaryTerm(int)} for more information.
741+
*/
739742
public int primaryTerm(int shardId) {
740743
if (primaryTerms == null) {
741744
initializePrimaryTerms();
742745
}
743746
return this.primaryTerms[shardId];
744747
}
745748

749+
/**
750+
* sets the primary term for the given shard.
751+
* See {@link IndexMetaData#primaryTerm(int)} for more information.
752+
*/
746753
public Builder primaryTerm(int shardId, int primaryTerm) {
747754
if (primaryTerms == null) {
748755
initializePrimaryTerms();
@@ -795,7 +802,6 @@ static final class Fields {
795802
static final XContentBuilderString PRIMARY_TERMS = new XContentBuilderString("primary_terms");
796803
}
797804

798-
799805
public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
800806
builder.startObject(indexMetaData.getIndex(), XContentBuilder.FieldCaseConversion.NONE);
801807

core/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java

+3
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ public String getIndex() {
100100

101101
/**
102102
* creates a new {@link IndexRoutingTable} with all shard versions & primary terms set to the highest found.
103+
* This allows incrementing {@link ShardRouting#version()} and {@link ShardRouting#primaryTerm()} where we work on
104+
* the individual shards without worrying about synchronization between {@link ShardRouting} instances. This method
105+
* takes care of it.
103106
*
104107
* @return new {@link IndexRoutingTable}
105108
*/

core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,10 @@ public boolean primary() {
256256
}
257257

258258
/**
259-
* Returns the term of the current primary shard for this shard. The term is incremented with every primary promotion/initial assignment
259+
* Returns the term of the current primary shard for this shard.
260+
* The term is incremented with every primary promotion/initial assignment.
261+
*
262+
* See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)} for more info.
260263
*/
261264
public int primaryTerm() {
262265
return this.primaryTerm;

core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ protected RoutingAllocation.Result buildChangedResult(MetaData metaData, Routing
9090

9191
}
9292
protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) {
93-
final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes)
94-
.version(routingNodes.getRoutingTable().version() + 1).build();
93+
final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build();
9594
MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable);
9695
return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations);
9796
}
@@ -136,7 +135,7 @@ static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, Routing
136135
}
137136
}
138137
if (metaDataBuilder != null) {
139-
return metaDataBuilder.version(currentMetaData.version() + 1).build();
138+
return metaDataBuilder.build();
140139
} else {
141140
return currentMetaData;
142141
}

core/src/test/java/org/elasticsearch/cluster/routing/RoutingTableTests.java

+18-9
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.common.settings.Settings;
3232
import org.elasticsearch.index.IndexNotFoundException;
3333
import org.elasticsearch.test.ESAllocationTestCase;
34-
import org.junit.Before;
3534
import org.junit.Test;
3635

3736
import java.util.*;
@@ -97,9 +96,8 @@ private void initPrimaries() {
9796
}
9897
this.clusterState = ClusterState.builder(clusterState).nodes(discoBuilder).build();
9998
RoutingAllocation.Result rerouteResult = ALLOCATION_SERVICE.reroute(clusterState);
100-
this.testRoutingTable = rerouteResult.routingTable();
10199
assertThat(rerouteResult.changed(), is(true));
102-
this.clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
100+
applyRerouteResult(rerouteResult);
103101
versionsPerIndex.keySet().forEach(this::incrementVersion);
104102
primaryTermsPerIndex.keySet().forEach(this::incrementPrimaryTerm);
105103
}
@@ -130,11 +128,25 @@ private void startInitializingShards(String index) {
130128
this.clusterState = ClusterState.builder(clusterState).routingTable(this.testRoutingTable).build();
131129
logger.info("start primary shards for index " + index);
132130
RoutingAllocation.Result rerouteResult = ALLOCATION_SERVICE.applyStartedShards(this.clusterState, this.clusterState.getRoutingNodes().shardsWithState(index, INITIALIZING));
133-
this.clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
134-
this.testRoutingTable = rerouteResult.routingTable();
131+
// TODO: this simulate the code in InternalClusterState.UpdateTask.run() we should unify this.
132+
applyRerouteResult(rerouteResult);
135133
incrementVersion(index);
136134
}
137135

136+
private void applyRerouteResult(RoutingAllocation.Result rerouteResult) {
137+
ClusterState previousClusterState = this.clusterState;
138+
ClusterState newClusterState = ClusterState.builder(previousClusterState).routingResult(rerouteResult).build();
139+
ClusterState.Builder builder = ClusterState.builder(newClusterState).incrementVersion();
140+
if (previousClusterState.routingTable() != newClusterState.routingTable()) {
141+
builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1).build());
142+
}
143+
if (previousClusterState.metaData() != newClusterState.metaData()) {
144+
builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
145+
}
146+
this.clusterState = builder.build();
147+
this.testRoutingTable = rerouteResult.routingTable();
148+
}
149+
138150
private void failSomePrimaries(String index) {
139151
this.clusterState = ClusterState.builder(clusterState).routingTable(this.testRoutingTable).build();
140152
final IndexRoutingTable indexShardRoutingTable = testRoutingTable.index(index);
@@ -151,10 +163,7 @@ private void failSomePrimaries(String index) {
151163
incrementVersion(index, shard); // and another time when the primary flag is set to false
152164
}
153165
RoutingAllocation.Result rerouteResult = ALLOCATION_SERVICE.applyFailedShards(this.clusterState, failedShards);
154-
assertThat(rerouteResult.routingTable().version(), greaterThan(clusterState.routingTable().version()));
155-
assertThat(rerouteResult.metaData().version(), greaterThan(clusterState.metaData().version()));
156-
this.clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
157-
this.testRoutingTable = rerouteResult.routingTable();
166+
applyRerouteResult(rerouteResult);
158167
}
159168

160169
private IndexMetaData.Builder createIndexMetaData(String indexName) {

0 commit comments

Comments
 (0)