Skip to content

Commit 65c48f8

Browse files
move discover_cluster_id into common
1 parent 4bd84c7 commit 65c48f8

File tree

2 files changed

+36
-31
lines changed

2 files changed

+36
-31
lines changed

lib/logstash/outputs/elasticsearch.rb

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -262,12 +262,18 @@ def register
262262
# To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior.
263263
@dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil
264264

265+
check_action_validity
266+
265267
# the license_checking behaviour in the Pool class is externalized in the LogStash::ElasticSearchOutputLicenseChecker
266268
# class defined in license_check.rb. This license checking is specific to the elasticsearch output here and passed
267269
# to build_client down to the Pool class.
268270
build_client(LicenseChecker.new(@logger))
269-
setup_after_successful_connection
270-
check_action_validity
271+
272+
@template_installer = setup_after_successful_connection do
273+
discover_cluster_uuid
274+
install_template
275+
setup_ilm if ilm_in_use?
276+
end
271277
@bulk_request_metrics = metric.namespace(:bulk_requests)
272278
@document_level_metrics = metric.namespace(:documents)
273279
@logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s))
@@ -373,19 +379,6 @@ def retry_on_conflict_action_name
373379

374380
private
375381

376-
def discover_cluster_uuid
377-
return unless defined?(plugin_metadata)
378-
cluster_info = client.get('/')
379-
plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid'])
380-
rescue => e
381-
# TODO introducing this logging message breaks many tests that need refactoring
382-
# @logger.error("Unable to retrieve elasticsearch cluster uuid", error => e.message)
383-
end
384-
385-
def successful_connection?
386-
!!maximum_seen_major_version
387-
end
388-
389382
def routing_field_name
390383
maximum_seen_major_version >= 6 ? :routing : :_routing
391384
end
@@ -432,22 +425,6 @@ def install_template
432425
@template_installed.make_true
433426
end
434427

435-
def setup_after_successful_connection
436-
@template_installer ||= Thread.new do
437-
sleep_interval = @retry_initial_interval
438-
until successful_connection? || @stopping.true?
439-
@logger.debug("Waiting for connectivity to Elasticsearch cluster. Retrying in #{sleep_interval}s")
440-
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
441-
sleep_interval = next_sleep_interval(sleep_interval)
442-
end
443-
if successful_connection?
444-
discover_cluster_uuid
445-
install_template
446-
setup_ilm if ilm_in_use?
447-
end
448-
end
449-
end
450-
451428
def setup_ecs_compatibility_related_defaults
452429
case ecs_compatibility
453430
when :disabled

lib/logstash/plugin_mixins/elasticsearch/common.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,34 @@ def maximum_seen_major_version
119119
client.maximum_seen_major_version
120120
end
121121

122+
def successful_connection?
123+
!!maximum_seen_major_version
124+
end
125+
126+
# launch a thread that waits for an initial successful connection to the ES cluster to call the given block
127+
# @param block [Proc] the block to execute upon initial successful connection
128+
# @return [Thread] the successful connection wait thread
129+
def setup_after_successful_connection(&block)
130+
Thread.new do
131+
sleep_interval = @retry_initial_interval
132+
until successful_connection? || @stopping.true?
133+
@logger.debug("Waiting for connectivity to Elasticsearch cluster. Retrying in #{sleep_interval}s")
134+
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
135+
sleep_interval = next_sleep_interval(sleep_interval)
136+
end
137+
block.call if successful_connection?
138+
end
139+
end
140+
141+
def discover_cluster_uuid
142+
return unless defined?(plugin_metadata)
143+
cluster_info = client.get('/')
144+
plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid'])
145+
rescue => e
146+
# TODO introducing this logging message breaks many tests that need refactoring
147+
# @logger.error("Unable to retrieve elasticsearch cluster uuid", error => e.message)
148+
end
149+
122150
def retrying_submit(actions)
123151
# Initially we submit the full list of actions
124152
submit_actions = actions

0 commit comments

Comments
 (0)