Skip to content

RabbitMQ

安装

使用 spring-boot-starter-amqp

添加依赖:

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置 RabbitMQ 属性

properties
# 代理的主机(默认为 localhost)
spring.rabbitmq.host=localhost
# 代理的端口(默认为 5672)
spring.rabbitmq.port=5672

使用 RabbitTemplate

提示

需要在 RabbitMQ 上提前设置好需要使用的 Exchange、Routing key 和 Queue

  • 设置默认的 Exchange、Routing key 和 Queue

    properties
    # 默认 Exchange
    spring.rabbitmq.template.exchange=exchange-hello
    # 默认 Routing key
    spring.rabbitmq.template.routing-key=routing-key-hello
    # 默认 Queue
    spring.rabbitmq.template.default-receive-queue=hello

使用 RabbitTemplate 发送消息

java
package study.helloworld.spring.rabbitmq.send;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import study.helloworld.spring.rabbitmq.dto.MessageDTO;

@Component
public class Sender {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	public void send(MessageDTO messageDTO) {
		// 使用默认 Exchange 和 Routing key。手动使用 MessageConverter 来生成 Message 对象
		MessageConverter messageConverter = rabbitTemplate
				.getMessageConverter();
		Message message = messageConverter.toMessage(messageDTO, null);
		rabbitTemplate.send(message);

		// 指定 Exchange 和 Routing key。自动使用 MessageConverter 将 Object 转换为
		// Message,默认情况下,将会使用 SimpleMessageConverter,需要被发送的对象实现 Serializable
		rabbitTemplate.convertAndSend("exchange-world", "routing-key-world",
				messageDTO);

		// 使用默认 Exchange 和指定 Routing key。设置消息的属性
		rabbitTemplate.convertAndSend("routing-key-study", messageDTO,
				message1 -> {
					MessageProperties messageProperties = message1
							.getMessageProperties();
					messageProperties.setHeader("x_source", "web");
					return message1;
				});
	}

}

使用 RabbitTemplate 接收消息

主动拉取

java
package study.helloworld.spring.rabbitmq.receive;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import study.helloworld.spring.rabbitmq.dto.MessageDTO;

@Component
public class Receiver {

	private static final Logger LOGGER = LoggerFactory
			.getLogger(Receiver.class);

	@Autowired
	private RabbitTemplate rabbitTemplate;

	public void receive() {
		MessageConverter messageConverter = rabbitTemplate
				.getMessageConverter();
		Message message = rabbitTemplate.receive();
		MessageDTO messageDTO = message != null
				? (MessageDTO) messageConverter.fromMessage(message)
				: null;
		LOGGER.info("default destination messageDTO={}", messageDTO);

		messageDTO = (MessageDTO) rabbitTemplate.receiveAndConvert("world");
		LOGGER.info("world destination messageDTO={}", messageDTO);

		message = rabbitTemplate.receive("study");
		Object header = message.getMessageProperties().getHeader("x_source");
		LOGGER.info("study destination header x_source={}", header);
	}

}

被动监听

java
package study.helloworld.spring.rabbitmq.receive;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import study.helloworld.spring.rabbitmq.dto.MessageDTO;

@Component
public class MessageListener {

	private static final Logger LOGGER = LoggerFactory
			.getLogger(MessageListener.class);

	@RabbitListener(queues = { "hello" })
	public void receiveHello(MessageDTO messageDTO) {
		LOGGER.info("hello destination messageDTO={}", messageDTO);
	}

	@RabbitListener(queues = { "world" })
	public void receiveWorld(MessageDTO messageDTO) {
		LOGGER.info("world destination messageDTO={}", messageDTO);
	}

	@RabbitListener(queues = { "study" })
	public void receiveStudy(MessageDTO messageDTO) {
		LOGGER.info("study destination messageDTO={}", messageDTO);
	}

}

Released under the MIT License.