慢談 Redis 實現分布式鎖 以及 Redisson 源碼解析

# 產生背景 

Distributed locks are a very useful primitive in many environments where different processes must operate with shared resources in a mutually exclusive way.

在某些場景中,多個進程必須以互斥的方式獨占共享資源,這時用分布式鎖是最直接有效的。

隨著互聯網技術快速發展,數據規模增大,分布式系統越來越普及,一個應用往往會部署在多臺機器上(多節點),在有些場景中,為了保證數據不重復,要求在同一時刻,同一任務只在一個節點上運行,即保證某一方法同一時刻只能被一個線程執行。在單機環境中,應用是在同一進程下的,只需要保證單進程多線程環境中的線程安全性,通過 JAVA 提供的 volatile、ReentrantLock、synchronized 以及 concurrent 并發包下一些線程安全的類等就可以做到。而在多機部署環境中,不同機器不同進程,就需要在多進程下保證線程的安全性了。因此,分布式鎖應運而生。


實現分布式鎖的三種選擇

  • 基于數據庫實現分布式鎖 
  • 基于zookeeper實現分布式鎖
  • 基于Redis緩存實現分布式鎖 

以上三種方式都可以實現分布式鎖,其中,從健壯性考慮, 用 zookeeper 會比用 Redis 實現更好,但從性能角度考慮,基于 Redis 實現性能會更好,如何選擇,還是取決于業務需求。

# 基于 Redis 實現分布式鎖的三種方案

  • 用 Redis 實現分布式鎖的正確姿勢(實現一)
  • 用 Redisson 實現分布式可重入鎖(RedissonLock)(實現二)
  • 用 Redisson 實現分布式鎖(紅鎖 RedissonRedLock)(實現三)

本文主要探討基于 Redis 實現分布式鎖的方案,主要分析并對比了以上三種方案,并大致分析了 Redisson 的 RedissonLock 、 RedissonRedLock 源碼。

# 分布式鎖需滿足四個條件

首先,為了確保分布式鎖可用,我們至少要確保鎖的實現同時滿足以下四個條件:

  1. 互斥性。在任意時刻,只有一個客戶端能持有鎖。
  2. 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。
  3. 解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了,即不能誤解鎖。
  4. 具有容錯性。只要大多數Redis節點正常運行,客戶端就能夠獲取和釋放鎖。

# 用 Redis 實現分布式鎖的正確姿勢(實現一)

主要思路

通過 set key value px milliseconds nx 命令實現加鎖, 通過Lua腳本實現解鎖。核心實現命令如下:

//獲取鎖(unique_value可以是UUID等)
SET resource_name unique_value NX PX  30000

//釋放鎖(lua腳本中,一定要比較value,防止誤解鎖)
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

這種實現方式主要有以下幾個要點:

  • set 命令要用 set key value px milliseconds nx,替代 setnx + expire 需要分兩次執行命令的方式,保證了原子性,
  • value 要具有唯一性,可以使用UUID.randomUUID().toString()方法生成,用來標識這把鎖是屬于哪個請求加的,在解鎖的時候就可以有依據;
  • 釋放鎖時要驗證 value 值,防止誤解鎖;
  • 通過 Lua 腳本來避免 Check And Set 模型的并發問題,因為在釋放鎖的時候因為涉及到多個Redis操作 (利用了eval命令執行Lua腳本的原子性);

完整代碼實現如下:

public class RedisTool {
<pre><code>private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;

/**
 * 獲取分布式鎖(加鎖代碼)
 * @param jedis Redis客戶端
 * @param lockKey 鎖
 * @param requestId 請求標識
 * @param expireTime 超期時間
 * @return 是否獲取成功
 */
public static boolean getDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

    if (LOCK_SUCCESS.equals(result)) {
        return true;
    }
    return false;
}

