Skip to content

Commit fe43eef

Browse files
committed
Port Primary Terms to master #17044
Primary terms are a way to make sure that operations replicated from a stale primary are rejected by shards following a newly elected primary. The original PRs adding this to the seq# feature branch were #14062 and #14651. Unlike those PRs, here we take a different approach (based on newer code in master) where the primary terms are stored in the meta data only (and not in `ShardRouting` objects). Relates to #17038. Closes #17044.
1 parent 85b06f4 commit fe43eef

24 files changed

+1060
-303
lines changed

core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

+14
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
5151
*/
5252
protected ShardId shardId;
5353

54+
long primaryTerm;
55+
5456
protected TimeValue timeout = DEFAULT_TIMEOUT;
5557
protected String index;
5658

@@ -148,6 +150,16 @@ long routedBasedOnClusterVersion() {
148150
return routedBasedOnClusterVersion;
149151
}
150152

153+
/** returns the primary term active at the time the operation was performed on the primary shard */
154+
public long primaryTerm() {
155+
return primaryTerm;
156+
}
157+
158+
/** marks the primary term in which the operation was performed */
159+
public void primaryTerm(long term) {
160+
primaryTerm = term;
161+
}
162+
151163
@Override
152164
public ActionRequestValidationException validate() {
153165
ActionRequestValidationException validationException = null;
@@ -169,6 +181,7 @@ public void readFrom(StreamInput in) throws IOException {
169181
timeout = TimeValue.readTimeValue(in);
170182
index = in.readString();
171183
routedBasedOnClusterVersion = in.readVLong();
184+
primaryTerm = in.readVLong();
172185
}
173186

174187
@Override
@@ -184,6 +197,7 @@ public void writeTo(StreamOutput out) throws IOException {
184197
timeout.writeTo(out);
185198
out.writeString(index);
186199
out.writeVLong(routedBasedOnClusterVersion);
200+
out.writeVLong(primaryTerm);
187201
}
188202

189203
@Override

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+29-35
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
5353
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
5454
import org.elasticsearch.common.util.concurrent.ThreadContext;
55-
import org.elasticsearch.index.Index;
5655
import org.elasticsearch.index.IndexService;
5756
import org.elasticsearch.index.engine.VersionConflictEngineException;
5857
import org.elasticsearch.index.shard.IndexShard;
@@ -359,32 +358,7 @@ public void onTimeout(TimeValue timeout) {
359358
}
360359
});
361360
} else {
362-
try {
363-
failReplicaIfNeeded(t);
364-
} catch (Throwable unexpected) {
365-
logger.error("{} unexpected error while failing replica", unexpected, request.shardId().id());
366-
} finally {
367361
responseWithFailure(t);
368-
}
369-
}
370-
}
371-
372-
private void failReplicaIfNeeded(Throwable t) {
373-
Index index = request.shardId().getIndex();
374-
int shardId = request.shardId().id();
375-
logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request);
376-
if (ignoreReplicaException(t) == false) {
377-
IndexService indexService = indicesService.indexService(index);
378-
if (indexService == null) {
379-
logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
380-
return;
381-
}
382-
IndexShard indexShard = indexService.getShardOrNull(shardId);
383-
if (indexShard == null) {
384-
logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
385-
return;
386-
}
387-
indexShard.failShard(actionName + " failed on replica", t);
388362
}
389363
}
390364

