|
133 | 133 | import org.elasticsearch.index.store.FsDirectoryFactory;
|
134 | 134 | import org.elasticsearch.index.store.Store;
|
135 | 135 | import org.elasticsearch.index.translog.SnapshotMatchers;
|
| 136 | +import org.elasticsearch.index.translog.TestTranslog; |
136 | 137 | import org.elasticsearch.index.translog.Translog;
|
137 | 138 | import org.elasticsearch.index.translog.TranslogConfig;
|
138 | 139 | import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
|
183 | 184 | import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
|
184 | 185 | import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
|
185 | 186 | import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
|
| 187 | +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; |
186 | 188 | import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
|
187 | 189 | import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
|
188 | 190 | import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
|
@@ -5931,4 +5933,34 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
|
5931 | 5933 | assertNotNull(engine.failedEngine.get());
|
5932 | 5934 | }
|
5933 | 5935 | }
|
| 5936 | + |
| 5937 | + /** |
| 5938 | + * We can trim translog on the primary promotion and peer recovery based on the fact we add operations with either |
| 5939 | + * REPLICA or PEER_RECOVERY origin to translog although they already exist in the engine (i.e. hasProcessed() == true). |
| 5940 | + * If we decide not to add those already-processed operations to translog, we need to study carefully the consequence |
| 5941 | + * of the translog trimming in these two places. |
| 5942 | + */ |
| 5943 | + public void testAlwaysRecordReplicaOrPeerRecoveryOperationsToTranslog() throws Exception { |
| 5944 | + List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean()); |
| 5945 | + applyOperations(engine, operations); |
| 5946 | + Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet()); |
| 5947 | + try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { |
| 5948 | + assertThat(snapshot.totalOperations(), equalTo(operations.size())); |
| 5949 | + assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), |
| 5950 | + equalTo(seqNos)); |
| 5951 | + } |
| 5952 | + primaryTerm.set(randomLongBetween(primaryTerm.get(), Long.MAX_VALUE)); |
| 5953 | + engine.rollTranslogGeneration(); |
| 5954 | + engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); |
| 5955 | + try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { |
| 5956 | + assertThat(snapshot.totalOperations(), equalTo(operations.size())); |
| 5957 | + assertNull(snapshot.next()); |
| 5958 | + } |
| 5959 | + applyOperations(engine, operations); |
| 5960 | + try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { |
| 5961 | + assertThat(snapshot.totalOperations(), equalTo(operations.size() * 2)); |
| 5962 | + assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), |
| 5963 | + equalTo(seqNos)); |
| 5964 | + } |
| 5965 | + } |
5934 | 5966 | }
|
0 commit comments