Skip to content

Add HTTP Auth and SSL to the ES output plugin #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 31, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 143 additions & 23 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require "logstash/json"
require "stud/buffer"
require "socket" # for Socket.gethostname
require "uri" # for escaping user input
require 'logstash-output-elasticsearch_jars.rb'

# This output lets you store logs in Elasticsearch and is the most recommended
Expand Down Expand Up @@ -86,7 +87,10 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# The hostname or IP address of the host to use for Elasticsearch unicast discovery
# This is only required if the normal multicast/cluster discovery stuff won't
# work in your environment.
config :host, :validate => :string
#
# "127.0.0.1"
# ["127.0.0.1:9300","127.0.0.2:9300"]
config :host, :validate => :array

# The port for Elasticsearch transport to use.
#
Expand Down Expand Up @@ -182,18 +186,28 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
config :plugins, :validate => :array


# Username and password (HTTP only)
config :user, :validate => :string
config :password, :validate => :password

# SSL Configurations (HTTP only)
#
# Enable SSL
config :ssl, :validate => :boolean, :default => false

# The .cer or .pem file to validate the server's certificate
config :cacert, :validate => :path

# The JKS truststore to validate the server's certificate
# Use either :truststore or :cacert
config :truststore, :validate => :path

# Set the truststore password
config :truststore_password, :validate => :password

public
def register
client_settings = {}
client_settings["cluster.name"] = @cluster if @cluster
client_settings["network.host"] = @bind_host if @bind_host
client_settings["transport.tcp.port"] = @bind_port if @bind_port

if @node_name
client_settings["node.name"] = @node_name
else
client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
end

if @protocol.nil?
@protocol = LogStash::Environment.jruby? ? "node" : "http"
Expand All @@ -214,6 +228,15 @@ def register

# setup log4j properties for Elasticsearch
# LogStash::Logger.setup_log4j(@logger)
client_settings["cluster.name"] = @cluster if @cluster
client_settings["network.host"] = @bind_host if @bind_host
client_settings["transport.tcp.port"] = @bind_port if @bind_port

if @node_name
client_settings["node.name"] = @node_name
else
client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
end
end

require "logstash/outputs/elasticsearch/protocol"
Expand All @@ -227,22 +250,48 @@ def register

if @host.nil? && @protocol == "http"
@logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
@host = "localhost"
@host = ["localhost"]
end

options = {
:host => @host,
:port => @port,
if @ssl
if @protocol == "http"
@protocol = "https"
if @cacert && @truststore
raise(LogStash::ConfigurationError, "Use either \"cacert\" or \"truststore\" when configuring the CA certificate") if @truststore
end
ssl_options = {}
if @cacert then
@truststore, ssl_options[:truststore_password] = generate_jks @cacert
elsif @truststore
ssl_options[:truststore_password] = @truststore_password.value if @truststore_password
end
ssl_options[:truststore] = @truststore
client_settings[:ssl] = ssl_options
else
raise(LogStash::ConfigurationError, "SSL is not supported for '#{@protocol}'. Change the protocol to 'http' if you need SSL.")
end
end

common_options = {
:protocol => @protocol,
:client_settings => client_settings
}

if @user && @password
if @protocol =~ /http/
common_options[:user] = ::URI.escape(@user, "@:")
common_options[:password] = ::URI.escape(@password.value, "@:")
else
raise(LogStash::ConfigurationError, "User and password parameters are not supported for '#{@protocol}'. Change the protocol to 'http' if you need them.")
end
end

client_class = case @protocol
when "transport"
LogStash::Outputs::Elasticsearch::Protocols::TransportClient
when "node"
LogStash::Outputs::Elasticsearch::Protocols::NodeClient
when "http"
when /http/
LogStash::Outputs::Elasticsearch::Protocols::HTTPClient
end

Expand All @@ -253,23 +302,50 @@ def register
# Default @host with embedded to localhost. This should help avoid
# newbies tripping on ubuntu and other distros that have a default
# firewall that blocks multicast.
@host ||= "localhost"
@host ||= ["localhost"]

# Start Elasticsearch local.
start_local_elasticsearch
end

@client = client_class.new(options)
@client = Array.new

if protocol == "node" or @host.nil? # if @protocol is "node" or @host is not set
options = {
:host => @host,
:port => @port,
}.merge(common_options)
@client << client_class.new(options)
else # if @protocol in ["transport","http"]
@host.each do |host|
(_host,_port) = host.split ":"
options = {
:host => _host,
:port => _port || @port,
}.merge(common_options)
@logger.info "Create client to elasticsearch server on #{_host}:#{_port}"
@client << client_class.new(options)
end # @host.each
end

if @manage_template
for client in @client
begin
@logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
client.template_install(@template_name, get_template, @template_overwrite)
break
rescue => e
@logger.error("Failed to install template: #{e.message}")
end
end # for @client loop
end # if @manage_templates

@logger.info("New Elasticsearch output", :cluster => @cluster,
:host => @host, :port => @port, :embedded => @embedded,
:protocol => @protocol)


if @manage_template
@logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
@client.template_install(@template_name, get_template, @template_overwrite)
end # if @manage_templates
@client_idx = 0
@current_client = @client[@client_idx]

buffer_initialize(
:max_items => @flush_size,
Expand All @@ -278,6 +354,13 @@ def register
)
end # def register

protected
def shift_client
@client_idx = (@client_idx+1) % @client.length
@current_client = @client[@client_idx]
@logger.debug? and @logger.debug("Switched current elasticsearch client to ##{@client_idx} at #{@host[@client_idx]}")
end

public
def get_template
if @template.nil?
Expand Down Expand Up @@ -306,6 +389,29 @@ def start_local_elasticsearch
@embedded_elasticsearch.start
end # def start_local_elasticsearch

private
def generate_jks cert_path

require 'securerandom'
require 'tempfile'
require 'java'
import java.io.FileInputStream
import java.io.FileOutputStream
import java.security.KeyStore
import java.security.cert.CertificateFactory

jks = java.io.File.createTempFile("cert", ".jks")

ks = KeyStore.getInstance "JKS"
ks.load nil, nil
cf = CertificateFactory.getInstance "X.509"
cert = cf.generateCertificate FileInputStream.new(cert_path)
ks.setCertificateEntry "cacert", cert
pwd = SecureRandom.urlsafe_base64(9)
ks.store FileOutputStream.new(jks), pwd.to_java.toCharArray
[jks.path, pwd]
end

public
def receive(event)
return unless output?(event)
Expand All @@ -324,7 +430,18 @@ def receive(event)
end # def receive

def flush(actions, teardown=false)
@client.bulk(actions)
begin
@logger.debug? and @logger.debug "Sending bulk of actions to client[#{@client_idx}]: #{@host[@client_idx]}"
@current_client.bulk(actions)
rescue => e
@logger.error "Got error to send bulk of actions to elasticsearch server at #{@host[@client_idx]} : #{e.message}"
raise e
ensure
unless @protocol == "node"
@logger.debug? and @logger.debug "Shifting current elasticsearch client"
shift_client
end
end
# TODO(sissel): Handle errors. Since bulk requests could mostly succeed
# (aka partially fail), we need to figure out what documents need to be
# retried.
Expand All @@ -333,6 +450,9 @@ def flush(actions, teardown=false)
end # def flush

def teardown
if @cacert # remove temporary jks store created from the cacert
File.delete(@truststore)
end
buffer_flush(:final => true)
end

Expand Down
Loading