Skip to content

Commit 2e32362

Browse files
garyrussellartembilan
authored andcommitted
GH-646: Transaction Improvements
Resolves #646 - add protection to the transactional producer cache to avoid multiple cache inserts - add try/catch around `abortTransaction()` - detect (and reject) an illegal call to `template.send()` when there is no existing transaction - from the send callback remove the `ThreadLocal` check (will always be null) and only close the producer if it's not transactional - remove autoFlush from template tests # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java
1 parent 0589bea commit 2e32362

File tree

4 files changed

+45
-15
lines changed

4 files changed

+45
-15
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -289,8 +289,8 @@ public void abortTransaction() throws ProducerFencedException {
289289
}
290290

291291
@Override
292-
public void close() {
293-
if (this.cache != null) {
292+
public synchronized void close() {
293+
if (this.cache != null && !this.cache.contains(this)) {
294294
this.cache.offer(this);
295295
}
296296
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+22-8
Original file line numberDiff line numberDiff line change
@@ -244,10 +244,14 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
244244
result = callback.doInOperations(this);
245245
}
246246
catch (Exception e) {
247-
producer.abortTransaction();
248-
this.producers.remove();
249-
closeProducer(producer, false);
250-
producer = null;
247+
try {
248+
producer.abortTransaction();
249+
}
250+
finally {
251+
this.producers.remove();
252+
closeProducer(producer, false);
253+
producer = null;
254+
}
251255
}
252256
if (producer != null) {
253257
try {
@@ -307,12 +311,19 @@ protected void closeProducer(Producer<K, V> producer, boolean inLocalTx) {
307311
* @return a Future for the {@link RecordMetadata}.
308312
*/
309313
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
314+
if (this.transactional) {
315+
Assert.state(inTransaction(),
316+
"No transaction is in process; "
317+
+ "possible solutions: run the template operation within the scope of a "
318+
+ "template.executeInTransaction() operation, start a transaction with @Transactional "
319+
+ "before invoking the template method, "
320+
+ "run in a transaction started by a listener container when consuming a record");
321+
}
310322
final Producer<K, V> producer = getTheProducer();
311323
if (this.logger.isTraceEnabled()) {
312324
this.logger.trace("Sending: " + producerRecord);
313325
}
314326
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
315-
final boolean inLocalTx = inTransaction();
316327
producer.send(producerRecord, new Callback() {
317328

318329
@Override
@@ -338,8 +349,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
338349
}
339350
}
340351
finally {
341-
if (KafkaTemplate.this.producers.get() == null) {
342-
closeProducer(producer, inLocalTx);
352+
if (!KafkaTemplate.this.transactional) {
353+
closeProducer(producer, false);
343354
}
344355
}
345356
}
@@ -354,8 +365,11 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
354365
return future;
355366
}
356367

368+
357369
protected boolean inTransaction() {
358-
return this.producers.get() != null || TransactionSynchronizationManager.isActualTransactionActive();
370+
return this.transactional && (this.producers.get() != null
371+
|| TransactionSynchronizationManager.getResource(this.producerFactory) != null
372+
|| TransactionSynchronizationManager.isActualTransactionActive());
359373
}
360374

361375
private Producer<K, V> getTheProducer() {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminBadContextTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,7 +41,7 @@ public class KafkaAdminBadContextTests {
4141
@Test
4242
public void testContextNotLoaded() {
4343
try {
44-
new AnnotationConfigApplicationContext(BadConfig.class);
44+
new AnnotationConfigApplicationContext(BadConfig.class).close();
4545
fail("Expected Exception");
4646
}
4747
catch (IllegalStateException e) {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/*
2+
* Copyright 2017-2018 the original author or authors.
23
* Copyright 2017 the original author or authors.
34
*
45
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,6 +18,7 @@
1718
package org.springframework.kafka.core;
1819

1920
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2022
import static org.mockito.ArgumentMatchers.any;
2123
import static org.mockito.ArgumentMatchers.eq;
2224
import static org.mockito.BDDMockito.given;
@@ -77,7 +79,7 @@ public void testLocalTransaction() throws Exception {
7779
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
7880
pf.setKeySerializer(new StringSerializer());
7981
pf.setTransactionIdPrefix("my.transaction.");
80-
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
82+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
8183
template.setDefaultTopic(STRING_KEY_TOPIC);
8284
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka);
8385
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -113,7 +115,7 @@ public void testGlobalTransaction() throws Exception {
113115
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
114116
pf.setKeySerializer(new StringSerializer());
115117
pf.setTransactionIdPrefix("my.transaction.");
116-
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
118+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
117119
template.setDefaultTopic(STRING_KEY_TOPIC);
118120
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka);
119121
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -165,6 +167,20 @@ public void testDeclarative() {
165167
ctx.close();
166168
}
167169

170+
@Test
171+
public void testNoTx() {
172+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
173+
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
174+
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
175+
pf.setKeySerializer(new StringSerializer());
176+
pf.setTransactionIdPrefix("my.transaction.");
177+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
178+
template.setDefaultTopic(STRING_KEY_TOPIC);
179+
assertThatThrownBy(() -> template.send("foo", "bar"))
180+
.isInstanceOf(IllegalStateException.class)
181+
.hasMessageContaining("No transaction is in process;");
182+
}
183+
168184
@Configuration
169185
@EnableTransactionManagement
170186
public static class DeclarativeConfig {

0 commit comments

Comments
 (0)