Skip to content

Commit e383beb

Browse files
committed
Merge remote-tracking branch 'origin/snapshot-lifecycle-management' into slm-retention
2 parents 9879a4b + 88d915e commit e383beb

File tree

869 files changed

+20039
-6929
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

869 files changed

+20039
-6929
lines changed

buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchCluster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ public void start() {
229229
if (Version.fromString(node.getVersion()).getMajor() >= 7) {
230230
node.defaultConfig.put("cluster.initial_master_nodes", "[" + nodeNames + "]");
231231
node.defaultConfig.put("discovery.seed_providers", "file");
232+
node.defaultConfig.put("discovery.seed_hosts", "[]");
232233
}
233234
}
234235
node.start();
@@ -286,14 +287,13 @@ public List<String> getAllTransportPortURI() {
286287
}
287288

288289
public void waitForAllConditions() {
289-
long startedAt = System.currentTimeMillis();
290290
LOGGER.info("Waiting for nodes");
291291
nodes.forEach(ElasticsearchNode::waitForAllConditions);
292292

293293
writeUnicastHostsFiles();
294294

295295
LOGGER.info("Starting to wait for cluster to form");
296-
waitForConditions(waitConditions, startedAt, CLUSTER_UP_TIMEOUT, CLUSTER_UP_TIMEOUT_UNIT, this);
296+
waitForConditions(waitConditions, System.currentTimeMillis(), CLUSTER_UP_TIMEOUT, CLUSTER_UP_TIMEOUT_UNIT, this);
297297
}
298298

299299
@Override

buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/ElasticsearchNode.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.nio.file.Files;
3838
import java.nio.file.Path;
3939
import java.nio.file.StandardCopyOption;
40+
import java.nio.file.StandardOpenOption;
41+
import java.time.Instant;
4042
import java.util.ArrayList;
4143
import java.util.Arrays;
4244
import java.util.Collection;
@@ -65,8 +67,10 @@ public class ElasticsearchNode implements TestClusterConfiguration {
6567
private static final Logger LOGGER = Logging.getLogger(ElasticsearchNode.class);
6668
private static final int ES_DESTROY_TIMEOUT = 20;
6769
private static final TimeUnit ES_DESTROY_TIMEOUT_UNIT = TimeUnit.SECONDS;
68-
private static final int NODE_UP_TIMEOUT = 60;
69-
private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.SECONDS;
70+
private static final int NODE_UP_TIMEOUT = 2;
71+
private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.MINUTES;
72+
private static final int ADDITIONAL_CONFIG_TIMEOUT = 15;
73+
private static final TimeUnit ADDITIONAL_CONFIG_TIMEOUT_UNIT = TimeUnit.SECONDS;
7074
private static final List<String> OVERRIDABLE_SETTINGS = Arrays.asList(
7175
"path.repo",
7276
"discovery.seed_providers"
@@ -310,6 +314,7 @@ public synchronized void start() {
310314

311315
try {
312316
if (isWorkingDirConfigured == false) {
317+
logToProcessStdout("Configuring working directory: " + workingDir);
313318
// Only configure working dir once so we don't loose data on restarts
314319
isWorkingDirConfigured = true;
315320
createWorkingDir(distroArtifact);
@@ -319,12 +324,16 @@ public synchronized void start() {
319324
}
320325
createConfiguration();
321326

322-
plugins.forEach(plugin -> runElaticsearchBinScript(
323-
"elasticsearch-plugin",
324-
"install", "--batch", plugin.toString())
325-
);
327+
if(plugins.isEmpty() == false) {
328+
logToProcessStdout("Installing " + plugins.size() + " plugins");
329+
plugins.forEach(plugin -> runElaticsearchBinScript(
330+
"elasticsearch-plugin",
331+
"install", "--batch", plugin.toString())
332+
);
333+
}
326334

327335
if (keystoreSettings.isEmpty() == false || keystoreFiles.isEmpty() == false) {
336+
logToProcessStdout("Adding " + keystoreSettings.size() + " keystore settings and " + keystoreFiles.size() + " keystore files");
328337
runElaticsearchBinScript("elasticsearch-keystore", "create");
329338

330339
checkSuppliers("Keystore", keystoreSettings.values());
@@ -347,6 +356,7 @@ public synchronized void start() {
347356
copyExtraConfigFiles();
348357

349358
if (isSettingMissingOrTrue("xpack.security.enabled")) {
359+
logToProcessStdout("Setting up " + credentials.size() + " users");
350360
if (credentials.isEmpty()) {
351361
user(Collections.emptyMap());
352362
}
@@ -358,9 +368,25 @@ public synchronized void start() {
358368
));
359369
}
360370

371+
logToProcessStdout("Starting Elasticsearch process");
361372
startElasticsearchProcess();
362373
}
363374

375+
/**
 * Appends a timestamped {@code [BUILD]} marker line to this node's captured stdout file,
 * so build-side configuration steps appear interleaved with the Elasticsearch output.
 *
 * @param message the text to append after the timestamp and marker
 * @throws UncheckedIOException if the stdout file or its directory cannot be written
 */
private void logToProcessStdout(String message) {
    try {
        // The stdout file's directory may not exist yet if the node hasn't started.
        if (Files.exists(esStdoutFile.getParent()) == false) {
            Files.createDirectories(esStdoutFile.getParent());
        }
        // CREATE + APPEND: create the file on first write, then keep appending.
        Files.write(
            esStdoutFile,
            ("[" + Instant.now().toString() + "] [BUILD] " + message + "\n").getBytes(StandardCharsets.UTF_8),
            StandardOpenOption.CREATE, StandardOpenOption.APPEND
        );
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}
389+
364390
@Override
365391
public void restart() {
366392
LOGGER.info("Restarting {}", this);
@@ -380,6 +406,9 @@ private boolean isSettingMissingOrTrue(String name) {
380406
}
381407

382408
private void copyExtraConfigFiles() {
409+
if (extraConfigFiles.isEmpty() == false) {
410+
logToProcessStdout("Setting up " + extraConfigFiles.size() + " additional config files");
411+
}
383412
extraConfigFiles.forEach((destination, from) -> {
384413
if (Files.exists(from.toPath()) == false) {
385414
throw new TestClustersException("Can't create extra config file from " + from + " for " + this +
@@ -398,6 +427,7 @@ private void copyExtraConfigFiles() {
398427

399428
private void installModules() {
400429
if (distribution == Distribution.INTEG_TEST) {
430+
logToProcessStdout("Installing " + modules.size() + "modules");
401431
for (File module : modules) {
402432
Path destination = workingDir.resolve("modules").resolve(module.getName().replace(".zip", "").replace("-" + version, ""));
403433

@@ -843,7 +873,23 @@ public boolean isProcessAlive() {
843873
}
844874

845875
void waitForAllConditions() {
846-
waitForConditions(waitConditions, System.currentTimeMillis(), NODE_UP_TIMEOUT, NODE_UP_TIMEOUT_UNIT, this);
876+
waitForConditions(
877+
waitConditions,
878+
System.currentTimeMillis(),
879+
NODE_UP_TIMEOUT_UNIT.toMillis(NODE_UP_TIMEOUT) +
880+
// Installing plugins at config time and loading them when nodes start requires additional time we need to
881+
// account for
882+
ADDITIONAL_CONFIG_TIMEOUT_UNIT.toMillis(ADDITIONAL_CONFIG_TIMEOUT *
883+
(
884+
plugins.size() +
885+
keystoreFiles.size() +
886+
keystoreSettings.size() +
887+
credentials.size()
888+
)
889+
),
890+
TimeUnit.MILLISECONDS,
891+
this
892+
);
847893
}
848894

849895
@Override
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package org.elasticsearch.gradle.testclusters;
2+
3+
import org.gradle.api.logging.Logger;
4+
import org.gradle.api.logging.Logging;
5+
6+
import java.util.Collection;
7+
import java.util.HashSet;
8+
import java.util.Iterator;
9+
import java.util.Set;
10+
11+
/**
12+
* Keep an inventory of all running Clusters and stop them when interrupted
13+
*
14+
* This takes advantage of the fact that Gradle interrupts all the threads in the daemon when the build completes.
15+
*/
16+
public class TestClusterCleanupOnShutdown implements Runnable {
17+
18+
private final Logger logger = Logging.getLogger(TestClusterCleanupOnShutdown.class);
19+
20+
private Set<ElasticsearchCluster> clustersToWatch = new HashSet<>();
21+
22+
public void watch(Collection<ElasticsearchCluster> cluster) {
23+
synchronized (clustersToWatch) {
24+
clustersToWatch.addAll(clustersToWatch);
25+
}
26+
}
27+
28+
public void unWatch(Collection<ElasticsearchCluster> cluster) {
29+
synchronized (clustersToWatch) {
30+
clustersToWatch.removeAll(clustersToWatch);
31+
}
32+
}
33+
34+
@Override
35+
public void run() {
36+
try {
37+
while (true) {
38+
Thread.sleep(Long.MAX_VALUE);
39+
}
40+
} catch (InterruptedException interrupted) {
41+
synchronized (clustersToWatch) {
42+
if (clustersToWatch.isEmpty()) {
43+
return;
44+
}
45+
logger.info("Cleanup thread was interrupted, shutting down all clusters");
46+
Iterator<ElasticsearchCluster> iterator = clustersToWatch.iterator();
47+
while (iterator.hasNext()) {
48+
ElasticsearchCluster cluster = iterator.next();
49+
iterator.remove();
50+
try {
51+
cluster.stop(false);
52+
} catch (Exception e) {
53+
logger.warn("Could not shut down {}", cluster, e);
54+
}
55+
}
56+
}
57+
}
58+
}
59+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package org.elasticsearch.gradle.testclusters;
2+
3+
import org.gradle.api.Project;
4+
import org.gradle.api.logging.Logger;
5+
import org.gradle.api.logging.Logging;
6+
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.Executors;
9+
import java.util.concurrent.TimeUnit;
10+
11+
/**
 * This extension is meant to be used internally by testclusters.
 *
 * It holds synchronization primitives needed to implement the rate limiting.
 * This is tricky because we can't use Gradle workers as there's no way to make sure that tests and their clusters are
 * allocated atomically, so we could be in a situation where all workers are tests waiting for clusters to start up.
 *
 * Also auto configures cleanup of executors to make sure we don't leak threads in the daemon.
 */
public class TestClustersCleanupExtension {

    // How long buildFinished waits for the watcher executor to terminate.
    private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1;
    private static final TimeUnit EXECUTOR_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.MINUTES;

    private static final Logger logger = Logging.getLogger(TestClustersCleanupExtension.class);

    // Single-thread executor that runs the cleanup watcher for the lifetime of the build.
    private final ExecutorService executorService;
    private final TestClusterCleanupOnShutdown cleanupThread;

    public TestClustersCleanupExtension() {
        executorService = Executors.newSingleThreadExecutor();
        cleanupThread = new TestClusterCleanupOnShutdown();
        // The watcher blocks until interrupted; shutdownNow() in buildFinished
        // delivers that interrupt, which triggers cluster teardown.
        executorService.submit(cleanupThread);
    }


    /**
     * Registers a single shared instance of this extension on the root project (idempotent),
     * installs a JVM shutdown hook as a fallback, and tears both down when the build finishes.
     */
    public static void createExtension(Project project) {
        if (project.getRootProject().getExtensions().findByType(TestClustersCleanupExtension.class) != null) {
            return;
        }
        // Configure the extension on the root project so we have a single instance per run
        TestClustersCleanupExtension ext = project.getRootProject().getExtensions().create(
            "__testclusters_rate_limit",
            TestClustersCleanupExtension.class
        );
        // Fallback for when the daemon dies before buildFinished fires.
        // NOTE(review): run() blocks in Thread.sleep until interrupted, and nothing
        // appears to interrupt a shutdown-hook thread — confirm this cannot stall JVM shutdown.
        Thread shutdownHook = new Thread(ext.cleanupThread::run);
        Runtime.getRuntime().addShutdownHook(shutdownHook);
        project.getGradle().buildFinished(buildResult -> {
            // Interrupt the watcher so it stops any clusters still registered.
            ext.executorService.shutdownNow();
            try {
                if (ext.executorService.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_UNIT) == false) {
                    throw new IllegalStateException(
                        "Failed to shut down executor service after " +
                        EXECUTOR_SHUTDOWN_TIMEOUT + " " + EXECUTOR_SHUTDOWN_TIMEOUT_UNIT
                    );
                }
            } catch (InterruptedException e) {
                // Preserve interrupt status for whoever is unwinding the build.
                Thread.currentThread().interrupt();
            }
            try {
                // The hook is no longer needed once orderly cleanup has run.
                if (false == Runtime.getRuntime().removeShutdownHook(shutdownHook)) {
                    logger.warn("Trying to deregister shutdown hook when it was not registered.");
                }
            } catch (IllegalStateException ese) {
                // Thrown when shutdown is in progress
                logger.warn("Can't remove shutdown hook", ese);
            }
        });
    }

    public TestClusterCleanupOnShutdown getCleanupThread() {
        return cleanupThread;
    }
}

0 commit comments

Comments
 (0)