@@ -401,7 +375,7 @@ protected void responseWithFailure(Throwable t) {
401375
protected void doRun() throws Exception {
402376
setPhase(task, "replica");
403377
assert request.shardId() != null : "request shardId must be set";
404-
try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) {
378+
try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId(), request.primaryTerm())) {
405379
shardOperationOnReplica(request);
406380
if (logger.isTraceEnabled()) {
407381
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request);
@@ -707,7 +681,6 @@ protected void doRun() throws Exception {
707681
indexShardReference = getIndexShardReferenceOnPrimary(shardId);
708682
if (indexShardReference.isRelocated() == false) {
709683
executeLocally();
710-
711684
} else {
712685
executeRemotely();
713686
}
@@ -716,6 +689,7 @@ protected void doRun() throws Exception {
716689
private void executeLocally() throws Exception {
717690
// execute locally
718691
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
692+
primaryResponse.v2().primaryTerm(indexShardReference.opPrimaryTerm());
719693
if (logger.isTraceEnabled()) {
720694
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
721695
}
@@ -825,17 +799,17 @@ void finishBecauseUnavailable(ShardId shardId, String message) {
825799
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
826800
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
827801
IndexShard indexShard = indexService.getShard(shardId.id());
828-
return new IndexShardReferenceImpl(indexShard, true);
802+
return IndexShardReferenceImpl.createOnPrimary(indexShard);
829803
}
830804

831805
/**
832806
* returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as
833807
* replication is completed on the node.
834808
*/
835-
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
809+
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) {
836810
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
837811
IndexShard indexShard = indexService.getShard(shardId.id());
838-
return new IndexShardReferenceImpl(indexShard, false);
812+
return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
839813
}
840814

841815
/**
@@ -1098,9 +1072,13 @@ private void doFinish() {
10981072
totalShards,
10991073
success.get(),
11001074
failuresArray
1101-
11021075
)
11031076
);
1077+
if (logger.isTraceEnabled()) {
1078+
logger.trace("finished replicating action [{}], request [{}], shardInfo [{}]", actionName, replicaRequest,
1079+
finalResponse.getShardInfo());
1080+
}
1081+
11041082
try {
11051083
channel.sendResponse(finalResponse);
11061084
} catch (IOException responseException) {
@@ -1125,22 +1103,33 @@ interface IndexShardReference extends Releasable {
11251103
boolean isRelocated();
11261104
void failShard(String reason, @Nullable Throwable e);
11271105
ShardRouting routingEntry();
1106+
1107+
/** returns the primary term of the current operation */
1108+
long opPrimaryTerm();
11281109
}
11291110

11301111
static final class IndexShardReferenceImpl implements IndexShardReference {
11311112

11321113
private final IndexShard indexShard;
11331114
private final Releasable operationLock;
11341115

1135-
IndexShardReferenceImpl(IndexShard indexShard, boolean primaryAction) {
1116+
private IndexShardReferenceImpl(IndexShard indexShard, long primaryTerm) {
11361117
this.indexShard = indexShard;
1137-
if (primaryAction) {
1118+
if (primaryTerm < 0) {
11381119
operationLock = indexShard.acquirePrimaryOperationLock();
11391120
} else {
1140-
operationLock = indexShard.acquireReplicaOperationLock();
1121+
operationLock = indexShard.acquireReplicaOperationLock(primaryTerm);
11411122
}
11421123
}
11431124

1125+
static IndexShardReferenceImpl createOnPrimary(IndexShard indexShard) {
1126+
return new IndexShardReferenceImpl(indexShard, -1);
1127+
}
1128+
1129+
static IndexShardReferenceImpl createOnReplica(IndexShard indexShard, long primaryTerm) {
1130+
return new IndexShardReferenceImpl(indexShard, primaryTerm);
1131+
}
1132+
11441133
@Override
11451134
public void close() {
11461135
operationLock.close();
@@ -1160,6 +1149,11 @@ public void failShard(String reason, @Nullable Throwable e) {
11601149
public ShardRouting routingEntry() {
11611150
return indexShard.routingEntry();
11621151
}
1152+
1153+
@Override
1154+
public long opPrimaryTerm() {
1155+
return indexShard.getPrimaryTerm();
1156+
}
11631157
}
11641158

11651159
protected final void processAfterWrite(boolean refresh, IndexShard indexShard, Translog.Location location) {

core/src/main/java/org/elasticsearch/cluster/ClusterState.java

+34-17
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363

6464
/**
6565
* Represents the current state of the cluster.
66-
*
66+
* <p>
6767
* The cluster state object is immutable with an
6868
* exception of the {@link RoutingNodes} structure, which is built on demand from the {@link RoutingTable},
6969
* and cluster state {@link #status}, which is updated during cluster state publishing and applying
@@ -74,7 +74,7 @@
7474
* the type of discovery. For example, for local discovery it is implemented by the {@link LocalDiscovery#publish}
7575
* method. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The
7676
* publishing mechanism can be overridden by other discovery.
77-
*
77+
* <p>
7878
* The cluster state implements the {@link Diffable} interface in order to support publishing of cluster state
7979
* differences instead of the entire state on each change. The publishing mechanism should only send differences
8080
* to a node if this node was present in the previous version of the cluster state. If a node is not present was
@@ -135,7 +135,7 @@ public static <T extends Custom> T lookupPrototype(String type) {
135135

136136
public static <T extends Custom> T lookupPrototypeSafe(String type) {
137137
@SuppressWarnings("unchecked")
138-
T proto = (T)customPrototypes.get(type);
138+
T proto = (T) customPrototypes.get(type);
139139
if (proto == null) {
140140
throw new IllegalArgumentException("No custom state prototype registered for type [" + type + "], node likely missing plugins");
141141
}
@@ -281,6 +281,16 @@ public String prettyPrint() {
281281
sb.append("state uuid: ").append(stateUUID).append("\n");
282282
sb.append("from_diff: ").append(wasReadFromDiff).append("\n");
283283
sb.append("meta data version: ").append(metaData.version()).append("\n");
284+
for (IndexMetaData indexMetaData : metaData) {
285+
final String TAB = " ";
286+
sb.append(TAB).append(indexMetaData.getIndex());
287+
sb.append(": v[").append(indexMetaData.getVersion()).append("]\n");
288+
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
289+
sb.append(TAB).append(TAB).append(shard).append(": ");
290+
sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], ");
291+
sb.append("a_ids ").append(indexMetaData.activeAllocationIds(shard)).append("\n");
292+
}
293+
}
284294
sb.append(blocks().prettyPrint());
285295
sb.append(nodes().prettyPrint());
286296
sb.append(routingTable().prettyPrint());
@@ -477,6 +487,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
477487
}
478488
builder.endArray();
479489

490+
builder.startObject(IndexMetaData.KEY_PRIMARY_TERMS);
491+
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
492+
builder.field(Integer.toString(shard), indexMetaData.primaryTerm(shard));
493+
}
494+
builder.endObject();
495+
480496
builder.startObject(IndexMetaData.KEY_ACTIVE_ALLOCATIONS);
481497
for (IntObjectCursor<Set<String>> cursor : indexMetaData.getActiveAllocationIds()) {
482498
builder.startArray(String.valueOf(cursor.key));
@@ -487,6 +503,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
487503
}
488504
builder.endObject();
489505

506+
// index metadata
490507
builder.endObject();
491508
}
492509
builder.endObject();
@@ -683,16 +700,16 @@ public static byte[] toBytes(ClusterState state) throws IOException {
683700
}
684701

685702
/**
686-
* @param data input bytes
687-
* @param localNode used to set the local node in the cluster state.
703+
* @param data input bytes
704+
* @param localNode used to set the local node in the cluster state.
688705
*/
689706
public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException {
690707
return readFrom(StreamInput.wrap(data), localNode);
691708
}
692709

693710
/**
694-
* @param in input stream
695-
* @param localNode used to set the local node in the cluster state. can be null.
711+
* @param in input stream
712+
* @param localNode used to set the local node in the cluster state. can be null.
696713
*/
697714
public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException {
698715
return PROTO.readFrom(in, localNode);
@@ -791,17 +808,17 @@ public ClusterStateDiff(StreamInput in, ClusterState proto) throws IOException {
791808
metaData = proto.metaData.readDiffFrom(in);
792809
blocks = proto.blocks.readDiffFrom(in);
793810
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
794-
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
795-
@Override
796-
public Custom read(StreamInput in, String key) throws IOException {
797-
return lookupPrototypeSafe(key).readFrom(in);
798-
}
811+
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
812+
@Override
813+
public Custom read(StreamInput in, String key) throws IOException {
814+
return lookupPrototypeSafe(key).readFrom(in);
815+
}
799816

800-
@Override
801-
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
802-
return lookupPrototypeSafe(key).readDiffFrom(in);
803-
}
804-
});
817+
@Override
818+
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
819+
return lookupPrototypeSafe(key).readDiffFrom(in);
820+
}
821+
});
805822
}
806823

807824
@Override

0 commit comments

Comments
 (0)