Java thread pools: a detailed explanation with example code

Technical background of thread pools

In object-oriented programming, creating and destroying objects can be time-consuming, because creating an object requires acquiring memory or other resources. This is even more true in Java, where the virtual machine tracks every object so that it can perform garbage collection once the object is no longer in use.

Therefore, one way to improve the efficiency of a service program is to minimize the number of times objects are created and destroyed, especially objects whose creation and destruction are resource-intensive. How to serve requests with existing objects is the key problem to solve, and it is the reason 'resource pooling' techniques emerged.

For example, many common Android components rely on the concept of a 'pool', such as the various image loading libraries and network request libraries. Even in Android's message passing mechanism, Message.obtain() takes an object from the Message pool, so this concept is very important. The thread pool technology introduced in this article follows the same idea.

The advantages of a thread pool:

1. It reuses the threads in the pool, reducing the performance overhead caused by thread creation and destruction;

2. It effectively controls the maximum number of concurrent threads, which improves the utilization of system resources and avoids the blocking caused by excessive resource competition;

3. It manages threads centrally, making multi-threading simple and efficient to use.
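
The first advantage can be observed directly. The following is a minimal sketch, not code from the original article: the class name ReuseDemo and the method distinctThreadsUsed are hypothetical names chosen for illustration. It runs many small tasks on a small fixed pool and counts how many distinct threads actually executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ReuseDemo {
    // Runs taskCount small tasks on a pool of poolSize threads and returns
    // the number of distinct threads that executed at least one task.
    static int distinctThreadsUsed(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        // 20 tasks, but never more than 2 threads are created to run them.
        System.out.println(distinctThreadsUsed(2, 20));
    }
}
```

However many tasks are submitted, the count never exceeds the pool size, which is the reuse the list above describes.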

Thread pool framework Executor

Java's thread pool is implemented through the Executor framework, which includes the classes and interfaces Executor, Executors, ExecutorService, ThreadPoolExecutor, Callable, Future, and FutureTask.

Executor: The root interface of all thread pools, with only one method.

public interface Executor {  
 void execute(Runnable command);  
}
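
Because the interface has a single method, even a one-line lambda is a valid Executor. The sketch below is only an illustration (DirectExecutorDemo and its members are hypothetical names): it runs each task synchronously in the calling thread, which is the opposite of what a thread pool does but shows how little the interface itself promises.

```java
import java.util.concurrent.Executor;

class DirectExecutorDemo {
    // Simplest possible Executor: runs the task in the caller's thread.
    static final Executor DIRECT = command -> command.run();

    // Executes one task and reports which thread it ran on.
    static String runOnce() {
        StringBuilder sb = new StringBuilder();
        DIRECT.execute(() -> sb.append("ran on ").append(Thread.currentThread().getName()));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```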

ExecutorService: Extends Executor with additional behavior; it is the interface that Executor implementation classes implement most directly.

Executors: Provides a series of factory methods for creating thread pools, and the returned thread pools all implement the ExecutorService interface.

ThreadPoolExecutor: The concrete implementation class of the thread pool. In general, the various thread pools in use are all built on this class. Its constructor is as follows:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

corePoolSize: The number of core threads in the thread pool. By default, core threads stay alive indefinitely, even when idle, and the pool grows beyond corePoolSize only when the work queue is full. Calling allowCoreThreadTimeOut(true) lets idle core threads time out as well, so the pool can shrink to 0 threads; in that case keepAliveTime controls the timeout for all threads.

maximumPoolSize: The maximum number of threads that the thread pool is allowed to have;

keepAliveTime: The time an idle thread waits for work before it is terminated; by default it applies only to threads beyond corePoolSize;

unit: An enumeration representing the unit of keepAliveTime;

workQueue: Represents the BlockingQueue<Runnable> queue for storing tasks.
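
As a sketch of how these parameters fit together, the following constructs a pool directly. The specific values (2 core threads, 4 maximum, 30 seconds, a queue of 10) and the class name PoolConfigDemo are arbitrary choices for illustration, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolConfigDemo {
    static ThreadPoolExecutor newPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                              // corePoolSize
                4,                              // maximumPoolSize
                30L, TimeUnit.SECONDS,          // keepAliveTime and its unit
                new ArrayBlockingQueue<>(10));  // bounded workQueue
        // Optional: let idle core threads time out too (needs keepAliveTime > 0).
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

A bounded queue is used here so that maximumPoolSize has an effect; with an unbounded queue the pool would never grow past corePoolSize.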

BlockingQueue: The blocking queue (BlockingQueue) is a major tool in java.util.concurrent for thread synchronization control. If the BlockingQueue is empty, a thread that tries to take an element from it is blocked and waits until something is added to the queue, at which point it is woken up. Similarly, if the BlockingQueue is full, a thread that tries to add an element is blocked and waits until there is space in the queue, at which point it continues. Blocking queues are commonly used in producer-consumer scenarios, where producers are threads that add elements to the queue and consumers are threads that take elements from it. The blocking queue is the container in which producers store elements, and consumers only take elements from that container. Concrete implementation classes include LinkedBlockingQueue, ArrayBlockingQueue, and others. Internally they are generally implemented with Lock and Condition (see the study and use of explicit locks (Lock) and Condition).
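
The producer-consumer pattern described above can be sketched as follows. This is a minimal illustration with hypothetical names (ProducerConsumerDemo, transfer); the queue capacity of 2 is chosen only so that the producer is forced to block from time to time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {
    // Moves the numbers 0..count-1 from a producer thread to a consumer thread.
    static List<Integer> transfer(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i);                 // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take());   // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5));
    }
}
```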

The working process of the thread pool is as follows:

When the thread pool has just been created, it contains no threads. The task queue is passed in as a parameter, but even if the queue already contains tasks, the thread pool does not execute them immediately.

When the execute() method is called to add a task, the thread pool will make the following judgment:

If the number of running threads is less than corePoolSize, create a thread immediately to run this task;

If the number of running threads is greater than or equal to corePoolSize, put this task into the queue;

If the queue is full at this time and the number of running threads is less than maximumPoolSize, it is still necessary to create a non-core thread to run this task immediately;

If the queue is full and the number of running threads is greater than or equal to maximumPoolSize, the thread pool rejects the task; with the default handler it throws a RejectedExecutionException.

When a thread completes a task, it will take the next task from the queue to execute.

When a thread has had nothing to do for a certain period of time (keepAliveTime), the thread pool checks whether the number of currently running threads is greater than corePoolSize; if so, this thread is stopped. Therefore, after all tasks in the thread pool are completed, it eventually shrinks to corePoolSize threads.
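
The four admission cases can be reproduced with a deliberately tiny pool. The sketch below is an illustration under assumed values (1 core thread, 2 maximum threads, a queue of capacity 1); AdmissionDemo and observe are hypothetical names. Every task blocks on a latch so that none of them finishes while the pool is being inspected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AdmissionDemo {
    // Returns {pool size after 3 tasks, queue size after 3 tasks,
    //          1 if the 4th task was rejected, otherwise 0}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();   // keep the thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        pool.execute(blocker);     // 1: below corePoolSize, new core thread
        pool.execute(blocker);     // 2: core thread busy, task is queued
        pool.execute(blocker);     // 3: queue full, new non-core thread
        int poolSize = pool.getPoolSize();
        int queued = pool.getQueue().size();

        int rejected = 0;
        try {
            pool.execute(blocker); // 4: queue full and pool at maximum
        } catch (RejectedExecutionException e) {
            rejected = 1;
        }

        release.countDown();       // let the blocked tasks finish
        pool.shutdown();
        return new int[] {poolSize, queued, rejected};
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println(r[0] + " threads, " + r[1] + " queued, rejected=" + r[2]);
    }
}
```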

Thread pool creation and usage

The thread pool is created using the static method of the utility class Executors, and the following are several common thread pool types.

SingleThreadExecutor: A single background thread (whose buffer queue is unbounded)

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(1, 1,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>()));
}

Create a single-threaded thread pool. This thread pool has only one core thread working, which is equivalent to serially executing all tasks in a single thread. If this unique thread ends due to an exception, a new thread will replace it. This thread pool guarantees that all tasks are executed in the order of task submission.
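
The ordering guarantee can be checked with a short sketch (SerialOrderDemo and runInOrder are hypothetical names used only for this illustration). Each task records its own index, and the recorded order is compared with the submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SerialOrderDemo {
    // Submits tasks 0..count-1 and returns the order in which they ran.
    static List<Integer> runInOrder(int count) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (int i = 0; i < count; i++) {
            final int id = i;
            single.execute(() -> order.add(id));
        }
        single.shutdown();
        try {
            single.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));
    }
}
```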

FixedThreadPool: A thread pool with only core threads, fixed size (its buffer queue is unbounded).

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Create a thread pool with a fixed size. A new thread is created each time a task is submitted, until the thread pool reaches its maximum size. Once the thread pool reaches its maximum size, the size remains unchanged. If a thread ends because of an execution exception, the thread pool adds a new thread to replace it.

CachedThreadPool: An unbounded thread pool that can automatically recycle threads.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

If the size of the thread pool exceeds the number of threads required to process tasks, idle threads (those that have not executed a task for 60 seconds) are recycled. When the number of tasks increases, this thread pool adds new threads to handle them. The pool does not limit its own size; the size depends entirely on the maximum number of threads that the operating system (or JVM) can create. SynchronousQueue is a blocking queue with no internal capacity: each insert must wait for a matching take by another thread, so a submitted task is either handed directly to an idle thread or causes a new thread to be created.
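
The growth behavior can be sketched by building a pool with the same constructor arguments as the factory method shown above and keeping every task busy at once. CachedGrowthDemo and poolSizeUnderLoad are hypothetical names; the pool is constructed directly rather than through Executors so that getPoolSize() is available without a cast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedGrowthDemo {
    // Submits concurrentTasks blocking tasks and returns the resulting pool size.
    static int poolSizeUnderLoad(int concurrentTasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < concurrentTasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();   // stay busy so no thread is free to be reused
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize();
        release.countDown();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println(poolSizeUnderLoad(5));
    }
}
```

Since no thread is ever idle while the tasks are submitted, the hand-off to the SynchronousQueue fails each time and one thread is created per task.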

ScheduledThreadPool: A thread pool with a fixed number of core threads and an unbounded maximum number of threads. This thread pool supports delayed and periodic execution of tasks.

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

// Constructor of ScheduledThreadPoolExecutor, which extends ThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

Create a thread pool that executes tasks after a delay or periodically. Idle non-core threads are recycled after DEFAULT_KEEPALIVE_MILLIS.
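
A minimal sketch of periodic execution follows. The names ScheduleDemo and ranPeriodically, the 20 ms period, and the 5 second timeout are all assumptions made for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduleDemo {
    // Schedules a periodic task and waits until it has run the given number of times.
    static boolean ranPeriodically(int runs) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch done = new CountDownLatch(runs);
        // First run immediately, then every 20 ms.
        scheduler.scheduleAtFixedRate(done::countDown, 0, 20, TimeUnit.MILLISECONDS);
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();   // periodic tasks run until cancelled or shut down
        }
    }

    public static void main(String[] args) {
        System.out.println(ranPeriodically(3));
    }
}
```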

There are two commonly used methods for submitting tasks to a thread pool:

execute:

executorService.execute(Runnable runnable);

submit:

Future<?> task = executorService.submit(Runnable runnable);
Future<T> task = executorService.submit(Runnable runnable, T result);

Future<T> task = executorService.submit(Callable<T> callable);

The implementation of submit(Callable callable) is similar to that of submit(Runnable runnable).

public <T> Future<T> submit(Callable<T> task) {
 if (task == null) throw new NullPointerException();
 FutureTask<T> ftask = newTaskFor(task);
 execute(ftask);
 return ftask;
}

It can be seen that submit starts a task that has a return result. It returns a Future (in this implementation, a FutureTask object), so the result can be obtained through the get() method. submit ultimately calls execute(Runnable runnable) as well; it just wraps the Callable or Runnable in a FutureTask, and because FutureTask is a Runnable, it can be passed to execute. For how Callable and Runnable objects are wrapped into a FutureTask, see the usage of Callable, Future, and FutureTask.
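
From the caller's side, the difference between the two submission styles looks roughly like this. SubmitDemo and its methods are hypothetical names for the sketch; checked exceptions from get() are wrapped so that the helpers stay easy to call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    // submit(Callable): the task computes the result.
    static int sumViaCallable(int a, int b) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> job = () -> a + b;
            Future<Integer> future = pool.submit(job);
            return future.get();               // blocks until the result is ready
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // submit(Runnable, result): the given value is returned once the task finishes.
    static String runnableWithResult() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> { }, "done");
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumViaCallable(2, 3) + " " + runnableWithResult());
    }
}
```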

The principle of thread pool implementation

If only the use of thread pools were discussed, this blog would not have much value; at most it would be an exercise in getting familiar with the Executor-related APIs. The thread pool implementation does not use the synchronized keyword; instead it uses volatile, Lock, synchronous (blocking) queues, the Atomic classes, FutureTask, and so on, because these perform better. Working through the source code is a good way to learn its approach to concurrency control.

As mentioned at the beginning, the advantages of thread pools can be summarized into the following three points:

Thread reuse

Control maximum concurrency

Manage threads

1. Thread reuse process

To understand the principle of thread reuse, it is first necessary to understand the thread lifecycle.

During its lifecycle, a thread goes through the following five states: New, Runnable, Running, Blocked, and Dead. (This is the conceptual model; the java.lang.Thread.State enum itself defines NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, and TERMINATED.)

Creating a Thread with new only initializes some thread information, such as the thread name, id, and thread group; at this point it can be regarded as an ordinary object. After Thread's start() is called, the Java virtual machine creates a method call stack and a program counter for it and sets hasBeenStarted to true. After that, calling start() again throws an exception (IllegalThreadStateException).

Threads in this state have not started running; they just indicate that the thread is ready to run. As to when the thread starts running, it depends on the thread scheduler in the JVM. When the thread gets the CPU, the run() method will be called. Do not call the Thread's run() method yourself. After that, according to CPU scheduling, it will switch between ready, running, and blocked until the run() method ends or the thread is stopped in some other way, entering the dead state.
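
The fact that a finished thread cannot be started again is what makes a pool necessary in the first place. A small sketch (LifecycleDemo and observe are hypothetical names) shows the states reported by Thread.getState() and the failed restart:

```java
class LifecycleDemo {
    // Records a thread's state before start, after completion, and on a restart attempt.
    static String observe() {
        Thread t = new Thread(() -> { });
        StringBuilder log = new StringBuilder();
        log.append(t.getState());                  // NEW: created but not started
        t.start();
        try {
            t.join();                              // wait for run() to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.append(' ').append(t.getState());      // TERMINATED: the "dead" state
        try {
            t.start();                             // a thread cannot be restarted
        } catch (IllegalThreadStateException e) {
            log.append(" restart-rejected");
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```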

Therefore, the principle of implementing thread reuse should be to keep the thread in a state of survival (ready, running, or blocked). Next, let's see how ThreadPoolExecutor implements thread reuse.

Thread reuse is controlled in Worker, the main inner class of ThreadPoolExecutor. Let's look at a simplified version of the Worker class, which is easier to understand:

private final class Worker implements Runnable {
    final Thread thread;
    Runnable firstTask;

    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
        // The Worker itself is the Runnable that the new thread will run
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Run the first task, then keep pulling tasks from the queue
        while (task != null || (task = getTask()) != null) {
            task.run();
            task = null; // clear it so the next iteration calls getTask()
        }
    }
}

Worker is a Runnable that also holds a thread, which is the thread to be started. A new Thread object is created when a new Worker object is created, and the Worker itself is passed to the Thread as its Runnable. This way, when the Thread's start() method is called, the Worker's run() method is what actually executes, which in turn calls runWorker(). runWorker() contains a while loop that keeps getting Runnable objects from getTask() and executing them one after another. How does getTask() get the Runnable objects?

It is still the simplified code:

private Runnable getTask() {
    // Pseudocode: in some special cases the worker is told to exit
    if (some special cases) {
        return null;
    }
    // Blocks until a task is available
    Runnable r = workQueue.take();
    return r;
}

This workQueue is the BlockingQueue passed in when the ThreadPoolExecutor was initialized to store tasks; every object in the queue is a Runnable task waiting to be executed. Since BlockingQueue is a blocking queue, if the queue is empty when take() is called, the calling thread enters a waiting state until a new object is added to the queue, at which point the blocked thread is woken up. Therefore, in general, the run() method of the Thread does not end; it keeps taking Runnable tasks from the workQueue and executing them, which is how thread reuse is achieved.
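
The same idea can be written out as a self-contained toy pool. This is a teaching sketch under simplifying assumptions, not the ThreadPoolExecutor implementation: TinyPool and all of its members are hypothetical, the number of threads is fixed, there is no rejection, and workers stop only when interrupted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TinyPool {
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
    private final Thread[] workers;

    TinyPool(int size) {
        workers = new Thread[size];
        for (int i = 0; i < size; i++) {
            workers[i] = new Thread(this::workerLoop, "tiny-worker-" + i);
            workers[i].start();
        }
    }

    // Each worker thread stays alive by looping on the shared queue.
    private void workerLoop() {
        try {
            while (true) {
                workQueue.take().run();   // blocks while the queue is empty
            }
        } catch (InterruptedException e) {
            // interrupted by stop(): leave the loop and let the thread end
        }
    }

    void execute(Runnable task) {
        workQueue.add(task);
    }

    void stop() {
        for (Thread t : workers) {
            t.interrupt();
        }
    }

    // Runs taskCount tasks on poolSize threads and returns how many completed.
    static int runTasks(int poolSize, int taskCount) {
        TinyPool pool = new TinyPool(poolSize);
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                completed.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.stop();
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(2, 10));
    }
}
```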

2. Control the maximum concurrency

When is a Runnable object added to the workQueue? When is a Worker created, and when is the start() method of the Worker's Thread called to start a new thread that executes the Worker's run() method?

It is natural to guess that this work happens when execute(Runnable runnable) is called. Let's see what execute() does internally.

execute:

Simplified code

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Current thread count < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // Start a new thread directly
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // The number of active threads is greater than or equal to corePoolSize;
    // runState is RUNNING and the queue is not full
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // Recheck the state: if the pool is no longer RUNNING,
        // remove the task from workQueue and reject it
        if (!isRunning(recheck) && remove(command))
            reject(command); // Reject using the handler configured for the pool
    }
    // Two situations lead here:
    // 1. The pool is not RUNNING, so new tasks are rejected
    // 2. The queue is full and starting a new thread failed
    //    (workerCount >= maximumPoolSize)
    else if (!addWorker(command, false))
        reject(command);
}

addWorker:

Simplified code

private boolean addWorker(Runnable firstTask, boolean core) {
    int wc = workerCountOf(ctl.get());
    // Compare against the core limit or the maximum limit
    if (wc >= (core ? corePoolSize : maximumPoolSize)) {
        return false;
    }
    Worker w = new Worker(firstTask);
    final Thread t = w.thread;
    t.start();
    return true;
}

Let's look at the situation of adding tasks in the thread pool working process mentioned above according to the code:

* If the number of running threads is less than corePoolSize, create a thread immediately to run this task;
* If the number of running threads is greater than or equal to corePoolSize, put this task into the queue;
* If the queue is full at this time and the number of running threads is less than maximumPoolSize, it is still necessary to create a non-core thread to run this task immediately;
* If the queue is full and the number of running threads is greater than or equal to maximumPoolSize, the thread pool rejects the task; with the default handler it throws a RejectedExecutionException.

This is why Android's AsyncTask throws RejectedExecutionException when it executes tasks in parallel and the maximum number of tasks is exceeded; see the source code analysis of the latest version of AsyncTask and the dark side of AsyncTask.

If the new thread is successfully created through addWorker, then the new thread is started through start(), and the firstTask is set as the first task to be executed in the run() of this Worker.

Although each Worker's task is processed in a serial manner, if multiple Workers are created, because they share a workQueue, they will process in parallel.

Therefore, the maximum number of concurrent threads is controlled by corePoolSize and maximumPoolSize. The general process can be represented by the following diagram.

The above explanation and diagram can help you understand this process very well.

If you do Android development and are familiar with how Handler works, you may find this diagram quite familiar. Some of the processes in it are very similar to the way Handler, Looper, and Message are used. Handler.sendMessage(Message) is equivalent to execute(Runnable), and the Message queue maintained by Looper is equivalent to the BlockingQueue, except that its synchronization has to be maintained by hand. The loop() function in Looper repeatedly retrieves Messages from the Message queue, and runWorker() in Worker repeatedly retrieves Runnables from the BlockingQueue, which is the same principle.

3. Manage threads

The thread pool handles thread reuse, control of the number of concurrent threads, and thread destruction well. Thread reuse and concurrency control have been discussed above; thread management is interwoven with them and is also easy to understand.

There is an AtomicInteger variable called ctl in ThreadPoolExecutor. This single variable stores two pieces of information:

the number of worker threads and the run state of the pool. The low 29 bits store the number of threads and the high 3 bits store runState; bitwise operations extract each value.

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// Get the run state of the pool (high 3 bits)
private static int runStateOf(int c) {
    return c & ~CAPACITY;
}

// Get the number of Workers (low 29 bits)
private static int workerCountOf(int c) {
    return c & CAPACITY;
}

// Determine whether the pool is in the RUNNING state
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
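
To make the bit layout concrete, here is a standalone sketch of the packing arithmetic. The class name CtlBitsDemo is hypothetical and only three of the five states are included; the constant definitions are written to match the 29/3 split described above.

```java
class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;     // mask for the low 29 bits

    // Run states live in the high 3 bits; RUNNING is negative, the others are not.
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;

    // Pack a run state and a worker count into one int.
    static int ctlOf(int rs, int wc) {
        return rs | wc;
    }

    static int runStateOf(int c) {
        return c & ~CAPACITY;
    }

    static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(workerCountOf(c) + " workers, running=" + isRunning(c));
    }
}
```

Because RUNNING is the only negative state, a single comparison with SHUTDOWN is enough to tell whether the pool still accepts tasks.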

This section mainly analyzes how the thread pool is shut down through shutdown() and shutdownNow(). The thread pool has five states that control whether tasks are accepted and executed. The following three are the main ones:

RUNNING state: The thread pool runs normally, can accept new tasks, and process tasks in the queue;

SHUTDOWN state: No longer accept new tasks, but execute tasks in the queue;

STOP state: No longer accepts new tasks and does not process tasks in the queue.

The shutdown method sets runState to SHUTDOWN and interrupts all idle threads. Threads that are still working are not affected, so the tasks already in the queue will still be executed.

The shutdownNow method sets runState to STOP. Unlike shutdown, this method interrupts all threads, including those that are working, so the tasks in the queue will not be executed.
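
The difference between the two methods can be seen with one busy worker and a few queued tasks. The sketch below uses hypothetical names (ShutdownDemo, run) and a single-thread pool so that the queued tasks cannot start before the shutdown call.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    // Returns {queued tasks that ran, tasks handed back by shutdownNow()}.
    static int[] run(int queued, boolean useShutdownNow) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        int handedBack = 0;

        pool.execute(() -> {                      // keeps the only worker busy
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the worker; just finish
            }
        });
        for (int i = 0; i < queued; i++) {
            pool.execute(ran::incrementAndGet);   // these wait in the queue
        }
        try {
            started.await();
            if (useShutdownNow) {
                List<Runnable> neverStarted = pool.shutdownNow();
                handedBack = neverStarted.size();
            } else {
                pool.shutdown();
            }
            release.countDown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {ran.get(), handedBack};
    }

    public static void main(String[] args) {
        int[] a = run(3, false);
        int[] b = run(3, true);
        System.out.println("shutdown: ran=" + a[0] + ", shutdownNow: ran=" + b[0]
                + " handedBack=" + b[1]);
    }
}
```

With shutdown() the queued tasks still run; with shutdownNow() they are removed from the queue and returned to the caller instead.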

Summary
Through the analysis of the ThreadPoolExecutor source code, we have a general understanding of the process of thread pool creation, task addition, and execution. Familiarity with these processes will make it easier to use thread pools.

What we have learned about concurrency control and about processing tasks with the producer-consumer model will be of great help in understanding or solving other related problems in the future. For example, in the Handler mechanism in Android, the Message queue in Looper could equally well be handled with a BlockingQueue. This is the benefit of reading the source code.

