|
21 | 21 | import org.elasticsearch.client.RestClientBuilder;
|
22 | 22 | import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
|
23 | 23 | import org.elasticsearch.client.sniff.Sniffer;
|
| 24 | +import org.elasticsearch.cluster.ClusterStateListener; |
24 | 25 | import org.elasticsearch.cluster.service.ClusterService;
|
25 | 26 | import org.elasticsearch.common.Nullable;
|
26 | 27 | import org.elasticsearch.common.Strings;
|
@@ -363,7 +364,7 @@ public Iterator<Setting<?>> settings() {
|
363 | 364 | private static final ConcurrentHashMap<String, SecureString> SECURE_AUTH_PASSWORDS = new ConcurrentHashMap<>();
|
364 | 365 | private final ThreadContext threadContext;
|
365 | 366 | private final DateFormatter dateTimeFormatter;
|
366 |
| - |
| 367 | + private final ClusterStateListener onLocalMasterListener; |
367 | 368 | /**
|
368 | 369 | * Create an {@link HttpExporter}.
|
369 | 370 | *
|
@@ -424,6 +425,14 @@ public HttpExporter(final Config config, final SSLService sslService, final Thre
|
424 | 425 |
|
425 | 426 | // mark resources as dirty after any node failure or license change
|
426 | 427 | listener.setResource(resource);
|
| 428 | + |
| 429 | + //for a mixed cluster upgrade, ensure that if master changes and this is the master, allow the resources to re-publish |
| 430 | + onLocalMasterListener = clusterChangedEvent -> { |
| 431 | + if (clusterChangedEvent.nodesDelta().masterNodeChanged() && clusterChangedEvent.localNodeMaster()) { |
| 432 | + resource.markDirty(); |
| 433 | + } |
| 434 | + }; |
| 435 | + config.clusterService().addListener(onLocalMasterListener); |
427 | 436 | }
|
428 | 437 |
|
429 | 438 | /**
|
@@ -864,9 +873,11 @@ public void openBulk(final ActionListener<ExportBulk> listener) {
|
864 | 873 | @Override
|
865 | 874 | public void doClose() {
|
866 | 875 | try {
|
| 876 | + config.clusterService().removeListener(onLocalMasterListener); |
867 | 877 | if (sniffer != null) {
|
868 | 878 | sniffer.close();
|
869 | 879 | }
|
| 880 | + |
870 | 881 | } catch (Exception e) {
|
871 | 882 | logger.error("an error occurred while closing the internal client sniffer", e);
|
872 | 883 | } finally {
|
|
0 commit comments