一、配置和使用
1、配置类
@Slf4j
@Configuration
@EnableAsync
public class ExecutorConfig {
/**
*获取当前机器CPU数量
*/
private static final int CPU = Runtime.getRuntime().availableProcessors();
/**
*核心线程数(默认线程数)
*/
private static final int CORE_POOL_SIZE = CPU;
/**
*最大线程数
*/
private static final int MAX_POOL_SIZE = CPU * 2;
/**
*允许线程空闲时间(单位:默认为秒)
*/
private static final int KEEP_ALIVE_TIME = 60;
/**
*缓冲队列数
*/
private static final int QUEUE_CAPACITY = 200;
/**
*线程池名前缀
*/
private static final String THREAD_NAME_PREFIX = "orderTaskExecutor-";
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(CORE_POOL_SIZE);
//配置最大线程数
executor.setMaxPoolSize(MAX_POOL_SIZE);
// 配置最大空闲时间
executor.setKeepAliveSeconds(KEEP_ALIVE_TIME);
//配置队列大小
executor.setQueueCapacity(QUEUE_CAPACITY);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
2、接口及实现类
public interface AsyncService {
/**
* 执行异步任务
*/
void executeAsync();
/**
* 执行异步任务,有返回值
*/
Future executeAsync2();
}
@Service
public class AsyncServiceImpl implements AsyncService {
private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
@Override
@Async("asyncServiceExecutor")
public void executeAsync() {
logger.info("start executeAsync");
System.out.println("异步线程要做的事情");
System.out.println("可以在这里执行批量插入等耗时的事情");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("end executeAsync");
}
/**
* 异常调用返回Future
* 对于返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理
* 或者在调用方在调用Futrue.get时捕获异常进行处理
*/
@Override
@Async("asyncServiceExecutor")
public Future<String> executeAsync2() {
logger.info("start executeAsync");
System.out.println("异步线程要做的事情");
System.out.println("可以在这里执行批量插入等耗时的事情");
Future<String> future = null;
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
future = new AsyncResult<String>(e.toString());
}
logger.info("end executeAsync");
return future;
}
}
3、方法调用
@RestController
@RequestMapping("/test")
public class TestController2 {
@Resource
private AsyncService asyncService;
// 调用无返回值的异步方法时,不会阻塞
@GetMapping("/async")
public String async(){
asyncService.executeAsync();
System.out.println("controller 执行完毕");
return "controller 执行完毕";
}
@GetMapping("/async2")
public String async2(){
Future<String> future = asyncService.executeAsync2();
System.out.println("controller 执行完毕");
String s = null;
try {
s = future.get(); // 阻塞等待异步线程执行完毕返回结果
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return s;
}
}
二、异步处理案例
使用阻塞队列模拟消息队列,思路:用户下单,通过redis判断库存是否充足,如果充足,将订单信息存储到阻塞队列中,利用异步线程处理队列中的订单,存储到数据库。
该案例只是个练习案例,不要在生产中使用,需要用成熟的消息队列,比如 rabbitMQ
配置类同上
异步线程任务类
@Service
public class AsyncServiceImpl {
@Autowired
private KillOrderService killOrderService;
@Async
public void createOrder(BlockingQueue<Order> queue){
while (true){
try {
// 从阻塞队列中取出订单,并存储到数据库
Order order = queue.take();
killOrderService.save(order);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
主逻辑
@Service
public class KillOrderServiceImpl implements KillOrderService {
// 阻塞队列
private BlockingQueue<Order> queue = new ArrayBlockingQueue<>(1024 * 1024);
@Autowired
private AsyncServiceImpl asyncService;
// 在 bean 加载后立刻执行
@PostConstruct
public void init(){
asyncService.createOrder(queue);
}
// 用户下单
@Override
public void killOrder(String userId) {
// 查询库存,填充信息,此处简写
Order order = new Order();
order.setUserId(UUID.randomUUID().toString());
// 将订单添加到阻塞队列
queue.add(order);
}
}