基于Redis的分布式锁初体验

  |   1 评论   |   864 浏览

背景

项目中只使用了一个外部持久化组件 redis。

目前需要实现全局的并发数控制,因此调研了一下怎么使用 redis 来实现。

还好已经有了成熟的解决方案,于是直接拿来用用。

本文最终选择了 redisson 的 RPermitExpirableSemaphore 来实现。

client选取

redis 官方推荐的 java 客户端 有 jedis,lettuce和 redisson。

其中 jedis对应于 redis 的原生方法,lettuce在 jedis 的基础上,优化的redis的连接池。

而 redisson 在 lettuce 的基础上,又封装了很多非常实用的数据结构。

初体验

话不多说,还是上一个例子吧。

pom依赖

  <dependencies>
    <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson</artifactId>
      <version>3.10.0</version>
    </dependency>
  </dependencies>

使用

import java.util.concurrent.TimeUnit;

import org.redisson.Redisson;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RedissonClient;

public class MainRedisClient {
  public static void main(String[] args) throws InterruptedException {

    RedissonClient redisson = Redisson.create();

    // set permits;
    RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("note.abeffect");
    System.out.println(semaphore.trySetPermits(5)); // true; 如果已经存在了,那就是false. 

    // available permits
    System.out.println(semaphore.availablePermits()); // 5

    // add permits
    semaphore.addPermits(1);
    System.out.println(semaphore.availablePermits()); // 6

    // acquire
    String permitId = semaphore.tryAcquire(120, 120, TimeUnit.SECONDS);
    System.out.println(semaphore.availablePermits()); // 5

    String permitId2 = semaphore.tryAcquire(120, 120, TimeUnit.SECONDS);
    System.out.println(semaphore.availablePermits()); // 4
	
	// print used permits count
	String timeoutName = RedissonObject.suffixName(semaphore.getName(), "timeout");
    System.out.println(redisson.getScoredSortedSet(timeoutName).size());

    // release
    semaphore.release(permitId);
    System.out.println(semaphore.availablePermits()); // 5

    semaphore.release(permitId2);
    System.out.println(semaphore.availablePermits()); // 6

    // shutdown
    redisson.shutdown();
  }
}

实现

观察下redis中的值的变化。

当前可用值存储在 keystring 中;
已经在用的值,存储有 {key}:timeoutzset

初始时

127.0.0.1:6379> keys *
(empty list or set)

初始化后

127.0.0.1:6379> keys *
1) "note.abeffect"

127.0.0.1:6379> type note.abeffect
string

127.0.0.1:6379> get note.abeffect
"5"

add后

127.0.0.1:6379> get note.abeffect
"6"

acquire后

127.0.0.1:6379> keys *
1) "note.abeffect"
2) "{note.abeffect}:timeout"

127.0.0.1:6379> type "{note.abeffect}:timeout"
zset

127.0.0.1:6379> zrange "{note.abeffect}:timeout" 0 100 withscores
1) "7536b157d06591398faa711b1861e4a7"
2) "1547109745958"
3) "5e2ff2b25e7cc90c296540f66ef5318b"
4) "1547109747182"

127.0.0.1:6379> get note.abeffect
"4"

其中的时间戳是过期时间。

释放后

127.0.0.1:6379> keys *
1) "note.abeffect"

127.0.0.1:6379> get note.abeffect
"6"

原理

跟踪代码后,得到:

tryAcquireAsync

首先删除过期的 permits,然后如果有可用的 permit,则返回对应的 permitId。

tryAcquireAsync中执行了lua脚本,入口参数示例为:

keys: [note.abeffect, {note.abeffect}:timeout, redisson_sc:{note.abeffect}]

params: [1, 1550938468044, 194142cd142da1b1c54b0ef6781eee20, 1550926513741, 922337203685477]

lua脚本如下:

-- 查找过期的id,最多找 permits 个 --
local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[4], 'limit', 0, ARGV[1]); 

-- 如果找到了,则在zset中删除掉过期的项,同时更新 permitName 的值 --
if #expiredIds > 0 
then 
  redis.call('zrem', KEYS[2], unpack(expiredIds)); 
  local value = redis.call('incrby', KEYS[1], #expiredIds); 

  if tonumber(value) > 0 
  then 
    redis.call('publish', KEYS[3], value); 
  end;
end; 

-- 看到可用的信号量有多少个,有的话,直接返回 permitId --
local value = redis.call('get', KEYS[1]); 
if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) 
then 
  redis.call('decrby', KEYS[1], ARGV[1]); 
  redis.call('zadd', KEYS[2], ARGV[2], ARGV[3]); 
  return ARGV[3]; 
end; 

-- 如果没有可用的信号量,则如果 zset 不为空,则返回 zset 的第一项 --
local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); 
if v[1] ~= nil and v[2] ~= ARGV[5] 
then 
  return ':' .. tostring(v[2]); 
end 

-- 最后返回 null -- 
return nil;

tryAcquire

如果没有 permit 了,则等待 redis 的 publish 事件。

参考

评论

发表评论

validate