分布式锁

2023/2/14

# MySQL 分布式锁

# 基于(insert)唯一索引

关键点:

  1. 加锁的超时时间
  2. 业务出错,锁的释放是通过事务回滚
  3. 锁的去重
  4. 锁的排序,减少死锁的概率
  5. 锁的可重入,通过 thread_local 实现

# 总体实现

@Override
@Transactional
public <T> T lock(ERPLock lock, ILockCallback<T> callback) {
    return lock(
        // 锁的key
        lock, 
        // 业务方法
        callback, 
        // 等待锁超时时间,默认50s
        lockConfig.getLockTimeout(), 
        // 时间单位:秒
        ELockConstant.DEFAULT_TIMEUNIT);
}

----- 调用下面的-----

@Override
@Transactional
public <T> T locks(List<ERPLock> locks, ILockCallback<T> callback, 
                   Long timeout, TimeUnit unit) {
    int dbKey = locks.get(0).getDbKey();
    // 对锁进行去重
    List<ERPLock> addLocks = getAddLocks(getUniqKey(locks));
    try {
        // 对锁进行排序,减少死锁
        if (addLocks.size() > 1) {
            Collections.sort(addLocks);
        }
        // 加锁 + 执行业务 + 释放锁
        return lockProxyBuiness.lock(dbKey, addLocks, callback, timeout, unit);
    } catch (LockException e) {//说明加锁失败
        logger.error("加锁失败", e);
        throw e;
    } catch (RuntimeException e) {//说明业务有错
        callback.rollback(e);
        logger.error("操作失败", e);
        throw e;//异常直接抛出
    } catch (Exception e) {//说明业务有错
        callback.rollback(e);
        logger.error("操作失败", e);
        throw new LockException(e);//封装一遍异常
    } finally {
        // 这里不是删除erp_lock,是删除thread_local中的
        // 这里有个注意的点,唯一码finally中释放锁没有删除数据库中的记录,而是在执行完业务操作之后删除,因为有事务,执行业务出错之后,事务回滚,锁也会回滚掉
        unlocks(locks);
    }
}

@Transactional
@Override
public <T> T lock(@RouterKey int dbno, List<ERPLock> locks, ILockCallback<T> callback, 
                  Long timeout, TimeUnit unit) {
    long start = System.currentTimeMillis();
    long took;
    int lockSize = locks == null ? 0 : locks.size();
    if (lockSize > 0) {
        if (timeout == null || timeout <= 0 
            || timeout >= ELockConstant.DEFAULT_TIMEOUT) {
            lock(locks);
        } else {
            // 加锁有超时时间
            lock(locks, timeout);
        }
        took = System.currentTimeMillis() - start;
        if (took >= 3000) {
            logger.warn(String.format("加锁耗时过长,locks.size=%s,
                                      took:%s ms", lockSize, took));
        }
    }
    start = System.currentTimeMillis();
    // 执行业务代码                                
    T result = callback.callback();
    took = System.currentTimeMillis() - start;
    if (took >= 3000) {
        logger.warn(String.format("加锁后业务执行时间过长,
                                  locks.size=%s,took:%s ms", lockSize, took));
    }

    try {
        start = System.currentTimeMillis();
        // 释放锁
        lockDao.batchDelete(locks);
        took = System.currentTimeMillis() - start;
        if (took >= 3000) {
            logger.warn(String.format("加锁后释放锁时间过长,locks.size=%s,
                                      took:%s ms", lockSize, took));
        }
    } catch (Exception e) {
        logger.error("mysql释放锁失败locks=" + locks, e);
    }
    return result;
}

# 获取与释放锁

# com.raycloud.dmj.services.inner.LockMysqlService#lock
private void lock(List<ERPLock> locks, Long timeout) {
    Integer dbNo = locks.get(0).getDbKey();
    try {
    	// 设置当前会话的innodb_lock_wait_timeout
        lockDao.setSessionLockFf(dbNo, timeout);
        // 插入一条记录到erp_lock,key是唯一索引
        lockDao.batchInsert(locks);
    } catch (Exception e) {
        logger.error("mysql加锁失败locks=" + locks, e);
        //异常一定要抛出
        throw new LockException(e.getMessage());
    } finally {
        lockDao.setSessionDefaultLockWaitTimeout(dbNo);
    }
}

// 释放锁,删除记录
lockDao.batchDelete(locks);

# 存在的问题

  • 锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作(搞一个while循环,直到insert成功再返回成功)
  • 锁是不可重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了
  • 并发量大的时候请求量大,获取锁的间隔如果较小会给系统和数据库造成压力
  • 锁和数据库在一起,要挂就一起都挂了

# 基于排他锁(for update)

CREATE TABLE `methodLock` (
    `id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '主键',
    `method_name` VARCHAR ( 64 ) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
    `desc` VARCHAR ( 1024 ) NOT NULL DEFAULT '备注信息',
    `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP 
    	ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
    PRIMARY KEY ( `id` ),
    UNIQUE KEY `uidx_method_name` ( `method_name ` ) USING BTREE 
) ENGINE = INNODB DEFAULT CHARSET = utf8 COMMENT = '锁定中的方法';
/**
 * 加锁
 */
public boolean lock() {
    // 开启事务
    connection.setAutoCommit(false);
    // 循环阻塞,等待获取锁
    while (true) {
        // 执行获取锁的sql
        result = select * from methodLock where method_name = xxx for update;
        // 结果非空,加锁成功
        if (result != null) {
            return true;
        }
    }
    // 加锁失败
    return false;
}

/**
 * 解锁
 */
public void unlock() {
    // 提交事务,解锁
    connection.commit();
}

例子二:

//0.开始事务
begin;/begin work;/start transaction; (三者选一就可以)
//1.查询出商品信息
select status from t_goods where id=1 for update;
//2.根据商品信息生成订单
insert into t_orders (id,goods_id) values (null,1);
//3.修改商品status为2
update t_goods set status=2;
//4.提交事务
commit;/commit work;

问题和第一种方式类似

# 基于乐观锁(版本号)

适用于并发比较小

1.查询出商品信息
select (status,status,version) from t_goods where id=#{id}
2.根据商品信息生成订单
3.修改商品status为2
update t_goods 
set status=2,version=version+1
where id=#{id} and version=#{version};

# Redis 分布式锁

# 加锁

set NX PX + 重试 + 重试间隔

向 Redis 发起如下命令: SET productId:lock 0xx9p03001 NX PX 30000 其中,"productId"由自己定义,可以是与本次业务有关的id,"0xx9p03001"是一串随机值,必须保证全局唯一,“NX"指的是当且仅当key(也就是案例中的"productId:lock”)在Redis中不存在时,返回执行成功,否则执行失败。"PX 30000"指的是在30秒后,key将被自动删除(防止客户端挂掉,锁还在的情况)。执行命令后返回成功,表明服务成功的获得了锁。

@Override
public boolean lock(String key, long expire, int retryTimes, long retryDuration) {
    // use JedisCommands instead of setIfAbsense
    boolean result = setRedis(key, expire);

    // retry if needed
    while ((!result) && retryTimes-- > 0) {
        try {
            log.debug("lock failed, retrying..." + retryTimes);
            Thread.sleep(retryDuration);
        } catch (Exception e) {
            return false;
        }

        // use JedisCommands instead of setIfAbsense
        result = setRedis(key, expire);
    }
    return result;
}

private boolean setRedis(String key, long expire) {
    try {
        RedisCallback<String> redisCallback = connection -> {
            JedisCommands commands = (JedisCommands) connection.getNativeConnection();
            String uuid = SnowIDUtil.uniqueStr();
            lockFlag.set(uuid);
            return commands.set(key, uuid, NX, PX, expire); // 看这里
        };
        String result = redisTemplate.execute(redisCallback);
        return !StringUtil.isEmpty(result);
    } catch (Exception e) {
        log.error("set redis occurred an exception", e);
    }
    return false;
}

# 解锁

解锁:采用lua脚本,保证操作的原子性

在删除key之前,一定要判断服务A持有的value与Redis内存储的value是否一致。如果贸然使用服务A持有的key来删除锁,则会误将服务B的锁释放掉。

在加锁的时候 key 是具体业务,例如 key = 波次 Id; value 是全局唯一 Id

在解锁时,需要判断 value 是否一致

if redis.call("get", KEYS[1])==ARGV[1] then
	return redis.call("del", KEYS[1])
else
	return 0
end

# 问题1:误解除锁

如果线程A成功获取到了锁,并且设置了过期时间30秒,但线程A执行时间超过了30秒,锁过期自动释放,此时线程B获取到了锁;随后A执行完成,线程A使用DEL命令来释放锁,但此时线程B加的锁还没有执行完成,线程 A 实际释放的线程B加的锁:

解决方案:通过在value中设置当前线程加锁的标识(全局唯一Id),在删除之前验证key对应的value判断锁是否是当前线程持;验证key,然后再删除是两步不具有原子性,删除要使用lua脚本

# 问题2:超时解锁导致并发

如果线程A成功获取锁并设置过期时间30秒,但线程A执行时间超过了30秒,锁过期自动释放,此时线程B获取到了锁,线程A和线程B并发执行,解决方案:

  1. 将过期时间设置足够长,确保代码逻辑在锁释放之前能够执行完成
  2. 在获取到锁之后,另外开一个定时任务,在过期时间的1/3重新设置超时时间(Redisson使用)

# 问题3:锁重入问题

可通过对锁进行重入计数,加锁时加1,解锁时减1,当计数归0时释放锁;这就需要使用lua脚本了

// 如果 lock_key 不存在
if (redis.call('exists', KEYS[1]) == 0)
then
    // 设置 lock_key 线程标识 1 进行加锁
    redis.call('hset', KEYS[1], ARGV[2], 1);
    // 设置过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
    end;
// 如果 lock_key 存在且线程标识是当前欲加锁的线程标识
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
    // 自增
    then redis.call('hincrby', KEYS[1], ARGV[2], 1);
    // 重置过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
    end;
// 如果加锁失败,返回锁剩余时间
return redis.call('pttl', KEYS[1]);

// 这里使用hash是因为要存储key,线程唯一标志,加锁次数

# 问题4:获取锁失败重试

  1. 客户端轮询,当未获取到锁时,等待一段时间重新获取锁,直到成功获取锁或等待超时。这种方式比较消耗服务器资源,当并发量比较大时,会影响服务器的效率
  2. 另一种方式是使用 Redis 的发布订阅功能,当获取锁失败时,订阅锁释放消息

# 集群问题1:主从切换问题

当客户端A成功在master节点上加锁,指令还未同步到slave节点,此时主节点挂掉,从节点提升为主节点,新的主节点没有锁的数据,客户端B加锁时就会成功

image.png

# 集群问题2:集群脑裂问题

集群脑裂指因为网络问题,导致Redis 某个master节点跟slave节点和sentinel集群处于不同的网络分区,因为sentinel集群无法感知到master的存在,所以将slave节点提升为master节点,此时存在两个不同的master节点;当不同的客户端连接不同的 master 节点时,两个客户端可以同时拥有同一把锁: image.png

# 集群问题解决方案

  • 使用 zookeeper 分布式锁
  • redlock(RedLock算法本身有争议)
  • redis 解决脑裂问题,增加如下配置:
# 连接到master的最少slave数
# 配置超过半数就可以了
# 例如:master + slave一共有6台,这个参数配置成3
min-slaves-to-write 3

# zookeeper 分布式锁

# 利用的zookeeper特性

  • 在同一时刻,不能有多个客户端创建同一个节点,顺序一致性
  • 临时节点只在session生命周期存在,session一结束会自动销毁
  • watcher机制,在代表锁资源的节点被删除,即可以触发watcher解除阻塞重新去获取锁

# 基于临时节点方案

# 原理与流程

实现较为简单,逻辑就是谁创建成功该节点,谁就持有锁,创建失败的自己进行阻塞,A线程先持有锁,B线程获取失败就会阻塞,同时对/lockPath设置监听,A线程执行完操作后删除节点,触发监听器,B线程此时解除阻塞,重新去获取锁 image.png

# 代码实现

public interface Lock {
    /**
     * 获取锁
     */
    void getLock() throws Exception;

    /**
     * 释放锁
     */
    void unlock() throws Exception;
}
public abstract class AbstractTemplateLock implements Lock {
    @Override
    public void getLock() {
        if (tryLock()) {
            System.out.println(Thread.currentThread().getName() + "获取锁成功");
        } else {
            //等待
            waitLock();//事件监听 如果节点被删除则可以重新获取
            //重新获取
            getLock();
        }
    }
    protected abstract void waitLock();
    protected abstract boolean tryLock();
    protected abstract void releaseLock();
    @Override
    public void unlock() {
        releaseLock();
    }
}
public class ZkTemplateLock extends AbstractTemplateLock {
    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;

    private static final String lockPath = "/lockPath";


    private ZkClient client;

    public ZkTemplateLock() {
        client = new ZkClient(zkServers, sessionTimeout, connectionTimeout);
        log.info("zk client 连接成功:{}",zkServers);
    }

    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);

        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("监听到节点被删除");
                latch.countDown();
            }
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {}
        };
        //完成 watcher 注册
        client.subscribeDataChanges(lockPath, listener);

        //阻塞自己
        if (client.exists(lockPath)) {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //取消watcher注册
        client.unsubscribeDataChanges(lockPath, listener);
    }

    @Override
    protected boolean tryLock() {
        try {
            client.createEphemeral(lockPath);
            System.out.println(Thread.currentThread().getName()+"获取到锁");
        } catch (Exception e) {
            log.error("创建失败");
            return false;
        }
        return true;
    }

    @Override
    public void releaseLock() {
       client.delete(this.lockPath);
    }
}

# 羊群效应(惊群效应)

每次去竞争锁,都只会有一个线程拿到锁,当线程数庞大时会发生“羊群效应”,zookeeper节点可能会运行缓慢甚至宕机。这是因为其他线程没获取到锁时都会监听/lock节点,当A线程释放完毕,海量的线程都同时停止阻塞,去争抢锁,这种操作十分耗费资源,且性能大打折扣

# 基于临时顺序节点方案

# 原理与流程

临时顺序节点与临时节点不同的是产生的节点是有序的,我们可以利用这一特点,只让当前线程监听上一序号的线程,每次获取锁的时候判断自己的序号是否为最小,最小即获取到锁,执行完毕就删除当前节点 image.png

# 代码实现

public class ZkSequenTemplateLock extends AbstractTemplateLock {
    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;
    private static final String lockPath = "/lockPath";
    private String beforePath;
    private String currentPath;
    private ZkClient client;

    public ZkSequenTemplateLock() {
        client = new ZkClient(zkServers);
        if (!client.exists(lockPath)) {
            client.createPersistent(lockPath);

        }
        log.info("zk client 连接成功:{}",zkServers);

    }

    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("监听到节点被删除");
                latch.countDown();
            }
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {}
        };
        //给排在前面的节点增加数据删除的watcher,本质是启动另一个线程去监听上一个节点
        client.subscribeDataChanges(beforePath, listener);

        //阻塞自己
        if (client.exists(beforePath)) {
            try {
                System.out.println("阻塞"+currentPath);
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //取消watcher注册
        client.unsubscribeDataChanges(beforePath, listener);
    }

    @Override
    protected boolean tryLock() {
        if (currentPath == null) {
            //创建一个临时顺序节点
            currentPath = client.createEphemeralSequential(lockPath + "/", "lock-data");
            System.out.println("current:" + currentPath);
        }

        //获得所有的子节点并排序。临时节点名称为自增长的字符串
        List<String> childrens = client.getChildren(lockPath);
        //排序list,按自然顺序排序
        Collections.sort(childrens);
        if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
            return true;
        } else {
            //如果当前节点不是排第一,则获取前面一个节点信息,赋值给beforePath
            int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
            beforePath = lockPath + "/" + childrens.get(curIndex - 1);
        }
        System.out.println("beforePath"+beforePath);
        return false;
    }

    @Override
    public void releaseLock() {
        System.out.println("delete:" + currentPath);
        client.delete(currentPath);
    }
}

# 与Redis分布式锁的比较

image.png