Skip to content

Commit 88e44a9

Browse files
Simplify and optimize deduplication of RepositoryData for a non-caching repository instance (#91851)
This makes use of the new deduplicator infrastructure to move to more efficient deduplication mechanics. The existing solution hardly ever deduplicated because it would only deduplicate after the repository entered a consistent state. The adjusted solution is much simpler, in that it simply deduplicates such that only a single loading of `RepositoryData` will ever happen at a time, fixing memory issues from massively concurrent loading of the repo data as described in #89952. closes #89952
1 parent 261f184 commit 88e44a9

File tree

3 files changed

+77
-87
lines changed

3 files changed

+77
-87
lines changed

docs/changelog/91851.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 91851
2+
summary: Simplify and optimize deduplication of `RepositoryData` for a non-caching
3+
repository instance
4+
area: Snapshot/Restore
5+
type: bug
6+
issues:
7+
- 89952

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 69 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.elasticsearch.Version;
2626
import org.elasticsearch.action.ActionListener;
2727
import org.elasticsearch.action.ActionRunnable;
28-
import org.elasticsearch.action.ResultDeduplicator;
28+
import org.elasticsearch.action.SingleResultDeduplicator;
2929
import org.elasticsearch.action.StepListener;
3030
import org.elasticsearch.action.support.GroupedActionListener;
3131
import org.elasticsearch.action.support.ListenableActionFuture;
@@ -413,7 +413,11 @@ protected BlobStoreRepository(
413413
this.namedXContentRegistry = namedXContentRegistry;
414414
this.basePath = basePath;
415415
this.maxSnapshotCount = MAX_SNAPSHOTS_SETTING.get(metadata.settings());
416-
this.repoDataDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
416+
this.repoDataLoadDeduplicator = new SingleResultDeduplicator<>(
417+
threadPool.getThreadContext(),
418+
listener -> threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
419+
.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData))
420+
);
417421
shardSnapshotTaskRunner = new ShardSnapshotTaskRunner(
418422
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
419423
threadPool.executor(ThreadPool.Names.SNAPSHOT),
@@ -1787,19 +1791,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
17871791
metadata.name(),
17881792
latestKnownRepoGen
17891793
);
1790-
// Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state
1791-
// Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given
1792-
// generation may change
1793-
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META);
1794-
if (bestEffortConsistency || cacheRepositoryData == false) {
1795-
executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData));
1796-
} else {
1797-
repoDataDeduplicator.executeOnce(
1798-
metadata,
1799-
listener,
1800-
(metadata, l) -> executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData))
1801-
);
1802-
}
1794+
repoDataLoadDeduplicator.execute(listener);
18031795
}
18041796
}
18051797

