Appearance
RocketMQ
启动 RocketMQ 服务
4.x
sh
$ cd rocketmq-all-4.9.4-bin-release/
- 启动 NameServer
sh
### 启动 namesrv
$ nohup sh bin/mqnamesrv &
### 验证 namesrv 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
- 启动 Broker
sh
### 启动 broker
$ nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
### 验证 broker 是否启动成功
$ tail -f ~/logs/rocketmqlogs/broker.log
connect to x.x.x.x:10911 failed
修改 conf/broker.conf 文件,加入以下配置:
conf
brokerIP1 = localhost
brokerIP1
的值为你的真实 IP。
- 关闭服务器
sh
$ sh bin/mqshutdown broker
$ sh bin/mqshutdown namesrv
5.x
使用 rocketmq-spring-boot-starter
添加依赖:
xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
配置 RocketMQ 属性
properties
# 设置一个或多个 RocketMQ NameServer 的地址
rocketmq.name-server=localhost:9876
使用 RocketMQTemplate
设置默认目的地
properties# 配置 producer group rocketmq.producer.group=producer-group
使用 RocketMQTemplate 发送消息
java
package study.helloworld.spring.rocketmq.send;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.stereotype.Component;
import study.helloworld.spring.rocketmq.dto.MessageDTO;
@Component
public class Sender {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void send(MessageDTO messageDTO) {
// 使用默认目的地。手动使用 MessageCreator 来生成 Message 对象
rocketMQTemplate.setDefaultDestination("hello");
MessageConverter messageConverter = rocketMQTemplate
.getMessageConverter();
rocketMQTemplate.send(messageConverter.toMessage(messageDTO, null));
// 指定目的地。自动使用 MessageConverter 将 Object 转换为 Message,默认情况下,将会使用
// CompositeMessageConverter
rocketMQTemplate.convertAndSend("world", messageDTO);
// 指定目的地。设置消息的属性
Map<String, Object> headers = new HashMap<>();
headers.put("x_source", "web");
rocketMQTemplate.convertAndSend("study", messageDTO, headers);
}
}
使用 RocketMQTemplate 接收消息
主动拉取
被动监听
java
package study.helloworld.spring.rocketmq.receive;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import study.helloworld.spring.rocketmq.dto.MessageDTO;
@Configuration
public class MessageListener {
private static final Logger LOGGER = LoggerFactory
.getLogger(MessageListener.class);
@Component
@RocketMQMessageListener(consumerGroup = "hello-consumer-group", topic = "hello")
public class ReceiveHelloConsumer implements RocketMQListener<MessageDTO> {
@Override
public void onMessage(MessageDTO message) {
LOGGER.info("hello destination messageDTO={}", message);
}
}
@Component
@RocketMQMessageListener(consumerGroup = "world-consumer-group", topic = "world")
public class ReceiveWorldConsumer implements RocketMQListener<MessageDTO> {
@Override
public void onMessage(MessageDTO message) {
LOGGER.info("world destination messageDTO={}", message);
}
}
@Component
@RocketMQMessageListener(consumerGroup = "study-consumer-group", topic = "study")
public class ReceiveStudyConsumer implements RocketMQListener<MessageDTO> {
@Override
public void onMessage(MessageDTO message) {
LOGGER.info("study destination header x_source={}", message);
}
}
}