Реализация распределенной блокировки с помощью Go и Redis

Зачем нужны распределенные блокировки

  • Размещение заказов (Блокировка uid для предотвращения дублирования заказов.)
  • Вычет из запаса (Блокировка инвентаря для предотвращения перепроданности.)
  • Балансовый вычет (Блокировка учетной записи для предотвращения одновременных операций.)

Распределенные блокировки часто необходимы для обеспечения согласованности ресурсов изменений при совместном использовании одного и того же ресурса в распределенных системах.

Функции, которыми должны обладать распределенные блокировки

  • Эксклюзивность
  • В случае возникновения взаимоблокировки на критическом ресурсе в сценарии с высоким уровнем параллелизма очень сложно устранить неполадки, и обычно этого можно избежать, установив тайм-аут для автоматического снятия блокировки по истечении срока его действия.
  • Повторный вход
  • Высокая производительность и доступность

Ключевые моменты по реализации блокировок Redis

  • установка команды

SET key value [EX seconds] [PX milliseconds] [NX|XX]

  • EXsecond: Устанавливает время истечения срока действия ключа в секундах.
  • PXмиллисекунда: устанавливает время истечения срока действия ключа в миллисекундах. 
  • NX: Устанавливайте ключ, только если он не существует. 
  • XX: Устанавливает ключ, только если он уже существует.
  • Redis скрипт lua

Код lua redis инкапсулирует серию командных операций в конвейер для достижения атомарности всей операции.

Объяснение кода

core/stores/redis/redislock.go 1.

  • процесс блокировки
-- KEYS[1]: lock key
-- ARGV[1]: lock value, random string
-- ARGV[2]: expiration time
-- determine if the value held by the lock key is equal to the value passed in
-- If equal, it means that the lock is acquired again and the acquisition time is updated to prevent expiration on reentry
-- this means it is a "reentrant lock"
if redis.call("GET", KEYS[1]) == ARGV[1] then
    -- set
    redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
    return "OK"
else
    -- If the lock key.value is not equal to the incoming value, it is the first time to get the lock.
    -- SET key value NX PX timeout : Set the value of the key only when the key does not exist
    -- Set success will automatically return "OK", set failure returns "NULL Bulk Reply"
    -- Why add "NX" here, because we need to prevent the lock from being overwritten by others
    return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end
  • Процесс разблокировки

Основной код

package redis
import (
    "math/rand"
    "strconv"
    "sync/atomic"
    "time"
    red "github.com/go-redis/redis"
    "github.com/tal-tech/go-zero/core/logx"
)
const (
    letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
    redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
    return "OK"
else
    return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end`
    delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end`
    randomLen = 16
    // default timeout to prevent deadlocks
    tolerance = 500 // milliseconds
    millisPerSecond = 1000
)
// A RedisLock is a redis lock.
type RedisLock struct {
    // redis client
    store *Redis
    // Timeout time
    seconds uint32
    // lock key
    key string
    // Lock the value to prevent the lock from being accessed by others
    id string
id string }
func init() {
    rand.Seed(time.Now().UnixNano())
}
// NewRedisLock returns a RedisLock.
func NewRedisLock(store *Redis, key string) *RedisLock {
    return &RedisLock{
        store: store,
        key: key,
        // When getting a lock, the value of the lock is generated by a random string
        // Actually go-zero provides a more efficient way to generate random strings
        // See core/stringx/random.go: Randn
        id: randomStr(randomLen),
    }
}
// Acquire acquires the lock.
func (rl *RedisLock) Acquire() (bool, error) {
    // Acquire the expiration time
    seconds := atomic.LoadUint32(&rl.seconds)
    // Default lock expiration time is 500ms to prevent deadlocks
    resp, err := rl.store.Eval(lockCommand, []string{rl.key}, []string{
        rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),
    })
    if err == red.Nil {
        return false, nil
    } else if err ! = nil {
        logx.Errorf("Error on acquiring lock for %s, %s", rl.key, err.Error())
        return false, err
    } else if resp == nil {
        return false, nil
    }
    reply, ok := resp.(string)
    if ok && reply == "OK" {
        return true, nil
    }
    logx.Errorf("Unknown reply when acquiring lock for %s: %v", rl.key, resp)
    return false, nil
}
// Release releases the lock.
func (rl *RedisLock) Release() (bool, error) {
    resp, err := rl.store.Eval(delCommand, []string{rl.key}, []string{rl.id})
    if err ! = nil {
        return false, err
    }
    reply, ok := resp.(int64)
    if !ok {
        return false, nil
    }
    return reply == 1, nil
}
// SetExpire sets the expire.
// Note that it needs to be called before Acquire()
// otherwise the default is 500ms auto-release
func (rl *RedisLock) SetExpire(seconds int) {
    atomic.StoreUint32(&rl.seconds, uint32(seconds))
}
func randomStr(n int) string {
    b := make([]byte, n)
    for i := range b {
        b[i] = letters[rand.Intn(len(letters))]
    }
    return string(b)
}

Ответить