Skip to content

Commit f76bc09

Browse files
authored
GH-2760: Add SmartLifecycle to Producer Factory
Resolves #2760
1 parent 6ee6148 commit f76bc09

File tree

2 files changed

+141
-3
lines changed

2 files changed

+141
-3
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

+48-2
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323
import java.util.Iterator;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.Set;
2627
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.ConcurrentHashMap;
2829
import java.util.concurrent.Future;
2930
import java.util.concurrent.LinkedBlockingQueue;
31+
import java.util.concurrent.atomic.AtomicBoolean;
3032
import java.util.concurrent.atomic.AtomicInteger;
3133
import java.util.function.BiPredicate;
3234
import java.util.function.Supplier;
@@ -55,6 +57,7 @@
5557
import org.springframework.context.ApplicationContext;
5658
import org.springframework.context.ApplicationContextAware;
5759
import org.springframework.context.ApplicationListener;
60+
import org.springframework.context.SmartLifecycle;
5861
import org.springframework.context.event.ContextStoppedEvent;
5962
import org.springframework.core.log.LogAccessor;
6063
import org.springframework.kafka.KafkaException;
@@ -111,7 +114,7 @@
111114
*/
112115
public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
113116
implements ProducerFactory<K, V>, ApplicationContextAware,
114-
BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {
117+
BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartLifecycle {
115118

116119
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaProducerFactory.class));
117120

@@ -123,6 +126,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
123126

124127
private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal<>();
125128

129+
private final Set<CloseSafeProducer<K, V>> threadBoundProducersAll = ConcurrentHashMap.newKeySet();
130+
126131
private final AtomicInteger epoch = new AtomicInteger();
127132

128133
private final AtomicInteger clientIdCounter = new AtomicInteger();
@@ -131,6 +136,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
131136

132137
private final List<ProducerPostProcessor<K, V>> postProcessors = new ArrayList<>();
133138

139+
private final AtomicBoolean running = new AtomicBoolean();
140+
134141
private Supplier<Serializer<K>> keySerializerSupplier;
135142

136143
private Supplier<Serializer<V>> valueSerializerSupplier;
@@ -519,6 +526,27 @@ public void setMaxAge(Duration maxAge) {
519526
this.maxAge = maxAge.toMillis();
520527
}
521528

