Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

37.6. ThreadPoolTaskExecutor 自定义线程池

Spring Boot 官方不建议在生产环境使用 SimpleAsyncTaskExecutor,建议使用自定义线程池。

		
2024-02-02T16:24:38.585+08:00  WARN 86223 --- [watch-development] [nio-8080-exec-2] s.w.s.m.m.a.RequestMappingHandlerAdapter : !!!
Performing asynchronous handling through the default Spring MVC SimpleAsyncTaskExecutor.
This executor is not suitable for production use under load.
Please, configure an AsyncTaskExecutor through the WebMvc config.
-------------------------------
!!!		
		
		

Bean 注入配置线程池

        
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
 * Application entry point that switches on {@code @Async} processing and registers two
 * thread pools as beans: {@code asyncExecutor} and {@code thread}.
 */
@SpringBootApplication
@EnableAsync
public class Application {

    /**
     * Starts the application and closes the context as soon as {@code run} returns.
     *
     * @param args command-line arguments passed on to Spring Boot
     */
    public static void main(String[] args) {
        // close the application context to shut down the custom ExecutorService
        SpringApplication.run(Application.class, args).close();
    }

    /**
     * Pool with core size 2, max size 2 and a queue capacity of 500.
     * Threads are named with the prefix {@code "Netkiller -"}.
     *
     * @return the initialized executor
     */
    @Bean
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("Netkiller -");
        executor.initialize();
        return executor;
    }

    /**
     * Pool registered under the bean name {@code thread}: core size 5, max size 10,
     * queue capacity 20, keep-alive 60 seconds, thread-name prefix {@code "hello-"}.
     *
     * <p>{@code initialize()} is not called here; NOTE(review): this relies on the
     * container initializing the bean - confirm against the Spring documentation.
     *
     * @return the configured executor
     */
    @Bean("thread")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Core pool size
        executor.setCorePoolSize(5);
        // Maximum pool size
        executor.setMaxPoolSize(10);
        // Queue capacity
        executor.setQueueCapacity(20);
        // Thread keep-alive time (seconds)
        executor.setKeepAliveSeconds(60);
        // Thread name prefix
        executor.setThreadNamePrefix("hello-");
        // Rejection policy. Fully qualified because this snippet imports only
        // java.util.concurrent.Executor, so the simple name would not compile.
        executor.setRejectedExecutionHandler(
                new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
        // Wait for all tasks to finish before shutting the pool down
        executor.setWaitForTasksToCompleteOnShutdown(true);

        return executor;
    }

}
        
		

设置线程池参数

37.6.1. 最简单的配置

        
/**
 * Minimal bootstrap class: starts the Spring application with {@code @Async} support enabled.
 */
@EnableAsync
@SpringBootApplication
public class Application {

