Appearance
RabbitMQ
安装
使用 spring-boot-starter-amqp
添加依赖:
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置 RabbitMQ 属性
properties
# 代理的主机(默认为 localhost)
spring.rabbitmq.host=localhost
# 代理的端口(默认为 5672)
spring.rabbitmq.port=5672
使用 RabbitTemplate
提示
需要在 RabbitMQ 上提前创建好需要使用的 Exchange 和 Queue,并使用对应的 Routing key 将 Queue 绑定到 Exchange。
设置默认的 Exchange、Routing key 和 Queue
properties
# 默认 Exchange
spring.rabbitmq.template.exchange=exchange-hello
# 默认 Routing key
spring.rabbitmq.template.routing-key=routing-key-hello
# 默认 Queue
spring.rabbitmq.template.default-receive-queue=hello
使用 RabbitTemplate 发送消息
java
package study.helloworld.spring.rabbitmq.send;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import study.helloworld.spring.rabbitmq.dto.MessageDTO;
/**
 * Demonstrates three ways of publishing a {@link MessageDTO} through
 * {@link RabbitTemplate}.
 */
@Component
public class Sender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes the given DTO three times, once per sending style shown below.
     *
     * @param messageDTO the payload to publish
     */
    public void send(MessageDTO messageDTO) {
        // 1) Default exchange and routing key, building the Message by hand
        //    with the template's MessageConverter.
        MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
        // Pass an explicit (empty) MessageProperties rather than null: the
        // MessageConverter interface does not promise that null is accepted,
        // so a custom converter could otherwise fail here.
        Message message = messageConverter.toMessage(messageDTO,
                new MessageProperties());
        rabbitTemplate.send(message);

        // 2) Explicit exchange and routing key. The template converts the
        //    Object into a Message with its MessageConverter. By default
        //    SimpleMessageConverter is used, which requires the object being
        //    sent to implement Serializable.
        rabbitTemplate.convertAndSend("exchange-world", "routing-key-world",
                messageDTO);

        // 3) Default exchange with an explicit routing key, customising the
        //    message properties through a post-processor before it is sent.
        rabbitTemplate.convertAndSend("routing-key-study", messageDTO,
                outgoing -> {
                    MessageProperties messageProperties = outgoing
                            .getMessageProperties();
                    messageProperties.setHeader("x_source", "web");
                    return outgoing;
                });
    }
}
使用 RabbitTemplate 接收消息
主动拉取
java
package study.helloworld.spring.rabbitmq.receive;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import study.helloworld.spring.rabbitmq.dto.MessageDTO;
/**
 * Demonstrates actively pulling (polling) messages with {@link RabbitTemplate}.
 */
@Component
public class Receiver {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(Receiver.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Polls three destinations once each and logs what was found: the
     * template's default receive queue, the {@code world} queue and the
     * {@code study} queue. A poll may yield nothing, in which case
     * {@code null} is logged instead of failing.
     */
    public void receive() {
        MessageConverter messageConverter = rabbitTemplate.getMessageConverter();

        // Default queue: fetch the raw Message and convert it manually.
        Message message = rabbitTemplate.receive();
        MessageDTO messageDTO = message != null
                ? (MessageDTO) messageConverter.fromMessage(message)
                : null;
        LOGGER.info("default destination messageDTO={}", messageDTO);

        // "world" queue: let the template fetch and convert in one call.
        messageDTO = (MessageDTO) rabbitTemplate.receiveAndConvert("world");
        LOGGER.info("world destination messageDTO={}", messageDTO);

        // "study" queue: only the custom header is of interest. receive() can
        // return null when no message is available, so guard before
        // dereferencing, just like the default-queue poll above.
        message = rabbitTemplate.receive("study");
        Object header = null;
        if (message != null) {
            header = message.getMessageProperties().getHeader("x_source");
        }
        LOGGER.info("study destination header x_source={}", header);
    }
}
被动监听
java
package study.helloworld.spring.rabbitmq.receive;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import study.helloworld.spring.rabbitmq.dto.MessageDTO;
/**
 * Listener-based consumers: each method is registered through
 * {@link RabbitListener} for one queue and logs the payload it is handed.
 */
@Component
public class MessageListener {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(MessageListener.class);

    /**
     * Handles payloads arriving on the {@code hello} queue.
     *
     * @param messageDTO the converted message payload
     */
    @RabbitListener(queues = "hello")
    public void receiveHello(MessageDTO messageDTO) {
        LOGGER.info("hello destination messageDTO={}", messageDTO);
    }

    /**
     * Handles payloads arriving on the {@code world} queue.
     *
     * @param messageDTO the converted message payload
     */
    @RabbitListener(queues = "world")
    public void receiveWorld(MessageDTO messageDTO) {
        LOGGER.info("world destination messageDTO={}", messageDTO);
    }

    /**
     * Handles payloads arriving on the {@code study} queue.
     *
     * @param messageDTO the converted message payload
     */
    @RabbitListener(queues = "study")
    public void receiveStudy(MessageDTO messageDTO) {
        LOGGER.info("study destination messageDTO={}", messageDTO);
    }
}