diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb
index 071f8fcef..ccdb092c3 100644
--- a/lib/logstash/outputs/elasticsearch.rb
+++ b/lib/logstash/outputs/elasticsearch.rb
@@ -5,6 +5,7 @@
 require "logstash/json"
 require "stud/buffer"
 require "socket" # for Socket.gethostname
+require "uri" # for escaping user input
 require 'logstash-output-elasticsearch_jars.rb'
 
 # This output lets you store logs in Elasticsearch and is the most recommended
@@ -86,7 +87,10 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # The hostname or IP address of the host to use for Elasticsearch unicast discovery
   # This is only required if the normal multicast/cluster discovery stuff won't
   # work in your environment.
-  config :host, :validate => :string
+  #
+  #     "127.0.0.1"
+  #     ["127.0.0.1:9300","127.0.0.2:9300"]
+  config :host, :validate => :array
 
   # The port for Elasticsearch transport to use.
   #
@@ -182,18 +186,28 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   config :plugins, :validate => :array
 
 
+  # Username and password (HTTP only)
+  config :user, :validate => :string
+  config :password, :validate => :password
+
+  # SSL Configurations (HTTP only)
+  #
+  # Enable SSL
+  config :ssl, :validate => :boolean, :default => false
+
+  # The .cer or .pem file to validate the server's certificate
+  config :cacert, :validate => :path
+
+  # The JKS truststore to validate the server's certificate
+  # Use either :truststore or :cacert
+  config :truststore, :validate => :path
+
+  # Set the truststore password
+  config :truststore_password, :validate => :password
+
   public
   def register
     client_settings = {}
-    client_settings["cluster.name"] = @cluster if @cluster
-    client_settings["network.host"] = @bind_host if @bind_host
-    client_settings["transport.tcp.port"] = @bind_port if @bind_port
-
-    if @node_name
-      client_settings["node.name"] = @node_name
-    else
-      client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
-    end
 
     if @protocol.nil?
       @protocol = LogStash::Environment.jruby? ? "node" : "http"
@@ -214,6 +228,15 @@ def register
 
       # setup log4j properties for Elasticsearch
  #     LogStash::Logger.setup_log4j(@logger)
+       client_settings["cluster.name"] = @cluster if @cluster
+       client_settings["network.host"] = @bind_host if @bind_host
+       client_settings["transport.tcp.port"] = @bind_port if @bind_port
+ 
+       if @node_name
+         client_settings["node.name"] = @node_name
+       else
+         client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
+       end
     end
 
     require "logstash/outputs/elasticsearch/protocol"
@@ -227,22 +250,48 @@ def register
 
     if @host.nil? && @protocol == "http"
       @logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
-      @host = "localhost"
+      @host = ["localhost"]
     end
 
-    options = {
-      :host => @host,
-      :port => @port,
+    if @ssl
+      if @protocol == "http"
+        @protocol = "https"
+        if @cacert && @truststore
+          raise(LogStash::ConfigurationError, "Use either \"cacert\" or \"truststore\" when configuring the CA certificate") if @truststore
+        end
+        ssl_options = {}
+        if @cacert then
+          @truststore, ssl_options[:truststore_password] = generate_jks @cacert
+        elsif @truststore
+          ssl_options[:truststore_password] = @truststore_password.value if @truststore_password
+        end
+        ssl_options[:truststore] = @truststore
+        client_settings[:ssl] = ssl_options
+      else
+        raise(LogStash::ConfigurationError, "SSL is not supported for '#{@protocol}'. Change the protocol to 'http' if you need SSL.")
+      end
+    end
+
+    common_options = {
+      :protocol => @protocol,
       :client_settings => client_settings
     }
 
+    if @user && @password
+      if @protocol =~ /http/
+        common_options[:user] = ::URI.escape(@user, "@:")
+        common_options[:password] = ::URI.escape(@password.value, "@:")
+      else
+        raise(LogStash::ConfigurationError, "User and password parameters are not supported for '#{@protocol}'. Change the protocol to 'http' if you need them.")
+      end
+    end
 
     client_class = case @protocol
       when "transport"
         LogStash::Outputs::Elasticsearch::Protocols::TransportClient
       when "node"
         LogStash::Outputs::Elasticsearch::Protocols::NodeClient
-      when "http"
+      when /http/
         LogStash::Outputs::Elasticsearch::Protocols::HTTPClient
     end
 
@@ -253,23 +302,50 @@ def register
       # Default @host with embedded to localhost. This should help avoid
       # newbies tripping on ubuntu and other distros that have a default
       # firewall that blocks multicast.
-      @host ||= "localhost"
+      @host ||= ["localhost"]
 
       # Start Elasticsearch local.
       start_local_elasticsearch
     end
 
-    @client = client_class.new(options)
+    @client = Array.new
+
+    if protocol == "node" or @host.nil? # if @protocol is "node" or @host is not set
+      options = {
+          :host => @host,
+          :port => @port,
+      }.merge(common_options)
+      @client << client_class.new(options)
+    else # if @protocol in ["transport","http"]
+      @host.each do |host|
+          (_host,_port) = host.split ":"
+          options = {
+            :host => _host,
+            :port => _port || @port,
+          }.merge(common_options)
+          @logger.info "Create client to elasticsearch server on #{_host}:#{_port}"
+          @client << client_class.new(options)
+      end # @host.each
+    end
+
+    if @manage_template
+      for client in @client
+          begin
+            @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
+            client.template_install(@template_name, get_template, @template_overwrite)
+            break
+          rescue => e
+            @logger.error("Failed to install template: #{e.message}")
+          end
+      end # for @client loop
+    end # if @manage_templates
 
     @logger.info("New Elasticsearch output", :cluster => @cluster,
                  :host => @host, :port => @port, :embedded => @embedded,
                  :protocol => @protocol)
 
-
-    if @manage_template
-      @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
-      @client.template_install(@template_name, get_template, @template_overwrite)
-    end # if @manage_templates
+    @client_idx = 0
+    @current_client = @client[@client_idx]
 
     buffer_initialize(
       :max_items => @flush_size,
@@ -278,6 +354,13 @@ def register
     )
   end # def register
 
+  protected
+  def shift_client
+    @client_idx = (@client_idx+1) % @client.length
+    @current_client = @client[@client_idx]
+    @logger.debug? and @logger.debug("Switched current elasticsearch client to ##{@client_idx} at #{@host[@client_idx]}")
+  end
+
   public
   def get_template
     if @template.nil?
@@ -306,6 +389,29 @@ def start_local_elasticsearch
     @embedded_elasticsearch.start
   end # def start_local_elasticsearch
 
+  private
+  def generate_jks cert_path
+
+    require 'securerandom'
+    require 'tempfile'
+    require 'java'
+    import java.io.FileInputStream
+    import java.io.FileOutputStream
+    import java.security.KeyStore
+    import java.security.cert.CertificateFactory
+
+    jks = java.io.File.createTempFile("cert", ".jks")
+
+    ks = KeyStore.getInstance "JKS"
+    ks.load nil, nil
+    cf = CertificateFactory.getInstance "X.509"
+    cert = cf.generateCertificate FileInputStream.new(cert_path)
+    ks.setCertificateEntry "cacert", cert
+    pwd = SecureRandom.urlsafe_base64(9)
+    ks.store FileOutputStream.new(jks), pwd.to_java.toCharArray
+    [jks.path, pwd]
+  end
+
   public
   def receive(event)
     return unless output?(event)
@@ -324,7 +430,18 @@ def receive(event)
   end # def receive
 
   def flush(actions, teardown=false)
-    @client.bulk(actions)
+    begin
+      @logger.debug? and @logger.debug "Sending bulk of actions to client[#{@client_idx}]: #{@host[@client_idx]}"
+      @current_client.bulk(actions)
+    rescue => e
+      @logger.error "Got error to send bulk of actions to elasticsearch server at #{@host[@client_idx]} : #{e.message}"
+      raise e
+    ensure
+      unless @protocol == "node"
+          @logger.debug? and @logger.debug "Shifting current elasticsearch client"
+          shift_client
+      end
+    end
     # TODO(sissel): Handle errors. Since bulk requests could mostly succeed
     # (aka partially fail), we need to figure out what documents need to be
     # retried.
@@ -333,6 +450,9 @@ def flush(actions, teardown=false)
   end # def flush
 
   def teardown
+    if @cacert # remove temporary jks store created from the cacert
+      File.delete(@truststore)
+    end
     buffer_flush(:final => true)
   end
 
diff --git a/lib/logstash/outputs/elasticsearch/protocol.rb b/lib/logstash/outputs/elasticsearch/protocol.rb
index 1a860e945..2b12ff752 100644
--- a/lib/logstash/outputs/elasticsearch/protocol.rb
+++ b/lib/logstash/outputs/elasticsearch/protocol.rb
@@ -51,37 +51,33 @@ class HTTPClient < Base
       }
 
       def initialize(options={})
