Skip to content

Commit e11b6d3

Browse files
committed
ECS Compatibility (#12305)
Implements a plugin `ecs_compatibility` option, whose default value is powered by the pipeline-level setting `pipeline.ecs_compatibility`, in line with the proposal in #11623: In order to increase the confidence a user has when upgrading Logstash, this implementation uses the deprecation logger to warn when `ecs_compatibility` is used without an explicit directive. For now, as we continue to add ECS Compatibility Modes, an opting into a specific ECS Compatibility mode at a pipeline level is considered a BETA feature. All plugins using the [ECS Compatibility Support][] adapter will use the setting correctly, but pipelines configured in this way do not guarantee consistent behaviour across minor versions of Logstash or the plugins it bundles (e.g., upgraded plugins that have newly-implemented an ECS Compatibility mode will use the pipeline-level setting as a default, causing them to potentially behave differently after the upgrade). This change-set also includes a significant amount of work within the `PluginFactory`, which allows us to ensure that pipeline-level settings are available to a Logstash plugin _before_ its `initialize` is executed, including the maintaining of context for codecs that are routinely cloned. * JEE: instantiate codecs only once * PluginFactory: use passed FilterDelegator class * PluginFactory: require engine name in init * NOOP: remove useless secondary plugin factory interface * PluginFactory: simplify, compute java args only when necessary * PluginFactory: accept explicit id when vertex unavailable * PluginFactory: make source optional, args required * PluginFactory: threadsafe refactor of id duplicate tracking * PluginFactory: make id extraction/geration more abstract/understandable * PluginFactory: extract or generate ID when source not available * PluginFactory: inject ExecutionContext before initializing plugins * Codec: propagate execution_context and metric to clones * Plugin: intercept string-specified codecs and propagate execution_context * Plugin: implement `ecs_compatibility` for all plugins * Plugin: deprecate use of `Config::Mixin::DSL::validate_value(String, :codec)`
1 parent ccbc569 commit e11b6d3

36 files changed

+754
-372
lines changed

docker/data/logstash/env2yaml/env2yaml.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func normalizeSetting(setting string) (string, error) {
5757
"pipeline.batch.delay",
5858
"pipeline.unsafe_shutdown",
5959
"pipeline.java_execution",
60+
"pipeline.ecs_compatibility"
6061
"pipeline.plugin_classloaders",
6162
"path.config",
6263
"config.string",

logstash-core/lib/logstash/agent.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
7575
logger.warn("deprecated setting `config.field_reference.parser` set; field reference parsing is strict by default")
7676
end
7777

78+
if @settings.set?('pipeline.ecs_compatibility')
79+
ecs_compatibility_value = settings.get('pipeline.ecs_compatibility')
80+
if ecs_compatibility_value != 'disabled'
81+
logger.warn("Setting `pipeline.ecs_compatibility` given as `#{ecs_compatibility_value}`; " +
82+
"values other than `disabled` are currently considered BETA and may have unintended consequences when upgrading minor versions of Logstash.")
83+
end
84+
end
85+
7886
# This is for backward compatibility in the tests
7987
if source_loader.nil?
8088
@source_loader = LogStash::Config::SourceLoader.new

logstash-core/lib/logstash/codecs/base.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ def flush(&block)
9292

9393
public
9494
def clone
95-
return self.class.new(params)
95+
LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, self.class, params).tap do |klone|
96+
klone.metric = @metric if klone.instance_variable_get(:@metric).nil?
97+
end
9698
end
9799
end; end # class LogStash::Codecs::Base

logstash-core/lib/logstash/config/config_ast.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,12 +249,12 @@ def compile_initializer
249249
# If any parent is a Plugin, this must be a codec.
250250

251251
if attributes.elements.nil?
252-
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
252+
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, {}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
253253
else
254254
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
255255

256256
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
257-
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})" << (plugin_type == "codec" ? "" : "\n")
257+
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
258258
end
259259
end
260260

@@ -271,7 +271,7 @@ def compile
271271
when "codec"
272272
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
273273
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
274-
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})"
274+
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))"
275275
end
276276
end
277277

logstash-core/lib/logstash/config/mixin.rb

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
module LogStash::Config::Mixin
5050

5151
include LogStash::Util::SubstitutionVariables
52+
include LogStash::Util::Loggable
5253

5354
attr_accessor :config
5455
attr_accessor :original_params
@@ -99,6 +100,17 @@ def config_init(params)
99100
params[name.to_s] = deep_replace(value)
100101
end
101102

