Skip to content

Commit f44ba04

Browse files
authored
[Zen2] Add UnicastConfiguredHostsResolver (#32642)
The `PeerFinder`, introduced in #32246, obtains the collection of seed addresses configured by the user from a `ConfiguredHostsResolver`. In reality this collection comes from the `UnicastHostsProvider` via a slightly complicated threading model that performs the resolution of hostnames to addresses using a dedicated `ExecutorService`. This commit introduces an adapter to allow the `PeerFinder` to obtain its seed addresses in this manner.
1 parent 289e34a commit f44ba04

File tree

2 files changed

+214
-0
lines changed

2 files changed

+214
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.discovery;
21+
22+
import org.apache.lucene.util.SetOnce;
23+
import org.elasticsearch.common.component.AbstractLifecycleComponent;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.transport.TransportAddress;
26+
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
28+
import org.elasticsearch.common.util.concurrent.EsExecutors;
29+
import org.elasticsearch.discovery.PeerFinder.ConfiguredHostsResolver;
30+
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
31+
import org.elasticsearch.discovery.zen.UnicastZenPing;
32+
import org.elasticsearch.threadpool.ThreadPool;
33+
import org.elasticsearch.transport.TransportService;
34+
35+
import java.util.List;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.ThreadFactory;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicBoolean;
40+
import java.util.function.Consumer;
41+
42+
import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING;
43+
44+
public class UnicastConfiguredHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver {
45+
46+
private final AtomicBoolean resolveInProgress = new AtomicBoolean();
47+
private final TransportService transportService;
48+
private final UnicastHostsProvider hostsProvider;
49+
private final SetOnce<ExecutorService> executorService = new SetOnce<>();
50+
private final TimeValue resolveTimeout;
51+
52+
public UnicastConfiguredHostsResolver(Settings settings, TransportService transportService, UnicastHostsProvider hostsProvider) {
53+
super(settings);
54+
this.transportService = transportService;
55+
this.hostsProvider = hostsProvider;
56+
resolveTimeout = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
57+
}
58+
59+
@Override
60+
protected void doStart() {
61+
final int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
62+
logger.debug("using concurrent_connects [{}], resolve_timeout [{}]", concurrentConnects, resolveTimeout);
63+
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_configured_hosts_resolver]");
64+
executorService.set(EsExecutors.newScaling(nodeName() + "/" + "unicast_configured_hosts_resolver",
65+
0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory, transportService.getThreadPool().getThreadContext()));
66+
}
67+
68+
@Override
69+
protected void doStop() {
70+
ThreadPool.terminate(executorService.get(), 10, TimeUnit.SECONDS);
71+
}
72+
73+
@Override
74+
protected void doClose() {
75+
}
76+
77+
@Override
78+
public void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer) {
79+
if (lifecycle.started() == false) {
80+
logger.debug("resolveConfiguredHosts: lifecycle is {}, not proceeding", lifecycle);
81+
return;
82+
}
83+
84+
if (resolveInProgress.compareAndSet(false, true)) {
85+
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
86+
@Override
87+
public void onFailure(Exception e) {
88+
logger.debug("failure when resolving unicast hosts list", e);
89+
}
90+
91+
@Override
92+
protected void doRun() {
93+
if (lifecycle.started() == false) {
94+
logger.debug("resolveConfiguredHosts.doRun: lifecycle is {}, not proceeding", lifecycle);
95+
return;
96+
}
97+
98+
List<TransportAddress> providedAddresses
99+
= hostsProvider.buildDynamicHosts((hosts, limitPortCounts)
100+
-> UnicastZenPing.resolveHostsLists(executorService.get(), logger, hosts, limitPortCounts,
101+
transportService, resolveTimeout));
102+
103+
consumer.accept(providedAddresses);
104+
}
105+
106+
@Override
107+
public void onAfter() {
108+
resolveInProgress.set(false);
109+
}
110+
111+
@Override
112+
public String toString() {
113+
return "UnicastConfiguredHostsResolver resolving unicast hosts list";
114+
}
115+
});
116+
}
117+
}
118+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.discovery;
21+
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.transport.TransportAddress;
24+
import org.elasticsearch.test.ESTestCase;
25+
import org.elasticsearch.threadpool.TestThreadPool;
26+
import org.elasticsearch.threadpool.ThreadPool;
27+
import org.elasticsearch.transport.TransportService;
28+
import org.junit.After;
29+
import org.junit.Before;
30+
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.atomic.AtomicReference;
36+
37+
import static org.hamcrest.Matchers.equalTo;
38+
import static org.hamcrest.core.IsNull.nullValue;
39+
import static org.mockito.Mockito.mock;
40+
import static org.mockito.Mockito.when;
41+
42+
public class UnicastConfiguredHostsResolverTests extends ESTestCase {
43+
44+
private List<TransportAddress> transportAddresses;
45+
private UnicastConfiguredHostsResolver unicastConfiguredHostsResolver;
46+
private ThreadPool threadPool;
47+
48+
@Before
49+
public void startResolver() {
50+
threadPool = new TestThreadPool("node");
51+
transportAddresses = new ArrayList<>();
52+
53+
TransportService transportService = mock(TransportService.class);
54+
when(transportService.getThreadPool()).thenReturn(threadPool);
55+
56+
unicastConfiguredHostsResolver
57+
= new UnicastConfiguredHostsResolver(Settings.EMPTY, transportService, hostsResolver -> transportAddresses);
58+
unicastConfiguredHostsResolver.start();
59+
}
60+
61+
@After
62+
public void stopResolver() {
63+
unicastConfiguredHostsResolver.stop();
64+
threadPool.shutdown();
65+
}
66+
67+
public void testResolvesAddressesInBackgroundAndIgnoresConcurrentCalls() throws Exception {
68+
final AtomicReference<List<TransportAddress>> resolvedAddressesRef = new AtomicReference<>();
69+
final CountDownLatch startLatch = new CountDownLatch(1);
70+
final CountDownLatch endLatch = new CountDownLatch(1);
71+
72+
final int addressCount = randomIntBetween(0, 5);
73+
for (int i = 0; i < addressCount; i++) {
74+
transportAddresses.add(buildNewFakeTransportAddress());
75+
}
76+
77+
unicastConfiguredHostsResolver.resolveConfiguredHosts(resolvedAddresses -> {
78+
try {
79+
assertTrue(startLatch.await(30, TimeUnit.SECONDS));
80+
} catch (InterruptedException e) {
81+
throw new AssertionError(e);
82+
}
83+
resolvedAddressesRef.set(resolvedAddresses);
84+
endLatch.countDown();
85+
});
86+
87+
unicastConfiguredHostsResolver.resolveConfiguredHosts(resolvedAddresses -> {
88+
throw new AssertionError("unexpected concurrent resolution");
89+
});
90+
91+
assertThat(resolvedAddressesRef.get(), nullValue());
92+
startLatch.countDown();
93+
assertTrue(endLatch.await(30, TimeUnit.SECONDS));
94+
assertThat(resolvedAddressesRef.get(), equalTo(transportAddresses));
95+
}
96+
}

0 commit comments

Comments
 (0)