Skip to content

Commit 03983ae

Browse files
[8.6] Simplify and optimize deduplication of RepositoryData for a non-caching repository instance (#91851) (#91866)
* Simplify and optimize deduplication of RepositoryData for a non-caching repository instance (#91851)

This makes use of the new deduplicator infrastructure to move to more efficient deduplication mechanics. The existing solution hardly ever deduplicated, because it would only deduplicate after the repository entered a consistent state. The adjusted solution is much simpler: it deduplicates such that only a single loading of `RepositoryData` will ever happen at a time, fixing the memory issues from massively concurrent loading of the repo data described in #89952.

closes #89952

* fix compile
1 parent c2329ca commit 03983ae

File tree

4 files changed

+175
-87
lines changed

4 files changed

+175
-87
lines changed

docs/changelog/91851.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 91851
2+
summary: Simplify and optimize deduplication of `RepositoryData` for a non-caching
3+
repository instance
4+
area: Snapshot/Restore
5+
type: bug
6+
issues:
7+
- 89952
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action;
10+
11+
import org.elasticsearch.action.support.ContextPreservingActionListener;
12+
import org.elasticsearch.common.util.concurrent.ThreadContext;
13+
14+
import java.util.ArrayList;
15+
import java.util.List;
16+
import java.util.function.Consumer;
17+
18+
/**
19+
*
20+
* Wraps an async action that consumes an {@link ActionListener} such that multiple invocations of {@link #execute(ActionListener)} can
21+
* share the result from a single call to the wrapped action. This implementation is similar to {@link ResultDeduplicator} but offers
22+
* stronger guarantees of not seeing a stale result ever. Concretely, every invocation of {@link #execute(ActionListener)} is guaranteed to
23+
* be resolved with a response that has been computed at a time after the call to {@code execute} has been made. This allows this class to
24+
* be used to deduplicate results from actions that produce results that change over time transparently.
25+
*
26+
* @param <T> Result type
27+
*/
28+
public final class SingleResultDeduplicator<T> {
29+
30+
private final ThreadContext threadContext;
31+
32+
/**
33+
* List of listeners waiting for the execution after the current in-progress execution. If {@code null} then no execution is in
34+
* progress currently, otherwise an execution is in progress and will trigger another execution that will resolve any listeners queued
35+
* up here once done.
36+
*/
37+
private List<ActionListener<T>> waitingListeners;
38+
39+
private final Consumer<ActionListener<T>> executeAction;
40+
41+
public SingleResultDeduplicator(ThreadContext threadContext, Consumer<ActionListener<T>> executeAction) {
42+
this.threadContext = threadContext;
43+
this.executeAction = executeAction;
44+
}
45+
46+
/**
47+
* Execute the action for the given {@code listener}.
48+
* @param listener listener to resolve with execution result
49+
*/
50+
public void execute(ActionListener<T> listener) {
51+
synchronized (this) {
52+
if (waitingListeners == null) {
53+
// no queued up listeners, just execute this one directly without deduplication and instantiate the list so that
54+
// subsequent executions will wait
55+
waitingListeners = new ArrayList<>();
56+
} else {
57+
// already running an execution, queue this one up
58+
waitingListeners.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
59+
return;
60+
}
61+
}
62+
doExecute(listener);
63+
}
64+
65+
private void doExecute(ActionListener<T> listener) {
66+
final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
67+
final List<ActionListener<T>> listeners;
68+
synchronized (this) {
69+
if (waitingListeners.isEmpty()) {
70+
// no listeners were queued up while this execution ran, so we just reset the state to not having a running execution
71+
waitingListeners = null;
72+
return;
73+
} else {
74+
// we have queued up listeners, so we create a fresh list for the next execution and execute once to handle the
75+
// listeners currently queued up
76+
listeners = waitingListeners;
77+
waitingListeners = new ArrayList<>();
78+
}
79+
}
80+
doExecute(new ActionListener<>() {
81+
@Override
82+
public void onResponse(T response) {
83+
ActionListener.onResponse(listeners, response);
84+
}
85+
86+
@Override
87+
public void onFailure(Exception e) {
88+
ActionListener.onFailure(listeners, e);
89+
}
90+
});
91+
});
92+
try {
93+
executeAction.accept(wrappedListener);
94+
} catch (Exception e) {
95+
wrappedListener.onFailure(e);
96+
}
97+
}
98+
}

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 69 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.elasticsearch.Version;
2626
import org.elasticsearch.action.ActionListener;
2727
import org.elasticsearch.action.ActionRunnable;
28-
import org.elasticsearch.action.ResultDeduplicator;
28+
import org.elasticsearch.action.SingleResultDeduplicator;
2929
import org.elasticsearch.action.StepListener;
3030
import org.elasticsearch.action.support.GroupedActionListener;
3131
import org.elasticsearch.action.support.ListenableActionFuture;
@@ -413,7 +413,11 @@ protected BlobStoreRepository(
413413
this.namedXContentRegistry = namedXContentRegistry;
414414
this.basePath = basePath;
415415
this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
416-
this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
416+
this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>(
417+
threadPool.getThreadContext(),
418+
listener -> threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
419+
.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData))
420+
);
417421
shardSnapshotTaskRunner = new ShardSnapshotTaskRunner(
418422
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
419423
threadPool.executor(ThreadPool.Names.SNAPSHOT),
@@ -1787,19 +1791,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
17871791
metadata.name(),
17881792
latestKnownRepoGen
17891793
);
1790-
// Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state
1791-
// Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given
1792-
// generation may change
1793-
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META);
1794-
if (bestEffortConsistency || cacheRepositoryData == false) {
1795-
executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
1796-
} else {
1797-
repoDataDeduplicator.executeOnce(
1798-
metadata,
1799-
listener,
1800-
(metadata, l) -> executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData))
1801-
);
1802-
}
1794+
repoDataLoadDeduplicator.execute(listener);
18031795
}
18041796
}
18051797

