forked from logstash-plugins/logstash-output-elasticsearch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathretry_spec.rb
182 lines (149 loc) · 6.7 KB
/
retry_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
require "logstash/outputs/elasticsearch"
require_relative "../../../spec/es_spec_helper"
describe "failures in bulk class expected behavior", :integration => true do
let(:template) { '{"template" : "not important, will be updated by :index"}' }
let(:event1) { LogStash::Event.new("somevalue" => 100, "@timestamp" => "2014-11-17T20:37:17.223Z", "@metadata" => {"retry_count" => 0}) }
let(:action1) do
if ESHelper.es_version_satisfies?("< 7")
ESHelper.action_for_version(["index", {:_id=>nil, routing_field_name =>nil, :_index=>"logstash-2014.11.17", :_type=> doc_type }, event1.to_hash])
else
ESHelper.action_for_version(["index", {:_id=>nil, routing_field_name =>nil, :_index=>"logstash-2014.11.17" }, event1.to_hash])
end
end
let(:event2) { LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0] }, "@timestamp" => "2014-11-17T20:37:17.223Z", "@metadata" => {"retry_count" => 0}) }
let(:action2) do
if ESHelper.es_version_satisfies?("< 7")
ESHelper.action_for_version(["index", {:_id=>nil, routing_field_name =>nil, :_index=>"logstash-2014.11.17", :_type=> doc_type }, event2.to_hash])
else
ESHelper.action_for_version(["index", {:_id=>nil, routing_field_name =>nil, :_index=>"logstash-2014.11.17" }, event2.to_hash])
end
end
let(:invalid_event) { LogStash::Event.new("geoip" => { "location" => "notlatlon" }, "@timestamp" => "2014-11-17T20:37:17.223Z") }
def mock_actions_with_response(*resp)
raise ArgumentError, "Cannot mock actions until subject is registered and has a client!" unless subject.client
expanded_responses = resp.map do |resp|
items = resp["statuses"] && resp["statuses"].map do |status|
{"create" => {"status" => status, "error" => "Error for #{status}"}}
end
{
"errors" => resp["errors"],
"items" => items
}
end
allow(subject.client).to receive(:bulk).and_return(*expanded_responses)
end
subject! do
settings = {
"manage_template" => true,
"index" => "logstash-2014.11.17",
"template_overwrite" => true,
"hosts" => get_host_port(),
"retry_max_interval" => 64,
"retry_initial_interval" => 2,
"ecs_compatibility" => "disabled", # specs are tightly tied to non-ECS defaults
}
next LogStash::Outputs::ElasticSearch.new(settings)
end
before :each do
# Delete all templates first.
require "elasticsearch"
allow(Stud).to receive(:stoppable_sleep)
# Clean ES of data before we start.
@es = get_client
@es.indices.delete_template(:name => "*")
@es.indices.delete(:index => "*")
@es.indices.refresh
end
after :each do
subject.close
end
it "should retry exactly once if all bulk actions are successful" do
expect(subject).to receive(:submit).with([action1, action2]).once.and_call_original
subject.register
mock_actions_with_response({"errors" => false})
subject.multi_receive([event1, event2])
end
it "retry exceptions within the submit body" do
call_count = 0
subject.register
expect(subject.client).to receive(:bulk).with(anything).exactly(3).times do
if (call_count += 1) <= 2
raise "error first two times"
else
{"errors" => false}
end
end
subject.multi_receive([event1])
end
it "should retry actions with response status of 503" do expect(subject).to receive(:submit).with([action1, action1, action1, action2]).ordered.once.and_call_original
expect(subject).to receive(:submit).with([action1, action2]).ordered.once.and_call_original
expect(subject).to receive(:submit).with([action2]).ordered.once.and_call_original
subject.register
mock_actions_with_response({"errors" => true, "statuses" => [200, 200, 503, 503]},
{"errors" => true, "statuses" => [200, 503]},
{"errors" => false})
subject.multi_receive([event1, event1, event1, event2])
end
retryable_codes = [429, 502, 503]
retryable_codes.each do |code|
it "should retry actions with response status of #{code}" do
subject.register
mock_actions_with_response({"errors" => true, "statuses" => [code]},
{"errors" => false})
expect(subject).to receive(:submit).with([action1]).twice.and_call_original
subject.multi_receive([event1])
end
end
it "should retry an event infinitely until a non retryable status occurs" do
expect(subject).to receive(:submit).with([action1]).exactly(6).times.and_call_original
subject.register
mock_actions_with_response({"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [400]})
subject.multi_receive([event1])
end
it "should sleep for an exponentially increasing amount of time on each retry, capped by the max" do
[2, 4, 8, 16, 32, 64, 64].each_with_index do |interval,i|
expect(Stud).to receive(:stoppable_sleep).with(interval).ordered
end
subject.register
mock_actions_with_response({"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [429]},
{"errors" => true, "statuses" => [400]})
subject.multi_receive([event1])
end
it "non-retryable errors like mapping errors (400) should be dropped and not be retried (unfortunately)" do
subject.register
expect(subject).to receive(:submit).once.and_call_original
subject.multi_receive([invalid_event])
subject.close
@es.indices.refresh
r = @es.search(index: 'logstash-*')
expect(r).to have_hits(0)
end
it "successful requests should not be appended to retry queue" do
expect(subject).to receive(:submit).once.and_call_original
subject.register
subject.multi_receive([event1])
subject.close
@es.indices.refresh
r = @es.search(index: 'logstash-*')
expect(r).to have_hits(1)
end
it "should only index proper events" do
subject.register
subject.multi_receive([invalid_event, event1])
subject.close
@es.indices.refresh
r = @es.search(index: 'logstash-*')
expect(r).to have_hits(1)
end
end