Skip to content

Commit b58934f

Browse files
committed
Support routing option for documents
The bulk API supports a specified `_routing` value for document placement within a shard in an index. This should be exposed for custom routing for aliases, filtering, etc. http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-routing This also respects the move away from the routing parameter within the event as defined in: elastic/elasticsearch#6730
1 parent 17b9e19 commit b58934f

File tree

1 file changed

+16
-12
lines changed

1 file changed

+16
-12
lines changed

lib/logstash/outputs/elasticsearch.rb

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
4949
# similar events to the same 'type'. String expansion `%{foo}` works here.
5050
config :index_type, :validate => :string
5151

52+
# A routing override to be applied to all processed events.
53+
# This can be dynamic using the `%{foo}` syntax.
54+
config :routing, :validate => :string
55+
5256
# Starting in Logstash 1.3 (unless you set option `manage_template` to false)
5357
# a default mapping template for Elasticsearch will be applied, if you do not
5458
# already have one set to match the index pattern defined (default of
@@ -117,7 +121,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
117121
# Run the Elasticsearch server embedded in this process.
118122
# This option is useful if you want to run a single Logstash process that
119123
# handles log processing and indexing; it saves you from needing to run
120-
# a separate Elasticsearch process. An example use case is
124+
# a separate Elasticsearch process. An example use case is
121125
# proof-of-concept testing.
122126
# WARNING: This is not recommended for production use!
123127
config :embedded, :validate => :boolean, :default => false
@@ -250,7 +254,7 @@ def register
250254
client_settings["network.host"] = @bind_host if @bind_host
251255
client_settings["transport.tcp.port"] = @bind_port if @bind_port
252256
client_settings["client.transport.sniff"] = @sniffing
253-
257+
254258
if @node_name
255259
client_settings["node.name"] = @node_name
256260
else
@@ -382,7 +386,7 @@ def get_template
382386
def receive(event)
383387
return unless output?(event)
384388

385-
# block until we have not maxed out our
389+
# block until we have not maxed out our
386390
# retry queue. This is applying back-pressure
387391
# to slow down the receive-rate
388392
@retry_flush_mutex.synchronize {
@@ -393,11 +397,11 @@ def receive(event)
393397

394398
# Set the 'type' value for the index.
395399
type = @index_type ? event.sprintf(@index_type) : (event["type"] || "logs")
396-
397400
index = event.sprintf(@index)
398-
399401
document_id = @document_id ? event.sprintf(@document_id) : nil
400-
buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type }, event])
402+
routing = @routing ? event.sprintf(@routing) : nil
403+
404+
buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type, :_routing => routing }, event])
401405
end # def receive
402406

403407
public
@@ -447,19 +451,19 @@ def teardown
447451

448452
@retry_teardown_requested.make_true
449453
# First, make sure retry_timer_thread is stopped
450-
# to ensure we do not signal a retry based on
454+
# to ensure we do not signal a retry based on
451455
# the retry interval.
452456
Thread.kill(@retry_timer_thread)
453457
@retry_timer_thread.join
454-
# Signal flushing in the case that #retry_flush is in
458+
# Signal flushing in the case that #retry_flush is in
455459
# the process of waiting for a signal.
456460
@retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal }
457-
# Now, #retry_flush is ensured to not be in a state of
461+
# Now, #retry_flush is ensured to not be in a state of
458462
# waiting and can be safely joined into the main thread
459463
# for further final execution of an in-process remaining call.
460464
@retry_thread.join
461465

462-
# execute any final actions along with a proceeding retry for any
466+
# execute any final actions along with a proceeding retry for any
463467
# final actions that did not succeed.
464468
buffer_flush(:final => true)
465469
retry_flush
@@ -544,11 +548,11 @@ def generate_jks cert_path
544548
end
545549

546550
private
547-
# in charge of submitting any actions in @retry_queue that need to be
551+
# in charge of submitting any actions in @retry_queue that need to be
548552
# retried
549553
#
550554
# This method is not called concurrently. It is only called by @retry_thread
551-
# and once that thread is ended during the teardown process, a final call
555+
# and once that thread is ended during the teardown process, a final call
552556
# to this method is done upon teardown in the main thread.
553557
def retry_flush()
554558
unless @retry_queue.empty?

0 commit comments

Comments
 (0)