Redis监听过期的key实现流程是什么
一、简介
我们来个最简单的集群架构,如下图:
我们上面图中看到是服务A和服务B就是同一个服务的不同实例。
二、maven依赖
pom.xml
4.0.0 org.springframework.boot spring-boot-starter-parent 2.6.0 com.alian expiration 0.0.1-SNAPSHOT expiration redis-key-expiration-listener UTF-8 UTF-8 target 1.8 2.9.10 1.2.68 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test org.springframework.boot spring-boot-starter-data-redis ${parent.version} com.fasterxml.jackson.core jackson-databind ${jackson.version} com.fasterxml.jackson.datatype jackson-datatype-jsr310 ${jackson.version} com.alibaba fastjson 1.2.68 org.projectlombok lombok 1.16.14 junit junit 4.13.2 test org.springframework.boot spring-boot-maven-plugin
三、编码实现
3.1、application.properties
# 端口
server.port=8090
# 上下文路径
server.servlet.context-path=/expiration# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=192.168.0.193
#spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=10
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=20000
# 读时间(毫秒)
spring.redis.timeout=10000
# 连接超时时间(毫秒)
spring.redis.connect-timeout=10000
3.2、Redis配置类
RedisConfig
package com.alian.expiration.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer; import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.format.DateTimeFormatter; @Configuration public class RedisConfig { /** * redis配置 * * @param redisConnectionFactory * @return */ @Bean public RedisTemplateredisTemplate(RedisConnectionFactory redisConnectionFactory) { // 实例化redisTemplate RedisTemplate redisTemplate = new RedisTemplate<>(); //设置连接工厂 redisTemplate.setConnectionFactory(redisConnectionFactory); // key采用String的序列化 redisTemplate.setKeySerializer(keySerializer()); // value采用jackson序列化 redisTemplate.setValueSerializer(valueSerializer()); // Hash key采用String的序列化 redisTemplate.setHashKeySerializer(keySerializer()); // Hash value采用jackson序列化 redisTemplate.setHashValueSerializer(valueSerializer()); // 支持事务 // redisTemplate.setEnableTransactionSupport(true); //执行函数,初始化RedisTemplate redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * key类型采用String序列化 * * @return */ private RedisSerializer keySerializer() { return new StringRedisSerializer(); } /** * value采用JSON序列化 * * @return */ private RedisSerializer
和我们之前整合redis差不多,只不过在最后增加了一个redis消息监听监听容器RedisMessageListenerContainer
3.3、监听器
RedisKeyExpirationListener
package com.alian.expiration.listener; import com.alian.expiration.service.RedisExpirationService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; @Slf4j @Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { @Autowired private RedisExpirationService redisExpirationService; // 把我们上面一步配置的bean注入进去 public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } /** * 针对redis数据失效事件,进行数据处理 * * @param message * @param pattern */ @Override public void onMessage(Message message, byte[] pattern) { // 用户做自己的业务处理即可,注意message.toString()可以获取失效的key String expiredKey = message.toString(); log.info("onMessage --> redis 过期的key是:{}", expiredKey); try { // 对过期key进行处理 redisExpirationService.processingExpiredKey(expiredKey); log.info("过期key处理完成:{}", expiredKey); } catch (Exception e) { e.printStackTrace(); log.error("处理redis 过期的key异常:{}", expiredKey, e); } } }
实现的步骤如下:
继承KeyExpirationEventMessageListener
把redis消息监听监听容器RedisMessageListenerContainer 注入到密钥空间事件消息侦 听器中
重写onMessage方法
通过Message 的 toString() 方法就可以获取到过期的key
对key中关键信息进行业务处理,比如 id
3.4、服务类
RedisExpirationService
package com.alian.expiration.service; import com.alian.expiration.util.SignUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.util.concurrent.TimeUnit; @Slf4j @Service public class RedisExpirationService { @Autowired private RedisTemplateredisTemplate; public void processingExpiredKey(String expiredKey) { // 如果是优惠券的key(一定要规范命名) if (expiredKey.startsWith("com.mall.coupon.id")) { // 临时key,此key可以在业务处理完,然后延迟一定时间删除,或者不处理 String tempKey = SignUtils.md5(expiredKey, "UTF-8"); // 临时key不存在才设置值,key超时时间为10秒(此处相当于分布式锁的应用) Boolean exist = redisTemplate.opsForValue().setIfAbsent(tempKey, "1", 10, TimeUnit.SECONDS); if (Boolean.TRUE.equals(exist)) { log.info("Business Handing..."); // 比如截取里面的id,然后关联数据库进行处理 } else { log.info("Other service is handing..."); } } else { log.info("Expired keys without processing"); } } }
基本流程如下:
判断是否是需要处理的key,一般这种key通过命名规范加以处理
以当前key生成一个新的key作为分布式key
如果redis中不存在这个新的key,则为新的key设置一个值,达到分布式服务处理(核心)
设置成功的,进行业务处理;设置失败了,说明其他服务正在处理这个key
根据 key 的关键信息(比如截取id),进行业务处理
3.5、工具类
SignUtils
package com.alian.expiration.util; import java.security.MessageDigest; public class SignUtils { public static final String md5(String s, String charset) { char[] hexDigits = new char[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; try { byte[] btInput = s.getBytes(charset); MessageDigest mdInst = MessageDigest.getInstance("MD5"); mdInst.update(btInput); byte[] md = mdInst.digest(); int j = md.length; char[] str = new char[j * 2]; int k = 0; for (byte byte0 : md) { str[k++] = hexDigits[byte0 >>> 4 & 15]; str[k++] = hexDigits[byte0 & 15]; } return new String(str); } catch (Exception var11) { return ""; } } }
四、测试
4.1、测试类
简单模拟下发送一个优惠券数据到redis,然后设置超时时间
package com.alian.expiration; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @Slf4j @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class RedisKeyExpirationTest { @Autowired private RedisTemplateredisTemplate; @Test public void keyExpiration() { // 优惠券信息 String id = "2023021685264735"; Map map = new HashMap<>(); map.put("id", id); map.put("amount", "1000"); map.put("type", "1001"); map.put("describe", "满减红包"); // 缓存到redis redisTemplate.opsForHash().putAll("com.mall.coupon.id." + id, map); // 设置过期时间 redisTemplate.expire("com.mall.coupon.id." + id, 10, TimeUnit.SECONDS); } }
4.2、单实例
单实例就是服务只部署了一份,我们启动一份,端口是8090,然后通过上面的测试类,发送一个消息,结果如下:
10:23:39 701 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
10:23:39 988 INFO [container-2]:Business Handing...
10:23:39 989 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
10:23:50 005 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
10:23:50 005 INFO [container-3]:Expired keys without processing
10:23:50 005 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12
4.3、多实例
多实例就是服务部署了多份,比如我们启动两份,端口分别为8090和8091,然后通过上面的测试类,发送一个消息,8090端口的服务结果如下(Business Handing…):
11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Business Handing...
11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12
8091端口的服务结果如下(Other service is handing…):
11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
11:39:06 707 INFO [container-2]:Other service is handing...
11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
11:39:16 796 INFO [container-3]:Expired keys without processing
11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12
结果分析:
多实例的情况下,每个实例都会收到过期key通知
通过redis分布式锁,实现只有一个实例会进行业务处理,防止重复
使用分布式锁会有一个新的key过期,并且收到该key的通知,你可以业务执行完延迟一定时间(避免重复执行),再删除,也可以不处理(因为本就不是要处理业务的key)