Skip to content

Commit b17c530

Browse files
garyrussell and artembilan
authored and committed
GH-800: Fix Zombie Fencing
Resolves #800. Fix assignment of `transactional.id` to be consistent across consumers. Backported to 1.3.x, removing Java 8 constructs.
1 parent 48caf23 commit b17c530

File tree

6 files changed

+163
-10
lines changed

6 files changed

+163
-10
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

+88-8
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import java.util.Collections;
2020
import java.util.HashMap;
21+
import java.util.Iterator;
2122
import java.util.List;
2223
import java.util.Map;
24+
import java.util.Map.Entry;
2325
import java.util.concurrent.BlockingQueue;
2426
import java.util.concurrent.Future;
2527
import java.util.concurrent.LinkedBlockingQueue;
@@ -44,6 +46,7 @@
4446

4547
import org.springframework.beans.factory.DisposableBean;
4648
import org.springframework.context.Lifecycle;
49+
import org.springframework.kafka.support.TransactionSupport;
4750
import org.springframework.util.Assert;
4851

4952
/**
@@ -80,6 +83,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
8083

8184
private final BlockingQueue<CloseSafeProducer<K, V>> cache = new LinkedBlockingQueue<>();
8285

86+
private final Map<String, Producer<K, V>> consumerProducers = new HashMap<>();
87+
8388
private volatile CloseSafeProducer<K, V> producer;
8489

8590
private Serializer<K> keySerializer;
@@ -92,6 +97,12 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
9297

9398
private volatile boolean running;
9499

100+
private boolean producerPerConsumerPartition = true;
101+
102+
/**
103+
* Construct a factory with the provided configuration.
104+
* @param configs the configuration.
105+
*/
95106
public DefaultKafkaProducerFactory(Map<String, Object> configs) {
96107
this(configs, null, null);
97108
}
@@ -131,6 +142,17 @@ public void setTransactionIdPrefix(String transactionIdPrefix) {
131142
this.transactionIdPrefix = transactionIdPrefix;
132143
}
133144

