Skip to content

Commit 42e2d01

Browse files
garyrussellartembilan
authored andcommitted
GH-646: Transaction Improvements
Resolves #646 - add protection to the transactional producer cache to avoid multiple cache inserts - add try/catch around `abortTransaction()` - detect (and reject) an illegal call to `template.send()` when there is no existing transaction - from the send callback remove the `ThreadLocal` check (will always be null) and only close the producer if it's not transactional - remove autoFlush from template tests
1 parent 9f724ea commit 42e2d01

File tree

4 files changed

+44
-15
lines changed

4 files changed

+44
-15
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -302,8 +302,8 @@ public void abortTransaction() throws ProducerFencedException {
302302
}
303303

304304
@Override
305-
public void close() {
306-
if (this.cache != null) {
305+
public synchronized void close() {
306+
if (this.cache != null && !this.cache.contains(this)) {
307307
this.cache.offer(this);
308308
}
309309
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+22-8
Original file line numberDiff line numberDiff line change
@@ -265,10 +265,14 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
265265
result = callback.doInOperations(this);
266266
}
267267
catch (Exception e) {
268-
producer.abortTransaction();
269-
this.producers.remove();
270-
closeProducer(producer, false);
271-
producer = null;
268+
try {
269+
producer.abortTransaction();
270+
}
271+
finally {
272+
this.producers.remove();
273+
closeProducer(producer, false);
274+
producer = null;
275+
}
272276
}
273277
if (producer != null) {
274278
try {
@@ -328,12 +332,19 @@ protected void closeProducer(Producer<K, V> producer, boolean inLocalTx) {
328332
* @return a Future for the {@link RecordMetadata}.
329333
*/
330334
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
335+
if (this.transactional) {
336+
Assert.state(inTransaction(),
337+
"No transaction is in process; "
338+
+ "possible solutions: run the template operation within the scope of a "
339+
+ "template.executeInTransaction() operation, start a transaction with @Transactional "
340+
+ "before invoking the template method, "
341+
+ "run in a transaction started by a listener container when consuming a record");
342+
}
331343
final Producer<K, V> producer = getTheProducer();
332344
if (this.logger.isTraceEnabled()) {
333345
this.logger.trace("Sending: " + producerRecord);
334346
}
335347
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
336-
final boolean inLocalTx = inTransaction();
337348
producer.send(producerRecord, new Callback() {
338349

339350
@Override
@@ -365,8 +376,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
365376
}
366377
}
367378
finally {
368-
if (KafkaTemplate.this.producers.get() == null) {
369-
closeProducer(producer, inLocalTx);
379+
if (!KafkaTemplate.this.transactional) {
380+
closeProducer(producer, false);
370381
}
371382
}
372383
}
@@ -381,8 +392,11 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
381392
return future;
382393
}
383394

395+
384396
protected boolean inTransaction() {
385-
return this.producers.get() != null || TransactionSynchronizationManager.isActualTransactionActive();
397+
return this.transactional && (this.producers.get() != null
398+
|| TransactionSynchronizationManager.getResource(this.producerFactory) != null
399+
|| TransactionSynchronizationManager.isActualTransactionActive());
386400
}
387401

388402
private Producer<K, V> getTheProducer() {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminBadContextTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,7 +41,7 @@ public class KafkaAdminBadContextTests {
4141
@Test
4242
public void testContextNotLoaded() {
4343
try {
44-
new AnnotationConfigApplicationContext(BadConfig.class);
44+
new AnnotationConfigApplicationContext(BadConfig.class).close();
4545
fail("Expected Exception");
4646
}
4747
catch (IllegalStateException e) {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.core;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2021
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.ArgumentMatchers.eq;
2223
import static org.mockito.BDDMockito.given;
@@ -83,7 +84,7 @@ public void testLocalTransaction() throws Exception {
8384
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
8485
pf.setKeySerializer(new StringSerializer());
8586
pf.setTransactionIdPrefix("my.transaction.");
86-
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
87+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
8788
template.setDefaultTopic(STRING_KEY_TOPIC);
8889
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka);
8990
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -119,7 +120,7 @@ public void testGlobalTransaction() throws Exception {
119120
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
120121
pf.setKeySerializer(new StringSerializer());
121122
pf.setTransactionIdPrefix("my.transaction.");
122-
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
123+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
123124
template.setDefaultTopic(STRING_KEY_TOPIC);
124125
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka);
125126
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -194,6 +195,20 @@ public void testOverrideProducerIdempotentConfig() throws Exception {
194195
.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)).isEqualTo(false);
195196
}
196197

198+
@Test
199+
public void testNoTx() {
200+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
201+
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
202+
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
203+
pf.setKeySerializer(new StringSerializer());
204+
pf.setTransactionIdPrefix("my.transaction.");
205+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
206+
template.setDefaultTopic(STRING_KEY_TOPIC);
207+
assertThatThrownBy(() -> template.send("foo", "bar"))
208+
.isInstanceOf(IllegalStateException.class)
209+
.hasMessageContaining("No transaction is in process;");
210+
}
211+
197212
@Configuration
198213
@EnableTransactionManagement
199214
public static class DeclarativeConfig {

0 commit comments

Comments
 (0)