Skip to content

Port Primary Terms to master #17044

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
*/
protected ShardId shardId;

long primaryTerm;

protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;

Expand Down Expand Up @@ -148,6 +150,16 @@ long routedBasedOnClusterVersion() {
return routedBasedOnClusterVersion;
}

/** returns the primary term active at the time the operation was performed on the primary shard */
public long primaryTerm() {
return primaryTerm;
}

/** marks the primary term in which the operation was performed */
public void primaryTerm(long term) {
primaryTerm = term;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this can be reused to replace routedBasedOnClusterVersion (#16274). Yay 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting idea - it's currently not incremented when relocating a primary though... requires more thought.


@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -169,6 +181,7 @@ public void readFrom(StreamInput in) throws IOException {
timeout = TimeValue.readTimeValue(in);
index = in.readString();
routedBasedOnClusterVersion = in.readVLong();
primaryTerm = in.readVLong();
}

@Override
Expand All @@ -184,6 +197,7 @@ public void writeTo(StreamOutput out) throws IOException {
timeout.writeTo(out);
out.writeString(index);
out.writeVLong(routedBasedOnClusterVersion);
out.writeVLong(primaryTerm);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -359,32 +358,7 @@ public void onTimeout(TimeValue timeout) {
}
});
} else {
try {
failReplicaIfNeeded(t);
} catch (Throwable unexpected) {
logger.error("{} unexpected error while failing replica", unexpected, request.shardId().id());
} finally {
responseWithFailure(t);
}
}
}

private void failReplicaIfNeeded(Throwable t) {
Index index = request.shardId().getIndex();
int shardId = request.shardId().id();
logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request);
if (ignoreReplicaException(t) == false) {
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
return;
}
IndexShard indexShard = indexService.getShardOrNull(shardId);
if (indexShard == null) {
logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
return;
}
indexShard.failShard(actionName + " failed on replica", t);
}
}

Expand All @@ -401,7 +375,7 @@ protected void responseWithFailure(Throwable t) {
protected void doRun() throws Exception {
setPhase(task, "replica");
assert request.shardId() != null : "request shardId must be set";
try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) {
try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId(), request.primaryTerm())) {
shardOperationOnReplica(request);
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request);
Expand Down Expand Up @@ -707,7 +681,6 @@ protected void doRun() throws Exception {
indexShardReference = getIndexShardReferenceOnPrimary(shardId);
if (indexShardReference.isRelocated() == false) {
executeLocally();

} else {
executeRemotely();
}
Expand All @@ -716,6 +689,7 @@ protected void doRun() throws Exception {
private void executeLocally() throws Exception {
// execute locally
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
primaryResponse.v2().primaryTerm(indexShardReference.opPrimaryTerm());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the primary term should instead be returned by the call to shardOperationOnPrimary above. This would give it the semantics of "I indexed this document when shard had the primary term XYZ". Now there is no clear connection between the two.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My goal here is not to force a subclass to worry about these things. As you noted later on - the primary term of a specific primary shard can not change.

if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
}
Expand Down Expand Up @@ -825,17 +799,17 @@ void finishBecauseUnavailable(ShardId shardId, String message) {
protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
return new IndexShardReferenceImpl(indexShard, true);
return IndexShardReferenceImpl.createOnPrimary(indexShard);
}

/**
* returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as
* replication is completed on the node.
*/
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
return new IndexShardReferenceImpl(indexShard, false);
return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
}

/**
Expand Down Expand Up @@ -1098,9 +1072,13 @@ private void doFinish() {
totalShards,
success.get(),
failuresArray

)
);
if (logger.isTraceEnabled()) {
logger.trace("finished replicating action [{}], request [{}], shardInfo [{}]", actionName, replicaRequest,
finalResponse.getShardInfo());
}

try {
channel.sendResponse(finalResponse);
} catch (IOException responseException) {
Expand All @@ -1125,22 +1103,33 @@ interface IndexShardReference extends Releasable {
boolean isRelocated();
void failShard(String reason, @Nullable Throwable e);
ShardRouting routingEntry();

/** returns the primary term of the current operation */
long opPrimaryTerm();
}