529+
@Override
530+
public void start() {
531+
this.running.set(true);
532+
}
533+
534+
@Override
535+
public void stop() {
536+
this.running.set(false);
537+
destroy();
538+
}
539+
540+
@Override
541+
public boolean isRunning() {
542+
return this.running.get();
543+
}
544+
545+
@Override
546+
public int getPhase() {
547+
return Integer.MIN_VALUE;
548+
}
549+
522550
/**
523551
* Copy properties of the instance and the given properties to create a new producer factory.
524552
* <p>If the {@link org.springframework.kafka.core.DefaultKafkaProducerFactory} makes a
@@ -677,7 +705,12 @@ public void destroy() {
677705
this.producer = null;
678706
}
679707
if (producerToClose != null) {
680-
producerToClose.closeDelegate(this.physicalCloseTimeout, this.listeners);
708+
try {
709+
producerToClose.closeDelegate(this.physicalCloseTimeout, this.listeners);
710+
}
711+
catch (Exception e) {
712+
LOGGER.error(e, "Exception while closing producer");
713+
}
681714
}
682715
this.cache.values().forEach(queue -> {
683716
CloseSafeProducer<K, V> next = queue.poll();
@@ -691,6 +724,16 @@ public void destroy() {
691724
next = queue.poll();
692725
}
693726
});
727+
this.cache.clear();
728+
this.threadBoundProducersAll.forEach(prod -> {
729+
try {
730+
prod.closeDelegate(this.physicalCloseTimeout, this.listeners);
731+
}
732+
catch (Exception e) {
733+
LOGGER.error(e, "Exception while closing producer");
734+
}
735+
});
736+
this.threadBoundProducersAll.clear();
694737
this.epoch.incrementAndGet();
695738
}
696739

@@ -760,6 +803,7 @@ private Producer<K, V> getOrCreateThreadBoundProducer() {
760803
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
761804
if (tlProducer != null && (tlProducer.closed || this.epoch.get() != tlProducer.epoch || expire(tlProducer))) {
762805
closeThreadBoundProducer();
806+
this.threadBoundProducersAll.remove(tlProducer);
763807
tlProducer = null;
764808
}
765809
if (tlProducer == null) {
@@ -769,6 +813,7 @@ private Producer<K, V> getOrCreateThreadBoundProducer() {
769813
listener.producerAdded(tlProducer.clientId, tlProducer);
770814
}
771815
this.threadBoundProducers.set(tlProducer);
816+
this.threadBoundProducersAll.add(tlProducer);
772817
}
773818
return tlProducer;
774819
}
@@ -907,6 +952,7 @@ public void closeThreadBoundProducer() {
907952
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
908953
if (tlProducer != null) {
909954
this.threadBoundProducers.remove();
955+
this.threadBoundProducersAll.remove(tlProducer);
910956
tlProducer.closeDelegate(this.physicalCloseTimeout, this.listeners);
911957
}
912958
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

+93-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.Map;
3838
import java.util.Queue;
39+
import java.util.Set;
3940
import java.util.concurrent.CompletableFuture;
4041
import java.util.concurrent.atomic.AtomicBoolean;
4142
import java.util.concurrent.atomic.AtomicInteger;
@@ -151,6 +152,35 @@ protected Producer createRawProducer(Map configs) {
151152
verify(producer, times(2)).close(any(Duration.class));
152153
}
153154

155+
@Test
156+
@SuppressWarnings({ "rawtypes", "unchecked" })
157+
void singleLifecycle() throws InterruptedException {
158+
final Producer producer = mock(Producer.class);
159+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
160+
161+
@Override
162+
protected Producer createRawProducer(Map configs) {
163+
return producer;
164+
}
165+
166+
};
167+
Producer aProducer = pf.createProducer();
168+
assertThat(aProducer).isNotNull();
169+
Producer bProducer = pf.createProducer();
170+
assertThat(bProducer).isSameAs(aProducer);
171+
aProducer.close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT);
172+
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNotNull();
173+
pf.setMaxAge(Duration.ofMillis(10));
174+
Thread.sleep(50);
175+
aProducer = pf.createProducer();
176+
assertThat(aProducer).isNotSameAs(bProducer);
177+
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
178+
assertThat(cache.size()).isEqualTo(0);
179+
pf.stop();
180+
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
181+
verify(producer, times(2)).close(any(Duration.class));
182+
}
183+
154184
@Test
155185
@SuppressWarnings({ "rawtypes", "unchecked" })
156186
void testResetTx() throws Exception {
@@ -186,6 +216,42 @@ protected Producer createRawProducer(Map configs) {
186216
verify(producer).close(any(Duration.class));
187217
}
188218

219+
@Test
220+
@SuppressWarnings({ "rawtypes", "unchecked" })
221+
void txLifecycle() throws Exception {
222+
final Producer producer = mock(Producer.class);
223+
ApplicationContext ctx = mock(ApplicationContext.class);
224+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
225+
226+
@Override
227+
protected Producer createRawProducer(Map configs) {
228+
return producer;
229+
}
230+
231+
};
232+
pf.setApplicationContext(ctx);
233+
pf.setTransactionIdPrefix("foo");
234+
Producer aProducer = pf.createProducer();
235+
assertThat(aProducer).isNotNull();
236+
aProducer.close();
237+
Producer bProducer = pf.createProducer();
238+
assertThat(bProducer).isSameAs(aProducer);
239+
bProducer.close();
240+
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
241+
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
242+
assertThat(cache.size()).isEqualTo(1);
243+
Queue queue = (Queue) cache.get("foo");
244+
assertThat(queue.size()).isEqualTo(1);
245+
pf.setMaxAge(Duration.ofMillis(10));
246+
Thread.sleep(50);
247+
aProducer = pf.createProducer();
248+
assertThat(aProducer).isNotSameAs(bProducer);
249+
pf.stop();
250+
assertThat(queue.size()).isEqualTo(0);
251+
assertThat(cache.size()).isEqualTo(0);
252+
verify(producer).close(any(Duration.class));
253+
}
254+
189255
@Test
190256
@SuppressWarnings({ "rawtypes", "unchecked" })
191257
void dontReturnToCacheAfterReset() {
@@ -255,6 +321,32 @@ protected Producer createKafkaProducer() {
255321
verify(producer, times(3)).close(any(Duration.class));
256322
}
257323

324+
@Test
325+
@SuppressWarnings({ "rawtypes", "unchecked" })
326+
void threadLocalLifecycle() throws InterruptedException {
327+
final Producer producer = mock(Producer.class);
328+
AtomicBoolean created = new AtomicBoolean();
329+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
330+
331+
@Override
332+
protected Producer createKafkaProducer() {
333+
assertThat(created.get()).isFalse();
334+
created.set(true);
335+
return producer;
336+
}
337+
338+
};
339+
pf.setProducerPerThread(true);
340+
Producer aProducer = pf.createProducer();
341+
assertThat(aProducer).isNotNull();
342+
aProducer.close();
343+
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
344+
assertThat(KafkaTestUtils.getPropertyValue(pf, "threadBoundProducers", ThreadLocal.class).get()).isNotNull();
345+
pf.stop();
346+
assertThat(KafkaTestUtils.getPropertyValue(pf, "threadBoundProducersAll", Set.class)).hasSize(0);
347+
verify(producer).close(any(Duration.class));
348+
}
349+
258350
@Test
259351
@SuppressWarnings({ "rawtypes", "unchecked" })
260352
void testThreadLocalReset() {
@@ -281,7 +373,7 @@ protected Producer createKafkaProducer() {
281373
bProducer = pf.createProducer();
282374
assertThat(bProducer).isNotSameAs(aProducer);
283375
bProducer.close();
284-
verify(producer1).close(any(Duration.class));
376+
verify(producer1, times(2)).close(any(Duration.class));
285377
}
286378

287379
@Test

0 commit comments

Comments
 (0)