验证码: 看不清楚,换一张 查询 注册会员,免验证
  • {{ basic.site_slogan }}
  • 打开微信扫一扫,
    您还可以在这里找到我们哟

    关注我们

Redis监听过期的key实现流程是什么

阅读:914 来源:乙速云 作者:代码code

Redis监听过期的key实现流程是什么

      一、简介

      我们来个最简单的集群架构,如下图:

      Redis监听过期的key实现流程是什么

      我们上面图中看到是服务A和服务B就是同一个服务的不同实例。

      二、maven依赖

      pom.xml

      
      
          4.0.0
          
              org.springframework.boot
              spring-boot-starter-parent
              2.6.0
               
          
          com.alian
          expiration
          0.0.1-SNAPSHOT
          expiration
          redis-key-expiration-listener
          
              UTF-8
              UTF-8
              target
              1.8
              
              2.9.10
              
              1.2.68
          
          
              
                  org.springframework.boot
                  spring-boot-starter-web
              
              
                  org.springframework.boot
                  spring-boot-starter-test
              
              
              
                  org.springframework.boot
                  spring-boot-starter-data-redis
                  ${parent.version}
              
              
              
                  com.fasterxml.jackson.core
                  jackson-databind
                  ${jackson.version}
              
              
              
                  com.fasterxml.jackson.datatype
                  jackson-datatype-jsr310
                  ${jackson.version}
              
              
                  com.alibaba
                  fastjson
                  1.2.68
              
              
                  org.projectlombok
                  lombok
                  1.16.14
              
              
                  junit
                  junit
                  4.13.2
                  test
              
          
          
              
                  
                      org.springframework.boot
                      spring-boot-maven-plugin
                  
              
          
      

      三、编码实现

      3.1、application.properties

      # 端口
      server.port=8090
      # 上下文路径
      server.servlet.context-path=/expiration

      # Redis数据库索引(默认为0)
      spring.redis.database=0
      # Redis服务器地址
      spring.redis.host=192.168.0.193
      #spring.redis.host=127.0.0.1
      # Redis服务器连接端口
      spring.redis.port=6379
      # Redis服务器连接密码(默认为空)
      spring.redis.password=
      # 连接池最大连接数(使用负值表示没有限制)
      spring.redis.jedis.pool.max-active=20
      # 连接池中的最小空闲连接
      spring.redis.jedis.pool.min-idle=10
      # 连接池中的最大空闲连接
      spring.redis.jedis.pool.max-idle=10
      # 连接池最大阻塞等待时间(使用负值表示没有限制)
      spring.redis.jedis.pool.max-wait=20000
      # 读时间(毫秒)
      spring.redis.timeout=10000
      # 连接超时时间(毫秒)
      spring.redis.connect-timeout=10000

      3.2、Redis配置类

      RedisConfig

      package com.alian.expiration.config;
      import com.fasterxml.jackson.annotation.JsonAutoDetect;
      import com.fasterxml.jackson.annotation.PropertyAccessor;
      import com.fasterxml.jackson.databind.ObjectMapper;
      import com.fasterxml.jackson.databind.SerializationFeature;
      import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
      import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
      import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
      import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
      import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
      import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
      import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.data.redis.connection.RedisConnectionFactory;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.data.redis.listener.RedisMessageListenerContainer;
      import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
      import org.springframework.data.redis.serializer.RedisSerializer;
      import org.springframework.data.redis.serializer.StringRedisSerializer;
      import java.time.LocalDate;
      import java.time.LocalDateTime;
      import java.time.LocalTime;
      import java.time.format.DateTimeFormatter;
      @Configuration
      public class RedisConfig {
          /**
           * redis配置
           *
           * @param redisConnectionFactory
           * @return
           */
          @Bean
          public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
              // 实例化redisTemplate
              RedisTemplate redisTemplate = new RedisTemplate<>();
              //设置连接工厂
              redisTemplate.setConnectionFactory(redisConnectionFactory);
              // key采用String的序列化
              redisTemplate.setKeySerializer(keySerializer());
              // value采用jackson序列化
              redisTemplate.setValueSerializer(valueSerializer());
              // Hash key采用String的序列化
              redisTemplate.setHashKeySerializer(keySerializer());
              // Hash value采用jackson序列化
              redisTemplate.setHashValueSerializer(valueSerializer());
              // 支持事务
              // redisTemplate.setEnableTransactionSupport(true);
              //执行函数,初始化RedisTemplate
              redisTemplate.afterPropertiesSet();
              return redisTemplate;
          }
          /**
           * key类型采用String序列化
           *
           * @return
           */
          private RedisSerializer keySerializer() {
              return new StringRedisSerializer();
          }
          /**
           * value采用JSON序列化
           *
           * @return
           */
          private RedisSerializer valueSerializer() {
              //设置jackson序列化
              Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
              //设置序列化对象
              jackson2JsonRedisSerializer.setObjectMapper(getMapper());
              return jackson2JsonRedisSerializer;
          }
          /**
           * 使用com.fasterxml.jackson.databind.ObjectMapper
           * 对数据进行处理包括java8里的时间
           *
           * @return
           */
          private ObjectMapper getMapper() {
              ObjectMapper mapper = new ObjectMapper();
              //设置可见性
              mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
              //默认键入对象
              mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
              //设置Java 8 时间序列化
              JavaTimeModule timeModule = new JavaTimeModule();
              timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
              timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
              timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
              timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
              timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
              timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
              //禁用把时间转为时间戳
              mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
              mapper.registerModule(timeModule);
              return mapper;
          }
          @Bean
          RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
              RedisMessageListenerContainer container = new RedisMessageListenerContainer();
              container.setConnectionFactory(connectionFactory);
              return container;
          }
      }

      和我们之前整合redis差不多,只不过在最后增加了一个redis消息监听监听容器RedisMessageListenerContainer

      3.3、监听器

      RedisKeyExpirationListener

      package com.alian.expiration.listener;
      import com.alian.expiration.service.RedisExpirationService;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.data.redis.connection.Message;
      import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
      import org.springframework.data.redis.listener.RedisMessageListenerContainer;
      import org.springframework.stereotype.Component;
      @Slf4j
      @Component
      public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
          @Autowired
          private RedisExpirationService redisExpirationService;
      	// 把我们上面一步配置的bean注入进去
          public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
              super(listenerContainer);
          }
          /**
           * 针对redis数据失效事件,进行数据处理
           *
           * @param message
           * @param pattern
           */
          @Override
          public void onMessage(Message message, byte[] pattern) {
              // 用户做自己的业务处理即可,注意message.toString()可以获取失效的key
              String expiredKey = message.toString();
              log.info("onMessage --> redis 过期的key是:{}", expiredKey);
              try {
                  // 对过期key进行处理
                  redisExpirationService.processingExpiredKey(expiredKey);
                  log.info("过期key处理完成:{}", expiredKey);
              } catch (Exception e) {
                  e.printStackTrace();
                  log.error("处理redis 过期的key异常:{}", expiredKey, e);
              }
          }
      }

      实现的步骤如下:

      • 继承KeyExpirationEventMessageListener

      • 把redis消息监听监听容器RedisMessageListenerContainer 注入到密钥空间事件消息侦 听器中

      • 重写onMessage方法

      • 通过Message 的 toString() 方法就可以获取到过期的key

      • 对key中关键信息进行业务处理,比如 id

      3.4、服务类

      RedisExpirationService

      package com.alian.expiration.service;
      import com.alian.expiration.util.SignUtils;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.stereotype.Service;
      import java.util.concurrent.TimeUnit;
      @Slf4j
      @Service
      public class RedisExpirationService {
          @Autowired
          private RedisTemplate redisTemplate;
          public void processingExpiredKey(String expiredKey) {
              // 如果是优惠券的key(一定要规范命名)
              if (expiredKey.startsWith("com.mall.coupon.id")) {
                  // 临时key,此key可以在业务处理完,然后延迟一定时间删除,或者不处理
                  String tempKey = SignUtils.md5(expiredKey, "UTF-8");
                  // 临时key不存在才设置值,key超时时间为10秒(此处相当于分布式锁的应用)
                  Boolean exist = redisTemplate.opsForValue().setIfAbsent(tempKey, "1", 10, TimeUnit.SECONDS);
                  if (Boolean.TRUE.equals(exist)) {
                      log.info("Business Handing...");
                      // 比如截取里面的id,然后关联数据库进行处理
                  } else {
                      log.info("Other service is handing...");
                  }
              } else {
                  log.info("Expired keys without processing");
              }
          }
      }

      基本流程如下:

      • 判断是否是需要处理的key,一般这种key通过命名规范加以处理

      • 以当前key生成一个新的key作为分布式key

      • 如果redis中不存在这个新的key,则为新的key设置一个值,达到分布式服务处理(核心)

      • 设置成功的,进行业务处理;设置失败了,说明其他服务正在处理这个key

      • 根据 key 的关键信息(比如截取id),进行业务处理

      3.5、工具类

      SignUtils

      package com.alian.expiration.util;
      import java.security.MessageDigest;
      public class SignUtils {
          public static final String md5(String s, String charset) {
              char[] hexDigits = new char[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
              try {
                  byte[] btInput = s.getBytes(charset);
                  MessageDigest mdInst = MessageDigest.getInstance("MD5");
                  mdInst.update(btInput);
                  byte[] md = mdInst.digest();
                  int j = md.length;
                  char[] str = new char[j * 2];
                  int k = 0;
                  for (byte byte0 : md) {
                      str[k++] = hexDigits[byte0 >>> 4 & 15];
                      str[k++] = hexDigits[byte0 & 15];
                  }
                  return new String(str);
              } catch (Exception var11) {
                  return "";
              }
          }
      }

      四、测试

      4.1、测试类

      简单模拟下发送一个优惠券数据到redis,然后设置超时时间

      package com.alian.expiration;
      import lombok.extern.slf4j.Slf4j;
      import org.junit.Test;
      import org.junit.runner.RunWith;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.test.context.SpringBootTest;
      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
      import java.util.HashMap;
      import java.util.Map;
      import java.util.concurrent.TimeUnit;
      @Slf4j
      @RunWith(SpringJUnit4ClassRunner.class)
      @SpringBootTest
      public class RedisKeyExpirationTest {
          @Autowired
          private RedisTemplate redisTemplate;
          @Test
          public void keyExpiration() {
              // 优惠券信息
              String id = "2023021685264735";
              Map map = new HashMap<>();
              map.put("id", id);
              map.put("amount", "1000");
              map.put("type", "1001");
              map.put("describe", "满减红包");
              // 缓存到redis
              redisTemplate.opsForHash().putAll("com.mall.coupon.id." + id, map);
              // 设置过期时间
              redisTemplate.expire("com.mall.coupon.id." + id, 10, TimeUnit.SECONDS);
          }
      }

      4.2、单实例

      单实例就是服务只部署了一份,我们启动一份,端口是8090,然后通过上面的测试类,发送一个消息,结果如下:

      10:23:39 701 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
      10:23:39 988 INFO [container-2]:Business Handing...
      10:23:39 989 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
      10:23:50 005 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
      10:23:50 005 INFO [container-3]:Expired keys without processing
      10:23:50 005 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

      4.3、多实例

      多实例就是服务部署了多份,比如我们启动两份,端口分别为8090和8091,然后通过上面的测试类,发送一个消息,8090端口的服务结果如下(Business Handing…):

      11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
      11:39:06 707 INFO [container-2]:Business Handing...
      11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
      11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
      11:39:16 796 INFO [container-3]:Expired keys without processing
      11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

      8091端口的服务结果如下(Other service is handing…):

      11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
      11:39:06 707 INFO [container-2]:Other service is handing...
      11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
      11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
      11:39:16 796 INFO [container-3]:Expired keys without processing
      11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

      结果分析:

      • 多实例的情况下,每个实例都会收到过期key通知

      • 通过redis分布式锁,实现只有一个实例会进行业务处理,防止重复

      • 使用分布式锁会有一个新的key过期,并且收到该key的通知,你可以业务执行完延迟一定时间(避免重复执行),再删除,也可以不处理(因为本就不是要处理业务的key)

      分享到:
      *特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们: hlamps#outlook.com (#换成@)。
      相关文章
      {{ v.title }}
      {{ v.description||(cleanHtml(v.content)).substr(0,100)+'···' }}
      你可能感兴趣
      推荐阅读 更多>

      {{ basic.bottom_text }}