【Redisson starter】 示例

Metadata

title: 【Redisson starter】 示例
date: 2023-01-22 21:55
tags:
  - 行动阶段/完成
  - 主题场景/组件
  - 笔记空间/KnowladgeSpace/ProgramSpace/ModuleSpace
  - 细化主题/Module/Redisson
categories:
  - Redisson
keywords:
  - Redisson
description: 【Redisson starter】 示例

【Redisson starter】 示例

@Component
@Slf4j
public class DistributedRedisLock {

    @Autowired
    private RedissonClient redissonClient;


    private static final String DEFAULT_LOCK_NAME = "redisLock_";

    //加锁
    public boolean lock(String lockName) {
        //声明key对象
        String key = DEFAULT_LOCK_NAME + lockName;
        //获取锁对象
        RLock mylock = redissonClient.getLock(key);
        //加锁,并且设置锁过期时间3秒,防止死锁的产生  uuid+threadId
        mylock.lock();
        //加锁成功
        return true;
    }

    public boolean lock(String lockName, long timeout) {
        checkRedissonClient();
        RLock lock = getLock(lockName);
        try {
            if(timeout != -1){
                // timeout:超时时间   TimeUnit.SECONDS:单位
                lock.lock(timeout, TimeUnit.SECONDS);
            }else{
                lock.lock();
            }
            log.debug(" get lock success ,lockKey:{}", lockName);
            return true;
        } catch (Exception e) {
            log.error(" get lock fail,lockKey:{}, cause:{} ",
                    lockName, e.getMessage());
            return false;
        }
    }


    private void checkRedissonClient() {
        if (null == redissonClient) {
            log.error(" redissonClient is null ,please check redis instance ! ");
            throw new RuntimeException("redissonClient is null ,please check redis instance !");
        }
        if (redissonClient.isShutdown()) {
            log.error(" Redisson instance has been shut down !!!");
            throw new RuntimeException("Redisson instance has been shut down !!!");
        }
    }



    /**
     * 解锁
     * @param lockName
     */
    public void unlock(String lockName){
        checkRedissonClient();
        try {
            RLock lock = getLock(lockName);
            if(lock.isLocked() && lock.isHeldByCurrentThread()){
                lock.unlock();
                log.debug("key:{},unlock success",lockName);
            }else{
                log.debug("key:{},没有加锁或者不是当前线程加的锁 ",lockName);
            }
        }catch (Exception e){
            log.error("key:{},unlock error,reason:{}",lockName,e.getMessage());
        }
    }


    private RLock getLock(String lockName) {
        String key = DEFAULT_LOCK_NAME + lockName;
        return redissonClient.getLock(key);
    }


    /**
     * 可中断锁
     * @param lockName 锁名称
     * @param waitTimeout  等待时长
     * @param unit 时间单位
     * @return
     */
    public boolean tryLock(String lockName, long waitTimeout, TimeUnit unit) {
        checkRedissonClient();
        RLock lock = getLock(lockName);
        try {
            boolean res = lock.tryLock(waitTimeout,unit);
            if (!res) {
                log.debug(" get lock fail ,lockKey:{}", lockName);
                return false;
            }
            log.debug(" get lock success ,lockKey:{}", lockName);
            return true;
        } catch (Exception e) {
            log.error(" get lock fail,lockKey:{}, cause:{} ",
                    lockName, e.getMessage());
            return false;
        }
    }



    /**
     * 公平锁
     * @param lockName
     * @param waitTimeout
     * @param timeout
     * @param unit
     * @return
     */
    public boolean getFairLock(String lockName, long waitTimeout,long timeout, TimeUnit unit){
        checkRedissonClient();
        RLock lock = redissonClient.getFairLock(DEFAULT_LOCK_NAME + lockName);
        try {
            boolean res = lock.tryLock(waitTimeout,timeout,unit);
            if (!res) {
                log.debug(" get lock fail ,lockKey:{}", lockName);
                return false;
            }
            log.debug(" get lock success ,lockKey:{}", lockName);
            return true;
        } catch (Exception e) {
            log.error(" get lock fail,lockKey:{}, cause:{} ",
                    lockName, e.getMessage());
            return false;
        }
    }







    /**
     * 获取读写锁
     * @param lockName
     * @return
     */
    public RReadWriteLock getReadWriteLock(String lockName) {
        return redissonClient.getReadWriteLock(lockName);

    }

    /**
     * 信号量
     * @param semaphoreName
     * @return
     */
    public RSemaphore getSemaphore(String semaphoreName) {
        return redissonClient.getSemaphore(semaphoreName);
    }

    /**
     * 可过期性信号量
     * @param permitExpirableSemaphoreName
     * @return
     */
    public RPermitExpirableSemaphore getPermitExpirableSemaphore(String permitExpirableSemaphoreName) {

        return redissonClient.getPermitExpirableSemaphore(permitExpirableSemaphoreName);
    }

    /**
     * 闭锁
     * @param countDownLatchName
     * @return
     */
    public RCountDownLatch getCountDownLatch(String countDownLatchName) {
        return redissonClient.getCountDownLatch(countDownLatchName);
    }

}

