Spring boot application does not get shut down if you are using ScheduledExecutorService to start Kafka server #4008

Closed
ghost opened this issue Sep 22, 2015 · 13 comments

ghost commented Sep 22, 2015

I have a Spring Boot application that sends messages to and receives messages from an Apache Kafka server. The project uses a ScheduledExecutorService to start and stop the Kafka server. Everything works well until I try to shut down the server using curl -X POST http://localhost:9012/shutdown. On running this command the Spring container closes, but the ScheduledExecutorService task still exists.
KafkaServerTask:

public class KafkaServerTask {
    private static final Logger logger = Logger.getLogger(KafkaServerTask.class.getName());
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    ScheduledFuture<?> kafkaserverFutureTask = null;
    KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter;
    Long taskPeriod;
    public void setTaskPeriod(Long taskPeriod) {
        this.taskPeriod = taskPeriod;
    }
    public void setKafkaMessageDrivenChannelAdapter(KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter) {
        this.kafkaMessageDrivenChannelAdapter = kafkaMessageDrivenChannelAdapter;
    }
    /*
     * Bean start up method.
     * Start the kafka server.
     */
    public void startKafkaServer() {
        final Runnable startKafkaServerTask = new Runnable() {
            public void run() {
                if (!kafkaMessageDrivenChannelAdapter.isRunning()) {
                    try {
                        kafkaMessageDrivenChannelAdapter.start();
                        logger.info("kafka server started");
                    } catch (Exception exp) {
                        logger.info("Failed to start the kafka server. " + exp.getMessage());
                    }
                }
            }
        };
        kafkaserverFutureTask = scheduler.scheduleAtFixedRate(startKafkaServerTask, 0, taskPeriod, TimeUnit.MINUTES);
    }
    /*
     * Bean destroy method.
     * Stop the kafka server and exit the task.
     */
    public void stopKafkaServer() {
        try {
            logger.info("stop the kafka server");
            kafkaMessageDrivenChannelAdapter.stop();
        } catch (Exception exp) {
            logger.info("Failed to stop the kafka server. " + exp.getMessage());
        }
        kafkaserverFutureTask.cancel(false);
        try {
            scheduler.awaitTermination(taskPeriod, TimeUnit.MINUTES);
        } catch (InterruptedException exp) {
            logger.info("InterruptedException: " + exp.getMessage());
        }
        scheduler.shutdownNow();
    }
}
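
One detail in stopKafkaServer above may be worth checking: awaitTermination is called before any shutdown request has been made, so it can only return by timing out (after taskPeriod minutes) or by being interrupted. The usual ordering from the ExecutorService documentation is shutdown(), then awaitTermination, then shutdownNow() as a fallback. A minimal sketch of that ordering follows; the class and method names (SchedulerShutdownSketch, shutdownAndAwait) are hypothetical and not part of the reported project, and this is not confirmed to be the cause of the lingering threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the reported project.
public class SchedulerShutdownSketch {

    /**
     * Requests shutdown first, then waits, then forces cancellation if needed.
     * Returns true if the executor terminated within the allowed waits.
     */
    public static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        // Stop accepting new tasks. For a ScheduledThreadPoolExecutor with
        // default policies this also cancels periodic tasks.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                // Still running: interrupt workers and discard queued tasks.
                executor.shutdownNow();
                return executor.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> { }, 0, 1, TimeUnit.MINUTES);
        boolean terminated = shutdownAndAwait(scheduler, 5, TimeUnit.SECONDS);
        System.out.println("terminated = " + terminated);
    }
}
```

In a destroy method, a helper like this would be called after the adapter has been stopped, with a timeout short enough that context shutdown is not delayed for a full task period.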

Thread dump:

Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.51-b03 mixed mode):

"Attach Listener" #63 daemon prio=9 os_prio=31 tid=0x00007fe6d9861000 nid=0x9d0f waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"ServoMonitorGetValueLimiter-0" #49 daemon prio=5 os_prio=31 tid=0x00007fe6d814a800 nid=0x8803 waiting on condition [0x00000001e04b4000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000011355ff50> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"DestroyJavaVM" #45 prio=5 os_prio=31 tid=0x00007fe6d9a93800 nid=0x1303 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"pool-6-thread-1" #38 prio=5 os_prio=31 tid=0x00007fe6d93a6800 nid=0x7403 waiting on condition [0x00000001e0b04000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000001135476e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"pool-3-thread-1" #35 prio=5 os_prio=31 tid=0x00007fe6d89e9800 nid=0x6a07 waiting on condition [0x00000001e07fb000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000113560790> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"metrics-meter-tick-thread-2" #34 daemon prio=5 os_prio=31 tid=0x00007fe6d45be000 nid=0x6e07 waiting on condition [0x00000001df6e6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000113273290> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"metrics-meter-tick-thread-1" #33 daemon prio=5 os_prio=31 tid=0x00007fe6d421d000 nid=0x6c07 waiting on condition [0x00000001de5fb000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000113273290> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

"java-sdk-http-connection-reaper" #27 daemon prio=5 os_prio=31 tid=0x00007fe6d999c000 nid=0x300b waiting on condition [0x00000001dc556000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at com.amazonaws.http.IdleConnectionReaper.run(IdleConnectionReaper.java:112)

"Abandoned connection cleanup thread" #25 daemon prio=5 os_prio=31 tid=0x00007fe6d718b000 nid=0x3307 in Object.wait() [0x00000001dd358000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
    - locked <0x0000000112ae4818> (a java.lang.ref.ReferenceQueue$Lock)
    at com.mysql.jdbc.AbandonedConnectionCleanupThread.run(AbandonedConnectionCleanupThread.java:43)

"RMI TCP Accept-0" #12 daemon prio=5 os_prio=31 tid=0x00007fe6d421e000 nid=0x5207 runnable [0x00000001dace5000]
   java.lang.Thread.State: RUNNABLE
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:404)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:52)
    at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
    at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
    at java.lang.Thread.run(Thread.java:745)

"RMI TCP Accept-20562" #11 daemon prio=5 os_prio=31 tid=0x00007fe6d52f5000 nid=0x4b03 runnable [0x00000001dabe2000]
   java.lang.Thread.State: RUNNABLE
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:404)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
    at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
    at java.lang.Thread.run(Thread.java:745)

"RMI TCP Accept-0" #10 daemon prio=5 os_prio=31 tid=0x00007fe6d38fb000 nid=0x4903 runnable [0x00000001daadf000]
   java.lang.Thread.State: RUNNABLE
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:404)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
    at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
    at java.lang.Thread.run(Thread.java:745)

"Service Thread" #8 daemon prio=9 os_prio=31 tid=0x00007fe6d3864800 nid=0x4703 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007fe6d3842000 nid=0x4503 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007fe6d3841000 nid=0x4303 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007fe6d3831800 nid=0x4103 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fe6d4008800 nid=0x3417 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fe6d5024800 nid=0x2d03 in Object.wait() [0x00000001d8322000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
    - locked <0x000000011182b700> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fe6d5024000 nid=0x2b03 in Object.wait() [0x00000001d821f000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
    - locked <0x000000011182b740> (a java.lang.ref.Reference$Lock)

"VM Thread" os_prio=31 tid=0x00007fe6d5021000 nid=0x2903 runnable 

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fe6d5016800 nid=0x2103 runnable 

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fe6d5017800 nid=0x2303 runnable 

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fe6d5018000 nid=0x2503 runnable 

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fe6d5018800 nid=0x2703 runnable 

"VM Periodic Task Thread" os_prio=31 tid=0x00007fe6d4910000 nid=0x5303 waiting on condition 

JNI global references: 350
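
In the dump above, the only Java threads not marked daemon are "DestroyJavaVM", "pool-6-thread-1" and "pool-3-thread-1". Both pool threads are parked in LinkedBlockingQueue.take, whereas a scheduled pool waits in ScheduledThreadPoolExecutor$DelayedWorkQueue.take (as the metrics-meter-tick threads do), so they appear to belong to plain ThreadPoolExecutor instances rather than to the ScheduledExecutorService in KafkaServerTask. To check for such threads from inside the application, a small sketch like the following could be used; the class name NonDaemonThreadLister is hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical diagnostic helper, not part of the reported project.
public class NonDaemonThreadLister {

    /** Names of live, non-daemon threads other than the calling thread, sorted. */
    public static List<String> liveNonDaemonThreadNames() {
        Thread current = Thread.currentThread();
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && !t.isDaemon() && t != current)
                .map(Thread::getName)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Any thread listed here can keep the JVM alive after main returns.
        liveNonDaemonThreadNames()
                .forEach(name -> System.out.println("non-daemon: " + name));
    }
}
```

Calling this at the end of a destroy method (or from a debugger) gives a quick view of which threads could still keep the JVM alive.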

ghost commented Sep 22, 2015

To reproduce this issue, create a Spring Boot application with an Apache Kafka server. Make sure autoStartup is false: 'kafkaMessageDrivenChannelAdapterListenerContainerSpec.autoStartup(false)'.

Then create a bean that can start and stop the server as done in KafkaServerTask.

@wilkinsona (Member)

As I said in the other issue, it's not clear why you think this is a bug in Spring Boot and you haven't answered the questions that I asked. They were:

Can you please explain why you think this is a problem with Spring Boot? What you've shared so far doesn't suggest that it is. Your javadoc states that stopKafkaServer is the "bean destroy method", but I can't see anything that means it should have been called as part of application context shutdown. Have you used custom initMethod and destroyMethod attributes on an @Bean declaration?

I'm not sure that I can piece together what you're doing from your code and description above. Please provide a sample project that reproduces the problem so that we can be sure that we're diagnosing the problem that you're seeing.

wilkinsona added the label "status: waiting-for-feedback" (We need additional information before we can continue) on Sep 22, 2015

ghost commented Sep 22, 2015

I might be wrong, but from debugging it looks like the Spring Boot application stops the Kafka server on shutdown, which seems to be causing the issue. It might be related to spring-integration-kafka too.
I am putting together sample code that can be used to reproduce the issue.

KafkaServerTask is defined as a bean in another part of the application and it has 'stopKafkaServer' as its destroy method. Yes, I have used the initMethod and destroyMethod attributes on the KafkaServerTask bean.


ghost commented Sep 22, 2015

Also, if I remove the call 'kafkaMessageDrivenChannelAdapter.start();' from KafkaServerTask, everything works fine.


ghost commented Sep 23, 2015

I was able to reproduce it with a simple Spring Boot application with Kafka integration.

Please set up Apache Kafka as described at http://kafka.apache.org

Code files are as below:

  1. Create a maven project.

Application.java:

@EnableScheduling
@EnableAsync
@Configuration
@ComponentScan("com")
@EnableAutoConfiguration
@EnableAspectJAutoProxy
@SpringBootApplication
@EnableWebMvc
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

KafkaConfiguration:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaConfiguration {

    /*
     * The address for apache kafka server.
     */
    @Value("${Kafka.brokerAddress}")
    private String bootstrapServer;

    @Value("${Kafka.serializerClass}")
    private String serializerClass;

    @Value("${Kafka.keySerializer}")
    private String keySerializer;

    @Value("${Kafka.valueSerializer}")
    private String valueSerializer;

    /*
     * KafkaProducer holds the configuration required for sending the message to
     * kafka server. This is a default implementation for Kaishi-kafka producer.
     */
    @Bean(name = "kafkaProducer")
    public KafkaProducer<String, String> producer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("serializer.class", serializerClass);
        props.put("key.serializer", keySerializer);
        props.put("value.serializer", valueSerializer);
        return new KafkaProducer<>(props);
    }
}

KafkaMessageConsumerConfiguration:

package com.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.kafka.Kafka;
import org.springframework.integration.dsl.kafka.KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.DefaultConnectionFactory;
import org.springframework.integration.kafka.core.ZookeeperConfiguration;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.listener.MetadataStoreOffsetManager;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.scheduling.support.PeriodicTrigger;

import com.test.KafkaServerTask;
import com.test.TestConsumerServiceImpl;

import kafka.api.OffsetRequest;

/**
 * AbstractKafkaMessageConsumerConfiguration is an helper class for building a
 * kafka message driven adapter. The class extending it should call the super
 * methods for implementation.
 * 
 */
@Configuration
public class KafkaMessageConsumerConfiguration {

    @Value("mediaConsumerListeningFromKafkaResults")
    private String mediaConsumerListeningFromKafkaResults;

    /*
     * The address for kafka zookeeper.
     */
    @Value("${Kafka.zookeeperAddress}")
    public String zookeeperAddress;

    /*
     * The method name of consumer which will be invoked when a messaged arrives
     * at kafka queue.
     */
    @Value("${Kafka.consumeMessageMethodName}")
    private String consumeMessageMethodName;

    /*
     * Bean for establishing connection to kafka server.
     */
    @Bean()
    public ConnectionFactory connectionFactory() {
        return new DefaultConnectionFactory(new ZookeeperConfiguration(new ZookeeperConnect(zookeeperAddress)));
    }

    /*
     * The poller defines the properties for polling the request from kafka
     * server.
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }

    /*
     * OffsetManager keep track of the messages that have been sent and received
     * from kafka server.
     */
    @Bean
    public OffsetManager offsetManager(ConnectionFactory connectionFactory) {
        MetadataStoreOffsetManager offsetManager = new MetadataStoreOffsetManager(connectionFactory);
        /*
         * A string that uniquely identifies the group of consumer processes to
         * which this consumer belongs. By setting the same group id multiple
         * processes indicate that they are all part of the same consumer group.
         */
        offsetManager.setConsumerId(getConsumerGroupId());
        /*
         * smallest : automatically reset the offset to the smallest offset
         * largest : automatically reset the offset to the largest offset
         */
        // offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());
        offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());

        return offsetManager;
    }

    /*
     * Configuration for fetching the messages from kafka server.
     */
    @Bean
    public IntegrationFlow consumerIntegrationFlow() {
        KafkaMessageDrivenChannelAdapterListenerContainerSpec kafkaMessageDrivenChannelAdapterListenerContainerSpec = Kafka.messageDriverChannelAdapter(connectionFactory(), getConsumerTopic()).id(getKafkaAdpaterId());
        return IntegrationFlows
                .from(kafkaMessageDrivenChannelAdapterListenerContainerSpec.autoStartup(false).autoCommitOffset(false).payloadDecoder(String::new).keyDecoder(b -> Integer.valueOf(new String(b))).configureListenerContainer(c -> c.offsetManager(offsetManager(connectionFactory())).maxFetch(100)))
                .channel(c -> c.queue(getConsumerListeningFromKafkaResults())).handle(getConsumerService(), consumeMessageMethodName).get();
    }

    @Bean(name = "consumerService")
    @DependsOn({ "offsetManager", "connectionFactory" })
    public TestConsumerServiceImpl getConsumerService() {
        TestConsumerServiceImpl mediaConsumerService = new TestConsumerServiceImpl();
        return mediaConsumerService;
    }

    /*
     * Bean responsible for start/stop/restart of kafka server.
     */
    @Bean(destroyMethod="stopKafkaServer")
    @DependsOn("consumerIntegrationFlow")
    public KafkaServerTask kafkaServerTask(KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter) {
        KafkaServerTask kafkaServerTask = new KafkaServerTask();
        kafkaServerTask.setKafkaMessageDrivenChannelAdapter(kafkaMessageDrivenChannelAdapter);
        return kafkaServerTask;
    }

    protected String getConsumerGroupId() {
        return "testConsumer";
    }

    protected String getConsumerListeningFromKafkaResults() {
        return mediaConsumerListeningFromKafkaResults;
    }

    protected String getConsumerTopic() {
        return "mediaKafkaKaishiTopic";
    }

    protected String getKafkaAdpaterId() {
        return "kafkaMessageDrivenChannelAdapter";
    }

    protected Integer getPartitionId() {
        return new Integer("0");
    }

}

KafkaProdcuerConfiguration:

package com.config;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;

import com.test.TestProducerServiceImpl;

@Configuration
public class KafkaProdcuerConfiguration {


    @Autowired
    private KafkaProducer<String, String> kafkaProducer;

    @Bean()
    @DependsOn()
    public TestProducerServiceImpl kafkaProducerService() {
        TestProducerServiceImpl testProducerServiceImpl = new TestProducerServiceImpl();
        testProducerServiceImpl.setKafkaProducer(kafkaProducer);
        return testProducerServiceImpl;
    }
}

KafkaServerTask:

package com.test;

import org.apache.log4j.Logger;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;

/*
 * KafkaServerTask is responsible for starting and stopping the kafka server.
 */
public class KafkaServerTask {
    private static final Logger logger = Logger.getLogger(KafkaServerTask.class.getName());

    KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter;

    public void setKafkaMessageDrivenChannelAdapter(KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter) {
        this.kafkaMessageDrivenChannelAdapter = kafkaMessageDrivenChannelAdapter;
    }

    /*
     * Bean start up method.
     * 
     * Start the kafka server.
     */
    @Scheduled(fixedDelay = 600000)
    @Async
    public void startKafkaServer() {
        if (!kafkaMessageDrivenChannelAdapter.isRunning()) {
            try {
                kafkaMessageDrivenChannelAdapter.start();
                logger.info("kafka server started");
            } catch (Exception exp) {
                logger.info("Failed to start the kafka server. " + exp.getMessage());
            }
        }
    }

    /*
     * Bean destroy method.
     * 
     * Stop the kafka server.
     */
    public void stopKafkaServer() {
        try {
            kafkaMessageDrivenChannelAdapter.stop();
            logger.info("kafka server stopped");
        } catch (Exception exp) {
            logger.info("Failed to stop the kafka server. " + exp.getMessage());
        }
    }
}

TestConsumerServiceImpl:

package com.test;

public class TestConsumerServiceImpl {
    public void consumeMessage(String message) {
        System.out.println("Message received." + message);
    }
}

TestController:

package com.test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;



@Controller
public class TestController {

    @Autowired
    TestProducerServiceImpl kafkaProducerService;

    @RequestMapping(value = "/hello", method = RequestMethod.GET)
    public @ResponseBody String getPage() {
        kafkaProducerService.sendMessage("mediaKafkaKaishiTopic", "testing....");
        return "Test message";
    }
}

TestProducerServiceImpl:

package com.test;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;

/**
 * The Abstract class for all the kafka producers. The class holds the generic
 * implementation for the methods that are used while sending the message to
 * Kafka.
 * 
 */

public class TestProducerServiceImpl  {

    private static final Logger logger = Logger.getLogger(TestProducerServiceImpl.class.getName());

    protected KafkaProducer<String, String> kafkaProducer;

    public void setKafkaProducer(KafkaProducer<String, String> kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    /*
     * Send message to kafka server.
     * 
     */
    public void sendMessage(String topic, String message) {
        logger.info(String.format("Processing kafka send message request for topic '%s'.", topic));
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
        kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    logger.info("Error while sending message to Kafka: " + e.getMessage());
                } else {
                    logger.info("kafka message sent. Topic: " + topic + ". Message: " + message + ".");
                }
            }
        });
    }

}

application.yaml:

server:
  port: 9012

Kafka:
    consumeMessageMethodName: consumeMessage
    brokerAddress: localhost:9092,localhost:9093,localhost:9094
    zookeeperAddress: localhost:2181
    seed: localhost
    seedPort: 9092
    serializerClass: kafka.serializer.StringEncoder
    keySerializer: org.apache.kafka.common.serialization.StringSerializer
    valueSerializer: org.apache.kafka.common.serialization.StringSerializer
    enabled: true

endpoints:
    restart:
        enabled: true
    shutdown:
        enabled: true
    health:
        sensitive: false

ribbon:
    ReadTimeout: 180000

execution.isolation.thread.timeoutInMilliseconds: 2000

logging:
    level:
        root: INFO
        com.netflix.discovery: 'OFF'
        org.springframework.web: DEBUG

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-parent</artifactId>
        <version>Angel.SR3</version>
        <relativePath />
    </parent>

    <name>kafka-service</name>
    <groupId>com.bcgdv</groupId>
    <artifactId>core-kafka-service</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <start-class>com.bcgdv.kaishi.kafka.Application</start-class>
        <spring.integration.kafka.version>1.2.0.RELEASE</spring.integration.kafka.version>
    </properties>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <url>http://repo.spring.io/libs-milestone/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>${spring.integration.kafka.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
            <version>1.1.0.M1</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.10.11</version>
            <exclusions>
                <exclusion>
                    <groupId>joda-time</groupId>
                    <artifactId>joda-time</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Start the server and hit the URL 'http://localhost:8080/hello/'.

Check whether Kafka is working or not.

Stop the server using curl, e.g. 'curl -X POST http://localhost:9012/shutdown'.

The Spring context closes, but certain threads remain active.
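
To see which threads are actually left behind, one option is to list the live non-daemon threads, since those are the ones that keep a JVM from exiting. The following is only a sketch using plain JDK calls; the class name `LiveThreadReport` and the demo thread are hypothetical and not part of the project described in this issue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper, not part of the code in this issue.
class LiveThreadReport {

    // Returns the names of all live, non-daemon threads other than the caller.
    // Non-daemon threads are the ones that prevent the JVM from exiting.
    static List<String> nonDaemonThreadNames() {
        List<String> names = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && !t.isDaemon() && t != Thread.currentThread()) {
                names.add(t.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a thread that outlives the Spring context.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // interrupted: let the thread finish
            }
        }, "demo-worker");
        worker.start();

        System.out.println(nonDaemonThreadNames());

        worker.interrupt();
        worker.join();
    }
}
```

Calling something like `nonDaemonThreadNames()` after the context has closed (or taking a thread dump with `jstack`) would show whether the surviving threads belong to the scheduler pool or to something else.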

@dgomesbr
Contributor

Hey @suchitgupta, the runnable part of your code is not tied to any TaskExecutor or pool controlled by Spring. Isn't that the issue?

    public void startKafkaServer() {
        final Runnable startKafkaServerTask = new Runnable() {
            public void run() {
                if (!kafkaMessageDrivenChannelAdapter.isRunning()) {
                    try {
                        kafkaMessageDrivenChannelAdapter.start();
                        logger.info("kafka server started");
                    } catch (Exception exp) {
                        logger.info("Failed to start the kafka server. " + exp.getMessage());
                    }
                }
            }
        };
        kafkaserverFutureTask = scheduler.scheduleAtFixedRate(startKafkaServerTask, 0, taskPeriod, TimeUnit.MINUTES);
    }

And yes, although you're using Spring with Spring Boot, I don't see any Spring Boot issue here either.
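
If the leftover thread is the worker of the `ScheduledExecutorService` (an assumption, the thread does not confirm it), then the pool has to be shut down explicitly, because Spring knows nothing about an executor created with `Executors.newScheduledThreadPool`. A minimal sketch of that idea with plain JDK classes; `StoppableScheduledTask` is a hypothetical stand-in for `KafkaServerTask`, and the adapter call is replaced by an arbitrary `Runnable`:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for KafkaServerTask; the Kafka adapter is replaced by a Runnable.
class StoppableScheduledTask {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    // Schedules the periodic work, as startKafkaServer() does in the issue.
    void start(Runnable work, long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(work, 0, period, unit);
    }

    // Meant to be called from the bean's destroy method.
    // Cancels pending runs, interrupts a running one, and waits for the pool to finish.
    // Returns true if the pool terminated within the timeout.
    boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        scheduler.shutdownNow();
        return scheduler.awaitTermination(timeout, unit);
    }

    boolean isTerminated() {
        return scheduler.isTerminated();
    }

    public static void main(String[] args) throws InterruptedException {
        StoppableScheduledTask task = new StoppableScheduledTask();
        task.start(() -> System.out.println("tick"), 100, TimeUnit.MILLISECONDS);
        Thread.sleep(250);
        System.out.println("terminated: " + task.stop(5, TimeUnit.SECONDS));
    }
}
```

In the original class this would mean calling `scheduler.shutdownNow()` from `stopKafkaServer()` in addition to stopping the adapter, and making sure that method really is registered as the bean's destroy method.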

@ghost
Author

ghost commented Sep 23, 2015

I even tried the Spring scheduler, and even that did not work.

    @Scheduled(fixedDelay = 600000)
    @Async
    public void startKafkaServer() {
        if (!kafkaMessageDrivenChannelAdapter.isRunning()) {
            try {
                kafkaMessageDrivenChannelAdapter.start();
                logger.info("kafka server started");
            } catch (Exception exp) {
                logger.info("Failed to start the kafka server. " + exp.getMessage());
            }
        }
    }

    /*
     * Bean destroy method.
     *
     * Stop the kafka server.
     */
    public void stopKafkaServer() {
        try {
            kafkaMessageDrivenChannelAdapter.stop();
            logger.info("kafka server stopped");
        } catch (Exception exp) {
            logger.info("Failed to stop the kafka server. " + exp.getMessage());
        }
    }

@ghost
Author

ghost commented Sep 23, 2015

I even removed the init-method and destroy-method configuration from the bean, and even that didn't work.

@dgomesbr
Contributor

Your runnable starts a thread outside of Spring's thread executor. It has
nothing to do with the @Scheduled annotation.
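
If the pool has to stay outside Spring's control, another option is to create its worker as a daemon thread so that it cannot keep the JVM alive on its own. This is a sketch only, assuming it is acceptable for the periodic task to be cut off abruptly at JVM exit; `DaemonSchedulers` and `newDaemonScheduler` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

// Hypothetical helper, not part of the code in this issue.
class DaemonSchedulers {

    // Creates a single-threaded scheduler whose worker is a daemon thread.
    static ScheduledExecutorService newDaemonScheduler(String threadName) {
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, threadName);
            t.setDaemon(true); // daemon threads do not block JVM exit
            return t;
        };
        return Executors.newScheduledThreadPool(1, factory);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = newDaemonScheduler("kafka-starter");
        Callable<Boolean> probe = () -> Thread.currentThread().isDaemon();
        System.out.println("worker is daemon: " + scheduler.submit(probe).get());
        // No shutdown call here: the JVM can still exit because the worker is a daemon.
    }
}
```

An explicit shutdown in the destroy method is still the cleaner fix, since a daemon worker gives the task no chance to finish what it is doing.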


@ghost
Author

ghost commented Sep 23, 2015

With my new code I am not using the Runnable interface. It just uses @Scheduled.

@wilkinsona
Member

@suchitgupta I don't think a GitHub issue is the right place to get the help that it appears you need. Looking at the code, I think you'd benefit from spending some time reading the reference guides for the projects that you're using as well as the Getting Started Guides.

A series of comments on a GitHub issue isn't a good way to share some code. The code that you have shared doesn't appear to be the same as the code that you've described. For example, I can't see any sign of your @Bean method with custom initMethod and destroyMethod attributes. If you'd like people to take the time to help you, then you should take the time to make it easy for them to do so by creating and sharing a minimal, complete, verifiable example of the problem. Once you're ready to ask for help with such an example, Stack Overflow is the best place to do so.

@philwebb removed the status: waiting-for-feedback label Sep 23, 2015
@ghost
Author

ghost commented Sep 23, 2015

@wilkinsona, firstly I am sorry if the code was not easy to understand. I will definitely try to improve it.

I built new code to demonstrate that this is an issue with spring-boot (a Spring Boot application with Kafka integration), and it is reproducible with the code I shared.

As mentioned in my comments, I tried different things; I even removed initMethod from the bean, and the problem is still reproducible.

@skilledmonster

Was this issue ever resolved?
