秒杀/超卖/限流


秒杀/超卖/限流

存在问题

如果你的项目流量非常小,完全不用担心有并发的购买请求,那么做这样一个系统意义不大。但如果你的系统要像12306那样,接受高并发访问和下单的考验,那么你就需要一套完整的流程保护措施,来保证你系统在用户流量高峰期不会被搞挂了。

  • 严格防止超卖:库存100件你卖了120件,等着辞职吧
  • 防止黑产:防止不怀好意的人群通过各种技术手段把你本该下发给群众的利益全收入了囊中。
  • 防止服务器宕机:防止某时刻访问数过多,导致服务器宕机
  • 保证用户体验:高并发下,别网页打不开了,支付不成功了,购物车进不去了,地址改不了了。这个问题非常之大,涉及到各种技术,也不是一下子就能讲完的,甚至根本就没法讲完。

保护措施

  • 乐观锁防止超卖 —核心基础
  • 令牌桶限流
  • Redis 缓存
  • 消息队列异步处理订单
  • ….

问题一、超卖

1、悲观锁解决超卖

synchronized

synchronized是Java中的关键字,是一种同步锁。可修饰实例方法,静态方法,代码块。
synchronized是一种悲观锁

注意:synchronized 锁的范围必须比 @Transactional 事务注解的范围大,否则仍会出现超卖现象

(1)sql

-- ----------------------------
-- Table structure for stock
-- ----------------------------
DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(50) NOT NULL DEFAULT '' COMMENT '名称',
  `count` int(11) NOT NULL COMMENT '库存',
  `sale` int(11) NOT NULL COMMENT '已售',
  `version` int(11) NOT NULL COMMENT '乐观锁,版本号',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for stock_order
-- ----------------------------
DROP TABLE IF EXISTS `stock_order`;
CREATE TABLE `stock_order` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `sid` int(11) NOT NULL COMMENT '库存ID',
  `name` varchar(30) NOT NULL DEFAULT '' COMMENT '商品名称',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

(2)Service

@Service
@Transactional
public class OrderServiceImpl implements OrderService {
    @Autowired
    private OrderDAO orderDAO;
    @Autowired
    private StockDAO stockDAO;
    @Override
    public Integer createOrder(Integer id) {
        //校验库存
        Stock stock = checkStock(id);
        //扣库存
        updateSale(stock);
        //下订单
        return createOrder(stock);
    }
    //校验库存
    private Stock checkStock(Integer id) {
        // 根据 商品id 查询库存
        Stock stock = stockDAO.checkStock(id);
        if (stock.getSale().equals(stock.getCount())) {
            throw new RuntimeException("库存不足");
        }
        return stock;
    }
    //扣库存
    private void updateSale(Stock stock){
        stock.setSale(stock.getSale() + 1);
        stockDAO.updateSale(stock);
    }
    //下订单
    private Integer createOrder(Stock stock){
        Order order = new Order();
        order.setSid(stock.getId());
        order.setCreateDate(new Date());
        order.setName(stock.getName());
        orderDAO.createOrder(order);
        return order.getId();
    }
}

(3)Controller

@RestController
@RequestMapping("stock")
public class StockController {
    @Autowired
    private OrderService orderService;
    //秒杀方法
    @GetMapping("sale")
    public String sale(Integer id){
        int orderId = 0;
        try{
            synchronized(this){
                //根据商品id创建订单,返回创建订单的id
                orderId =  orderService.createOrder(id);
                System.out.println("orderId = " + orderId);
                return String.valueOf(orderId);
            }
        }catch (Exception e){
            e.printStackTrace();
            return e.getMessage();
        }
    }
}

2、乐观锁解决超卖

乐观锁

基于数据库的乐观锁

乐观锁机制其实就是在数据库表中引入一个版本号(version)字段来实现的。

当我们要从数据库中读取数据的时候,同时把这个version字段也读出来,如果要对读出来的数据进行更新后写回数据库,则需要将version加1,同时将新的数据与新的version更新到数据表中,且必须在更新的时候同时检查目前数据库里version值是不是之前的那个version,如果是,则正常更新。如果不是,则更新失败,说明在这个过程中有其它的进程去更新过数据了。

(1)Mapper

StockMapper

<!--根据秒杀商品id查询库存-->
<select id="checkStock" parameterType="int" resultType="Stock">
    select id,name,count,sale,version from stock
    where id = #{id}
</select>

<!--根据商品id扣除库存(对已售商品字段加一)-->
<!--乐观锁-->
<update id="updateSale" parameterType="Stock">
    update stock set
    sale=sale+1,
    version=version+1
    where
    id =#{id}
    and
    version = #{version}
</update>

OrderMapper

