java并发编程中的线程池技术和线程隔离技术的简单示例以及线程池参数的说明整理

作者: admin 分类: JAVA 发布时间: 2019-06-25 12:21  阅读: 98 views

并发编程中,经常会听到一些概念,池化技术,线程隔离什么的。具体指什么呢?刚开始我这个假程序员是完全听不懂的。”你把这里池化吧?”,what?

后来在实际工作中和书籍中看到,逐渐有了些概念。一些场景需要开启多个线程,频繁的创建新线程是一种消耗,如果使用线程池的话,可以有需要的时候拿来用,不需要的话放回去。一些场景会有多个线程访问同一个bean对象,但是bean中的属性各有不同,为了避免各个线程拿到错误的bean对象数据,就需要使用线程隔离技术了。

创建线程池主要包含以下几个参数,这个方法有多个重载,根据需要使用。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler)
参数 参数 参数说明
核心线程数 corePoolSize 核心线程会一直存活,当线程数小于核心线程数时,有新任务优先创建新线程,allowCoreThreadTimeout=true会关闭超时线程
最大线程数 maxPoolSize 线程数大于corePoolSize会创建线程。线程数大于maxPoolSize会抛出异常。
线程空闲时间 keepAliveTime 线程空闲时间大于该值,则退出;保持corePoolSize的数量。如果超时时间为true. 线程数会等待为0
工作队列 workqueue 所有的blockqueue都可以用于保持和传输任务。
时间单位 unit 超时时间单位
任务拒绝处理器 handler 当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法 execute(java.lang.Runnable)中提交的新任务将被 拒绝。

其中工作队列有三种策略

直接提交:工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

无界队列:使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

有界队列:当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

 

其中任务拒绝策略有四种

abortPolicy 
-在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
CallerRunsPolicy
-在ThreadPoolExecutor.CallerRunsPolicy 中,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
DiscardPolicy
-在ThreadPoolExecutor.DiscardPolicy 中,不能执行的任务将被删除。
DiscardOldestPolicy
-在ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
线程池的运行逻辑如下:
1.当线程数小于核心线程数时,则创建线程。
2.当线程数大于核心线程数时,且任务队列未满时,将任务放入队列。
3.当线程数大于核心线程数时,且任务队列已满。
4.如果线程数小于最大线程数,则创建线程。
5.如果线程数大于最大线程数,则执行任务决绝处理,抛出异常。

示例代码如下:

线程池

package com.chl.thread.pool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
import java.util.concurrent.TimeUnit;

/**
 * 线程池执行器ThreadPoolExecutor,可自定义线程池,构造方法有多个重载,按需处理 <br>
 * 这里目的是创建,最优结构的线程池创建及代码结构,请根据实际情况设计
 * 
 * @author chenhailong
 */
public class ThreadPoolExecutorTest {
	/**
	 * 核心线程数
	 */
	static final Integer ThreadCoreNums = 20;
	/**
	 * 最大线程数
	 */
	static final Integer ThreadMaxNums = 50;
	/**
	 * 线程存活时间
	 */
	static final long LiveTime = 60;
	/**
	 * 队列存放长度
	 */
	static final Integer QueueSize = 1000;

	/**
	 * 存储队列结构
	 * 直接提交工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。
	 * 直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
	 * 
	 * 无界队列使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。
	 * (因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
	 * 
	 * 有界队列当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:
	 * 使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。
	 * 使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
	 */
	static final BlockingQueue<Runnable> bq = new LinkedBlockingQueue<Runnable>();

	/**
	 * 任务拒绝策略 4种
	 * abortPolicy在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
	 * CallerRunsPolicy在 ThreadPoolExecutor.CallerRunsPolicy 中,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
	 * DiscardPolicy在 ThreadPoolExecutor.DiscardPolicy 中,不能执行的任务将被删除。
	 * DiscardOldestPolicy在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
	 */
	static final DiscardPolicy dp = new ThreadPoolExecutor.DiscardPolicy();

