Skip to content

Commit b486bee

Browse files
committed
spring-projectsGH-646: Transaction Improvements
Resolves spring-projects#646 - add protection to the transactional producer cache to avoid multiple cache inserts - detect (and reject) an illegal call to `template.send()` when there is no existing transaction - from the send callback remove the `ThreadLocal` check (will always be null) and only close the producer if it's not transactional - remove autoFlush from template tests
1 parent 9f724ea commit b486bee

File tree

4 files changed

+44
-14
lines changed

4 files changed

+44
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,8 @@ public void abortTransaction() throws ProducerFencedException {
302302
}
303303

304304
@Override
305-
public void close() {
306-
if (this.cache != null) {
305+
public synchronized void close() {
306+
if (this.cache != null && !this.cache.contains(this)) {
307307
this.cache.offer(this);
308308
}
309309
}

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

+23-8
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public List<PartitionInfo> partitionsFor(String topic) {
226226
return producer.partitionsFor(topic);
227227
}
228228
finally {
229-
closeProducer(producer, inTransaction());
229+
closeProducer(producer, inLocalTransaction());
230230
}
231231
}
232232

@@ -237,7 +237,7 @@ public List<PartitionInfo> partitionsFor(String topic) {
237237
return producer.metrics();
238238
}
239239
finally {
240-
closeProducer(producer, inTransaction());
240+
closeProducer(producer, inLocalTransaction());
241241
}
242242
}
243243

@@ -248,7 +248,7 @@ public <T> T execute(ProducerCallback<K, V, T> callback) {
248248
return callback.doInKafka(producer);
249249
}
250250
finally {
251-
closeProducer(producer, inTransaction());
251+
closeProducer(producer, inLocalTransaction());
252252
}
253253
}
254254

@@ -295,7 +295,7 @@ public void flush() {
295295
producer.flush();
296296
}
297297
finally {
298-
closeProducer(producer, inTransaction());
298+
closeProducer(producer, inLocalTransaction());
299299
}
300300
}
301301

@@ -328,12 +328,20 @@ protected void closeProducer(Producer<K, V> producer, boolean inLocalTx) {
328328
* @return a Future for the {@link RecordMetadata}.
329329
*/
330330
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
331+
if (this.transactional) {
332+
Assert.state(inTransaction(),
333+
"No transaction is in process; "
334+
+ "possible solutions: run the template operation within the scope of a "
335+
+ "template.executeInTransaction() operation, start a transaction with @Transactional "
336+
+ "before invoking the template method, "
337+
+ "run in a transaction started by a listener container when consuming a record");
338+
}
331339
final Producer<K, V> producer = getTheProducer();
332340
if (this.logger.isTraceEnabled()) {
333341
this.logger.trace("Sending: " + producerRecord);
334342
}
335343
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
336-
final boolean inLocalTx = inTransaction();
344+
final boolean inLocalTx = inLocalTransaction();
337345
producer.send(producerRecord, new Callback() {
338346

339347
@Override
@@ -365,8 +373,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
365373
}
366374
}
367375
finally {
368-
if (KafkaTemplate.this.producers.get() == null) {
369-
closeProducer(producer, inLocalTx);
376+
if (!KafkaTemplate.this.transactional) {
377+
closeProducer(producer, false);
370378
}
371379
}
372380
}
@@ -381,8 +389,15 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
381389
return future;
382390
}
383391

392+
384393
protected boolean inTransaction() {
385-
return this.producers.get() != null || TransactionSynchronizationManager.isActualTransactionActive();
394+
return this.transactional && (this.producers.get() != null
395+
|| TransactionSynchronizationManager.getResource(this.producerFactory) != null
396+
|| TransactionSynchronizationManager.isActualTransactionActive());
397+
}
398+
399+
protected boolean inLocalTransaction() {
400+
return this.transactional && this.producers.get() != null;
386401
}
387402

388403
private Producer<K, V> getTheProducer() {

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminBadContextTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,7 +41,7 @@ public class KafkaAdminBadContextTests {
4141
@Test
4242
public void testContextNotLoaded() {
4343
try {
44-
new AnnotationConfigApplicationContext(BadConfig.class);
44+
new AnnotationConfigApplicationContext(BadConfig.class).close();
4545
fail("Expected Exception");
4646
}
4747
catch (IllegalStateException e) {

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.core;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2021
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.ArgumentMatchers.eq;
2223
import static org.mockito.BDDMockito.given;
@@ -83,7 +84,7 @@ public void testLocalTransaction() throws Exception {
8384
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
8485
pf.setKeySerializer(new StringSerializer());
8586
pf.setTransactionIdPrefix("my.transaction.");
86-
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
87+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
8788
template.setDefaultTopic(STRING_KEY_TOPIC);
8889
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka);
8990
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -119,7 +120,7 @@ public void testGlobalTransaction() throws Exception {
119120
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
120121
pf.setKeySerializer(new StringSerializer());
121122
pf.setTransactionIdPrefix("my.transaction.");
122-
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
123+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
123124
template.setDefaultTopic(STRING_KEY_TOPIC);
124125
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka);
125126
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -194,6 +195,20 @@ public void testOverrideProducerIdempotentConfig() throws Exception {
194195
.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)).isEqualTo(false);
195196
}
196197

198+
@Test
199+
public void testNoTx() {
200+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
201+
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
202+
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
203+
pf.setKeySerializer(new StringSerializer());
204+
pf.setTransactionIdPrefix("my.transaction.");
205+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
206+
template.setDefaultTopic(STRING_KEY_TOPIC);
207+
assertThatThrownBy(() -> template.send("foo", "bar"))
208+
.isInstanceOf(IllegalStateException.class)
209+
.hasMessageContaining("No transaction is in process;");
210+
}
211+
197212
@Configuration
198213
@EnableTransactionManagement
199214
public static class DeclarativeConfig {

0 commit comments

Comments
 (0)