-        require "ftw"
         super
         require "elasticsearch" # gem 'elasticsearch-ruby'
+        # manticore http transport
+        require "elasticsearch/transport/transport/http/manticore"
         @options = DEFAULT_OPTIONS.merge(options)
         @client = client
       end
 
       def build_client(options)
-        client = Elasticsearch::Client.new(
-          :host => [options[:host], options[:port]].join(":")
-        )
-
-        # Use FTW to do indexing requests, for now, until we
-        # can identify and resolve performance problems of elasticsearch-ruby
-        @bulk_url = "http://#{options[:host]}:#{options[:port]}/_bulk"
-        @agent = FTW::Agent.new
-
-        return client
-      end
-
-      if ENV["BULK"] == "esruby"
-        def bulk(actions)
-          bulk_esruby(actions)
-        end
-      else
-        def bulk(actions)
-          bulk_ftw(actions)
+        uri = "#{options[:protocol]}://#{options[:host]}:#{options[:port]}"
+
+        client_options = {
+          :host => [uri],
+          :transport_options => options[:client_settings]
+        }
+        client_options[:transport_class] = ::Elasticsearch::Transport::Transport::HTTP::Manticore
+        client_options[:ssl] = client_options[:transport_options].delete(:ssl)
+
+        if options[:user] && options[:password] then
+          token = Base64.strict_encode64(options[:user] + ":" + options[:password])
+          client_options[:headers] = { "Authorization" => "Basic #{token}" }
         end
+
+        Elasticsearch::Client.new client_options
       end
 
-      def bulk_esruby(actions)
+      def bulk(actions)
         @client.bulk(:body => actions.collect do |action, args, source|
           if source
             next [ { action => args }, source ]
@@ -89,44 +85,7 @@ def bulk_esruby(actions)
             next { action => args }
           end
         end.flatten)
-      end # def bulk_esruby
-
-      # Avoid creating a new string for newline every time
-      NEWLINE = "\n".freeze
-      def bulk_ftw(actions)
-        body = actions.collect do |action, args, source|
-          header = { action => args }
-          if source
-            next [ LogStash::Json.dump(header), NEWLINE, LogStash::Json.dump(source), NEWLINE ]
-          else
-            next [ LogStash::Json.dump(header), NEWLINE ]
-          end
-        end.flatten.join("")
-        begin
-          response = @agent.post!(@bulk_url, :body => body)
-        rescue EOFError
-          @logger.warn("EOF while writing request or reading response header from elasticsearch", :host => @host, :port => @port)
-          raise
-        end
-
-        # Consume the body for error checking
-        # This will also free up the connection for reuse.
-        response_body = ""
-        begin
-          response.read_body { |chunk| response_body += chunk }
-        rescue EOFError
-          @logger.warn("EOF while reading response body from elasticsearch",
-                       :url => @bulk_url)
-          raise
-        end
-
-        if response.status != 200
-          @logger.error("Error writing (bulk) to elasticsearch",
-                        :response => response, :response_body => response_body,
-                        :request_body => body)
-          raise "Non-OK response code from Elasticsearch: #{response.status}"
-        end
-      end # def bulk_ftw
+      end # def bulk
 
       def template_exists?(name)
         @client.indices.get_template(:name => name)
