Skip to content

Kafka

安装

使用 spring-kafka

添加依赖:

xml
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

配置 Kafka 属性

properties
# 设置一个或多个 Kafka 服务器的地址
spring.kafka.bootstrap-servers=localhost:9092

使用 Kafka

使用 Kafka 发送消息

  • 设置默认主题

    properties
    # 设置默认主题
    spring.kafka.template.default-topic=hello
  • 设置发送消息使用的序列化器

    properties
    # 默认使用的是 StringSerializer。发送的消息是自定义类型时会出现类型转换异常
    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
java
package study.helloworld.spring.kafka.send;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

import study.helloworld.spring.kafka.dto.MessageDTO;

@Component
public class Sender {

	@Autowired
	private KafkaTemplate<String, MessageDTO> kafkaTemplate;

	public void send(MessageDTO messageDTO) {
		// 发送消息到默认主题
		kafkaTemplate.sendDefault(messageDTO);

		// 发送消息到指定主题
		ProducerRecord<String, MessageDTO> producerRecord = new ProducerRecord<>(
				"world", messageDTO);
		kafkaTemplate.send(producerRecord);

		// 发送消息到默认主题。设置消息的属性
		Map<String, Object> headers = new HashMap<>();
		headers.put("x_source", "web");
		Message<MessageDTO> message = new GenericMessage<>(messageDTO, headers);
		kafkaTemplate.send(message);
	}

}

使用 Kafka 接收消息

  • 设置接收消息使用的反序列化器

    properties
    # 默认使用的是 StringDeserializer。接收的消息是自定义类型时会出现类型转换异常
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
  • 将接收消息使用的自定义类型所在的包加入 TRUSTED_PACKAGES 列表

    properties
    # 接收的消息是自定义类型时需要将自定义类型所在的包加入 TRUSTED_PACKAGES 列表,不然提示 The class 'study.helloworld.spring.kafka.dto.MessageDTO' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    spring.kafka.consumer.properties.spring.json.trusted.packages=study.helloworld.spring.kafka.dto

主动拉取

被动监听

java
package study.helloworld.spring.kafka.receive;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import study.helloworld.spring.kafka.dto.MessageDTO;

@Component
public class MessageListener {

	private static final Logger LOGGER = LoggerFactory
			.getLogger(MessageListener.class);

	@KafkaListener(id = "hello", topics = { "hello" })
	public void receiveHello(MessageDTO messageDTO) {
		LOGGER.info("hello destination messageDTO={}", messageDTO);
	}

	@KafkaListener(id = "world", topics = { "world" })
	public void receiveWorld(
			ConsumerRecord<String, MessageDTO> consumerRecord) {
		LOGGER.info("world destination messageDTO={}", consumerRecord.value());
	}

	@KafkaListener(id = "hello1", topics = { "hello" })
	public void receiveHello1(Message<MessageDTO> message) {
		LOGGER.info("hello destination header x_source={}",
				message.getHeaders().get("x_source"));
	}

}

消费失败策略

反序列化失败

  • 设置接收消息使用的反序列化器为ErrorHandlingDeserializer

    properties
    # 当反序列化程序无法反序列化消息时,Spring无法处理该问题,因为它发生在poll()返回之前。为了解决这个问题,引入了ErrorHandlingDeserializer。
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    # 委托给真正的反序列化器
    spring.kafka.consumer.properties.spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer

发送到DLT

当达到某条记录的最大失败次数时,可以使用记录恢复器配置DefaultErrorHandlerDefaultAfterRollbackProcessor。框架提供了DeadLetterPublishingRecoverer,它将失败的消息发布到另一个主题。

TIP

默认情况下,死信记录发送到一个名为<originalTopic>.DLT(原始的主题名称加上.DLT)的主题,并与原始记录相同的分区。因此,在使用默认解析器时,死信主题必须至少具有与原始主题相同多的分区。

java
@Bean
public DefaultErrorHandler errorHandler(
		KafkaTemplate<Object, Object> kafkaTemplate) {
	ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(
			kafkaTemplate);
	// 设置重试间隔10秒,次数为3次
	BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
	// 创建DefaultErrorHandler对象
	return new DefaultErrorHandler(recoverer, backOff);
}

非阻塞重试

使用Kafka实现非阻塞重试/dlt功能通常需要设置额外的主题,并创建和配置相应的侦听器。自2.7 Spring for Apache以来,Kafka通过@RetryableTopic注释和RetryTopicConfiguration类提供了支持,以简化引导。

java
@RetryableTopic(fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(id = "hello", topics = { "hello" })
public void errorHello(MessageDTO messageDTO) {
	LOGGER.info("hello destination messageDTO={}", messageDTO);
	throw new RuntimeException("hello error");
}

Released under the MIT License.