103+
# Intercept codecs that have not been instantiated
104+
params.each do |name, value|
105+
validator = self.class.validator_find(name)
106+
next unless validator && validator[:validate] == :codec && value.kind_of?(String)
107+
108+
codec_klass = LogStash::Plugin.lookup("codec", value)
109+
codec_instance = LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, codec_klass)
110+
111+
params[name.to_s] = LogStash::Codecs::Delegator.new(codec_instance)
112+
end
113+
102114
if !self.class.validate(params)
103115
raise LogStash::ConfigurationError,
104116
I18n.t("logstash.runner.configuration.invalid_plugin_settings")
@@ -190,7 +202,7 @@ def config(name, opts={})
190202
name = name.to_s if name.is_a?(Symbol)
191203
@config[name] = opts # ok if this is empty
192204

193-
if name.is_a?(String)
205+
if name.is_a?(String) && opts.fetch(:attr_accessor, true)
194206
define_method(name) { instance_variable_get("@#{name}") }
195207
define_method("#{name}=") { |v| instance_variable_set("@#{name}", v) }
196208
end
@@ -429,6 +441,11 @@ def validate_value(value, validator)
429441
case validator
430442
when :codec
431443
if value.first.is_a?(String)
444+
# A plugin's codecs should be instantiated by `PluginFactory` or in `Config::Mixin#config_init(Hash)`,
445+
# which ensure the inner plugin has access to the outer's execution context and metric store.
446+
# This deprecation exists to warn plugins that call `Config::Mixin::validate_value` directly.
447+
self.deprecation_logger.deprecated("Codec instantiated by `Config::Mixin::DSL::validate_value(String, :codec)` which cannot propagate parent plugin's execution context or metrics. ",
448+
self.logger.debug? ? {:backtrace => caller} : {})
432449
value = LogStash::Codecs::Delegator.new LogStash::Plugin.lookup("codec", value.first).new
433450
return true, value
434451
else

logstash-core/lib/logstash/environment.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ module Environment
6262
Setting::Boolean.new("pipeline.plugin_classloaders", false),
6363
Setting::Boolean.new("pipeline.separate_logs", false),
6464
Setting::CoercibleString.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]),
65+
Setting::CoercibleString.new("pipeline.ecs_compatibility", "disabled", true, %w(disabled v1 v2)),
6566
Setting.new("path.plugins", Array, []),
6667
Setting::NullableString.new("interactive", nil, false),
6768
Setting::Boolean.new("config.debug", false),

logstash-core/lib/logstash/inputs/base.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,10 @@ def metric=(metric)
124124

125125
def execution_context=(context)
126126
super
127-
# There is no easy way to propage an instance variable into the codec, because the codec
128-
# are created at the class level
129-
# TODO(talevy): Codecs should have their own execution_context, for now they will inherit their
130-
# parent plugin's
127+
# Setting the execution context after initialization is deprecated and will be removed in
128+
# a future release of Logstash. While this code is no longer executed from Logstash core,
129+
# we continue to propagate a set execution context to an input's codec, and rely on super's
130+
# deprecation warning.
131131
@codec.execution_context = context
132132
context
133133
end

logstash-core/lib/logstash/outputs/base.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,10 @@ def metric=(metric)
127127

128128
def execution_context=(context)
129129
super
130-
# There is no easy way to propage an instance variable into the codec, because the codec
131-
# are created at the class level
132-
# TODO(talevy): Codecs should have their own execution_context, for now they will inherit their
133-
# parent plugin's
130+
# Setting the execution context after initialization is deprecated and will be removed in
131+
# a future release of Logstash. While this code is no longer executed from Logstash core,
132+
# we continue to propagate a set execution context to an output's codec, and rely on super's
133+
# deprecation warning.
134134
@codec.execution_context = context
135135
context
136136
end

logstash-core/lib/logstash/pipeline.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ def non_reloadable_plugins
8989
private
9090

9191

92-
def plugin(plugin_type, name, source, *args)
93-
@plugin_factory.plugin(plugin_type, name, source, *args)
92+
def plugin(plugin_type, name, args, source)
93+
@plugin_factory.plugin(plugin_type, name, args, source)
9494
end
9595

9696
def default_logging_keys(other_keys = {})

logstash-core/lib/logstash/plugin.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717