	public static void main(String[] args) {

		ThreadPoolExecutor pool = new ThreadPoolExecutor(ThreadCoreNums, ThreadMaxNums, LiveTime, TimeUnit.SECONDS, bq,
				dp);

		pool.execute(() -> {
			System.out.println("线程池中处理线程业务!");
		});
		
		Runtime.getRuntime().addShutdownHook(new Thread() {
			public void run() {
				try {
					//增加钩子函数,在系统关闭前关闭线程池
					pool.shutdown(); 
				} catch (Throwable e) {
				}
			}
		});
	}

}

线程隔离

package com.chl.thread.pool;

import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 利用concurrentHashMap达到线程隔离目的
 * 
 * @author chenhailong
 */
public class ThreadIsolateTest {

	public static void main(String[] args) {
		ExecutorWrong ew = new ExecutorWrong();

		// //错误现象演示
		// for (int i = 0; i < 10; i++) {
		// new Thread(() -> {
		// try {
		// //初始化对象
		// Properties pr = new Properties();
		// pr.setId(new Random().nextInt(100));
		// pr.setName(UUID.randomUUID().toString());
		//
		// ew.set(pr);
		// //Thread.sleep(1 * 1000);//逻辑处理时间越长,出错率越高
		// System.out.println("wrong===="+ Thread.currentThread().getName() +"-before:
		// "+ pr +"-after:" + ew.get());
		// } catch (Exception e) {
		// e.printStackTrace();
		// }
		//
		// }).start();
		// }
		//
		// try {
		// Thread.sleep(3 * 1000);
		// } catch (InterruptedException e1) {
		// e1.printStackTrace();
		// }
		System.out.println("隔离线输出-------------");

		// 隔离现象演示
		for (int i = 0; i < 10; i++) {
			new Thread(() -> {
				try {
					// 初始化对象
					Properties pr = new Properties();
					pr.setId(new Random().nextInt(100));
					pr.setName(UUID.randomUUID().toString());

					ew.set(pr);
					// Thread.sleep(1 * 1000);//逻辑处理时间越长,出错率越高
					System.out.println(
							"wrong====" + Thread.currentThread().getName() + "-before: " + pr + "-after:" + ew.get());
				} catch (Exception e) {
					e.printStackTrace();
				}

			}).start();
		}
	}

}

/**
 * 测试对象
 */
class Properties {
	private int id;
	private String name;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	@Override
	public String toString() {
		return "Properties [id=" + id + ", name=" + name + "]";
	}
}

class ExecutorWrong {
	private Properties pro;

	void set(Properties pro) {
		this.pro = pro;
	}

	Properties get() {
		return pro;
	}

}

class ExecutorRight {
	Map<Thread, Properties> map = new ConcurrentHashMap<Thread, Properties>();

	@SuppressWarnings("unused")
	private Properties pro;

	void set(Properties pro) {
		this.pro = pro;
		map.put(Thread.currentThread(), pro);
	}

	Properties get() {
		return map.get(Thread.currentThread());
	}

}

Executors 工具类

package com.chl.thread.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Executors 线程池工具类
 * 可以创建不同类型的线程池对象,按需选择
 * @author chenhailong
 *
 */
public class ExecutorsTest {

	@SuppressWarnings("unused")
	public static void main(String[] args) {
                //创建一个可根据需要创建新的线程池。以前构造的线程可用时将重用他们。
		ExecutorService es1 = Executors.newCachedThreadPool();
		//固定数量的线程池。
		ExecutorService es2 = Executors.newFixedThreadPool(10);
		//无界队列来运行该线程。
		ExecutorService es3 = Executors.newSingleThreadExecutor();
		//创建一个线程池,可安排在给定延迟后运行命令或者定期执行。
		ExecutorService es4 = Executors.newScheduledThreadPool(10);
		//适合使用在很耗时场景
		ExecutorService es5 = Executors.newWorkStealingPool();

		
	}

}

 


   原创文章,转载请标明本文链接: java并发编程中的线程池技术和线程隔离技术的简单示例以及线程池参数的说明整理

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

电子邮件地址不会被公开。 必填项已用*标注

更多阅读