<!--创建订单-->
<insert id="createOrder" parameterType="Order" useGeneratedKeys="true" keyProperty="id" >
    insert into stock_order values(#{id},#{sid},#{name},#{createDate})
</insert>

3、redis解决超卖

可以将商品数量保存到redis中,多个用户对商品数量key调用decr命令,并且都会返回一个值,只有当返回的值大于零时,才生成订单,其他的都算是抢购失败,redis是单线程处理数据的,所以不会有并发问题,至于效率肯定比数据库快!

//设值
redisTemplate.opsForValue().set("key", "10");

//加1
Long value = redisTemplate.opsForValue().increment("key");

//加4
Long value = redisTemplate.opsForValue().increment("key",4);

//减1
Long value = redisTemplate.opsForValue().decrement("key");

//减4
Long value = redisTemplate.opsForValue().decrement("key",4);    

//加2.3 increment是float类型
redisTemplate.opsForValue().increment("i",2.3);

RedisTemplate与StringRedisTemplate

两者的关系

都是spring-data-redis包中的class
StringRedisTemplate继承RedisTemplate

适用对象

RedisTemplate: 各种复杂对象
序列化方式:JdkSerializationRedisSerializer

StringRedisTemplate: String对象
序列化方式:StringRedisSerializer

由于大部分的redis的操作都基于String类型,所以StringRedisTemplate最小化配置项,是其变得更通用,尤其是在序列化方式上面

注意点

两者序列化方式不同,所以两个template不通用(序列化及反序列化问题)

通过RedisTemplate存储的键值,不能通过StringRedisTemplate 获取,会报错

问题二、接口限流

(1)接口限流

限流:是对某一时间窗口内的请求数进行限制,保持系统的可用性和稳定性,防止因流量暴增而导致的系统运行缓慢或宕机

常用的限流算法有令牌桶和和漏桶(漏斗算法),而Google开源项目Guava中的RateLimiter使用的就是令牌桶控制算法。在开发高并发系统时有三把利器用来保护系统:缓存降级限流

  • 缓存:缓存的目的是提升系统访问速度和增大系统处理容量
  • 降级:降级是当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行
  • 限流:限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理。

(2)令牌桶和漏斗算法

  • 漏斗算法:漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
  • 令牌桶算法:最初来源于计算机网络。在网络传输数据时,为了防止网络拥塞,需限制流出网络的流量,使流量以比较均匀的速度向外发送。令牌桶算法就实现了这个功能,可控制发送到网络上数据的数目,并允许突发数据的发送。大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。这意味,面对瞬时大流量,该算法可以在短时间内请求拿到大量令牌,而且拿令牌的过程并不是消耗很大的事情。

(3)令牌桶的简单使用

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>28.2-jre</version>
</dependency>
public class StockController {
    @Autowired
    private OrderService orderService;

    //创建令牌桶实例(限速器),设置每秒最多产生40个令牌
    private RateLimiter rateLimiter =  RateLimiter.create(40);

    @GetMapping("sale")
    public String sale(Integer id){
        //1.没有获取到token,则一直请求直到获取到token 令牌
        //log.info("等待的时间: "+  rateLimiter.acquire());

        //2.设置一个等待时间,如果在等待的时间(2s)内获取到了token 令牌,则处理业务,如果在等待时间内没有获取到响应token则抛弃
        if(!rateLimiter.tryAcquire(2, TimeUnit.SECONDS)){
            System.out.println("当前请求被限流,直接抛弃,无法调用后续秒杀逻辑....");
            return "抢购失败!";
        }
        System.out.println("处理业务.....................");
        return "抢购成功";
    }
}
/*----------------------------创建限速器----------------------------------------*/
//根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询)
// 人话:返回一个每秒产生 (参数) 个令牌的限速器
static RateLimiter create(double permitsPerSecond)

/*根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),
在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和)*/
// 人话:返回一个每个指定时间周期产生指定数量令牌的限速器    
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit);


/*----------------------------令牌产生速率相关方法----------------------------------------*/
//返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数
double getRate()

//更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。
void setRate(double permitsPerSecond)


/*----------------------------阻塞获取令牌----------------------------------------*/
//从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求,返回0表示获取令牌成功,否则阻塞等待获取
double acquire() 

//从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求
double acquire(int permits)    


/*----------------------------非阻塞获取令牌----------------------------------------*/
//从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话,返回true表示获取成功,否则等待获取
boolean tryAcquire()

//从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话
boolean tryAcquire(int permits)

/*从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,
或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待)*/
boolean tryAcquire(int permits, long timeout, TimeUnit unit)

/*从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,
或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待)*/
boolean  tryAcquire(long timeout, TimeUnit unit)

(4)令牌桶具体使用

RateLimiterInterceptor,负责实现限速逻辑

public class RateLimiterInterceptor extends HandlerInterceptorAdapter {

