验证码: 看不清楚,换一张 查询 注册会员,免验证
  • {{ basic.site_slogan }}
  • 打开微信扫一扫,
    您还可以在这里找到我们哟

    关注我们

Java根据某个key加锁怎么实现

阅读:653 来源:乙速云 作者:代码code

Java根据某个key加锁怎么实现

      一、背景

      日常开发中,有时候需要根据某个 key 加锁,确保多线程情况下,对该 key 的加锁和解锁之间的代码串行执行。
      大家可以借助每个 key 对应一个 ReentrantLock ,让同一个 key 的线程使用该 lock 加锁;每个 key 对应一个 Semaphore ,让同一个 key 的线程使用 Semaphore 控制同时执行的线程数。

      二、参考代码

      接口定义

      public interface LockByKey {
      
          /**
           * 加锁
           */
          void lock(T key);
      
          /**
           * 解锁
           */
          void unlock(T key);
      }

      2.1 同一个 key 只能一个线程执行

      2.1.1 代码实现

      每个 key 对应一个 ReentrantLock ,让同一个 key 的线程使用该 lock 加锁。

      import java.util.Map;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.locks.ReentrantLock;
      
      public class DefaultLockByKeyImpl implements LockByKey {
      
          private final Map lockMap = new ConcurrentHashMap<>();
      
          /**
           * 加锁
           */
          @Override
          public void lock(T key) {
              // 如果key为空,直接返回
              if (key == null) {
                  throw new IllegalArgumentException("key 不能为空");
              }
              
              // 获取或创建一个ReentrantLock对象
              ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock());
              // 获取锁
              lock.lock();
          }
      
      
          /**
           * 解锁
           */
          @Override
          public void unlock(T key) {
              // 如果key为空,直接返回
              if (key == null) {
                  throw new IllegalArgumentException("key 不能为空");
              }
      
              // 从Map中获取锁对象
              ReentrantLock lock = lockMap.get(key);
              // 获取不到报错
              if (lock == null) {
                  throw new IllegalArgumentException("key " + key + "尚未加锁");
              }
              // 其他线程非法持有不允许释放
              if (!lock.isHeldByCurrentThread()) {
                  throw new IllegalStateException("当前线程尚未持有,key:" + key + "的锁,不允许释放");
              }
              lock.unlock();
          }
      }

      注意事项:
      (1)参数合法性校验
      (2)解锁时需要判断该锁是否为当前线程持有

      2.1.2 编写单测
      import com.google.common.collect.Lists;
      import org.junit.Test;
      
      import java.util.HashSet;
      import java.util.List;
      import java.util.Set;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.TimeUnit;
      
      public class DefaultLockByKeyImplTest {
      
          private final LockByKey lockByKey = new DefaultLockByKeyImpl<>();
      
          private final CountDownLatch countDownLatch = new CountDownLatch(7);
          private final ExecutorService executorService = Executors.newFixedThreadPool(10);
      
          @Test
          public void test() throws InterruptedException {
              List keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
              Set executingKeySet = new HashSet<>();
      
              for (int i = 0; i < keys.size(); i++) {
                  String key = keys.get(i);
                  int finalI = i;
                  executorService.submit(() -> {
                      lockByKey.lock(key);
                      if (executingKeySet.contains(key)) {
                          throw new RuntimeException("存在正在执行的 key:" + key);
                      }
                      executingKeySet.add(key);
      
                      try {
                          System.out.println("index:" + finalI + "对 [" + key + "] 加锁 ->" + Thread.currentThread().getName());
                          TimeUnit.SECONDS.sleep(1);
                      } catch (InterruptedException e) {
                          throw new RuntimeException(e);
                      } finally {
                          System.out.println("index:" + finalI + "释放 [" + key + "] ->" + Thread.currentThread().getName());
                          lockByKey.unlock(key);
                          executingKeySet.remove(key);
                          countDownLatch.countDown();
                      }
                  });
              }
              countDownLatch.await();
          }
      }

      如果同一个 key 没释放能够再次进入,会抛出异常。
      也可以通过日志来观察执行情况:

      index:0对 [a] 加锁 ->pool-1-thread-1
      index:6对 [d] 加锁 ->pool-1-thread-7
      index:4对 [c] 加锁 ->pool-1-thread-5
      index:3对 [b] 加锁 ->pool-1-thread-4
      index:6释放 [d] ->pool-1-thread-7
      index:4释放 [c] ->pool-1-thread-5
      index:0释放 [a] ->pool-1-thread-1
      index:3释放 [b] ->pool-1-thread-4
      
      index:1对 [a] 加锁 ->pool-1-thread-2
      index:5对 [b] 加锁 ->pool-1-thread-6
      index:1释放 [a] ->pool-1-thread-2
      index:5释放 [b] ->pool-1-thread-6
      
      index:2对 [a] 加锁 ->pool-1-thread-3
      index:2释放 [a] ->pool-1-thread-3

      2.2、同一个 key 可以有 n个线程执行

      2.2.1 代码实现

      每个 key 对应一个 Semaphore ,让同一个 key 的线程使用 Semaphore 控制同时执行的线程数。

      import lombok.SneakyThrows;
      
      import java.util.Map;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.Semaphore;
      
      public class SimultaneousEntriesLockByKey implements LockByKey {
      
          private final Map semaphores = new ConcurrentHashMap<>();
      
          /**
           * 最大线程
           */
          private int allowed_threads;
      
          public SimultaneousEntriesLockByKey(int allowed_threads) {
              this.allowed_threads = allowed_threads;
          }
      
          /**
           * 加锁
           */
          @Override
          public void lock(T key) {
              Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(allowed_threads) : v);
              semaphore.acquireUninterruptibly();
          }
      
      
          /**
           * 解锁
           */
          @Override
          public void unlock(T key) {
              // 如果key为空,直接返回
              if (key == null) {
                  throw new IllegalArgumentException("key 不能为空");
              }
      
              // 从Map中获取锁对象
              Semaphore semaphore = semaphores.get(key);
              if (semaphore == null) {
                  throw new IllegalArgumentException("key " + key + "尚未加锁");
              }
              semaphore.release();
              if (semaphore.availablePermits() >= allowed_threads) {
                  semaphores.remove(key, semaphore);
              }
          }
      2.2.2 测试代码
      import com.google.common.collect.Lists;
      import org.junit.Test;
      
      import java.time.LocalDateTime;
      import java.util.Collections;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.TimeUnit;
      
      public class SimultaneousEntriesLockByKeyTest {
      
          private final int maxThreadEachKey = 2;
          private final LockByKey lockByKey = new SimultaneousEntriesLockByKey<>(maxThreadEachKey);
      
          private final CountDownLatch countDownLatch = new CountDownLatch(7);
          private final ExecutorService executorService = Executors.newFixedThreadPool(10);
      
          @Test
          public void test() throws InterruptedException {
              List keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
              Map executingKeyCount = Collections.synchronizedMap(new HashMap<>());
      
              for (int i = 0; i < keys.size(); i++) {
                  String key = keys.get(i);
                  int finalI = i;
                  executorService.submit(() -> {
                      lockByKey.lock(key);
                      executingKeyCount.compute(key, (k, v) -> {
                          if (v != null && v + 1 > maxThreadEachKey) {
                              throw new RuntimeException("超过限制了");
                          }
                          return v == null ? 1 : v + 1;
                      });
                      try {
                          System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "对 [" + key + "] 加锁 ->" + Thread.currentThread().getName() + "count:" + executingKeyCount.get(key));
                          TimeUnit.SECONDS.sleep(1);
                      } catch (InterruptedException e) {
                          throw new RuntimeException(e);
                      } finally {
                          System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "释放 [" + key + "] ->" + Thread.currentThread().getName() + "count:" + (executingKeyCount.get(key) - 1));
                          lockByKey.unlock(key);
                          executingKeyCount.compute(key, (k, v) -> v - 1);
                          countDownLatch.countDown();
                      }
                  });
              }
              countDownLatch.await();
          }
      }

      输出:

      time:2023-03-15T20:49:57.044195 ,index:6对 [d] 加锁 ->pool-1-thread-7count:1
      time:2023-03-15T20:49:57.058942 ,index:5对 [b] 加锁 ->pool-1-thread-6count:2
      time:2023-03-15T20:49:57.069789 ,index:1对 [a] 加锁 ->pool-1-thread-2count:2
      time:2023-03-15T20:49:57.042402 ,index:4对 [c] 加锁 ->pool-1-thread-5count:1
      time:2023-03-15T20:49:57.046866 ,index:0对 [a] 加锁 ->pool-1-thread-1count:2
      time:2023-03-15T20:49:57.042991 ,index:3对 [b] 加锁 ->pool-1-thread-4count:2
      time:2023-03-15T20:49:58.089557 ,index:0释放 [a] ->pool-1-thread-1count:1
      time:2023-03-15T20:49:58.082679 ,index:6释放 [d] ->pool-1-thread-7count:0
      time:2023-03-15T20:49:58.084579 ,index:4释放 [c] ->pool-1-thread-5count:0
      time:2023-03-15T20:49:58.083462 ,index:5释放 [b] ->pool-1-thread-6count:1
      time:2023-03-15T20:49:58.089576 ,index:3释放 [b] ->pool-1-thread-4count:1
      time:2023-03-15T20:49:58.085359 ,index:1释放 [a] ->pool-1-thread-2count:1
      time:2023-03-15T20:49:58.096912 ,index:2对 [a] 加锁 ->pool-1-thread-3count:1
      time:2023-03-15T20:49:59.099935 ,index:2释放 [a] ->pool-1-thread-3count:0

    分享到:
    *特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们: hlamps#outlook.com (#换成@)。
    相关文章
    {{ v.title }}
    {{ v.description||(cleanHtml(v.content)).substr(0,100)+'···' }}
    你可能感兴趣
    推荐阅读 更多>