Skip to content

GH-550: master to 2.2; fix tangles #641

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 10 additions & 19 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ subprojects { subproject ->
junitJupiterVersion = '5.1.0'
junitPlatformVersion = '1.1.0'
junitVintageVersion = '5.1.0'
kafkaVersion = '1.0.1'
kafkaVersion = '1.1.0'
log4jVersion = '2.11.0'
mockitoVersion = '2.15.0'
scalaVersion = '2.11'
slf4jVersion = '1.7.25'
springRetryVersion = '1.2.2.RELEASE'
springVersion = '5.0.4.RELEASE'
springDataCommonsVersion = '2.0.4.RELEASE'
Expand All @@ -107,6 +107,9 @@ subprojects { subproject ->

// To avoid compiler warnings about @API annotations in JUnit code
testCompileOnly 'org.apiguardian:apiguardian-api:1.0.0'

testRuntime "org.apache.logging.log4j:log4j-core:$log4jVersion"
testRuntime "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
}

// enable all compiler warnings; individual projects may customize further
Expand Down Expand Up @@ -177,14 +180,8 @@ project ('spring-kafka') {
compile "org.springframework:spring-messaging:$springVersion"
compile "org.springframework:spring-tx:$springVersion"
compile "org.springframework.retry:spring-retry:$springRetryVersion"
compile ("org.apache.kafka:kafka-clients:$kafkaVersion") {
exclude group: 'org.slf4j', module: 'slf4j-api'
}
compile ("org.apache.kafka:kafka-streams:$kafkaVersion") {
optional it
exclude group: 'org.slf4j', module: 'slf4j-api'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
compile "org.apache.kafka:kafka-clients:$kafkaVersion"
compile ("org.apache.kafka:kafka-streams:$kafkaVersion", optional)

compile ("com.fasterxml.jackson.core:jackson-core:$jacksonVersion", optional)
compile ("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion", optional)
Expand All @@ -196,8 +193,6 @@ project ('spring-kafka') {
testCompile project (":spring-kafka-test")
testCompile "org.assertj:assertj-core:$assertjVersion"
testCompile "org.springframework:spring-tx:$springVersion"

testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion"
}
}

Expand All @@ -211,13 +206,8 @@ project ('spring-kafka-test') {

compile ("org.apache.kafka:kafka-clients:$kafkaVersion:test")

compile ("org.apache.kafka:kafka_$scalaVersion:$kafkaVersion") {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}

compile ("org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test") {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"

compile ("junit:junit:$junit4Version") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
Expand All @@ -228,6 +218,7 @@ project ('spring-kafka-test') {

compile ("org.hamcrest:hamcrest-all:$hamcrestVersion", optional)
compile ("org.assertj:assertj-core:$assertjVersion", optional)
compile ("org.apache.logging.log4j:log4j-core:$log4jVersion", optional)
}
}

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=2.1.6.BUILD-SNAPSHOT
version=2.2.0.BUILD-SNAPSHOT
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Time;
import org.junit.rules.ExternalResource;

Expand Down Expand Up @@ -89,27 +88,27 @@ public class KafkaEmbedded extends ExternalResource implements KafkaRule, Initia

public static final long METADATA_PROPAGATION_TIMEOUT = 10000L;

private static final String clientVersion;
// private static final String clientVersion;

private static final Method testUtilsCreateBrokerConfigMethod;

static {
clientVersion = AppInfoParser.getVersion();
if (clientVersion.startsWith("1.1.")) {
try {
testUtilsCreateBrokerConfigMethod = TestUtils.class.getDeclaredMethod("createBrokerConfig",
int.class, String.class, boolean.class, boolean.class, int.class,
scala.Option.class, scala.Option.class, scala.Option.class,
boolean.class, boolean.class, int.class, boolean.class, int.class, boolean.class,
int.class, scala.Option.class, int.class, boolean.class);
}
catch (NoSuchMethodException | SecurityException e) {
throw new RuntimeException("Failed to determine TestUtils.createBrokerConfig() method");
}
}
else {
// clientVersion = AppInfoParser.getVersion();
// if (clientVersion.startsWith("1.1.")) {
// try {
// testUtilsCreateBrokerConfigMethod = TestUtils.class.getDeclaredMethod("createBrokerConfig",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't consider to support older version on this branch. Do we ?
I mean no need to comment out, but just remove such a code.
Although I still see how we have if...else in the createBrokerProperties().
Shound't that be fixed respectively ?

Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to just comment it out to make it easier to reinstate if we have a similar problem with 1.2.0. I can remove it entirely if you insist.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. Let it live as is!

// int.class, String.class, boolean.class, boolean.class, int.class,
// scala.Option.class, scala.Option.class, scala.Option.class,
// boolean.class, boolean.class, int.class, boolean.class, int.class, boolean.class,
// int.class, scala.Option.class, int.class, boolean.class);
// }
// catch (NoSuchMethodException | SecurityException e) {
// throw new RuntimeException("Failed to determine TestUtils.createBrokerConfig() method");
// }
// }
// else {
testUtilsCreateBrokerConfigMethod = null;
}
// }
}

private final int count;
Expand Down Expand Up @@ -222,6 +221,8 @@ public void before() throws Exception { //NOSONAR
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(),
String.valueOf(Long.MAX_VALUE));
if (this.brokerProperties != null) {
this.brokerProperties.forEach(brokerConfigProperties::put);
}
Expand Down Expand Up @@ -251,7 +252,7 @@ public Properties createBrokerProperties(int i) {
scala.Option.apply(null),
scala.Option.apply(null),
scala.Option.apply(null),
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1);
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false);
}
else {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.test.rule;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.junit.rules.MethodRule;
import org.junit.runners.model.FrameworkMethod;
import org.junit.runners.model.Statement;

import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/**
* A JUnit method {@link org.junit.Rule} that changes the Log4J 2 logger level for a set of classes
* or packages while a test method is running. Useful for performance or scalability tests
* where we don't want to generate a large log in a tight inner loop, or
* enabling debug logging for a test case.
*
* @author Artem Bilan
*
* @since 2.2
*
*/
public final class Log4j2LevelAdjuster implements MethodRule {

private static final Log logger = LogFactory.getLog(Log4j2LevelAdjuster.class);

private final Class<?>[] classes;

private final Level level;

private final String[] categories;

private Log4j2LevelAdjuster(Level level) {
this(level, null, new String[] { "org.springframework.integration" });
}

private Log4j2LevelAdjuster(Level level, Class<?>[] classes, String[] categories) {
Assert.notNull(level, "'level' must be null");
this.level = level;
this.classes = classes != null ? classes : new Class<?>[0];

Stream<String> categoryStream = Stream.of(getClass().getPackage().getName());

if (!ObjectUtils.isEmpty(categories)) {
categoryStream = Stream.concat(Arrays.stream(categories), categoryStream);
}

this.categories = categoryStream.toArray(String[]::new);
}

@Override
public Statement apply(final Statement base, final FrameworkMethod method, Object target) {
return new Statement() {

@Override
public void evaluate() throws Throwable {
LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
Configuration config = ctx.getConfiguration();

Map<Class<?>, Level> classLevels = new HashMap<>();
for (Class<?> cls : Log4j2LevelAdjuster.this.classes) {
String className = cls.getName();
LoggerConfig loggerConfig = config.getLoggerConfig(className);
LoggerConfig specificConfig = loggerConfig;

// We need a specific configuration for this logger,
// otherwise we would change the level of all other loggers
// having the original configuration as parent as well

if (!loggerConfig.getName().equals(className)) {
specificConfig = new LoggerConfig(className, Log4j2LevelAdjuster.this.level, true);
specificConfig.setParent(loggerConfig);
config.addLogger(className, specificConfig);
}

classLevels.put(cls, specificConfig.getLevel());
specificConfig.setLevel(Log4j2LevelAdjuster.this.level);
}

Map<String, Level> categoryLevels = new HashMap<>();
for (String category : Log4j2LevelAdjuster.this.categories) {
LoggerConfig loggerConfig = config.getLoggerConfig(category);
LoggerConfig specificConfig = loggerConfig;

// We need a specific configuration for this logger,
// otherwise we would change the level of all other loggers
// having the original configuration as parent as well

if (!loggerConfig.getName().equals(category)) {
specificConfig = new LoggerConfig(category, Log4j2LevelAdjuster.this.level, true);
specificConfig.setParent(loggerConfig);
config.addLogger(category, specificConfig);
}

categoryLevels.put(category, specificConfig.getLevel());
specificConfig.setLevel(Log4j2LevelAdjuster.this.level);
}

ctx.updateLoggers();

logger.debug("++++++++++++++++++++++++++++ "
+ "Overridden log level setting for: " + Arrays.toString(Log4j2LevelAdjuster.this.classes)
+ " and " + Arrays.toString(Log4j2LevelAdjuster.this.categories)
+ " for test " + method.getName());

try {
base.evaluate();
}
finally {
logger.debug("++++++++++++++++++++++++++++ "
+ "Restoring log level setting for: " + Arrays.toString(Log4j2LevelAdjuster.this.classes)
+ " and " + Arrays.toString(Log4j2LevelAdjuster.this.categories)
+ " for test " + method.getName());

for (Class<?> cls : Log4j2LevelAdjuster.this.classes) {
LoggerConfig loggerConfig = config.getLoggerConfig(cls.getName());
loggerConfig.setLevel(classLevels.get(cls));
}

for (String category : Log4j2LevelAdjuster.this.categories) {
LoggerConfig loggerConfig = config.getLoggerConfig(category);
loggerConfig.setLevel(categoryLevels.get(category));
}

ctx.updateLoggers();
}
}

};
}

/**
* Specify the classes for logging level adjusting configured before.
* A new copy Log4j2LevelAdjuster instance is produced by this method.
* The provided classes parameter overrides existing value in the {@link #classes}.
* @param classes the classes to use for logging level adjusting
* @return a Log4j2LevelAdjuster copy with the provided classes
*/
public Log4j2LevelAdjuster classes(Class<?>... classes) {
return classes(false, classes);
}

/**
* Specify the classes for logging level adjusting configured before.
* A new copy Log4j2LevelAdjuster instance is produced by this method.
* The provided classes parameter can be merged with existing value in the {@link #classes}.
* @param merge to merge or not with previously configured {@link #classes}
* @param classes the classes to use for logging level adjusting
* @return a Log4j2LevelAdjuster copy with the provided classes
* @since 5.0.2
*/
public Log4j2LevelAdjuster classes(boolean merge, Class<?>... classes) {
return new Log4j2LevelAdjuster(this.level,
merge ? Stream.of(this.classes, classes).flatMap(Stream::of).toArray(Class<?>[]::new) : classes,
this.categories);
}

/**
* Specify the categories for logging level adjusting configured before.
* A new copy Log4j2LevelAdjuster instance is produced by this method.
* The provided categories parameter overrides existing value in the {@link #categories}.
* @param categories the categories to use for logging level adjusting
* @return a Log4j2LevelAdjuster copy with the provided categories
*/
public Log4j2LevelAdjuster categories(String... categories) {
return categories(false, categories);
}

/**
* Specify the categories for logging level adjusting configured before.
* A new copy Log4j2LevelAdjuster instance is produced by this method.
* The provided categories parameter can be merged with existing value in the {@link #categories}.
* @param merge to merge or not with previously configured {@link #categories}
* @param categories the categories to use for logging level adjusting
* @return a Log4j2LevelAdjuster copy with the provided categories
* @since 5.0.2
*/
public Log4j2LevelAdjuster categories(boolean merge, String... categories) {
return new Log4j2LevelAdjuster(this.level, this.classes,
merge ? Stream.of(this.categories, categories).flatMap(Stream::of).toArray(String[]::new) : categories);
}

/**
* The factory to produce Log4j2LevelAdjuster instances for {@link Level#TRACE} logging
* with the {@code org.springframework.integration} as default category.
* @return the Log4j2LevelAdjuster instance
*/
public static Log4j2LevelAdjuster trace() {
return forLevel(Level.TRACE);
}

/**
* The factory to produce Log4j2LevelAdjuster instances for {@link Level#DEBUG} logging
* with the {@code org.springframework.integration} as default category.
* @return the Log4j2LevelAdjuster instance
*/
public static Log4j2LevelAdjuster debug() {
return forLevel(Level.DEBUG);
}

/**
* The factory to produce Log4j2LevelAdjuster instances for {@link Level#INFO} logging
* with the {@code org.springframework.integration} as default category.
* @return the Log4j2LevelAdjuster instance
*/
public static Log4j2LevelAdjuster info() {
return forLevel(Level.INFO);
}

/**
* The factory to produce Log4j2LevelAdjuster instances for arbitrary logging {@link Level}
* with the {@code org.springframework.integration} as default category.
* @param level the {@link Level} to use for logging
* @return the Log4j2LevelAdjuster instance
*/
public static Log4j2LevelAdjuster forLevel(Level level) {
return new Log4j2LevelAdjuster(level);
}

}
Loading