Skip to content

Commit 6f8d7f7

Browse files
kareskarenzone
andauthored
Feat: adjust fields for ECS compatibility (#28)
Co-authored-by: Karen Metts <[email protected]>
1 parent d60cb06 commit 6f8d7f7

File tree

6 files changed

+155
-53
lines changed

6 files changed

+155
-53
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.1.0
2+
- Feat: adjust fields for ECS compatibility [#28](https://github.com/logstash-plugins/logstash-input-unix/pull/28)
3+
14
## 3.0.7
25
- Docs: Set the default_codec doc attribute.
36

docs/index.asciidoc

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,23 @@ Like `stdin` and `file` inputs, each event is assumed to be one line of text.
2828
Can either accept connections from clients or connect to a server,
2929
depending on `mode`.
3030

31+
[id="plugins-{type}s-{plugin}-ecs"]
32+
==== Compatibility with the Elastic Common Schema (ECS)
33+
34+
This plugin adds extra fields about the event's source.
35+
Configure the <<plugins-{type}s-{plugin}-ecs_compatibility>> option if you want
36+
to ensure that these fields are compatible with {ecs-ref}[ECS].
37+
38+
These fields are added after the event has been decoded by the appropriate codec,
39+
and will not overwrite existing values.
40+
41+
|========
42+
| ECS Disabled | ECS v1 , v8 | Description
43+
44+
| `host` | `[host][name]` | The name of the {ls} host that processed the event
45+
| `path` | `[file][path]` | The socket path configured in the plugin
46+
|========
47+
3148
[id="plugins-{type}s-{plugin}-options"]
3249
==== Unix Input Configuration Options
3350

@@ -37,6 +54,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
3754
|=======================================================================
3855
|Setting |Input type|Required
3956
| <<plugins-{type}s-{plugin}-data_timeout>> |<<number,number>>|No
57+
| <<plugins-{type}s-{plugin}-ecs_compatibility>> |<<string,string>>|No
4058
| <<plugins-{type}s-{plugin}-force_unlink>> |<<boolean,boolean>>|No
4159
| <<plugins-{type}s-{plugin}-mode>> |<<string,string>>, one of `["server", "client"]`|No
4260
| <<plugins-{type}s-{plugin}-path>> |<<string,string>>|Yes
@@ -59,6 +77,44 @@ more than this timeout period, we will assume it is dead and close it.
5977

6078
If you never want to timeout, use -1.
6179

80+
[id="plugins-{type}s-{plugin}-ecs_compatibility"]
81+
===== `ecs_compatibility`
82+
83+
* Value type is <<string,string>>
84+
* Supported values are:
85+
** `disabled`: uses backwards compatible field names, such as `[host]`
86+
** `v1`, `v8`: uses fields that are compatible with ECS, such as `[host][name]`
87+
88+
Controls this plugin's compatibility with the {ecs-ref}[Elastic Common Schema (ECS)].
89+
See <<plugins-{type}s-{plugin}-ecs>> for detailed information.
90+
91+
92+
**Sample output: ECS enabled**
93+
[source,ruby]
94+
-----
95+
{
96+
"@timestamp" => 2021-11-16T13:20:06.308Z,
97+
"file" => {
98+
"path" => "/tmp/sock41299"
99+
},
100+
"host" => {
101+
"name" => "deus-ex-machina"
102+
},
103+
"message" => "foo"
104+
}
105+
-----
106+
107+
**Sample output: ECS disabled**
108+
[source,ruby]
109+
-----
110+
{
111+
"@timestamp" => 2021-11-16T13:20:06.308Z,
112+
"path" => "/tmp/sock41299",
113+
"host" => "deus-ex-machina",
114+
"message" => "foo"
115+
}
116+
-----
117+
62118
[id="plugins-{type}s-{plugin}-force_unlink"]
63119
===== `force_unlink`
64120

lib/logstash/inputs/unix.rb

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,20 @@
33
require "logstash/namespace"
44
require "logstash/util/socket_peer"
55

6+
require 'logstash/plugin_mixins/ecs_compatibility_support'
7+
68
# Read events over a UNIX socket.
79
#
810
# Like `stdin` and `file` inputs, each event is assumed to be one line of text.
911
#
1012
# Can either accept connections from clients or connect to a server,
1113
# depending on `mode`.
1214
class LogStash::Inputs::Unix < LogStash::Inputs::Base
15+
16+
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
17+
1318
class Interrupted < StandardError; end
19+
1420
config_name "unix"
1521

1622
default :codec, "line"
@@ -38,8 +44,11 @@ class Interrupted < StandardError; end
3844
# This setting is only used if `mode` is `client`.
3945
config :socket_not_present_retry_interval_seconds, :validate => :number, :required => true, :default => 5
4046

41-
def initialize(*args)
42-
super(*args)
47+
def initialize(*params)
48+
super
49+
50+
@host_name_field = ecs_select[disabled: 'host', v1: '[host][name]']
51+
@file_path_field = ecs_select[disabled: 'path', v1: '[file][path]']
4352
end # def initialize
4453

4554
public
@@ -48,7 +57,7 @@ def register
4857
require "timeout"
4958

5059
if server?
51-
@logger.info("Starting unix input listener", :address => "#{@path}", :force_unlink => "#{@force_unlink}")
60+
@logger.info("Starting unix input listener", :address => @path, :force_unlink => @force_unlink)
5261
begin
5362
@server_socket = UNIXServer.new(@path)
5463
rescue Errno::EADDRINUSE, IOError
@@ -58,18 +67,16 @@ def register
5867
@server_socket = UNIXServer.new(@path)
5968
return
6069
rescue Errno::EADDRINUSE, IOError
61-
@logger.error("!!!Could not start UNIX server: Address in use",
62-
:path => @path)
70+
@logger.error("Could not start UNIX server: address in use", :path => @path)
6371
raise
6472
end
6573
end
66-
@logger.error("Could not start UNIX server: Address in use",
67-
:path => @path)
74+
@logger.error("Could not start UNIX server: address in use", :path => @path)
6875
raise
6976
end
7077
else # client
71-
if @socket_not_present_retry_interval_seconds < 0
72-
@logger.warn("Value #{@socket_not_present_retry_interval_seconds} for socket_not_present_retry_interval_seconds is not valid, using default value of 5 instead")
78+
if socket_not_present_retry_interval_seconds < 0
79+
@logger.warn("Value #{socket_not_present_retry_interval_seconds} for socket_not_present_retry_interval_seconds is not valid, using default value of 5 instead")
7380
@socket_not_present_retry_interval_seconds = 5
7481
end
7582
end
@@ -93,16 +100,20 @@ def handle_socket(socket, output_queue)
93100
end
94101
@codec.decode(buf) do |event|
95102
decorate(event)
96-
event.set("host", hostname) unless event.include?("host")
97-
event.set("path", @path) unless event.include?("path")
103+
event.set(@host_name_field, hostname) unless event.include?(@host_name_field)
104+
event.set(@file_path_field, @path) unless event.include?(@file_path_field)
98105
output_queue << event
99106
end
100107
end
101-
rescue => e
102-
@logger.debug("Closing connection", :path => @path, :exception => e, :backtrace => e.backtrace)
103108
rescue Timeout::Error
104-
@logger.debug("Closing connection after read timeout", :path => @path)
105-
end # begin
109+
@logger.info("Closing connection after read timeout", :path => @path)
110+
rescue => e
111+
if @logger.debug?
112+
@logger.debug("Closing connection", :path => @path, :exception => e, :backtrace => e.backtrace)
113+
else
114+
@logger.info("Closing connection", :path => @path, :exception => e)
115+
end
116+
end
106117

107118
ensure
108119
begin
@@ -124,7 +135,7 @@ def run(output_queue)
124135
while !stop?
125136
# Start a new thread for each connection.
126137
@client_threads << Thread.start(@server_socket.accept) do |s|
127-
@logger.debug("Accepted connection", :server => "#{@path}")
138+
@logger.debug("Accepted connection", :server => @path)
128139
handle_socket(s, output_queue)
129140
end
130141
end
@@ -136,8 +147,8 @@ def run(output_queue)
136147
@logger.debug("Opened connection", :client => @path)
137148
handle_socket(@client_socket, output_queue)
138149
else
139-
@logger.warn("Socket not present, wait for #{@subscription_retry_interval_seconds} seconds for socket to appear", :client => @path)
140-
sleep @socket_not_present_retry_interval_seconds
150+
@logger.warn("Socket not present, wait for #{socket_not_present_retry_interval_seconds} seconds for socket to appear", :client => @path)
151+
sleep socket_not_present_retry_interval_seconds
141152
end
142153
end
143154
end
@@ -158,6 +169,6 @@ def stop
158169
rescue IOError
159170
# if socket with @mode == client was closed by the client, an other call to @client_socket.close
160171
# will raise an IOError. We catch IOError here and do nothing, just let logstash terminate
161-
@logger.warn("Cloud not close socket while Logstash is shutting down. Socket already closed by the other party?", :path => @path)
172+
@logger.warn("Could not close socket while Logstash is shutting down. Socket already closed by the other party?", :path => @path)
162173
end # def stop
163174
end # class LogStash::Inputs::Unix

logstash-input-unix.gemspec

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-unix'
4-
s.version = '3.0.7'
4+
s.version = '3.1.0'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Reads events over a UNIX socket"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -21,8 +21,9 @@ Gem::Specification.new do |s|
2121

2222
# Gem dependencies
2323
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
24-
24+
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.3'
2525
s.add_runtime_dependency 'logstash-codec-line'
26+
2627
s.add_development_dependency 'logstash-devutils'
2728
end
2829

spec/inputs/unix_spec.rb

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,95 @@
11
# encoding: utf-8
22
require_relative "../spec_helper"
33
require "logstash/devutils/rspec/shared_examples"
4+
require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'
45
require "stud/temporary"
56
require "tempfile"
67

78
describe LogStash::Inputs::Unix do
89

9-
let(:tempfile) { Tempfile.new("/tmp/foo") }
10+
let(:config) { { 'path' => tempfile.path, 'socket_not_present_retry_interval_seconds' => 1, 'force_unlink' => true } }
11+
let(:tempfile) { Tempfile.new("unix-input-test") }
12+
13+
subject(:input) { described_class.new(config) }
1014

1115
it "should register without errors" do
12-
plugin = LogStash::Plugin.lookup("input", "unix").new({ "path" => tempfile.path, "force_unlink" => true })
13-
expect { plugin.register }.to_not raise_error
16+
expect { subject.register }.to_not raise_error
1417
end
1518

1619
describe "when mode is client" do
1720

18-
let(:mode) { "client" }
21+
let(:config) { super().merge("mode" => 'client', "socket_not_present_retry_interval_seconds" => -1) }
1922

2023
context "if socket_not_present_retry_interval_seconds is out of bounds" do
2124
it "should fallback to default value" do
22-
plugin = LogStash::Plugin.lookup("input", "unix").new({ "path" => tempfile.path, "force_unlink" => true, "mode" => mode, "socket_not_present_retry_interval_seconds" => -1 })
23-
plugin.register
24-
expect(plugin.instance_variable_get(:@socket_not_present_retry_interval_seconds)).to be 5
25+
subject.register
26+
expect( subject.socket_not_present_retry_interval_seconds ).to eql 5
2527
end
2628
end
2729
end
2830

29-
describe "when interrupting the plugin" do
31+
context "#server" do
32+
it_behaves_like "an interruptible input plugin" do
33+
let(:config) { super().merge "mode" => 'server' }
34+
end
35+
end
3036

31-
context "#server" do
32-
it_behaves_like "an interruptible input plugin" do
33-
let(:config) { { "path" => tempfile.path, "force_unlink" => true } }
34-
end
37+
context "#client", :ecs_compatibility_support do
38+
let(:temp_path) { "/tmp/sock#{rand(65532)}" }
39+
let(:config) { super().merge "path" => temp_path, "mode" => "client" }
40+
let(:unix_socket) { UnixSocketHelper.new('foo').new_socket(temp_path) }
41+
let(:run_forever) { true }
42+
43+
before(:each) do
44+
unix_socket.loop(run_forever)
3545
end
3646

37-
context "#client" do
38-
let(:tempfile) { "/tmp/sock#{rand(65532)}" }
39-
let(:config) { { "path" => tempfile, "mode" => "client" } }
40-
let(:unix_socket) { UnixSocketHelper.new.new_socket(tempfile) }
41-
let(:run_forever) { true }
47+
after(:each) do
48+
unix_socket.close
49+
end
4250

43-
before(:each) do
44-
unix_socket.loop(run_forever)
51+
context "when the unix socket has data to be read" do
52+
it_behaves_like "an interruptible input plugin" do
53+
let(:run_forever) { true }
4554
end
55+
end
4656

47-
after(:each) do
48-
unix_socket.close
57+
context "when the unix socket has no data to be read" do
58+
it_behaves_like "an interruptible input plugin" do
59+
let(:run_forever) { false }
4960
end
61+
end
62+
63+
ecs_compatibility_matrix(:disabled, :v1, :v8) do |ecs_select|
64+
65+
let(:config) { super().merge 'ecs_compatibility' => ecs_compatibility }
5066

51-
context "when the unix socket has data to be read" do
52-
it_behaves_like "an interruptible input plugin" do
53-
let(:run_forever) { true }
67+
let(:queue) { java.util.Vector.new }
68+
69+
it 'generates events with host, path and message set' do
70+
subject.register
71+
Thread.new(subject, queue) { |subject, queue| subject.run(queue) }
72+
try(10) do
73+
expect( queue.size ).to_not eql 0
5474
end
55-
end
75+
subject.do_stop # stop the plugin
5676

57-
context "when the unix socket has no data to be read" do
58-
it_behaves_like "an interruptible input plugin" do
59-
let(:run_forever) { false }
77+
event = queue.first
78+
79+
if ecs_select.active_mode == :disabled
80+
expect( event.get('host') ).to be_a String
81+
expect( event.get('path') ).to eql temp_path
82+
else
83+
expect( event.get('[host][name]') ).to be_a String
84+
expect( event.get('[file][path]') ).to eql temp_path
85+
expect( event.include?('path') ).to be false
6086
end
87+
88+
expect( event.get('message') ).to eql 'foo'
6189
end
90+
6291
end
6392

6493
end
94+
6595
end

spec/spec_helper.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ class UnixSocketHelper
66

77
attr_reader :path
88

9-
def initialize
9+
def initialize(line = 'hi!')
1010
@socket = nil
11+
@line = line
1112
end
1213

1314
def new_socket(path)
@@ -21,9 +22,9 @@ def loop(forever=false)
2122
@thread = Thread.new do
2223
begin
2324
s = @socket.accept
24-
s.puts "hi" while forever
25-
rescue Errno::EPIPE, Errno::ECONNRESET
26-
# ...
25+
s.puts @line while forever
26+
rescue Errno::EPIPE, Errno::ECONNRESET => e
27+
warn e.inspect if $VERBOSE
2728
end
2829
end
2930
self

0 commit comments

Comments
 (0)