Skip to content

Tolerate the elasticsearch-ruby v8 client in integration tests. #1208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 34 additions & 22 deletions spec/es_spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
require_relative './spec_helper'

require 'elasticsearch'
require_relative "support/elasticsearch/api/actions/delete_ilm_policy"
require_relative "support/elasticsearch/api/actions/get_alias"
require_relative "support/elasticsearch/api/actions/put_alias"
require_relative "support/elasticsearch/api/actions/get_ilm_policy"
require_relative "support/elasticsearch/api/actions/put_ilm_policy"

require 'json'
require 'cabin'

# remove this condition and support package once plugin starts consuming elasticsearch-ruby v8 client
# in elasticsearch-ruby v7, ILM APIs were in a separate xpack gem, now directly available
unless elastic_ruby_v8_client_available?
require_relative "support/elasticsearch/api/actions/delete_ilm_policy"
require_relative "support/elasticsearch/api/actions/get_ilm_policy"
require_relative "support/elasticsearch/api/actions/put_ilm_policy"
end

module ESHelper
def get_host_port
if ENV["INTEGRATION"] == "true"
Expand All @@ -20,8 +23,12 @@ def get_host_port
end

def get_client
Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client|
allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking
if elastic_ruby_v8_client_available?
Elasticsearch::Client.new(:hosts => [get_host_port])
else
Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client|
allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking
end
end
end

Expand Down Expand Up @@ -126,31 +133,36 @@ def get_cluster_settings(client)
end

def get_policy(client, policy_name)
client.get_ilm_policy(name: policy_name)
if elastic_ruby_v8_client_available?
client.index_lifecycle_management.get_lifecycle(policy: policy_name)
else
client.get_ilm_policy(name: policy_name)
end
end

def put_policy(client, policy_name, policy)
client.put_ilm_policy({:name => policy_name, :body=> policy})
end

def put_alias(client, the_alias, index)
body = {
"aliases" => {
index => {
"is_write_index"=> true
}
}
}
client.put_alias({name: the_alias, body: body})
if elastic_ruby_v8_client_available?
client.index_lifecycle_management.put_lifecycle({:policy => policy_name, :body=> policy})
else
client.put_ilm_policy({:name => policy_name, :body=> policy})
end
end

def clean_ilm(client)
client.get_ilm_policy.each_key { |key| client.delete_ilm_policy(name: key) if key =~ /logstash-policy/ }
if elastic_ruby_v8_client_available?
client.index_lifecycle_management.get_lifecycle.each_key { |key| client.index_lifecycle_management.delete_lifecycle(policy: key) if key =~ /logstash-policy/ }
else
client.get_ilm_policy.each_key { |key| client.delete_ilm_policy(name: key) if key =~ /logstash-policy/ }
end
end