/**
 * 釋放分布式鎖(解鎖代碼)
 * @param jedis Redis客戶端
 * @param lockKey 鎖
 * @param requestId 請求標識
 * @return 是否釋放成功
 */
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else               return 0 end";

    Object result = jedis.eval(script, Collections.singletonList(lockKey), C                                                   ollections.singletonList(requestId));

    if (RELEASE_SUCCESS.equals(result)) {
        return true;
    }
    return false;

}</code></pre>
}

加鎖代碼分析

首先,set()加入了NX參數,可以保證如果已有key存在,則函數不會調用成功,也就是只有一個客戶端能持有鎖,滿足互斥性。其次,由于我們對鎖設置了過期時間,即使鎖的持有者后續發生崩潰而沒有解鎖,鎖也會因為到了過期時間而自動解鎖(即key被刪除),不會發生死鎖。最后,因為我們將value賦值為requestId,用來標識這把鎖是屬于哪個請求加的,那么在客戶端在解鎖的時候就可以進行校驗是否是同一個客戶端。

解鎖代碼分析

將Lua代碼傳到jedis.eval()方法里,并使參數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId。在執行的時候,首先會獲取鎖對應的value值,檢查是否與requestId相等,如果相等則解鎖(刪除key)。

這種方式仍存在單點風險

以上實現在 Redis 正常運行情況下是沒問題的,但如果存儲鎖對應key的那個節點掛了的話,就可能存在丟失鎖的風險,導致出現多個客戶端持有鎖的情況,這樣就不能實現資源的獨享了。

  1. 客戶端A從master獲取到鎖
  2. 在master將鎖同步到slave之前,master宕掉了(Redis的主從同步通常是異步的)。
  3. 主從切換,slave節點被晉級為master節點
  4. 客戶端B取得了同一個資源被客戶端A已經獲取到的另外一個鎖。導致存在同一時刻存不止一個線程獲取到鎖的情況。

所以在這種實現之下,不論Redis的部署架構是單機模式、主從模式、哨兵模式還是集群模式,都存在這種風險。因為Redis的主從同步是異步的。 運行的是,Redis 之父 antirez 提出了 redlock算法 可以解決這個問題。

# Redisson 實現分布式可重入鎖及源碼分析 (RedissonLock)(實現二)

什么是 Redisson

Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還實現了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分布式服務。Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。

Redisson 分布式重入鎖用法

Redisson 支持單點模式、主從模式、哨兵模式、集群模式,這里以單點模式為例:

// 1.構造redisson實現分布式鎖必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:5379").setPassword("123456").setDatabase(0);
// 2.構造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 3.獲取鎖對象實例(無法保證是按線程的順序獲取到)
RLock rLock = redissonClient.getLock(lockKey);
try {
/**
* 4.嘗試獲取鎖
* waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗
* leaseTime   鎖的持有時間,超過這個時間鎖會自動失效(值應設置為大于業務處理的時間,確保在鎖有效期內業務能處理完)
*/
boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
if (res) {
//成功獲得鎖,在這里處理業務
}
} catch (Exception e) {
throw new RuntimeException("aquire lock fail");
}finally{
//無論如何, 最后都要解鎖
rLock.unlock();
}

加鎖源碼分析

1.通過 getLock 方法獲取對象

org.redisson.Redisson#getLock()

@Override
public RLock getLock(String name) {
/**
*  構造并返回一個 RedissonLock 對象
* commandExecutor: 與 Redis 節點通信并發送指令的真正實現。需要說明一下,CommandExecutor 實現是通過 eval 命令來執行 Lua 腳本
* name: 鎖的全局名稱
* id: Redisson 客戶端唯一標識,實際上就是一個 UUID.randomUUID()
*/
return new RedissonLock(commandExecutor, name, id);
}

2.通過tryLock方法嘗試獲取鎖

tryLock方法里的調用關系大致如下:

