Skip to content

Commit b8e414d

Browse files
garyrussellartembilan
authored andcommitted
GH-3637: Fix KafkaMessageSource First Poll
Resolves #3637 If there are no records to receive, the poll blocked for `20*pollTimeout`. Wake the consumer during partition assignment; however, this changes the behavior when there are records present - the first poll always returns no records. Detect that the wakeup was due to assignment and perform another poll. Also add some debug logging, and don't create a new consumer if the source was stopped. **cherry-pick to 5.4.x**
1 parent fe57fd2 commit b8e414d

File tree

1 file changed

+31
-6
lines changed

1 file changed

+31
-6
lines changed

Diff for: spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java

+31-6
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl
138138

139139
private Duration closeTimeout = Duration.ofSeconds(DEFAULT_CLOSE_TIMEOUT);
140140

141+
public boolean newAssignment;
142+
141143
private volatile Consumer<K, V> consumer;
142144

143145
private volatile boolean pausing;
@@ -146,6 +148,8 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl
146148

147149
private volatile Iterator<ConsumerRecord<K, V>> recordsIterator;
148150

151+
private volatile boolean stopped;
152+
149153
/**
150154
* Construct an instance with the supplied parameters. Fetching multiple
151155
* records per poll will be disabled.
@@ -386,12 +390,14 @@ public synchronized boolean isRunning() {
386390
@Override
387391
public synchronized void start() {
388392
this.running = true;
393+
this.stopped = false;
389394
}
390395

391396
@Override
392397
public synchronized void stop() {
393398
stopConsumer();
394399
this.running = false;
400+
this.stopped = true;
395401
}
396402

397403
@Override
@@ -411,6 +417,10 @@ public boolean isPaused() {
411417

412418
@Override
413419
protected synchronized Object doReceive() {
420+
if (this.stopped) {
421+
this.logger.debug("Message source is stopped; no records will be returned");
422+
return null;
423+
}
414424
if (this.consumer == null) {
415425
createConsumer();
416426
this.running = true;
@@ -511,14 +521,27 @@ private ConsumerRecord<K, V> pollRecord() {
511521
}
512522
else {
513523
synchronized (this.consumerMonitor) {
514-
ConsumerRecords<K, V> records = this.consumer
515-
.poll(this.assignedPartitions.isEmpty() ? this.assignTimeout : this.pollTimeout);
516-
if (records == null || records.count() == 0) {
524+
try {
525+
ConsumerRecords<K, V> records = this.consumer
526+
.poll(this.assignedPartitions.isEmpty() ? this.assignTimeout : this.pollTimeout);
527+
this.logger.debug(() -> records == null
528+
? "Received null"
529+
: "Received " + records.count() + " records");
530+
if (records == null || records.count() == 0) {
531+
return null;
532+
}
533+
this.remainingCount.set(records.count());
534+
this.recordsIterator = records.iterator();
535+
return nextRecord();
536+
}
537+
catch (WakeupException ex) {
538+
this.logger.debug("Woken");
539+
if (this.newAssignment) {
540+
this.newAssignment = false;
541+
return pollRecord();
542+
}
517543
return null;
518544
}
519-
this.remainingCount.set(records.count());
520-
this.recordsIterator = records.iterator();
521-
return nextRecord();
522545
}
523546
}
524547
}
@@ -632,6 +655,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
632655
this.providedRebalanceListener.onPartitionsAssigned(partitions);
633656
}
634657
}
658+
KafkaMessageSource.this.consumer.wakeup();
659+
KafkaMessageSource.this.newAssignment = true;
635660
}
636661

637662
}

0 commit comments

Comments
 (0)