Appearance
Kafka
安装
使用 spring-kafka
添加依赖:
xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置 Kafka 属性
properties
# 设置一个或多个 Kafka 服务器的地址
spring.kafka.bootstrap-servers=localhost:9092
使用 Kafka
使用 Kafka 发送消息
设置默认主题
properties# 设置默认主题 spring.kafka.template.default-topic=hello
设置发送消息使用的序列化器
properties# 默认使用的是 StringSerializer。发送的消息是自定义类型时会出现类型转换异常 spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
java
package study.helloworld.spring.kafka.send;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;
import study.helloworld.spring.kafka.dto.MessageDTO;
@Component
public class Sender {
@Autowired
private KafkaTemplate<String, MessageDTO> kafkaTemplate;
public void send(MessageDTO messageDTO) {
// 发送消息到默认主题
kafkaTemplate.sendDefault(messageDTO);
// 发送消息到指定主题
ProducerRecord<String, MessageDTO> producerRecord = new ProducerRecord<>(
"world", messageDTO);
kafkaTemplate.send(producerRecord);
// 发送消息到默认主题。设置消息的属性
Map<String, Object> headers = new HashMap<>();
headers.put("x_source", "web");
Message<MessageDTO> message = new GenericMessage<>(messageDTO, headers);
kafkaTemplate.send(message);
}
}
使用 Kafka 接收消息
设置接收消息使用的反序列化器
properties# 默认使用的是 StringDeserializer。接收的消息是自定义类型时会出现类型转换异常 spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
将接收消息使用的自定义类型所在的包加入 TRUSTED_PACKAGES 列表
properties# 接收的消息是自定义类型时需要将自定义类型所在的包加入 TRUSTED_PACKAGES 列表,不然提示 The class 'study.helloworld.spring.kafka.dto.MessageDTO' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*). spring.kafka.consumer.properties.spring.json.trusted.packages=study.helloworld.spring.kafka.dto
主动拉取
被动监听
java
package study.helloworld.spring.kafka.receive;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import study.helloworld.spring.kafka.dto.MessageDTO;
@Component
public class MessageListener {
private static final Logger LOGGER = LoggerFactory
.getLogger(MessageListener.class);
@KafkaListener(id = "hello", topics = { "hello" })
public void receiveHello(MessageDTO messageDTO) {
LOGGER.info("hello destination messageDTO={}", messageDTO);
}
@KafkaListener(id = "world", topics = { "world" })
public void receiveWorld(
ConsumerRecord<String, MessageDTO> consumerRecord) {
LOGGER.info("world destination messageDTO={}", consumerRecord.value());
}
@KafkaListener(id = "hello1", topics = { "hello" })
public void receiveHello1(Message<MessageDTO> message) {
LOGGER.info("hello destination header x_source={}",
message.getHeaders().get("x_source"));
}
}
消费失败策略
反序列化失败
设置接收消息使用的反序列化器为ErrorHandlingDeserializer
properties# 当反序列化程序无法反序列化消息时,Spring无法处理该问题,因为它发生在poll()返回之前。为了解决这个问题,引入了ErrorHandlingDeserializer。 spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer # 委托给真正的反序列化器 spring.kafka.consumer.properties.spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
发送到DLT
当达到某条记录的最大失败次数时,可以使用记录恢复器配置DefaultErrorHandler
和DefaultAfterRollbackProcessor
。框架提供了DeadLetterPublishingRecoverer
,它将失败的消息发布到另一个主题。
TIP
默认情况下,死信记录发送到一个名为<originalTopic>.DLT
(原始的主题名称加上.DLT)的主题,并与原始记录相同的分区。因此,在使用默认解析器时,死信主题必须至少具有与原始主题相同多的分区。
java
@Bean
public DefaultErrorHandler errorHandler(
KafkaTemplate<Object, Object> kafkaTemplate) {
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(
kafkaTemplate);
// 设置重试间隔10秒,次数为3次
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
// 创建DefaultErrorHandler对象
return new DefaultErrorHandler(recoverer, backOff);
}
非阻塞重试
使用Kafka实现非阻塞重试/dlt功能通常需要设置额外的主题,并创建和配置相应的侦听器。自2.7 Spring for Apache以来,Kafka通过@RetryableTopic
注释和RetryTopicConfiguration
类提供了支持,以简化引导。
java
@RetryableTopic(fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(id = "hello", topics = { "hello" })
public void errorHello(MessageDTO messageDTO) {
LOGGER.info("hello destination messageDTO={}", messageDTO);
throw new RuntimeException("hello error");
}