def supports_ilm?(client)
begin
client.get_ilm_policy
if elastic_ruby_v8_client_available?
client.index_lifecycle_management.get_lifecycle
else
client.get_ilm_policy
end
true
rescue
false
Expand Down
8 changes: 4 additions & 4 deletions spec/integration/outputs/delete_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,25 @@
it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-delete', :id => id, :refresh => true)
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 98)])
r2 = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-delete', :id => id, :refresh => true)
expect(r2['_version']).to eq(99)
expect(r2['_source']['message']).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-delete', :id => id, :refresh => true)
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 100)])
expect { es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect { es.get(:index => 'logstash-delete', :id => id, :refresh => true) }.to raise_error(get_expected_error_class)
end
end
end
30 changes: 18 additions & 12 deletions spec/integration/outputs/ilm_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
it 'should not install the default policy' do
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
end

it 'should not write the ILM settings into the template' do
Expand Down Expand Up @@ -282,12 +282,12 @@
subject.register
sleep(1)
expect(@es.indices.exists_alias(name: "logstash")).to be_truthy
expect(@es.get_alias(name: "logstash")).to include("logstash-000001")
expect(@es.indices.get_alias(name: "logstash")).to include("logstash-000001")
end
end

it 'should install it if it is not present' do
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.not_to raise_error
Expand All @@ -298,7 +298,7 @@
subject.register
sleep(1)
expect(@es.indices.exists_alias(name: "logstash")).to be_truthy
expect(@es.get_alias(name: "logstash")).to include("logstash-#{todays_date}-000001")
expect(@es.indices.get_alias(name: "logstash")).to include("logstash-#{todays_date}-000001")
end

it 'should ingest into a single index' do
Expand Down Expand Up @@ -340,14 +340,14 @@
let (:policy) { small_max_doc_policy }

before do
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
put_policy(@es,ilm_policy_name, policy)
end

it 'should not install the default policy if it is not used' do
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
end
end

Expand All @@ -357,14 +357,14 @@
let (:policy) { max_age_policy("1d") }

before do
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
put_policy(@es,ilm_policy_name, policy)
end

it 'should not install the default policy if it is not used' do
subject.register
sleep(1)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
end
end

Expand All @@ -374,7 +374,7 @@
subject.register
sleep(1)
expect(@es.indices.exists_alias(name: expected_index)).to be_truthy
expect(@es.get_alias(name: expected_index)).to include("#{expected_index}-#{todays_date}-000001")
expect(@es.indices.get_alias(name: expected_index)).to include("#{expected_index}-#{todays_date}-000001")
end

it 'should write the ILM settings into the template' do
Expand Down Expand Up @@ -443,17 +443,18 @@
subject.register
sleep(1)
expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_truthy
expect(@es.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
expect(@es.indices.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
end

context 'when the custom rollover alias already exists' do
it 'should ignore the already exists error' do
expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_falsey
put_alias(@es, "#{ilm_rollover_alias}-#{todays_date}-000001", ilm_rollover_alias)
@es.indices.create(index: "#{ilm_rollover_alias}-#{todays_date}-000001")
@es.indices.put_alias(name: ilm_rollover_alias, index: "#{ilm_rollover_alias}-#{todays_date}-000001")
expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_truthy
subject.register
sleep(1)
expect(@es.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
expect(@es.indices.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
end

end
Expand Down Expand Up @@ -532,3 +533,8 @@
end

end

def get_expected_error_class
return Elastic::Transport::Transport::Errors::NotFound if elastic_ruby_v8_client_available?
Elasticsearch::Transport::Transport::Errors::NotFound
end
14 changes: 7 additions & 7 deletions spec/integration/outputs/index_version_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@

it "should default to ES version" do
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
r = es.get(:index => 'logstash-index', :id => '123', :refresh => true)
expect(r["_version"]).to eq(1)
expect(r["_source"]["message"]).to eq('foo')
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foobar")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
r2 = es.get(:index => 'logstash-index', :id => '123', :refresh => true)
expect(r2["_version"]).to eq(2)
expect(r2["_source"]["message"]).to eq('foobar')
end
Expand All @@ -63,33 +63,33 @@
it "should respect the external version" do
id = "ev1"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')
end

it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "98", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-index', :id => id, :refresh => true)
expect(r2["_version"]).to eq(99)
expect(r2["_source"]["message"]).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = es.get(:index => 'logstash-index', :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "100", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r2 = es.get(:index => 'logstash-index', :id => id, :refresh => true)
expect(r2["_version"]).to eq(100)
expect(r2["_source"]["message"]).to eq('foo')
end
Expand Down
21 changes: 11 additions & 10 deletions spec/integration/outputs/painless_update_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ def get_es_output( options={} )
# This can fail if there are no indexes, ignore failure.
@es.indices.delete(:index => "*") rescue nil
@es.index(
:index => 'logstash-update',
:type => doc_type,
:id => "123",
:body => { :message => 'Test', :counter => 1 }
)
{
:index => 'logstash-update',
:id => '123',
:body => { :message => 'Test', :counter => 1 },
:refresh => true
})
@es.indices.refresh
end

Expand All @@ -46,7 +47,7 @@ def get_es_output( options={} )
subject = get_es_output(plugin_parameters)
subject.register
subject.multi_receive([LogStash::Event.new("count" => 4 )])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "123", :refresh => true)
expect(r["_source"]["counter"]).to eq(5)
end
end
Expand All @@ -57,15 +58,15 @@ def get_es_output( options={} )
subject = get_es_output({ 'document_id' => "456", 'upsert' => '{"message": "upsert message"}' })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

it "should create new documents with event/doc as upsert" do
subject = get_es_output({ 'document_id' => "456", 'doc_as_upsert' => true })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('sample message here')
end

Expand All @@ -82,7 +83,7 @@ def get_es_output( options={} )
subject.register

subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

Expand All @@ -91,7 +92,7 @@ def get_es_output( options={} )
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 1)])
@es.indices.refresh
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
expect(r["_source"]["counter"]).to eq(1)
end
end
Expand Down
25 changes: 15 additions & 10 deletions spec/integration/outputs/unsupported_actions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,21 @@ def get_es_output( options={} )
@es.indices.delete(:index => "*") rescue nil
# index single doc for update purpose
@es.index(
:index => INDEX,
:type => doc_type,
:id => "2",
:body => { :message => 'Test to doc indexing', :counter => 1 }
{
:index => INDEX,
:id => '2',
:body => { :message => 'Test to doc indexing', :counter => 1 },
:refresh => true
}
)

@es.index(
:index => INDEX,
:type => doc_type,
:id => "3",
:body => { :message => 'Test to doc deletion', :counter => 2 }
{
:index => INDEX,
:id => '3',
:body => { :message => 'Test to doc deletion', :counter => 2 },
:refresh => true
}
)
@es.indices.refresh
end
Expand All @@ -63,12 +68,12 @@ def get_es_output( options={} )
rejected_events = events.select { |event| !index_or_update.call(event) }

indexed_events.each do |event|
response = @es.get(:index => INDEX, :type => doc_type, :id => event.get("doc_id"), :refresh => true)
response = @es.get(:index => INDEX, :id => event.get("doc_id"), :refresh => true)
expect(response['_source']['message']).to eq(event.get("message"))
end

rejected_events.each do |event|
expect {@es.get(:index => INDEX, :type => doc_type, :id => event.get("doc_id"), :refresh => true)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect {@es.get(:index => INDEX, :id => event.get("doc_id"), :refresh => true)}.to raise_error(get_expected_error_class)
end
end
end
Expand Down
Loading