1818
require "logstash/config/mixin"
19+
require "logstash/plugins/ecs_compatibility_support"
1920
require "concurrent"
2021
require "securerandom"
2122

@@ -24,11 +25,12 @@
2425
class LogStash::Plugin
2526
include LogStash::Util::Loggable
2627

27-
attr_accessor :params, :execution_context
28+
attr_accessor :params
2829

2930
NL = "\n"
3031

3132
include LogStash::Config::Mixin
33+
include LogStash::Plugins::ECSCompatibilitySupport
3234

3335
# Disable or enable metric logging for this specific plugin instance
3436
# by default we record all the metrics we can, but you can disable metrics collection
@@ -60,7 +62,7 @@ def eql?(other)
6062
self.class.name == other.class.name && @params == other.params
6163
end
6264

63-
def initialize(params=nil)
65+
def initialize(params={})
6466
@logger = self.logger
6567
@deprecation_logger = self.deprecation_logger
6668
# need to access settings statically because plugins are initialized in config_ast with no context.
@@ -177,4 +179,14 @@ def self.lookup(type, name)
177179
def plugin_metadata
178180
LogStash::PluginMetadata.for_plugin(self.id)
179181
end
182+
183+
# Deprecated attr_writer for execution_context
184+
def execution_context=(new_context)
185+
@deprecation_logger.deprecated("LogStash::Plugin#execution_context=(new_ctx) is deprecated. Use LogStash::Plugins::Contextualizer#initialize_plugin(new_ctx, klass, args) instead", :caller => caller.first)
186+
@execution_context = new_context
187+
end
188+
189+
def execution_context
190+
@execution_context || LogStash::ExecutionContext::Empty
191+
end
180192
end # class LogStash::Plugin
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
module LogStash
2+
module Plugins
3+
module ECSCompatibilitySupport
4+
def self.included(base)
5+
base.extend(ArgumentValidator)
6+
base.config(:ecs_compatibility, :validate => :ecs_compatibility_argument,
7+
:attr_accessor => false)
8+
end
9+
10+
MUTEX = Mutex.new
11+
private_constant :MUTEX
12+
13+
def ecs_compatibility
14+
@_ecs_compatibility || MUTEX.synchronize do
15+
@_ecs_compatibility ||= begin
16+
# use config_init-set value if present
17+
break @ecs_compatibility unless @ecs_compatibility.nil?
18+
19+
pipeline = execution_context.pipeline
20+
pipeline_settings = pipeline && pipeline.settings
21+
pipeline_settings ||= LogStash::SETTINGS
22+
23+
if !pipeline_settings.set?('pipeline.ecs_compatibility')
24+
deprecation_logger.deprecated("Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. " +
25+
"To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.")
26+
end
27+
28+
pipeline_settings.get_value('pipeline.ecs_compatibility').to_sym
29+
end
30+
end
31+
end
32+
33+
module ArgumentValidator
34+
V_PREFIXED_INTEGER_PATTERN = %r(\Av[1-9][0-9]?\Z).freeze
35+
private_constant :V_PREFIXED_INTEGER_PATTERN
36+
37+
def validate_value(value, validator)
38+
return super unless validator == :ecs_compatibility_argument
39+
40+
value = deep_replace(value)
41+
value = hash_or_array(value)
42+
43+
if value.size == 1
44+
return true, :disabled if value.first.to_s == 'disabled'
45+
return true, value.first.to_sym if value.first.to_s =~ V_PREFIXED_INTEGER_PATTERN
46+
end
47+
48+
return false, "Expected a v-prefixed integer major-version number (e.g., `v1`) or the literal `disabled`, got #{value.inspect}"
49+
end
50+
end
51+
end
52+
end
53+
end

logstash-core/lib/logstash/runner.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,11 @@ class LogStash::Runner < Clamp::StrictCommand
154154
:attribute_name => "pipeline.unsafe_shutdown",
155155
:default => LogStash::SETTINGS.get_default("pipeline.unsafe_shutdown")
156156

157+
option ["--pipeline.ecs_compatibility"], "STRING",
158+
I18n.t("logstash.runner.flag.ecs_compatibility"),
159+
:attribute_name => "pipeline.ecs_compatibility",
160+
:default => LogStash::SETTINGS.get_default('pipeline.ecs_compatibility')
161+
157162
# Data Path Setting
158163
option ["--path.data"] , "PATH",
159164
I18n.t("logstash.runner.flag.datapath"),

