|
50 | 50 | subject.close
|
51 | 51 | end
|
52 | 52 |
|
| 53 | + context "check aborting of a batch" do |
| 54 | + context "on an unreachable ES instance" do |
| 55 | + let(:events) { [ ::LogStash::Event.new("foo" => "bar1"), ::LogStash::Event.new("foo" => "bar2") ] } |
| 56 | + |
| 57 | + let(:shutdown_value) { true } |
| 58 | + |
| 59 | + let(:logger) { double("logger") } |
| 60 | + |
| 61 | + before(:each) do |
| 62 | + allow(subject).to receive(:logger).and_return(logger) |
| 63 | + allow(logger).to receive(:info) |
| 64 | + |
| 65 | + allow(subject).to receive(:pipeline_shutdown_requested?) do |
| 66 | + shutdown_value |
| 67 | + end |
| 68 | + end |
| 69 | + |
| 70 | + it "the #multi_receive abort while waiting on unreachable and a shutdown is requested" do |
| 71 | + expect { subject.multi_receive(events) }.to raise_error(org.logstash.execution.AbortedBatchException) |
| 72 | + expect(logger).to have_received(:info).with(/Aborting the batch due to shutdown request while waiting for connections to become live/i) |
| 73 | + end |
| 74 | + end |
| 75 | + |
| 76 | + context "when a connected ES becomes unreachable with 429 errors" do |
| 77 | + let(:options) { |
| 78 | + { |
| 79 | + "index" => "my-index", |
| 80 | + "hosts" => ["localhost","localhost:9202"], |
| 81 | + "path" => "some-path", |
| 82 | + "manage_template" => false |
| 83 | + } |
| 84 | + } |
| 85 | + |
| 86 | + let(:manticore_urls) { subject.client.pool.urls } |
| 87 | + let(:manticore_url) { manticore_urls.first } |
| 88 | + |
| 89 | + let(:stub_http_client_pool!) do |
| 90 | + [:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method| |
| 91 | + allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method) |
| 92 | + end |
| 93 | + end |
| 94 | + |
| 95 | + let(:event) { ::LogStash::Event.new("foo" => "bar") } |
| 96 | + let(:error) do |
| 97 | + ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new( |
| 98 | + 429, double("url").as_null_object, request_body, double("response body") |
| 99 | + ) |
| 100 | + end |
| 101 | + let(:logger) { double("logger").as_null_object } |
| 102 | + let(:response) { { :errors => [], :items => [] } } |
| 103 | + |
| 104 | + let(:request_body) { double(:request_body, :bytesize => 1023) } |
| 105 | + |
| 106 | + before(:each) do |
| 107 | + bulk_param = [["index", anything, event.to_hash]] |
| 108 | + |
| 109 | + allow(subject).to receive(:logger).and_return(logger) |
| 110 | + |
| 111 | + # fail consistently for ever |
| 112 | + allow(subject.client).to receive(:bulk).with(bulk_param).and_raise(error) |
| 113 | + end |
| 114 | + |
| 115 | + it "should exit the retry with an abort exception if shutdown is requested" do |
| 116 | + # execute in another thread because it blocks in a retry loop until the shutdown is triggered |
| 117 | + th = Thread.new do |
| 118 | + subject.multi_receive([event]) |
| 119 | + rescue org.logstash.execution.AbortedBatchException => e |
| 120 | + # return exception's class so that it can be verified when retrieving the thread's value |
| 121 | + e.class |
| 122 | + end |
| 123 | + |
| 124 | + # trigger the shutdown signal |
| 125 | + allow(subject).to receive(:pipeline_shutdown_requested?) { true } |
| 126 | + |
| 127 | + expect(th.value).to eq(org.logstash.execution.AbortedBatchException) |
| 128 | + end |
| 129 | + end |
| 130 | + end if LOGSTASH_VERSION >= '8.8' |
53 | 131 |
|
54 | 132 | context "with an active instance" do
|
55 | 133 | let(:options) {
|
|
0 commit comments