@@ -142,17 +142,17 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
142
142
143
143
private TransactionIdSuffixStrategy transactionIdSuffixStrategy = new DefaultTransactionIdSuffixStrategy (0 );
144
144
145
- private Supplier <Serializer <K >> keySerializerSupplier ;
145
+ private @ Nullable Supplier <Serializer <K >> keySerializerSupplier ;
146
146
147
- private Supplier <Serializer <V >> valueSerializerSupplier ;
147
+ private @ Nullable Supplier <Serializer <V >> valueSerializerSupplier ;
148
148
149
- private Supplier <Serializer <K >> rawKeySerializerSupplier ;
149
+ private @ Nullable Supplier <Serializer <K >> rawKeySerializerSupplier ;
150
150
151
- private Supplier <Serializer <V >> rawValueSerializerSupplier ;
151
+ private @ Nullable Supplier <Serializer <V >> rawValueSerializerSupplier ;
152
152
153
153
private Duration physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT ;
154
154
155
- private ApplicationContext applicationContext ;
155
+ private @ Nullable ApplicationContext applicationContext ;
156
156
157
157
private String beanName = "not.managed.by.Spring" ;
158
158
@@ -162,11 +162,11 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
162
162
163
163
private boolean configureSerializers = true ;
164
164
165
- private volatile String transactionIdPrefix ;
165
+ private volatile @ Nullable String transactionIdPrefix ;
166
166
167
- private volatile String clientIdPrefix ;
167
+ private volatile @ Nullable String clientIdPrefix ;
168
168
169
- private volatile CloseSafeProducer <K , V > producer ;
169
+ private volatile @ Nullable CloseSafeProducer <K , V > producer ;
170
170
171
171
/**
172
172
* Construct a factory with the provided configuration.
@@ -267,7 +267,7 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
267
267
}
268
268
}
269
269
270
- private Supplier <Serializer <K >> keySerializerSupplier (@ Nullable Supplier <Serializer <K >> keySerializerSupplier ) {
270
+ private @ Nullable Supplier <Serializer <K >> keySerializerSupplier (@ Nullable Supplier <Serializer <K >> keySerializerSupplier ) {
271
271
this .rawKeySerializerSupplier = keySerializerSupplier ;
272
272
if (!this .configureSerializers ) {
273
273
return keySerializerSupplier ;
@@ -283,7 +283,7 @@ private Supplier<Serializer<K>> keySerializerSupplier(@Nullable Supplier<Seriali
283
283
};
284
284
}
285
285
286
- private Supplier <Serializer <V >> valueSerializerSupplier (@ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
286
+ private @ Nullable Supplier <Serializer <V >> valueSerializerSupplier (@ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
287
287
this .rawValueSerializerSupplier = valueSerializerSupplier ;
288
288
if (!this .configureSerializers ) {
289
289
return valueSerializerSupplier ;
@@ -456,21 +456,23 @@ public boolean isProducerPerThread() {
456
456
@ Override
457
457
@ Nullable
458
458
public Serializer <K > getKeySerializer () {
459
- return this .keySerializerSupplier .get ();
459
+ return this .keySerializerSupplier == null ? null : this . keySerializerSupplier .get ();
460
460
}
461
461
462
462
@ Override
463
463
@ Nullable
464
464
public Serializer <V > getValueSerializer () {
465
- return this .valueSerializerSupplier .get ();
465
+ return this .valueSerializerSupplier == null ? null : this . valueSerializerSupplier .get ();
466
466
}
467
467
468
468
@ Override
469
+ @ Nullable
469
470
public Supplier <Serializer <K >> getKeySerializerSupplier () {
470
471
return this .rawKeySerializerSupplier ;
471
472
}
472
473
473
474
@ Override
475
+ @ Nullable
474
476
public Supplier <Serializer <V >> getValueSerializerSupplier () {
475
477
return this .rawValueSerializerSupplier ;
476
478
}
@@ -546,9 +548,11 @@ public int getPhase() {
546
548
* properties applied
547
549
*/
548
550
@ Override
549
- public ProducerFactory <K , V > copyWithConfigurationOverride (Map <String , Object > overrideProperties ) {
551
+ public ProducerFactory <K , V > copyWithConfigurationOverride (@ Nullable Map <String , Object > overrideProperties ) {
550
552
Map <String , Object > producerProperties = new HashMap <>(getConfigurationProperties ());
551
- producerProperties .putAll (overrideProperties );
553
+ if (overrideProperties != null ) {
554
+ producerProperties .putAll (overrideProperties );
555
+ }
552
556
producerProperties = ensureExistingTransactionIdPrefixInProperties (producerProperties );
553
557
DefaultKafkaProducerFactory <K , V > newFactory = new DefaultKafkaProducerFactory <>(producerProperties ,
554
558
getKeySerializerSupplier (),
@@ -850,7 +854,7 @@ protected Producer<K, V> createTransactionalProducer() {
850
854
return createTransactionalProducer (this .transactionIdPrefix );
851
855
}
852
856
853
- protected Producer <K , V > createTransactionalProducer (String txIdPrefix ) {
857
+ protected Producer <K , V > createTransactionalProducer (@ Nullable String txIdPrefix ) {
854
858
BlockingQueue <CloseSafeProducer <K , V >> queue = getCache (txIdPrefix );
855
859
Assert .notNull (queue , () -> "No cache found for " + txIdPrefix );
856
860
CloseSafeProducer <K , V > cachedProducer = queue .poll ();
@@ -912,7 +916,7 @@ private void removeTransactionProducer(CloseSafeProducer<K, V> producer, Duratio
912
916
listeners .forEach (listener -> listener .producerRemoved (producer .clientId , producer ));
913
917
}
914
918
915
- private CloseSafeProducer <K , V > doCreateTxProducer (String prefix , String suffix ,
919
+ private CloseSafeProducer <K , V > doCreateTxProducer (@ Nullable String prefix , String suffix ,
916
920
BiPredicate <CloseSafeProducer <K , V >, Duration > remover ) {
917
921
Producer <K , V > newProducer = createRawProducer (getTxProducerConfigs (prefix + suffix ));
918
922
try {
@@ -941,7 +945,8 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
941
945
942
946
protected Producer <K , V > createRawProducer (Map <String , Object > rawConfigs ) {
943
947
Producer <K , V > kafkaProducer =
944
- new KafkaProducer <>(rawConfigs , this .keySerializerSupplier .get (), this .valueSerializerSupplier .get ());
948
+ new KafkaProducer <>(rawConfigs , this .keySerializerSupplier == null ? null : this .keySerializerSupplier .get (),
949
+ this .valueSerializerSupplier == null ? null : this .valueSerializerSupplier .get ());
945
950
for (ProducerPostProcessor <K , V > pp : this .postProcessors ) {
946
951
kafkaProducer = pp .apply (kafkaProducer );
947
952
}
@@ -1033,9 +1038,9 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
1033
1038
1034
1039
private final BiPredicate <CloseSafeProducer <K , V >, Duration > removeProducer ;
1035
1040
1036
- final String txIdPrefix ; // NOSONAR
1041
+ final @ Nullable String txIdPrefix ; // NOSONAR
1037
1042
1038
- final String txIdSuffix ; // NOSONAR
1043
+ final @ Nullable String txIdSuffix ; // NOSONAR
1039
1044
1040
1045
final long created ; // NOSONAR
1041
1046
@@ -1045,7 +1050,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
1045
1050
1046
1051
final int epoch ; // NOSONAR
1047
1052
1048
- private volatile Exception producerFailed ;
1053
+ private volatile @ Nullable Exception producerFailed ;
1049
1054
1050
1055
volatile boolean closed ; // NOSONAR
1051
1056
@@ -1186,8 +1191,11 @@ public void commitTransaction() throws ProducerFencedException {
1186
1191
public void abortTransaction () throws ProducerFencedException {
1187
1192
LOGGER .debug (() -> toString () + " abortTransaction()" );
1188
1193
if (this .producerFailed != null ) {
1189
- LOGGER .debug (() -> "abortTransaction ignored - previous txFailed: " + this .producerFailed .getMessage ()
1190
- + ": " + this );
1194
+ LOGGER .debug (() -> {
1195
+ String message = this .producerFailed == null ? "" : this .producerFailed .getMessage ();
1196
+ return "abortTransaction ignored - previous txFailed: " + message
1197
+ + ": " + this ;
1198
+ });
1191
1199
}
1192
1200
else {
1193
1201
try {
0 commit comments