测试用例(针对下文的 DistributedLocker / LockResource 接口,其实现类未在本文给出;并非直接测试上面的 DistributedRedisLock)

/**
 * Integration tests for the {@code DistributedLocker} bean wired in by Spring.
 *
 * <p>Every test uses a fresh random key so that runs do not interfere with each other.
 * NOTE(review): the numeric wait/lease arguments are presumably milliseconds — confirm
 * against the {@code DistributedLocker} implementation.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedissonDistributedLockerTest {

    private static final Logger log = LoggerFactory.getLogger(RedissonDistributedLockerTest.class);

    @Resource
    private DistributedLocker distributedLocker;

    // Single-thread executors give "thread B" and "thread C" a stable identity, so the
    // thread that acquires a lock is also the one that later releases it.
    private static final ExecutorService executorServiceB = Executors.newSingleThreadExecutor();

    private static final ExecutorService executorServiceC = Executors.newSingleThreadExecutor();

    /** Measures the total cost of 10000 sequential lock/unlock round trips. */
    @Test
    public void tryLockUnlockCost() throws Exception {
        StopWatch stopWatch = new StopWatch("加锁解锁耗时统计");
        stopWatch.start();
        for (int i = 0; i < 10000; i++) {
            String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
            Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
            Assert.assertTrue(optLocked.isPresent());
            optLocked.get().unlock();
        }
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
    }

    /** The same thread can acquire the same key twice (reentrancy). */
    @Test
    public void tryLock() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
        Assert.assertTrue(optLocked.isPresent());
        try {
            Optional<LockResource> optLocked2 = distributedLocker.tryLock(key, 600000, 600000);
            Assert.assertTrue(optLocked2.isPresent());
            // Release the reentrant hold too; otherwise the key stays locked until the lease expires.
            optLocked2.get().unlock();
        } finally {
            optLocked.get().unlock();
        }
    }

    /**
     * Two threads compete for the lock: A acquires it first, and B acquires it after A
     * has released it.
     */
    @Test
    public void tryLock2() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Future<Optional<LockResource>> submit = executorServiceB.submit(() -> {
                    // B only starts competing once A already holds the lock.
                    countDownLatch.await();
                    log.info("B尝试获得锁:thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );

        log.info("A尝试获得锁:thread={}", currentThreadId());
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 300000, 600000);
        Assert.assertTrue(optLocked.isPresent());

        log.info("A已获得锁:thread={}", currentThreadId());
        countDownLatch.countDown();

        optLocked.get().unlock();
        log.info("A已释放锁:thread={}", currentThreadId());

        Optional<LockResource> lockResource2 = submit.get();
        Assert.assertTrue(lockResource2.isPresent());

        // Unlock on B's own thread and wait for it, so that a failing unlock fails the test
        // instead of being lost after the test method has returned.
        executorServiceB.submit(() -> {
            log.info("B已获得锁:thread={}", currentThreadId());
            lockResource2.get().unlock();
            log.info("B已释放锁:thread={}", currentThreadId());
        }).get();
    }

    /**
     * Three threads compete for the lock: A acquires it first; once A releases it,
     * B and C race for it.
     */
    @Test
    public void tryLock3() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");

        log.info("A尝试获得锁:thread={}", currentThreadId());
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);

        if (optLocked.isPresent()) {
            log.info("A已获得锁:thread={}", currentThreadId());
        }
        Assert.assertTrue(optLocked.isPresent());

        // The barrier makes B and C start their attempts at the same moment.
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        Future<Optional<LockResource>> submitB = executorServiceB.submit(() -> {
                    cyclicBarrier.await();
                    log.info("B尝试获得锁:thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );

        Future<Optional<LockResource>> submitC = executorServiceC.submit(() -> {
                    cyclicBarrier.await();
                    log.info("C尝试获得锁:thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );

        optLocked.get().unlock();
        log.info("A已释放锁:thread={}", currentThreadId());

        // Each release task is queued behind the acquire task on the same single-thread
        // executor, so it runs on the thread that holds the lock. Using Callables lets
        // any failure propagate through Future.get() below instead of being swallowed
        // (or leaving the test blocked forever).
        Future<?> releaseB = executorServiceB.submit(() -> {
            log.info("B已获得锁:thread={}", currentThreadId());
            submitB.get().get().unlock();
            log.info("B已释放锁:thread={}", currentThreadId());
            return null;
        });

        Future<?> releaseC = executorServiceC.submit(() -> {
            log.info("C已获得锁:thread={}", currentThreadId());
            submitC.get().get().unlock();
            log.info("C已释放锁:thread={}", currentThreadId());
            return null;
        });

        releaseB.get();
        releaseC.get();
    }

    /** Returns the id of the calling thread, for log output. */
    private static Long currentThreadId() {
        return Thread.currentThread().getId();
    }

    /** A second thread gives up once its wait time elapses while the lock is still held. */
    @Test
    public void tryLockWaitTimeout() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString();

        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 2000);
        Assert.assertTrue(optLocked.isPresent());

        Optional<LockResource> optLockResource = CompletableFuture.supplyAsync(() -> {
                long now = System.currentTimeMillis();
                Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 1000, 10);
                long cost = System.currentTimeMillis() - now;
                log.info("cost={}", cost);
                return optLockedAgain;
        }).exceptionally(th -> {
            log.error("Exception: ", th);
            return Optional.empty();
        }).join();

        Assert.assertFalse(optLockResource.isPresent());
    }

    /** A lock taken with a lease time is still reentrant for the owning thread. */
    @Test
    public void tryLockWithLeaseTime() throws Exception {
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 3000, 1000);
        Assert.assertTrue(optLocked.isPresent());

        // Reentrant acquisition by the same thread.
        Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 3000, 1000);
        Assert.assertTrue(optLockedAgain.isPresent());
    }

    /**
     * 1000 concurrent requests compete for one lock that is never released explicitly:
     * exactly one of them is expected to win.
     */
    @Test
    public void tryLockWithLeaseTimeOnMultiThread() throws Exception {
        int totalThread = 1000;
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
        AtomicInteger acquiredLockTimes = new AtomicInteger(0);

        ExecutorService executor = Executors.newFixedThreadPool(totalThread);
        for (int i = 0; i < totalThread; i++) {
            executor.submit(() -> {
                tryAcquireLockTimes.getAndIncrement();
                Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 10000);
                if (optLocked.isPresent()) {
                    acquiredLockTimes.getAndIncrement();
                }
            });
        }
        // Without shutdown() the pool never terminates and awaitTermination() would
        // always block for the full timeout.
        executor.shutdown();
        boolean finished = executor.awaitTermination(15, TimeUnit.SECONDS);

        Assert.assertTrue("worker threads did not finish in time", finished);
        Assert.assertEquals(totalThread, tryAcquireLockTimes.get());
        Assert.assertEquals(1, acquiredLockTimes.get());
    }

    /**
     * 100 concurrent requests compete for one lock and each releases it right after
     * acquiring it: all of them are expected to get the lock eventually.
     */
    @Test
    public void tryLockWithLeaseTimeOnMultiThread2() throws Exception {
        int totalThread = 100;
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
        AtomicInteger acquiredLockTimes = new AtomicInteger(0);

        ExecutorService executor = Executors.newFixedThreadPool(totalThread);
        for (int i = 0; i < totalThread; i++) {
            executor.submit(() -> {
                long now = System.currentTimeMillis();
                Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10000, 5);
                long cost = System.currentTimeMillis() - now;
                log.info("tryAcquireLockTimes={}||wait={}", tryAcquireLockTimes.incrementAndGet(), cost);
                if (optLocked.isPresent()) {
                    acquiredLockTimes.getAndIncrement();
                    // Release explicitly so the next waiter can proceed.
                    optLocked.get().unlock();
                }
            });
        }
        // Without shutdown() the pool never terminates and awaitTermination() would
        // always block for the full timeout.
        executor.shutdown();
        boolean finished = executor.awaitTermination(20, TimeUnit.SECONDS);

        log.info("tryAcquireLockTimes={}, acquireLockTimes={}", tryAcquireLockTimes.get(), acquiredLockTimes.get());
        Assert.assertTrue("worker threads did not finish in time", finished);
        Assert.assertEquals(totalThread, tryAcquireLockTimes.get());
        Assert.assertEquals(totalThread, acquiredLockTimes.get());
    }

}
 
 
/**
 * Abstraction for acquiring a distributed lock identified by a string key.
 *
 * <p>Both methods return an {@link Optional}; the tests in this note treat a present
 * value as "lock acquired" and an empty one as "not acquired".
 * NOTE(review): the time unit of {@code waitTime} / {@code leaseTime} is not visible
 * here — presumably milliseconds; confirm against the implementation.
 */
public interface DistributedLocker {
 
    /**
     * Tries to acquire the lock for the given key.
     *
     * @param lockKey  key identifying the lock
     * @param waitTime presumably the maximum time to wait for the lock — TODO confirm
     * @return a handle for releasing the lock, or empty if it was not acquired
     */
    Optional<LockResource> tryLock(String lockKey, int waitTime);
 
    /**
     * Tries to acquire the lock for the given key with a lease time.
     *
     * @param lockKey   key identifying the lock
     * @param waitTime  presumably the maximum time to wait for the lock — TODO confirm
     * @param leaseTime presumably how long the lock is held before it expires on its
     *                  own — TODO confirm
     * @return a handle for releasing the lock, or empty if it was not acquired
     */
    Optional<LockResource> tryLock(String lockKey, int waitTime, int leaseTime);
 
}
 
/**
 * Handle returned by {@code DistributedLocker.tryLock(...)} for a lock that was acquired.
 */
public interface LockResource {
 
    /**
     * Releases the lock this handle represents.
     * NOTE(review): whether this must be called from the acquiring thread depends on
     * the implementation, which is not shown here.
     */
    void unlock();
 
}