Skip to content

Commit ac798ce

Browse files
committed
Add UnicastConfiguredHostsResolver
The `PeerFinder`, introduced in elastic#32246, obtains the collection of seed addresses configured by the user from a `ConfiguredHostsResolver`. In reality this collection comes from the `UnicastHostsProvider` via a slightly complicated threading model that performs the resolution of hostnames to addresses using a dedicated `ExecutorService`. This commit introduces an adapter to allow the `PeerFinder` to obtain its seed addresses in this manner.
1 parent 2176184 commit ac798ce

File tree

2 files changed

+209
-0
lines changed

2 files changed

+209
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.discovery;
21+
22+
import org.apache.lucene.util.SetOnce;
23+
import org.elasticsearch.common.component.AbstractLifecycleComponent;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.transport.TransportAddress;
26+
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
28+
import org.elasticsearch.discovery.PeerFinder.ConfiguredHostsResolver;
29+
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
30+
import org.elasticsearch.discovery.zen.UnicastZenPing;
31+
import org.elasticsearch.threadpool.ThreadPool;
32+
import org.elasticsearch.transport.TransportService;
33+
34+
import java.util.List;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.function.Consumer;
39+
import java.util.function.Supplier;
40+
41+
public class UnicastConfiguredHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver {
42+
43+
private final AtomicBoolean resolveInProgress = new AtomicBoolean();
44+
private final TransportService transportService;
45+
private final UnicastHostsProvider hostsProvider;
46+
private final SetOnce<ExecutorService> executorService = new SetOnce<>();
47+
private final TimeValue resolveTimeout;
48+
private final Supplier<ExecutorService> executorServiceFactory;
49+
50+
public UnicastConfiguredHostsResolver(Settings settings, TransportService transportService, UnicastHostsProvider hostsProvider,
51+
Supplier<ExecutorService> executorServiceFactory) {
52+
super(settings);
53+
this.transportService = transportService;
54+
this.hostsProvider = hostsProvider;
55+
resolveTimeout = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
56+
this.executorServiceFactory = executorServiceFactory;
57+
}
58+
59+
@Override
60+
protected void doStart() {
61+
executorService.set(executorServiceFactory.get());
62+
}
63+
64+
@Override
65+
protected void doStop() {
66+
ThreadPool.terminate(executorService.get(), 10, TimeUnit.SECONDS);
67+
}
68+
69+
@Override
70+
protected void doClose() {
71+
}
72+
73+
public void resolveConfiguredHosts(Consumer<List<TransportAddress>> consumer) {
74+
assert lifecycle.started() : lifecycle;
75+
76+
if (resolveInProgress.compareAndSet(false, true)) {
77+
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
78+
@Override
79+
public void onFailure(Exception e) {
80+
}
81+
82+
@Override
83+
protected void doRun() {
84+
List<TransportAddress> providedAddresses
85+
= hostsProvider.buildDynamicHosts((hosts, limitPortCounts)
86+
-> UnicastZenPing.resolveHostsLists(executorService.get(), logger, hosts, limitPortCounts,
87+
transportService, resolveTimeout));
88+
89+
consumer.accept(providedAddresses);
90+
}
91+
92+
@Override
93+
public void onAfter() {
94+
super.onAfter();
95+
resolveInProgress.set(false);
96+
}
97+
98+
@Override
99+
public String toString() {
100+
return "UnicastConfiguredHostsResolver resolving unicast hosts list";
101+
}
102+
});
103+
}
104+
}
105+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.discovery;
21+
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.transport.TransportAddress;
24+
import org.elasticsearch.common.util.concurrent.EsExecutors;
25+
import org.elasticsearch.test.ESTestCase;
26+
import org.elasticsearch.threadpool.ThreadPool;
27+
import org.elasticsearch.transport.TransportService;
28+
import org.junit.After;
29+
import org.junit.Before;
30+
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.ThreadFactory;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicReference;
37+
38+
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
39+
import static org.hamcrest.Matchers.equalTo;
40+
import static org.hamcrest.core.IsNull.nullValue;
41+
import static org.mockito.Mockito.mock;
42+
import static org.mockito.Mockito.when;
43+
44+
public class UnicastConfiguredHostsResolverTests extends ESTestCase {
45+
46+
private List<TransportAddress> transportAddresses;
47+
private UnicastConfiguredHostsResolver unicastConfiguredHostsResolver;
48+
private ThreadPool threadPool;
49+
50+
@Before
51+
public void startResolver() {
52+
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
53+
threadPool = new ThreadPool(settings);
54+
transportAddresses = new ArrayList<>();
55+
56+
TransportService transportService = mock(TransportService.class);
57+
when(transportService.getThreadPool()).thenReturn(threadPool);
58+
59+
unicastConfiguredHostsResolver = new UnicastConfiguredHostsResolver(
60+
settings, transportService, hostsResolver -> transportAddresses, () -> {
61+
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory("thread-factory-name");
62+
return EsExecutors.newScaling("executor-name",
63+
0, 10, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext());
64+
});
65+
66+
unicastConfiguredHostsResolver.start();
67+
}
68+
69+
@After
70+
public void stopResolver() {
71+
unicastConfiguredHostsResolver.stop();
72+
threadPool.shutdown();
73+
}
74+
75+
public void testResolvesAddressesInBackgroundAndIgnoresConcurrentCalls() throws Exception {
76+
final AtomicReference<List<TransportAddress>> resolvedAddressesRef = new AtomicReference<>();
77+
final CountDownLatch startLatch = new CountDownLatch(1);
78+
final CountDownLatch endLatch = new CountDownLatch(1);
79+
80+
final int addressCount = randomIntBetween(0, 5);
81+
for (int i = 0; i < addressCount; i++) {
82+
transportAddresses.add(buildNewFakeTransportAddress());
83+
}
84+
85+
unicastConfiguredHostsResolver.resolveConfiguredHosts(resolvedAddresses -> {
86+
try {
87+
assertTrue(startLatch.await(1, TimeUnit.SECONDS));
88+
} catch (InterruptedException e) {
89+
throw new AssertionError(e);
90+
}
91+
resolvedAddressesRef.set(resolvedAddresses);
92+
endLatch.countDown();
93+
});
94+
95+
unicastConfiguredHostsResolver.resolveConfiguredHosts(resolvedAddresses -> {
96+
throw new AssertionError("unexpected concurrent resolution");
97+
});
98+
99+
assertThat(resolvedAddressesRef.get(), nullValue());
100+
startLatch.countDown();
101+
assertTrue(endLatch.await(1, TimeUnit.SECONDS));
102+
assertThat(resolvedAddressesRef.get(), equalTo(transportAddresses));
103+
}
104+
}

0 commit comments

Comments
 (0)