package com.flowmailer; import java.io.IOException; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.MessageProperties; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.impl.AMQConnection; import com.rabbitmq.client.impl.FrameHandler; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; import com.rabbitmq.client.AMQP.BasicProperties; public class TestRabbitRecover { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("***"); factory.setUsername("***"); factory.setPassword("***"); String testQueueName = "testqueue1"; Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(testQueueName, "fanout", true, false, null); channel.queueDeclare(testQueueName, true, false, false, null); channel.queueBind(testQueueName, testQueueName, ""); for(int i = 0; i < 10; i++) { channel.basicPublish(testQueueName, testQueueName, MessageProperties.PERSISTENT_BASIC, String.valueOf(i).getBytes(StandardCharsets.US_ASCII)); System.out.println("published: '" + i + "'"); } channel.basicQos(3); // prefetch 3 String newConsumerTag = channel.basicConsume(testQueueName, false, new Consumer() { @Override public void handleCancel(String consumerTag) throws IOException { System.out.println("handleCancel " + consumerTag); } @Override public void handleCancelOk(String consumerTag) { System.out.println("handleCancelOk " + consumerTag); } @Override public void handleConsumeOk(String consumerTag) { System.out.println("handleConsumeOk " + consumerTag); } @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties props, byte[] body) throws IOException { System.out.println("handleDelivery " + consumerTag + ": " + envelope.getDeliveryTag() + " = '" + new String(body, StandardCharsets.US_ASCII) + "'"); } @Override public void handleRecoverOk(String consumerTag) { System.out.println("handleRecoverOk " + consumerTag); } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { System.out.println("handleShutdownSignal " + consumerTag); } }); System.out.println("newConsumerTag " + newConsumerTag); for(int i = 0; i < 10; i++) { if(i == 5) { // pretend connection closed for some unknown reason AMQConnection c = ((AutorecoveringConnection)connection).getDelegate(); Field f = AMQConnection.class.getDeclaredField("_frameHandler"); f.setAccessible(true); FrameHandler fh = (FrameHandler)f.get(c); fh.close(); } ackMulti(channel, i+1); } channel.basicCancel(newConsumerTag); Thread.sleep(5000); channel.close(); connection.close(); } private static void ackMulti(Channel channel, long tag) throws InterruptedException, IOException { Thread.sleep(1000); System.out.println("basicAck " + tag); while(true) { try { channel.basicAck(tag, true); return; } catch(Exception e) { System.out.println("e: " + e.getMessage()); } Thread.sleep(1000); } } }