Rabbitmq大家吗?这是一个实现了高级消息队列协议(AMQP)的开源消息代理软件,今天小编就来教大家如何在springboot中整合rabbitmq,希望对你们有所帮助吧。
一、首先引入rabbitmq需要的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.7.9.RELEASE</version> </dependency>
二、配置所需连接工厂
/** * 创建连接工厂 * @return */ @Bean(name = "adapterConnectionFactory") @Order(value = 2) public ConnectionFactory adapterConnectionFactory() { //创建连接工厂 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); //设置集群方式 connectionFactory.setAddresses(rabbitMQProperties.getAddress()); //connectionFactory.setHost(rabbitMQProperties.getHost());设置单节点方式 //设置端口 connectionFactory.setPort(rabbitMQProperties.getPort()); //设置用户名 connectionFactory.setUsername(rabbitMQProperties.getUsername()); //设置密码 connectionFactory.setPassword(rabbitMQProperties.getPassword()); //设置虚拟主机 connectionFactory.setVirtualHost(rabbitMQProperties.getVirtualHost()); //消息确认机制confirm-callback或return-callback,成功后confirm,失败后回调 connectionFactory.setPublisherReturns(true); connectionFactory.setPublisherConfirms(true); return connectionFactory; }
三、创建消息发送的组件RabbitTemplate
/** * 创建消息发送组件 * @return */ @Bean(name = "adapterRabbitTemplate") public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(adapterConnectionFactory()); //exchange根据路由键匹配不到对应的queue时将会调用basic.return将消息返还给生产者 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { //消息成功发送到broker @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { ApiLog.info("mq message send (ACK)status =", ack); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { //消息发送失败 @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); } }); return rabbitTemplate; }
四、创建监听器容器工厂SimpleRabbitListenerContainerFactory
/** * 消费者监听 * * @return */ @Bean(name = "singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainer() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(adapterConnectionFactory()); factory.setMessageConverter(new Jackson2JsonMessageConverter()); //单台并发消费者数量 factory.setConcurrentConsumers(10); //单台并发消费的最大消费者数量 factory.setMaxConcurrentConsumers(30); //预取消费数量,unacked数量超过这个值broker将不会接收消息 factory.setPrefetchCount(5); //有事务时处理的消息数 factory.setTxSize(1); //消息确认机制 factory.setAcknowledgeMode(AcknowledgeMode.AUTO); //构建retryConfig,用于在JavaConfig的模式下读取并发参数 RabbitProperties.AmqpContainer config = rabbitMQProperties.getListener() .getSimple(); RabbitProperties.ListenerRetry retryConfig = config.getRetry(); RetryInterceptorBuilder < ? > builder = (retryConfig.isStateless() ? RetryInterceptorBuilder.stateless() : RetryInterceptorBuilder.stateful()); //最大重试次数,消费者异常之后 builder.maxAttempts(retryConfig.getMaxAttempts()); builder.backOffOptions(retryConfig.getInitialInterval() , retryConfig.getMultiplier(), retryConfig.getMaxInterval()); MessageRecoverer recoverer = (this.messageRecoverer != null ? this.messageRecoverer : new RejectAndDontRequeueRecoverer()); builder.recoverer(recoverer); factory.setAdviceChain(builder.build()); return factory; }
五、重试参数说明,其他参数略
spring.rabbitmq.listener.simple.retry.enabled = true //消费者异常之后的最大重试次数,JavaConfig方式需显示构建retryConfig spring.rabbitmq.listener.simple.retry.max - attempts = 4 spring.rabbitmq.listener.simple.retry.initial - interval = 2000 spring.rabbitmq.listener.simple.default-requeue - rejected = true
六、开始初始化
@Autowired private ConnectionFactory connectionFactory; @Value("${mq.queue.callback_queue}") private String callbackQueueKey; @Value("${mq.exchange}") private String exchange; @PostConstruct public void init() { RabbitAdmin admin = new RabbitAdmin(connectionFactory); //声明exchange Exchange topicExchange = new TopicExchange(exchange); admin.declareExchange(topicExchange); //声明queue Queue callbackQueue = new Queue(callbackQueueKey, true); admin.declareQueue(callbackQueue); //Binding admin.declareBinding(BindingBuilder.bind(callbackQueue) .to(topicExchange) .with(callbackQueueKey) .noargs()); }
七、启动生产者调试
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setExchange(exchange); rabbitTemplate.setRoutingKey(callbackQueueKey); rabbitTemplate.convertAndSend(callBackRequest, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties properties = message.getMessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); properties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Obj.class); return message; } });
八、启动消费者调试
@RabbitListener(queues = "queueName", containerFactory = "singleListenerContainer") public void consumeMessage(@Payload Object obj, Channel channel, Message message) { doSometing... }
注:消费者的异常没有捕获或抛出,或者catch块里出现异常时,在消息确认机制是AUTO的前提下将会无限重试进入死循环,这个时候可以设置最大重试次数或手动进行ack来处理。
如果需要手动ack,需要实现ChannelAwareMessageListener
@Override public void onMessage(Message message, Channel channel) throws Exception { channel.basicAck(message.getMessageProperties() .getDeliveryTag(), false); }
意义:
增加可伸缩性:集群服务
消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存
以上就是今天的所有内容了,更多Java架构师相关内容请一直一直关注我们的网站吧。