Skip to content

Commit 3aaa986

Browse files
committed
Make peer recovery send file chunks async (#44040)
1 parent b75003f commit 3aaa986

File tree

7 files changed

+384
-182
lines changed

7 files changed

+384
-182
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.indices.recovery;
21+
22+
import org.apache.logging.log4j.Logger;
23+
import org.apache.logging.log4j.message.ParameterizedMessage;
24+
import org.elasticsearch.Assertions;
25+
import org.elasticsearch.action.ActionListener;
26+
import org.elasticsearch.common.collect.Tuple;
27+
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
28+
import org.elasticsearch.common.util.concurrent.ThreadContext;
29+
import org.elasticsearch.core.internal.io.IOUtils;
30+
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
31+
import org.elasticsearch.index.store.StoreFileMetaData;
32+
33+
import java.io.Closeable;
34+
import java.io.IOException;
35+
import java.util.Iterator;
36+
import java.util.List;
37+
import java.util.function.Consumer;
38+
39+
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
40+
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
41+
42+
/**
43+
* File chunks are sent/requested sequentially by at most one thread at any time. However, the sender/requestor won't wait for the response
44+
* before processing the next file chunk request to reduce the recovery time especially on secure/compressed or high latency communication.
45+
* <p>
46+
* The sender/requestor can send up to {@code maxConcurrentFileChunks} file chunk requests without waiting for responses. Since the recovery
47+
* target can receive file chunks out of order, it has to buffer those file chunks in memory and only flush to disk when there's no gap.
48+
* To ensure the recovery target never buffers more than {@code maxConcurrentFileChunks} file chunks, we allow the sender/requestor to send
49+
* only up to {@code maxConcurrentFileChunks} file chunk requests from the last flushed (and acknowledged) file chunk. We leverage the local
50+
* checkpoint tracker for this purpose. We generate a new sequence number and assign it to each file chunk request before sending; then mark
51+
* that sequence number as processed when we receive a response for the corresponding file chunk request. With the local checkpoint tracker,
52+
* we know the last acknowledged-flushed file-chunk is a file chunk whose {@code requestSeqId} equals the local checkpoint because the
53+
* recovery target can flush all file chunks up to the local checkpoint.
54+
* <p>
55+
* When the number of un-replied file chunk requests reaches the limit (i.e. the gap between the max_seq_no and the local checkpoint is
56+
* equal to {@code maxConcurrentFileChunks}), the sending/requesting thread will abort its execution. That process will be resumed by
57+
* one of the networking threads which receive/handle the responses of the current pending file chunk requests. This process will continue
58+
* until all chunk requests are sent/responded.
59+
*/
60+
abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest> implements Closeable {
61+
private Status status = Status.PROCESSING;
62+
private final Logger logger;
63+
private final ActionListener<Void> listener;
64+
private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
65+
private final AsyncIOProcessor<FileChunkResponseItem> processor;
66+
private final int maxConcurrentFileChunks;
67+
private StoreFileMetaData currentFile = null;
68+
private final Iterator<StoreFileMetaData> remainingFiles;
69+
private Tuple<StoreFileMetaData, Request> readAheadRequest = null;
70+
71+
protected MultiFileTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
72+
int maxConcurrentFileChunks, List<StoreFileMetaData> files) {
73+
this.logger = logger;
74+
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
75+
this.listener = listener;
76+
this.processor = new AsyncIOProcessor<>(logger, maxConcurrentFileChunks, threadContext) {
77+
@Override
78+
protected void write(List<Tuple<FileChunkResponseItem, Consumer<Exception>>> items) {
79+
handleItems(items);
80+
}
81+
};
82+
this.remainingFiles = files.iterator();
83+
}
84+
85+
public final void start() {
86+
addItem(UNASSIGNED_SEQ_NO, null, null); // put a dummy item to start the processor
87+
}
88+
89+
private void addItem(long requestSeqId, StoreFileMetaData md, Exception failure) {
90+
processor.put(new FileChunkResponseItem(requestSeqId, md, failure), e -> { assert e == null : e; });
91+
}
92+
93+
private void handleItems(List<Tuple<FileChunkResponseItem, Consumer<Exception>>> items) {
94+
if (status != Status.PROCESSING) {
95+
assert status == Status.FAILED : "must not receive any response after the transfer was completed";
96+
// These exceptions will be ignored as we record only the first failure, log them for debugging purpose.
97+
items.stream().filter(item -> item.v1().failure != null).forEach(item ->
98+
logger.debug(new ParameterizedMessage("failed to transfer a file chunk request {}", item.v1().md), item.v1().failure));
99+
return;
100+
}
101+
try {
102+
for (Tuple<FileChunkResponseItem, Consumer<Exception>> item : items) {
103+
final FileChunkResponseItem resp = item.v1();
104+
if (resp.requestSeqId == UNASSIGNED_SEQ_NO) {
105+
continue; // not an actual item
106+
}
107+
requestSeqIdTracker.markSeqNoAsProcessed(resp.requestSeqId);
108+
if (resp.failure != null) {
109+
handleError(resp.md, resp.failure);
110+
throw resp.failure;
111+
}
112+
}
113+
while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentFileChunks) {
114+
final Tuple<StoreFileMetaData, Request> request = readAheadRequest != null ? readAheadRequest : getNextRequest();
115+
readAheadRequest = null;
116+
if (request == null) {
117+
assert currentFile == null && remainingFiles.hasNext() == false;
118+
if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getProcessedCheckpoint()) {
119+
onCompleted(null);
120+
}
121+
return;
122+
}
123+
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
124+
sendChunkRequest(request.v2(), ActionListener.wrap(
125+
r -> addItem(requestSeqId, request.v1(), null),
126+
e -> addItem(requestSeqId, request.v1(), e)));
127+
}
128+
// While we are waiting for the responses, we can prepare the next request in advance
129+
// so we can send it immediately when the responses arrive to reduce the transfer time.
130+
if (readAheadRequest == null) {
131+
readAheadRequest = getNextRequest();
132+
}
133+
} catch (Exception e) {
134+
onCompleted(e);
135+
}
136+
}
137+
138+
private void onCompleted(Exception failure) {
139+
if (Assertions.ENABLED && status != Status.PROCESSING) {
140+
throw new AssertionError("invalid status: expected [" + Status.PROCESSING + "] actual [" + status + "]", failure);
141+
}
142+
status = failure == null ? Status.SUCCESS : Status.FAILED;
143+
try {
144+
IOUtils.close(failure, this);
145+
} catch (Exception e) {
146+
listener.onFailure(e);
147+
return;
148+
}
149+
listener.onResponse(null);
150+
}
151+
152+
private Tuple<StoreFileMetaData, Request> getNextRequest() throws Exception {
153+
try {
154+
if (currentFile == null) {
155+
if (remainingFiles.hasNext()) {
156+
currentFile = remainingFiles.next();
157+
onNewFile(currentFile);
158+
} else {
159+
return null;
160+
}
161+
}
162+
final StoreFileMetaData md = currentFile;
163+
final Request request = nextChunkRequest(md);
164+
if (request.lastChunk()) {
165+
currentFile = null;
166+
}
167+
return Tuple.tuple(md, request);
168+
} catch (Exception e) {
169+
handleError(currentFile, e);
170+
throw e;
171+
}
172+
}
173+
174+
/**
175+
* This method is called when starting sending/requesting a new file. Subclasses should override
176+
* this method to reset the file offset or close the previous file and open a new file if needed.
177+
*/
178+
protected abstract void onNewFile(StoreFileMetaData md) throws IOException;
179+
180+
protected abstract Request nextChunkRequest(StoreFileMetaData md) throws IOException;
181+
182+
protected abstract void sendChunkRequest(Request request, ActionListener<Void> listener);
183+
184+
protected abstract void handleError(StoreFileMetaData md, Exception e) throws Exception;
185+
186+
private static class FileChunkResponseItem {
187+
final long requestSeqId;
188+
final StoreFileMetaData md;
189+
final Exception failure;
190+
191+
FileChunkResponseItem(long requestSeqId, StoreFileMetaData md, Exception failure) {
192+
this.requestSeqId = requestSeqId;
193+
this.md = md;
194+
this.failure = failure;
195+
}
196+
}
197+
198+
protected interface ChunkRequest {
199+
/**
200+
* @return {@code true} if this chunk request is the last chunk of the current file
201+
*/
202+
boolean lastChunk();
203+
}
204+
205+
private enum Status {
206+
PROCESSING,
207+
SUCCESS,
208+
FAILED
209+
}
210+
}

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ private RecoverySourceHandler createRecoverySourceHandler(StartRecoveryRequest r
176176
final RemoteRecoveryTargetHandler recoveryTarget =
177177
new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService,
178178
request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime));
179-
handler = new RecoverySourceHandler(shard, recoveryTarget, request,
179+
handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request,
180180
Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks());
181181
return handler;
182182
}

0 commit comments

Comments
 (0)