java并发包下的类一般都有哪些?java并发包常用类

TheDisguiser 2020-06-06 13:41:40 java常见问答 5384

在java并发包下经常会有很多类供我们使用,今天我们就来看看一般常用的并发包类都有哪些吧。

一、ThredPoolExecutor

ThredPoolExecutor这个类是基于命令模式下的一个典型线程池实现,它主要通过一些策略实现一个典型的线程池,目前我们已经知道的策略如:ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy。示例:

package com.yhj.container.concurrent;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @Described:线程池测试
 * @author YHJ create at 2012-4-13 下午01:34:03
 * @FileNmae com.yhj.container.concurrent.ThreadPoolExecutorTestCase.java
 */
public class ThreadPoolExecutorTestCase
{
    private AtomicInteger successTask = new AtomicInteger(0); //成功的任务数目
    private AtomicInteger failedTask = new AtomicInteger(0); //失败的任务数目
    private Integer thredCount; //启动的线程数
    private ThreadPoolExecutor executor;
    private CountDownLatch latch; //计数器
    private CyclicBarrier cyclicBarrier; //集合点
    //构造函数
    public ThreadPoolExecutorTestCase(BlockingQueue < Runnable > queue, Integer thredCount)
    {
        super();
        System.out.println("queue name:" + queue.getClass());
        this.thredCount = thredCount;
        executor = new ThreadPoolExecutor(10, 500, 30, TimeUnit.SECONDS, queue, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    }
    //要处理的任务列表
    class Task implements Runnable
    {
        private CountDownLatch latch; //计数器
        private CyclicBarrier cyclicBarrier; //集合点
        public Task(CountDownLatch latch, CyclicBarrier cyclicBarrier)
        {
            super();
            this.latch = latch;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run()
        {
            try
            {
                cyclicBarrier.await(); //到达预期集合点再执行
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            try
            {
                executor.execute(new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try
                        {
                            Thread.sleep(3000); //休眠3秒
                        }
                        catch (Exception e)
                        {
                            e.printStackTrace();
                        }
                        latch.countDown();
                        successTask.incrementAndGet();
                    }
                });
            }
            catch (RejectedExecutionException e)
            {
                latch.countDown();
                failedTask.incrementAndGet();
            }
        }
    }
    //初始化
    public void init()
    {
        latch = new CountDownLatch(thredCount);
        cyclicBarrier = new CyclicBarrier(thredCount);
    }
    //启动方法
    public void start()
    {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < thredCount; ++i)
            new Thread(new Task(latch, cyclicBarrier))
            .start();
        try
        {
            latch.await();
            executor.shutdownNow();
            System.out.println("total time:" + (System.currentTimeMillis() - startTime));
            System.out.println("success count:" + successTask.intValue());
            System.out.println("failed count:" + failedTask.intValue());
            System.out.println("===end===");
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
    //强制关闭方法
    public void shutDonw()
    {
        executor.shutdownNow();
    }
    //主函数
    public static void main(String[] args)
    {
        //性能优先 速度优先
        ThreadPoolExecutorTestCase testCase = new ThreadPoolExecutorTestCase(new SynchronousQueue < Runnable > (), 1000);
        testCase.init();
        testCase.start();
        //稳定优先  使用数组缓存队列
        testCase = new ThreadPoolExecutorTestCase(new ArrayBlockingQueue < Runnable > (10), 1000);
        testCase.init();
        testCase.start();
        //稳定优先  使用链表缓存队列
        testCase = new ThreadPoolExecutorTestCase(new LinkedBlockingDeque < Runnable > (10), 1000);
        testCase.init();
        testCase.start();
        //关掉处理器
        //testCase.shutDonw();
    }
}

二、FutureTask

FutureTaska它是用户异步获取数据的其中一种方法,我们知道,使用ConcurrentHashMap可以代替HashMap来提升map的性能,然ConcurrentHashMap在进行读操作时一般是不加锁的,所以我们来假设一个例子,如:现有一个数据库的连接池,默认为不初始化,但在用户第一次用到的时候会进行初始化操作。

示例:

package com.yhj.container.concurrent;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.ReentrantLock;
/**
 * @Described:异步通知测试用例
 * @author YHJ create at 2012-4-14 上午11:31:26
 * @FileNmae com.yhj.container.concurrent.FutureTaskTestCase.java
 */
public class FutureTaskTestCase
{
    //测试需求: 使用一个key-value 形式存储
    //测试要求: 所有的连接池对象只能创建一次 且不能加载时初始化 在第一次访问时初始化目标连接池对象
    //测试实现: 通过HashMap加锁实现和FutureTask实现
    //Map测试任务 用例
    interface MapTask
    {
        //根据指定的key 获取指定的DB连接  key etc:mysql sqlserver oracle DB2 and so on
        public Connection getConnection(String key);
    }
    //枚举 数据库类型
    enum DB_TYPE
    {
        MYSQL(), SQLSERVR, ORACLE, DB2;
    }
    //使用HashMap加锁实现
    class HashMapWithLock implements MapTask
    {
        private Map < String, DBConnectionPool > pools = new HashMap < String, DBConnectionPool > ();
        private ReentrantLock lock = new ReentrantLock();
        //加锁获取连接对象,防止高并发下数据重复创建
        @Override
        public Connection getConnection(String key)
        {
            try
            {
                lock.lock(); //锁定操作,后续再来等待
                if (!pools.containsKey(key))
                    pools.put(key, new DBConnectionPool(key));
                return pools.get(key)
                    .getConnection();
            }
            finally
            {
                lock.unlock(); //解锁操作
            }
        }
    }
    //使用ConcurrentHashMap实现,因为ConcurrentHashMap读取时不加锁,因此需要通过回调的方式控制并发
    class ConcurrentHashMapWithFutureTask implements MapTask
    {
        private ConcurrentHashMap < String, FutureTask < DBConnectionPool >> pools = new ConcurrentHashMap < String, FutureTask < DBConnectionPool >> ();
        private FutureTask < DBConnectionPool > futureTask;
        //通过回调的方式 确保多线程下不会引发多次创建
        @Override
        public Connection getConnection(final String key)
        {
            try
            {
                if (!pools.containsKey(key))
                {
                    Callable < DBConnectionPool > callable = new Callable < DBConnectionPool > ()
                    {
                        @Override
                        public DBConnectionPool call() throws Exception
                        {
                            pools.put(key, futureTask);
                            return new DBConnectionPool(key);
                        }
                    };
                    FutureTask < DBConnectionPool > tmpTask = new FutureTask < DBConnectionPool > (callable);
                    futureTask = pools.putIfAbsent(key, tmpTask);
                    if (futureTask == null)
                    {
                        futureTask = tmpTask;
                        futureTask.run();
                    }
                }
                return pools.get(key)
                    .get()
                    .getConnection();
            }
            catch (Exception e)
            {
                e.printStackTrace();
                return null;
            }
        }
    }
    //DB连接池  测试用例供体
    class DBConnectionPool
    {
        public DBConnectionPool(String key)
        {
            System.out.println("创建了" + key + "类型的数据库连接池");
        }
        //获取DB连接
        public Connection getConnection()
        {
            // create Connection for db
            return new Connection();
        }
    }
    //DB连接 测试供体
    class Connection
    {}
    //任务执行器 待执行的任务
    class ExecutorTask implements Runnable
    {
        private CyclicBarrier barrier; //计数器
        private CountDownLatch latch; //集合点
        private MapTask task; //待执行的任务
        private String key;
        public ExecutorTask(String key, CyclicBarrier barrier, CountDownLatch latch, MapTask task)
        {
            this.barrier = barrier;
            this.latch = latch;
            this.task = task;
            this.key = key;
        }
        @Override
        public void run()
        {
            try
            {
                barrier.await(); //到达集合点之前等待 确保数据是并发执行的
                Connection connection = task.getConnection(key);
                if (null == connection)
                    throw new NullPointerException("Null Connection Exception with " + key);
                latch.countDown();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    }
    //执行函数
    public void execute(String key, int thredCount, MapTask task)
    {
        CyclicBarrier barrier = new CyclicBarrier(thredCount);
        CountDownLatch latch = new CountDownLatch(thredCount);
        long beginTime = System.currentTimeMillis();
        System.out.println("===start " + task.getClass() + "===");
        for (int i = 0; i < thredCount; ++i)
        {
            new Thread(new ExecutorTask(key, barrier, latch, task))
                .start();
        }
        try
        {
            latch.await();
            System.out.println("====end " + task.getClass() + " still time " + (System.currentTimeMillis() - beginTime) + "===");
        }
        catch (InterruptedException e1)
        {
            throw new RuntimeException(e1);
        }
    }
    //启动函数
    public void start()
    {
        int thredCount = 200;
        MapTask hashMapWithLock = new HashMapWithLock();
        MapTask concurrentHashMapWithFutureTask = new ConcurrentHashMapWithFutureTask();
        execute("mysql", thredCount, hashMapWithLock);
        execute("sqlserver", thredCount, concurrentHashMapWithFutureTask);
    }
    //主函数
    public static void main(String[] args)
    {
        //启动主进程
        new FutureTaskTestCase()
            .start();
        //等待所有进程结束
        while (Thread.activeCount() > 1)
        {
            Thread.yield();
        }
    }
}

结果:

并发包下的类  常用类

我们可以看到对象值已经new了一次,因为是异步的,所以它通过回调的方式速度慢了许多,但在多线程模式下,我们就可以轻松规避单点访问推积过大的问题了。

三、Executors

线程池在java中我们经常会用到,数据库总是会需要它的。但,虽然用线程池解决并发操作非常好用,但它却有点麻烦,所以JDK为我们提供了Executors来方便我们高效率的new出ThredPoolExecutor,下面我们就来看看它都有哪些方法能够高效率的new出实例吧。

并发包下的类  常用类

以上就是关于并发包下常用类的所以内容了,由于并发包下类数量繁多,这里只是举例了一些常用的类,如果小伙伴们还想了解更多java架构师知识的话,就请一直持续关注我们的网站吧。

推荐阅读:

AQS,juc下的类有哪些?有什么作用?