@@ -181,16 +140,40 @@ def setup(options={})
       end
 
       def hosts(options)
-        if options[:port].to_s =~ /^\d+-\d+$/
-          # port ranges are 'host[port1-port2]' according to
-          # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
-          # However, it seems to only query the first port.
-          # So generate our own list of unicast hosts to scan.
-          range = Range.new(*options[:port].split("-"))
-          return range.collect { |p| "#{options[:host]}:#{p}" }.join(",")
+        # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
+        result = Array.new
+        if options[:host].class == Array
+          options[:host].each do |host|
+            if host.to_s =~ /^.+:.+$/
+              # For host in format: host:port, ignore options[:port]
+              result << host
+            else
+              if options[:port].to_s =~ /^\d+-\d+$/
+                # port ranges are 'host[port1-port2]'
+                result << Range.new(*options[:port].split("-")).collect { |p| "#{host}:#{p}" }
+              else
+                result << "#{host}:#{options[:port]}"
+              end
+            end
+          end
         else
-          return "#{options[:host]}:#{options[:port]}"
+          if options[:host].to_s =~ /^.+:.+$/
+            # For host in format: host:port, ignore options[:port]
+            result << options[:host]
+          else
+            if options[:port].to_s =~ /^\d+-\d+$/
+              # port ranges are 'host[port1-port2]' according to
+              # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
+              # However, it seems to only query the first port.
+              # So generate our own list of unicast hosts to scan.
+              range = Range.new(*options[:port].split("-"))
+              result << range.collect { |p| "#{options[:host]}:#{p}" }
+            else
+              result << "#{options[:host]}:#{options[:port]}"
+            end
+          end
         end
+        result.flatten.join(",")
       end # def hosts
 
       def build_client(options)
@@ -268,4 +251,3 @@ class Index; end
     class Delete; end
   end
 end
-
diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec
index 48108d920..2e1f1d35d 100644
--- a/logstash-output-elasticsearch.gemspec
+++ b/logstash-output-elasticsearch.gemspec
@@ -23,12 +23,15 @@ Gem::Specification.new do |s|
   s.requirements << "jar 'org.elasticsearch:elasticsearch', '1.3.1'"
 
   # Gem dependencies
-  s.add_runtime_dependency 'elasticsearch'
+  s.add_runtime_dependency 'elasticsearch', ['~> 1.0.6']
   s.add_runtime_dependency 'stud'
   s.add_runtime_dependency 'cabin', ['>=0.6.0']
   s.add_runtime_dependency 'ftw', ['~> 0.0.39']
   s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0'
   s.add_runtime_dependency 'jar-dependencies', ['~> 0.0.7']
 
+  if RUBY_PLATFORM == 'java'
+    gem.add_runtime_dependency "manticore"
+  end
 end
 
diff --git a/spec/outputs/elasticsearch.rb b/spec/outputs/elasticsearch.rb
index 836c9ef56..bf949796f 100644
--- a/spec/outputs/elasticsearch.rb
+++ b/spec/outputs/elasticsearch.rb
@@ -346,4 +346,145 @@
       end
     end
   end
