知乎专栏 |
Spring Boot 官方不建议在生产环境使用 SimpleAsyncTaskExecutor,建议使用自定义线程池。
2024-02-02T16:24:38.585+08:00 WARN 86223 --- [watch-development] [nio-8080-exec-2] s.w.s.m.m.a.RequestMappingHandlerAdapter : !!! Performing asynchronous handling through the default Spring MVC SimpleAsyncTaskExecutor. This executor is not suitable for production use under load. Please, configure an AsyncTaskExecutor through the WebMvc config. ------------------------------- !!!
Bean 注入配置线程池
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @SpringBootApplication @EnableAsync public class Application { public static void main(String[] args) { // close the application context to shut down the custom ExecutorService SpringApplication.run(Application.class, args).close(); } @Bean public Executor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(500); executor.setThreadNamePrefix("Netkiller -"); executor.initialize(); return executor; } @Bean("thread") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(5); // 设置最大线程数 executor.setMaxPoolSize(10); // 设置队列容量 executor.setQueueCapacity(20); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(60); // 设置线程名称 executor.setThreadNamePrefix("hello-"); // 设置拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); return executor; } }
设置线程池参数
/**
 * Minimal Spring Boot entry point with {@code @Async} support switched on.
 */
@EnableAsync
@SpringBootApplication
public class Application {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
@Component public class Task { @Async public void doTaskOne() throws Exception { // 业务逻辑 } @Async("asyncExecutor") public void doTaskTwo() throws Exception { // 业务逻辑 } @Async("thread") public void doTaskThree() throws Exception { // 业务逻辑 } }
注意:不带名称的 @Async 不会用到刚刚定义的线程池;@Async("asyncExecutor") 和 @Async("thread") 才会正确调用对应的线程池
线程池能接受多少队列?
下面配置是 executor.setQueueCapacity(10); 也就是 10个,但是实测结果跟你想的不同
package cn.netkiller.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Registers the {@code asyncExecutor} thread pool used to demonstrate how many
 * tasks a pool accepts before rejecting further submissions.
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskExecutorConfiguration {

    /**
     * Builds the {@code asyncExecutor} bean.
     *
     * @return an initialized pool: core size 5, max size 10, queue capacity 10,
     *         with {@link ThreadPoolExecutor.AbortPolicy} as rejection handler
     */
    @Bean("asyncExecutor")
    public ThreadPoolTaskExecutor executor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Thread identification.
        pool.setThreadGroupName("job");
        pool.setThreadNamePrefix("async-job-");

        // Sizing: 5 core threads, up to 10 threads, 10 queued tasks.
        pool.setCorePoolSize(5);
        pool.setMaxPoolSize(10);
        pool.setQueueCapacity(10);
        pool.setKeepAliveSeconds(60);

        // Tasks that cannot be accepted are aborted with an exception.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        // Shutdown settings: await-termination window and wait-for-tasks flag.
        pool.setAwaitTerminationSeconds(60);
        pool.setWaitForTasksToCompleteOnShutdown(true);

        pool.initialize();
        return pool;
    }
}
实测结果是,首次执行可以容纳 20 个任务,这 20 个任务执行完毕之后,再添加任务,就只接受 10 个,超过的部分会抛出异常
Executor [java.util.concurrent.ThreadPoolExecutor@7e729046[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$1775/0x0000000801b6afb0@20eaccc2
这是因为线程池最多可以有 10 个线程(maxPoolSize)同时执行 10 个任务,队列(queueCapacity)还能再排队 10 个任务。
package cn.netkiller.wallet.config; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; @Configuration @EnableAsync public class ExecutorConfiguration { /** Set the ThreadPoolExecutor's core pool size. */ private int corePoolSize = 10; /** Set the ThreadPoolExecutor's maximum pool size. */ private int maxPoolSize = 200; /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */ private int queueCapacity = 10; @Bean public Executor OneAsync() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix("MySimpleExecutor-"); executor.initialize(); return executor; } @Bean public Executor TwoAsync() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix("MyExecutor-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
/**
 * Demo service whose three tasks each sleep for a random time below ten
 * seconds, print start/finish messages and return a completion message.
 * Each task carries a different {@code @Async} qualifier.
 */
@Service
public class DemoAsyncServiceImpl implements DemoAsyncService {

    public static Random random = new Random();

    /**
     * Task one, qualified with the {@code OneAsync} executor bean name.
     *
     * @return a future holding the completion message
     * @throws Exception if the simulated work is interrupted
     */
    @Async("OneAsync")
    public Future<String> doTaskOne() throws Exception {
        return simulate("开始做任务一", "完成任务一,耗时:", "任务一完成");
    }

    /**
     * Task two, qualified with the {@code TwoAsync} executor bean name.
     *
     * @return a future holding the completion message
     * @throws Exception if the simulated work is interrupted
     */
    @Async("TwoAsync")
    public Future<String> doTaskTwo() throws Exception {
        return simulate("开始做任务二", "完成任务二,耗时:", "任务二完成");
    }

    /**
     * Task three, with no executor qualifier.
     *
     * @return a future holding the completion message
     * @throws Exception if the simulated work is interrupted
     */
    @Async
    public Future<String> doTaskThree() throws Exception {
        return simulate("开始做任务三", "完成任务三,耗时:", "任务三完成");
    }

    /** Prints the start banner, sleeps a random time, prints the elapsed time and wraps the result. */
    private static Future<String> simulate(String startBanner, String doneBanner, String outcome)
            throws InterruptedException {
        System.out.println(startBanner);
        final long startedAt = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        final long finishedAt = System.currentTimeMillis();
        System.out.println(doneBanner + (finishedAt - startedAt) + "毫秒");
        return new AsyncResult<>(outcome);
    }
}
这种方式可以直接使用 @Async
package cn.netkiller.config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class ThreadPoolTaskExecutorConfiguration implements AsyncConfigurer { @Value("${spring.task.execution.pool.core-size}") private int corePoolSize; @Value("${spring.task.execution.pool.max-size}") private int maxPoolSize; @Value("${spring.task.execution.pool.queue-capacity}") private int queueCapacity; // @Value("${spring.task.execution.thread-name-prefix}") private final String threadNamePrefix = "async-"; @Value("${spring.task.execution.pool.keep-alive}") private int keepAliveSeconds; @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //最大线程数 executor.setMaxPoolSize(maxPoolSize); //核心线程数 executor.setCorePoolSize(corePoolSize); //任务队列的大小 executor.setQueueCapacity(queueCapacity); //线程前缀名 executor.setThreadNamePrefix(threadNamePrefix); //线程存活时间 executor.setKeepAliveSeconds(keepAliveSeconds); /** * 拒绝处理策略 * CallerRunsPolicy():交由调用方线程运行,比如 main 线程。 * AbortPolicy():直接抛出异常。 * DiscardPolicy():直接丢弃。 * DiscardOldestPolicy():丢弃队列中最老的任务。 */ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //线程初始化 executor.initialize(); return executor; } }
package cn.netkiller.config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurerSupport; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class ThreadPoolTaskExecutorConfiguration extends AsyncConfigurerSupport { @Value("${spring.task.execution.pool.core-size}") private int corePoolSize; @Value("${spring.task.execution.pool.max-size}") private int maxPoolSize; @Value("${spring.task.execution.pool.queue-capacity}") private int queueCapacity; // @Value("${spring.task.execution.thread-name-prefix}") private final String threadNamePrefix = "async-"; @Value("${spring.task.execution.pool.keep-alive}") private int keepAliveSeconds; @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //最大线程数 executor.setMaxPoolSize(maxPoolSize); //核心线程数 executor.setCorePoolSize(corePoolSize); //任务队列的大小 executor.setQueueCapacity(queueCapacity); //线程前缀名 executor.setThreadNamePrefix(threadNamePrefix); //线程存活时间 executor.setKeepAliveSeconds(keepAliveSeconds); /** * 拒绝处理策略 * CallerRunsPolicy():交由调用方线程运行,比如 main 线程。 * AbortPolicy():直接抛出异常。 * DiscardPolicy():直接丢弃。 * DiscardOldestPolicy():丢弃队列中最老的任务。 */ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //线程初始化 executor.initialize(); return executor; } }
这种方式多用在 Spring Boot 2.x 中(AsyncConfigurerSupport 自 Spring Framework 6.0 起已标记为废弃,推荐直接实现 AsyncConfigurer 接口)
package cn.netkiller.config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class ThreadPoolTaskExecutorConfiguration { @Value("${spring.task.execution.pool.core-size}") private int corePoolSize; @Value("${spring.task.execution.pool.max-size}") private int maxPoolSize; @Value("${spring.task.execution.pool.queue-capacity}") private int queueCapacity; // @Value("${spring.task.execution.thread-name-prefix}") private final String threadNamePrefix = ""; @Value("${spring.task.execution.pool.keep-alive}") private int keepAliveSeconds; @Bean public Executor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //最大线程数 executor.setMaxPoolSize(maxPoolSize); //核心线程数 executor.setCorePoolSize(corePoolSize); //任务队列的大小 executor.setQueueCapacity(queueCapacity); //线程前缀名 executor.setThreadNamePrefix(threadNamePrefix); //线程存活时间 executor.setKeepAliveSeconds(keepAliveSeconds); /** * 拒绝处理策略 * CallerRunsPolicy():交由调用方线程运行,比如 main 线程。 * AbortPolicy():直接抛出异常。 * DiscardPolicy():直接丢弃。 * DiscardOldestPolicy():丢弃队列中最老的任务。 */ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //线程初始化 executor.initialize(); return executor; } }
注意:使用@Bean方式必须配合 @Async("threadPoolTaskExecutor")
自定义线程池之后,系统内会存在两个线程池:SimpleAsyncTaskExecutor 和 ThreadPoolTaskExecutor
/**
 * Registers an executor bean named {@code applicationTaskExecutor}
 * (the bean name is taken from the method name).
 *
 * @return an initialized pool with core size 2, max size 2, a queue capacity
 *         of 500 and threads prefixed {@code Netkiller-}
 */
@Bean
public Executor applicationTaskExecutor() {
    final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setThreadNamePrefix("Netkiller-");
    // Core and max are equal, so the pool size is fixed at two threads.
    pool.setCorePoolSize(2);
    pool.setMaxPoolSize(2);
    pool.setQueueCapacity(500);
    pool.initialize();
    return pool;
}