Skip to content

Commit d223845

Browse files
committed
Ensure the translog snapshot is released in primary-replica resync (#32045)
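Previously, we created a translog snapshot inside the resync method, and that snapshot was closed by the resync listener. However, if the resync method threw an exception before the resync listener was initialized, the translog snapshot was never released. With this change, the snapshot is also closed when the resync method itself fails, before the failure is reported to the caller's listener. Closes #32030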
1 parent ef398b1 commit d223845

File tree

3 files changed

+75
-42
lines changed

3 files changed

+75
-42
lines changed

server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

+37-34
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.common.unit.ByteSizeValue;
3636
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3737
import org.elasticsearch.common.xcontent.XContentBuilder;
38+
import org.elasticsearch.core.internal.io.IOUtils;
3839
import org.elasticsearch.index.seqno.SequenceNumbers;
3940
import org.elasticsearch.index.translog.Translog;
4041
import org.elasticsearch.tasks.Task;
@@ -80,48 +81,25 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
8081
}
8182

8283
public void resync(final IndexShard indexShard, final ActionListener<ResyncTask> listener) {
83-
ActionListener<ResyncTask> resyncListener = null;
84+
Translog.Snapshot snapshot = null;
8485
try {
8586
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
86-
Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
8787
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
88-
resyncListener = new ActionListener<ResyncTask>() {
89-
@Override
90-
public void onResponse(final ResyncTask resyncTask) {
91-
try {
92-
snapshot.close();
93-
listener.onResponse(resyncTask);
94-
} catch (final Exception e) {
95-
onFailure(e);
96-
}
97-
}
98-
99-
@Override
100-
public void onFailure(final Exception e) {
101-
try {
102-
snapshot.close();
103-
} catch (final Exception inner) {
104-
e.addSuppressed(inner);
105-
} finally {
106-
listener.onFailure(e);
107-
}
108-
}
109-
};
110-
ShardId shardId = indexShard.shardId();
111-
88+
final ShardId shardId = indexShard.shardId();
11289
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
11390
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
11491
// Also fail the resync early if the shard is shutting down
115-
Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
116-
92+
snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo);
93+
final Translog.Snapshot originalSnapshot = snapshot;
94+
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
11795
@Override
11896
public synchronized void close() throws IOException {
119-
snapshot.close();
97+
originalSnapshot.close();
12098
}
12199

122100
@Override
123101
public synchronized int totalOperations() {
124-
return snapshot.totalOperations();
102+
return originalSnapshot.totalOperations();
125103
}
126104

127105
@Override
@@ -132,15 +110,40 @@ public synchronized Translog.Operation next() throws IOException {
132110
} else {
133111
assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state;
134112
}
135-
return snapshot.next();
113+
return originalSnapshot.next();
136114
}
137115
};
116+
final ActionListener<ResyncTask> resyncListener = new ActionListener<ResyncTask>() {
117+
@Override
118+
public void onResponse(final ResyncTask resyncTask) {
119+
try {
120+
wrappedSnapshot.close();
121+
listener.onResponse(resyncTask);
122+
} catch (final Exception e) {
123+
onFailure(e);
124+
}
125+
}
126+
127+
@Override
128+
public void onFailure(final Exception e) {
129+
try {
130+
wrappedSnapshot.close();
131+
} catch (final Exception inner) {
132+
e.addSuppressed(inner);
133+
} finally {
134+
listener.onFailure(e);
135+
}
136+
}
137+
};
138+
138139
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
139140
startingSeqNo, maxSeqNo, resyncListener);
140141
} catch (Exception e) {
141-
if (resyncListener != null) {
142-
resyncListener.onFailure(e);
143-
} else {
142+
try {
143+
IOUtils.close(snapshot);
144+
} catch (IOException inner) {
145+
e.addSuppressed(inner);
146+
} finally {
144147
listener.onFailure(e);
145148
}
146149
}

server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,10 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
123123
public void testSyncerOnClosingShard() throws Exception {
124124
IndexShard shard = newStartedShard(true);
125125
AtomicBoolean syncActionCalled = new AtomicBoolean();
126-
CountDownLatch syncCalledLatch = new CountDownLatch(1);
127126
PrimaryReplicaSyncer.SyncAction syncAction =
128127
(request, parentTask, allocationId, primaryTerm, listener) -> {
129128
logger.info("Sending off {} operations", request.getOperations().length);
130129
syncActionCalled.set(true);
131-
syncCalledLatch.countDown();
132130
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
133131
};
134132
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY,
@@ -147,13 +145,27 @@ public void testSyncerOnClosingShard() throws Exception {
147145
shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
148146
new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet());
149147

150-
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
151-
threadPool.generic().execute(() -> {
152-
try {
153-
syncer.resync(shard, fut);
154-
} catch (AlreadyClosedException ace) {
155-
fut.onFailure(ace);
148+
CountDownLatch syncCalledLatch = new CountDownLatch(1);
149+
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<PrimaryReplicaSyncer.ResyncTask>() {
150+
@Override
151+
public void onFailure(Exception e) {
152+
try {
153+
super.onFailure(e);
154+
} finally {
155+
syncCalledLatch.countDown();
156+
}
157+
}
158+
@Override
159+
public void onResponse(PrimaryReplicaSyncer.ResyncTask result) {
160+
try {
161+
super.onResponse(result);
162+
} finally {
163+
syncCalledLatch.countDown();
164+
}
156165
}
166+
};
167+
threadPool.generic().execute(() -> {
168+
syncer.resync(shard, fut);
157169
});
158170
if (randomBoolean()) {
159171
syncCalledLatch.await();

server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

+18
Original file line numberDiff line numberDiff line change
@@ -2949,6 +2949,24 @@ public void testSnapshotDedupOperations() throws Exception {
29492949
}
29502950
}
29512951

2952+
/** Make sure that it's ok to close a translog snapshot multiple times */
2953+
public void testCloseSnapshotTwice() throws Exception {
2954+
int numOps = between(0, 10);
2955+
for (int i = 0; i < numOps; i++) {
2956+
Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), i, primaryTerm.get(), new byte[]{1});
2957+
translog.add(op);
2958+
if (randomBoolean()) {
2959+
translog.rollGeneration();
2960+
}
2961+
}
2962+
for (int i = 0; i < 5; i++) {
2963+
Translog.Snapshot snapshot = translog.newSnapshot();
2964+
assertThat(snapshot, SnapshotMatchers.size(numOps));
2965+
snapshot.close();
2966+
snapshot.close();
2967+
}
2968+
}
2969+
29522970
static class SortedSnapshot implements Translog.Snapshot {
29532971
private final Translog.Snapshot snapshot;
29542972
private List<Translog.Operation> operations = null;

0 commit comments

Comments
 (0)