SpringBoot配置线程池


一、配置和使用

1、配置类

@Slf4j
@Configuration
@EnableAsync
public class ExecutorConfig {

    /**
     *获取当前机器CPU数量
     */
    private static final int CPU = Runtime.getRuntime().availableProcessors();
    /**
     *核心线程数(默认线程数)
     */
    private static final int CORE_POOL_SIZE = CPU;
    /**
     *最大线程数
     */
    private static final int MAX_POOL_SIZE = CPU * 2;
    /**
     *允许线程空闲时间(单位:默认为秒)
     */
    private static final int KEEP_ALIVE_TIME = 60;
    /**
     *缓冲队列数
     */
    private static final int QUEUE_CAPACITY = 200;
    /**
     *线程池名前缀
     */
    private static final String THREAD_NAME_PREFIX = "orderTaskExecutor-";


    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(CORE_POOL_SIZE);
        //配置最大线程数
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        // 配置最大空闲时间
        executor.setKeepAliveSeconds(KEEP_ALIVE_TIME);
        //配置队列大小
        executor.setQueueCapacity(QUEUE_CAPACITY);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX);

        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

2、接口及实现类

public interface AsyncService {
    /**
     * 执行异步任务
     */
    void executeAsync();

    /**
     * 执行异步任务,有返回值
     */
    Future executeAsync2();
}
@Service
public class AsyncServiceImpl implements AsyncService {
    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);

    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        logger.info("start executeAsync");

        System.out.println("异步线程要做的事情");
        System.out.println("可以在这里执行批量插入等耗时的事情");

        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        logger.info("end executeAsync");
    }

    /**
     * 异常调用返回Future
     *  对于返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
     *  或者在调用方在调用Futrue.get时捕获异常进行处理
     */
    @Override
    @Async("asyncServiceExecutor")
    public Future<String> executeAsync2() {
        logger.info("start executeAsync");

        System.out.println("异步线程要做的事情");
        System.out.println("可以在这里执行批量插入等耗时的事情");

        Future<String> future = null;
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (Exception e) {
            e.printStackTrace();
            future = new AsyncResult<String>(e.toString());
        }

        logger.info("end executeAsync");
        return future;
    }
}

3、方法调用

@RestController
@RequestMapping("/test")
public class TestController2 {
    @Resource
    private AsyncService asyncService;

    // 调用无返回值的异步方法时,不会阻塞
    @GetMapping("/async")
    public String async(){
        asyncService.executeAsync();
        System.out.println("controller 执行完毕");
        return "controller 执行完毕";
    }

    @GetMapping("/async2")
    public String async2(){

        Future<String> future = asyncService.executeAsync2();
        System.out.println("controller 执行完毕");

        String s = null;

        try {
            s = future.get();   // 阻塞等待异步线程执行完毕返回结果
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return s;
    }
}

二、异步处理案例

使用阻塞队列模拟消息队列,思路:用户下单,通过redis判断库存是否充足,如果充足,将订单信息存储到阻塞队列中,利用异步线程处理队列中的订单,存储到数据库。

该案例只是个练习案例,不要在生产中使用,需要用成熟的消息队列,比如 rabbitMQ

配置类同上

异步线程任务类

@Service
public class AsyncServiceImpl {

    @Autowired
    private KillOrderService killOrderService;

    @Async
    public void createOrder(BlockingQueue<Order> queue){
        while (true){
            try {
                // 从阻塞队列中取出订单,并存储到数据库
                Order order = queue.take();
                killOrderService.save(order);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

主逻辑

@Service
public class KillOrderServiceImpl implements KillOrderService {

    // 阻塞队列
    private BlockingQueue<Order> queue = new ArrayBlockingQueue<>(1024 * 1024);

    @Autowired
    private AsyncServiceImpl asyncService;

    // 在 bean 加载后立刻执行
    @PostConstruct
    public void init(){
        asyncService.createOrder(queue);
    }

    // 用户下单
    @Override
    public void killOrder(String userId) {
        // 查询库存,填充信息,此处简写
        Order order = new Order();
        order.setUserId(UUID.randomUUID().toString());

        // 将订单添加到阻塞队列
        queue.add(order);
    }
}

  目录