Skip to content

Commit e2931b9

Browse files
Merge pull request #1 from logstash-plugins/master
9.1.1 changes
2 parents d242dcb + 7c51ef4 commit e2931b9

13 files changed

+148
-17
lines changed

Diff for: CHANGELOG.md

+8
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
## 9.1.1
2+
- Docs: Set the default_codec doc attribute.
3+
4+
## 9.1.0
5+
- Set number_of_shards to 1 and document_type to '_doc' for es 7.x clusters #741 #747
6+
- Fix usage of upsert and script when update action is interpolated #239
7+
- Add metrics to track bulk level and document level responses #585
8+
19
## 9.0.3
210
- Ignore master-only nodes when using sniffing
311

Diff for: docs/index.asciidoc

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
:plugin: elasticsearch
22
:type: output
3+
:default_codec: plain
34

45
///////////////////////////////////////////
56
START - GENERATED VARIABLES, DO NOT EDIT!
@@ -105,8 +106,8 @@ not reevaluate its DNS value while the keepalive is in effect.
105106

106107
This plugin supports request and response compression. Response compression is enabled by default and
107108
for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for
108-
it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in
109-
Elasticsearch[https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http] to take advantage of response compression when using this plugin
109+
it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http[in
110+
Elasticsearch] to take advantage of response compression when using this plugin
110111

