Skip to content

Cleanup TransportReplicationAction #12395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ public NoShardAvailableActionException(ShardId shardId) {
this(shardId, null);
}

public NoShardAvailableActionException(ShardId shardId, String msg) {
this(shardId, msg, null);
public NoShardAvailableActionException(ShardId shardId, String msg, Object... args) {
this(shardId, msg, null, args);
}

public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause) {
super(msg, cause);
public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause, Object... args) {
super(msg, cause, args);
setShard(shardId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: [{}]", update);
}
operation.execute(indexShard);
location = locationToSync(location, operation.getTranslogLocation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) {
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: [{}]", update);
}
operation.execute(indexShard);
processAfter(request, indexShard, operation.getTranslogLocation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
Expand All @@ -41,12 +41,11 @@ public abstract class ReplicationRequest<T extends ReplicationRequest> extends A

public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);

ShardId internalShardId;
ShardRouting internalShardRouting;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this change? see other comment on transport replication action

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the AsyncPrimaryAction is using this now. We can remove this but then we need to resolve the primary twice. I don't have a real preference, since it is all local now.


protected TimeValue timeout = DEFAULT_TIMEOUT;
protected String index;

private boolean threadedOperation = true;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

w00t :)

private volatile boolean canHaveDuplicates = false;

Expand Down Expand Up @@ -76,7 +75,6 @@ protected ReplicationRequest(T request, ActionRequest originalRequest) {
super(originalRequest);
this.timeout = request.timeout();
this.index = request.index();
this.threadedOperation = request.operationThreaded();
this.consistencyLevel = request.consistencyLevel();
}

Expand All @@ -91,23 +89,6 @@ public boolean canHaveDuplicates() {
return canHaveDuplicates;
}

/**
* Controls if the operation will be executed on a separate thread when executed locally.
*/
public final boolean operationThreaded() {
return threadedOperation;
}

/**
* Controls if the operation will be executed on a separate thread when executed locally. Defaults
* to <tt>true</tt> when running in embedded mode.
*/
@SuppressWarnings("unchecked")
public final T operationThreaded(boolean threadedOperation) {
this.threadedOperation = threadedOperation;
return (T) this;
}

/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
Expand Down Expand Up @@ -174,19 +155,18 @@ public ActionRequestValidationException validate() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
internalShardId = ShardId.readShardId(in);
internalShardRouting = ShardRouting.readShardRoutingEntry(in);
}
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
index = in.readString();
canHaveDuplicates = in.readBoolean();
// no need to serialize threaded* parameters, since they only matter locally
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStreamable(internalShardId);
out.writeOptionalStreamable(internalShardRouting);
out.writeByte(consistencyLevel.id());
timeout.writeTo(out);
out.writeString(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ protected ReplicationRequestBuilder(ElasticsearchClient client, Action<Request,
super(client, action, request);
}

/**
* Controls if the operation will be executed on a separate thread when executed locally. Defaults
* to <tt>true</tt> when running in embedded mode.
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setOperationThreaded(boolean threadedOperation) {
request.operationThreaded(threadedOperation);
return (RequestBuilder) this;
}

/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
Expand Down
Loading