Skip to content

Commit 49b11cb

Browse files
committed
check if ES is licensed before pushing out events to it
* don't mark connection as not alive. only log a warn
1 parent 0c81b8c commit 49b11cb

File tree

4 files changed

+86
-5
lines changed

4 files changed

+86
-5
lines changed

lib/logstash/outputs/elasticsearch.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,10 @@ def close
244244
@client.close if @client
245245
end
246246

247+
def self.oss?
248+
LogStash::OSS
249+
end
250+
247251
@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }
248252

249253
@@plugins.each do |plugin|

lib/logstash/outputs/elasticsearch/http_client/pool.rb

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def message
3131
attr_reader :logger, :adapter, :sniffing, :sniffer_delay, :resurrect_delay, :healthcheck_path, :sniffing_path, :bulk_path
3232

3333
ROOT_URI_PATH = '/'.freeze
34+
LICENSE_PATH = '/_license'.freeze
3435

3536
DEFAULT_OPTIONS = {
3637
:healthcheck_path => ROOT_URI_PATH,
@@ -66,13 +67,21 @@ def initialize(logger, adapter, initial_urls=[], options={})
6667
@url_info = {}
6768
@stopping = false
6869
end
70+
71+
def oss?
72+
LogStash::Outputs::ElasticSearch.oss?
73+
end
6974

7075
def start
71-
update_urls(@initial_urls)
76+
update_initial_urls
7277
start_resurrectionist
7378
start_sniffer if @sniffing
7479
end
7580

81+
def update_initial_urls
82+
update_urls(@initial_urls)
83+
end
84+
7685
def close
7786
@state_mutex.synchronize { @stopping = true }
7887

@@ -101,7 +110,7 @@ def in_use_connections
101110
end
102111

103112
def alive_urls_count
104-
@state_mutex.synchronize { @url_info.values.select {|v| !v[:state] == :alive }.count }
113+
@state_mutex.synchronize { @url_info.values.select {|v| v[:state] == :alive }.count }
105114
end
106115

107116
def url_info
@@ -236,13 +245,29 @@ def start_resurrectionist
236245
end
237246
end
238247

248+
def get_license(url)
249+
response = perform_request_to_url(url, :get, LICENSE_PATH)
250+
LogStash::Json.load(response.body)
251+
end
252+
253+
def valid_es_license?(url)
254+
license = get_license(url)
255+
license.fetch("license", {}).fetch("status", nil) == "active"
256+
rescue => e
257+
false
258+
end
259+
260+
def health_check_request(url)
261+
perform_request_to_url(url, :head, @healthcheck_path)
262+
end
263+
239264
def healthcheck!
240265
# Try to keep locking granularity low such that we don't affect IO...
241266
@state_mutex.synchronize { @url_info.select {|url,meta| meta[:state] != :alive } }.each do |url,meta|
242267
begin
243268
logger.debug("Running health check to see if an Elasticsearch connection is working",
244269
:healthcheck_url => url, :path => @healthcheck_path)
245-
response = perform_request_to_url(url, :head, @healthcheck_path)
270+
health_check_request(url)
246271
# If no exception was raised it must have succeeded!
247272
logger.warn("Restored connection to ES instance", :url => url.sanitized.to_s)
248273
# We reconnected to this node, check its ES version
@@ -254,10 +279,22 @@ def healthcheck!
254279
@logger.info("ES Output version determined", :es_version => major)
255280
set_new_major_version(major)
256281
elsif major > @maximum_seen_major_version
257-
@logger.warn("Detected a node with a higher major version than previously observed. This could be the result of an elasticsearch cluster upgrade.", :previous_major => @maximum_seen_major_version, :new_major => major, :node_url => url)
282+
@logger.warn("Detected a node with a higher major version than previously observed. This could be the result of an elasticsearch cluster upgrade.", :previous_major => @maximum_seen_major_version, :new_major => major, :node_url => url.sanitized.to_s)
258283
set_new_major_version(major)
259284
end
260-
meta[:state] = :alive
285+
if oss? || valid_es_license?(url)
286+
meta[:state] = :alive
287+
else
288+
# As this version is to be shipped with Logstash 7.x we won't mark the connection as unlicensed
289+
#
290+
# logger.error("Cannot connect to the Elasticsearch cluster configured in the Elasticsearch output. Logstash requires the default distribution of Elasticsearch. Please update to the default distribution of Elasticsearch for full access to all free features, or switch to the OSS distribution of Logstash.", :url => url.sanitized.to_s)
291+
# meta[:state] = :unlicensed
292+
#
293+
# Instead we'll log a deprecation warning and mark it as alive:
294+
#
295+
log_license_deprecation_warn(url)
296+
meta[:state] = :alive
297+
end
261298
end
262299
rescue HostUnreachableError, BadResponseCodeError => e
263300
logger.warn("Attempted to resurrect connection to dead ES instance, but got an error.", url: url.sanitized.to_s, error_type: e.class, error: e.message)
@@ -269,6 +306,10 @@ def stop_resurrectionist
269306
@resurrectionist.join if @resurrectionist
270307
end
271308

309+
def log_license_deprecation_warn(url)
310+
logger.warn("DEPRECATION WARNING: Connecting to an OSS distribution of Elasticsearch using the default distribution of Logstash will stop working in Logstash 8.0.0. Please upgrade to the default distribution of Elasticsearch, or use the OSS distribution of Logstash", :url => url.sanitized.to_s)
311+
end
312+
272313
def resurrectionist_alive?
273314
@resurrectionist ? @resurrectionist.alive? : nil
274315
end

spec/es_spec_helper.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99

1010
require 'json'
1111

12+
unless defined?(LogStash::OSS)
13+
LogStash::OSS = ENV['DISTRIBUTION'] != "default"
14+
end
15+
1216
module ESHelper
1317
def get_host_port
1418
if ENV["INTEGRATION"] == "true"

spec/unit/outputs/elasticsearch/http_client/pool_spec.rb

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@
88
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] }
99
let(:options) { {:resurrect_delay => 2, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests
1010
let(:es_node_versions) { [ "0.0.0" ] }
11+
let(:oss) { true }
12+
let(:valid_license) { true }
1113

1214
subject { described_class.new(logger, adapter, initial_urls, options) }
1315

1416
let(:manticore_double) { double("manticore a") }
1517
before(:each) do
1618

19+
allow(::LogStash::Outputs::ElasticSearch).to receive(:oss?).and_return(oss)
1720
response_double = double("manticore response").as_null_object
1821
# Allow healtchecks
1922
allow(manticore_double).to receive(:head).with(any_args).and_return(response_double)
@@ -23,6 +26,8 @@
2326
allow(::Manticore::Client).to receive(:new).and_return(manticore_double)
2427

2528
allow(subject).to receive(:get_es_version).with(any_args).and_return(*es_node_versions)
29+
allow(subject).to receive(:oss?).and_return(oss)
30+
allow(subject).to receive(:valid_es_license?).and_return(valid_license)
2631
end
2732

2833
after do
@@ -235,4 +240,31 @@
235240
end
236241
end
237242
end
243+
244+
describe "license checking" do
245+
before(:each) do
246+
allow(subject).to receive(:health_check_request)
247+
end
248+
context "when using default logstash distribution" do
249+
let(:oss) { false }
250+
context "if ES doesn't return a valid license" do
251+
let(:valid_license) { false }
252+
it "marks the url as active" do
253+
subject.update_initial_urls
254+
expect(subject.alive_urls_count).to eq(1)
255+
end
256+
it "logs a warning" do
257+
expect(subject).to receive(:log_license_deprecation_warn).once
258+
subject.update_initial_urls
259+
end
260+
end
261+
context "if ES returns a valid license" do
262+
let(:valid_license) { true }
263+
it "marks the url as active" do
264+
subject.update_initial_urls
265+
expect(subject.alive_urls_count).to eq(1)
266+
end
267+
end
268+
end
269+
end
238270
end

0 commit comments

Comments
 (0)