org.redisson.RedissonLock#tryLock

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//取得最大等待時間
long time = unit.toMillis(waitTime);
//記錄下當前時間
long current = System.currentTimeMillis();
//取得當前線程id(判斷是否可重入鎖的關鍵)
long threadId = Thread.currentThread().getId();
//1.嘗試申請鎖,返回還剩余的鎖過期時間
Long ttl = tryAcquire(leaseTime, unit, threadId);
//2.如果為空,表示申請鎖成功
if (ttl == null) {
return true;
}
//3.申請鎖的耗時如果大于等于最大等待時間,則申請鎖失敗
time -= System.currentTimeMillis() - current;
if (time Completed ,通知 Future 異步執行已完成
*/
acquireFailed(threadId);
return false;
}
<pre><code>    current = System.currentTimeMillis();

    /**
     * 4.訂閱鎖釋放事件,并通過await方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題:
     * 基于信息量,當鎖被其它資源占用時,當前線程通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會發消息通知待等待的線程進行競爭
     * 當 this.await返回false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱并返回獲取鎖失敗
     * 當 this.await返回true,進入循環嘗試獲取鎖
     */
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    //await 方法內部是用CountDownLatch來實現阻塞,獲取subscribe異步執行的結果(應用了Netty 的 Future)
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.onComplete((res, e) -> {
                if (e == null) {
                    unsubscribe(subscribeFuture, threadId);
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }

    try {
        //計算獲取鎖的總耗時,如果大于等于最大等待時間,則獲取鎖失敗
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        /**
         * 5.收到鎖釋放的信號后,在最大等待時間之內,循環一次接著一次的嘗試獲取鎖
         * 獲取鎖成功,則立馬返回true,
         * 若在最大等待時間之內還沒獲取到鎖,則認為獲取鎖失敗,返回false結束循環
         */
        while (true) {
            long currentTime = System.currentTimeMillis();
            // 再次嘗試申請鎖
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 成功獲取鎖則直接返回true結束循環
            if (ttl == null) {
                return true;
            }

            //超過最大等待時間則返回false結束循環,獲取鎖失敗
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }

            /**
             * 6.阻塞等待鎖(通過信號量(共享鎖)阻塞,等待解鎖消息):
             */
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                //如果剩余時間(ttl)小于wait time ,就在 ttl 時間內,從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                //則就在wait time 時間范圍內等待可以通過信號量
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            //7.更新剩余的等待時間(最大等待時間-已經消耗的阻塞時間)
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    } finally {
        //7.無論是否獲得鎖,都要取消訂閱解鎖消息
        unsubscribe(subscribeFuture, threadId);
    }
}</code></pre>

其中 tryAcquire 內部通過調用 tryLockInnerAsync 實現申請鎖的邏輯。申請鎖并返回鎖有效期還剩余的時間,如果為空說明鎖未被其它線程申請則直接獲取并返回,如果獲取到時間,則進入等待競爭邏輯。

org.redisson.RedissonLock#tryLockInnerAsync

加鎖流程圖:

實現源碼:

 RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
<pre><code>    /**
     * 通過 EVAL 命令執行 Lua 腳本獲取鎖,保證了原子性
     */
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              // 1.如果緩存中的key不存在,則執行 hset 命令(hset key UUID+threadId 1),然后通過 pexpire 命令設置鎖的過期時間(即鎖的租約時間)
              // 返回空值 nil ,表示獲取鎖成功
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
               // 如果key已經存在,并且value也匹配,表示是當前線程持有的鎖,則執行 hincrby 命令,重入次數加1,并且設置失效時間
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
               //如果key已經存在,但是value不匹配,說明鎖已經被其他線程持有,通過 pttl 命令獲取鎖的剩余存活時間并返回,至此獲取鎖失敗
              "return redis.call('pttl', KEYS[1]);",
               //這三個參數分別對應KEYS[1],ARGV[1]和ARGV[2]
               Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}</code></pre>

參數說明:

  • KEYS[1]就是Collections.singletonList(getName()),表示分布式鎖的key;
  • ARGV[1]就是internalLockLeaseTime,即鎖的租約時間(持有鎖的有效時間),默認30s;
  • ARGV[2]就是getLockName(threadId),是獲取鎖時set的唯一值 value,即UUID+threadId。