145+
/**
146+
* Set to false to revert to the previous behavior of a simple incrementing
147+
* trasactional.id suffix for each producer instead of maintaining a producer
148+
* for each group/topic/partition.
149+
* @param producerPerConsumerPartition false to revert.
150+
* @since 1.3.7
151+
*/
152+
public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition) {
153+
this.producerPerConsumerPartition = producerPerConsumerPartition;
154+
}
155+
134156
/**
135157
* Return an unmodifiable reference to the configuration map for this factory.
136158
* Useful for cloning to make a similar factory.
@@ -164,6 +186,13 @@ public void destroy() throws Exception { //NOSONAR
164186
}
165187
producer = this.cache.poll();
166188
}
189+
synchronized (this.consumerProducers) {
190+
for (Entry<String, Producer<K, V>> entry : this.consumerProducers.entrySet()) {
191+
((CloseSafeProducer<K, V>) entry.getValue()).delegate
192+
.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
193+
}
194+
this.consumerProducers.clear();
195+
}
167196
}
168197

169198
@Override
@@ -192,7 +221,12 @@ public boolean isRunning() {
192221
@Override
193222
public Producer<K, V> createProducer() {
194223
if (this.transactionIdPrefix != null) {
195-
return createTransactionalProducer();
224+
if (this.producerPerConsumerPartition) {
225+
return createTransactionalProducerForPartition();
226+
}
227+
else {
228+
return createTransactionalProducer();
229+
}
196230
}
197231
if (this.producer == null) {
198232
synchronized (this) {
@@ -213,6 +247,25 @@ protected Producer<K, V> createKafkaProducer() {
213247
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
214248
}
215249

250+
private Producer<K, V> createTransactionalProducerForPartition() {
251+
String suffix = TransactionSupport.getTransactionIdSuffix();
252+
if (suffix == null) {
253+
return createTransactionalProducer();
254+
}
255+
else {
256+
synchronized (this.consumerProducers) {
257+
if (!this.consumerProducers.containsKey(suffix)) {
258+
Producer<K, V> newProducer = doCreateTxProducer(suffix);
259+
this.consumerProducers.put(suffix, newProducer);
260+
return newProducer;
261+
}
262+
else {
263+
return this.consumerProducers.get(suffix);
264+
}
265+
}
266+
}
267+
}
268+
216269
/**
217270
* Subclasses must return a producer from the {@link #getCache()} or a
218271
* new raw producer wrapped in a {@link CloseSafeProducer}.
@@ -222,18 +275,22 @@ protected Producer<K, V> createKafkaProducer() {
222275
protected Producer<K, V> createTransactionalProducer() {
223276
Producer<K, V> producer = this.cache.poll();
224277
if (producer == null) {
225-
Map<String, Object> configs = new HashMap<>(this.configs);
226-
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
227-
this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
228-
producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
229-
producer.initTransactions();
230-
return new CloseSafeProducer<K, V>(producer, this.cache);
278+
return doCreateTxProducer("" + this.transactionIdSuffix.getAndIncrement());
231279
}
232280
else {
233281
return producer;
234282
}
235283
}
236284

285+
private Producer<K, V> doCreateTxProducer(String suffix) {
286+
Producer<K, V> producer;
287+
Map<String, Object> configs = new HashMap<>(this.configs);
288+
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
289+
producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
290+
producer.initTransactions();
291+
return new CloseSafeProducer<K, V>(producer, this.cache, this.consumerProducers);
292+
}
293+
237294
protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
238295
return this.cache;
239296
}
@@ -251,16 +308,24 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
251308

252309
private final BlockingQueue<CloseSafeProducer<K, V>> cache;
253310

311+
private final Map<String, Producer<K, V>> consumerProducers;
312+
254313
private volatile boolean txFailed;
255314

256315
CloseSafeProducer(Producer<K, V> delegate) {
257-
this(delegate, null);
316+
this(delegate, null, null);
258317
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
259318
}
260319

261320
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache) {
321+
this(delegate, cache, null);
322+
}
323+
324+
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
325+
Map<String, Producer<K, V>> consumerProducers) {
262326
this.delegate = delegate;
263327
this.cache = cache;
328+
this.consumerProducers = consumerProducers;
264329
}
265330

266331
@Override
@@ -340,6 +405,9 @@ public void close() {
340405
+ "broker restarted during transaction");
341406

342407
this.delegate.close();
408+
if (this.consumerProducers != null) {
409+
removeConsumerProducer();
410+
}
343411
}
344412
else {
345413
synchronized (this) {
@@ -356,6 +424,18 @@ public void close(long timeout, TimeUnit unit) {
356424
close();
357425
}
358426

427+
private void removeConsumerProducer() {
428+
synchronized (this.consumerProducers) {
429+
Iterator<Entry<String, Producer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
430+
while (iterator.hasNext()) {
431+
if (iterator.next().getValue().equals(this)) {
432+
iterator.remove();
433+
break;
434+
}
435+
}
436+
}
437+
}
438+
359439
@Override
360440
public String toString() {
361441
return "CloseSafeProducer [delegate=" + this.delegate + "]";

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+6
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.springframework.kafka.support.Acknowledgment;
6060
import org.springframework.kafka.support.TopicPartitionInitialOffset;
6161
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
62+
import org.springframework.kafka.support.TransactionSupport;
6263
import org.springframework.kafka.transaction.KafkaTransactionManager;
6364
import org.springframework.scheduling.SchedulingAwareRunnable;
6465
import org.springframework.scheduling.TaskScheduler;
@@ -894,6 +895,8 @@ private void innvokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
894895
this.logger.trace("Processing " + record);
895896
}
896897
try {
898+
TransactionSupport.setTransactionIdSuffix(
899+
this.consumerGroupId + "." + record.topic() + "." + record.partition());
897900
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
898901

899902
@Override
@@ -920,6 +923,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
920923
}
921924
getAfterRollbackProcessor().process(unprocessed, this.consumer);
922925
}
926+
finally {
927+
TransactionSupport.clearTransactionIdSuffix();
928+
}
923929
}
924930
}
925931

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
/**
20+
* Utilities for supporting transactions.
21+
*
22+
* @author Gary Russell
23+
* @since 1.3.7
24+
*
25+
*/
26+
public final class TransactionSupport {
27+
28+
private static final ThreadLocal<String> transactionIdSuffix = new ThreadLocal<>();
29+
30+
private TransactionSupport() {
31+
super();
32+
}
33+
34+
public static void setTransactionIdSuffix(String suffix) {
35+
transactionIdSuffix.set(suffix);
36+
}
37+
38+
public static String getTransactionIdSuffix() {
39+
return transactionIdSuffix.get();
40+
}
41+
42+
public static void clearTransactionIdSuffix() {
43+
transactionIdSuffix.remove();
44+
}
45+
46+
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.concurrent.CountDownLatch;
4040
import java.util.concurrent.TimeUnit;
4141
import java.util.concurrent.atomic.AtomicBoolean;
42+
import java.util.concurrent.atomic.AtomicReference;
4243

4344
import org.apache.commons.logging.Log;
4445
import org.apache.commons.logging.LogFactory;
@@ -63,6 +64,7 @@
6364
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
6465
import org.springframework.kafka.core.KafkaTemplate;
6566
import org.springframework.kafka.core.ProducerFactory;
67+
import org.springframework.kafka.core.ProducerFactoryUtils;
6668
import org.springframework.kafka.listener.config.ContainerProperties;
6769
import org.springframework.kafka.support.TopicPartitionInitialOffset;
6870
import org.springframework.kafka.test.rule.KafkaEmbedded;
@@ -150,7 +152,8 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
150152
}
151153
});
152154
if (handleError) {
153-
props.setErrorHandler((e, data) -> { });
155+
props.setErrorHandler((e, data) -> {
156+
});
154157
}
155158
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
156159
container.setBeanName("commit");
@@ -367,6 +370,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
367370
verify(pf).createProducer();
368371
}
369372

