# encoding: utf-8
require "logstash/namespace"
require "logstash/environment"
require "logstash/outputs/base"
require "logstash/json"
require "concurrent/atomic/atomic_boolean"
require "stud/interval"
require "socket" # for Socket.gethostname
require "thread" # for safe queueing
require "uri" # for escaping user input
require "forwardable"
require "set"
# .Compatibility Note
# [NOTE]
# ================================================================================
# Starting with Elasticsearch 5.3, there's an {ref}modules-http.html[HTTP setting]
# called `http.content_type.required`. If this option is set to `true`, and you
# are using Logstash 2.4 through 5.2, you need to update the Elasticsearch output
# plugin to version 6.2.5 or higher.
#
# ================================================================================
#
# This plugin is the recommended method of storing logs in Elasticsearch.
# If you plan on using the Kibana web interface, you'll want to use this output.
#
# This output only speaks the HTTP protocol. HTTP is the preferred protocol for interacting with Elasticsearch as of Logstash 2.0.
# We strongly encourage the use of HTTP over the node protocol for a number of reasons. HTTP is only marginally slower,
# yet far easier to administer and work with. When using the HTTP protocol one may upgrade Elasticsearch versions without having
# to upgrade Logstash in lock-step.
#
# You can learn more about Elasticsearch at <https://www.elastic.co/products/elasticsearch>
#
# ==== Template management for Elasticsearch 5.x
# The index template for this version (Logstash 5.0) has been changed to reflect Elasticsearch's mapping changes in version 5.0.
# Most importantly, the subfield for string multi-fields has changed from `.raw` to `.keyword` to match ES default
# behavior.
#
# **Users installing ES 5.x and LS 5.x**
# This change will not affect you and you will continue to use the ES defaults.
#
# **Users upgrading from LS 2.x to LS 5.x with ES 5.x**
# LS will not force-upgrade the template if a `logstash` template already exists. This means you will still use
# `.raw` for sub-fields coming from 2.x. If you choose to use the new template, you will have to reindex your data after
# the new template is installed.
#
# ==== Retry Policy
#
# The retry policy has changed significantly in the 2.2.0 release.
# This plugin uses the Elasticsearch bulk API to optimize its imports into Elasticsearch. These requests may experience
# either partial or total failures.
#
# The following errors are retried infinitely:
#
# - Network errors (inability to connect)
# - 429 (Too many requests) and
# - 503 (Service unavailable) errors
#
# NOTE: 409 exceptions are no longer retried. Please set a higher `retry_on_conflict` value if you experience 409 exceptions.
# It is more performant for Elasticsearch to retry these exceptions than this plugin.
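#
# For example, a minimal sketch of raising `retry_on_conflict` for an
# update-heavy pipeline (the host and value shown are illustrative):
#
# elasticsearch {
#   hosts             => ["localhost:9200"]
#   action            => "update"
#   document_id       => "%{id}"
#   retry_on_conflict => 5
# }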
#
# ==== Batch Sizes
# This plugin attempts to send batches of events as a single request. However, if
# a request exceeds 20MB we will break it up into multiple batch requests. If a single document exceeds 20MB it will be sent as a single request.
#
# ==== DNS Caching
#
# This plugin uses the JVM to lookup DNS entries and is subject to the value of https://docs.oracle.com/javase/7/docs/technotes/guides/net/properties.html[networkaddress.cache.ttl],
# a global setting for the JVM.
#
# As an example, to set your DNS TTL to 1 second you would set
# the `LS_JAVA_OPTS` environment variable to `-Dnetworkaddress.cache.ttl=1`.
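#
# For example, when launching Logstash from a shell (a sketch; how you set
# environment variables depends on your install):
#
#     export LS_JAVA_OPTS="-Dnetworkaddress.cache.ttl=1"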
#
# Keep in mind that a connection with keepalive enabled will
# not reevaluate its DNS value while the keepalive is in effect.
#
# ==== HTTP Compression
#
# This plugin supports request and response compression. Response compression is enabled by default. For
# Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for
# it to send back a compressed response. For versions before 5.0, `http.compression` must be set to `true` in
# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html[Elasticsearch] to take advantage of response compression when using this plugin.
#
# For request compression, regardless of the Elasticsearch version, users have to enable the `http_compression`
# setting in their Logstash config file.
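#
# A minimal sketch of enabling request compression (the host is illustrative):
#
# elasticsearch {
#   hosts            => ["localhost:9200"]
#   http_compression => true
# }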
#
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
declare_threadsafe!
require "logstash/outputs/elasticsearch/license_checker"
require "logstash/outputs/elasticsearch/http_client"
require "logstash/outputs/elasticsearch/http_client_builder"
require "logstash/plugin_mixins/elasticsearch/api_configs"
require "logstash/plugin_mixins/elasticsearch/common"
require "logstash/outputs/elasticsearch/ilm"
require "logstash/outputs/elasticsearch/data_stream_support"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/deprecation_logger_support'
# Protocol agnostic methods
include(LogStash::PluginMixins::ElasticSearch::Common)
# Methods for ILM support
include(LogStash::Outputs::ElasticSearch::Ilm)
# ecs_compatibility option, provided by Logstash core or the support adapter.
include(LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8))
# deprecation logger adapter for older Logstashes
include(LogStash::PluginMixins::DeprecationLoggerSupport)
# Generic/API config options that any document indexer output needs
include(LogStash::PluginMixins::ElasticSearch::APIConfigs)
# DS support
include(LogStash::Outputs::ElasticSearch::DataStreamSupport)
DEFAULT_POLICY = "logstash-policy"
config_name "elasticsearch"
# The Elasticsearch action to perform. Valid actions are:
#
# - index: indexes a document (an event from Logstash).
# - delete: deletes a document by id (an id is required for this action)
# - create: indexes a document, fails if a document by that id already exists in the index.
# - update: updates a document by id. Update has a special case where you can upsert -- update a
# document if not already present. See the `upsert` option. NOTE: This does not work and is not supported
# in Elasticsearch 1.x. Please upgrade to ES 2.x or greater to use this feature with Logstash!
# - A sprintf style string to change the action based on the content of the event. The value `%{[foo]}`
# would use the foo field for the action
#
# For more details on actions, check out the http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation]
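#
# For example, a sketch of choosing the action per event (the metadata field
# name is illustrative):
#
# elasticsearch {
#   action => "%{[@metadata][action]}"
# }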
config :action, :validate => :string # :default => "index" unless data_stream
# The index to write events to. This can be dynamic using the `%{foo}` syntax.
# The default value will partition your indices by day so you can more easily
# delete old data or only search specific date ranges.
# Indexes may not contain uppercase characters.
# For weekly indexes the ISO 8601 format is recommended, e.g. logstash-%{+xxxx.ww}.
# LS uses Joda to format the index pattern from event timestamp.
# Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here].
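#
# For example, a sketch of a weekly index pattern:
#
# elasticsearch {
#   index => "logstash-%{+xxxx.ww}"
# }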
config :index, :validate => :string
config :document_type,
:validate => :string,
:deprecated => "Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature"
# From Logstash 1.3 onwards, a template is applied to Elasticsearch during
# Logstash's startup if one with the name `template_name` does not already exist.
# By default, the contents of this template is the default template for
# `logstash-%{+YYYY.MM.dd}` which always matches indices based on the pattern
# `logstash-*`. Should you require support for other index names, or would like
# to change the mappings in the template in general, a custom template can be
# specified by setting `template` to the path of a template file.
#
# Setting `manage_template` to false disables this feature. If you require more
# control over template creation, (e.g. creating indices dynamically based on
# field names) you should set `manage_template` to false and use the REST
# API to apply your templates manually.
#
# Default value is `true` unless data streams are enabled
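#
# For example, a sketch of disabling template management in favor of
# templates applied manually via the REST API:
#
# elasticsearch {
#   manage_template => false
# }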
config :manage_template, :validate => :boolean, :default => true
# This configuration option defines how the template is named inside Elasticsearch.
# Note that if you have used the template management features and subsequently
# change this, you will need to prune the old template manually, e.g.
#
# `curl -XDELETE <http://localhost:9200/_template/OldTemplateName?pretty>`
#
# where `OldTemplateName` is whatever the former setting was.
config :template_name, :validate => :string
# You can set the path to your own template here, if you so desire.
# If not set, the included template will be used.
config :template, :validate => :path
# The template_overwrite option will always overwrite the indicated template
# in Elasticsearch with either the one indicated by template or the included one.
# This option is set to false by default. If you always want to stay up to date
# with the template provided by Logstash, this option could be very useful to you.
# Likewise, if you have your own template file managed by puppet, for example, and
# you wanted to be able to update it regularly, this option could help there as well.
#
# Please note that if you are using your own customized version of the Logstash
# template (logstash), setting this to true will make Logstash overwrite
# the "logstash" template (i.e. removing all customized settings)
config :template_overwrite, :validate => :boolean, :default => false
# Flag for enabling the legacy template API for Elasticsearch 8
# The default `auto` uses the index template API for Elasticsearch 8 and the legacy API for 7
# Set to `legacy` to use the legacy template API
config :template_api, :validate => ['auto', 'legacy', 'composable'], :default => 'auto'
# The version to use for indexing. Use sprintf syntax like `%{my_version}` to use a field value here.
# See https://www.elastic.co/blog/elasticsearch-versioning-support.
config :version, :validate => :string
# The version_type to use for indexing.
# See https://www.elastic.co/blog/elasticsearch-versioning-support.
# See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types
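#
# For example, a sketch of external versioning driven by an event field (the
# field name is illustrative):
#
# elasticsearch {
#   version      => "%{[@metadata][version]}"
#   version_type => "external"
# }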
config :version_type, :validate => ["internal", 'external', "external_gt", "external_gte", "force"]
# A routing override to be applied to all processed events.
# This can be dynamic using the `%{foo}` syntax.
config :routing, :validate => :string
# For child documents, ID of the associated parent.
# This can be dynamic using the `%{foo}` syntax.
config :parent, :validate => :string, :default => nil
# For child documents, name of the join field
config :join_field, :validate => :string, :default => nil
# Set upsert content for update mode.
# Create a new document with this parameter as json string if `document_id` doesn't exist
config :upsert, :validate => :string, :default => ""
# Enable `doc_as_upsert` for update mode.
# Create a new document with source if `document_id` doesn't exist in Elasticsearch
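#
# For example, a sketch of update-or-insert by id (the id field is illustrative):
#
# elasticsearch {
#   action        => "update"
#   document_id   => "%{id}"
#   doc_as_upsert => true
# }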
config :doc_as_upsert, :validate => :boolean, :default => false
# Set script name for scripted update mode
config :script, :validate => :string, :default => ""
# Define the type of script referenced by "script" variable
# inline : "script" contains inline script
# indexed : "script" contains the name of script directly indexed in elasticsearch
# file : "script" contains the name of script stored in elasticsearch's config directory
config :script_type, :validate => ["inline", 'indexed', "file"], :default => ["inline"]
# Set the language of the used script. If not set, this defaults to painless in ES 5.0
config :script_lang, :validate => :string, :default => "painless"
# Set variable name passed to script (scripted update)
config :script_var_name, :validate => :string, :default => "event"
# if enabled, script is in charge of creating non-existent document (scripted update)
config :scripted_upsert, :validate => :boolean, :default => false
# The number of times Elasticsearch should internally retry an update/upserted document
# See the https://www.elastic.co/guide/en/elasticsearch/guide/current/partial-updates.html[partial updates]
# for more info
config :retry_on_conflict, :validate => :number, :default => 1
# Set which ingest pipeline you wish to execute for an event. You can also use event dependent configuration
# here like `pipeline => "%{INGEST_PIPELINE}"`
config :pipeline, :validate => :string, :default => nil
# -----
# ILM configurations (beta)
# -----
# Flag for enabling Index Lifecycle Management integration.
config :ilm_enabled, :validate => [true, false, 'true', 'false', 'auto'], :default => 'auto'
# Rollover alias used for indexing data. If the rollover alias doesn't exist, Logstash will create it and map it to the relevant index
config :ilm_rollover_alias, :validate => :string
# appends `{now/d}-000001` by default for new index creation, subsequent rollover indices will increment based on this pattern i.e. `000002`
# {now/d} is date math, and will insert the appropriate value automatically.
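#
# For example, a sketch pairing a rollover alias with the default pattern
# (the alias name is illustrative):
#
# elasticsearch {
#   ilm_rollover_alias => "custom"
#   ilm_pattern        => "{now/d}-000001"
# }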
config :ilm_pattern, :validate => :string, :default => '{now/d}-000001'
# ILM policy to use, if undefined the default policy will be used.
config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY
# List of extra HTTP error codes that are considered valid to move the events into the dead letter queue.
# It's considered a configuration error to re-use the same predefined codes for success, DLQ or conflict.
# The option accepts a list of natural numbers corresponding to HTTP error codes.
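#
# For example, a sketch routing an extra status code to the DLQ (the code is
# illustrative, requires the DLQ to be enabled, and must not collide with the
# predefined success, DLQ or conflict codes):
#
# elasticsearch {
#   dlq_custom_codes => [403]
# }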
config :dlq_custom_codes, :validate => :number, :list => true, :default => []
# if enabled, events with failed index name interpolation go into the dead letter queue.
config :dlq_on_failed_indexname_interpolation, :validate => :boolean, :default => true
attr_reader :client
attr_reader :default_index
attr_reader :default_ilm_rollover_alias
attr_reader :default_template_name
def initialize(*params)
super
setup_ecs_compatibility_related_defaults
end
def register
if !failure_type_logging_whitelist.empty?
log_message = "'failure_type_logging_whitelist' is deprecated and in a future version of Elasticsearch " +
"output plugin will be removed, please use 'silence_errors_in_log' instead."
@deprecation_logger.deprecated log_message
@logger.warn log_message
@silence_errors_in_log = silence_errors_in_log | failure_type_logging_whitelist
end
@after_successful_connection_done = Concurrent::AtomicBoolean.new(false)
@stopping = Concurrent::AtomicBoolean.new(false)
check_action_validity
@logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s))
# the license_checking behaviour in the Pool class is externalized in the LogStash::ElasticSearchOutputLicenseChecker
# class defined in license_checker.rb. This license checking is specific to the elasticsearch output here and passed
# to build_client down to the Pool class.
@client = build_client(LicenseChecker.new(@logger))
# Avoids race conditions in the @data_stream_config initialization (invoking check_data_stream_config! twice).
# It's being concurrently invoked by this register method and by the finish_register on the @after_successful_connection_thread
data_stream_enabled = data_stream_config?
setup_template_manager_defaults(data_stream_enabled)
@after_successful_connection_thread = after_successful_connection do
begin
finish_register
true # thread.value
rescue => e
# we do not want to halt the thread with an exception as that has consequences for LS
e # thread.value
ensure
@after_successful_connection_done.make_true
end
end
# To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior.
@dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil
@dlq_codes = DOC_DLQ_CODES.to_set
if dlq_enabled?
check_dlq_custom_codes
@dlq_codes.merge(dlq_custom_codes)
else
raise LogStash::ConfigurationError, "DLQ feature (dlq_custom_codes) is configured while DLQ is not enabled" unless dlq_custom_codes.empty?
end
if data_stream_enabled
@event_mapper = -> (e) { data_stream_event_action_tuple(e) }
@event_target = -> (e) { data_stream_name(e) }
@index = "#{data_stream_type}-#{data_stream_dataset}-#{data_stream_namespace}".freeze # default name
else
@event_mapper = -> (e) { event_action_tuple(e) }
@event_target = -> (e) { e.sprintf(@index) }
end
@bulk_request_metrics = metric.namespace(:bulk_requests)
@document_level_metrics = metric.namespace(:documents)
if ecs_compatibility == :v8
@logger.warn("Elasticsearch Output configured with `ecs_compatibility => v8`, which resolved to an UNRELEASED preview of version 8.0.0 of the Elastic Common Schema. " +
"Once ECS v8 and an updated release of this plugin are publicly available, you will need to update this plugin to resolve this warning.")
end
end
# @override post-register when ES connection established
def finish_register
assert_es_version_supports_data_streams if data_stream_config?
discover_cluster_uuid
install_template
setup_ilm if ilm_in_use?
super
end
# @override to handle proxy => '' as if none was set
def config_init(params)
proxy = params['proxy']
if proxy.is_a?(String)
# environment variables references aren't yet resolved
proxy = deep_replace(proxy)
if proxy.empty?
params.delete('proxy')
@proxy = ''
else
params['proxy'] = proxy # do not do resolving again
end
end
super(params)
end
# Receive an array of events and immediately attempt to index them (no buffering)
def multi_receive(events)
wait_for_successful_connection if @after_successful_connection_done
events_mapped = safe_interpolation_map_events(events)
retrying_submit(events_mapped.successful_events)
unless events_mapped.event_mapping_errors.empty?
handle_event_mapping_errors(events_mapped.event_mapping_errors)
end
end
# @param event_mapping_errors [Array<FailedEventMapping>]
private
def handle_event_mapping_errors(event_mapping_errors)
# if DLQ is enabled, log the events to provide issue insights to users.
if @dlq_writer
@logger.warn("Events could not be indexed and routing to DLQ, count: #{event_mapping_errors.size}")
end
event_mapping_errors.each do |event_mapping_error|
detailed_message = "#{event_mapping_error.message}; event: `#{event_mapping_error.event.to_hash_with_metadata}`"
handle_dlq_status(event_mapping_error.event, :warn, detailed_message)
end
@document_level_metrics.increment(:non_retryable_failures, event_mapping_errors.size)
end
MapEventsResult = Struct.new(:successful_events, :event_mapping_errors)
FailedEventMapping = Struct.new(:event, :message)
private
def safe_interpolation_map_events(events)
successful_events = [] # list of LogStash::Outputs::ElasticSearch::EventActionTuple
event_mapping_errors = [] # list of FailedEventMapping
events.each do |event|
begin
successful_events << @event_mapper.call(event)
rescue EventMappingError => ie
event_mapping_errors << FailedEventMapping.new(event, ie.message)
end
end
MapEventsResult.new(successful_events, event_mapping_errors)
end
public
def map_events(events)
safe_interpolation_map_events(events).successful_events
end
def wait_for_successful_connection
after_successful_connection_done = @after_successful_connection_done
return unless after_successful_connection_done
stoppable_sleep 1 until after_successful_connection_done.true?
status = @after_successful_connection_thread && @after_successful_connection_thread.value
if status.is_a?(Exception) # check if thread 'halted' with an error
# keep logging that something isn't right (from every #multi_receive)
@logger.error "Elasticsearch setup did not complete normally, please review previously logged errors",
message: status.message, exception: status.class
else
@after_successful_connection_done = nil # do not execute __method__ again if all went well
end
end
private :wait_for_successful_connection
def close
@stopping.make_true if @stopping
stop_after_successful_connection_thread
@client.close if @client
end
private
def stop_after_successful_connection_thread
@after_successful_connection_thread.join unless @after_successful_connection_thread.nil?
end
# Convert the event into a 3-tuple of action, params and event hash
def event_action_tuple(event)
params = common_event_params(event)
params[:_type] = get_event_type(event) if use_event_type?(nil)
if @parent
if @join_field
join_value = event.get(@join_field)
parent_value = event.sprintf(@parent)
event.set(@join_field, { "name" => join_value, "parent" => parent_value })
params[routing_field_name] = event.sprintf(@parent)
else
params[:parent] = event.sprintf(@parent)
end
end
action = event.sprintf(@action || 'index')
raise UnsupportedActionError, action unless VALID_HTTP_ACTIONS.include?(action)
if action == 'update'
params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
params[:_script] = event.sprintf(@script) if @script != ""
params[retry_on_conflict_action_name] = @retry_on_conflict
end
params[:version] = event.sprintf(@version) if @version
params[:version_type] = event.sprintf(@version_type) if @version_type
EventActionTuple.new(action, params, event)
end
class EventActionTuple < Array # TODO: acting as an array for compatibility
def initialize(action, params, event, event_data = nil)
super(3)
self[0] = action
self[1] = params
self[2] = event_data || event.to_hash
@event = event
end
attr_reader :event
end
class EventMappingError < ArgumentError
def initialize(msg = nil)
super
end
end
class IndexInterpolationError < EventMappingError
def initialize(bad_formatted_index)
super("Badly formatted index, after interpolation still contains placeholder: [#{bad_formatted_index}]")
end
end
class UnsupportedActionError < EventMappingError
def initialize(bad_action)
super("Elasticsearch doesn't support [#{bad_action}] action")
end
end
# @return Hash (initial) parameters for given event
# @private shared event params factory between index and data_stream mode
def common_event_params(event)
sprintf_index = @event_target.call(event)
raise IndexInterpolationError, sprintf_index if sprintf_index.match(/%{.*?}/) && dlq_on_failed_indexname_interpolation
params = {
:_id => @document_id ? event.sprintf(@document_id) : nil,
:_index => sprintf_index,
routing_field_name => @routing ? event.sprintf(@routing) : nil
}
target_pipeline = resolve_pipeline(event)
# convention: empty string equates to not using a pipeline
# this is useful when using a field reference in the pipeline setting, e.g.
# elasticsearch {
# pipeline => "%{[@metadata][pipeline]}"
# }
params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?)
params
end
def resolve_pipeline(event)
pipeline_template = @pipeline || event.get("[@metadata][target_ingest_pipeline]")&.to_s
pipeline_template && event.sprintf(pipeline_template)
end
@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }
@@plugins.each do |plugin|
name = plugin.name.split('-')[-1]
require "logstash/outputs/elasticsearch/#{name}"
end
def retry_on_conflict_action_name
maximum_seen_major_version >= 7 ? :retry_on_conflict : :_retry_on_conflict
end
def routing_field_name
:routing
end
# Determine the correct value for the 'type' field for the given event
DEFAULT_EVENT_TYPE_ES6 = "doc".freeze
DEFAULT_EVENT_TYPE_ES7 = "_doc".freeze
def get_event_type(event)
# Set the 'type' value for the index.
type = if @document_type
event.sprintf(@document_type)
else
major_version = maximum_seen_major_version
if major_version == 6
DEFAULT_EVENT_TYPE_ES6
elsif major_version == 7
DEFAULT_EVENT_TYPE_ES7
else
nil
end
end
type.to_s
end
##
# WARNING: This method is overridden in a subclass in Logstash Core 7.7-7.8's monitoring,
# where a `client` argument is both required and ignored. In later versions of
# Logstash Core it is optional and ignored, but to make it optional here would
# allow us to accidentally break compatibility with Logstashes where it was required.
# @param noop_required_client [nil]: required `nil` for legacy reasons.
# @return [Boolean]
def use_event_type?(noop_required_client)
# always set type for ES 6
# for ES 7 only set it if the user defined it
(maximum_seen_major_version < 7) || (maximum_seen_major_version == 7 && @document_type)
end
def install_template
TemplateManager.install_template(self)
rescue => e
@logger.error("Failed to install template", message: e.message, exception: e.class, backtrace: e.backtrace)
end
def setup_ecs_compatibility_related_defaults
case ecs_compatibility
when :disabled
@default_index = "logstash-%{+yyyy.MM.dd}"
@default_ilm_rollover_alias = "logstash"
@default_template_name = 'logstash'
when :v1, :v8
@default_index = "ecs-logstash-%{+yyyy.MM.dd}"
@default_ilm_rollover_alias = "ecs-logstash"
@default_template_name = 'ecs-logstash'
else
fail("unsupported ECS Compatibility `#{ecs_compatibility}`")
end
@index ||= default_index
@ilm_rollover_alias ||= default_ilm_rollover_alias
@template_name ||= default_template_name
end
def setup_template_manager_defaults(data_stream_enabled)
if original_params["manage_template"].nil? && data_stream_enabled
logger.debug("Disabling template management since data streams are enabled")
@manage_template = false
end
end
# To be overridden by the -java version
VALID_HTTP_ACTIONS = ["index", "delete", "create", "update"]
def valid_actions
VALID_HTTP_ACTIONS
end
def check_action_validity
return if @action.nil? # not set
raise LogStash::ConfigurationError, "No action specified!" if @action.empty?
# If we're using string interpolation, we're good!
return if @action =~ /%{.+}/
return if valid_actions.include?(@action)
raise LogStash::ConfigurationError, "Action '#{@action}' is invalid! Pick one of #{valid_actions} or use a sprintf style statement"
end
def check_dlq_custom_codes
intersection = dlq_custom_codes & DOC_DLQ_CODES
raise LogStash::ConfigurationError, "#{intersection} are already defined as standard DLQ error codes" unless intersection.empty?
intersection = dlq_custom_codes & DOC_SUCCESS_CODES
raise LogStash::ConfigurationError, "#{intersection} are success codes which cannot be redefined in dlq_custom_codes" unless intersection.empty?
intersection = dlq_custom_codes & [DOC_CONFLICT_CODE]
raise LogStash::ConfigurationError, "#{intersection} are error codes already defined as conflict which cannot be redefined in dlq_custom_codes" unless intersection.empty?
end
end