    private final RateLimiter rateLimiter;

    //通过构造函数初始化限速器
    public RateLimiterInterceptor(RateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

        if (this.rateLimiter.tryAcquire()){
            // 成功获取到令牌
            return true;
        }

        /*
        * 获取失败,直接相应"错误信息"
        * 也可以通过抛出异常,通过全局异常处理器相应客户端
        * */
        response.setCharacterEncoding(StandardCharsets.UTF_8.name());
        response.setContentType(MediaType.TEXT_PLAIN_VALUE);
        response.getWriter().write("服务器繁忙");
        return false;
    }
}

拦截器的配置

@Configuration
public class WebMvcConfiguration implements WebMvcConfigurer {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // 限流 接口,1秒钟生成一个令牌,也就是1秒钟允许2次访问
        registry.addInterceptor(new RateLimiterInterceptor(RateLimiter.create(2,1, TimeUnit.SECONDS)))
                .addPathPatterns("/log","/getAdminOrderList","/findAdminOrder");
    }
}

问题三、隐藏秒杀接口

问题

  1. 应该在一定的时间内执行秒杀处理,不能再任意时间都接受秒杀请求。如何加入时间验证?

  2. 对于稍微懂点电脑的,又会动歪脑筋的人来说开始通过抓包方式获取我们的接口地址。然后通过脚本进行抢购怎么办?

  3. 秒杀开始之后如何限制单个用户的请求频率,即单位时间内限制访问次数?

该部分内容

  • 限时抢购
  • 抢购接口隐藏
  • 单用户限制频率(单位时间内限制访问次数)

1、限时抢购

redis 设置超时时间实现

@Service
@Transactional
public class OrderServiceImpl implements OrderService {
  @Autowired
  private StringRedisTemplate stringRedisTemplate;

  @Override
    public Integer createOrder(Integer id) {
        //redis校验抢购时间,以 kill+商品id 为key
        if(!stringRedisTemplate.hasKey("kill" + id)){
            throw new RuntimeException("秒杀超时,活动已经结束啦!!!");
        }
        //校验库存
        Stock stock = checkStock(id);
        //扣库存
        updateSale(stock);
        //下订单
        return createOrder(stock);
    }
}

2、抢购接口隐藏

对于稍微懂点电脑的,又会动歪脑筋的人来说,点击F12打开浏览器的控制台,就能在点击抢购按钮后,获取我们抢购接口的链接。(手机APP等其他客户端可以抓包来拿到)一旦坏蛋拿到了抢购的链接,只要稍微写点爬虫代码,模拟一个抢购请求,就可以不通过点击下单按钮,直接在代码中请求我们的接口,完成下单。所以就有了成千上万的薅羊毛军团,写一些脚本抢购各种秒杀商品。

他们只需要在抢购时刻的000毫秒,开始不间断发起大量请求,觉得比大家在APP上点抢购按钮要快,毕竟人的速度又极限,更别说APP说不定还要经过几层前端验证才会真正发出请求。

所以我们需要将抢购接口进行隐藏,抢购接口隐藏(接口加盐)的具体做法

  • 每次点击秒杀按钮,先从服务器获取一个秒杀验证值(接口内判断是否到秒杀时间)。
  • Redis以缓存用户ID和商品ID为Key,秒杀地址为Value缓存验证值
  • 用户请求秒杀商品的时候,要带上秒杀验证值进行校验。
  • 具体流程:

(1)获取 md5 接口

controller

//生成md5值的方法
@RequestMapping("md5")
public String getMd5(Integer id, Integer userid) {
  String md5;
  try {
    md5 = orderService.getMd5(id, userid);
  }catch (Exception e){
    e.printStackTrace();
    return "获取md5失败: "+e.getMessage();
  }
  return "获取md5信息为: "+md5;
}

service

@Override
public String getMd5(Integer id, Integer userid) {
  //检验用户的合法性
  User user = userDAO.findById(userid);
  if(user==null)throw new RuntimeException("用户信息不存在!");
  log.info("用户信息:[{}]",user.toString());
  //检验商品的合法行
  Stock stock = stockDAO.checkStock(id);
  if(stock==null) throw new RuntimeException("商品信息不合法!");
  log.info("商品信息:[{}]",stock.toString());
  //生成hashkey
  String hashKey = "KEY_"+userid+"_"+id;
  //生成md5//这里!QS#是一个盐 随机生成
  String key = DigestUtils.md5DigestAsHex((userid+id+"!Q*jS#").getBytes());
  stringRedisTemplate.opsForValue().set(hashKey, key, 3600, TimeUnit.SECONDS);
  log.info("Redis写入:[{}] [{}]", hashKey, key);
  return key;
}

(2)抢购接口

controller

