LoomKafkaConsumer::close let closing task finish before interrupting #3957

maschmid
Contributor

We are seeing quite a lot of InterruptedExceptions when deleting triggers:

{"@timestamp":"2024-06-27T12:26:59.548Z","@version":"1","message":"[Consumer clientId=consumer-knative-trigger-kb-newsubs-k8s-0-connection0-363-1179, groupId=knative-trigger-kb-newsubs-k8s-0-connection0-363] Failed to close coordinator","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"","level":"ERROR","level_value":40000,"stack_trace":"org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException\n\tat org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:535)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:296)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:353)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:1100)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:1048)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2411)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2378)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2351)\n\tat dev.knative.eventing.kafka.broker.dispatcherloom.LoomKafkaConsumer.lambda$close$1(LoomKafkaConsumer.java:100)\n\tat dev.knative.eventing.kafka.broker.dispatcherloom.LoomKafkaConsumer.processTaskQueue(LoomKafkaConsumer.java:69)\n\tat java.base/java.lang.VirtualThread.run(VirtualThread.java:309)\nCaused by: java.lang.InterruptedException: null\n\t... 13 common frames omitted\n"}
{"@timestamp":"2024-06-27T12:26:59.549Z","@version":"1","message":"Failed to close closeable","logger_name":"dev.knative.eventing.kafka.broker.core.AsyncCloseable","thread_name":"","level":"WARN","level_value":30000,"stack_trace":"org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException\n\tat org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:535)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:296)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:353)\n\tat org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:1100)\n\tat org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:1048)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2411)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2378)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2351)\n\tat dev.knative.eventing.kafka.broker.dispatcherloom.LoomKafkaConsumer.lambda$close$1(LoomKafkaConsumer.java:100)\n\tat dev.knative.eventing.kafka.broker.dispatcherloom.LoomKafkaConsumer.processTaskQueue(LoomKafkaConsumer.java:69)\n\tat java.base/java.lang.VirtualThread.run(VirtualThread.java:309)\nCaused by: java.lang.InterruptedException: null\n\t... 13 common frames omitted\n"}

LoomKafkaConsumer::close does not wait for the close task to finish before interrupting the background thread. It only waits for the task queue to become empty, but the queue is already empty while the last task is still being processed.
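The race described above can be reproduced in isolation. The following is a minimal sketch, not the project's code: `EmptyQueueRace`, `queueEmptyWhileTaskRuns`, and the latches are hypothetical names, and the worker is assumed to pull tasks with `BlockingQueue.take()`, which removes a task from the queue before it runs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class EmptyQueueRace {

    /** Returns true if the queue is observed empty while the last task is still running. */
    static boolean queueEmptyWhileTaskRuns() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        CountDownLatch taskStarted = new CountDownLatch(1);
        CountDownLatch releaseTask = new CountDownLatch(1);

        // Worker: take() removes the task from the queue *before* running it.
        Thread worker = new Thread(() -> {
            try {
                queue.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        // Hypothetical stand-in for a slow "close the consumer" task.
        queue.add(() -> {
            taskStarted.countDown();
            try {
                releaseTask.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            taskStarted.await();
            // The task is mid-flight, yet the queue already reports empty.
            boolean observed = queue.isEmpty() && worker.isAlive();
            releaseTask.countDown();
            worker.join();
            return observed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(queueEmptyWhileTaskRuns());
    }
}
```

Interrupting the worker at the point where `observed` is computed would hit the running task, which is the situation the stack traces above show for `KafkaConsumer.close`.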

Proposed Changes

  • 🐛 Add an additional 2000ms wait if the last task is not finished yet when closing

@knative-prow knative-prow bot requested review from creydr and lionelvillard June 27, 2024 12:43
@knative-prow knative-prow bot added the size/S Denotes a PR that changes 10-29 lines, ignoring generated files. label Jun 27, 2024

codecov bot commented Jun 27, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 48.24%. Comparing base (2b755b0) to head (bb70509).

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #3957   +/-   ##
=======================================
  Coverage   48.24%   48.24%           
=======================================
  Files         246      246           
  Lines       14500    14500           
=======================================
  Hits         6995     6995           
  Misses       6799     6799           
  Partials      706      706           


Member

@pierDipi pierDipi left a comment

Thanks!

/lgtm
/approve

@knative-prow knative-prow bot added the lgtm Indicates that a PR is ready to be merged. label Jun 27, 2024

knative-prow bot commented Jun 27, 2024

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: maschmid, pierDipi

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow knative-prow bot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Jun 27, 2024
@pierDipi
Member

/test reconciler-tests-keda

Comment on lines +127 to -125
    logger.debug("Queue is empty");

    if (!isFinished.get()) {
        logger.debug("Background thread not finished yet, waiting for it to complete");
        Thread.sleep(2000L);

        if (!isFinished.get()) {
            logger.debug("Background thread still not finished yet, interrupting background thread");
            taskRunnerThread.interrupt();
        }
    }

    taskRunnerThread.interrupt();
Member

@pierDipi pierDipi Jun 27, 2024

We could also consider checking "isFinished" in the while loop above?

Contributor Author

For now I wanted to preserve the existing logic of "wait forever for all the tasks except the last one, then give the last task 2s".

If we are OK with never interrupting, we could just wait until isFinished.get() returns true.
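A bounded grace period can also be expressed with `Thread.join(millis)` instead of a fixed `Thread.sleep(2000L)`, so that closing returns as soon as the worker finishes. This is a sketch of a different technique from the one in this PR, under the assumption that the worker thread exits on its own once its last task completes; `GracefulStop`, `stopWithGrace`, and `graceMillis` are hypothetical names.

```java
class GracefulStop {

    /**
     * Waits up to graceMillis for the worker to exit on its own and interrupts it
     * only if it is still alive afterwards. Returns true if an interrupt was sent.
     */
    static boolean stopWithGrace(Thread worker, long graceMillis) {
        try {
            if (graceMillis > 0) {
                // join(millis) returns once the thread terminates or the timeout elapses.
                // join(0) would wait forever, hence the guard.
                worker.join(graceMillis);
            }
            if (!worker.isAlive()) {
                return false; // finished within the grace period, no interrupt needed
            }
            worker.interrupt();
            worker.join(); // wait for the interrupted worker to unwind
            return true;
        } catch (InterruptedException e) {
            // The closing thread itself was interrupted: restore the flag and stop waiting.
            Thread.currentThread().interrupt();
            worker.interrupt();
            return true;
        }
    }

    public static void main(String[] args) {
        Thread quick = new Thread(() -> {});
        quick.start();
        System.out.println(stopWithGrace(quick, 2000L));
    }
}
```

If the worker blocks indefinitely on an empty queue rather than exiting, the interrupt is still required after the grace period, which matches the "give 2s for the last task" behaviour described above.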

@pierDipi
Member

pierDipi commented Jun 27, 2024

@maschmid we might have the same issue here:

    try {
        while (!eventQueue.isEmpty()) {
            logger.debug("Waiting for the eventQueue to become empty");
            Thread.sleep(2000L);
        }
        logger.debug("Interrupting sendFromQueueThread thread");
        sendFromQueueThread.interrupt();
        logger.debug("Waiting for sendFromQueueThread thread to complete");
        sendFromQueueThread.join();
        logger.debug("Closing the producer");
        producer.close();
        closePromise.complete();

@knative-prow knative-prow bot merged commit e413d50 into knative-extensions:main Jun 27, 2024
33 checks passed
Labels
approved Indicates a PR has been approved by an approver from all required OWNERS files. area/data-plane lgtm Indicates that a PR is ready to be merged. size/S Denotes a PR that changes 10-29 lines, ignoring generated files.