Skip to content

Commit 0f27c0b

Browse files
committed
Extend systemd timeout during startup (elastic#49784)
Startup can take longer than systemd's default timeout of sixty seconds (e.g., if we are upgrading on-disk metadata), so we might not notify systemd that we are fully started up before that timeout elapses. In this case, we need to notify systemd to extend the timeout so that we are not abruptly terminated. We do this by sending EXTEND_TIMEOUT_USEC every fifteen seconds, each time extending the timeout by thirty seconds. This prevents systemd from abruptly terminating us during a long startup. We cancel the scheduled execution of this notification after we have successfully started up.
1 parent d5eb937 commit 0f27c0b

File tree

2 files changed

+100
-15
lines changed

2 files changed

+100
-15
lines changed

modules/systemd/src/main/java/org/elasticsearch/systemd/SystemdPlugin.java

+54-3
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,22 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.elasticsearch.Build;
25+
import org.elasticsearch.client.Client;
26+
import org.elasticsearch.cluster.service.ClusterService;
27+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
28+
import org.elasticsearch.common.unit.TimeValue;
29+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
30+
import org.elasticsearch.env.Environment;
31+
import org.elasticsearch.env.NodeEnvironment;
2532
import org.elasticsearch.plugins.ClusterPlugin;
2633
import org.elasticsearch.plugins.Plugin;
34+
import org.elasticsearch.script.ScriptService;
35+
import org.elasticsearch.threadpool.Scheduler;
36+
import org.elasticsearch.threadpool.ThreadPool;
37+
import org.elasticsearch.watcher.ResourceWatcherService;
38+
39+
import java.util.Collection;
40+
import java.util.Collections;
2741

2842
public class SystemdPlugin extends Plugin implements ClusterPlugin {
2943

@@ -62,8 +76,44 @@ public SystemdPlugin() {
6276
enabled = Boolean.TRUE.toString().equals(esSDNotify);
6377
}
6478

79+
Scheduler.Cancellable extender;
80+
81+
@Override
82+
public Collection<Object> createComponents(
83+
final Client client,
84+
final ClusterService clusterService,
85+
final ThreadPool threadPool,
86+
final ResourceWatcherService resourceWatcherService,
87+
final ScriptService scriptService,
88+
final NamedXContentRegistry xContentRegistry,
89+
final Environment environment,
90+
final NodeEnvironment nodeEnvironment,
91+
final NamedWriteableRegistry namedWriteableRegistry) {
92+
if (enabled) {
93+
/*
94+
* Since we have set the service type to notify, by default systemd will wait up to sixty seconds for the process to send the
95+
* READY=1 status via sd_notify. Since our startup can take longer than that (e.g., if we are upgrading on-disk metadata),
96+
* we need to repeatedly notify systemd that we are still starting up by sending EXTEND_TIMEOUT_USEC with an extension to the
97+
* timeout. Therefore, every fifteen seconds we send systemd a message via sd_notify to extend the timeout by thirty seconds.
98+
* We will cancel this scheduled task after we successfully notify systemd that we are ready.
99+
*/
100+
extender = threadPool.scheduleWithFixedDelay(
101+
() -> {
102+
final int rc = sd_notify(0, "EXTEND_TIMEOUT_USEC=30000000");
103+
if (rc < 0) {
104+
logger.warn("extending startup timeout via sd_notify failed with [{}]", rc);
105+
}
106+
},
107+
TimeValue.timeValueSeconds(15),
108+
ThreadPool.Names.SAME);
109+
}
110+
return Collections.emptyList();
111+
}
112+
65113
int sd_notify(@SuppressWarnings("SameParameterValue") final int unset_environment, final String state) {
66-
return Libsystemd.sd_notify(unset_environment, state);
114+
final int rc = Libsystemd.sd_notify(unset_environment, state);
115+
logger.trace("sd_notify({}, {}) returned [{}]", unset_environment, state, rc);
116+
return rc;
67117
}
68118

69119
@Override
@@ -72,11 +122,13 @@ public void onNodeStarted() {
72122
return;
73123
}
74124
final int rc = sd_notify(0, "READY=1");
75-
logger.trace("sd_notify returned [{}]", rc);
76125
if (rc < 0) {
77126
// treat failure to notify systemd of readiness as a startup failure
78127
throw new RuntimeException("sd_notify returned error [" + rc + "]");
79128
}
129+
assert extender != null;
130+
final boolean cancelled = extender.cancel();
131+
assert cancelled;
80132
}
81133

82134
@Override
@@ -85,7 +137,6 @@ public void close() {
85137
return;
86138
}
87139
final int rc = sd_notify(0, "STOPPING=1");
88-
logger.trace("sd_notify returned [{}]", rc);
89140
if (rc < 0) {
90141
// do not treat failure to notify systemd of stopping as a failure
91142
logger.warn("sd_notify returned error [{}]", rc);

modules/systemd/src/test/java/org/elasticsearch/systemd/SystemdPluginTests.java

+46-12
Original file line numberDiff line numberDiff line change
@@ -21,45 +21,70 @@
2121

2222
import org.elasticsearch.Build;
2323
import org.elasticsearch.common.CheckedConsumer;
24+
import org.elasticsearch.common.unit.TimeValue;
2425
import org.elasticsearch.test.ESTestCase;
2526
import org.elasticsearch.test.hamcrest.OptionalMatchers;
27+
import org.elasticsearch.threadpool.Scheduler;
28+
import org.elasticsearch.threadpool.ThreadPool;
2629

2730
import java.io.IOException;
2831
import java.util.Optional;
2932
import java.util.concurrent.atomic.AtomicBoolean;
3033
import java.util.concurrent.atomic.AtomicInteger;
3134
import java.util.concurrent.atomic.AtomicReference;
32-
import java.util.function.Consumer;
35+
import java.util.function.BiConsumer;
3336

3437
import static org.hamcrest.Matchers.containsString;
3538
import static org.hamcrest.Matchers.equalTo;
3639
import static org.hamcrest.Matchers.hasToString;
3740
import static org.hamcrest.Matchers.instanceOf;
41+
import static org.mockito.Matchers.any;
42+
import static org.mockito.Matchers.eq;
43+
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.verify;
45+
import static org.mockito.Mockito.when;
3846

3947
public class SystemdPluginTests extends ESTestCase {
4048

4149
private final Build.Type randomPackageBuildType = randomFrom(Build.Type.DEB, Build.Type.RPM);
4250
private final Build.Type randomNonPackageBuildType =
4351
randomValueOtherThanMany(t -> t == Build.Type.DEB || t == Build.Type.RPM, () -> randomFrom(Build.Type.values()));
4452

53+
final Scheduler.Cancellable extender = mock(Scheduler.Cancellable.class);
54+
final ThreadPool threadPool = mock(ThreadPool.class);
55+
56+
{
57+
when(extender.cancel()).thenReturn(true);
58+
when(threadPool.scheduleWithFixedDelay(any(Runnable.class), eq(TimeValue.timeValueSeconds(15)), eq(ThreadPool.Names.SAME)))
59+
.thenReturn(extender);
60+
}
61+
4562
public void testIsEnabled() {
4663
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.TRUE.toString());
64+
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
4765
assertTrue(plugin.isEnabled());
66+
assertNotNull(plugin.extender);
4867
}
4968

5069
public void testIsNotPackageDistribution() {
5170
final SystemdPlugin plugin = new SystemdPlugin(false, randomNonPackageBuildType, Boolean.TRUE.toString());
71+
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
5272
assertFalse(plugin.isEnabled());
73+
assertNull(plugin.extender);
5374
}
5475

5576
public void testIsImplicitlyNotEnabled() {
5677
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, null);
78+
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
5779
assertFalse(plugin.isEnabled());
80+
assertNull(plugin.extender);
5881
}
5982

6083
public void testIsExplicitlyNotEnabled() {
6184
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, Boolean.FALSE.toString());
85+
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
6286
assertFalse(plugin.isEnabled());
87+
assertNull(plugin.extender);
6388
}
6489