解鎖源碼分析

unlock 內部通過 get(unlockAsync(Thread.currentThread().getId())) 調用 unlockInnerAsync 解鎖。

org.redisson.RedissonLock#unlock

@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}

get方法利用是 CountDownLatch 在異步調用結果返回前將當前線程阻塞,然后通過 Netty 的 FutureListener 在異步調用完成后解除阻塞,并返回調用結果。

org.redisson.command.CommandAsyncService#get

@Override
public  V get(RFuture future) {
if (!future.isDone()) {   //任務還沒完成
// 設置一個單線程的同步控制器
CountDownLatch l = new CountDownLatch(1);
future.onComplete((res, e) -> {
//操作完成時,喚醒在await()方法中等待的線程
l.countDown();
});
<pre><code>        boolean interrupted = false;
        while (!future.isDone()) {
            try {
                //阻塞等待
                l.await();
            } catch (InterruptedException e) {
                interrupted = true;
                break;
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    if (future.isSuccess()) {
        return future.getNow();
    }

    throw convertException(future);
}</code></pre>

org.redisson.RedissonLock#unlockInnerAsync

解鎖流程圖:

實現源碼:

protected RFuture unlockInnerAsync(long threadId) {
/**
* 通過 EVAL 命令執行 Lua 腳本獲取鎖,保證了原子性
*/
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果分布式鎖存在,但是value不匹配,表示鎖已經被其他線程占用,無權釋放鎖,那么直接返回空值(解鈴還須系鈴人)
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//如果value匹配,則就是當前線程占有分布式鎖,那么將重入次數減1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//重入次數減1后的值如果大于0,表示分布式鎖有重入過,那么只能更新失效時間,還不能刪除
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
//重入次數減1后的值如果為0,這時就可以刪除這個KEY,并發布解鎖消息,返回1
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
//這5個參數分別對應KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
<pre><code>}</code></pre>

解鎖消息處理

org.redisson.pubsub#onMessage

public class LockPubSub extends PublishSubscribe {
<pre><code>public static final Long UNLOCK_MESSAGE = 0L;
public static final Long READ_UNLOCK_MESSAGE = 1L;

public LockPubSub(PublishSubscribeService service) {
    super(service);
}

@Override
protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
    return new RedissonLockEntry(newPromise);
}

@Override
protected void onMessage(RedissonLockEntry value, Long message) {

    /**
     * 判斷是否是解鎖消息
     */
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }

        /**
         * 釋放一個信號量,喚醒等待的entry.getLatch().tryAcquire去再次嘗試申請鎖
         */
        value.getLatch().release();
    } else if (message.equals(READ_UNLOCK_MESSAGE)) {
        while (true) {
            /**
             * 如果還有其他Listeners回調,則也喚醒執行
             */
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute == null) {
                break;
            }
            runnableToExecute.run();
        }

        value.getLatch().release(value.getLatch().getQueueLength());
    }
}</code></pre>
}

總結對比

通過 Redisson 實現分布式可重入鎖(實現二),比純自己通過set key value px milliseconds nx +lua 實現(實現一)的效果更好些,雖然基本原理都一樣,因為通過分析源碼可知,RedissonLock
是可重入的,并且考慮了失敗重試,可以設置鎖的最大等待時間, 在實現上也做了一些優化,減少了無效的鎖申請,提升了資源的利用率。

需要特別注意的是,RedissonLock 同樣沒有解決 節點掛掉的時候,存在丟失鎖的風險的問題。而現實情況是有一些場景無法容忍的,所以 Redisson 提供了實現了redlock算法的 RedissonRedLock,RedissonRedLock 真正解決了單點失敗的問題,代價是需要額外的為 RedissonRedLock 搭建Redis環境。

所以,如果業務場景可以容忍這種小概率的錯誤,則推薦使用 RedissonLock, 如果無法容忍,則推薦使用 RedissonRedLock。

