Skip to content

Commit f2eaacb

Browse files
committed
Merge branch 'master' into ccr
* master: Fix resync request serialization Fix issue where pages aren't released (#27459) Add YAML REST tests for filters bucket agg (#27128) Remove tcp profile from low level nio channel (#27441)
2 parents 58591f2 + 28660be commit f2eaacb

File tree

25 files changed

+515
-118
lines changed

25 files changed

+515
-118
lines changed

core/src/main/java/org/elasticsearch/action/resync/ResyncReplicationRequest.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,42 +18,68 @@
1818
*/
1919
package org.elasticsearch.action.resync;
2020

21+
import org.elasticsearch.Version;
2122
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
2223
import org.elasticsearch.common.io.stream.StreamInput;
2324
import org.elasticsearch.common.io.stream.StreamOutput;
2425
import org.elasticsearch.index.shard.ShardId;
2526
import org.elasticsearch.index.translog.Translog;
2627

2728
import java.io.IOException;
28-
import java.util.List;
29+
import java.util.Arrays;
2930

31+
/**
32+
* Represents a batch of operations sent from the primary to its replicas during the primary-replica resync.
33+
*/
3034
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {
3135

32-
private List<Translog.Operation> operations;
36+
private Translog.Operation[] operations;
3337

3438
ResyncReplicationRequest() {
3539
super();
3640
}
3741

38-
public ResyncReplicationRequest(ShardId shardId, List<Translog.Operation> operations) {
42+
public ResyncReplicationRequest(final ShardId shardId, final Translog.Operation[] operations) {
3943
super(shardId);
4044
this.operations = operations;
4145
}
4246

43-
public List<Translog.Operation> getOperations() {
47+
public Translog.Operation[] getOperations() {
4448
return operations;
4549
}
4650

4751
@Override
48-
public void readFrom(StreamInput in) throws IOException {
52+
public void readFrom(final StreamInput in) throws IOException {
53+
if (in.getVersion().equals(Version.V_6_0_0)) {
54+
/*
55+
* Resync replication request serialization was broken in 6.0.0 due to the elements of the stream not being prefixed with a
56+
* byte indicating the type of the operation.
57+
*/
58+
// TODO: remove this check in 8.0.0 which provides no BWC guarantees with 6.x.
59+
assert Version.CURRENT.major <= 7;
60+
throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
61+
}
4962
super.readFrom(in);
50-
operations = in.readList(Translog.Operation::readType);
63+
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
5164
}
5265

5366
@Override
54-
public void writeTo(StreamOutput out) throws IOException {
67+
public void writeTo(final StreamOutput out) throws IOException {
5568
super.writeTo(out);
56-
out.writeList(operations);
69+
out.writeArray(Translog.Operation::writeOperation, operations);
70+
}
71+
72+
@Override
73+
public boolean equals(final Object o) {
74+
if (this == o) return true;
75+
if (o == null || getClass() != o.getClass()) return false;
76+
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
77+
return Arrays.equals(operations, that.operations);
78+
}
79+
80+
@Override
81+
public int hashCode() {
82+
return Arrays.hashCode(operations);
5783
}
5884

5985
@Override
@@ -62,7 +88,8 @@ public String toString() {
6288
"shardId=" + shardId +
6389
", timeout=" + timeout +
6490
", index='" + index + '\'' +
65-
", ops=" + operations.size() +
91+
", ops=" + operations.length +
6692
"}";
6793
}
94+
6895
}

core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -688,9 +688,21 @@ public byte[] readByteArray() throws IOException {
688688
return bytes;
689689
}
690690

691-
public <T> T[] readArray(Writeable.Reader<T> reader, IntFunction<T[]> arraySupplier) throws IOException {
692-
int length = readArraySize();
693-
T[] values = arraySupplier.apply(length);
691+
/**
692+
* Reads an array from the stream using the specified {@link org.elasticsearch.common.io.stream.Writeable.Reader} to read array elements
693+
* from the stream. This method can be seen as the reader version of {@link StreamOutput#writeArray(Writeable.Writer, Object[])}. It is
694+
* assumed that the stream first contains a variable-length integer representing the size of the array, and then contains that many
695+
* elements that can be read from the stream.
696+
*
697+
* @param reader the reader used to read individual elements
698+
* @param arraySupplier a supplier used to construct a new array
699+
* @param <T> the type of the elements of the array
700+
* @return an array read from the stream
701+
* @throws IOException if an I/O exception occurs while reading the array
702+
*/
703+
public <T> T[] readArray(final Writeable.Reader<T> reader, final IntFunction<T[]> arraySupplier) throws IOException {
704+
final int length = readArraySize();
705+
final T[] values = arraySupplier.apply(length);
694706
for (int i = 0; i < length; i++) {
695707
values[i] = reader.read(this);
696708
}

core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.LinkedHashMap;
5959
import java.util.List;
6060
import java.util.Map;
61+
import java.util.function.IntFunction;
6162

6263
/**
6364
* A stream from another node to this node. Technically, it can also be streamed from a byte array but that is mostly for testing.
@@ -706,6 +707,23 @@ public void writeDoubleArray(double[] values) throws IOException {
706707
}
707708
}
708709

710+
/**
711+
* Writes the specified array to the stream using the specified {@link Writer} for each element in the array. This method can be seen as
712+
* writer version of {@link StreamInput#readArray(Writeable.Reader, IntFunction)}. The length of array encoded as a variable-length
713+
* integer is first written to the stream, and then the elements of the array are written to the stream.
714+
*
715+
* @param writer the writer used to write individual elements
716+
* @param array the array
717+
* @param <T> the type of the elements of the array
718+
* @throws IOException if an I/O exception occurs while writing the array
719+
*/
720+
public <T> void writeArray(final Writer<T> writer, final T[] array) throws IOException {
721+
writeVInt(array.length);
722+
for (T value : array) {
723+
writer.write(this, value);
724+
}
725+
}
726+
709727
public <T extends Writeable> void writeArray(T[] array) throws IOException {
710728
writeVInt(array.length);
711729
for (T value: array) {

core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ public void onFailure(Exception e) {
218218
}
219219
}
220220

221+
private static Translog.Operation[] EMPTY_ARRAY = new Translog.Operation[0];
222+
221223
@Override
222224
protected void doRun() throws Exception {
223225
long size = 0;
@@ -247,7 +249,7 @@ protected void doRun() throws Exception {
247249

248250
if (!operations.isEmpty()) {
249251
task.setPhase("sending_ops");
250-
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations);
252+
ResyncReplicationRequest request = new ResyncReplicationRequest(shardId, operations.toArray(EMPTY_ARRAY));
251253
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
252254
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
253255
syncAction.sync(request, task, primaryAllocationId, primaryTerm, this);

core/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
3434
import org.elasticsearch.common.io.stream.StreamInput;
3535
import org.elasticsearch.common.io.stream.StreamOutput;
36-
import org.elasticsearch.common.io.stream.Writeable;
3736
import org.elasticsearch.common.lease.Releasable;
3837
import org.elasticsearch.common.lease.Releasables;
3938
import org.elasticsearch.common.lucene.uid.Versions;
@@ -861,7 +860,7 @@ public interface Snapshot extends Closeable {
861860
* A generic interface representing an operation performed on the transaction log.
862861
* Each is associated with a type.
863862
*/
864-
public interface Operation extends Writeable {
863+
public interface Operation {
865864
enum Type {
866865
@Deprecated
867866
CREATE((byte) 1),
@@ -890,7 +889,7 @@ public static Type fromId(byte id) {
890889
case 4:
891890
return NO_OP;
892891
default:
893-
throw new IllegalArgumentException("No type mapped for [" + id + "]");
892+
throw new IllegalArgumentException("no type mapped for [" + id + "]");
894893
}
895894
}
896895
}
@@ -907,31 +906,44 @@ public static Type fromId(byte id) {
907906

908907
/**
909908
* Reads the type and the operation from the given stream. The operation must be written with
910-
* {@link Operation#writeType(Operation, StreamOutput)}
909+
* {@link Operation#writeOperation(StreamOutput, Operation)}
911910
*/
912-
static Operation readType(StreamInput input) throws IOException {
913-
Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
911+
static Operation readOperation(final StreamInput input) throws IOException {
912+
final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
914913
switch (type) {
915914
case CREATE:
916-
// the deserialization logic in Index was identical to that of Create when create was deprecated
915+
// the de-serialization logic in Index was identical to that of Create when create was deprecated
916+
case INDEX:
917917
return new Index(input);
918918
case DELETE:
919919
return new Delete(input);
920-
case INDEX:
921-
return new Index(input);
922920
case NO_OP:
923921
return new NoOp(input);
924922
default:
925-
throw new IOException("No type for [" + type + "]");
923+
throw new AssertionError("no case for [" + type + "]");
926924
}
927925
}
928926

929927
/**
930928
* Writes the type and translog operation to the given stream
931929
*/
932-
static void writeType(Translog.Operation operation, StreamOutput output) throws IOException {
930+
static void writeOperation(final StreamOutput output, final Operation operation) throws IOException {
933931
output.writeByte(operation.opType().id());
934-
operation.writeTo(output);
932+
switch(operation.opType()) {
933+
case CREATE:
934+
// the serialization logic in Index was identical to that of Create when create was deprecated
935+
case INDEX:
936+
((Index) operation).write(output);
937+
break;
938+
case DELETE:
939+
((Delete) operation).write(output);
940+
break;
941+
case NO_OP:
942+
((NoOp) operation).write(output);
943+
break;
944+
default:
945+
throw new AssertionError("no case for [" + operation.opType() + "]");
946+
}
935947
}
936948

937949
}
@@ -968,7 +980,7 @@ public static class Index implements Operation {
968980
private final String routing;
969981
private final String parent;
970982

971-
public Index(StreamInput in) throws IOException {
983+
private Index(final StreamInput in) throws IOException {
972984
final int format = in.readVInt(); // SERIALIZATION_FORMAT
973985
assert format >= FORMAT_2_X : "format was: " + format;
974986
id = in.readString();
@@ -1081,8 +1093,7 @@ public Source getSource() {
10811093
return new Source(source, routing, parent);
10821094
}
10831095

1084-
@Override
1085-
public void writeTo(StreamOutput out) throws IOException {
1096+
private void write(final StreamOutput out) throws IOException {
10861097
out.writeVInt(SERIALIZATION_FORMAT);
10871098
out.writeString(id);
10881099
out.writeString(type);
@@ -1170,7 +1181,7 @@ public static class Delete implements Operation {
11701181
private final long version;
11711182
private final VersionType versionType;
11721183

1173-
public Delete(StreamInput in) throws IOException {
1184+
private Delete(final StreamInput in) throws IOException {
11741185
final int format = in.readVInt();// SERIALIZATION_FORMAT
11751186
assert format >= FORMAT_5_0 : "format was: " + format;
11761187
if (format >= FORMAT_SINGLE_TYPE) {
@@ -1265,8 +1276,7 @@ public Source getSource() {
12651276
throw new IllegalStateException("trying to read doc source from delete operation");
12661277
}
12671278

1268-
@Override
1269-
public void writeTo(StreamOutput out) throws IOException {
1279+
private void write(final StreamOutput out) throws IOException {
12701280
out.writeVInt(SERIALIZATION_FORMAT);
12711281
out.writeString(type);
12721282
out.writeString(id);
@@ -1336,7 +1346,7 @@ public String reason() {
13361346
return reason;
13371347
}
13381348

1339-
NoOp(final StreamInput in) throws IOException {
1349+
private NoOp(final StreamInput in) throws IOException {
13401350
seqNo = in.readLong();
13411351
primaryTerm = in.readLong();
13421352
reason = in.readString();
@@ -1351,8 +1361,7 @@ public NoOp(final long seqNo, final long primaryTerm, final String reason) {
13511361
this.reason = reason;
13521362
}
13531363

1354-
@Override
1355-
public void writeTo(StreamOutput out) throws IOException {
1364+
private void write(final StreamOutput out) throws IOException {
13561365
out.writeLong(seqNo);
13571366
out.writeLong(primaryTerm);
13581367
out.writeString(reason);
@@ -1454,7 +1463,7 @@ static Translog.Operation readOperation(BufferedChecksumStreamInput in) throws I
14541463
verifyChecksum(in);
14551464
in.reset();
14561465
}
1457-
operation = Translog.Operation.readType(in);
1466+
operation = Translog.Operation.readOperation(in);
14581467
verifyChecksum(in);
14591468
} catch (TranslogCorruptedException e) {
14601469
throw e;
@@ -1497,7 +1506,7 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
14971506
// because closing it closes the underlying stream, which we don't
14981507
// want to do here.
14991508
out.resetDigest();
1500-
Translog.Operation.writeType(op, out);
1509+
Translog.Operation.writeOperation(out, op);
15011510
long checksum = out.getChecksum();
15021511
out.writeInt((int) checksum);
15031512
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.resync;
21+
22+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
import org.elasticsearch.common.lucene.uid.Versions;
25+
import org.elasticsearch.index.Index;
26+
import org.elasticsearch.index.VersionType;
27+
import org.elasticsearch.index.shard.ShardId;
28+
import org.elasticsearch.index.translog.Translog;
29+
import org.elasticsearch.test.ESTestCase;
30+
31+
import java.io.IOException;
32+
import java.nio.charset.Charset;
33+
34+
import static org.hamcrest.Matchers.equalTo;
35+
36+
public class ResyncReplicationRequestTests extends ESTestCase {
37+
38+
public void testSerialization() throws IOException {
39+
final byte[] bytes = "{}".getBytes(Charset.forName("UTF-8"));
40+
final Translog.Index index = new Translog.Index("type", "id", 0, Versions.MATCH_ANY, VersionType.INTERNAL, bytes, null, null, -1);
41+
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
42+
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, new Translog.Operation[]{index});
43+
44+
final BytesStreamOutput out = new BytesStreamOutput();
45+
before.writeTo(out);
46+
47+
final StreamInput in = out.bytes().streamInput();
48+
final ResyncReplicationRequest after = new ResyncReplicationRequest();
49+
after.readFrom(in);
50+
51+
assertThat(after, equalTo(before));
52+
}
53+
54+
}

core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
4848
AtomicBoolean syncActionCalled = new AtomicBoolean();
4949
PrimaryReplicaSyncer.SyncAction syncAction =
5050
(request, parentTask, allocationId, primaryTerm, listener) -> {
51-
logger.info("Sending off {} operations", request.getOperations().size());
51+
logger.info("Sending off {} operations", request.getOperations().length);
5252
syncActionCalled.set(true);
5353
assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
5454
listener.onResponse(new ResyncReplicationResponse());
@@ -98,7 +98,7 @@ public void testSyncerOnClosingShard() throws Exception {
9898
CountDownLatch syncCalledLatch = new CountDownLatch(1);
9999
PrimaryReplicaSyncer.SyncAction syncAction =
100100
(request, parentTask, allocationId, primaryTerm, listener) -> {
101-
logger.info("Sending off {} operations", request.getOperations().size());
101+
logger.info("Sending off {} operations", request.getOperations().length);
102102
syncActionCalled.set(true);
103103
syncCalledLatch.countDown();
104104
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));

0 commit comments

Comments
 (0)