//开发一个秒杀方法 乐观锁防止超卖+ 令牌桶算法限流
@GetMapping("killtokenmd5")
public String killtoken(Integer id,Integer userid,String md5) {
    System.out.println("秒杀商品的id = " + id);
    //加入令牌桶的限流措施
    if (!rateLimiter.tryAcquire(3, TimeUnit.SECONDS)) {
        log.info("抛弃请求: 抢购失败,当前秒杀活动过于火爆,请重试");
        return "抢购失败,当前秒杀活动过于火爆,请重试!";
    }
    try {
        //根据秒杀商品id 去调用秒杀业务
        int orderId = orderService.kill(id,userid,md5);
        return "秒杀成功,订单id为: " + String.valueOf(orderId);
    } catch (Exception e) {
        e.printStackTrace();
        return e.getMessage();
    }
}

service

@Override
public int kill(Integer id, Integer userid, String md5) {

  //校验redis中秒杀商品是否超时
  //        if(!stringRedisTemplate.hasKey("kill"+id))
  //            throw new RuntimeException("当前商品的抢购活动已经结束啦~~");

  //先验证签名
  String hashKey = "KEY_"+userid+"_"+id;
  String s = stringRedisTemplate.opsForValue().get(hashKey);

  if (s==null && !s.equals(md5)){
      throw  new RuntimeException("当前请求数据不合法,请稍后再试!");
  } 

  //校验库存
  Stock stock = checkStock(id);
  //更新库存
  updateSale(stock);
  //创建订单
  return createOrder(stock);
}

3、限制用户访问频率

假设我们做好了接口隐藏,但是像我上面说的,总有无聊的人会写一个复杂的脚本,先请求hash(md5)值,再立刻请求购买,如果你的app下单按钮做的很差,大家都要开抢后0.5秒才能请求成功,那可能会让脚本依然能够在大家前面抢购成功。

我们需要在做一个额外的措施,来限制单个用户的抢购频率。

其实很简单的就能想到用redis给每个用户做访问统计,甚至是带上商品id,对单个商品做访问统计,这都是可行的。

我们先实现一个对用户的访问频率限制,我们在用户申请下单时,检查用户的访问次数,超过访问次数,则不让他下单!

  • 具体流程

(1)controller

//开发一个秒杀方法 乐观锁防止超卖+ 令牌桶算法限流
@GetMapping("killtokenmd5limit")
public String killtokenlimit(Integer id,Integer userid,String md5) {
  //加入令牌桶的限流措施
  if (!rateLimiter.tryAcquire(3, TimeUnit.SECONDS)) {
    log.info("抛弃请求: 抢购失败,当前秒杀活动过于火爆,请重试");
    return "抢购失败,当前秒杀活动过于火爆,请重试!";
  }
  try {
    //加入单用户限制调用频率
    int count = userService.saveUserCount(userid);
    log.info("用户截至该次的访问次数为: [{}]", count);
    boolean isBanned = userService.getUserCount(userid);
    if (isBanned) {
      log.info("购买失败,超过频率限制!");
      return "购买失败,超过频率限制!";
    }
    //根据秒杀商品id 去调用秒杀业务
    int orderId = orderService.kill(id,userid,md5);
    return "秒杀成功,订单id为: " + String.valueOf(orderId);
  } catch (Exception e) {
    e.printStackTrace();
    return e.getMessage();
  }
}

(2)service

@Service
@Transactional
@Slf4j
public class UserServiceImpl  implements UserService{

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public int saveUserCount(Integer userId) {
        //根据不同用户id生成调用次数的key
        String limitKey = "LIMIT" + "_" + userId;
        //获取redis中指定key的调用次数
        String limitNum = stringRedisTemplate.opsForValue().get(limitKey);
        int limit =-1;
        if (limitNum == null) {
            //第一次调用放入redis中设置为0
            stringRedisTemplate.opsForValue().set(limitKey, "0", 3600, TimeUnit.SECONDS);
        } else {
            //不是第一次调用每次+1
            limit = Integer.parseInt(limitNum) + 1;
            stringRedisTemplate.opsForValue().set(limitKey, String.valueOf(limit), 3600, TimeUnit.SECONDS);
        }
        return limit;//返回调用次数
    }

    @Override
    public boolean getUserCount(Integer userId) {
        String limitKey = "LIMIT"+ "_" + userId;
        //跟库用户调用次数的key获取redis中调用次数
        String limitNum = stringRedisTemplate.opsForValue().get(limitKey);
        if (limitNum == null) {
            //为空直接抛弃说明key出现异常
            log.error("该用户没有访问申请验证值记录,疑似异常");
            return true;
        }
        return Integer.parseInt(limitNum) > 10; //false代表没有超过 true代表超过
    }
}

  目录