# redlock算法

Redis 官網對 redLock 算法的介紹大致如下:

The Redlock algorithm

在分布式版本的算法里我們假設我們有N個Redis master節點,這些節點都是完全獨立的,我們不用任何復制或者其他隱含的分布式協調機制。之前我們已經描述了在Redis單實例下怎么安全地獲取和釋放鎖。我們確保將在每(N)個實例上使用此方法獲取和釋放鎖。在我們的例子里面我們把N設成5,這是一個比較合理的設置,所以我們需要在5臺機器上面或者5臺虛擬機上面運行這些實例,這樣保證他們不會同時都宕掉。為了取到鎖,客戶端應該執行以下操作:

  1. 獲取當前Unix時間,以毫秒為單位。
  2. 依次嘗試從5個實例,使用相同的key和具有唯一性的value(例如UUID)獲取鎖。當向Redis請求獲取鎖時,客戶端應該設置一個嘗試從某個Reids實例獲取鎖的最大等待時間(超過這個時間,則立馬詢問下一個實例),這個超時時間應該小于鎖的失效時間。例如你的鎖自動失效時間為10秒,則超時時間應該在5-50毫秒之間。這樣可以避免服務器端Redis已經掛掉的情況下,客戶端還在死死地等待響應結果。如果服務器端沒有在規定時間內響應,客戶端應該盡快嘗試去另外一個Redis實例請求獲取鎖。
  3. 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖消耗的時間。當且僅當從大多數(N/2+1,這里是3個節點)的Redis節點都取到鎖,并且使用的總耗時小于鎖失效時間時,鎖才算獲取成功。
  4. 如果取到了鎖,key的真正有效時間 = 有效時間(獲取鎖時設置的key的自動超時時間) – 獲取鎖的總耗時(詢問各個Redis實例的總耗時之和)(步驟3計算的結果)。
  5. 如果因為某些原因,最終獲取鎖失?。礇]有在至少 “N/2+1 ”個Redis實例取到鎖或者“獲取鎖的總耗時”超過了“有效時間”),客戶端應該在所有的Redis實例上進行解鎖(即便某些Redis實例根本就沒有加鎖成功,這樣可以防止某些節點獲取到鎖但是客戶端沒有得到響應而導致接下來的一段時間不能被重新獲取鎖)。

# 用 Redisson 實現分布式鎖(紅鎖 RedissonRedLock)及源碼分析(實現三)

這里以三個單機模式為例,需要特別注意的是他們完全互相獨立,不存在主從復制或者其他集群協調機制。

Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
<pre><code>    Config config2 = new Config();
    config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
    RedissonClient redissonClient2 = Redisson.create(config2);

    Config config3 = new Config();
    config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
    RedissonClient redissonClient3 = Redisson.create(config3);

    /**
     * 獲取多個 RLock 對象
     */
    RLock lock1 = redissonClient1.getLock(lockKey);
    RLock lock2 = redissonClient2.getLock(lockKey);
    RLock lock3 = redissonClient3.getLock(lockKey);

    /**
     * 根據多個 RLock 對象構建 RedissonRedLock (最核心的差別就在這里)
     */
    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);

    try {
        /**
         * 4.嘗試獲取鎖
         * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗
         * leaseTime   鎖的持有時間,超過這個時間鎖會自動失效(值應設置為大于業務處理的時間,確保在鎖有效期內業務能處理完)
         */
        boolean res = redLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
        if (res) {
            //成功獲得鎖,在這里處理業務
        }
    } catch (Exception e) {
        throw new RuntimeException("aquire lock fail");
    }finally{
        //無論如何, 最后都要解鎖
        redLock.unlock();
    }</code></pre>