static final class IndexShardReferenceImpl implements IndexShardReference {

private final IndexShard indexShard;
private final Releasable operationLock;

IndexShardReferenceImpl(IndexShard indexShard, boolean primaryAction) {
private IndexShardReferenceImpl(IndexShard indexShard, long primaryTerm) {
this.indexShard = indexShard;
if (primaryAction) {
if (primaryTerm < 0) {
operationLock = indexShard.acquirePrimaryOperationLock();
} else {
operationLock = indexShard.acquireReplicaOperationLock();
operationLock = indexShard.acquireReplicaOperationLock(primaryTerm);
}
}

static IndexShardReferenceImpl createOnPrimary(IndexShard indexShard) {
return new IndexShardReferenceImpl(indexShard, -1);
}

static IndexShardReferenceImpl createOnReplica(IndexShard indexShard, long primaryTerm) {
return new IndexShardReferenceImpl(indexShard, primaryTerm);
}

@Override
public void close() {
operationLock.close();
Expand All @@ -1160,6 +1149,11 @@ public void failShard(String reason, @Nullable Throwable e) {
public ShardRouting routingEntry() {
return indexShard.routingEntry();
}

@Override
public long opPrimaryTerm() {
return indexShard.getPrimaryTerm();
}
}

protected final void processAfterWrite(boolean refresh, IndexShard indexShard, Translog.Location location) {
Expand Down
51 changes: 34 additions & 17 deletions core/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@

/**
* Represents the current state of the cluster.
*
* <p>
* The cluster state object is immutable with an
* exception of the {@link RoutingNodes} structure, which is built on demand from the {@link RoutingTable},
* and cluster state {@link #status}, which is updated during cluster state publishing and applying
Expand All @@ -74,7 +74,7 @@
* the type of discovery. For example, for local discovery it is implemented by the {@link LocalDiscovery#publish}
* method. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The
* publishing mechanism can be overridden by other discovery.
*
* <p>
* The cluster state implements the {@link Diffable} interface in order to support publishing of cluster state
* differences instead of the entire state on each change. The publishing mechanism should only send differences
* to a node if this node was present in the previous version of the cluster state. If a node is not present was
Expand Down Expand Up @@ -135,7 +135,7 @@ public static <T extends Custom> T lookupPrototype(String type) {

public static <T extends Custom> T lookupPrototypeSafe(String type) {
@SuppressWarnings("unchecked")
T proto = (T)customPrototypes.get(type);
T proto = (T) customPrototypes.get(type);
if (proto == null) {
throw new IllegalArgumentException("No custom state prototype registered for type [" + type + "], node likely missing plugins");
}
Expand Down Expand Up @@ -281,6 +281,16 @@ public String prettyPrint() {
sb.append("state uuid: ").append(stateUUID).append("\n");
sb.append("from_diff: ").append(wasReadFromDiff).append("\n");
sb.append("meta data version: ").append(metaData.version()).append("\n");
for (IndexMetaData indexMetaData : metaData) {
final String TAB = " ";
sb.append(TAB).append(indexMetaData.getIndex());
sb.append(": v[").append(indexMetaData.getVersion()).append("]\n");
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
sb.append(TAB).append(TAB).append(shard).append(": ");
sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], ");
sb.append("a_ids ").append(indexMetaData.activeAllocationIds(shard)).append("\n");
}
}
sb.append(blocks().prettyPrint());
sb.append(nodes().prettyPrint());
sb.append(routingTable().prettyPrint());
Expand Down Expand Up @@ -477,6 +487,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endArray();

builder.startObject(IndexMetaData.KEY_PRIMARY_TERMS);
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
builder.field(Integer.toString(shard), indexMetaData.primaryTerm(shard));
}
builder.endObject();

builder.startObject(IndexMetaData.KEY_ACTIVE_ALLOCATIONS);
for (IntObjectCursor<Set<String>> cursor : indexMetaData.getActiveAllocationIds()) {
builder.startArray(String.valueOf(cursor.key));
Expand All @@ -487,6 +503,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();

// index metadata
builder.endObject();
}
builder.endObject();
Expand Down Expand Up @@ -683,16 +700,16 @@ public static byte[] toBytes(ClusterState state) throws IOException {
}

/**
* @param data input bytes
* @param localNode used to set the local node in the cluster state.
* @param data input bytes
* @param localNode used to set the local node in the cluster state.
*/
public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException {
return readFrom(StreamInput.wrap(data), localNode);
}

/**
* @param in input stream
* @param localNode used to set the local node in the cluster state. can be null.
* @param in input stream
* @param localNode used to set the local node in the cluster state. can be null.
*/
public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException {
return PROTO.readFrom(in, localNode);
Expand Down Expand Up @@ -791,17 +808,17 @@ public ClusterStateDiff(StreamInput in, ClusterState proto) throws IOException {
metaData = proto.metaData.readDiffFrom(in);
blocks = proto.blocks.readDiffFrom(in);
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
@Override
public Custom read(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readFrom(in);
}
new DiffableUtils.DiffableValueSerializer<String, Custom>() {
@Override
public Custom read(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readFrom(in);
}

@Override
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readDiffFrom(in);
}
});
@Override
public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
return lookupPrototypeSafe(key).readDiffFrom(in);
}
});
}

@Override
Expand Down
Loading