@@ -1843,78 +1835,70 @@ private void initializeRepoGenerationTracking(ActionListener<RepositoryData> lis
18431835
}
18441836
existingListener.onFailure(e);
18451837
};
1846-
threadPool.generic()
1847-
.execute(
1848-
ActionRunnable.wrap(
1849-
ActionListener.wrap(
1850-
repoData -> submitUnbatchedTask(
1851-
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
1852-
new ClusterStateUpdateTask() {
1853-
@Override
1854-
public ClusterState execute(ClusterState currentState) {
1855-
RepositoryMetadata metadata = getRepoMetadata(currentState);
1856-
// No update to the repository generation should have occurred concurrently in general except
1857-
// for
1858-
// extreme corner cases like failing over to an older version master node and back to the
1859-
// current
1860-
// node concurrently
1861-
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
1862-
throw new RepositoryException(
1863-
metadata.name(),
1864-
"Found unexpected initialized repo metadata [" + metadata + "]"
1865-
);
1866-
}
1867-
return ClusterState.builder(currentState)
1868-
.metadata(
1869-
Metadata.builder(currentState.getMetadata())
1870-
.putCustom(
1871-
RepositoriesMetadata.TYPE,
1872-
currentState.metadata()
1873-
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
1874-
.withUpdatedGeneration(
1875-
metadata.name(),
1876-
repoData.getGenId(),
1877-
repoData.getGenId()
1878-
)
1879-
)
1838+
repoDataLoadDeduplicator.execute(
1839+
ActionListener.wrap(
1840+
repoData -> submitUnbatchedTask(
1841+
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
1842+
new ClusterStateUpdateTask() {
1843+
@Override
1844+
public ClusterState execute(ClusterState currentState) {
1845+
RepositoryMetadata metadata = getRepoMetadata(currentState);
1846+
// No update to the repository generation should have occurred concurrently in general except
1847+
// for
1848+
// extreme corner cases like failing over to an older version master node and back to the
1849+
// current
1850+
// node concurrently
1851+
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
1852+
throw new RepositoryException(
1853+
metadata.name(),
1854+
"Found unexpected initialized repo metadata [" + metadata + "]"
1855+
);
1856+
}
1857+
return ClusterState.builder(currentState)
1858+
.metadata(
1859+
Metadata.builder(currentState.getMetadata())
1860+
.putCustom(
1861+
RepositoriesMetadata.TYPE,
1862+
currentState.metadata()
1863+
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
1864+
.withUpdatedGeneration(metadata.name(), repoData.getGenId(), repoData.getGenId())
18801865
)
1881-
.build();
1882-
}
1866+
)
1867+
.build();
1868+
}
18831869

1884-
@Override
1885-
public void onFailure(Exception e) {
1886-
onFailure.accept(e);
1887-
}
1870+
@Override
1871+
public void onFailure(Exception e) {
1872+
onFailure.accept(e);
1873+
}
18881874

1889-
@Override
1890-
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
1891-
logger.trace(
1892-
"[{}] initialized repository generation in cluster state to [{}]",
1893-
metadata.name(),
1894-
repoData.getGenId()
1895-
);
1896-
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
1897-
threadPool.generic().execute(() -> {
1898-
final ActionListener<RepositoryData> existingListener;
1899-
synchronized (BlobStoreRepository.this) {
1900-
existingListener = repoDataInitialized;
1901-
repoDataInitialized = null;
1902-
}
1903-
existingListener.onResponse(repoData);
1904-
logger.trace(
1905-
"[{}] called listeners after initializing repository to generation [{}]",
1906-
metadata.name(),
1907-
repoData.getGenId()
1908-
);
1909-
});
1875+
@Override
1876+
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
1877+
logger.trace(
1878+
"[{}] initialized repository generation in cluster state to [{}]",
1879+
metadata.name(),
1880+
repoData.getGenId()
1881+
);
1882+
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
1883+
threadPool.generic().execute(() -> {
1884+
final ActionListener<RepositoryData> existingListener;
1885+
synchronized (BlobStoreRepository.this) {
1886+
existingListener = repoDataInitialized;
1887+
repoDataInitialized = null;
19101888
}
1911-
}
1912-
),
1913-
onFailure
1914-
),
1915-
this::doGetRepositoryData
1916-
)
1917-
);
1889+
existingListener.onResponse(repoData);
1890+
logger.trace(
1891+
"[{}] called listeners after initializing repository to generation [{}]",
1892+
metadata.name(),
1893+
repoData.getGenId()
1894+
);
1895+
});
1896+
}
1897+
}
1898+
),
1899+
onFailure
1900+
)
1901+
);
19181902
} else {
19191903
logger.trace(
19201904
"[{}] waiting for existing initialization of repository metadata generation in cluster state",
@@ -1926,11 +1910,9 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
19261910
}
19271911

19281912
/**
1929-
* {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
1930-
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
1931-
* unique for a given value of {@link #metadata} at any point in time.
1913+
* Deduplicator that deduplicates the physical loading of {@link RepositoryData} from the repositories' underlying storage.
19321914
*/
1933-
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator;
1915+
private final SingleResultDeduplicator<RepositoryData> repoDataLoadDeduplicator;
19341916

19351917
private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
19361918
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ private static ClusterService mockClusterService(ClusterState initialState) {
423423
final ThreadPool threadPool = mock(ThreadPool.class);
424424
when(threadPool.getThreadContext()).thenReturn(threadContext);
425425
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
426+
when(threadPool.executor(ThreadPool.Names.SNAPSHOT_META)).thenReturn(new SameThreadExecutorService());
426427
when(threadPool.generic()).thenReturn(new SameThreadExecutorService());
427428
when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn(
428429
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))

0 commit comments

Comments
 (0)