Skip to content

Commit b01322e

Browse files
authored
Allows SparseFileTracker to progressively execute listeners during Gap processing (#58477)
Today SparseFileTracker allows to wait for a range to become available before executing a given listener. In the case of searchable snapshot, we'd like to be able to wait for a large range to be filled (ie, downloaded and written to disk) while being able to execute the listener as soon as a smaller range is available. This pull request is an extract from #58164 which introduces a ProgressListenableActionFuture that is used internally by SparseFileTracker. The progressive listenable future allows to register listeners attached to SparseFileTracker.Gap so that they are executed once the Gap is completed (with success or failure) or as soon as the Gap progress reaches a given progress value. This progress value is defined when the tracker.waitForRange() method is called; this method has been modified to accept a range and another listener's range to operate on.
1 parent 94eb5a0 commit b01322e

File tree

8 files changed

+797
-65
lines changed

8 files changed

+797
-65
lines changed

server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ public void addListener(final ActionListener<T> listener) {
4949
}
5050

5151
@Override
52-
protected void done() {
53-
super.done();
52+
protected void done(boolean success) {
53+
super.done(success);
5454
synchronized (this) {
5555
executedListeners = true;
5656
}

server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
110110
if (!sync.cancel()) {
111111
return false;
112112
}
113-
done();
113+
done(false);
114114
if (mayInterruptIfRunning) {
115115
interruptTask();
116116
}
@@ -132,7 +132,7 @@ protected void interruptTask() {
132132
/**
133133
* Subclasses should invoke this method to set the result of the computation
134134
* to {@code value}. This will set the state of the future to
135-
* {@link BaseFuture.Sync#COMPLETED} and call {@link #done()} if the
135+
* {@link BaseFuture.Sync#COMPLETED} and call {@link #done(boolean)} if the
136136
* state was successfully changed.
137137
*
138138
* @param value the value that was the result of the task.
@@ -141,15 +141,15 @@ protected void interruptTask() {
141141
protected boolean set(@Nullable V value) {
142142
boolean result = sync.set(value);
143143
if (result) {
144-
done();
144+
done(true);
145145
}
146146
return result;
147147
}
148148

149149
/**
150150
* Subclasses should invoke this method to set the result of the computation
151151
* to an error, {@code throwable}. This will set the state of the future to
152-
* {@link BaseFuture.Sync#COMPLETED} and call {@link #done()} if the
152+
* {@link BaseFuture.Sync#COMPLETED} and call {@link #done(boolean)} if the
153153
* state was successfully changed.
154154
*
155155
* @param throwable the exception that the task failed with.
@@ -159,7 +159,7 @@ protected boolean set(@Nullable V value) {
159159
protected boolean setException(Throwable throwable) {
160160
boolean result = sync.setException(Objects.requireNonNull(throwable));
161161
if (result) {
162-
done();
162+
done(false);
163163
}
164164

165165
// If it's an Error, we want to make sure it reaches the top of the
@@ -173,7 +173,14 @@ protected boolean setException(Throwable throwable) {
173173
return result;
174174
}
175175

176-
protected void done() {
176+
/**
177+
* Called when the {@link BaseFuture} is completed. The {@code success} boolean indicates if the {@link BaseFuture} was successfully
178+
* completed (the value is {@code true}). In the cases the {@link BaseFuture} was completed with an error or cancelled the
179+
* value is {@code false}.
180+
*
181+
* @param success indicates if the {@link BaseFuture} was completed with success (true); in other cases it equals to false
182+
*/
183+
protected void done(boolean success) {
177184
}
178185

179186
/**

server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void addListener(ActionListener<V> listener, ExecutorService executor, Th
9393
}
9494

9595
@Override
96-
protected synchronized void done() {
96+
protected synchronized void done(boolean ignored) {
9797
done = true;
9898
listeners.forEach(t -> notifyListener(t.v1(), t.v2()));
9999
// release references to any listeners as we no longer need them and will live

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,8 @@ CompletableFuture<Integer> fetchRange(
274274
}
275275
ensureOpen();
276276
final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
277-
start,
278-
end,
277+
Tuple.tuple(start, end),
278+
Tuple.tuple(start, end), // TODO use progressive sub range to trigger read operations sooner
279279
ActionListener.wrap(
280280
rangeReady -> future.complete(onRangeAvailable.apply(start, end)),
281281
rangeFailure -> future.completeExceptionally(rangeFailure)
@@ -286,7 +286,8 @@ CompletableFuture<Integer> fetchRange(
286286
try {
287287
ensureOpen();
288288
onRangeMissing.accept(gap.start, gap.end);
289-
gap.onResponse(null);
289+
gap.onProgress(gap.end); // TODO update progress in onRangeMissing
290+
gap.onCompletion();
290291
} catch (Exception e) {
291292
gap.onFailure(e);
292293
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.index.store.cache;
8+
9+
import org.elasticsearch.action.ActionFuture;
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.support.AdapterActionFuture;
12+
import org.elasticsearch.common.collect.Tuple;
13+
14+
import java.util.ArrayList;
15+
import java.util.List;
16+
import java.util.function.Supplier;
17+
18+
/**
19+
* An {@link ActionFuture} that listeners can be attached to. Listeners are executed when the future is completed
20+
* or when a given progress is reached. Progression is updated using the {@link #onProgress(long)} method.
21+
*
22+
* Listeners are executed within the thread that triggers the completion, the failure or the progress update and
23+
* the progress value passed to the listeners on execution is the last updated value.
24+
*/
25+
class ProgressListenableActionFuture extends AdapterActionFuture<Long, Long> {
26+
27+
protected final long start;
28+
protected final long end;
29+
30+
// modified under 'this' mutex
31+
private volatile List<Tuple<Long, ActionListener<Long>>> listeners;
32+
protected volatile long progress;
33+
private volatile boolean completed;
34+
35+
/**
36+
* Creates a {@link ProgressListenableActionFuture} that accepts the progression
37+
* to be within {@code start} (inclusive) and {@code end} (exclusive) values.
38+
*
39+
* @param start the start (inclusive)
40+
* @param end the end (exclusive)
41+
*/
42+
ProgressListenableActionFuture(long start, long end) {
43+
super();
44+
this.start = start;
45+
this.end = end;
46+
this.progress = start;
47+
this.completed = false;
48+
assert invariant();
49+
}
50+
51+
private boolean invariant() {
52+
assert start < end : start + " < " + end;
53+
synchronized (this) {
54+
assert completed == false || listeners == null;
55+
assert start <= progress : start + " <= " + progress;
56+
assert progress <= end : progress + " <= " + end;
57+
assert listeners == null || listeners.stream().allMatch(listener -> progress < listener.v1());
58+
}
59+
return true;
60+
}
61+
62+
/**
63+
* Updates the progress of the current {@link ActionFuture} with the given value, indicating that the range from {@code start}
64+
* (inclusive) to {@code progress} (exclusive) is available. Calling this method potentially triggers the execution of one or
65+
* more listeners that are waiting for the progress to reach a value lower than the one just updated.
66+
*
67+
* @param progress the new progress value
68+
*/
69+
public void onProgress(final long progress) {
70+
ensureNotCompleted();
71+
72+
if (progress <= start) {
73+
assert false : progress + " <= " + start;
74+
throw new IllegalArgumentException("Cannot update progress with a value less than [start=" + start + ']');
75+
}
76+
if (end < progress) {
77+
assert false : end + " < " + progress;
78+
throw new IllegalArgumentException("Cannot update progress with a value greater than [end=" + end + ']');
79+
}
80+
81+
List<ActionListener<Long>> listenersToExecute = null;
82+
synchronized (this) {
83+
assert this.progress < progress : this.progress + " < " + progress;
84+
this.progress = progress;
85+
86+
final List<Tuple<Long, ActionListener<Long>>> listeners = this.listeners;
87+
if (listeners != null) {
88+
List<Tuple<Long, ActionListener<Long>>> listenersToKeep = null;
89+
for (Tuple<Long, ActionListener<Long>> listener : listeners) {
90+
if (progress < listener.v1()) {
91+
if (listenersToKeep == null) {
92+
listenersToKeep = new ArrayList<>();
93+
}
94+
listenersToKeep.add(listener);
95+
} else {
96+
if (listenersToExecute == null) {
97+
listenersToExecute = new ArrayList<>();
98+
}
99+
listenersToExecute.add(listener.v2());
100+
}
101+
}
102+
this.listeners = listenersToKeep;
103+
}
104+
}
105+
if (listenersToExecute != null) {
106+
listenersToExecute.forEach(listener -> executeListener(listener, () -> progress));
107+
}
108+
assert invariant();
109+
}
110+
111+
@Override
112+
public void onResponse(Long result) {
113+
ensureNotCompleted();
114+
super.onResponse(result);
115+
}
116+
117+
@Override
118+
public void onFailure(Exception e) {
119+
ensureNotCompleted();
120+
super.onFailure(e);
121+
}
122+
123+
private void ensureNotCompleted() {
124+
if (completed) {
125+
throw new IllegalStateException("Future is already completed");
126+
}
127+
}
128+
129+
@Override
130+
protected void done(boolean success) {
131+
super.done(success);
132+
final List<Tuple<Long, ActionListener<Long>>> listenersToExecute;
133+
synchronized (this) {
134+
assert progress == end || success == false;
135+
completed = true;
136+
listenersToExecute = this.listeners;
137+
listeners = null;
138+
}
139+
if (listenersToExecute != null) {
140+
listenersToExecute.stream().map(Tuple::v2).forEach(listener -> executeListener(listener, () -> actionGet(0L)));
141+
}
142+
assert invariant();
143+
}
144+
145+
/**
146+
* Attach a {@link ActionListener} to the current future. The listener will be executed once the future is completed or once the
147+
* progress reaches the given {@code value}, whichever comes first.
148+
*
149+
* @param listener the {@link ActionListener} to add
150+
* @param value the value
151+
*/
152+
public void addListener(ActionListener<Long> listener, long value) {
153+
boolean executeImmediate = false;
154+
final long progress;
155+
synchronized (this) {
156+
progress = this.progress;
157+
if (completed || value <= progress) {
158+
executeImmediate = true;
159+
} else {
160+
List<Tuple<Long, ActionListener<Long>>> listeners = this.listeners;
161+
if (listeners == null) {
162+
listeners = new ArrayList<>();
163+
}
164+
listeners.add(Tuple.tuple(value, listener));
165+
this.listeners = listeners;
166+
}
167+
}
168+
if (executeImmediate) {
169+
executeListener(listener, completed ? () -> actionGet(0L) : () -> progress);
170+
}
171+
assert invariant();
172+
}
173+
174+
private void executeListener(final ActionListener<Long> listener, final Supplier<Long> result) {
175+
try {
176+
listener.onResponse(result.get());
177+
} catch (Exception e) {
178+
listener.onFailure(e);
179+
}
180+
}
181+
182+
@Override
183+
protected Long convert(Long response) {
184+
if (response == null || response < start || end < response) {
185+
assert false : start + " < " + response + " < " + end;
186+
throw new IllegalArgumentException("Invalid completion value [start=" + start + ",end=" + end + ",response=" + response + ']');
187+
}
188+
return response;
189+
}
190+
191+
@Override
192+
public String toString() {
193+
return "ProgressListenableActionFuture[start="
194+
+ start
195+
+ ", end="
196+
+ end
197+
+ ", progress="
198+
+ progress
199+
+ ", completed="
200+
+ completed
201+
+ ", listeners="
202+
+ (listeners != null ? listeners.size() : 0)
203+
+ ']';
204+
}
205+
}

0 commit comments

Comments
 (0)