首頁 > 軟體

Redisson分散式鎖之加解鎖詳解

2023-03-17 06:06:40

引言

2023的金三銀四來的沒想象中那麼激烈,一個朋友前段時間投了幾十家,多數石沉大海,好不容易等來面試機會,就恰好被問道專案中關於分散式鎖的應用,後涉及Redisson實現分散式鎖的原理,答不上來。

鎖的可重入性

我們都知道,Java中synchronized和lock都支援可重入,synchronized的鎖關聯一個執行緒持有者和一個計數器。當一個執行緒請求成功後,JVM會記下持有鎖的執行緒,並將計數器計為1。此時其他執行緒請求該鎖,則必須等待。而該持有鎖的執行緒如果再次請求這個鎖,就可以再次拿到這個鎖,同時計數器會遞增。當執行緒退出一個synchronized方法/塊時,計數器會遞減,如果計數器為0則釋放該鎖;在ReentrantLock中,底層的 AQS 對應的state 同步狀態值表示執行緒獲取該鎖的可重入次數,通過CAS方式進行設定,在預設情況下,state的值為0 表示當前鎖沒有被任何執行緒持有,原理類似。所以如果想要實現可重入性,可能須有一個計數器來控制重入次數,實際Redisson確實是這麼做的。

好的我們通過Redisson使用者端進行設定,並回圈3次,模擬鎖重入:000

for(int i = 0; i < 3; i++) {      
    RedissonLockUtil.tryLock("distributed:lock:distribute_key", TimeUnit.SECONDS, 20, 100); 
 }

連線Redis使用者端進行檢視:

可以看到,我們設定的分散式鎖是存在一個hash結構中,value看起來是迴圈的次數3,key就不怎麼認識了,那這個key是怎麼設定進去的呢,另外為什麼要設定成為Hash型別呢?

加鎖

我們先來看看普通的分散式鎖的上鎖流程:

