Java根据某个key加锁怎么实现
一、背景
日常开发中,有时候需要根据某个 key 加锁,确保多线程情况下,对该 key 的加锁和解锁之间的代码串行执行。
常见的做法有两种:为每个 key 维护一个 ReentrantLock,让同一个 key 的线程都使用该 lock 加锁,从而保证同一时刻只有一个线程执行;或者为每个 key 维护一个 Semaphore,让同一个 key 的线程通过该 Semaphore 控制同时执行的线程数。
二、参考代码
接口定义
/**
 * Contract for a lock that is scoped to a key: callers bracket a critical
 * section with {@link #lock(Object)} and {@link #unlock(Object)} using the
 * same key.
 *
 * <p>How many threads may be inside the section for one key at a time is
 * decided by the implementation.
 *
 * @param <T> type of the key
 */
public interface LockByKey<T> {

    /**
     * Acquires the lock associated with {@code key}.
     *
     * @param key the key to lock on
     */
    void lock(T key);

    /**
     * Releases the lock associated with {@code key}.
     *
     * @param key the key that was previously passed to {@link #lock(Object)}
     */
    void unlock(T key);
}
2.1 同一个 key 只能一个线程执行
2.1.1 代码实现
每个 key 对应一个 ReentrantLock ,让同一个 key 的线程使用该 lock 加锁。
import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; public class DefaultLockByKeyImplimplements LockByKey { private final Map lockMap = new ConcurrentHashMap<>(); /** * 加锁 */ @Override public void lock(T key) { // 如果key为空,直接返回 if (key == null) { throw new IllegalArgumentException("key 不能为空"); } // 获取或创建一个ReentrantLock对象 ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock()); // 获取锁 lock.lock(); } /** * 解锁 */ @Override public void unlock(T key) { // 如果key为空,直接返回 if (key == null) { throw new IllegalArgumentException("key 不能为空"); } // 从Map中获取锁对象 ReentrantLock lock = lockMap.get(key); // 获取不到报错 if (lock == null) { throw new IllegalArgumentException("key " + key + "尚未加锁"); } // 其他线程非法持有不允许释放 if (!lock.isHeldByCurrentThread()) { throw new IllegalStateException("当前线程尚未持有,key:" + key + "的锁,不允许释放"); } lock.unlock(); } }
注意事项:
(1)参数合法性校验
(2)解锁时需要判断该锁是否为当前线程持有
2.1.2 编写单测
import com.google.common.collect.Lists;
import org.junit.Test;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Checks that {@link DefaultLockByKeyImpl} never lets two tasks with the same
 * key run their critical section at the same time.
 */
public class DefaultLockByKeyImplTest {

    private final LockByKey<String> lockByKey = new DefaultLockByKeyImpl<>();

    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    @Test
    public void test() throws InterruptedException {
        List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
        CountDownLatch countDownLatch = new CountDownLatch(keys.size());

        // Shared by tasks holding DIFFERENT keys, so it must be thread-safe on its own.
        // Fully qualified names are used so the import list stays as it is.
        Set<String> executingKeySet = java.util.concurrent.ConcurrentHashMap.newKeySet();
        // Failures are collected here; an exception thrown inside a submitted task
        // would otherwise disappear into the unread Future.
        List<String> failures = java.util.Collections.synchronizedList(new java.util.ArrayList<>());

        try {
            for (int i = 0; i < keys.size(); i++) {
                String key = keys.get(i);
                int finalI = i;
                executorService.submit(() -> {
                    lockByKey.lock(key);
                    try {
                        // add() returns false when another task with this key is still running.
                        if (!executingKeySet.add(key)) {
                            failures.add("存在正在执行的 key:" + key);
                            return;
                        }
                        try {
                            System.out.println("index:" + finalI + "对 [" + key + "] 加锁 ->" + Thread.currentThread().getName());
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            failures.add("interrupted while holding key:" + key);
                        } finally {
                            System.out.println("index:" + finalI + "释放 [" + key + "] ->" + Thread.currentThread().getName());
                            // Clear the marker BEFORE unlocking, otherwise the next holder
                            // of this key could still see it and report a false violation.
                            executingKeySet.remove(key);
                        }
                    } finally {
                        // Always release and count down so the test cannot hang.
                        lockByKey.unlock(key);
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            executorService.shutdownNow();
        }

        if (!failures.isEmpty()) {
            throw new AssertionError(String.join("; ", failures));
        }
    }
}
如果同一个 key 没释放能够再次进入,会抛出异常。
也可以通过日志来观察执行情况:
index:0对 [a] 加锁 ->pool-1-thread-1 index:6对 [d] 加锁 ->pool-1-thread-7 index:4对 [c] 加锁 ->pool-1-thread-5 index:3对 [b] 加锁 ->pool-1-thread-4 index:6释放 [d] ->pool-1-thread-7 index:4释放 [c] ->pool-1-thread-5 index:0释放 [a] ->pool-1-thread-1 index:3释放 [b] ->pool-1-thread-4 index:1对 [a] 加锁 ->pool-1-thread-2 index:5对 [b] 加锁 ->pool-1-thread-6 index:1释放 [a] ->pool-1-thread-2 index:5释放 [b] ->pool-1-thread-6 index:2对 [a] 加锁 ->pool-1-thread-3 index:2释放 [a] ->pool-1-thread-3
2.2 同一个 key 可以有 n 个线程执行
2.2.1 代码实现
每个 key 对应一个 Semaphore ,让同一个 key 的线程使用 Semaphore 控制同时执行的线程数。
import lombok.SneakyThrows; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; public class SimultaneousEntriesLockByKeyimplements LockByKey { private final Map semaphores = new ConcurrentHashMap<>(); /** * 最大线程 */ private int allowed_threads; public SimultaneousEntriesLockByKey(int allowed_threads) { this.allowed_threads = allowed_threads; } /** * 加锁 */ @Override public void lock(T key) { Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(allowed_threads) : v); semaphore.acquireUninterruptibly(); } /** * 解锁 */ @Override public void unlock(T key) { // 如果key为空,直接返回 if (key == null) { throw new IllegalArgumentException("key 不能为空"); } // 从Map中获取锁对象 Semaphore semaphore = semaphores.get(key); if (semaphore == null) { throw new IllegalArgumentException("key " + key + "尚未加锁"); } semaphore.release(); if (semaphore.availablePermits() >= allowed_threads) { semaphores.remove(key, semaphore); } }
2.2.2 测试代码
import com.google.common.collect.Lists;
import org.junit.Test;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Checks that {@link SimultaneousEntriesLockByKey} never lets more than
 * {@code maxThreadEachKey} tasks with the same key run at the same time.
 */
public class SimultaneousEntriesLockByKeyTest {

    private final int maxThreadEachKey = 2;

    private final LockByKey<String> lockByKey = new SimultaneousEntriesLockByKey<>(maxThreadEachKey);

    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    @Test
    public void test() throws InterruptedException {
        List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
        CountDownLatch countDownLatch = new CountDownLatch(keys.size());

        // Number of tasks currently inside the critical section, per key.
        Map<String, Integer> executingKeyCount = Collections.synchronizedMap(new HashMap<>());
        // Failures are collected here; an exception thrown inside a submitted task
        // would otherwise disappear into the unread Future.
        List<String> failures = Collections.synchronizedList(new java.util.ArrayList<>());

        try {
            for (int i = 0; i < keys.size(); i++) {
                String key = keys.get(i);
                int finalI = i;
                executorService.submit(() -> {
                    lockByKey.lock(key);
                    try {
                        int running = executingKeyCount.merge(key, 1, Integer::sum);
                        if (running > maxThreadEachKey) {
                            failures.add("超过限制了");
                            executingKeyCount.merge(key, -1, Integer::sum);
                            return;
                        }
                        try {
                            System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "对 [" + key + "] 加锁 ->" + Thread.currentThread().getName() + "count:" + running);
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            failures.add("interrupted while holding key:" + key);
                        } finally {
                            // Decrement BEFORE unlocking, otherwise a newly admitted task
                            // could count this one too and report a false violation.
                            int remaining = executingKeyCount.merge(key, -1, Integer::sum);
                            System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "释放 [" + key + "] ->" + Thread.currentThread().getName() + "count:" + remaining);
                        }
                    } finally {
                        // Always release and count down so the test cannot hang.
                        lockByKey.unlock(key);
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            executorService.shutdownNow();
        }

        if (!failures.isEmpty()) {
            throw new AssertionError(String.join("; ", failures));
        }
    }
}
输出:
time:2023-03-15T20:49:57.044195 ,index:6对 [d] 加锁 ->pool-1-thread-7count:1
time:2023-03-15T20:49:57.058942 ,index:5对 [b] 加锁 ->pool-1-thread-6count:2
time:2023-03-15T20:49:57.069789 ,index:1对 [a] 加锁 ->pool-1-thread-2count:2
time:2023-03-15T20:49:57.042402 ,index:4对 [c] 加锁 ->pool-1-thread-5count:1
time:2023-03-15T20:49:57.046866 ,index:0对 [a] 加锁 ->pool-1-thread-1count:2
time:2023-03-15T20:49:57.042991 ,index:3对 [b] 加锁 ->pool-1-thread-4count:2
time:2023-03-15T20:49:58.089557 ,index:0释放 [a] ->pool-1-thread-1count:1
time:2023-03-15T20:49:58.082679 ,index:6释放 [d] ->pool-1-thread-7count:0
time:2023-03-15T20:49:58.084579 ,index:4释放 [c] ->pool-1-thread-5count:0
time:2023-03-15T20:49:58.083462 ,index:5释放 [b] ->pool-1-thread-6count:1
time:2023-03-15T20:49:58.089576 ,index:3释放 [b] ->pool-1-thread-4count:1
time:2023-03-15T20:49:58.085359 ,index:1释放 [a] ->pool-1-thread-2count:1
time:2023-03-15T20:49:58.096912 ,index:2对 [a] 加锁 ->pool-1-thread-3count:1
time:2023-03-15T20:49:59.099935 ,index:2释放 [a] ->pool-1-thread-3count:0