Skip to content

Commit 9be96d5

Browse files
izeyeartembilan
authored andcommitted
Some code polishing for optimization
1 parent be689a3 commit 9be96d5

11 files changed

+24
-25
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ protected KafkaConsumer<K, V> createKafkaConsumer(String groupId, String clientI
128128
}
129129
if (shouldModifyClientId) {
130130
modifiedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
131-
(overrideClientIdPrefix ? clientIdPrefix
132-
: modifiedConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG)) + clientIdSuffix);
131+
(overrideClientIdPrefix ? clientIdPrefix
132+
: modifiedConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG)) + clientIdSuffix);
133133
}
134134
return createKafkaConsumer(modifiedConfigs);
135135
}

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.Objects;
2627
import java.util.concurrent.atomic.AtomicInteger;
28+
import java.util.stream.Collectors;
2729

2830
import org.apache.kafka.common.Metric;
2931
import org.apache.kafka.common.MetricName;
@@ -99,14 +101,11 @@ public List<KafkaMessageListenerContainer<K, V>> getContainers() {
99101

100102
@Override
101103
public Collection<TopicPartition> getAssignedPartitions() {
102-
List<TopicPartition> assigned = new ArrayList<>();
103-
this.containers.forEach(c -> {
104-
Collection<TopicPartition> assignedPartitions = c.getAssignedPartitions();
105-
if (assignedPartitions != null) {
106-
assigned.addAll(assignedPartitions);
107-
}
108-
});
109-
return assigned;
104+
return this.containers.stream()
105+
.map(KafkaMessageListenerContainer::getAssignedPartitions)
106+
.filter(Objects::nonNull)
107+
.flatMap(assignedPartitions -> assignedPartitions.stream())
108+
.collect(Collectors.toList());
110109
}
111110

112111
@Override

Diff for: spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ else if (listener instanceof MessageListener) {
497497
}
498498
this.monitorTask = this.taskScheduler.scheduleAtFixedRate(() -> checkConsumer(),
499499
this.containerProperties.getMonitorInterval() * 1000);
500-
if (this.containerProperties.isLogContainerConfig() && this.logger.isInfoEnabled()) {
500+
if (this.containerProperties.isLogContainerConfig()) {
501501
this.logger.info(this);
502502
}
503503
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ public void testSimple() throws Exception {
225225
assertThat(this.listener.listen4Consumer).isSameAs(KafkaTestUtils.getPropertyValue(KafkaTestUtils
226226
.getPropertyValue(this.registry.getListenerContainer("qux"), "containers", List.class).get(0),
227227
"listenerConsumer.consumer"));
228-
assertThat(this.quxGroup.size()).isEqualTo(1);
228+
assertThat(this.quxGroup).hasSize(1);
229229
assertThat(this.quxGroup.get(0)).isSameAs(manualContainer);
230230
List<?> containers = KafkaTestUtils.getPropertyValue(manualContainer, "containers", List.class);
231231
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumerGroupId"))

Diff for: spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void testAddTopics() throws Exception {
6868
this.admin.initialize();
6969
topics = adminClient.describeTopics(Arrays.asList("foo", "bar"));
7070
Map<String, TopicDescription> results = topics.all().get();
71-
results.forEach((name, td) -> assertThat(td.partitions().size()).isEqualTo(name.equals("foo") ? 2 : 3));
71+
results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 2 : 3));
7272
adminClient.close(10, TimeUnit.SECONDS);
7373
}
7474

Diff for: spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public void testTemplate() throws Exception {
145145
assertThat(metrics).isNotNull();
146146
List<PartitionInfo> partitions = template.partitionsFor(INT_KEY_TOPIC);
147147
assertThat(partitions).isNotNull();
148-
assertThat(partitions.size()).isEqualTo(2);
148+
assertThat(partitions).hasSize(2);
149149
pf.destroy();
150150
}
151151

@@ -173,7 +173,7 @@ public void testTemplateWithTimestamps() throws Exception {
173173
assertThat(metrics).isNotNull();
174174
List<PartitionInfo> partitions = template.partitionsFor(INT_KEY_TOPIC);
175175
assertThat(partitions).isNotNull();
176-
assertThat(partitions.size()).isEqualTo(2);
176+
assertThat(partitions).hasSize(2);
177177
pf.destroy();
178178
}
179179

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public void testAutoCommit() throws Exception {
119119
container.start();
120120

121121
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
122-
assertThat(container.getAssignedPartitions().size()).isEqualTo(2);
122+
assertThat(container.getAssignedPartitions()).hasSize(2);
123123

124124
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
125125
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
@@ -137,7 +137,7 @@ public void testAutoCommit() throws Exception {
137137
@SuppressWarnings("unchecked")
138138
List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
139139
"containers", List.class);
140-
assertThat(containers.size()).isEqualTo(2);
140+
assertThat(containers).hasSize(2);
141141
for (int i = 0; i < 2; i++) {
142142
assertThat(KafkaTestUtils.getPropertyValue(containers.get(i), "listenerConsumer.acks", Collection.class)
143143
.size()).isEqualTo(0);
@@ -415,7 +415,7 @@ public ConsumerRecords<Integer, String> answer(InvocationOnMock invocation) thro
415415
container.start();
416416
List<KafkaMessageListenerContainer<Integer, String>> containers = KafkaTestUtils.getPropertyValue(container,
417417
"containers", List.class);
418-
assertThat(containers.size()).isEqualTo(3);
418+
assertThat(containers).hasSize(3);
419419
for (int i = 0; i < 3; i++) {
420420
assertThat(KafkaTestUtils.getPropertyValue(containers.get(i), "topicPartitions",
421421
TopicPartitionInitialOffset[].class).length).isEqualTo(i < 2 ? 2 : 3);

Diff for: spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1071,7 +1071,7 @@ public void testDefinedPartitions() throws Exception {
10711071

10721072
@Override
10731073
public Consumer<Integer, String> createConsumer(String groupId, String clientIdPrefix,
1074-
String clientIdSufffix) {
1074+
String clientIdSuffix) {
10751075
return new KafkaConsumer<Integer, String>(props) {
10761076

10771077
@Override

Diff for: spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
190190
ReplyingKafkaTemplate<Integer, String, String> template = new ReplyingKafkaTemplate<>(this.config.pf(), container);
191191
template.start();
192192
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
193-
assertThat(template.getAssignedReplyTopicPartitions().size()).isEqualTo(5);
193+
assertThat(template.getAssignedReplyTopicPartitions()).hasSize(5);
194194
assertThat(template.getAssignedReplyTopicPartitions().iterator().next().topic()).isEqualTo(topic);
195195
return template;
196196
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,15 @@ public void test() {
6969
NonTrustedHeaderType ntht = (NonTrustedHeaderType) headers.get("fix");
7070
assertThat(ntht.getHeaderValue()).isNotNull();
7171
assertThat(ntht.getUntrustedType()).isEqualTo(Foo.class.getName());
72-
assertThat(headers.size()).isEqualTo(6);
72+
assertThat(headers).hasSize(6);
7373
mapper.addTrustedPackages(getClass().getPackage().getName());
7474
headers = new HashMap<>();
7575
mapper.toHeaders(recordHeaders, headers);
7676
assertThat(headers.get("foo")).isInstanceOf(byte[].class);
7777
assertThat(new String((byte[]) headers.get("foo"))).isEqualTo("bar");
7878
assertThat(headers.get("baz")).isEqualTo("qux");
7979
assertThat(headers.get("fix")).isEqualTo(new Foo());
80-
assertThat(headers.size()).isEqualTo(6);
80+
assertThat(headers).hasSize(6);
8181
}
8282

8383
public static final class Foo {

Diff for: spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ public void testBatchConverters() throws Exception {
5353
@SuppressWarnings("unchecked")
5454
List<Map<String, Object>> converted = (List<Map<String, Object>>) headers
5555
.get(KafkaHeaders.BATCH_CONVERTED_HEADERS);
56-
assertThat(converted.size()).isEqualTo(3);
56+
assertThat(converted).hasSize(3);
5757
Map<String, Object> map = converted.get(0);
58-
assertThat(map.size()).isEqualTo(1);
58+
assertThat(map).hasSize(1);
5959
assertThat(new String((byte[]) map.get("foo"))).isEqualTo("bar");
6060
}
6161

@@ -67,7 +67,7 @@ public void testNoMapper() {
6767
MessageHeaders headers = testGuts(batchMessageConverter);
6868
@SuppressWarnings("unchecked")
6969
List<Headers> natives = (List<Headers>) headers.get(KafkaHeaders.NATIVE_HEADERS);
70-
assertThat(natives.size()).isEqualTo(3);
70+
assertThat(natives).hasSize(3);
7171
Iterator<Header> iterator = natives.get(0).iterator();
7272
assertThat(iterator.hasNext()).isEqualTo(true);
7373
Header next = iterator.next();

0 commit comments

Comments
 (0)