    /**
     * Launches the application.
     *
     * @param args command-line arguments handed to {@code SpringApplication.run}
     */
    public static void main(final String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
        
			

        
/**
 * Component with three {@code @Async} methods that differ only in the executor
 * qualifier given to the annotation.
 */
@Component
public class Task {

    /** Asynchronous task declared with a bare {@code @Async} (no executor name). */
    @Async
    public void doTaskOne() throws Exception {
        // business logic goes here
    }

    /** Asynchronous task bound to the executor named {@code asyncExecutor}. */
    @Async("asyncExecutor")
    public void doTaskTwo() throws Exception {
        // business logic goes here
    }

    /** Asynchronous task bound to the executor named {@code thread}. */
    @Async("thread")
    public void doTaskThree() throws Exception {
        // business logic goes here
    }

}
        
			

注意:@Async 不会用到刚刚定义的线程池,@Async("asyncExecutor"),@Async("thread") 会正确调用

37.6.2. 队列

线程池最多能接受多少个任务?

下面的配置是 executor.setQueueCapacity(10);,也就是队列容量为 10 个,但实测结果可能与你的预期不同。

			
package cn.netkiller.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Enables {@code @Async} support and registers the thread pool bean named
 * {@code asyncExecutor}.
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskExecutorConfiguration {

    /**
     * Builds the pool: core size 5, max size 10, queue capacity 10, keep-alive 60 seconds.
     * Rejected tasks are handled by {@link ThreadPoolExecutor.AbortPolicy}.
     *
     * @return the initialized executor
     */
    @Bean("asyncExecutor")
    public ThreadPoolTaskExecutor executor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Sizing
        pool.setCorePoolSize(5);
        pool.setMaxPoolSize(10);
        pool.setQueueCapacity(10);
        pool.setKeepAliveSeconds(60);

        // Thread identity
        pool.setThreadGroupName("job");
        pool.setThreadNamePrefix("async-job-");

        // Saturation and shutdown handling
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        pool.setWaitForTasksToCompleteOnShutdown(true);
        pool.setAwaitTerminationSeconds(60);

        pool.initialize();
        return pool;
    }
}
			
			

实测结果是,首次执行可以容纳 20 个任务,20 个任务执行完毕之后,再添加任务,就只接受 10 个,超过的部分会抛出异常

			
 Executor [java.util.concurrent.ThreadPoolExecutor@7e729046[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$1775/0x0000000801b6afb0@20eaccc2			
			
			

这是因为线程池最多有 10 个线程(maxPoolSize)同时执行 10 个任务,队列(queueCapacity)还能再排队 10 个任务,合计 20 个。

			
			
			
			

37.6.3. 定义多个线程池

        
package cn.netkiller.wallet.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Enables {@code @Async} support and declares two thread pools, exposed through the
 * bean methods {@code OneAsync} and {@code TwoAsync}. Both pools share the same sizing
 * and differ in thread-name prefix and rejection handling.
 */
@Configuration
@EnableAsync
public class ExecutorConfiguration {
    /** Set the ThreadPoolExecutor's core pool size. */
    private final int corePoolSize = 10;
    /** Set the ThreadPoolExecutor's maximum pool size. */
    private final int maxPoolSize = 200;
    /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
    private final int queueCapacity = 10;

    /**
     * Pool whose threads are prefixed {@code "MySimpleExecutor-"}; no rejection handler
     * is set explicitly.
     *
     * @return the initialized executor
     */
    @Bean
    public Executor OneAsync() {
        ThreadPoolTaskExecutor executor = newExecutor("MySimpleExecutor-");
        executor.initialize();
        return executor;
    }

    /**
     * Pool whose threads are prefixed {@code "MyExecutor-"} and which uses
     * {@link ThreadPoolExecutor.CallerRunsPolicy} for rejected tasks.
     *
     * @return the initialized executor
     */
    @Bean
    public Executor TwoAsync() {
        ThreadPoolTaskExecutor executor = newExecutor("MyExecutor-");

        // rejection-policy: how a new task is handled once the pool has reached max size
        // CALLER_RUNS: the task is not run on a new thread but on the caller's thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    /**
     * Creates an uninitialized executor carrying the shared sizing of this configuration.
     *
     * @param threadNamePrefix prefix applied to the names of the pool's threads
     * @return a configured executor on which {@code initialize()} has not been called yet
     */
    private ThreadPoolTaskExecutor newExecutor(String threadNamePrefix) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix(threadNamePrefix);
        return executor;
    }

}
        
        
			
        
/**
 * Demo service with three asynchronous tasks. Each task prints a start message, sleeps
 * for a random time below 10000 ms, prints the elapsed time and returns a result.
 * Tasks one and two name their executor; task three uses a bare {@code @Async}.
 */
@Service
public class DemoAsyncServiceImpl implements DemoAsyncService {

    public static Random random = new Random();

    /** Task one, run on the executor named {@code OneAsync}. */
    @Async("OneAsync")
    public Future<String> doTaskOne() throws Exception {
        System.out.println("开始做任务一");
        final long elapsedMillis = sleepRandomly();
        System.out.println("完成任务一,耗时:" + elapsedMillis + "毫秒");
        return new AsyncResult<>("任务一完成");
    }

    /** Task two, run on the executor named {@code TwoAsync}. */
    @Async("TwoAsync")
    public Future<String> doTaskTwo() throws Exception {
        System.out.println("开始做任务二");
        final long elapsedMillis = sleepRandomly();
        System.out.println("完成任务二,耗时:" + elapsedMillis + "毫秒");
        return new AsyncResult<>("任务二完成");
    }

    /** Task three, declared with {@code @Async} and no executor name. */
    @Async
    public Future<String> doTaskThree() throws Exception {
        System.out.println("开始做任务三");
        final long elapsedMillis = sleepRandomly();
        System.out.println("完成任务三,耗时:" + elapsedMillis + "毫秒");
        return new AsyncResult<>("任务三完成");
    }

    /**
     * Sleeps for {@code random.nextInt(10000)} milliseconds.
     *
     * @return the wall-clock time spent, in milliseconds
     * @throws InterruptedException if the sleeping thread is interrupted
     */
    private static long sleepRandomly() throws InterruptedException {
        final long startedAt = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        return System.currentTimeMillis() - startedAt;
    }

}
        
			

37.6.4. 实现 AsyncConfigurer 接口方式创建自定义线程池

这种方式可以直接使用 @Async

			
package cn.netkiller.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Supplies the executor for {@code @Async} methods by implementing {@link AsyncConfigurer}.
 * Pool sizing and keep-alive are injected from the {@code spring.task.execution.pool.*}
 * properties; the thread-name prefix is hard-coded.
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskExecutorConfiguration implements AsyncConfigurer {

    // The property-driven prefix is left disabled in favor of a constant:
    // @Value("${spring.task.execution.thread-name-prefix}")
    private final String threadNamePrefix = "async-";

    @Value("${spring.task.execution.pool.core-size}")
    private int corePoolSize;

    @Value("${spring.task.execution.pool.max-size}")
    private int maxPoolSize;

    @Value("${spring.task.execution.pool.queue-capacity}")
    private int queueCapacity;

    @Value("${spring.task.execution.pool.keep-alive}")
    private int keepAliveSeconds;

    /**
     * Creates, configures and initializes the thread pool.
     *
     * @return the initialized executor
     */
    @Override
    public Executor getAsyncExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Core thread count, maximum thread count and task-queue size
        pool.setCorePoolSize(corePoolSize);
        pool.setMaxPoolSize(maxPoolSize);
        pool.setQueueCapacity(queueCapacity);
        // Thread keep-alive time
        pool.setKeepAliveSeconds(keepAliveSeconds);
        // Prefix for thread names
        pool.setThreadNamePrefix(threadNamePrefix);

        /*
         * Rejection policies:
         *   CallerRunsPolicy()    - hand the task to the calling thread, e.g. the main thread
         *   AbortPolicy()         - throw an exception
         *   DiscardPolicy()       - drop the task
         *   DiscardOldestPolicy() - drop the oldest task in the queue
         */
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Initialize the pool before handing it out
        pool.initialize();
        return pool;
    }
}
    
			
			

37.6.5. 继承 AsyncConfigurerSupport 创建自定义线程池

			
package cn.netkiller.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Supplies the executor for {@code @Async} methods by extending {@link AsyncConfigurerSupport}
 * and overriding {@code getAsyncExecutor()}. Pool sizing and keep-alive are injected from the
 * {@code spring.task.execution.pool.*} properties; the thread-name prefix is hard-coded.
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskExecutorConfiguration extends AsyncConfigurerSupport {

    // The property-driven prefix is left disabled in favor of a constant:
    // @Value("${spring.task.execution.thread-name-prefix}")
    private final String threadNamePrefix = "async-";

    @Value("${spring.task.execution.pool.core-size}")
    private int corePoolSize;

    @Value("${spring.task.execution.pool.max-size}")
    private int maxPoolSize;

    @Value("${spring.task.execution.pool.queue-capacity}")
    private int queueCapacity;

    @Value("${spring.task.execution.pool.keep-alive}")
    private int keepAliveSeconds;

    /**
     * Creates, configures and initializes the thread pool.
     *
     * @return the initialized executor
     */
    @Override
    public Executor getAsyncExecutor() {
        final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        // Thread counts (core, then maximum)
        taskExecutor.setCorePoolSize(corePoolSize);
        taskExecutor.setMaxPoolSize(maxPoolSize);
        // Size of the task queue
        taskExecutor.setQueueCapacity(queueCapacity);
        // Thread keep-alive time
        taskExecutor.setKeepAliveSeconds(keepAliveSeconds);
        // Prefix for thread names
        taskExecutor.setThreadNamePrefix(threadNamePrefix);

        /*
         * Rejection policies:
         *   CallerRunsPolicy()    - hand the task to the calling thread, e.g. the main thread
         *   AbortPolicy()         - throw an exception
         *   DiscardPolicy()       - drop the task
         *   DiscardOldestPolicy() - drop the oldest task in the queue
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Initialize the pool before handing it out
        taskExecutor.initialize();
        return taskExecutor;
    }
}
			
			

37.6.6. 生产环境完整代码 @Bean 注入方式

这种方式多用在 Spring 2.x 中

			
package cn.netkiller.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Enables {@code @Async} support and registers a thread pool through the {@code @Bean}
 * method {@code threadPoolTaskExecutor}. Pool sizing and keep-alive are injected from the
 * {@code spring.task.execution.pool.*} properties; the thread-name prefix is an empty string.
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskExecutorConfiguration {

    // The property-driven prefix is left disabled in favor of a constant:
    // @Value("${spring.task.execution.thread-name-prefix}")
    private final String threadNamePrefix = "";

    @Value("${spring.task.execution.pool.core-size}")
    private int corePoolSize;

    @Value("${spring.task.execution.pool.max-size}")
    private int maxPoolSize;

    @Value("${spring.task.execution.pool.queue-capacity}")
    private int queueCapacity;

    @Value("${spring.task.execution.pool.keep-alive}")
    private int keepAliveSeconds;

    /**
     * Creates, configures and initializes the thread pool bean.
     *
     * @return the initialized executor
     */
    @Bean
    public Executor threadPoolTaskExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Core thread count, maximum thread count and task-queue size
        pool.setCorePoolSize(corePoolSize);
        pool.setMaxPoolSize(maxPoolSize);
        pool.setQueueCapacity(queueCapacity);
        // Thread keep-alive time
        pool.setKeepAliveSeconds(keepAliveSeconds);
        // Prefix for thread names
        pool.setThreadNamePrefix(threadNamePrefix);

        /*
         * Rejection policies:
         *   CallerRunsPolicy()    - hand the task to the calling thread, e.g. the main thread
         *   AbortPolicy()         - throw an exception
         *   DiscardPolicy()       - drop the task
         *   DiscardOldestPolicy() - drop the oldest task in the queue
         */
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Initialize the pool before returning it
        pool.initialize();
        return pool;
    }
}
			
			

注意:使用@Bean方式必须配合 @Async("threadPoolTaskExecutor")

37.6.7. 通过 @Bean 覆盖掉 SimpleAsyncTaskExecutor

自定义线程池之后,系统内会存在两个线程池:SimpleAsyncTaskExecutor 和 ThreadPoolTaskExecutor。

			
   @Bean
    public Executor applicationTaskExecutor() {
        // Pool with core size 2, max size 2 and a queue capacity of 500; threads are
        // named with the prefix "Netkiller-".
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("Netkiller-");
        pool.setQueueCapacity(500);
        pool.setMaxPoolSize(2);
        pool.setCorePoolSize(2);
        pool.initialize();
        return pool;
    }