111112
For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression`
112113
setting in their Logstash config file.
@@ -476,6 +477,14 @@ This can be dynamic using the `%{foo}` syntax.
476477

477478
Set script name for scripted update mode
478479

480+
Example:
481+
[source,ruby]
482+
output {
483+
elasticsearch {
484+
script => "ctx._source.message = params.event.get('message')"
485+
}
486+
}
487+
479488
[id="plugins-{type}s-{plugin}-script_lang"]
480489
===== `script_lang`
481490

@@ -682,3 +691,5 @@ See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-in
682691

683692
[id="plugins-{type}s-{plugin}-common-options"]
684693
include::{include_path}/{type}.asciidoc[]
694+
695+
:default_codec!:

Diff for: lib/logstash/outputs/elasticsearch.rb

+1
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
227227
config :http_compression, :validate => :boolean, :default => false
228228

229229
def build_client
230+
params["metric"] = metric
230231
@client ||= ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
231232
end
232233

Diff for: lib/logstash/outputs/elasticsearch/common.rb

+25-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ def register
2525

2626
install_template
2727
check_action_validity
28+
@bulk_request_metrics = metric.namespace(:bulk_requests)
29+
@document_level_metrics = metric.namespace(:documents)
2830

2931
@logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s))
3032
end
@@ -156,8 +158,16 @@ def submit(actions)
156158

157159
# If the response is nil that means we were in a retry loop
158160
# and aborted since we're shutting down
161+
return if bulk_response.nil?
162+
159163
# If it did return and there are no errors we're good as well
160-
return if bulk_response.nil? || !bulk_response["errors"]
164+
if bulk_response["errors"]
165+
@bulk_request_metrics.increment(:with_errors)
166+
else
167+
@bulk_request_metrics.increment(:successes)
168+
@document_level_metrics.increment(:successes, actions.size)
169+
return
170+
end
161171

162172
actions_to_retry = []
163173
bulk_response["items"].each_with_index do |response,idx|
@@ -173,8 +183,10 @@ def submit(actions)
173183
# - For a mapping error, we send to dead letter queue for a human to intervene at a later point.
174184
# - For everything else there's mastercard. Yep, and we retry indefinitely. This should fix #572 and other transient network issues
175185
if DOC_SUCCESS_CODES.include?(status)
186+
@document_level_metrics.increment(:successes)
176187
next
177188
elsif DOC_CONFLICT_CODE == status
189+
@document_level_metrics.increment(:non_retryable_failures)
178190
@logger.warn "Failed action.", status: status, action: action, response: response if !failure_type_logging_whitelist.include?(failure["type"])
179191
next
180192
elsif DOC_DLQ_CODES.include?(status)
@@ -186,9 +198,11 @@ def submit(actions)
186198
else
187199
@logger.warn "Could not index event to Elasticsearch.", status: status, action: action, response: response
188200
end
201+
@document_level_metrics.increment(:non_retryable_failures)
189202
next
190203
else
191204
# only log what the user whitelisted
205+
@document_level_metrics.increment(:retryable_failures)
192206
@logger.info "retrying failed action with response code: #{status} (#{failure})" if !failure_type_logging_whitelist.include?(failure["type"])
193207
actions_to_retry << action
194208
end
@@ -198,16 +212,19 @@ def submit(actions)
198212
end
199213

200214
# Determine the correct value for the 'type' field for the given event
201-
DEFAULT_EVENT_TYPE="doc".freeze
215+
DEFAULT_EVENT_TYPE_ES6="doc".freeze
216+
DEFAULT_EVENT_TYPE_ES7="_doc".freeze
202217
def get_event_type(event)
203218
# Set the 'type' value for the index.
204219
type = if @document_type
205220
event.sprintf(@document_type)
206221
else
207222
if client.maximum_seen_major_version < 6
208-
event.get("type") || DEFAULT_EVENT_TYPE
223+
event.get("type") || DEFAULT_EVENT_TYPE_ES6
224+
elsif client.maximum_seen_major_version == 6
225+
DEFAULT_EVENT_TYPE_ES6
209226
else
210-
DEFAULT_EVENT_TYPE
227+
DEFAULT_EVENT_TYPE_ES7
211228
end
212229
end
213230

@@ -239,6 +256,7 @@ def safe_bulk(actions)
239256

240257
# We retry until there are no errors! Errors should all go to the retry queue
241258
sleep_interval = sleep_for_interval(sleep_interval)
259+
@bulk_request_metrics.increment(:failures)
242260
retry unless @stopping.true?
243261
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError => e
244262
@logger.error(
@@ -249,8 +267,10 @@ def safe_bulk(actions)
249267
)
250268
Stud.stoppable_sleep(sleep_interval) { @stopping.true? }
251269
sleep_interval = next_sleep_interval(sleep_interval)
270+
@bulk_request_metrics.increment(:failures)
252271
retry unless @stopping.true?
253272
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
273+
@bulk_request_metrics.increment(:failures)
254274
log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s}
255275
log_hash[:body] = e.response_body if @logger.debug? # Generally this is too verbose
256276
message = "Encountered a retryable error. Will Retry with exponential backoff "
@@ -280,6 +300,7 @@ def safe_bulk(actions)
280300
@logger.debug("Failed actions for last bad bulk request!", :actions => actions)
281301

282302
sleep_interval = sleep_for_interval(sleep_interval)
303+
@bulk_request_metrics.increment(:failures)
283304
retry unless @stopping.true?
284305
end
285306
end

Diff for: lib/logstash/outputs/elasticsearch/elasticsearch-template-es7x.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
"template" : "logstash-*",
33
"version" : 60001,
44
"settings" : {
5-
"index.refresh_interval" : "5s"
5+
"index.refresh_interval" : "5s",
6+
"number_of_shards": 1
67
},
78
"mappings" : {
89
"_doc" : {

Diff for: lib/logstash/outputs/elasticsearch/http_client.rb

+9-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ class HttpClient
5050
# through a special http path, such as using mod_rewrite.
5151
def initialize(options={})
5252
@logger = options[:logger]
53+
@metric = options[:metric]
54+
@bulk_request_metrics = @metric.namespace(:bulk_requests)
55+
@bulk_response_metrics = @bulk_request_metrics.namespace(:responses)
5356

5457
# Again, in case we use DEFAULT_OPTIONS in the future, uncomment this.
5558
# @options = DEFAULT_OPTIONS.merge(options)
@@ -142,9 +145,12 @@ def bulk_send(body_stream)
142145
body_stream.seek(0)
143146
end
144147

148+
@bulk_response_metrics.increment(response.code.to_s)
149+
145150
if response.code != 200
151+
url = ::LogStash::Util::SafeURI.new(response.final_url)
146152
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
147-
response.code, @bulk_path, body_stream.to_s, response.body
153+
response.code, url, body_stream.to_s, response.body
148154
)
149155
end
150156

@@ -282,7 +288,8 @@ def build_pool(options)
282288
:sniffing_path => options[:sniffing_path],
283289
:healthcheck_path => options[:healthcheck_path],
284290
:resurrect_delay => options[:resurrect_delay],
285-
:url_normalizer => self.method(:host_to_url)
291+
:url_normalizer => self.method(:host_to_url),
292+
:metric => options[:metric]
286293
}
287294
pool_options[:scheme] = self.scheme if self.scheme
288295

Diff for: lib/logstash/outputs/elasticsearch/http_client/pool.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def message
4545
def initialize(logger, adapter, initial_urls=[], options={})
4646
@logger = logger
4747
@adapter = adapter
48+
@metric = options[:metric]
4849
@initial_urls = initial_urls
4950

5051
raise ArgumentError, "No URL Normalizer specified!" unless options[:url_normalizer]
@@ -161,8 +162,8 @@ def sniff!
161162
# Sniffs and returns the results. Does not update internal URLs!
162163
def check_sniff
163164
_, url_meta, resp = perform_request(:get, @sniffing_path)
165+
@metric.increment(:sniff_requests)
164166
parsed = LogStash::Json.load(resp.body)
165-
166167
nodes = parsed['nodes']
167168
if !nodes || nodes.empty?
168169
@logger.warn("Sniff returned no nodes! Will not update hosts.")

Diff for: lib/logstash/outputs/elasticsearch/http_client_builder.rb

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ def self.build(logger, hosts, params)
1414

1515
common_options = {
1616
:client_settings => client_settings,
17+
:metric => params["metric"],
1718
:resurrect_delay => params["resurrect_delay"]
1819
}
1920

Diff for: logstash-output-elasticsearch.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '9.0.3'
3+
s.version = '9.1.1'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

Diff for: spec/integration/outputs/metrics_spec.rb

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
require_relative "../../../spec/es_spec_helper"
2+
3+
describe "metrics", :integration => true do
4+
subject! do
5+
require "logstash/outputs/elasticsearch"
6+
settings = {
7+
"manage_template" => false,
8+
"hosts" => "#{get_host_port()}"
9+
}
10+
plugin = LogStash::Outputs::ElasticSearch.new(settings)
11+
end
12+
13+
let(:metric) { subject.metric }
14+
let(:bulk_request_metrics) { subject.instance_variable_get(:@bulk_request_metrics) }
15+
let(:document_level_metrics) { subject.instance_variable_get(:@document_level_metrics) }
16+
17+
before :each do
18+
require "elasticsearch"
19+
20+
# Clean ES of data before we start.
21+
@es = get_client
22+
@es.indices.delete_template(:name => "*")
23+
24+
# This can fail if there are no indexes, ignore failure.
25+
@es.indices.delete(:index => "*") rescue nil
26+
#@es.indices.refresh
27+
subject.register
28+
end
29+
30+
context "after a succesful bulk insert" do
31+
let(:bulk) { [
32+
LogStash::Event.new("message" => "sample message here"),
33+
LogStash::Event.new("somemessage" => { "message" => "sample nested message here" }),
34+
LogStash::Event.new("somevalue" => 100),
35+
LogStash::Event.new("somevalue" => 10),
36+
LogStash::Event.new("somevalue" => 1),
37+
LogStash::Event.new("country" => "us"),
38+
LogStash::Event.new("country" => "at"),
39+
LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0 ] })
40+
]}
41+
42+
it "increases successful bulk request metric" do
43+
expect(bulk_request_metrics).to receive(:increment).with(:successes).once
44+
subject.multi_receive(bulk)
45+
end
46+
47+
it "increases number of successful inserted documents" do
48+
expect(document_level_metrics).to receive(:increment).with(:successes, bulk.size).once
49+
subject.multi_receive(bulk)
50+
end
51+
end
52+
53+
context "after a bulk insert that generates errors" do
54+
let(:bulk) { [
55+
LogStash::Event.new("message" => "sample message here"),
56+
LogStash::Event.new("message" => { "message" => "sample nested message here" }),
57+
]}
58+
it "increases bulk request with error metric" do
59+
expect(bulk_request_metrics).to receive(:increment).with(:with_errors).once
60+
expect(bulk_request_metrics).to_not receive(:increment).with(:successes)
61+
subject.multi_receive(bulk)
62+
end
63+
64+
it "increases number of successful and non retryable documents" do
65+
expect(document_level_metrics).to receive(:increment).with(:non_retryable_failures).once
66+
expect(document_level_metrics).to receive(:increment).with(:successes).once
67+
subject.multi_receive(bulk)
68+
end
69+
end
70+
end

Diff for: spec/integration/outputs/sniffer_spec.rb

+7-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,13 @@
77
let(:logger) { Cabin::Channel.get }
88
let(:adapter) { LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter.new(logger) }
99
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://#{get_host_port}")] }
10-
let(:options) { {:resurrect_delay => 2, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests
10+
let(:options) do
11+
{
12+
:resurrect_delay => 2, # Shorten the delay a bit to speed up tests
13+
:url_normalizer => proc {|u| u},
14+
:metric => ::LogStash::Instrument::NullMetric.new(:dummy).namespace(:alsodummy)
15+
}
16+
end
1117

1218
subject { LogStash::Outputs::ElasticSearch::HttpClient::Pool.new(logger, adapter, initial_urls, options) }
1319

Diff for: spec/unit/outputs/elasticsearch/http_client_spec.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
let(:base_options) do
88
opts = {
99
:hosts => [::LogStash::Util::SafeURI.new("127.0.0.1")],
10-
:logger => Cabin::Channel.get
10+
:logger => Cabin::Channel.get,
11+
:metric => ::LogStash::Instrument::NamespacedNullMetric.new(:dummy_metric)
1112
}
1213

1314
if !ssl.nil? # Shortcut to set this

Diff for: spec/unit/outputs/elasticsearch_spec.rb

+7-4
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,15 @@
4141
let(:manticore_url) { manticore_urls.first }
4242

4343
describe "getting a document type" do
44-
it "should default to 'doc'" do
45-
expect(subject.send(:get_event_type, LogStash::Event.new)).to eql("doc")
46-
end
47-
4844
context "if document_type isn't set" do
4945
let(:options) { super.merge("document_type" => nil)}
46+
context "for 7.x elasticsearch clusters" do
47+
let(:maximum_seen_major_version) { 7 }
48+
it "should return '_doc'" do
49+
expect(subject.send(:get_event_type, LogStash::Event.new("type" => "foo"))).to eql("_doc")
50+
end
51+
end
52+
5053
context "for 6.x elasticsearch clusters" do
5154
let(:maximum_seen_major_version) { 6 }
5255
it "should return 'doc'" do

0 commit comments

Comments
 (0)