Appearance
Kafka 
安装 
使用 spring-kafka 
添加依赖: 
xml
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>配置 Kafka 属性 
properties
# 设置一个或多个 Kafka 服务器的地址
spring.kafka.bootstrap-servers=localhost:9092使用 Kafka 
使用 Kafka 发送消息 
设置默认主题
properties# 设置默认主题 spring.kafka.template.default-topic=hello设置发送消息使用的序列化器
properties# 默认使用的是 StringSerializer。发送的消息是自定义类型时会出现类型转换异常 spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
java
package study.helloworld.spring.kafka.send;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;
import study.helloworld.spring.kafka.dto.MessageDTO;
@Component
public class Sender {
	@Autowired
	private KafkaTemplate<String, MessageDTO> kafkaTemplate;
	public void send(MessageDTO messageDTO) {
		// 发送消息到默认主题
		kafkaTemplate.sendDefault(messageDTO);
		// 发送消息到指定主题
		ProducerRecord<String, MessageDTO> producerRecord = new ProducerRecord<>(
				"world", messageDTO);
		kafkaTemplate.send(producerRecord);
		// 发送消息到默认主题。设置消息的属性
		Map<String, Object> headers = new HashMap<>();
		headers.put("x_source", "web");
		Message<MessageDTO> message = new GenericMessage<>(messageDTO, headers);
		kafkaTemplate.send(message);
	}
}使用 Kafka 接收消息 
设置接收消息使用的反序列化器
properties# 默认使用的是 StringDeserializer。接收的消息是自定义类型时会出现类型转换异常 spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer将接收消息使用的自定义类型所在的包加入 TRUSTED_PACKAGES 列表
properties# 接收的消息是自定义类型时需要将自定义类型所在的包加入 TRUSTED_PACKAGES 列表,不然提示 The class 'study.helloworld.spring.kafka.dto.MessageDTO' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*). spring.kafka.consumer.properties.spring.json.trusted.packages=study.helloworld.spring.kafka.dto
主动拉取 
被动监听 
java
package study.helloworld.spring.kafka.receive;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import study.helloworld.spring.kafka.dto.MessageDTO;
@Component
public class MessageListener {
	private static final Logger LOGGER = LoggerFactory
			.getLogger(MessageListener.class);
	@KafkaListener(id = "hello", topics = { "hello" })
	public void receiveHello(MessageDTO messageDTO) {
		LOGGER.info("hello destination messageDTO={}", messageDTO);
	}
	@KafkaListener(id = "world", topics = { "world" })
	public void receiveWorld(
			ConsumerRecord<String, MessageDTO> consumerRecord) {
		LOGGER.info("world destination messageDTO={}", consumerRecord.value());
	}
	@KafkaListener(id = "hello1", topics = { "hello" })
	public void receiveHello1(Message<MessageDTO> message) {
		LOGGER.info("hello destination header x_source={}",
				message.getHeaders().get("x_source"));
	}
}消费失败策略 
反序列化失败 
设置接收消息使用的反序列化器为ErrorHandlingDeserializer
properties# 当反序列化程序无法反序列化消息时,Spring无法处理该问题,因为它发生在poll()返回之前。为了解决这个问题,引入了ErrorHandlingDeserializer。 spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer # 委托给真正的反序列化器 spring.kafka.consumer.properties.spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
发送到DLT 
当达到某条记录的最大失败次数时,可以使用记录恢复器配置DefaultErrorHandler和DefaultAfterRollbackProcessor。框架提供了DeadLetterPublishingRecoverer,它将失败的消息发布到另一个主题。
TIP
默认情况下,死信记录发送到一个名为<originalTopic>.DLT(原始的主题名称加上.DLT)的主题,并与原始记录相同的分区。因此,在使用默认解析器时,死信主题必须至少具有与原始主题相同多的分区。
java
@Bean
public DefaultErrorHandler errorHandler(
		KafkaTemplate<Object, Object> kafkaTemplate) {
	ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(
			kafkaTemplate);
	// 设置重试间隔10秒,次数为3次
	BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
	// 创建DefaultErrorHandler对象
	return new DefaultErrorHandler(recoverer, backOff);
}非阻塞重试 
使用Kafka实现非阻塞重试/dlt功能通常需要设置额外的主题,并创建和配置相应的侦听器。自2.7 Spring for Apache以来,Kafka通过@RetryableTopic注释和RetryTopicConfiguration类提供了支持,以简化引导。
java
@RetryableTopic(fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(id = "hello", topics = { "hello" })
public void errorHello(MessageDTO messageDTO) {
	LOGGER.info("hello destination messageDTO={}", messageDTO);
	throw new RuntimeException("hello error");
}