Skip to content

Commit 966e8f6

Browse files
author
Zhiyang.Wang1
committed
Merge branch 'main' into spring-projectsGH-2588
2 parents 4ec6061 + aec40e4 commit 966e8f6

File tree

8 files changed

+23
-18
lines changed

8 files changed

+23
-18
lines changed

build.gradle

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,17 @@ ext {
6565
junit4Version = '4.13.2'
6666
junitJupiterVersion = '5.10.0'
6767
kafkaVersion = '3.6.0'
68-
log4jVersion = '2.20.0'
68+
log4jVersion = '2.21.0'
6969
micrometerDocsVersion = "1.0.2"
70-
micrometerVersion = '1.12.0-RC1'
71-
micrometerTracingVersion = '1.2.0-RC1'
70+
micrometerVersion = '1.12.0'
71+
micrometerTracingVersion = '1.2.0'
7272
mockitoVersion = '5.6.0'
73-
reactorVersion = '2023.0.0-RC1'
73+
reactorVersion = '2023.0.0'
7474
scalaVersion = '2.13'
75-
springBootVersion = '3.0.9' // docs module
76-
springDataVersion = '2023.1.0-RC1'
75+
springBootVersion = '3.0.12' // docs module
76+
springDataVersion = '2023.1.0'
7777
springRetryVersion = '2.0.4'
78-
springVersion = '6.1.0-SNAPSHOT'
78+
springVersion = '6.1.0'
7979
zookeeperVersion = '3.8.3'
8080

8181
idPrefix = 'kafka'

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=3.1.0-SNAPSHOT
1+
version=3.1.1-SNAPSHOT
22
org.gradle.jvmargs=-Xmx1536M -Dfile.encoding=UTF-8
33
org.gradle.caching=true
44
org.gradle.parallel=true

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ With a record listener, when `nack()` is called, any pending offsets are committ
233233
The consumer can be paused before redelivery, by setting the `sleep` argument.
234234
This is similar functionality to throwing an exception when the container is configured with a `DefaultErrorHandler`.
235235

236+
IMPORTANT: `nack()` pauses the entire listener for the specified sleep duration including all assigned partitions.
237+
236238
When using a batch listener, you can specify the index within the batch where the failure occurred.
237239
When `nack()` is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next `poll()`.
238240

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/ooo-commits.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ The consumer will be paused (no new records delivered) until all the offsets for
99

1010
IMPORTANT: While this feature allows applications to process records asynchronously, it should be understood that it increases the possibility of duplicate deliveries after a failure.
1111

12+
IMPORTANT: When `asyncAcks` is activated, it is not possible to use `nack()` (negative acknowledgments) when xref:kafka/receiving-messages/message-listener-container.adoc#committing-offsets[Committing Offsets].
13+

spring-kafka-docs/src/main/antora/modules/ROOT/pages/testing.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,8 @@ public class EmbeddedKafkaConditionTests {
339339
}
340340
----
341341

342-
A stand-alone (not Spring test context) broker will be created if the class annotated with `@EmbeddedBroker` is not also annotated (or meta annotated) with `ExtendedWith(SpringExtension.class)`.
343-
`@SpringJunitConfig` and `@SpringBootTest` are so meta annotated and the context-based broker will be used when either of those annotations are also present.
342+
A standalone broker (outside the Spring's TestContext) will be created unless a class annotated `@EmbeddedKafka` is also annotated (or meta-annotated) with `ExtendWith(SpringExtension.class)`.
343+
`@SpringJunitConfig` and `@SpringBootTest` are so meta-annotated and the context-based broker will be used when either of those annotations are also present.
344344

345345
IMPORTANT: When there is a Spring test application context available, the topics and broker properties can contain property placeholders, which will be resolved as long as the property is defined somewhere.
346346
If there is no Spring context available, these placeholders won't be resolved.

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.Properties;
2323
import java.util.Set;
2424
import java.util.function.BiConsumer;
25+
import java.util.stream.Collectors;
26+
import java.util.stream.StreamSupport;
2527

2628
import org.apache.kafka.clients.consumer.Consumer;
2729
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -189,12 +191,9 @@ private static void listen(List<RetryListener> listeners, ConsumerRecords<?, ?>
189191
* @return the String.
190192
*/
191193
public static String recordsToString(ConsumerRecords<?, ?> records) {
192-
StringBuffer sb = new StringBuffer();
193-
records.spliterator().forEachRemaining(rec -> sb
194-
.append(KafkaUtils.format(rec))
195-
.append(','));
196-
sb.deleteCharAt(sb.length() - 1);
197-
return sb.toString();
194+
return StreamSupport.stream(records.spliterator(), false)
195+
.map(KafkaUtils::format)
196+
.collect(Collectors.joining(","));
198197
}
199198

200199
/**

spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ public interface Acknowledgment {
4141
/**
4242
* Negatively acknowledge the current record - discard remaining records from the poll
4343
* and re-seek all partitions so that this record will be redelivered after the sleep
44-
* duration. Must be called on the consumer thread.
44+
* duration. This will pause reading for the entire message listener for the specified
45+
* sleep duration and is not limited to a single partition.
46+
* Must be called on the consumer thread.
4547
* <p>
4648
* @param sleep the duration to sleep; the actual sleep time will be larger of this value
4749
* and the container's {@code maxPollInterval}, which defaults to 5 seconds.

spring-kafka/src/main/java/org/springframework/kafka/support/LoggingProducerListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void setMaxContentLogged(int maxContentLogged) {
6969
@Override
7070
public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) {
7171
this.logger.error(exception, () -> {
72-
StringBuffer logOutput = new StringBuffer();
72+
StringBuilder logOutput = new StringBuilder();
7373
logOutput.append("Exception thrown when sending a message");
7474
if (this.includeContents) {
7575
logOutput.append(" with key='")

0 commit comments

Comments
 (0)