Skip to content

RocketMQ

启动 RocketMQ 服务

4.x

sh
$ cd rocketmq-all-4.9.4-bin-release/
  1. 启动 NameServer
sh
### 启动 namesrv
$ nohup sh bin/mqnamesrv &
### 验证 namesrv 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
  1. 启动 Broker
sh
### 启动 broker
$ nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
### 验证 broker 是否启动成功
$ tail -f ~/logs/rocketmqlogs/broker.log

connect to x.x.x.x:10911 failed

修改 conf/broker.conf 文件,加入以下配置:

conf
brokerIP1 = localhost

brokerIP1 的值为你的真实 IP。

  1. 关闭服务器
sh
$ sh bin/mqshutdown broker
$ sh bin/mqshutdown namesrv

5.x

使用 rocketmq-spring-boot-starter

添加依赖:

xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

配置 RocketMQ 属性

properties
# 设置一个或多个 RocketMQ NameServer 的地址
rocketmq.name-server=localhost:9876

使用 RocketMQTemplate

  • 设置默认目的地

    properties
    # 配置 producer group
    rocketmq.producer.group=producer-group

使用 RocketMQTemplate 发送消息

java
package study.helloworld.spring.rocketmq.send;

import java.util.HashMap;
import java.util.Map;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.stereotype.Component;

import study.helloworld.spring.rocketmq.dto.MessageDTO;

@Component
public class Sender {

	@Autowired
	private RocketMQTemplate rocketMQTemplate;

	public void send(MessageDTO messageDTO) {
		// 使用默认目的地。手动使用 MessageCreator 来生成 Message 对象
		rocketMQTemplate.setDefaultDestination("hello");
		MessageConverter messageConverter = rocketMQTemplate
				.getMessageConverter();
		rocketMQTemplate.send(messageConverter.toMessage(messageDTO, null));

		// 指定目的地。自动使用 MessageConverter 将 Object 转换为 Message,默认情况下,将会使用
		// CompositeMessageConverter
		rocketMQTemplate.convertAndSend("world", messageDTO);

		// 指定目的地。设置消息的属性
		Map<String, Object> headers = new HashMap<>();
		headers.put("x_source", "web");
		rocketMQTemplate.convertAndSend("study", messageDTO, headers);

	}

}

使用 RocketMQTemplate 接收消息

主动拉取

被动监听

java
package study.helloworld.spring.rocketmq.receive;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import study.helloworld.spring.rocketmq.dto.MessageDTO;

@Configuration
public class MessageListener {

	private static final Logger LOGGER = LoggerFactory
			.getLogger(MessageListener.class);

	@Component
	@RocketMQMessageListener(consumerGroup = "hello-consumer-group", topic = "hello")
	public class ReceiveHelloConsumer implements RocketMQListener<MessageDTO> {

		@Override
		public void onMessage(MessageDTO message) {
			LOGGER.info("hello destination messageDTO={}", message);
		}

	}

	@Component
	@RocketMQMessageListener(consumerGroup = "world-consumer-group", topic = "world")
	public class ReceiveWorldConsumer implements RocketMQListener<MessageDTO> {

		@Override
		public void onMessage(MessageDTO message) {
			LOGGER.info("world destination messageDTO={}", message);
		}

	}

	@Component
	@RocketMQMessageListener(consumerGroup = "study-consumer-group", topic = "study")
	public class ReceiveStudyConsumer implements RocketMQListener<MessageDTO> {

		@Override
		public void onMessage(MessageDTO message) {
			LOGGER.info("study destination header x_source={}", message);
		}

	}

}

Released under the MIT License.