logstash-core/lib/logstash/settings.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class Settings
4848
"pipeline.system",
4949
"pipeline.workers",
5050
"pipeline.ordered",
51+
"pipeline.ecs_compatibility",
5152
"queue.checkpoint.acks",
5253
"queue.checkpoint.interval",
5354
"queue.checkpoint.writes",

logstash-core/locales/en.yml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,21 @@ en:
362362
if there are still inflight events in memory.
363363
By default, logstash will refuse to quit until all
364364
received events have been pushed to the outputs.
365+
ecs_compatibility: |+
366+
Sets the pipeline's default value for `ecs_compatibility`,
367+
a setting that is available to plugins that implement
368+
an ECS Compatibility mode for use with the Elastic Common
369+
Schema.
370+
Possible values are:
371+
- disabled (default)
372+
- v1
373+
- v2
374+
This option allows the early opt-in (or preemptive opt-out)
375+
of ECS Compatibility modes in plugins, which is scheduled to
376+
be on-by-default in a future major release of Logstash.
377+
378+
Values other than `disabled` are currently considered BETA,
379+
and may produce unintended consequences when upgrading Logstash.
365380
rubyshell: |+
366381
Drop to shell instead of running as normal.
367382
Valid shells are "irb" and "pry"

logstash-core/spec/logstash/config/mixin_spec.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,31 @@
4747
end
4848
end
4949

50+
context 'DSL::validate_value(String, :codec)' do
51+
subject(:plugin_class) { Class.new(LogStash::Filters::Base) { config_name "test_deprecated_two" } }
52+
let(:codec_class) { Class.new(LogStash::Codecs::Base) { config_name 'dummy' } }
53+
let(:deprecation_logger) { double("DeprecationLogger").as_null_object }
54+
55+
before(:each) do
56+
allow(plugin_class).to receive(:deprecation_logger).and_return(deprecation_logger)
57+
allow(LogStash::Plugin).to receive(:lookup).with("codec", codec_class.config_name).and_return(codec_class)
58+
end
59+
60+
it 'instantiates the codec' do
61+
success, codec = plugin_class.validate_value(codec_class.config_name, :codec)
62+
63+
expect(success).to be true
64+
expect(codec.class).to eq(codec_class)
65+
end
66+
67+
it 'logs a deprecation' do
68+
plugin_class.validate_value(codec_class.config_name, :codec)
69+
expect(deprecation_logger).to have_received(:deprecated) do |message|
70+
expect(message).to include("validate_value(String, :codec)")
71+
end
72+
end
73+
end
74+
5075
context "when validating :bytes successfully" do
5176
subject do
5277
local_num_bytes = num_bytes # needs to be locally scoped :(
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Licensed to Elasticsearch B.V. under one or more contributor
2+
# license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright
4+
# ownership. Elasticsearch B.V. licenses this file to you under
5+
# the Apache License, Version 2.0 (the "License"); you may
6+
# not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
require "spec_helper"
19+
20+
describe LogStash::Plugins::ExecutionContextFactory do
21+
let(:pipeline) { double('Pipeline') }
22+
let(:agent) { double('Agent') }
23+
let(:inner_dlq_writer) { nil }
24+
25+
subject(:factory) { described_class.new(agent, pipeline, inner_dlq_writer) }
26+
27+
context '#create' do
28+
let(:plugin_id) { SecureRandom.uuid }
29+
let(:plugin_type) { 'input' }
30+
31+
context 'the resulting instance' do
32+
subject(:instance) { factory.create(plugin_id, plugin_type) }
33+
34+
it 'retains the pipeline from the factory' do
35+
expect(instance.pipeline).to be(pipeline)
36+
end
37+
38+
it 'retains the agent from the factory' do
39+
expect(instance.agent).to be(agent)
40+
end
41+
42+
it 'has a dlq_writer' do
43+
expect(instance.dlq_writer).to_not be_nil
44+
end
45+
46+
context 'dlq_writer' do
47+
subject(:instance_dlq_writer) { instance.dlq_writer }
48+
49+
it 'retains the plugin id' do
50+
expect(instance_dlq_writer.plugin_id).to eq(plugin_id)
51+
end
52+
53+
it 'retains the plugin type' do
54+
expect(instance_dlq_writer.plugin_type).to eq(plugin_type)
55+
end
56+
end
57+
end
58+
end
59+
end

0 commit comments

Comments
 (0)