
Check pipeline shutdown status while waiting for valid connection or while issuing a bulk to ES #1119

Conversation

@andsel (Contributor) commented Mar 8, 2023

Release notes

Introduce the ability to negatively acknowledge the batch under processing if the plugin is blocked in a retry-error-loop and a shutdown is requested.

What does this PR do?

Updates the wait_for_successful_connection method and safe_bulk to react to shutdown requests.
When the plugin is stuck in a retry loop and a pipeline shutdown is requested, the plugin now terminates and signals that the batch under processing should not be acknowledged, by raising an AbortedBatchException.
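A minimal sketch of that control flow, assuming the shutdown hook exposed to plugins by elastic/logstash#14940 (method names here are illustrative, not the exact plugin code):

def wait_for_successful_connection
  until successful_connection?
    if pipeline_shutdown_requested?
      # Stop retrying: negatively ACK the in-flight batch so the
      # worker can abort and the pipeline can shut down or reload.
      raise org.logstash.execution.AbortedBatchException.new
    end
    sleep 1
  end
end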

Why is it important/What is the impact to the user?

Lets the user update the plugin's configuration when it is managed by central pipeline management (CPM), recovering from a retry loop caused by bad configuration values (or expired credentials).

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding change to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

Author's Checklist

  • Test with cloud instance

How to test this PR locally

This PR has to be tested in lockstep with the changes to Logstash core applied in elastic/logstash#14940.
Check out a branch that contains that change, or a release if one has already been published.
The test plan needs an Elasticsearch instance in Elastic Cloud; record the following data:

  • cloud_id
  • cloud_auth
  • api_key

Test plan

After creating the Elastic deployment, execute the following steps:

  • checkout this branch
  • install this plugin in your Logstash instance, adding this line to Logstash's Gemfile:
gem "logstash-output-elasticsearch", :path => "/path/to/logstash-output-elasticsearch"
and then running:
bin/logstash-plugin install --no-verify
  • configure your Logstash's config/logstash.yml file to connect to central pipeline management (CPM) in Elastic cloud
xpack.management.enabled: true
xpack.management.pipeline.id: ["test*"]
xpack.management.elasticsearch.cloud_id: "<your cloud_id>"
xpack.management.elasticsearch.cloud_auth: "<your cloud_auth>"
  • in CPM create a new pipeline like the following:
input {
  tcp {
    port => 1234
  }
}

output {
  elasticsearch {
    cloud_id => "<your cloud_id>"
    api_key => "<your api_key>"
    index => "andsel_test"
    ssl => true
  }
  stdout{}
}
  • run Logstash (bin/logstash)
  • verify that it can process some input
echo "test input line" | netcat localhost 1234
  • now update the pipeline definition in CPM, corrupting the api_key, e.g. by removing the last character of the key (keep a note of the correct key somewhere)
  • the output should start reporting a connectivity problem, with log lines such as:
[2023-03-22T14:38:23,927][WARN ][logstash.outputs.elasticsearch][test_remote_1] Attempted to resurrect connection to dead ES instance, but got an error {:url=>"https://blabla.gcp.cloud.es.io:443/", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError, :message=>"Got response code '401' contacting Elasticsearch at URL 'https://blabla.gcp.cloud.es.io:443/'"}
  • in CPM restore the correct api_key. Without this fix the previous lines continue and no shutdown and restart of the pipeline happens. With this change you should find a line like
[INFO ][org.logstash.execution.WorkerLoop][test_remote_1] Worker loop notified of aborting a batch

and the pipeline effectively reloads and is back to normal functioning.

Related issues

  • After config error, Logstash doesn't seem to restart broken pipeline with new config from Central Management

Use cases

As a user of central pipeline management, I want the pipeline running the plugin to be restarted with the updated values once a credential is updated, without manual intervention on the Logstash instance.


@andsel force-pushed the fix/accept_shutdown_on_waiting_for_connections branch from 1f8222a to 092c961 on March 22, 2023 14:17
@andsel (Contributor, Author) commented Mar 23, 2023

The tests for version 8.x are green because the selected version resolves to 8.7.0-SNAPSHOT.
Testing against 8.8.0-SNAPSHOT will happen after the 8.7.0 release, when the selected snapshot version changes.

@andsel andsel marked this pull request as ready for review March 23, 2023 07:52
@roaksoax roaksoax requested a review from jsvd March 27, 2023 13:46
@jsvd (Member) left a comment

I tested a few scenarios:
A) Writing to ES when the API key is revoked and the Pipeline is updated with new API Key: pipeline restarts correctly ✅
B) Writing to ES when ES goes down, binds to a different port, Pipeline is updated with new port: the pipeline never terminates 🛑

B) seems to fail as the resurrectionist thread never terminates, likely due to https://github.com/logstash-plugins/logstash-output-elasticsearch/blob/main/lib/logstash/outputs/elasticsearch/http_client/pool.rb#L129C11-L146

@andsel (Contributor, Author) commented Apr 3, 2023

To cover case B, the resurrectionist thread should exit when a pipeline shutdown is requested.
When a pipeline is shut down, with the abort-batch fix proposed by this PR, the Elasticsearch close method should be invoked, which in chain (-> http_client.close -> pool.close, where the pool's close flips a boolean flag checked by the resurrectionist to exit) shuts everything down.
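A simplified sketch of that close chain, as described above (not the plugin's exact code):

# Plugin#close -> http_client.close -> pool.close
class Pool
  def close
    # Flip the flag the resurrectionist polls between health checks.
    @state_mutex.synchronize { @stopping = true }
  end

  def start_resurrectionist
    Thread.new do
      until @stopping
        healthcheck!            # try to revive dead connections
        sleep @resurrect_delay
      end
    end
  end
end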

From a local test, when Elasticsearch is not reachable I can see these repeated log lines:

[2023-04-03T14:49:22,555][INFO ][logstash.outputs.elasticsearch][test_local_es] Failed to perform request {:message=>"Connect to localhost:9200 [localhost/127.0.0.1] failed: Connection refused (Connection refused)", :exception=>Manticore::SocketException, :cause=>#<Java::OrgApacheHttpConn::HttpHostConnectException: Connect to localhost:9200 [localhost/127.0.0.1] failed: Connection refused (Connection refused)>}
[2023-04-03T14:49:22,556][WARN ][logstash.outputs.elasticsearch][test_local_es] Attempted to resurrect connection to dead ES instance, but got an error {:url=>"https://elastic:xxxxxx@localhost:9200/", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :message=>"Elasticsearch Unreachable: [https://localhost:9200/][Manticore::SocketException] Connect to localhost:9200 [localhost/127.0.0.1] failed: Connection refused (Connection refused)"}

but when I update the pipeline config to point to the new port, it seems that it doesn't recognize the change because the pipeline is not reloadable. I think this is the source of the problem:

[2023-04-03T14:49:23,153][ERROR][logstash.agent           ] Failed to execute action {:id=>:test_local_es, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Cannot reload pipeline, because the existing pipeline is not reloadable", :backtrace=>nil}

Update

It was my fault: I had used the stdin plugin, which makes the pipeline non-reloadable.

@andsel (Contributor, Author) commented Apr 3, 2023

The stop method is invoked as a notification after all the pipeline workers have terminated. The @stopping flag is therefore flipped only after the plugin has terminated its #multi_process, during the LS pipeline's shutdown_workers execution.

A solution to exit the resurrectionist loop would be to propagate the shutdown request down to the ES pool, so that @stopping could be flipped both during #close and when a pipeline shutdown is requested.
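A hypothetical shape of that propagation (the option name and wiring are illustrative assumptions, not the final implementation):

# The plugin hands the pool a probe for pipeline shutdown at build time.
pool_options = { :shutdown_requested => -> { pipeline_shutdown_requested? } }

# Inside the resurrectionist loop, either #close or a shutdown request
# ends the thread:
until @stopping || @shutdown_requested.call
  healthcheck!
  sleep @resurrect_delay
end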

@andsel (Contributor, Author) commented Apr 4, 2023

Commit 5e57c3a introduced aborting the batch in case no connections are available in the pool and the pipeline is shutting down.
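An illustrative sketch of that behavior (simplified; exact names in 5e57c3a may differ):

def with_connection
  url = get_first_alive_url
  if url.nil?
    # No usable connection: abort the in-flight batch if the pipeline
    # is shutting down, otherwise keep the usual retry path.
    raise org.logstash.execution.AbortedBatchException.new if pipeline_shutdown_requested?
    raise NoConnectionAvailableError
  end
  yield url
end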

@jsvd (Member) left a comment

LGTM. Tested a few scenarios manually and ran the unit tests without the necessary patches; everything is working as expected. :shipit:

Successfully merging this pull request may close these issues.

After config error, Logstash doesn't seem to restart broken pipeline with new config from Central Management