Skip to content

Commit cf1c2e0

Browse files
committed
Re-shape the logic to avoid unnecessary usage of Tuple, instead plain Event is considered. Giving a noti to users when DQL enabled and failed events sent to it.
1 parent 22c868d commit cf1c2e0

File tree

3 files changed

+30
-12
lines changed

3 files changed

+30
-12
lines changed

Diff for: lib/logstash/outputs/elasticsearch.rb

+13-9
Original file line numberDiff line numberDiff line change
@@ -372,29 +372,33 @@ def multi_receive(events)
372372
end
373373
end
374374

375-
# @param: Arrays of EventActionTuple with error messages
375+
# @param: Arrays of FailedEventMapping
376376
private
377377
def handle_event_mapping_errors(event_mapping_errors)
378-
event_mapping_errors.each do |action, message|
379-
detail_message = message + ", " + action.to_s
380-
handle_dlq_status(action.event, :warn, detail_message)
381-
@document_level_metrics.increment(:non_retryable_failures) unless @dlq_writer
378+
# if DQL is enabled, log the events to provide issue insights to users.
379+
if @dlq_writer
380+
@logger.warn("Events could not be indexed and routing to DLQ, count: #{event_mapping_errors.size}")
382381
end
382+
383+
event_mapping_errors.each do |event_mapping_error|
384+
detailed_message = "#{event_mapping_error.message}; event: `#{event_mapping_error.event.to_hash_with_metadata}`"
385+
handle_dlq_status(event_mapping_error.event, :warn, detailed_message)
386+
end
387+
@document_level_metrics.increment(:non_retryable_failures, event_mapping_errors.size)
383388
end
384389

385390
MapEventsResult = Struct.new(:successful_events, :event_mapping_errors)
391+
FailedEventMapping = Struct.new(:event, :message)
386392

387393
private
388394
def safe_interpolation_map_events(events)
389395
successful_events = [] # list of LogStash::Outputs::ElasticSearch::EventActionTuple
390-
event_mapping_errors = [] # list of LogStash::Outputs::ElasticSearch::EventActionTuple with error messages
396+
event_mapping_errors = [] # list of FailedEventMapping
391397
events.each do |event|
392398
begin
393399
successful_events << @event_mapper.call(event)
394400
rescue EventMappingError => ie
395-
action = event.sprintf(@action || 'index')
396-
event_action_tuple = EventActionTuple.new(action, [], event)
397-
event_mapping_errors << [event_action_tuple, ie.message]
401+
event_mapping_errors << FailedEventMapping.new(event, ie.message)
398402
end
399403
end
400404
MapEventsResult.new(successful_events, event_mapping_errors)

Diff for: spec/integration/outputs/index_spec.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
137137
before { subject.instance_variable_set('@dlq_writer', dlq_writer) }
138138

139139
it "should doesn't create an index name with unresolved placeholders" do
140-
expect(dlq_writer).to receive(:write).once.with(event, /Could not resolve dynamic index/)
140+
expect(dlq_writer).to receive(:write).once.with(event, a_string_including("Badly formatted index, after interpolation still contains placeholder"))
141141
subject.multi_receive(events)
142142

143143
escaped_index_name = CGI.escape("%{[index_name]}_dynamic")

Diff for: spec/unit/outputs/elasticsearch_spec.rb

+16-2
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,9 @@
398398
event_result.successful_events.each do |action, _|
399399
expect(action).to_not eql("unsupported_action")
400400
end
401+
event_result.event_mapping_errors.each do |event_mapping_error|
402+
expect(event_mapping_error.message).to eql("Elasticsearch doesn't support [unsupported_action] action")
403+
end
401404
end
402405
end
403406

@@ -414,6 +417,9 @@
414417
event_result.successful_events.each do |action, _|
415418
expect(action).to_not eql("unsupported_action")
416419
end
420+
event_result.event_mapping_errors.each do |event_mapping_error|
421+
expect(event_mapping_error.message).to eql("Elasticsearch doesn't support [unsupported_action] action")
422+
end
417423
end
418424
end
419425

@@ -430,6 +436,9 @@
430436
event_result.successful_events.each do |action, _|
431437
expect(action).to_not eql("unsupported_action")
432438
end
439+
event_result.event_mapping_errors.each do |event_mapping_error|
440+
expect(event_mapping_error.message).to include "Elasticsearch doesn't support"
441+
end
433442
end
434443
end
435444

@@ -443,7 +452,10 @@
443452
it "rejects unsupported actions" do
444453
event_result = subject.send(:safe_interpolation_map_events, events)
445454
expect(event_result.successful_events.size).to be == 2
446-
# expect(logger_stub).to have_received(:warn).with(a_string_including "Could not index event to Elasticsearch because its action is not supported.")
455+
expect(event_result.event_mapping_errors.size).to be == 2
456+
event_result.event_mapping_errors.each do |event_mapping_error|
457+
expect(event_mapping_error.message).to include "Elasticsearch doesn't support"
458+
end
447459
end
448460
end
449461

@@ -457,7 +469,9 @@
457469
it "rejects unsupported action" do
458470
event_result = subject.send(:safe_interpolation_map_events, events)
459471
expect(event_result.successful_events.size).to be == 3
460-
# expect(logger_stub).to have_received(:warn).with(a_string_including "Could not index event to Elasticsearch because its action is not supported.")
472+
event_result.event_mapping_errors.each do |event_mapping_error|
473+
expect(event_mapping_error.message).to eql("Elasticsearch doesn't support [unsupported_action3] action")
474+
end
461475
end
462476
end
463477
end

0 commit comments

Comments
 (0)