springboot整合rabbitmq要怎么实现?意义在哪里?

TheDisguiser 2020-04-26 17:54:07 java常见问答 7122

Rabbitmq大家吗?这是一个实现了高级消息队列协议(AMQP)的开源消息代理软件,今天小编就来教大家如何在springboot中整合rabbitmq,希望对你们有所帮助吧。

一、首先引入rabbitmq需要的依赖

<dependency>
<groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.7.9.RELEASE</version>
</dependency>

二、配置所需连接工厂

    /**
     * 创建连接工厂
     * @return
     */
    @Bean(name = "adapterConnectionFactory")
    @Order(value = 2)
    public ConnectionFactory adapterConnectionFactory()
    {
        //创建连接工厂
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        //设置集群方式
        connectionFactory.setAddresses(rabbitMQProperties.getAddress());
        //connectionFactory.setHost(rabbitMQProperties.getHost());设置单节点方式
        //设置端口
        connectionFactory.setPort(rabbitMQProperties.getPort());
        //设置用户名
        connectionFactory.setUsername(rabbitMQProperties.getUsername());
        //设置密码
        connectionFactory.setPassword(rabbitMQProperties.getPassword());
        //设置虚拟主机
        connectionFactory.setVirtualHost(rabbitMQProperties.getVirtualHost());
        //消息确认机制confirm-callback或return-callback,成功后confirm,失败后回调
        connectionFactory.setPublisherReturns(true);
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

三、创建消息发送的组件RabbitTemplate

    /**
     * 创建消息发送组件
     * @return
     */
    @Bean(name = "adapterRabbitTemplate")
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(adapterConnectionFactory());
        //exchange根据路由键匹配不到对应的queue时将会调用basic.return将消息返还给生产者
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()
        {
            //消息成功发送到broker
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause)
            {
                ApiLog.info("mq message send (ACK)status =", ack);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback()
        {
            //消息发送失败
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
            {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }

四、创建监听器容器工厂SimpleRabbitListenerContainerFactory

 /**
      * 消费者监听
      *
      * @return
      */
     @Bean(name = "singleListenerContainer")
     public SimpleRabbitListenerContainerFactory listenerContainer()
     {
         SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
         factory.setConnectionFactory(adapterConnectionFactory());
         factory.setMessageConverter(new Jackson2JsonMessageConverter());
         //单台并发消费者数量
         factory.setConcurrentConsumers(10);
         //单台并发消费的最大消费者数量
         factory.setMaxConcurrentConsumers(30);
         //预取消费数量,unacked数量超过这个值broker将不会接收消息
         factory.setPrefetchCount(5);
         //有事务时处理的消息数
         factory.setTxSize(1);
         //消息确认机制
         factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
         //构建retryConfig,用于在JavaConfig的模式下读取并发参数
         RabbitProperties.AmqpContainer config = rabbitMQProperties.getListener()
             .getSimple();
         RabbitProperties.ListenerRetry retryConfig = config.getRetry();
         RetryInterceptorBuilder < ? > builder = (retryConfig.isStateless() ?
             RetryInterceptorBuilder.stateless() :
             RetryInterceptorBuilder.stateful());
         //最大重试次数,消费者异常之后
         builder.maxAttempts(retryConfig.getMaxAttempts());
         builder.backOffOptions(retryConfig.getInitialInterval()
             , retryConfig.getMultiplier(), retryConfig.getMaxInterval());
         MessageRecoverer recoverer = (this.messageRecoverer != null ?
             this.messageRecoverer : new RejectAndDontRequeueRecoverer());
         builder.recoverer(recoverer);
         factory.setAdviceChain(builder.build());
         return factory;
     }

五、重试参数说明,其他参数略

spring.rabbitmq.listener.simple.retry.enabled = true
//消费者异常之后的最大重试次数,JavaConfig方式需显示构建retryConfig
spring.rabbitmq.listener.simple.retry.max - attempts = 4
spring.rabbitmq.listener.simple.retry.initial - interval = 2000
spring.rabbitmq.listener.simple.default-requeue - rejected = true

六、开始初始化

 @Autowired
 private ConnectionFactory connectionFactory;
 @Value("${mq.queue.callback_queue}")
 private String callbackQueueKey;
 @Value("${mq.exchange}")
 private String exchange;
 @PostConstruct
 public void init()
 {
     RabbitAdmin admin = new RabbitAdmin(connectionFactory);
     //声明exchange
     Exchange topicExchange = new TopicExchange(exchange);
     admin.declareExchange(topicExchange);
     //声明queue
     Queue callbackQueue = new Queue(callbackQueueKey, true);
     admin.declareQueue(callbackQueue);
     //Binding
     admin.declareBinding(BindingBuilder.bind(callbackQueue)
         .to(topicExchange)
         .with(callbackQueueKey)
         .noargs());
 }

七、启动生产者调试

    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setExchange(exchange);
        rabbitTemplate.setRoutingKey(callbackQueueKey);
        rabbitTemplate.convertAndSend(callBackRequest, new MessagePostProcessor()
        {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException
            {
                MessageProperties properties = message.getMessageProperties();
                properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                properties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Obj.class);
                return message;
            }
        });

八、启动消费者调试

@RabbitListener(queues = "queueName", containerFactory = "singleListenerContainer")
public void consumeMessage(@Payload Object obj, Channel channel, Message message)
{
    doSometing...
}

注:消费者的异常没有捕获或抛出,或者catch块里出现异常时,在消息确认机制是AUTO的前提下将会无限重试进入死循环,这个时候可以设置最大重试次数或手动进行ack来处理。

如果需要手动ack,需要实现ChannelAwareMessageListener

@Override
public void onMessage(Message message, Channel channel) throws Exception
{
    channel.basicAck(message.getMessageProperties()
        .getDeliveryTag(), false);
}

意义:

增加可伸缩性:集群服务

消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存

以上就是今天的所有内容了,更多Java架构师相关内容请一直一直关注我们的网站吧。