【Redisson starter】 示例
【Redisson starter】 示例
Metadata
title: 【Redisson starter】 示例
date: 2023-01-22 21:55
tags:
- 行动阶段/完成
- 主题场景/组件
- 笔记空间/KnowledgeSpace/ProgramSpace/ModuleSpace
- 细化主题/Module/Redisson
categories:
- Redisson
keywords:
- Redisson
description: 【Redisson starter】 示例
【Redisson starter】 示例
@Component
@Slf4j
public class DistributedRedisLock {
@Autowired
private RedissonClient redissonClient;
private static final String DEFAULT_LOCK_NAME = "redisLock_";
//加锁
public boolean lock(String lockName) {
//声明key对象
String key = DEFAULT_LOCK_NAME + lockName;
//获取锁对象
RLock mylock = redissonClient.getLock(key);
//加锁,并且设置锁过期时间3秒,防止死锁的产生 uuid+threadId
mylock.lock();
//加锁成功
return true;
}
public boolean lock(String lockName, long timeout) {
checkRedissonClient();
RLock lock = getLock(lockName);
try {
if(timeout != -1){
// timeout:超时时间 TimeUnit.SECONDS:单位
lock.lock(timeout, TimeUnit.SECONDS);
}else{
lock.lock();
}
log.debug(" get lock success ,lockKey:{}", lockName);
return true;
} catch (Exception e) {
log.error(" get lock fail,lockKey:{}, cause:{} ",
lockName, e.getMessage());
return false;
}
}
private void checkRedissonClient() {
if (null == redissonClient) {
log.error(" redissonClient is null ,please check redis instance ! ");
throw new RuntimeException("redissonClient is null ,please check redis instance !");
}
if (redissonClient.isShutdown()) {
log.error(" Redisson instance has been shut down !!!");
throw new RuntimeException("Redisson instance has been shut down !!!");
}
}
/**
* 解锁
* @param lockName
*/
public void unlock(String lockName){
checkRedissonClient();
try {
RLock lock = getLock(lockName);
if(lock.isLocked() && lock.isHeldByCurrentThread()){
lock.unlock();
log.debug("key:{},unlock success",lockName);
}else{
log.debug("key:{},没有加锁或者不是当前线程加的锁 ",lockName);
}
}catch (Exception e){
log.error("key:{},unlock error,reason:{}",lockName,e.getMessage());
}
}
private RLock getLock(String lockName) {
String key = DEFAULT_LOCK_NAME + lockName;
return redissonClient.getLock(key);
}
/**
* 可中断锁
* @param lockName 锁名称
* @param waitTimeout 等待时长
* @param unit 时间单位
* @return
*/
public boolean tryLock(String lockName, long waitTimeout, TimeUnit unit) {
checkRedissonClient();
RLock lock = getLock(lockName);
try {
boolean res = lock.tryLock(waitTimeout,unit);
if (!res) {
log.debug(" get lock fail ,lockKey:{}", lockName);
return false;
}
log.debug(" get lock success ,lockKey:{}", lockName);
return true;
} catch (Exception e) {
log.error(" get lock fail,lockKey:{}, cause:{} ",
lockName, e.getMessage());
return false;
}
}
/**
* 公平锁
* @param lockName
* @param waitTimeout
* @param timeout
* @param unit
* @return
*/
public boolean getFairLock(String lockName, long waitTimeout,long timeout, TimeUnit unit){
checkRedissonClient();
RLock lock = redissonClient.getFairLock(DEFAULT_LOCK_NAME + lockName);
try {
boolean res = lock.tryLock(waitTimeout,timeout,unit);
if (!res) {
log.debug(" get lock fail ,lockKey:{}", lockName);
return false;
}
log.debug(" get lock success ,lockKey:{}", lockName);
return true;
} catch (Exception e) {
log.error(" get lock fail,lockKey:{}, cause:{} ",
lockName, e.getMessage());
return false;
}
}
/**
* 获取读写锁
* @param lockName
* @return
*/
public RReadWriteLock getReadWriteLock(String lockName) {
return redissonClient.getReadWriteLock(lockName);
}
/**
* 信号量
* @param semaphoreName
* @return
*/
public RSemaphore getSemaphore(String semaphoreName) {
return redissonClient.getSemaphore(semaphoreName);
}
/**
* 可过期性信号量
* @param permitExpirableSemaphoreName
* @return
*/
public RPermitExpirableSemaphore getPermitExpirableSemaphore(String permitExpirableSemaphoreName) {
return redissonClient.getPermitExpirableSemaphore(permitExpirableSemaphoreName);
}
/**
* 闭锁
* @param countDownLatchName
* @return
*/
public RCountDownLatch getCountDownLatch(String countDownLatchName) {
return redissonClient.getCountDownLatch(countDownLatchName);
}
}
测试用例
/**
 * Integration tests for {@link DistributedLocker}: single-thread acquire/release, re-entrancy,
 * hand-over between threads, wait timeouts and contention from many threads.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedissonDistributedLockerTest {
    private static final Logger log = LoggerFactory.getLogger(RedissonDistributedLockerTest.class);

    @Resource
    private DistributedLocker distributedLocker;

    // Single-thread executors: every task submitted to one of them runs on the same thread, so a
    // lock acquired by an earlier task can be released by a later task on that executor.
    private static final ExecutorService executorServiceB = Executors.newSingleThreadExecutor();
    private static final ExecutorService executorServiceC = Executors.newSingleThreadExecutor();

    /** Measures the cost of 10000 uncontended acquire/release cycles on distinct keys. */
    @Test
    public void tryLockUnlockCost() throws Exception {
        StopWatch stopWatch = new StopWatch("加锁解锁耗时统计");
        stopWatch.start();
        for (int i = 0; i < 10000; i++) {
            String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
            Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
            Assert.assertTrue(optLocked.isPresent());
            optLocked.get().unlock();
        }
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
    }

    /** The same thread can acquire the same key twice; each acquisition is released. */
    @Test
    public void tryLock() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
        Assert.assertTrue(optLocked.isPresent());
        Optional<LockResource> optLocked2 = distributedLocker.tryLock(key, 600000, 600000);
        Assert.assertTrue(optLocked2.isPresent());
        // Release once per acquisition so the key is not left held after the test.
        optLocked2.get().unlock();
        optLocked.get().unlock();
    }

    /**
     * Two threads compete for one lock: A acquires first, and B acquires after A releases.
     */
    @Test
    public void tryLock2() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
        CountDownLatch aHoldsLock = new CountDownLatch(1);
        Future<Optional<LockResource>> pendingB = executorServiceB.submit(() -> {
            aHoldsLock.await();
            log.info("B尝试获得锁:thread={}", currentThreadId());
            return distributedLocker.tryLock(key, 600000, 600000);
        });

        log.info("A尝试获得锁:thread={}", currentThreadId());
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 300000, 600000);
        Assert.assertTrue(optLocked.isPresent());
        log.info("A已获得锁:thread={}", currentThreadId());
        aHoldsLock.countDown();
        optLocked.get().unlock();
        log.info("A已释放锁:thread={}", currentThreadId());

        Optional<LockResource> lockResource2 = pendingB.get();
        Assert.assertTrue(lockResource2.isPresent());
        // Release on B's own thread and wait for it, so the test does not finish (and any
        // failure go unnoticed) before the unlock has actually happened.
        executorServiceB.submit(() -> {
            log.info("B已获得锁:thread={}", currentThreadId());
            lockResource2.get().unlock();
            log.info("B已释放锁:thread={}", currentThreadId());
        }).get();
    }

    /**
     * Three threads compete for one lock: A acquires first, then B and C race for it once A
     * releases.
     */
    @Test
    public void tryLock3() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
        log.info("A尝试获得锁:thread={}", currentThreadId());
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
        if (optLocked.isPresent()) {
            log.info("A已获得锁:thread={}", currentThreadId());
        }
        Assert.assertTrue(optLocked.isPresent());

        // B and C wait for each other so that both start competing at the same moment.
        CyclicBarrier startTogether = new CyclicBarrier(2);
        Future<Optional<LockResource>> submitB = executorServiceB.submit(() -> {
            startTogether.await();
            log.info("B尝试获得锁:thread={}", currentThreadId());
            return distributedLocker.tryLock(key, 600000, 600000);
        });
        Future<Optional<LockResource>> submitC = executorServiceC.submit(() -> {
            startTogether.await();
            log.info("C尝试获得锁:thread={}", currentThreadId());
            return distributedLocker.tryLock(key, 600000, 600000);
        });

        optLocked.get().unlock();
        log.info("A已释放锁:thread={}", currentThreadId());

        CountDownLatch released = new CountDownLatch(2);
        submitRelease(executorServiceB, "B", submitB, released);
        submitRelease(executorServiceC, "C", submitC, released);
        released.await();
    }

    /**
     * Queues a task on {@code executor} that releases the lock produced by {@code pending} and
     * then counts down {@code done}. The count-down happens in {@code finally} so a failed
     * release cannot leave the test waiting forever.
     */
    private static void submitRelease(ExecutorService executor, String name,
            Future<Optional<LockResource>> pending, CountDownLatch done) {
        executor.submit(() -> {
            try {
                log.info("{}已获得锁:thread={}", name, currentThreadId());
                pending.get().get().unlock();
                log.info("{}已释放锁:thread={}", name, currentThreadId());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("{} was interrupted while waiting for its lock", name, e);
            } catch (ExecutionException e) {
                log.error("{} failed to acquire its lock", name, e);
            } finally {
                done.countDown();
            }
        });
    }

    private static Long currentThreadId() {
        return Thread.currentThread().getId();
    }

    /** A second thread must fail to acquire a key that is still held when its wait time ends. */
    @Test
    public void tryLockWaitTimeout() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString();
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 2000);
        Assert.assertTrue(optLocked.isPresent());

        Optional<LockResource> optLockResource = CompletableFuture.supplyAsync(() -> {
            long startedAt = System.currentTimeMillis();
            Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 1000, 10);
            long cost = System.currentTimeMillis() - startedAt;
            log.info("cost={}", cost);
            return optLockedAgain;
        }).exceptionally(th -> {
            log.error("Exception: ", th);
            return Optional.empty();
        }).join();
        Assert.assertFalse(optLockResource.isPresent());
    }

    /** Re-entrant acquisition with an explicit lease time succeeds on the same thread. */
    @Test
    public void tryLockWithLeaseTime() throws Exception {
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 3000, 1000);
        Assert.assertTrue(optLocked.isPresent());
        // Re-entrant: the same thread acquires the key a second time.
        Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 3000, 1000);
        Assert.assertTrue(optLockedAgain.isPresent());
    }

    /**
     * 1000 concurrent requests compete for one lock that is never released by its holder; exactly
     * one of them must win.
     */
    @Test
    public void tryLockWithLeaseTimeOnMultiThread() throws Exception {
        int totalThread = 1000;
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
        AtomicInteger acquiredLockTimes = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(totalThread);
        for (int i = 0; i < totalThread; i++) {
            executor.submit(() -> {
                tryAcquireLockTimes.getAndIncrement();
                Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 10000);
                if (optLocked.isPresent()) {
                    acquiredLockTimes.getAndIncrement();
                }
            });
        }
        // awaitTermination only returns early after shutdown(); without it the call always
        // blocks for the full timeout and the pool threads are never released.
        executor.shutdown();
        Assert.assertTrue("workers did not finish in time",
                executor.awaitTermination(15, TimeUnit.SECONDS));
        Assert.assertEquals(totalThread, tryAcquireLockTimes.get());
        Assert.assertEquals(1, acquiredLockTimes.get());
    }

    /**
     * 100 concurrent requests compete for one lock and each holder releases it; every request
     * must eventually acquire the lock.
     */
    @Test
    public void tryLockWithLeaseTimeOnMultiThread2() throws Exception {
        int totalThread = 100;
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
        AtomicInteger acquiredLockTimes = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(totalThread);
        for (int i = 0; i < totalThread; i++) {
            executor.submit(() -> {
                long startedAt = System.currentTimeMillis();
                Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10000, 5);
                long cost = System.currentTimeMillis() - startedAt;
                log.info("tryAcquireLockTimes={}||wait={}", tryAcquireLockTimes.incrementAndGet(), cost);
                if (optLocked.isPresent()) {
                    acquiredLockTimes.getAndIncrement();
                    // Release explicitly so the next waiter can proceed.
                    optLocked.get().unlock();
                }
            });
        }
        // See tryLockWithLeaseTimeOnMultiThread: shutdown() is required for an early return.
        executor.shutdown();
        Assert.assertTrue("workers did not finish in time",
                executor.awaitTermination(20, TimeUnit.SECONDS));
        log.info("tryAcquireLockTimes={}, acquireLockTimes={}", tryAcquireLockTimes.get(), acquiredLockTimes.get());
        Assert.assertEquals(totalThread, tryAcquireLockTimes.get());
        Assert.assertEquals(totalThread, acquiredLockTimes.get());
    }
}
/**
 * Abstraction for acquiring a distributed lock identified by a string key.
 *
 * <p>Both methods return an {@link Optional}: callers test {@code isPresent()} to see whether a
 * {@link LockResource} was obtained and call {@code unlock()} on it when done.
 * NOTE(review): the time unit of the {@code int} arguments is not declared here; the values used
 * by callers suggest milliseconds, so confirm against the implementation.
 */
public interface DistributedLocker {
/**
 * Tries to acquire the lock for {@code lockKey}.
 *
 * @param lockKey key identifying the lock
 * @param waitTime presumably the maximum time to wait for the lock; unit to be confirmed
 * @return a {@link LockResource} when the lock was obtained, otherwise presumably empty
 */
Optional<LockResource> tryLock(String lockKey, int waitTime);
/**
 * Tries to acquire the lock for {@code lockKey} with an explicit lease time.
 *
 * @param lockKey key identifying the lock
 * @param waitTime presumably the maximum time to wait for the lock; unit to be confirmed
 * @param leaseTime presumably how long the lock is held before it expires; unit to be confirmed
 * @return a {@link LockResource} when the lock was obtained, otherwise presumably empty
 */
Optional<LockResource> tryLock(String lockKey, int waitTime, int leaseTime);
}
/**
 * Handle for a lock obtained from {@link DistributedLocker#tryLock}.
 */
public interface LockResource {
/**
 * Releases the lock this handle represents.
 * NOTE(review): the implementation is not shown here; whether this must be called from the
 * acquiring thread should be confirmed there.
 */
void unlock();
}
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 蝶梦庄生!
评论