Halo
发布于 2022-05-13 / 117 阅读 / 0 评论 / 0 点赞

java线程池

构建

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime, 
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;

keepAliveTime

线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用;

unit

keepAliveTime的单位;

workQueue

用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口

在JDK中提供了如下阻塞队列:

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
  • LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
  • SynchronousQuene:没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素,吞吐量通常要高于LinkedBlockingQuene;
  • priorityBlockingQuene:具有优先级的无界阻塞队列;

threadFactory

指定创建线程的工厂, 可以不指定

handler

表示当 workQueue 已满,且池中的线程数达到 maximumPoolSize 时,线程池拒绝添加新任务时采取的策略。可以不指定.

ThreadPoolExecutor 状态

  • RUNNING 运行态,可处理新任务并执行队列中的任务
  • SHUTDOW 关闭态,不接受新任务,但处理队列中的任务
  • STOP 停止态,不接受新任务,不处理队列中任务,且打断运行中任务
  • TIDYING 整理态,所有任务已经结束,workerCount = 0 ,将执行terminated()方法
  • TERMINATED 结束态,terminated() 方法已完成

预定义线程池

FixedThreadPool

It is the best fit for most off the real-life use-cases.
可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。

使用

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);

底层

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

CachedThreadPool

DO NOT use this thread pool if tasks are long-running
快速处理大量耗时较短的任务,如Netty的NIO接受请求时,可使用CachedThreadPool

使用

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();

底层

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                              60L, TimeUnit.SECONDS,
                              new SynchronousQueue<Runnable>());
}

ScheduledThreadPool

run after a given delay, or to execute periodically
使用

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newScheduledThreadPool(10);

底层

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

SingleThreadExecutor

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor();

newWorkStealingPool

Creates a thread pool that maintains enough threads to support the given parallelism level. Here parallelism level means the maximum number of threads which will be used to execute a given task, at a single point of time, in multi-processor machines.

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newWorkStealingPool(4);

demo

  1. Task class
 
import java.util.concurrent.TimeUnit;
 
public class Task implements Runnable {
    private String name;
 
    public Task(String name) {
        this.name = name;
    }
 
    public String getName() {
        return name;
    }
 
    public void run() {
        try {
            Long duration = (long) (Math.random() * 10);
            System.out.println("Executing : " + name);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. run task in threadpool
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
 
public class ThreadPoolExample 
{
    public static void main(String[] args) 
    {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
         
        for (int i = 1; i <= 5; i++) 
        {
            Task task = new Task("Task " + i);
            System.out.println("Created : " + task.getName());
 
            executor.execute(task);
        }
        executor.shutdown();
    }
}

more

https://howtodoinjava.com/java/multi-threading/java-thread-pool-executor-example/


评论