在java并发包下经常会有很多类供我们使用,今天我们就来看看一般常用的并发包类都有哪些吧。
一、ThredPoolExecutor
ThredPoolExecutor这个类是基于命令模式下的一个典型线程池实现,它主要通过一些策略实现一个典型的线程池,目前我们已经知道的策略如:ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy。示例:
package com.yhj.container.concurrent; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @Described:线程池测试 * @author YHJ create at 2012-4-13 下午01:34:03 * @FileNmae com.yhj.container.concurrent.ThreadPoolExecutorTestCase.java */ public class ThreadPoolExecutorTestCase { private AtomicInteger successTask = new AtomicInteger(0); //成功的任务数目 private AtomicInteger failedTask = new AtomicInteger(0); //失败的任务数目 private Integer thredCount; //启动的线程数 private ThreadPoolExecutor executor; private CountDownLatch latch; //计数器 private CyclicBarrier cyclicBarrier; //集合点 //构造函数 public ThreadPoolExecutorTestCase(BlockingQueue < Runnable > queue, Integer thredCount) { super(); System.out.println("queue name:" + queue.getClass()); this.thredCount = thredCount; executor = new ThreadPoolExecutor(10, 500, 30, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); } //要处理的任务列表 class Task implements Runnable { private CountDownLatch latch; //计数器 private CyclicBarrier cyclicBarrier; //集合点 public Task(CountDownLatch latch, CyclicBarrier cyclicBarrier) { super(); this.latch = latch; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { cyclicBarrier.await(); //到达预期集合点再执行 } catch (Exception e) { e.printStackTrace(); } try { executor.execute(new Runnable() { @Override public void run() { try { Thread.sleep(3000); //休眠3秒 } catch (Exception e) { e.printStackTrace(); } latch.countDown(); successTask.incrementAndGet(); } }); } catch (RejectedExecutionException e) { latch.countDown(); failedTask.incrementAndGet(); } } } //初始化 public void init() { latch = new CountDownLatch(thredCount); cyclicBarrier = new CyclicBarrier(thredCount); } //启动方法 public void start() { long startTime = System.currentTimeMillis(); for (int i = 0; i < thredCount; ++i) new Thread(new Task(latch, cyclicBarrier)) .start(); try { latch.await(); executor.shutdownNow(); System.out.println("total time:" + (System.currentTimeMillis() - startTime)); System.out.println("success count:" + successTask.intValue()); System.out.println("failed count:" + failedTask.intValue()); System.out.println("===end==="); } catch (Exception e) { e.printStackTrace(); } } //强制关闭方法 public void shutDonw() { executor.shutdownNow(); } //主函数 public static void main(String[] args) { //性能优先 速度优先 ThreadPoolExecutorTestCase testCase = new ThreadPoolExecutorTestCase(new SynchronousQueue < Runnable > (), 1000); testCase.init(); testCase.start(); //稳定优先 使用数组缓存队列 testCase = new ThreadPoolExecutorTestCase(new ArrayBlockingQueue < Runnable > (10), 1000); testCase.init(); testCase.start(); //稳定优先 使用链表缓存队列 testCase = new ThreadPoolExecutorTestCase(new LinkedBlockingDeque < Runnable > (10), 1000); testCase.init(); testCase.start(); //关掉处理器 //testCase.shutDonw(); } }
二、FutureTask
FutureTaska它是用户异步获取数据的其中一种方法,我们知道,使用ConcurrentHashMap可以代替HashMap来提升map的性能,然ConcurrentHashMap在进行读操作时一般是不加锁的,所以我们来假设一个例子,如:现有一个数据库的连接池,默认为不初始化,但在用户第一次用到的时候会进行初始化操作。
示例:
package com.yhj.container.concurrent; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.FutureTask; import java.util.concurrent.locks.ReentrantLock; /** * @Described:异步通知测试用例 * @author YHJ create at 2012-4-14 上午11:31:26 * @FileNmae com.yhj.container.concurrent.FutureTaskTestCase.java */ public class FutureTaskTestCase { //测试需求: 使用一个key-value 形式存储 //测试要求: 所有的连接池对象只能创建一次 且不能加载时初始化 在第一次访问时初始化目标连接池对象 //测试实现: 通过HashMap加锁实现和FutureTask实现 //Map测试任务 用例 interface MapTask { //根据指定的key 获取指定的DB连接 key etc:mysql sqlserver oracle DB2 and so on public Connection getConnection(String key); } //枚举 数据库类型 enum DB_TYPE { MYSQL(), SQLSERVR, ORACLE, DB2; } //使用HashMap加锁实现 class HashMapWithLock implements MapTask { private Map < String, DBConnectionPool > pools = new HashMap < String, DBConnectionPool > (); private ReentrantLock lock = new ReentrantLock(); //加锁获取连接对象,防止高并发下数据重复创建 @Override public Connection getConnection(String key) { try { lock.lock(); //锁定操作,后续再来等待 if (!pools.containsKey(key)) pools.put(key, new DBConnectionPool(key)); return pools.get(key) .getConnection(); } finally { lock.unlock(); //解锁操作 } } } //使用ConcurrentHashMap实现,因为ConcurrentHashMap读取时不加锁,因此需要通过回调的方式控制并发 class ConcurrentHashMapWithFutureTask implements MapTask { private ConcurrentHashMap < String, FutureTask < DBConnectionPool >> pools = new ConcurrentHashMap < String, FutureTask < DBConnectionPool >> (); private FutureTask < DBConnectionPool > futureTask; //通过回调的方式 确保多线程下不会引发多次创建 @Override public Connection getConnection(final String key) { try { if (!pools.containsKey(key)) { Callable < DBConnectionPool > callable = new Callable < DBConnectionPool > () { @Override public DBConnectionPool call() throws Exception { pools.put(key, futureTask); return new DBConnectionPool(key); } }; FutureTask < DBConnectionPool > tmpTask = new FutureTask < DBConnectionPool > (callable); futureTask = pools.putIfAbsent(key, tmpTask); if (futureTask == null) { futureTask = tmpTask; futureTask.run(); } } return pools.get(key) .get() .getConnection(); } catch (Exception e) { e.printStackTrace(); return null; } } } //DB连接池 测试用例供体 class DBConnectionPool { public DBConnectionPool(String key) { System.out.println("创建了" + key + "类型的数据库连接池"); } //获取DB连接 public Connection getConnection() { // create Connection for db return new Connection(); } } //DB连接 测试供体 class Connection {} //任务执行器 待执行的任务 class ExecutorTask implements Runnable { private CyclicBarrier barrier; //计数器 private CountDownLatch latch; //集合点 private MapTask task; //待执行的任务 private String key; public ExecutorTask(String key, CyclicBarrier barrier, CountDownLatch latch, MapTask task) { this.barrier = barrier; this.latch = latch; this.task = task; this.key = key; } @Override public void run() { try { barrier.await(); //到达集合点之前等待 确保数据是并发执行的 Connection connection = task.getConnection(key); if (null == connection) throw new NullPointerException("Null Connection Exception with " + key); latch.countDown(); } catch (Exception e) { e.printStackTrace(); } } } //执行函数 public void execute(String key, int thredCount, MapTask task) { CyclicBarrier barrier = new CyclicBarrier(thredCount); CountDownLatch latch = new CountDownLatch(thredCount); long beginTime = System.currentTimeMillis(); System.out.println("===start " + task.getClass() + "==="); for (int i = 0; i < thredCount; ++i) { new Thread(new ExecutorTask(key, barrier, latch, task)) .start(); } try { latch.await(); System.out.println("====end " + task.getClass() + " still time " + (System.currentTimeMillis() - beginTime) + "==="); } catch (InterruptedException e1) { throw new RuntimeException(e1); } } //启动函数 public void start() { int thredCount = 200; MapTask hashMapWithLock = new HashMapWithLock(); MapTask concurrentHashMapWithFutureTask = new ConcurrentHashMapWithFutureTask(); execute("mysql", thredCount, hashMapWithLock); execute("sqlserver", thredCount, concurrentHashMapWithFutureTask); } //主函数 public static void main(String[] args) { //启动主进程 new FutureTaskTestCase() .start(); //等待所有进程结束 while (Thread.activeCount() > 1) { Thread.yield(); } } }
结果:
我们可以看到对象值已经new了一次,因为是异步的,所以它通过回调的方式速度慢了许多,但在多线程模式下,我们就可以轻松规避单点访问推积过大的问题了。
三、Executors
线程池在java中我们经常会用到,数据库总是会需要它的。但,虽然用线程池解决并发操作非常好用,但它却有点麻烦,所以JDK为我们提供了Executors来方便我们高效率的new出ThredPoolExecutor,下面我们就来看看它都有哪些方法能够高效率的new出实例吧。
以上就是关于并发包下常用类的所以内容了,由于并发包下类数量繁多,这里只是举例了一些常用的类,如果小伙伴们还想了解更多java架构师知识的话,就请一直持续关注我们的网站吧。
推荐阅读: