分布式事务分布式锁区别 (分布式事务分布式锁)

目标1:分布式锁Redisson讲解

目标2:分布式锁控制超卖

目标3:Seata分布式事务讲解

目标4:普通商品抢单分布式事务

目标5:WebSocket讲解

1 分布式锁

1.1 问题分析

上面抢单过程实现了,但其实还是有问题,会发生超卖问题,如下图:

分布式事务分布式锁区别,分布式事务分布式会话

在多线程执行的情况下,上面的抢单流程会发生超卖问题,比如只剩下1个商品,多线程同时判断是否有库存的时候,会同时判断有库存,最终导致1个商品多个订单的问题发生。

1.2 redisson分布式锁

1.2.1 分布式锁介绍

​ 解决上面超卖问题,我们可以采用分布式锁来控制,分布式锁的原理很简单。

​ 分布式锁主要是实现在分布式场景下保证数据的最终一致性。在单进程的系统中,存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步(lock—synchronized),使其在修改这种变量时能够线性执行消除并发修改变量。但分布式系统是多部署、多进程的,开发语言提供的并发处理API在此场景下就无能为力了。

目前市面上分布式锁常见的实现方式有三种:


1.基于数据库实现分布式锁; 
2.基于缓存(Redis等)实现分布式锁; 
3.基于Zookeeper实现分布式锁;

1.2.2 Redisson介绍

​ 大部分网站使用的分布式锁是基于缓存的,有更好的性能,而缓存一般是以集群方式部署,保证了高可用性。而Redis分布式锁官方推荐使用redisson。

Redisson原理图如下:

分布式事务分布式锁区别,分布式事务分布式会话

Redisson所说明:


1、redission获取锁释放锁的使用和JDK里面的lock很相似,底层的实现采用了类似lock的处理方式
2、redisson 依赖redis,因此使用redisson 锁需要服务端安装redis,而且redisson 支持单机和集群两种模式下的锁的实现
3、redisson 在多线程或者说是分布式环境下实现机制,其实是通过设置key的方式进行实现,也就是说多个线程为了抢占同一个锁,其实就是争抢设置key。

Redisson原理:

1)加锁:


if (redis.call('exists', KEYS[1]) == 0) then 
        redis.call('hset', KEYS[1], ARGV[2], 1);
         redis.call('pexpire', KEYS[1], ARGV[1]); 
         return nil;
          end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
        redis.call('hincrby', KEYS[1], ARGV[2], 1);
        redis.call('pexpire', KEYS[1], ARGV[1]); 
        return nil;
        end;
return redis.call('pttl', KEYS[1]);

将业务封装在lua中发给redis,保障业务执行的原子性。

第1个if表示执行加锁,会先判断要加锁的key是否存在,不存在就加锁。

当第1个if执行,key存在的时候,会执行第2个if,第2个if会获取第1个if对应的key剩余的有效时间,然后会进入while循环,不停的尝试加锁。

2)释放锁:


if (redis.call('exists', KEYS[1]) == 0) then
       redis.call('publish', KEYS[2], ARGV[1]);
        return 1; 
        end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
     return nil;
     end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then
     redis.call('pexpire', KEYS[1], ARGV[2]); 
     return 0; 
else redis.call('del', KEYS[1]); 
     redis.call('publish', KEYS[2], ARGV[1]); 
     return 1;
     end;
return nil;

执行lock.unlock(),每次都对myLock数据结构中的那个加锁次数减1。如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:“del myLock”命令,从redis里删除这个key,另外的客户端2就可以尝试完成加锁了。

3)缺点:

Redisson存在一个问题,就是如果你对某个redis master实例,写入了myLock这种锁key的value,此时会异步复制给对应的master slave实例。但是这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁。此时就会导致多个客户端对一个分布式锁完成了加锁。这时系统在业务上一定会出现问题,导致脏数据的产生。

1.2.3 Redisson配置

1)引入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.11.0</version>
</dependency>

2)锁操作方法实现

要想用到分布式锁,我们就必须要实现获取锁和释放锁,获取锁和释放锁可以编写一个DistributedLocker接口,代码如下:

public interface DistributedLocker {
    /***
     * lock(), 拿不到lock就不罢休,不然线程就一直block
     * @param lockKey
     * @return
     */
    RLock lock(String lockKey);
    /***
     * timeout为加锁时间,单位为秒
     * @param lockKey
     * @param timeout
     * @return
     */
    RLock lock(String lockKey, long timeout);
    /***
     * timeout为加锁时间,时间单位由unit确定
     * @param lockKey
     * @param unit
     * @param timeout
     * @return
     */
    RLock lock(String lockKey, TimeUnit unit, long timeout);
    /***
     * tryLock(),马上返回,拿到lock就返回true,不然返回false。
     * 带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false.
     * @param lockKey
     * @param unit
     * @param waitTime
     * @param leaseTime
     * @return
     */
    boolean tryLock(String lockKey, TimeUnit unit, long waitTime, long leaseTime);
    /***
     * 解锁
     * @param lockKey
     */
    void unlock(String lockKey);
    /***
     * 解锁
     * @param lock
     */
    void unlock(RLock lock);
}

实现上面接口中对应的锁管理方法,编写一个锁管理类RedissonDistributedLocker,代码如下:


@Component
public class RedissonDistributedLocker implements DistributedLocker {

    @Autowired
    private RedissonClient redissonClient;

    /***
     * lock(), 拿不到lock就不罢休,不然线程就一直block
     * @param lockKey
     * @return
     */
    @Override
    public RLock lock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock();
        return lock;
    }

    /***
     * timeout为加锁时间,单位为秒
     * @param lockKey
     * @param timeout
     * @return
     */
    @Override
    public RLock lock(String lockKey, long timeout) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(timeout, TimeUnit.SECONDS);
        return lock;
    }

    /***
     * timeout为加锁时间,时间单位由unit确定
     * @param lockKey
     * @param unit
     * @param timeout
     * @return
     */
    @Override
    public RLock lock(String lockKey, TimeUnit unit, long timeout) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(timeout, unit);
        return lock;
    }

    /***
     * tryLock(),马上返回,拿到lock就返回true,不然返回false。
     * 带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false.
     * @param lockKey
     * @param unit
     * @param waitTime
     * @param leaseTime
     * @return
     */
    @Override
    public boolean tryLock(String lockKey, TimeUnit unit, long waitTime, long leaseTime) {
        RLock lock = redissonClient.getLock(lockKey);
        try {
            return lock.tryLock(waitTime, leaseTime, unit);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /***
     * 解锁
     * @param lockKey
     */
    @Override
    public void unlock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.unlock();
    }

    /***
     * 解锁
     * @param lock
     */
    @Override
    public void unlock(RLock lock) {
        lock.unlock();
    }
}

3)配置Redis链接

在resources下新建文件redisson.yml,主要用于配置redis集群节点链接配置,代码如下:

clusterServersConfig:
  # 连接空闲超时,单位:毫秒 默认10000
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  # 同任何节点建立连接时的等待超时。时间单位是毫秒 默认10000
  connectTimeout: 10000
  # 等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认3000
  timeout: 3000
  # 命令失败重试次数
  retryAttempts: 3
  # 命令重试发送时间间隔,单位:毫秒
  retryInterval: 1500
  # 重新连接时间间隔,单位:毫秒
  reconnectionTimeout: 3000
  # 执行失败最大次数
  failedAttempts: 3
  # 密码
  #password: test1234
  # 单个连接最大订阅数量
  subscriptionsPerConnection: 5
  clientName: null
  # loadBalancer 负载均衡算法类的选择
  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
  #从节点发布和订阅连接的最小空闲连接数
  slaveSubscriptionConnectionMinimumIdleSize: 1
  #从节点发布和订阅连接池大小 默认值50
  slaveSubscriptionConnectionPoolSize: 50
  # 从节点最小空闲连接数 默认值32
  slaveConnectionMinimumIdleSize: 32
  # 从节点连接池大小 默认64
  slaveConnectionPoolSize: 64
  # 主节点最小空闲连接数 默认32
  masterConnectionMinimumIdleSize: 32
  # 主节点连接池大小 默认64
  masterConnectionPoolSize: 64
  # 订阅操作的负载均衡模式
  subscriptionMode: SLAVE
  # 只在从服务器读取
  readMode: SLAVE
  # 集群地址
  nodeAddresses:
    - "redis://192.168.211.137:7001"
    - "redis://192.168.211.137:7002"
    - "redis://192.168.211.137:7003"
    - "redis://192.168.211.137:7004"
    - "redis://192.168.211.137:7005"
    - "redis://192.168.211.137:7006"
  # 对Redis集群节点状态扫描的时间间隔。单位是毫秒。默认1000
  scanInterval: 1000
  #这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。默认2
threads: 0
#这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。默认2
nettyThreads: 0
# 编码方式 默认org.redisson.codec.JsonJacksonCodec
codec: !<org.redisson.codec.JsonJacksonCodec> {}
#传输模式
transportMode: NIO
# 分布式锁自动过期时间,防止死锁,默认30000
lockWatchdogTimeout: 30000
# 通过该参数来修改是否按订阅发布消息的接收顺序出来消息,如果选否将对消息实行并行处理,该参数只适用于订阅发布消息的情况, 默认true
keepPubSubOrder: true
# 用来指定高性能引擎的行为。由于该变量值的选用与使用场景息息相关(NORMAL除外)我们建议对每个参数值都进行尝试。
#
#该参数仅限于Redisson PRO版本。
#performanceMode: HIGHER_THROUGHPUT

4)创建Redisson管理对象

​ Redisson管理对象有2个,分别为RedissonClient和RedissonConnectionFactory,我们只用在项目的RedisConfig中配置一下这2个对象即可,在RedisConfig中添加的代码如下:

/****
 * Redisson客户端
 * @return
 * @throws IOException
 */
@Bean
public RedissonClient redisson() throws IOException {
    ClassPathResource resource = new ClassPathResource("redssion.yml");
    Config config = Config.fromYAML(resource.getInputStream());
    RedissonClient redisson = Redisson.create(config);
    return redisson;
}

/***
 * Redisson工厂对象
 * @param redisson
 * @return
 */
@Bean
public RedissonConnectionFactory redissonConnectionFactory(RedissonClient redisson) {
    return new RedissonConnectionFactory(redisson);
}

5)测试代码

测试Redisson分布式锁的代码如下:

分布式事务分布式锁区别,分布式事务分布式会话

测试结果如下:

分布式事务分布式锁区别,分布式事务分布式会话

1.3 Redisson分布式锁控制超卖

​ 我们把上面秒杀下单会出现超卖的部分代码用Redisson分布式锁来控制一下,代码如下:

/***
 * 秒杀下单
 * @param orderMap
 */
@Override
public void addHotOrder(Map<String, String> orderMap) {
    String id = orderMap.get("id");
    String username = orderMap.get("username");
    //key
    String key = "SKU_" + id;
    //分布式锁的key
    String lockkey = "LOCKSKU_" + id;
    //用户购买的key
    String userKey = "USER" + username + "ID" + id;

    //尝试获取锁,等待10秒,自己获得锁后一直不解锁则10秒后自动解锁
    boolean bo = distributedLocker.tryLock(lockkey, TimeUnit.SECONDS, 10L, 10L);
    if(bo){
        if (redisTemplate.hasKey(key)) {
            //数量
            Integer num = Integer.parseInt(redisTemplate.boundHashOps(key).get("num").toString());

            //拥有库存,执行递减操作
            if (num > 0) {
                //查询商品
                Result<Sku> result = skuFeign.findById(id);
                Sku sku = result.getData();
                Order order = new Order();
                order.setCreateTime(new Date());
                order.setUpdateTime(order.getCreateTime());
                order.setUsername(username);
                order.setSkuId(id);
                order.setName(sku.getName());
                order.setPrice(sku.getSeckillPrice());
                order.setId("No" + idWorker.nextId());
                order.setOrderStatus("0");
                order.setPayStatus("0");
                order.setConsignStatus("0");
                orderMapper.insertSelective(order);

                //库存递减
                num--;

                if (num == 0) {
                    //同步数据到数据库,秒杀数量归零
                    skuFeign.zero(id);
                }

                //更新数据
                Map<String, Object> dataMap = new HashMap<String, Object>();
                dataMap.put("num", num);
                dataMap.put(userKey, 0);

                //存数据
                redisTemplate.boundHashOps(key).putAll(dataMap);
            }

            //记录该商品用户24小时内无法再次购买,测试环境,我们只配置成1分钟
            redisTemplate.boundValueOps(userKey).set("");
            redisTemplate.expire(userKey, 1, TimeUnit.MINUTES);
        }
        //解锁
        distributedLocker.unlock(lockkey);
    }
}