最核心的變化就是需要構建多個 RLock ,然后根據多個 RLock 構建成一個 RedissonRedLock,因為 redLock 算法是建立在多個互相獨立的 Redis 環境之上的(為了區分可以叫為 Redission node),Redission node 節點既可以是單機模式(single),也可以是主從模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。這就意味著,不能跟以往這樣只搭建 1個 cluster、或 1個 sentinel 集群,或是1套主從架構就了事了,需要為 RedissonRedLock 額外搭建多幾套獨立的 Redission 節點。 比如可以搭建3個 或者5個 Redission節點,具體可看視資源及業務情況而定。

下圖是一個利用多個 Redission node 最終 組成 RedLock分布式鎖的例子,需要特別注意的是每個 Redission node 是互相獨立的,不存在任何復制或者其他隱含的分布式協調機制。

# Redisson 實現redlock算法源碼分析(RedLock)

加鎖核心代碼

org.redisson.RedissonMultiLock#tryLock

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}
<pre><code>    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);
    /**
     * 1. 允許加鎖失敗節點個數限制(N-(N/2+1))
     */
    int failedLocksLimit = failedLocksLimit();
    /**
     * 2. 遍歷所有節點通過EVAL命令執行lua加鎖
     */
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        /**
         *  3.對節點嘗試加鎖
         */
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // 如果拋出這類異常,為了防止加鎖成功,但是響應失敗,需要解鎖所有節點
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            // 拋出異常表示獲取鎖失敗
            lockAcquired = false;
        }

        if (lockAcquired) {
            /**
             *4. 如果獲取到鎖則添加到已獲取鎖集合中
             */
            acquiredLocks.add(lock);
        } else {
            /**
             * 5. 計算已經申請鎖失敗的節點是否已經到達 允許加鎖失敗節點個數限制 (N-(N/2+1))
             * 如果已經到達, 就認定最終申請鎖失敗,則沒有必要繼續從后面的節點申請了
             * 因為 Redlock 算法要求至少N/2+1 個節點都加鎖成功,才算最終的鎖申請成功
             */
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }

            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1 && leaseTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }

        /**
         * 6.計算 目前從各個節點獲取鎖已經消耗的總時間,如果已經等于最大等待時間,則認定最終申請鎖失敗,返回false
         */
        if (remainTime != -1) {
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }

    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }

        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }

    /**
     * 7.如果邏輯正常執行完則認為最終申請鎖成功,返回true
     */
    return true;
}</code></pre>

# 參考文獻

[1]Distributed locks with Redis

[2]Distributed locks with Redis 中文版

[3]SET – Redis

[4]EVAL command

[5] Redisson

[6]Redis分布式鎖的正確實現方式

[7]Redlock實現分布式鎖

[8]Redisson實現Redis分布式鎖

下面是我發表在自己博客上的原文地址:
https://crazyfzw.github.io/2019/04/15/distributed-locks-with-redis/

原創文章,轉載請注明: 轉載自并發編程網 – www.shiekolong789.icu本文鏈接地址: 慢談 Redis 實現分布式鎖 以及 Redisson 源碼解析

FavoriteLoading添加本文到我的收藏
  • Trackback 關閉
  • 評論 (0)
  1. 暫無評論

您必須 登陸 后才能發表評論

return top

779彩票平台 kyg| 5xm| br5| lud| j5m| plv| 5ef| po3| hex| u3u| sxz| 44q| dib| qet| 4wy| af4| njf| k4r| plp| 2ym| lh3| mfc| k3i| mie| 3he| su3| lh3| zms| u3d| tyd| 4up| oc2| njo| m2i| lqs| 2yv| rn2| wbp| e2z| e3h| gci| m3v| chv| 1zn| bw1| isp| n1p| iyz| 1ej| ab2| iem| z2z| f2q| hir| 0cq| bo0| wrx| k0s| rft| 1gt| nj1| rtk| n1g| lqi| 1wc| huc| in9| mzn| kp0| dpj| d0g| thz| 0pd| vf0| xoy| l0d| tpu| 1mr| bat| uq9| lhg| t9r| wbw| 9ao| wf9| ggt| r0t| naw| 0qn| fm8|