Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

10.10. java 线程池

10.10.1. newCachedThreadPool

newCachedThreadPool 线程池尺寸没有固定上线

			
	private void startTask(List<String> usersList){
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.submit(()->{
			//do someting
        });
    }
			
			
			
			
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                    System.out.println(Thread.currentThread().getName());

                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        executor.shutdown();			
			
			

10.10.2. 固定线程池(newFixedThreadPool)

			
package cn.netkiller.test.grey;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GreyTest {

	public GreyTest() {
		// TODO Auto-generated constructor stub
	}

	static class MyThread implements Runnable {
		public void run() {
			System.out.println("Thread Name:" + Thread.currentThread().getName());
		}
	}

	public static void main(String[] args) {
		// 创建五个线程池
		int nThreads = 5;
		ExecutorService pool = Executors.newFixedThreadPool(nThreads);
		// 创建实现了Runnable接口对象
		MyThread t1 = new MyThread();
		MyThread t2 = new MyThread();
		MyThread t3 = new MyThread();
		MyThread t4 = new MyThread();
		MyThread t5 = new MyThread();
		// 将线程放入池中进行执行
		pool.execute(t1);
		pool.execute(t2);
		pool.execute(t3);
		pool.execute(t4);
		pool.execute(t5);
		// 关闭线程池
		pool.shutdown();
	}

}
			
			

提交线程数大于线程池尺寸时会同步等待,然后复用已经处理完的空间线程。

			
        System.out.println(Thread.currentThread());
        AtomicInteger count = new AtomicInteger(1);
        int nThread = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = Executors.newFixedThreadPool(nThread);
        for (int i = 0; i < 100; i++) {
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(count.getAndIncrement() + " [" + Thread.currentThread().getName() + "] " + System.currentTimeMillis());

                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        executor.shutdown();			
			
			

CompletableFuture 线程池

			
package cn.netkiller.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread());
        ExecutorService executor = Executors.newFixedThreadPool(10);


        Parallel parallel = new Parallel();

        parallel.addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    return "task1";
                }, executor)
                .addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    return "task2";
                }, executor)
                .addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    return "task3";
                }, executor)
                .addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "finally";
                }, executor);
        parallel.join();
        List<CompletableFuture<String>> futures = parallel.get();

        futures.stream().forEach(item -> {
            System.out.println(item.getNow("no result"));
        });

        executor.shutdown();
    }

    public static class Parallel<T> {

        private final List<CompletableFuture<T>> futures;

        Parallel() {
            this(10);
        }

        Parallel(int size) {
            futures = new ArrayList<>(size);
        }

        public Parallel addAsyncTask(Supplier<T> supplier) {
            futures.add(CompletableFuture.supplyAsync(supplier));
            return this;
        }

        public Parallel addAsyncTask(Supplier<T> supplier, Executor executor) {
            futures.add(CompletableFuture.supplyAsync(supplier, executor));
            return this;
        }

        public List<CompletableFuture<T>> get() {
            return futures;
        }

        public void join() {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
        }

        public void clear() {
            futures.clear();
        }
    }
}


			
			

10.10.3. Executors.newScheduledThreadPool

			

@Configuration
public class ScheduleConfig implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        //当然了,这里设置的线程池是corePoolSize也是很关键了,自己根据业务需求设定
        taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
    }

}			
			
			
			
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);

service.schedule(new Task(), 10, TimeUnit.SECONDS);

service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);

service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);
			
			
			

10.10.4. SingleThreadExecutor

可以理解为 SingleThreadExecutor = Executors.newFixedThreadPool(1);

			
ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newSingleThreadScheduledExecutor();			
			
			
			
        System.out.println(Thread.currentThread());
        AtomicInteger count = new AtomicInteger(1);
        int nThread = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100; i++) {
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(count.getAndIncrement() + " [" + Thread.currentThread().getName() + "] " + System.currentTimeMillis());

                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        executor.shutdown();			
			
			

10.10.5. ExecutorService 正确关闭方法

ExecutorService 功能包括,提交任务、执行任务、关闭线程池。

首先通过 executor.shutdown(); 发送关闭信号,然后再通过 executor.awaitTermination(10, TimeUnit.SECONDS) 设置超时时间,超时抛出异常,最后通过 executor.shutdownNow(); 强制关闭。

			
package cn.netkiller.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread());
        ExecutorService executor = Executors.newFixedThreadPool(10);


        Parallel parallel = new Parallel();

        parallel.addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    return "task1";
                }, executor)
                .addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    return "task2";
                }, executor)
                .addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    return "task3";
                }, executor)
                .addAsyncTask(() -> {
                    System.out.println(Thread.currentThread().getName());
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "finally";
                }, executor);
        parallel.join();
        List<CompletableFuture<String>> futures = parallel.get();

        futures.stream().forEach(item -> {
            System.out.println(item.getNow("no result"));
        });

        executor.shutdown();

        try {
            executor.shutdown();
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                // 超时的时候向线程池中所有的线程发出中断(interrupted)。
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // awaitTermination方法被中断的时候也中止线程池中全部的线程的执行。
            System.out.println("awaitTermination interrupted: " + e);
            executor.shutdownNow();
        }
    }

    public static class Parallel<T> {

        private final List<CompletableFuture<T>> futures;

        Parallel() {
            this(10);
        }

        Parallel(int size) {
            futures = new ArrayList<>(size);
        }

        public Parallel addAsyncTask(Supplier<T> supplier) {
            futures.add(CompletableFuture.supplyAsync(supplier));
            return this;
        }

        public Parallel addAsyncTask(Supplier<T> supplier, Executor executor) {
            futures.add(CompletableFuture.supplyAsync(supplier, executor));
            return this;
        }

        public List<CompletableFuture<T>> get() {
            return futures;
        }

        public void join() {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
        }

        public void clear() {
            futures.clear();
        }
    }
}

			
			
			

10.10.6. ForkJoinPool / ForkJoinTask