@@ -1843,78 +1835,70 @@ private void initializeRepoGenerationTracking(ActionListener<RepositoryData> lis
18431835
}
18441836
existingListener.onFailure(e);
18451837
};
1846-
threadPool.generic()
1847-
.execute(
1848-
ActionRunnable.wrap(
1849-
ActionListener.wrap(
1850-
repoData -> submitUnbatchedTask(
1851-
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
1852-
new ClusterStateUpdateTask() {
1853-
@Override
1854-
public ClusterState execute(ClusterState currentState) {
1855-
RepositoryMetadata metadata = getRepoMetadata(currentState);
1856-
// No update to the repository generation should have occurred concurrently in general except
1857-
// for
1858-
// extreme corner cases like failing over to an older version master node and back to the
1859-
// current
1860-
// node concurrently
1861-
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
1862-
throw new RepositoryException(
1863-
metadata.name(),
1864-
"Found unexpected initialized repo metadata [" + metadata + "]"
1865-
);
1866-
}
1867-
return ClusterState.builder(currentState)
1868-
.metadata(
1869-
Metadata.builder(currentState.getMetadata())
1870-
.putCustom(
1871-
RepositoriesMetadata.TYPE,
1872-
currentState.metadata()
1873-
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
1874-
.withUpdatedGeneration(
1875-
metadata.name(),
1876-
repoData.getGenId(),
1877-
repoData.getGenId()
1878-
)
1879-
)
1838+
repoDataLoadDeduplicator.execute(
1839+
ActionListener.wrap(
1840+
repoData -> submitUnbatchedTask(
1841+
"set initial safe repository generation [" + metadata.name() + "][" + repoData.getGenId() + "]",
1842+
new ClusterStateUpdateTask() {
1843+
@Override
1844+
public ClusterState execute(ClusterState currentState) {
1845+
RepositoryMetadata metadata = getRepoMetadata(currentState);
1846+
// No update to the repository generation should have occurred concurrently in general except
1847+
// for
1848+
// extreme corner cases like failing over to an older version master node and back to the
1849+
// current
1850+
// node concurrently
1851+
if (metadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
1852+
throw new RepositoryException(
1853+
metadata.name(),
1854+
"Found unexpected initialized repo metadata [" + metadata + "]"
1855+
);
1856+
}
1857+
return ClusterState.builder(currentState)
1858+
.metadata(
1859+
Metadata.builder(currentState.getMetadata())
1860+
.putCustom(
1861+
RepositoriesMetadata.TYPE,
1862+
currentState.metadata()
1863+
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
1864+
.withUpdatedGeneration(metadata.name(), repoData.getGenId(), repoData.getGenId())
18801865
)
1881-
.build();
1882-
}
1866+
)
1867+
.build();
1868+
}
18831869

1884-
@Override
1885-
public void onFailure(Exception e) {
1886-
onFailure.accept(e);
1887-
}
1870+
@Override
1871+
public void onFailure(Exception e) {
1872+
onFailure.accept(e);
1873+
}
18881874

1889-
@Override
1890-
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
1891-
logger.trace(
1892-
"[{}] initialized repository generation in cluster state to [{}]",
1893-
metadata.name(),
1894-
repoData.getGenId()
1895-
);
1896-
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
1897-
threadPool.generic().execute(() -> {
1898-
final ActionListener<RepositoryData> existingListener;
1899-
synchronized (BlobStoreRepository.this) {
1900-
existingListener = repoDataInitialized;
1901-
repoDataInitialized = null;
1902-
}
1903-
existingListener.onResponse(repoData);
1904-
logger.trace(
1905-
"[{}] called listeners after initializing repository to generation [{}]",
1906-
metadata.name(),
1907-
repoData.getGenId()
1908-
);
1909-
});
1875+
@Override
1876+
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
1877+
logger.trace(
1878+
"[{}] initialized repository generation in cluster state to [{}]",
1879+
metadata.name(),
1880+
repoData.getGenId()
1881+
);
1882+
// Resolve listeners on generic pool since some callbacks for repository data do additional IO
1883+
threadPool.generic().execute(() -> {
1884+
final ActionListener<RepositoryData> existingListener;
1885+
synchronized (BlobStoreRepository.this) {
1886+
existingListener = repoDataInitialized;
1887+
repoDataInitialized = null;
19101888
}
1911-
}
1912-
),
1913-
onFailure
1914-
),
1915-
this::doGetRepositoryData
1916-
)
1917-
);
1889+
existingListener.onResponse(repoData);
1890+
logger.trace(
1891+
"[{}] called listeners after initializing repository to generation [{}]",
1892+
metadata.name(),
1893+
repoData.getGenId()
1894+
);
1895+
});
1896+
}
1897+
}
1898+
),
1899+
onFailure
1900+
)
1901+
);
19181902
} else {
19191903
logger.trace(
19201904
"[{}] waiting for existing initialization of repository metadata generation in cluster state",
@@ -1926,11 +1910,9 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
19261910
}
19271911

19281912
/**
1929-
* {@link RepositoryData} loading deduplicator. This may only be used with consistent generation repositories, meaning
1930-
* {@link #bestEffortConsistency} must be {@code false}, in which case we can assume that the {@link RepositoryData} loaded is
1931-
* unique for a given value of {@link #metadata} at any point in time.
1913+
* Deduplicator that deduplicates the physical loading of {@link RepositoryData} from the repositories' underlying storage.
19321914
*/
1933-
private final ResultDeduplicator<RepositoryMetadata, RepositoryData> repoDataDeduplicator;
1915+
private final SingleResultDeduplicator<RepositoryData> repoDataLoadDeduplicator;
19341916

19351917
private void doGetRepositoryData(ActionListener<RepositoryData> listener) {
19361918
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.

test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ private static ClusterService mockClusterService(ClusterState initialState) {
423423
final ThreadPool threadPool = mock(ThreadPool.class);
424424
when(threadPool.getThreadContext()).thenReturn(threadContext);
425425
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
426+
when(threadPool.executor(ThreadPool.Names.SNAPSHOT_META)).thenReturn(new SameThreadExecutorService());
426427
when(threadPool.generic()).thenReturn(new SameThreadExecutorService());
427428
when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn(
428429
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10))

0 commit comments

Comments
 (0)