+
+  describe "elasticsearch protocol" do
+    # ElasticSearch related jars
+    LogStash::Environment.load_elasticsearch_jars!
+    # Load elasticsearch protocol
+    require "logstash/outputs/elasticsearch/protocol"
+
+    describe "elasticsearch node client" do
+      # Test ElasticSearch Node Client
+      # Reference: http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
+
+      it "should support hosts in both string and array" do
+        # Because we defined *hosts* method in NodeClient as private,
+        # we use *obj.send :method,[args...]* to call method *hosts*
+        client = LogStash::Outputs::Elasticsearch::Protocols::NodeClient.new
+
+        # Node client should support host in string
+        # Case 1: default :host in string
+        insist { client.send :hosts, :host => "host",:port => 9300 } == "host:9300"
+        # Case 2: :port =~ /^\d+_\d+$/
+        insist { client.send :hosts, :host => "host",:port => "9300-9302"} == "host:9300,host:9301,host:9302"
+        # Case 3: :host =~ /^.+:.+$/
+        insist { client.send :hosts, :host => "host:9303",:port => 9300 } == "host:9303"
+        # Case 4:  :host =~ /^.+:.+$/ and :port =~ /^\d+_\d+$/
+        insist { client.send :hosts, :host => "host:9303",:port => "9300-9302"} == "host:9303"
+
+        # Node client should support host in array
+        # Case 5: :host in array with single item
+        insist { client.send :hosts, :host => ["host"],:port => 9300 } == ("host:9300")
+        # Case 6: :host in array with more than one items
+        insist { client.send :hosts, :host => ["host1","host2"],:port => 9300 } == "host1:9300,host2:9300"
+        # Case 7: :host in array with more than one items and :port =~ /^\d+_\d+$/
+        insist { client.send :hosts, :host => ["host1","host2"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9300,host2:9301,host2:9302"
+        # Case 8: :host in array with more than one items and some :host =~ /^.+:.+$/
+        insist { client.send :hosts, :host => ["host1","host2:9303"],:port => 9300 } == "host1:9300,host2:9303"
+        # Case 9: :host in array with more than one items, :port =~ /^\d+_\d+$/ and some :host =~ /^.+:.+$/
+        insist { client.send :hosts, :host => ["host1","host2:9303"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9303"
+      end
+    end
+  end
+
+  describe "Authentication option" do
+    ["node", "transport"].each do |protocol|
+      context "with protocol => #{protocol}" do
+        subject do
+          require "logstash/outputs/elasticsearch"
+          settings = {
+            "protocol" => protocol,
+            "node_name" => "logstash",
+            "cluster" => "elasticsearch",
+            "host" => "node01",
+            "user" => "test",
+            "password" => "test"
+          }
+          next LogStash::Outputs::ElasticSearch.new(settings)
+        end
+
+        it "should fail in register" do
+          expect {subject.register}.to raise_error
+        end
+      end
+    end
+  end
+
+  describe "SSL option" do
+    ["node", "transport"].each do |protocol|
+      context "with protocol => #{protocol}" do
+        subject do
+          require "logstash/outputs/elasticsearch"
+          settings = {
+            "protocol" => protocol,
+            "node_name" => "logstash",
+            "cluster" => "elasticsearch",
+            "host" => "node01",
+            "ssl" => true
+          }
+          next LogStash::Outputs::ElasticSearch.new(settings)
+        end
+
+        it "should fail in register" do
+          expect {subject.register}.to raise_error
+        end
+      end
+    end
+  end
+
+  describe "send messages to ElasticSearch using HTTPS", :elasticsearch_secure => true do
+    subject do
+      require "logstash/outputs/elasticsearch"
+      settings = {
+        "protocol" => "http",
+        "node_name" => "logstash",
+        "cluster" => "elasticsearch",
+        "host" => "node01",
+        "user" => "user",
+        "password" => "changeme",
+        "ssl" => true,
+        "cacert" => "/tmp/ca/certs/cacert.pem",
+        # or
+        #"truststore" => "/tmp/ca/truststore.jks",
+        #"truststore_password" => "testeteste"
+      }
+      next LogStash::Outputs::ElasticSearch.new(settings)
+    end
+
+    before :each do
+      subject.register
+    end
+
+    it "sends events to ES" do
+      expect {
+        subject.receive(LogStash::Event.new("message" => "sample message here"))
+        subject.buffer_flush(:final => true)
+      }.to_not raise_error
+    end
+  end
+
+  describe "connect using HTTP Authentication", :elasticsearch_secure => true do
+    subject do
+      require "logstash/outputs/elasticsearch"
+      settings = {
+        "protocol" => "http",
+        "cluster" => "elasticsearch",
+        "host" => "node01",
+        "user" => "user",
+        "password" => "changeme",
+      }
+      next LogStash::Outputs::ElasticSearch.new(settings)
+    end
+
+    before :each do
+      subject.register
+    end
+
+    it "sends events to ES" do
+      expect {
+        subject.receive(LogStash::Event.new("message" => "sample message here"))
+        subject.buffer_flush(:final => true)
+      }.to_not raise_error
+    end
+  end
 end