Spring Boot整合Redis实现发布/订阅(含ACK机制)全流程
实现步骤
步骤1:添加Maven依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
步骤2:配置Redis连接
# application.yml
spring:
  redis:
    host: localhost
    port: 6379
    lettuce:
      pool:
        max-active: 16
        max-idle: 8
# redisStream配置信息
app:
  redis:
    stream: app-events
    group: app-group
    consumer: consumer-${random.int(1000)}
步骤3:创建消费者组
@Configuration
public class RedisConfig {
@Value(“${app.redis.stream}”)
private String streamKey;
@Value(“${app.redis.group}”)
private String groupName;
@Bean
public void createConsumerGroup(StringRedisTemplate redisTemplate) {
try {
redisTemplate.opsForStream().createGroup(streamKey, groupName);
} catch (Exception e) {
System.out.println(“消费者组已存在: ” + groupName);
}
}
}
步骤4:配置消息监听容器
@Configuration
public class RedisConfig {
// 配置消息监听线程池
@Bean(name = “redisStreamTaskExecutor”)
public ThreadPoolTaskExecutor redisStreamTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setThreadNamePrefix(“redis-stream-“);
return executor;
}
// 创建消息监听容器
@Bean(initMethod = “start”, destroyMethod = “stop”)
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(
RedisConnectionFactory factory,
@Qualifier(“redisStreamTaskExecutor”) ThreadPoolTaskExecutor executor) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.executor(executor)
.batchSize(10)
.build();
return StreamMessageListenerContainer.create(factory, options);
}
}
步骤5:注册消息监听器
@Component
public class StreamListenerRegistrar {
@Value(“${app.redis.stream}”)
private String streamKey;
@Value(“${app.redis.group}”)
private String groupName;
@Value(“${app.redis.consumer}”)
private String consumerName;
@PostConstruct
public void registerListener(StreamMessageListenerContainer container,
RedisMessageProcessor processor) {
StreamReadRequest<String> readRequest =
StreamReadRequest.builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
.consumer(Consumer.from(groupName, consumerName))
.autoAcknowledge(false) // 手动ACK
.build();
container.register(readRequest, processor);
}
}
步骤6:实现消息处理器
@Component
public class RedisMessageProcessor implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> record) {
CompletableFuture.runAsync(() -> {
try {
// 业务处理逻辑
processBusiness(record);
// 处理成功发送ACK
redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
} catch (Exception e) {
// 失败消息进入Pending List
}
});
}
private void processBusiness(MapRecord<String, String, String> record) throws Exception {
String eventType = record.getValue().get(“eventType”);
String payload = record.getValue().get(“payload”);
// 根据事件类型处理
switch (eventType) {
case “ORDER_CREATED”: handleOrder(payload); break;
case “PAYMENT_PROCESSED”: handlePayment(payload); break;
}
}
}
步骤7:实现Pending消息处理器
@Component
@Slf4j
public class PendingMessageProcessor {
@Value(“${app.redis.stream}”)
private String streamKey;
@Value(“${app.redis.group}”)
private String groupName;
@Value(“${app.redis.consumer}”)
private String consumerName;
// 每分钟处理一次Pending消息
@Scheduled(fixedRate = 60000)
public void processPendingMessages() {
// 1. 查询Pending消息
PendingMessages pending = redisTemplate.opsForStream()
.pending(streamKey, groupName, Range.unbounded(), 100);
pending.forEach(this::handlePendingMessage);
}
private void handlePendingMessage(PendingMessage pending) {
try {
// 2. 重新认领消息
List<MapRecord<String, String, String>> records = redisTemplate.opsForStream()
.claim(streamKey,
Consumer.from(groupName, consumerName),
Duration.ofSeconds(30),
pending.getId());
if (!records.isEmpty()) {
MapRecord<String, String, String> record = records.get(0);
// 3. 重试处理
messageProcessor.processBusiness(record);
// 4. 处理成功发送ACK
redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
}
} catch (Exception e) {
// 5. 超过重试次数移入死信队列
if (pending.getTotalDeliveryCount() > 3) {
moveToDeadLetterQueue(pending);
}
}
}
private void moveToDeadLetterQueue(PendingMessage pending) {
// 获取消息内容
List<MapRecord<String, String, String>> records = redisTemplate.opsForStream()
.range(streamKey, Range.from(pending.getId()));
if (!records.isEmpty()) {
// 添加到死信队列
redisTemplate.opsForStream().add(“dead-letter:” + streamKey, records.get(0).getValue());
// 确认原始消息
redisTemplate.opsForStream().acknowledge(streamKey, groupName, pending.getId());
}
}
}
步骤8:实现消息生产者
@Service
public class RedisMessageProducer {
@Value(“${app.redis.stream}”)
private String streamKey;
public String sendMessage(String eventType, String payload) {
Map<String, String> message = Map.of(
“eventType”, eventType,
“payload”, payload,
“timestamp”, String.valueOf(System.currentTimeMillis())
);
return redisTemplate.opsForStream()
.add(streamKey, message)
.getValue();
}
}
步骤9:创建REST接口
@RestController
@RequestMapping(“/messages”)
public class MessageController {
private final RedisMessageProducer producer;
@PostMapping
public String sendMessage(@RequestBody MessageRequest request) {
return producer.sendMessage(request.getEventType(), request.getPayload());
}
@Data
public static class MessageRequest {
private String eventType;
private String payload;
}
}
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 如遇到加密压缩包,请使用WINRAR解压,如遇到无法解压的请联系管理员!
7. 本站有不少源码未能详细测试(解密),不能分辨部分源码是病毒还是误报,所以没有进行任何修改,大家使用前请进行甄别!
66源码网 » Spring Boot整合Redis实现发布/订阅(含ACK机制)全流程