From 1f4449ba02b7b859548499c0f577d1ef483dc1c9 Mon Sep 17 00:00:00 2001 From: "Chen,Hao" Date: Fri, 26 Sep 2014 06:24:31 +0000 Subject: [PATCH 1/2] ElasticSearch output plugin to support multiple hosts and enhance stability. (by Hao Chen) Closes #1791 --- lib/logstash/outputs/elasticsearch.rb | 77 +++++++++++++++---- .../outputs/elasticsearch/protocol.rb | 40 ++++++++-- spec/outputs/elasticsearch.rb | 40 ++++++++++ 3 files changed, 132 insertions(+), 25 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 071f8fcef..d7939fb52 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -86,7 +86,10 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # The hostname or IP address of the host to use for Elasticsearch unicast discovery # This is only required if the normal multicast/cluster discovery stuff won't # work in your environment. - config :host, :validate => :string + # + # "127.0.0.1" + # ["127.0.0.1:9300","127.0.0.2:9300"] + config :host, :validate => :array # The port for Elasticsearch transport to use. # @@ -227,16 +230,9 @@ def register if @host.nil? && @protocol == "http" @logger.info("No 'host' set in elasticsearch output. Defaulting to localhost") - @host = "localhost" + @host = ["localhost"] end - options = { - :host => @host, - :port => @port, - :client_settings => client_settings - } - - client_class = case @protocol when "transport" LogStash::Outputs::Elasticsearch::Protocols::TransportClient @@ -253,23 +249,52 @@ def register # Default @host with embedded to localhost. This should help avoid # newbies tripping on ubuntu and other distros that have a default # firewall that blocks multicast. - @host ||= "localhost" + @host ||= ["localhost"] # Start Elasticsearch local. start_local_elasticsearch end - @client = client_class.new(options) + @client = Array.new + + if protocol == "node" or @host.nil? # if @protocol is "node" or @host is not set + options = { + :host => @host, + :port => @port, + :client_settings => client_settings + } + @client << client_class.new(options) + else # if @protocol in ["transport","http"] + @host.each do |host| + (_host,_port) = host.split ":" + options = { + :host => _host, + :port => _port || @port, + :client_settings => client_settings + } + @logger.info "Create client to elasticsearch server on #{_host}:#{_port}" + @client << client_class.new(options) + end # @host.each + end + + if @manage_template + for client in @client + begin + @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s) + client.template_install(@template_name, get_template, @template_overwrite) + break + rescue => e + @logger.error("Failed to install template: #{e.message}") + end + end # for @client loop + end # if @manage_templates @logger.info("New Elasticsearch output", :cluster => @cluster, :host => @host, :port => @port, :embedded => @embedded, :protocol => @protocol) - - if @manage_template - @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s) - @client.template_install(@template_name, get_template, @template_overwrite) - end # if @manage_templates + @client_idx = 0 + @current_client = @client[@client_idx] buffer_initialize( :max_items => @flush_size, @@ -278,6 +303,13 @@ def register ) end # def register + protected + def shift_client + @client_idx = (@client_idx+1) % @client.length + @current_client = @client[@client_idx] + @logger.debug? and @logger.debug("Switched current elasticsearch client to ##{@client_idx} at #{@host[@client_idx]}") + end + public def get_template if @template.nil? @@ -324,7 +356,18 @@ def receive(event) end # def receive def flush(actions, teardown=false) - @client.bulk(actions) + begin + @logger.debug? and @logger.debug "Sending bulk of actions to client[#{@client_idx}]: #{@host[@client_idx]}" + @current_client.bulk(actions) + rescue => e + @logger.error "Got error to send bulk of actions to elasticsearch server at #{@host[@client_idx]} : #{e.message}" + raise e + ensure + unless @protocol == "node" + @logger.debug? and @logger.debug "Shifting current elasticsearch client" + shift_client + end + end # TODO(sissel): Handle errors. Since bulk requests could mostly succeed # (aka partially fail), we need to figure out what documents need to be # retried. diff --git a/lib/logstash/outputs/elasticsearch/protocol.rb b/lib/logstash/outputs/elasticsearch/protocol.rb index 1a860e945..d0747980d 100644 --- a/lib/logstash/outputs/elasticsearch/protocol.rb +++ b/lib/logstash/outputs/elasticsearch/protocol.rb @@ -181,16 +181,40 @@ def setup(options={}) end def hosts(options) - if options[:port].to_s =~ /^\d+-\d+$/ - # port ranges are 'host[port1-port2]' according to - # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ - # However, it seems to only query the first port. - # So generate our own list of unicast hosts to scan. - range = Range.new(*options[:port].split("-")) - return range.collect { |p| "#{options[:host]}:#{p}" }.join(",") + # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ + result = Array.new + if options[:host].class == Array + options[:host].each do |host| + if host.to_s =~ /^.+:.+$/ + # For host in format: host:port, ignore options[:port] + result << host + else + if options[:port].to_s =~ /^\d+-\d+$/ + # port ranges are 'host[port1-port2]' + result << Range.new(*options[:port].split("-")).collect { |p| "#{host}:#{p}" } + else + result << "#{host}:#{options[:port]}" + end + end + end else - return "#{options[:host]}:#{options[:port]}" + if options[:host].to_s =~ /^.+:.+$/ + # For host in format: host:port, ignore options[:port] + result << options[:host] + else + if options[:port].to_s =~ /^\d+-\d+$/ + # port ranges are 'host[port1-port2]' according to + # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ + # However, it seems to only query the first port. + # So generate our own list of unicast hosts to scan. + range = Range.new(*options[:port].split("-")) + result << range.collect { |p| "#{options[:host]}:#{p}" } + else + result << "#{options[:host]}:#{options[:port]}" + end + end end + result.flatten.join(",") end # def hosts def build_client(options) diff --git a/spec/outputs/elasticsearch.rb b/spec/outputs/elasticsearch.rb index 836c9ef56..2a6f0a77b 100644 --- a/spec/outputs/elasticsearch.rb +++ b/spec/outputs/elasticsearch.rb @@ -346,4 +346,44 @@ end end end + + describe "elasticsearch protocol" do + # ElasticSearch related jars + LogStash::Environment.load_elasticsearch_jars! + # Load elasticsearch protocol + require "logstash/outputs/elasticsearch/protocol" + + describe "elasticsearch node client" do + # Test ElasticSearch Node Client + # Reference: http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ + + it "should support hosts in both string and array" do + # Because we defined *hosts* method in NodeClient as private, + # we use *obj.send :method,[args...]* to call method *hosts* + client = LogStash::Outputs::Elasticsearch::Protocols::NodeClient.new + + # Node client should support host in string + # Case 1: default :host in string + insist { client.send :hosts, :host => "host",:port => 9300 } == "host:9300" + # Case 2: :port =~ /^\d+_\d+$/ + insist { client.send :hosts, :host => "host",:port => "9300-9302"} == "host:9300,host:9301,host:9302" + # Case 3: :host =~ /^.+:.+$/ + insist { client.send :hosts, :host => "host:9303",:port => 9300 } == "host:9303" + # Case 4: :host =~ /^.+:.+$/ and :port =~ /^\d+_\d+$/ + insist { client.send :hosts, :host => "host:9303",:port => "9300-9302"} == "host:9303" + + # Node client should support host in array + # Case 5: :host in array with single item + insist { client.send :hosts, :host => ["host"],:port => 9300 } == ("host:9300") + # Case 6: :host in array with more than one items + insist { client.send :hosts, :host => ["host1","host2"],:port => 9300 } == "host1:9300,host2:9300" + # Case 7: :host in array with more than one items and :port =~ /^\d+_\d+$/ + insist { client.send :hosts, :host => ["host1","host2"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9300,host2:9301,host2:9302" + # Case 8: :host in array with more than one items and some :host =~ /^.+:.+$/ + insist { client.send :hosts, :host => ["host1","host2:9303"],:port => 9300 } == "host1:9300,host2:9303" + # Case 9: :host in array with more than one items, :port =~ /^\d+_\d+$/ and some :host =~ /^.+:.+$/ + insist { client.send :hosts, :host => ["host1","host2:9303"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9303" + end + end + end end From bedbba9f7ffc012afc5e1cc04b716f5815e67a69 Mon Sep 17 00:00:00 2001 From: Joao Duarte Date: Fri, 31 Oct 2014 11:36:47 +0000 Subject: [PATCH 2/2] Add HTTP Auth and SSL to the ES output plugin The typical use case is the placement of a transparent proxy in front of ES. This PR enables basic Auth and HTTPS to be configured independently. This patch makes logstash use only elasticsearch-ruby gem when writing to elasticsearch. Also leverages the manticore http transport that ships with es-ruby for the basic http auth and ssl features. It's possible to configure the CA certificate using either a .pem/.cer file ("cacert" option), or a .jks truststore ("truststore" option). --- lib/logstash/outputs/elasticsearch.rb | 105 +++++++++++++++--- .../outputs/elasticsearch/protocol.rb | 78 +++---------- logstash-output-elasticsearch.gemspec | 5 +- spec/outputs/elasticsearch.rb | 101 +++++++++++++++++ 4 files changed, 214 insertions(+), 75 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index d7939fb52..ccdb092c3 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -5,6 +5,7 @@ require "logstash/json" require "stud/buffer" require "socket" # for Socket.gethostname +require "uri" # for escaping user input require 'logstash-output-elasticsearch_jars.rb' # This output lets you store logs in Elasticsearch and is the most recommended @@ -185,18 +186,28 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base config :plugins, :validate => :array + # Username and password (HTTP only) + config :user, :validate => :string + config :password, :validate => :password + + # SSL Configurations (HTTP only) + # + # Enable SSL + config :ssl, :validate => :boolean, :default => false + + # The .cer or .pem file to validate the server's certificate + config :cacert, :validate => :path + + # The JKS truststore to validate the server's certificate + # Use either :truststore or :cacert + config :truststore, :validate => :path + + # Set the truststore password + config :truststore_password, :validate => :password + public def register client_settings = {} - client_settings["cluster.name"] = @cluster if @cluster - client_settings["network.host"] = @bind_host if @bind_host - client_settings["transport.tcp.port"] = @bind_port if @bind_port - - if @node_name - client_settings["node.name"] = @node_name - else - client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}" - end if @protocol.nil? @protocol = LogStash::Environment.jruby? ? "node" : "http" @@ -217,6 +228,15 @@ def register # setup log4j properties for Elasticsearch # LogStash::Logger.setup_log4j(@logger) + client_settings["cluster.name"] = @cluster if @cluster + client_settings["network.host"] = @bind_host if @bind_host + client_settings["transport.tcp.port"] = @bind_port if @bind_port + + if @node_name + client_settings["node.name"] = @node_name + else + client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}" + end end require "logstash/outputs/elasticsearch/protocol" @@ -233,12 +253,45 @@ def register @host = ["localhost"] end + if @ssl + if @protocol == "http" + @protocol = "https" + if @cacert && @truststore + raise(LogStash::ConfigurationError, "Use either \"cacert\" or \"truststore\" when configuring the CA certificate") if @truststore + end + ssl_options = {} + if @cacert then + @truststore, ssl_options[:truststore_password] = generate_jks @cacert + elsif @truststore + ssl_options[:truststore_password] = @truststore_password.value if @truststore_password + end + ssl_options[:truststore] = @truststore + client_settings[:ssl] = ssl_options + else + raise(LogStash::ConfigurationError, "SSL is not supported for '#{@protocol}'. Change the protocol to 'http' if you need SSL.") + end + end + + common_options = { + :protocol => @protocol, + :client_settings => client_settings + } + + if @user && @password + if @protocol =~ /http/ + common_options[:user] = ::URI.escape(@user, "@:") + common_options[:password] = ::URI.escape(@password.value, "@:") + else + raise(LogStash::ConfigurationError, "User and password parameters are not supported for '#{@protocol}'. Change the protocol to 'http' if you need them.") + end + end + client_class = case @protocol when "transport" LogStash::Outputs::Elasticsearch::Protocols::TransportClient when "node" LogStash::Outputs::Elasticsearch::Protocols::NodeClient - when "http" + when /http/ LogStash::Outputs::Elasticsearch::Protocols::HTTPClient end @@ -261,8 +314,7 @@ def register options = { :host => @host, :port => @port, - :client_settings => client_settings - } + }.merge(common_options) @client << client_class.new(options) else # if @protocol in ["transport","http"] @host.each do |host| @@ -270,8 +322,7 @@ def register options = { :host => _host, :port => _port || @port, - :client_settings => client_settings - } + }.merge(common_options) @logger.info "Create client to elasticsearch server on #{_host}:#{_port}" @client << client_class.new(options) end # @host.each @@ -338,6 +389,29 @@ def start_local_elasticsearch @embedded_elasticsearch.start end # def start_local_elasticsearch + private + def generate_jks cert_path + + require 'securerandom' + require 'tempfile' + require 'java' + import java.io.FileInputStream + import java.io.FileOutputStream + import java.security.KeyStore + import java.security.cert.CertificateFactory + + jks = java.io.File.createTempFile("cert", ".jks") + + ks = KeyStore.getInstance "JKS" + ks.load nil, nil + cf = CertificateFactory.getInstance "X.509" + cert = cf.generateCertificate FileInputStream.new(cert_path) + ks.setCertificateEntry "cacert", cert + pwd = SecureRandom.urlsafe_base64(9) + ks.store FileOutputStream.new(jks), pwd.to_java.toCharArray + [jks.path, pwd] + end + public def receive(event) return unless output?(event) @@ -376,6 +450,9 @@ def flush(actions, teardown=false) end # def flush def teardown + if @cacert # remove temporary jks store created from the cacert + File.delete(@truststore) + end buffer_flush(:final => true) end diff --git a/lib/logstash/outputs/elasticsearch/protocol.rb b/lib/logstash/outputs/elasticsearch/protocol.rb index d0747980d..2b12ff752 100644 --- a/lib/logstash/outputs/elasticsearch/protocol.rb +++ b/lib/logstash/outputs/elasticsearch/protocol.rb @@ -51,37 +51,33 @@ class HTTPClient < Base } def initialize(options={}) - require "ftw" super require "elasticsearch" # gem 'elasticsearch-ruby' + # manticore http transport + require "elasticsearch/transport/transport/http/manticore" @options = DEFAULT_OPTIONS.merge(options) @client = client end def build_client(options) - client = Elasticsearch::Client.new( - :host => [options[:host], options[:port]].join(":") - ) - - # Use FTW to do indexing requests, for now, until we - # can identify and resolve performance problems of elasticsearch-ruby - @bulk_url = "http://#{options[:host]}:#{options[:port]}/_bulk" - @agent = FTW::Agent.new - - return client - end - - if ENV["BULK"] == "esruby" - def bulk(actions) - bulk_esruby(actions) - end - else - def bulk(actions) - bulk_ftw(actions) + uri = "#{options[:protocol]}://#{options[:host]}:#{options[:port]}" + + client_options = { + :host => [uri], + :transport_options => options[:client_settings] + } + client_options[:transport_class] = ::Elasticsearch::Transport::Transport::HTTP::Manticore + client_options[:ssl] = client_options[:transport_options].delete(:ssl) + + if options[:user] && options[:password] then + token = Base64.strict_encode64(options[:user] + ":" + options[:password]) + client_options[:headers] = { "Authorization" => "Basic #{token}" } end + + Elasticsearch::Client.new client_options end - def bulk_esruby(actions) + def bulk(actions) @client.bulk(:body => actions.collect do |action, args, source| if source next [ { action => args }, source ] @@ -89,44 +85,7 @@ def bulk_esruby(actions) next { action => args } end end.flatten) - end # def bulk_esruby - - # Avoid creating a new string for newline every time - NEWLINE = "\n".freeze - def bulk_ftw(actions) - body = actions.collect do |action, args, source| - header = { action => args } - if source - next [ LogStash::Json.dump(header), NEWLINE, LogStash::Json.dump(source), NEWLINE ] - else - next [ LogStash::Json.dump(header), NEWLINE ] - end - end.flatten.join("") - begin - response = @agent.post!(@bulk_url, :body => body) - rescue EOFError - @logger.warn("EOF while writing request or reading response header from elasticsearch", :host => @host, :port => @port) - raise - end - - # Consume the body for error checking - # This will also free up the connection for reuse. - response_body = "" - begin - response.read_body { |chunk| response_body += chunk } - rescue EOFError - @logger.warn("EOF while reading response body from elasticsearch", - :url => @bulk_url) - raise - end - - if response.status != 200 - @logger.error("Error writing (bulk) to elasticsearch", - :response => response, :response_body => response_body, - :request_body => body) - raise "Non-OK response code from Elasticsearch: #{response.status}" - end - end # def bulk_ftw + end # def bulk def template_exists?(name) @client.indices.get_template(:name => name) @@ -292,4 +251,3 @@ class Index; end class Delete; end end end - diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 48108d920..2e1f1d35d 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -23,12 +23,15 @@ Gem::Specification.new do |s| s.requirements << "jar 'org.elasticsearch:elasticsearch', '1.3.1'" # Gem dependencies - s.add_runtime_dependency 'elasticsearch' + s.add_runtime_dependency 'elasticsearch', ['~> 1.0.6'] s.add_runtime_dependency 'stud' s.add_runtime_dependency 'cabin', ['>=0.6.0'] s.add_runtime_dependency 'ftw', ['~> 0.0.39'] s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0' s.add_runtime_dependency 'jar-dependencies', ['~> 0.0.7'] + if RUBY_PLATFORM == 'java' + gem.add_runtime_dependency "manticore" + end end diff --git a/spec/outputs/elasticsearch.rb b/spec/outputs/elasticsearch.rb index 2a6f0a77b..bf949796f 100644 --- a/spec/outputs/elasticsearch.rb +++ b/spec/outputs/elasticsearch.rb @@ -386,4 +386,105 @@ end end end + + describe "Authentication option" do + ["node", "transport"].each do |protocol| + context "with protocol => #{protocol}" do + subject do + require "logstash/outputs/elasticsearch" + settings = { + "protocol" => protocol, + "node_name" => "logstash", + "cluster" => "elasticsearch", + "host" => "node01", + "user" => "test", + "password" => "test" + } + next LogStash::Outputs::ElasticSearch.new(settings) + end + + it "should fail in register" do + expect {subject.register}.to raise_error + end + end + end + end + + describe "SSL option" do + ["node", "transport"].each do |protocol| + context "with protocol => #{protocol}" do + subject do + require "logstash/outputs/elasticsearch" + settings = { + "protocol" => protocol, + "node_name" => "logstash", + "cluster" => "elasticsearch", + "host" => "node01", + "ssl" => true + } + next LogStash::Outputs::ElasticSearch.new(settings) + end + + it "should fail in register" do + expect {subject.register}.to raise_error + end + end + end + end + + describe "send messages to ElasticSearch using HTTPS", :elasticsearch_secure => true do + subject do + require "logstash/outputs/elasticsearch" + settings = { + "protocol" => "http", + "node_name" => "logstash", + "cluster" => "elasticsearch", + "host" => "node01", + "user" => "user", + "password" => "changeme", + "ssl" => true, + "cacert" => "/tmp/ca/certs/cacert.pem", + # or + #"truststore" => "/tmp/ca/truststore.jks", + #"truststore_password" => "testeteste" + } + next LogStash::Outputs::ElasticSearch.new(settings) + end + + before :each do + subject.register + end + + it "sends events to ES" do + expect { + subject.receive(LogStash::Event.new("message" => "sample message here")) + subject.buffer_flush(:final => true) + }.to_not raise_error + end + end + + describe "connect using HTTP Authentication", :elasticsearch_secure => true do + subject do + require "logstash/outputs/elasticsearch" + settings = { + "protocol" => "http", + "cluster" => "elasticsearch", + "host" => "node01", + "user" => "user", + "password" => "changeme", + } + next LogStash::Outputs::ElasticSearch.new(settings) + end + + before :each do + subject.register + end + + it "sends events to ES" do + expect { + subject.receive(LogStash::Event.new("message" => "sample message here")) + subject.buffer_flush(:final => true) + }.to_not raise_error + end + end end