# MySQL 分布式锁
# 基于(insert)唯一索引
关键点:
- 加锁的超时时间
- 业务出错,锁的释放是通过事务回滚
- 锁的去重
- 锁的排序,减少死锁的概率
- 锁的可重入,通过 thread_local 实现
# 总体实现
@Override
@Transactional
public <T> T lock(ERPLock lock, ILockCallback<T> callback) {
return lock(
// 锁的key
lock,
// 业务方法
callback,
// 等待锁超时时间,默认50s
lockConfig.getLockTimeout(),
// 时间单位:秒
ELockConstant.DEFAULT_TIMEUNIT);
}
----- 调用下面的-----
@Override
@Transactional
public <T> T locks(List<ERPLock> locks, ILockCallback<T> callback,
Long timeout, TimeUnit unit) {
int dbKey = locks.get(0).getDbKey();
// 对锁进行去重
List<ERPLock> addLocks = getAddLocks(getUniqKey(locks));
try {
// 对锁进行排序,减少死锁
if (addLocks.size() > 1) {
Collections.sort(addLocks);
}
// 加锁 + 执行业务 + 释放锁
return lockProxyBuiness.lock(dbKey, addLocks, callback, timeout, unit);
} catch (LockException e) {//说明加锁失败
logger.error("加锁失败", e);
throw e;
} catch (RuntimeException e) {//说明业务有错
callback.rollback(e);
logger.error("操作失败", e);
throw e;//异常直接抛出
} catch (Exception e) {//说明业务有错
callback.rollback(e);
logger.error("操作失败", e);
throw new LockException(e);//封装一遍异常
} finally {
// 这里不是删除erp_lock,是删除thread_local中的
// 这里有个注意的点,唯一码finally中释放锁没有删除数据库中的记录,而是在执行完业务操作之后删除,因为有事务,执行业务出错之后,事务回滚,锁也会回滚掉
unlocks(locks);
}
}
@Transactional
@Override
public <T> T lock(@RouterKey int dbno, List<ERPLock> locks, ILockCallback<T> callback,
Long timeout, TimeUnit unit) {
long start = System.currentTimeMillis();
long took;
int lockSize = locks == null ? 0 : locks.size();
if (lockSize > 0) {
if (timeout == null || timeout <= 0
|| timeout >= ELockConstant.DEFAULT_TIMEOUT) {
lock(locks);
} else {
// 加锁有超时时间
lock(locks, timeout);
}
took = System.currentTimeMillis() - start;
if (took >= 3000) {
logger.warn(String.format("加锁耗时过长,locks.size=%s,
took:%s ms", lockSize, took));
}
}
start = System.currentTimeMillis();
// 执行业务代码
T result = callback.callback();
took = System.currentTimeMillis() - start;
if (took >= 3000) {
logger.warn(String.format("加锁后业务执行时间过长,
locks.size=%s,took:%s ms", lockSize, took));
}
try {
start = System.currentTimeMillis();
// 释放锁
lockDao.batchDelete(locks);
took = System.currentTimeMillis() - start;
if (took >= 3000) {
logger.warn(String.format("加锁后释放锁时间过长,locks.size=%s,
took:%s ms", lockSize, took));
}
} catch (Exception e) {
logger.error("mysql释放锁失败locks=" + locks, e);
}
return result;
}
# 获取与释放锁
# com.raycloud.dmj.services.inner.LockMysqlService#lock
private void lock(List<ERPLock> locks, Long timeout) {
Integer dbNo = locks.get(0).getDbKey();
try {
// 设置当前会话的innodb_lock_wait_timeout
lockDao.setSessionLockFf(dbNo, timeout);
// 插入一条记录到erp_lock,key是唯一索引
lockDao.batchInsert(locks);
} catch (Exception e) {
logger.error("mysql加锁失败locks=" + locks, e);
//异常一定要抛出
throw new LockException(e.getMessage());
} finally {
lockDao.setSessionDefaultLockWaitTimeout(dbNo);
}
}
// 释放锁,删除记录
lockDao.batchDelete(locks);
# 存在的问题
- 锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作(搞一个while循环,直到insert成功再返回成功)
- 锁是不可重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了
- 并发量大的时候请求量大,获取锁的间隔如果较小会给系统和数据库造成压力
- 锁和数据库在一起,要挂就一起都挂了
# 基于排他锁(for update)
CREATE TABLE `methodLock` (
`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` VARCHAR ( 64 ) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
`desc` VARCHAR ( 1024 ) NOT NULL DEFAULT '备注信息',
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
PRIMARY KEY ( `id` ),
UNIQUE KEY `uidx_method_name` ( `method_name ` ) USING BTREE
) ENGINE = INNODB DEFAULT CHARSET = utf8 COMMENT = '锁定中的方法';
/**
* 加锁
*/
public boolean lock() {
// 开启事务
connection.setAutoCommit(false);
// 循环阻塞,等待获取锁
while (true) {
// 执行获取锁的sql
result = select * from methodLock where method_name = xxx for update;
// 结果非空,加锁成功
if (result != null) {
return true;
}
}
// 加锁失败
return false;
}
/**
* 解锁
*/
public void unlock() {
// 提交事务,解锁
connection.commit();
}
例子二:
//0.开始事务
begin;/begin work;/start transaction; (三者选一就可以)
//1.查询出商品信息
select status from t_goods where id=1 for update;
//2.根据商品信息生成订单
insert into t_orders (id,goods_id) values (null,1);
//3.修改商品status为2
update t_goods set status=2;
//4.提交事务
commit;/commit work;
问题和第一种方式类似
# 基于乐观锁(版本号)
适用于并发比较小
1.查询出商品信息
select (status,status,version) from t_goods where id=#{id}
2.根据商品信息生成订单
3.修改商品status为2
update t_goods
set status=2,version=version+1
where id=#{id} and version=#{version};
# Redis 分布式锁
# 加锁
set NX PX + 重试 + 重试间隔
向 Redis 发起如下命令: SET productId:lock 0xx9p03001 NX PX 30000 其中,"productId"由自己定义,可以是与本次业务有关的id,"0xx9p03001"是一串随机值,必须保证全局唯一,“NX"指的是当且仅当key(也就是案例中的"productId:lock”)在Redis中不存在时,返回执行成功,否则执行失败。"PX 30000"指的是在30秒后,key将被自动删除(防止客户端挂掉,锁还在的情况)。执行命令后返回成功,表明服务成功的获得了锁。
@Override
public boolean lock(String key, long expire, int retryTimes, long retryDuration) {
// use JedisCommands instead of setIfAbsense
boolean result = setRedis(key, expire);
// retry if needed
while ((!result) && retryTimes-- > 0) {
try {
log.debug("lock failed, retrying..." + retryTimes);
Thread.sleep(retryDuration);
} catch (Exception e) {
return false;
}
// use JedisCommands instead of setIfAbsense
result = setRedis(key, expire);
}
return result;
}
private boolean setRedis(String key, long expire) {
try {
RedisCallback<String> redisCallback = connection -> {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
String uuid = SnowIDUtil.uniqueStr();
lockFlag.set(uuid);
return commands.set(key, uuid, NX, PX, expire); // 看这里
};
String result = redisTemplate.execute(redisCallback);
return !StringUtil.isEmpty(result);
} catch (Exception e) {
log.error("set redis occurred an exception", e);
}
return false;
}
# 解锁
解锁:采用lua脚本,保证操作的原子性
在删除key之前,一定要判断服务A持有的value与Redis内存储的value是否一致。如果贸然使用服务A持有的key来删除锁,则会误将服务B的锁释放掉。
在加锁的时候 key 是具体业务,例如 key = 波次 Id; value 是全局唯一 Id
在解锁时,需要判断 value 是否一致
if redis.call("get", KEYS[1])==ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
# 问题1:误解除锁
如果线程A成功获取到了锁,并且设置了过期时间30秒,但线程A执行时间超过了30秒,锁过期自动释放,此时线程B获取到了锁;随后A执行完成,线程A使用DEL命令来释放锁,但此时线程B加的锁还没有执行完成,线程 A 实际释放的线程B加的锁:

解决方案:通过在value中设置当前线程加锁的标识(全局唯一Id),在删除之前验证key对应的value判断锁是否是当前线程持;验证key,然后再删除是两步不具有原子性,删除要使用lua脚本
# 问题2:超时解锁导致并发
如果线程A成功获取锁并设置过期时间30秒,但线程A执行时间超过了30秒,锁过期自动释放,此时线程B获取到了锁,线程A和线程B并发执行,解决方案:
- 将过期时间设置足够长,确保代码逻辑在锁释放之前能够执行完成
- 在获取到锁之后,另外开一个定时任务,在过期时间的1/3重新设置超时时间(Redisson使用)
# 问题3:锁重入问题
可通过对锁进行重入计数,加锁时加1,解锁时减1,当计数归0时释放锁;这就需要使用lua脚本了
// 如果 lock_key 不存在
if (redis.call('exists', KEYS[1]) == 0)
then
// 设置 lock_key 线程标识 1 进行加锁
redis.call('hset', KEYS[1], ARGV[2], 1);
// 设置过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 如果 lock_key 存在且线程标识是当前欲加锁的线程标识
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
// 自增
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
// 重置过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 如果加锁失败,返回锁剩余时间
return redis.call('pttl', KEYS[1]);
// 这里使用hash是因为要存储key,线程唯一标志,加锁次数
# 问题4:获取锁失败重试
- 客户端轮询,当未获取到锁时,等待一段时间重新获取锁,直到成功获取锁或等待超时。这种方式比较消耗服务器资源,当并发量比较大时,会影响服务器的效率
- 另一种方式是使用 Redis 的发布订阅功能,当获取锁失败时,订阅锁释放消息

# 集群问题1:主从切换问题
当客户端A成功在master节点上加锁,指令还未同步到slave节点,此时主节点挂掉,从节点提升为主节点,新的主节点没有锁的数据,客户端B加锁时就会成功
# 集群问题2:集群脑裂问题
集群脑裂指因为网络问题,导致Redis 某个master节点跟slave节点和sentinel集群处于不同的网络分区,因为sentinel集群无法感知到master的存在,所以将slave节点提升为master节点,此时存在两个不同的master节点;当不同的客户端连接不同的 master 节点时,两个客户端可以同时拥有同一把锁:
# 集群问题解决方案
- 使用 zookeeper 分布式锁
- redlock(RedLock算法本身有争议)
- redis 解决脑裂问题,增加如下配置:
# 连接到master的最少slave数
# 配置超过半数就可以了
# 例如:master + slave一共有6台,这个参数配置成3
min-slaves-to-write 3
# zookeeper 分布式锁
# 利用的zookeeper特性
- 在同一时刻,不能有多个客户端创建同一个节点,顺序一致性
- 临时节点只在session生命周期存在,session一结束会自动销毁
- watcher机制,在代表锁资源的节点被删除,即可以触发watcher解除阻塞重新去获取锁
# 基于临时节点方案
# 原理与流程
实现较为简单,逻辑就是谁创建成功该节点,谁就持有锁,创建失败的自己进行阻塞,A线程先持有锁,B线程获取失败就会阻塞,同时对/lockPath设置监听,A线程执行完操作后删除节点,触发监听器,B线程此时解除阻塞,重新去获取锁
# 代码实现
public interface Lock {
/**
* 获取锁
*/
void getLock() throws Exception;
/**
* 释放锁
*/
void unlock() throws Exception;
}
public abstract class AbstractTemplateLock implements Lock {
@Override
public void getLock() {
if (tryLock()) {
System.out.println(Thread.currentThread().getName() + "获取锁成功");
} else {
//等待
waitLock();//事件监听 如果节点被删除则可以重新获取
//重新获取
getLock();
}
}
protected abstract void waitLock();
protected abstract boolean tryLock();
protected abstract void releaseLock();
@Override
public void unlock() {
releaseLock();
}
}
public class ZkTemplateLock extends AbstractTemplateLock {
private static final String zkServers = "127.0.0.1:2181";
private static final int sessionTimeout = 8000;
private static final int connectionTimeout = 5000;
private static final String lockPath = "/lockPath";
private ZkClient client;
public ZkTemplateLock() {
client = new ZkClient(zkServers, sessionTimeout, connectionTimeout);
log.info("zk client 连接成功:{}",zkServers);
}
@Override
protected void waitLock() {
CountDownLatch latch = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("监听到节点被删除");
latch.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {}
};
//完成 watcher 注册
client.subscribeDataChanges(lockPath, listener);
//阻塞自己
if (client.exists(lockPath)) {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取消watcher注册
client.unsubscribeDataChanges(lockPath, listener);
}
@Override
protected boolean tryLock() {
try {
client.createEphemeral(lockPath);
System.out.println(Thread.currentThread().getName()+"获取到锁");
} catch (Exception e) {
log.error("创建失败");
return false;
}
return true;
}
@Override
public void releaseLock() {
client.delete(this.lockPath);
}
}
# 羊群效应(惊群效应)
每次去竞争锁,都只会有一个线程拿到锁,当线程数庞大时会发生“羊群效应”,zookeeper节点可能会运行缓慢甚至宕机。这是因为其他线程没获取到锁时都会监听/lock节点,当A线程释放完毕,海量的线程都同时停止阻塞,去争抢锁,这种操作十分耗费资源,且性能大打折扣
# 基于临时顺序节点方案
# 原理与流程
临时顺序节点与临时节点不同的是产生的节点是有序的,我们可以利用这一特点,只让当前线程监听上一序号的线程,每次获取锁的时候判断自己的序号是否为最小,最小即获取到锁,执行完毕就删除当前节点
# 代码实现
public class ZkSequenTemplateLock extends AbstractTemplateLock {
private static final String zkServers = "127.0.0.1:2181";
private static final int sessionTimeout = 8000;
private static final int connectionTimeout = 5000;
private static final String lockPath = "/lockPath";
private String beforePath;
private String currentPath;
private ZkClient client;
public ZkSequenTemplateLock() {
client = new ZkClient(zkServers);
if (!client.exists(lockPath)) {
client.createPersistent(lockPath);
}
log.info("zk client 连接成功:{}",zkServers);
}
@Override
protected void waitLock() {
CountDownLatch latch = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("监听到节点被删除");
latch.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {}
};
//给排在前面的节点增加数据删除的watcher,本质是启动另一个线程去监听上一个节点
client.subscribeDataChanges(beforePath, listener);
//阻塞自己
if (client.exists(beforePath)) {
try {
System.out.println("阻塞"+currentPath);
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取消watcher注册
client.unsubscribeDataChanges(beforePath, listener);
}
@Override
protected boolean tryLock() {
if (currentPath == null) {
//创建一个临时顺序节点
currentPath = client.createEphemeralSequential(lockPath + "/", "lock-data");
System.out.println("current:" + currentPath);
}
//获得所有的子节点并排序。临时节点名称为自增长的字符串
List<String> childrens = client.getChildren(lockPath);
//排序list,按自然顺序排序
Collections.sort(childrens);
if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
return true;
} else {
//如果当前节点不是排第一,则获取前面一个节点信息,赋值给beforePath
int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
beforePath = lockPath + "/" + childrens.get(curIndex - 1);
}
System.out.println("beforePath"+beforePath);
return false;
}
@Override
public void releaseLock() {
System.out.println("delete:" + currentPath);
client.delete(currentPath);
}
}