說明:

  • 使用者端在進行加鎖時,會校驗如果業務上沒有設定持有鎖時長leaseTime,會啟動看門狗來每隔10s進行續命,否則就直接以leaseTime作為持有的時長;
  • 並行場景下,如果使用者端1鎖還未釋放,使用者端2嘗試獲取,加鎖必然失敗,然後會通過釋出訂閱模式來訂閱Key的釋放通知,並繼續進入後續的搶鎖流程。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
      long time = unit.toMillis(waitTime);
      long current = System.currentTimeMillis();
      long threadId = Thread.currentThread().getId();
      Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
      if (ttl == null) {
         return true;
      } else {
         // 訂閱分散式Key對應的訊息,監聽其它鎖持有者釋放,鎖沒有釋放的時候則會等待,直到鎖釋放的時候會執行下面的while迴圈
         CompletableFuture subscribeFuture = this.subscribe(threadId);
         subscribeFuture.get(time, TimeUnit.MILLISECONDS);
         try {
            do {
               // 嘗試獲取鎖
               ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
               // 競爭獲取鎖成功,退出迴圈,不再競爭。
               if (ttl == null) {
                  return true;
               }
               // 利用號誌機制阻塞當前執行緒相應時間,之後再重新獲取鎖
               if (ttl >= 0L && ttl < time) {
                  ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
               } else {
                  ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
               }
               time -= System.currentTimeMillis() - currentTime;
            } while(time > 0L);
         } finally {
            // 競爭鎖成功後,取消訂閱該執行緒Id事件
            this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
         }
      }
   }
}
RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        // 如果設定了持有鎖的時長,直接進行嘗試加鎖操作
         if (leaseTime != -1L) {
            return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            // 未設定加鎖時長,在加鎖成功後,啟動續期任務,初始預設持有鎖時間是30s
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            ttlRemainingFuture.addListener(new FutureListener<Long>() {
                public void operationComplete(Future<Long> future) throws Exception {
                    if (future.isSuccess()) {
                        Long ttlRemaining = (Long)future.getNow();
                        if (ttlRemaining == null) {
                            RedissonLock.this.scheduleExpirationRenewal(threadId);
                        }
                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

我們都知道Redis執行Lua指令碼具有原子性,所以在嘗試加鎖的下層,Redis主要執行了一段複雜的lua指令碼:

-- 不存在該key時
if (redis.call('exists', KEYS[1]) == 0) then
      -- 新增該鎖並且hash中該執行緒id對應的count置1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
-- 設定過期時間
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 存在該key 並且 hash中執行緒id的key也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
      -- 執行緒重入次數++
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);

引數說明:

KEYS[1]:對應我們設定的分散式key,即:distributed:lock:distribute_key

ARGV[1]:業務自定義的加鎖時長或者預設的30s;

ARGV[2]: 具體的使用者端初始化連線UUID+執行緒ID: 9d8f0907-1165-47d2-8983-1e130b07ad0c:1

我們從上面的指令碼中可以看出核心邏輯其實不難:

  • 如果分散式鎖Key未被任何端持有,直接根據“使用者端連線ID+執行緒ID” 進行初始化設定,並設定重入次數為1,並設定Key的過期時間;
  • 否則重入次數+1,並重置過期時間;

鎖續命

接下來看看scheduleExpirationRenewal續命是怎麼做的呢?

private void scheduleExpirationRenewal(final long threadId) {
   if (!expirationRenewalMap.containsKey(this.getEntryName())) {
      Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
         public void run(Timeout timeout) throws Exception {
            // 執行續命操作
            RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
            future.addListener(new FutureListener<Boolean>() {
               public void operationComplete(Future<Boolean> future) throws Exception {
                  RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                          ...
                  // 續命成功,繼續
                  if ((Boolean)future.getNow()) {
                     RedissonLock.this.scheduleExpirationRenewal(threadId);
                  }
               }
            });
         }
      }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
   }
}

Tip小知識點:

  • 續期是用的什麼定時任務執行的?
    Redisson用netty的HashedWheelTimer做命令重試機制,原因在於一條redis命令的執行不論成功或者失敗耗時都很短,而HashedWheelTimer是單執行緒的,系統效能開銷小。

而在上面的renewExpirationAsync中續命操作的執行核心Lua指令碼要做的事情也非常的簡單,就是給這個Key的過期時間重新設定為指定的30s.

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;

釋放鎖

釋放鎖主要是除了解鎖本省,另外還要考慮到如果存在續期的情況,要將續期任務刪除:

public RFuture<Void> unlockAsync(long threadId) {
   // 解鎖
   RFuture<Boolean> future = this.unlockInnerAsync(threadId);
   CompletionStage<Void> f = future.handle((opStatus, e) -> {
      // 解除續期
      this.cancelExpirationRenewal(threadId);
      ...
   });
   return new CompletableFutureWrapper(f);
}

在unlockInnerAsync內部,Redisson釋放鎖其實核心也是執行了如下一段核心Lua指令碼:

    // 校驗是否存在
    if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
      return nil;
      end;
    // 獲取加鎖次數,校驗是否為重入鎖
    local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
    // 如果為重入鎖,重置過期時間,鎖本身不釋放
    if (counter > 0) then
      redis.call('pexpire', KEYS[1], ARGV[2]);
      return 0;
   // 刪除Key
    else redis.call('del', KEYS[1]);
      // 通知阻塞的使用者端可以搶鎖啦
      redis.call('publish', KEYS[2], ARGV[1]);
      return 1;
      end;
      return nil;

其中:

KEYS[1]: 分散式鎖
KEYS[2]: redisson_lock_channel:{分散式鎖} 釋出訂閱訊息的管道名稱
ARGV[1]: 釋出的訊息內容
ARGV[2]: 鎖的過期時間
ARGV[3]: 執行緒ID標識名稱

其它問題

  • 紅鎖這麼火,但真的靠譜麼?
  • Redisson公平鎖是什麼情況?

以上就是Redisson分散式鎖第一彈-加解鎖的詳細內容,更多關於Redisson分散式鎖加解鎖的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com