6590
public void testInvalid() {
@@ -75,15 +100,18 @@ public void testOnNodeStartedSuccess() {
75100
runTestOnNodeStarted(
76101
Boolean.TRUE.toString(),
77102
randomIntBetween(0, Integer.MAX_VALUE),
78-
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
103+
(maybe, plugin) -> {
104+
assertThat(maybe, OptionalMatchers.isEmpty());
105+
verify(plugin.extender).cancel();
106+
});
79107
}
80108

81109
public void testOnNodeStartedFailure() {
82110
final int rc = randomIntBetween(Integer.MIN_VALUE, -1);
83111
runTestOnNodeStarted(
84112
Boolean.TRUE.toString(),
85113
rc,
86-
maybe -> {
114+
(maybe, plugin) -> {
87115
assertThat(maybe, OptionalMatchers.isPresent());
88116
// noinspection OptionalGetWithoutIsPresent
89117
assertThat(maybe.get(), instanceOf(RuntimeException.class));
@@ -95,48 +123,48 @@ public void testOnNodeStartedNotEnabled() {
95123
runTestOnNodeStarted(
96124
Boolean.FALSE.toString(),
97125
randomInt(),
98-
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
126+
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
99127
}
100128

101129
private void runTestOnNodeStarted(
102130
final String esSDNotify,
103131
final int rc,
104-
final Consumer<Optional<Exception>> assertions) {
132+
final BiConsumer<Optional<Exception>, SystemdPlugin> assertions) {
105133
runTest(esSDNotify, rc, assertions, SystemdPlugin::onNodeStarted, "READY=1");
106134
}
107135

108136
public void testCloseSuccess() {
109137
runTestClose(
110138
Boolean.TRUE.toString(),
111139
randomIntBetween(1, Integer.MAX_VALUE),
112-
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
140+
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
113141
}
114142

115143
public void testCloseFailure() {
116144
runTestClose(
117145
Boolean.TRUE.toString(),
118146
randomIntBetween(Integer.MIN_VALUE, -1),
119-
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
147+
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
120148
}
121149

122150
public void testCloseNotEnabled() {
123151
runTestClose(
124152
Boolean.FALSE.toString(),
125153
randomInt(),
126-
maybe -> assertThat(maybe, OptionalMatchers.isEmpty()));
154+
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
127155
}
128156

129157
private void runTestClose(
130158
final String esSDNotify,
131159
final int rc,
132-
final Consumer<Optional<Exception>> assertions) {
160+
final BiConsumer<Optional<Exception>, SystemdPlugin> assertions) {
133161
runTest(esSDNotify, rc, assertions, SystemdPlugin::close, "STOPPING=1");
134162
}
135163

136164
private void runTest(
137165
final String esSDNotify,
138166
final int rc,
139-
final Consumer<Optional<Exception>> assertions,
167+
final BiConsumer<Optional<Exception>, SystemdPlugin> assertions,
140168
final CheckedConsumer<SystemdPlugin, IOException> invocation,
141169
final String expectedState) {
142170
final AtomicBoolean invoked = new AtomicBoolean();
@@ -153,16 +181,22 @@ int sd_notify(final int unset_environment, final String state) {
153181
}
154182

155183
};
184+
plugin.createComponents(null, null, threadPool, null, null, null, null, null, null);
185+
if (Boolean.TRUE.toString().equals(esSDNotify)) {
186+
assertNotNull(plugin.extender);
187+
} else {
188+
assertNull(plugin.extender);
189+
}
156190

157191
boolean success = false;
158192
try {
159193
invocation.accept(plugin);
160194
success = true;
161195
} catch (final Exception e) {
162-
assertions.accept(Optional.of(e));
196+
assertions.accept(Optional.of(e), plugin);
163197
}
164198
if (success) {
165-
assertions.accept(Optional.empty());
199+
assertions.accept(Optional.empty(), plugin);
166200
}
167201
if (Boolean.TRUE.toString().equals(esSDNotify)) {
168202
assertTrue(invoked.get());

0 commit comments

Comments
 (0)