From f50240288e561f00ebb99c4c2612c752c75de3df Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Thu, 12 Dec 2019 10:21:16 -0500 Subject: [PATCH] Add sniffing_attributes option --- lib/logstash/outputs/elasticsearch.rb | 4 +++ .../outputs/elasticsearch/http_client.rb | 1 + .../outputs/elasticsearch/http_client/pool.rb | 27 ++++++++++++------- .../elasticsearch/http_client_builder.rb | 1 + 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index b7654506f..42fe92f65 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -180,6 +180,10 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # do not use full URL here, only paths, e.g. "/sniff/_nodes/http" config :sniffing_path, :validate => :string + # Node attribute(s) to filter for. Those that possess and match the following hash of attributes + # will be selected. + config :sniffing_attributes, :validate => :hash, :default => {} + # Set the address of a forward HTTP proxy. # This used to accept hashes as arguments but now only accepts # arguments of the URI type to prevent leaking credentials. diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 32a37e82a..4456a9074 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -289,6 +289,7 @@ def build_pool(options) :sniffing => sniffing, :sniffer_delay => options[:sniffer_delay], :sniffing_path => options[:sniffing_path], + :sniffing_attributes => options[:sniffing_attributes], :healthcheck_path => options[:healthcheck_path], :resurrect_delay => options[:resurrect_delay], :url_normalizer => self.method(:host_to_url), diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 467141eaf..38a9ef747 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -28,7 +28,7 @@ def message end end - attr_reader :logger, :adapter, :sniffing, :sniffer_delay, :resurrect_delay, :healthcheck_path, :sniffing_path, :bulk_path + attr_reader :logger, :adapter, :sniffing, :sniffer_delay, :resurrect_delay, :healthcheck_path, :sniffing_path, :bulk_path, :sniffing_attributes ROOT_URI_PATH = '/'.freeze LICENSE_PATH = '/_license'.freeze @@ -40,6 +40,7 @@ def message :scheme => 'http', :resurrect_delay => 5, :sniffing => false, + :sniffing_attributes => {}, :sniffer_delay => 10, }.freeze @@ -48,7 +49,7 @@ def initialize(logger, adapter, initial_urls=[], options={}) @adapter = adapter @metric = options[:metric] @initial_urls = initial_urls - + raise ArgumentError, "No URL Normalizer specified!" unless options[:url_normalizer] @url_normalizer = options[:url_normalizer] DEFAULT_OPTIONS.merge(options).tap do |merged| @@ -58,6 +59,7 @@ def initialize(logger, adapter, initial_urls=[], options={}) @resurrect_delay = merged[:resurrect_delay] @sniffing = merged[:sniffing] @sniffer_delay = merged[:sniffer_delay] + @sniffing_attributes = merged[:sniffing_attributes] end # Used for all concurrent operations in this class @@ -71,7 +73,7 @@ def initialize(logger, adapter, initial_urls=[], options={}) def oss? LogStash::Outputs::ElasticSearch.oss? end - + def start update_initial_urls start_resurrectionist @@ -189,15 +191,20 @@ def check_sniff end end end - + def major_version(version_string) version_string.split('.').first.to_i end - + def sniff_5x_and_above(nodes) nodes.map do |id,info| # Skip master-only nodes next if info["roles"] && info["roles"] == ["master"] + # if !@sniffing_attributes.nil? or !@sniffing_attributes.to_h.empty? + if !@sniffing_attributes.to_h.empty? + attributes = info["attributes"].clone.delete_if { |key, value| !@sniffing_attributes.key? key } + next if attributes != @sniffing_attributes + end address_str_to_uri(info["http"]["publish_address"]) if info["http"] end.compact end @@ -215,7 +222,7 @@ def sniff_2x_1x(nodes) nodes.map do |id,info| # TODO Make sure this works with shield. Does that listed # stuff as 'https_address?' - + addr_str = info['http_address'].to_s next unless addr_str # Skip hosts with HTTP disabled @@ -344,7 +351,7 @@ def normalize_url(uri) def update_urls(new_urls) return if new_urls.nil? - + # Normalize URLs new_urls = new_urls.map(&method(:normalize_url)) @@ -374,14 +381,14 @@ def update_urls(new_urls) logger.info("Elasticsearch pool URLs updated", :changes => state_changes) end end - + # Run an inline healthcheck anytime URLs are updated # This guarantees that during startup / post-startup # sniffing we don't have idle periods waiting for the # periodic sniffer to allow new hosts to come online - healthcheck! + healthcheck! end - + def size @state_mutex.synchronize { @url_info.size } end diff --git a/lib/logstash/outputs/elasticsearch/http_client_builder.rb b/lib/logstash/outputs/elasticsearch/http_client_builder.rb index 3cb60bc9b..fb857853d 100644 --- a/lib/logstash/outputs/elasticsearch/http_client_builder.rb +++ b/lib/logstash/outputs/elasticsearch/http_client_builder.rb @@ -22,6 +22,7 @@ def self.build(logger, hosts, params) if params["sniffing"] common_options[:sniffing] = true common_options[:sniffer_delay] = params["sniffing_delay"] + common_options[:sniffing_attributes] = params["sniffing_attributes"] end common_options[:timeout] = params["timeout"] if params["timeout"]