373+
@SuppressWarnings("unchecked")
370374
@Test
371375
public void testRollbackRecord() throws Exception {
372376
logger.info("Start testRollbackRecord");
@@ -387,6 +391,7 @@ public void testRollbackRecord() throws Exception {
387391
final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
388392
final AtomicBoolean failed = new AtomicBoolean();
389393
final CountDownLatch latch = new CountDownLatch(3);
394+
final AtomicReference<String> transactionalId = new AtomicReference<>();
390395
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
391396
latch.countDown();
392397
if (failed.compareAndSet(false, true)) {
@@ -398,6 +403,9 @@ public void testRollbackRecord() throws Exception {
398403
if (message.topic().equals(topic1)) {
399404
template.send(topic2, "bar");
400405
template.flush();
406+
transactionalId.set(KafkaTestUtils.getPropertyValue(
407+
ProducerFactoryUtils.getTransactionalResourceHolder(pf).getProducer(),
408+
"delegate.transactionManager.transactionalId", String.class));
401409
}
402410
});
403411

@@ -434,8 +442,11 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
434442
ConsumerRecords<Integer, String> records = consumer.poll(0);
435443
assertThat(records.count()).isEqualTo(0);
436444
assertThat(consumer.position(new TopicPartition(topic1, 0))).isEqualTo(1);
445+
assertThat(transactionalId.get()).startsWith("rr.group.txTopic");
437446
logger.info("Stop testRollbackRecord");
438447
pf.destroy();
448+
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
449+
consumer.close();
439450
}
440451

441452
@SuppressWarnings("serial")

Diff for: src/reference/asciidoc/kafka.adoc

+6-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,12 @@ Spring for Apache Kafka adds support in several ways.
196196
Transactions are enabled by providing the `DefaultKafkaProducerFactory` with a `transactionIdPrefix`.
197197
In that case, instead of managing a single shared `Producer`, the factory maintains a cache of transactional producers.
198198
When the user `close()` s a producer, it is returned to the cache for reuse instead of actually being closed.
199-
The `transactional.id` property of each producer is `transactionIdPrefix` + `n`, where `n` starts with `0` and is incremented for each new producer.
199+
The `transactional.id` property of each producer is `transactionIdPrefix` + `n`, where `n` starts with `0` and is incremented for each new producer, unless the transaction is started by a listener container with a record-based listener.
200+
In that case, the `transactional.id` is `<transactionIdPrefix>.<group.id>.<topic>.<partition>`; this is to properly support fencing zombies https://www.confluent.io/blog/transactions-apache-kafka/[as described here].
201+
This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0.
202+
If you wish to revert to the previous behavior, set the `producerPerConsumerPartition` property on the `DefaultKafkaProducerFactory` to `false`.
203+
204+
NOTE: While transactions are supported with batch listeners, zombie fencing cannot be supported because a batch may contain records from multiple topics/partitions.
200205

201206
====== KafkaTransactionManager
202207

Diff for: src/reference/asciidoc/whats-new.adoc

+5
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,8 @@ Starting with _version 1.3.5_, a new `AfterRollbackProcessor` strategy is provid
4646

4747
Support for configuring Kerberos is now provided.
4848
See <<kerberos>> for more information.
49+
50+
==== Transactional Id
51+
52+
When a transaction is started by the listener container, the `transactional.id` is now the `transactionIdPrefix` appended with `<group.id>.<topic>.<partition>`.
53+
This is to allow proper fencing of zombies https://www.confluent.io/blog/transactions-apache-kafka/[as described here].

0 commit comments

Comments
 (0)