|
24 | 24 | import java.util.Calendar;
|
25 | 25 | import java.util.Map;
|
26 | 26 | import java.util.Map.Entry;
|
27 |
| -import java.util.Optional; |
28 | 27 | import java.util.Timer;
|
29 | 28 | import java.util.TimerTask;
|
30 | 29 | import java.util.UUID;
|
|
65 | 64 | import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
66 | 65 | import org.springframework.amqp.rabbit.connection.ConnectionFactoryConfigurationUtils;
|
67 | 66 | import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
|
| 67 | +import org.springframework.amqp.rabbit.connection.RabbitUtils; |
68 | 68 | import org.springframework.amqp.rabbit.core.DeclareExchangeConnectionListener;
|
69 | 69 | import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
70 | 70 | import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
| 71 | +import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator; |
71 | 72 | import org.springframework.amqp.utils.JavaUtils;
|
72 | 73 | import org.springframework.core.io.Resource;
|
73 | 74 | import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
|
74 | 75 | import org.springframework.retry.RetryPolicy;
|
75 | 76 | import org.springframework.retry.policy.SimpleRetryPolicy;
|
76 | 77 | import org.springframework.retry.support.RetryTemplate;
|
| 78 | +import org.springframework.util.Assert; |
77 | 79 | import org.springframework.util.StringUtils;
|
78 | 80 |
|
79 | 81 | import com.rabbitmq.client.ConnectionFactory;
|
@@ -166,6 +168,7 @@ public static AmqpAppender createAppender(// NOSONAR NCSS line count
|
166 | 168 | @PluginAttribute("trustStore") String trustStore,
|
167 | 169 | @PluginAttribute("trustStorePassphrase") String trustStorePassphrase,
|
168 | 170 | @PluginAttribute("trustStoreType") String trustStoreType,
|
| 171 | + @PluginAttribute("saslConfig") String saslConfig, |
169 | 172 | @PluginAttribute("senderPoolSize") int senderPoolSize,
|
170 | 173 | @PluginAttribute("maxSenderRetries") int maxSenderRetries,
|
171 | 174 | @PluginAttribute("applicationId") String applicationId,
|
@@ -195,41 +198,43 @@ public static AmqpAppender createAppender(// NOSONAR NCSS line count
|
195 | 198 | theLayout = PatternLayout.createDefaultLayout();
|
196 | 199 | }
|
197 | 200 | AmqpManager manager = new AmqpManager(configuration.getLoggerContext(), name);
|
198 |
| - manager.uri = uri; |
199 |
| - manager.host = host; |
200 |
| - Optional.ofNullable(port).ifPresent(v -> manager.port = Integers.parseInt(v)); |
201 |
| - manager.addresses = addresses; |
202 |
| - manager.username = user; |
203 |
| - manager.password = password; |
204 |
| - manager.virtualHost = virtualHost; |
205 |
| - manager.useSsl = useSsl; |
206 |
| - manager.verifyHostname = verifyHostname; |
207 |
| - manager.sslAlgorithm = sslAlgorithm; |
208 |
| - manager.sslPropertiesLocation = sslPropertiesLocation; |
209 |
| - manager.keyStore = keyStore; |
210 |
| - manager.keyStorePassphrase = keyStorePassphrase; |
211 |
| - manager.keyStoreType = keyStoreType; |
212 |
| - manager.trustStore = trustStore; |
213 |
| - manager.trustStorePassphrase = trustStorePassphrase; |
214 |
| - manager.trustStoreType = trustStoreType; |
215 |
| - manager.senderPoolSize = senderPoolSize; |
216 |
| - manager.maxSenderRetries = maxSenderRetries; |
217 |
| - manager.applicationId = applicationId; |
218 |
| - manager.routingKeyPattern = routingKeyPattern; |
219 |
| - manager.generateId = generateId; |
220 |
| - manager.deliveryMode = MessageDeliveryMode.valueOf(deliveryMode); |
221 |
| - manager.exchangeName = exchange; |
222 |
| - manager.exchangeType = exchangeType; |
223 |
| - manager.declareExchange = declareExchange; |
224 |
| - manager.durable = durable; |
225 |
| - manager.autoDelete = autoDelete; |
226 |
| - manager.contentType = contentType; |
227 |
| - manager.contentEncoding = contentEncoding; |
228 |
| - manager.connectionName = connectionName; |
229 |
| - manager.clientConnectionProperties = clientConnectionProperties; |
230 |
| - manager.charset = charset; |
231 |
| - manager.async = async; |
232 |
| - manager.addMdcAsHeaders = addMdcAsHeaders; |
| 201 | + JavaUtils.INSTANCE |
| 202 | + .acceptIfNotNull(uri, value -> manager.uri = value) |
| 203 | + .acceptIfNotNull(host, value -> manager.host = value) |
| 204 | + .acceptIfNotNull(port, value -> manager.port = Integers.parseInt(value)) |
| 205 | + .acceptIfNotNull(addresses, value -> manager.addresses = value) |
| 206 | + .acceptIfNotNull(user, value -> manager.username = value) |
| 207 | + .acceptIfNotNull(password, value -> manager.password = value) |
| 208 | + .acceptIfNotNull(virtualHost, value -> manager.virtualHost = value) |
| 209 | + .acceptIfNotNull(useSsl, value -> manager.useSsl = value) |
| 210 | + .acceptIfNotNull(verifyHostname, value -> manager.verifyHostname = value) |
| 211 | + .acceptIfNotNull(sslAlgorithm, value -> manager.sslAlgorithm = value) |
| 212 | + .acceptIfNotNull(sslPropertiesLocation, value -> manager.sslPropertiesLocation = value) |
| 213 | + .acceptIfNotNull(keyStore, value -> manager.keyStore = value) |
| 214 | + .acceptIfNotNull(keyStorePassphrase, value -> manager.keyStorePassphrase = value) |
| 215 | + .acceptIfNotNull(keyStoreType, value -> manager.keyStoreType = value) |
| 216 | + .acceptIfNotNull(trustStore, value -> manager.trustStore = value) |
| 217 | + .acceptIfNotNull(trustStorePassphrase, value -> manager.trustStorePassphrase = value) |
| 218 | + .acceptIfNotNull(trustStoreType, value -> manager.trustStoreType = value) |
| 219 | + .acceptIfNotNull(saslConfig, value -> manager.saslConfig = value) |
| 220 | + .acceptIfNotNull(senderPoolSize, value -> manager.senderPoolSize = value) |
| 221 | + .acceptIfNotNull(maxSenderRetries, value -> manager.maxSenderRetries = value) |
| 222 | + .acceptIfNotNull(applicationId, value -> manager.applicationId = value) |
| 223 | + .acceptIfNotNull(routingKeyPattern, value -> manager.routingKeyPattern = value) |
| 224 | + .acceptIfNotNull(generateId, value -> manager.generateId = value) |
| 225 | + .acceptIfNotNull(deliveryMode, value -> manager.deliveryMode = MessageDeliveryMode.valueOf(deliveryMode)) |
| 226 | + .acceptIfNotNull(exchange, value -> manager.exchangeName = value) |
| 227 | + .acceptIfNotNull(exchangeType, value -> manager.exchangeType = value) |
| 228 | + .acceptIfNotNull(declareExchange, value -> manager.declareExchange = value) |
| 229 | + .acceptIfNotNull(durable, value -> manager.durable = value) |
| 230 | + .acceptIfNotNull(autoDelete, value -> manager.autoDelete = value) |
| 231 | + .acceptIfNotNull(contentType, value -> manager.contentType = value) |
| 232 | + .acceptIfNotNull(contentEncoding, value -> manager.contentEncoding = value) |
| 233 | + .acceptIfNotNull(connectionName, value -> manager.connectionName = value) |
| 234 | + .acceptIfNotNull(clientConnectionProperties, value -> manager.clientConnectionProperties = value) |
| 235 | + .acceptIfNotNull(charset, value -> manager.charset = value) |
| 236 | + .acceptIfNotNull(async, value -> manager.async = value) |
| 237 | + .acceptIfNotNull(addMdcAsHeaders, value -> manager.addMdcAsHeaders = value); |
233 | 238 |
|
234 | 239 | BlockingQueue<Event> eventQueue;
|
235 | 240 | if (blockingQueueFactory == null) {
|
@@ -293,11 +298,10 @@ protected void sendEvent(Event event, Map<?, ?> properties) {
|
293 | 298 | Level level = logEvent.getLevel();
|
294 | 299 |
|
295 | 300 | MessageProperties amqpProps = new MessageProperties();
|
296 |
| - amqpProps.setDeliveryMode(this.manager.deliveryMode); |
297 |
| - amqpProps.setContentType(this.manager.contentType); |
298 |
| - if (null != this.manager.contentEncoding) { |
299 |
| - amqpProps.setContentEncoding(this.manager.contentEncoding); |
300 |
| - } |
| 301 | + JavaUtils.INSTANCE |
| 302 | + .acceptIfNotNull(this.manager.deliveryMode, amqpProps::setDeliveryMode) |
| 303 | + .acceptIfNotNull(this.manager.contentType, amqpProps::setContentType) |
| 304 | + .acceptIfNotNull(this.manager.contentEncoding, amqpProps::setContentEncoding); |
301 | 305 | amqpProps.setHeader(CATEGORY_NAME, name);
|
302 | 306 | amqpProps.setHeader(THREAD_NAME, logEvent.getThreadName());
|
303 | 307 | amqpProps.setHeader(CATEGORY_LEVEL, level.toString());
|
@@ -578,6 +582,12 @@ protected static class AmqpManager extends AbstractManager {
|
578 | 582 | */
|
579 | 583 | private String trustStoreType = "JKS";
|
580 | 584 |
|
| 585 | + /** |
| 586 | + * SaslConfig. |
| 587 | + * @see RabbitUtils#stringToSaslConfig(String, ConnectionFactory) |
| 588 | + */ |
| 589 | + public String saslConfig; |
| 590 | + |
581 | 591 | /**
|
582 | 592 | * Default content-type of log messages.
|
583 | 593 | */
|
@@ -644,6 +654,7 @@ protected AmqpManager(LoggerContext loggerContext, String name) {
|
644 | 654 | private boolean activateOptions() {
|
645 | 655 | ConnectionFactory rabbitConnectionFactory = createRabbitConnectionFactory();
|
646 | 656 | if (rabbitConnectionFactory != null) {
|
| 657 | + Assert.state(this.applicationId != null, "applicationId is required"); |
647 | 658 | this.routingKeyLayout = PatternLayout.newBuilder()
|
648 | 659 | .withPattern(this.routingKeyPattern.replaceAll("%X\\{applicationId}", this.applicationId))
|
649 | 660 | .withCharset(Charset.forName(this.charset))
|
@@ -721,6 +732,16 @@ protected void configureRabbitConnectionFactory(RabbitConnectionFactoryBean fact
|
721 | 732 | factoryBean.setTrustStore(this.trustStore);
|
722 | 733 | factoryBean.setTrustStorePassphrase(this.trustStorePassphrase);
|
723 | 734 | factoryBean.setTrustStoreType(this.trustStoreType);
|
| 735 | + JavaUtils.INSTANCE |
| 736 | + .acceptIfNotNull(this.saslConfig, config -> { |
| 737 | + try { |
| 738 | + factoryBean.setSaslConfig(RabbitUtils.stringToSaslConfig(config, |
| 739 | + factoryBean.getRabbitConnectionFactory())); |
| 740 | + } |
| 741 | + catch (Exception e) { |
| 742 | + throw RabbitExceptionTranslator.convertRabbitAccessException(e); |
| 743 | + } |
| 744 | + }); |
724 | 745 | }
|
725 | 746 | }
|
726 | 747 | }
|
|
0 commit comments