Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch Listener when encountered a poison pill, records after that poison pill are moved to next poll. #3297

Closed
Tarun4572 opened this issue Jun 13, 2024 · 6 comments
Labels

Comments

@Tarun4572
Copy link

@garyrussell so in a batch if a record fails (even after retry) records after that failed record are sent to next poll. am i correct. that batch stops at failed record and rest of records are sent to next poll. could you help with this, what happening with my case is, if i have batch of 5 messages and all 5 of them are corrupt messages, so i could see, first message fails and rest 4 are sent to next poll and in next poll, first message fails again and rest are sent to next poll. is this expected behavior. thank you

@Tarun4572
Copy link
Author

Tarun4572 commented Jun 13, 2024

FixedBackOff fixedBackOff = new FixedBackOff(0L, 0L);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate, (r, exception) -> {
log.error("sending message to dlt topic:{}", dltTopic);
// pushing message into dltTopic 0th partition
return new TopicPartition(dltTopic, r.partition());
});
return new DefaultErrorHandler(recoverer, fixedBackOff);

@service("kafkaStreamListener")
public class KafkaStreamListener implements BatchMessageListener<String, String> {

@Value("${dlt.uagDltTopic}")
private String uagDltTopic;

@Override
public void onMessage(List<ConsumerRecord<String, String>> data) {
    log.info("Kafka Stream Listener");
    
    UUID batchId = UUID.randomUUID();
    long start = System.currentTimeMillis();
    log.info("Received Batch, BatchId: {}", batchId);
    
    for (ConsumerRecord<String, String> consumerRecord : data) {
        String value = consumerRecord.value();
        log.info("<< Input JSON to UAG Service : " + value);
        try {
           throw new ProcessingException();
        } catch(ProcessingException e) {
        	if(!uagDltTopic.isEmpty())
        		throw new BatchListenerFailedException("Failed to process record", consumerRecord); 
        	log.error("Exception while processing Usage Event", e);
        } catch (Exception e) {
        	if(!uagDltTopic.isEmpty())
        		throw new BatchListenerFailedException("Failed to process record", consumerRecord);
        	log.error("Exception while processing Usage Event", e);
		}
    }
    
    long end = System.currentTimeMillis();
	long timeTakenInMillis = end - start;
	log.info("Completed Processing BatchId: {}, time: {} ms", batchId, timeTakenInMillis);
    

}

}

when there is a poison pill in batch, code after for loop is not executed.

@artembilan
Copy link
Member

That's correct.
Your logic is like this:

if(!uagDltTopic.isEmpty())
        		throw new BatchListenerFailedException("Failed to process record", consumerRecord); 

So, as long as you throw an exception from that try and uagDltTopic is not empty you don't go after that loop.

@Tarun4572
Copy link
Author

Tarun4572 commented Jun 13, 2024

we need to throw BatchListenerFailedException for our errorhandler to get triggered right, thats what i am doing refering kafka documentation for batch listener. catching the exception from our business logic and throwing BatchListenerFailedException which indicates which record has caused exception.
image

@artembilan

@Tarun4572
Copy link
Author

Tarun4572 commented Jun 13, 2024

so in a batch if a record fails (even after retry) records after that failed record are sent to next poll. am i correct. that batch stops at failed record and rest of records are sent to next poll. could you help with this, what happening with my case is, if i have batch of 5 messages and all 5 of them are corrupt messages, so i could see, first message fails and rest 4 are sent to next poll and in next poll, first message fails again and rest are sent to next poll. is this expected behavior.

is this right behavior, could you please answer this. @artembilan

@artembilan
Copy link
Member

I answer you in other discussion: #2577 (comment).

So, yeah, the behavior is expected.

@sobychacko
Copy link
Contributor

We are closing this issue. If you see further problems, please feel free to re-open.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants