java callable用法示例,java实例教程

KLQ 2020-06-12 14:33:15 java常见问答 5243

下面给大家带来的实例就是callable的用法方面的例子,一起来看看java callable用法实例吧。

使用线程处理项目当中的需求

具体需求:

三个线程并行处理三个SQL语句查询

线程类

@Scope("prototype")
//线程类需要注解到spring中 不然无法使用具体DAO操作
@Service
public class MainExcutor implements Callable < List < XXXVO >>
{
    // 日志记录
    private final static Logger logger = LoggerFactory
    .getLogger(MainExcutor.class);
    @Resource
    private XXXDAO xxxDAO;
    private XXXVO xxxVO;
    public XXXVO getXXXVO()
    {
        return xxxVO;
    }
    public void setXXXVO(XXXVO xxxVO)
    {
        this.xxxVO = xxxVO;
    }
    MainExcutor()
    {
        super();
    }
    @Override
    public List < XXXVO > call() throws Exception
    {
        logger.info(" 线程查询数据中。。。");
        List < XXXVO > resultList = new ArrayList < XXXVO > ();
        resultList = this.xxxDAO
            .queryXXXData(xxxVO);
        logger.info(" 线程查询数据结束。。。");
        return resultList;
    }
};

XXXDAO代码片段

//三个线程实例
@Resource
private MainExcutor mainExcutor1;
@Resource
private MainExcutor mainExcutor2;
@Resource
private MainExcutor mainExcutor3;
Map < String, Callable < List < XXXVO >>> taskMap = new HashMap < String, Callable < List < XXXVO >>> ();
// 三个参数对象各不相同
BeanCopier beanCopier = BeanCopier.create(XXXVO.class
    , XXXVO.class, false);
XXXVO xxxVO02 = new XXXVO();
XXXVO xxxVO03 = new ElectricTopicVO();
beanCopier.copy(xxxVO, xxxVO02, null);
beanCopier.copy(xxxVO, xxxVO03, null);
// 1   2  3 
xxxVO.setType("1");
this.mainExcutor1.setXXXVO(xxxVO);
taskMap.put("Task1", this.mainExcutor1);
xxxVO02.setType("2");
this.mainExcutor2.setXXXVO(xxxVO02);
taskMap.put("Task2", this.mainExcutor2);
xxxVO03.setType("3");
this.mainExcutor3.setXXXVO(xxxVO03);
taskMap.put("Task3", this.mainExcutor3);
MessageThreadPoolExecutor executor = new MessageThreadPoolExecutor(3
    , 4, 3000 L, TimeUnit.MILLISECONDS
    , new ArrayBlockingQueue < Runnable > (4));
try
{
    Map < String, List < XXXVO >> resultMap = executor
        .performTasks(taskMap);
    for (String key: resultMap.keySet())
    {
        tempList = resultMap.get(key);
        resultList.addAll(tempList);
    }
}
catch (InterruptedException e1)
{
    e1.printStackTrace();
}

线程池实现类,这个是实现并发执行的关键

class MessageThreadPoolExecutor extends ThreadPoolExecutor
{
    // 把父类的构造函数全弄出来算了。。。
    public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize
        , long keepAliveTime, TimeUnit unit
        , BlockingQueue < Runnable > workQueue, RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue
            , handler);
    }
    public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize
        , long keepAliveTime, TimeUnit unit
        , BlockingQueue < Runnable > workQueue, ThreadFactory threadFactory)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue
            , threadFactory);
    }
    public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize
        , long keepAliveTime, TimeUnit unit, BlockingQueue < Runnable > workQueue)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize
        , long keepAliveTime, TimeUnit unit
        , BlockingQueue < Runnable > workQueue, ThreadFactory threadFactory
        , RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue
            , threadFactory, handler);
    }
    // 线程池主要执行方法
    public < T > Map < String, T > performTasks(Map < String, Callable < T >> taskMap)
    throws InterruptedException
    {
        // 无任务集合
        if (taskMap == null || taskMap.isEmpty())
        {
            throw new NullPointerException();
        }
        Map < String, Future < T >> futureMap = new HashMap < String, Future < T >> ();
        Map < String, T > messageMap = new HashMap < String, T > ();
        boolean done = false;
        try
        {
            for (String key: taskMap.keySet())
            {
                futureMap.put(key, submit(taskMap.get(key)));
            }
            for (String key: futureMap.keySet())
            {
                Future < T > f = futureMap.get(key);
                try
                {
                    T result = f.get();
                    messageMap.put(key, result);
                }
                catch (ExecutionException e)
                {
                    System.out.println(e.getMessage());
                }
            }
            done = true;
            return messageMap;
        }
        finally
        {
            if (!done)
            {
                for (String key: futureMap.keySet())
                {
                    futureMap.get(key)
                        .cancel(true);
                }
            }
            this.shutdown();
        }
    }
};

多个线程实现累加

package com.thread;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * @author:xxx
 * @TODO:使用callable进行自增
 */
public class AddCallable
{
    public static void main(String[] args)
    {
        long begin = System.currentTimeMillis();
        ExecutorService threadPool = Executors.newCachedThreadPool();
        AddTask task01 = new AddTask("Task-01", 1, 50);
        AddTask task02 = new AddTask("Task-02", 51, 100);
        //得到的结果集
        Future < Long > resultSet01 = threadPool.submit(task01);
        Future < Long > resultSet02 = threadPool.submit(task02);
        try
        {
            System.out.println("最后的结果是:" + (resultSet01.get() + resultSet02.get()));
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        catch (ExecutionException e)
        {
            e.printStackTrace();
        }
        finally
        {
            threadPool.shutdown();
        }
        System.out.println(System.currentTimeMillis() - begin);
    }
};
/**
 * @author:xxx
 * @TODO:泛型中的Long表示返回的类型
 */
class AddTask implements Callable < Long >
{
    private String name;
    private long begin;
    private long end;
    public AddTask(String name, long begin, long end)
    {
        this.name = name;
        this.begin = begin;
        this.end = end;
    }
    @Override
    public Long call() throws Exception
    {
        System.out.println(Thread.currentThread()
            .getName());
        System.out.println(name + " 执行中.......");
        long sum = 0;
        for (long i = begin; i <= end; i++)
        {
            sum += i;
        }
        return sum;
    }
};
/**
 * 
 */
package com.comtop.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * @ProjectName:Skeleton
 * @PackageName:com.comtop.test
 * @Verson :0.1
 * @CreateUser :lanweixing
 * @CreateDate :2014-9-3上午9:41:16
 * @UseFor :
 */
public class DoThread
{
    public static void main(String[] args)
    {
        List < Num > list = new ArrayList < Num > ();
        List < Message > result1 = new ArrayList < Message > ();
        List < Message > result = new ArrayList < Message > ();
        // ExecutorService pool = Executors.newCachedThreadPool() ;
        //使用线程池处理
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 4, 3000 L
            , TimeUnit.MILLISECONDS, new ArrayBlockingQueue < Runnable > (2));
        for (int i = 0; i < 1000000; i++)
        {
            list.add(new Num(i));
        }
        long begin = System.currentTimeMillis();
        try
        {
            //1000000条数据5个线程跑   每个线程跑集合中的一部分
            for (int i = 0; i < 5; i++)
            {
                result1 = pool.submit(
                        new Main4Thread(list, i * 200000, (i + 1) * 200000))
                    .get();
                result.addAll(result1);
            }
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        catch (ExecutionException e)
        {
            e.printStackTrace();
        }
        pool.shutdown();
        System.out.println("所用时间" + (System.currentTimeMillis() - begin));
        System.out.println(result.size());
    }
};
class Main4Thread implements Callable < List < Message >>
{
    private List < Num > num;
    private int begin;
    private int end;
    /**
     * @param num
     */
    Main4Thread(List < Num > num, int begin, int end)
    {
        super();
        this.num = num;
        this.begin = begin;
        this.end = end;
    }
    @Override
    public List < Message > call() throws Exception
    {
        List < Message > msgList = new ArrayList < Message > ();
        for (int i = begin; i < end; i++)
        {
            if (num.get(i)
                .getAge() >= 500000)
            {
                //线程逻辑处理
                Message msg = new Message();
                msg.setMsg(num.get(i)
                    .getAge() + " Too old");
                msgList.add(msg);
            }
        }
        System.out.println(Thread.currentThread()
            .getName());
        return msgList;
    }
};
class Message
{
    private String msg;
    public String getMsg()
    {
        return msg;
    }
    public void setMsg(String msg)
    {
        this.msg = msg;
    }
};
class Num
{
    private int age;
    /**
     * @param age
     */
    Num(int age)
    {
        super();
        this.age = age;
    }
    public int getAge()
    {
        return age;
    }
    public void setAge(int age)
    {
        this.age = age;
    }
};

多线程调用多线程

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
public class Test implements Callable < String >
{
    private static final MainPoolExecutor executor = new MainPoolExecutor(3
        , 4, 3000 L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue < Runnable > (
            10));
    private String name;
    public String getName()
    {
        return name;
    }
    public void setName(String name)
    {
        this.name = name;
    }
    public Test()
    {
        super();
    }
    public Test(String name)
    {
        this.name = name;
    }
    @Override
    public String call() throws Exception
    {
        System.out.println(name + " 开始!");
        Thread.sleep(1000);
        System.out.println(name + " 结束!");
        return null;
    }
    public static void main(String[] args)
    {
        SuperTest[] testArr = new SuperTest[]
        {
            new SuperTest("父线程1")
                , new SuperTest("父线程2"), new SuperTest("父线程3")
        };
        Map < String, Callable < String >> taskMap = new HashMap < String, Callable < String >> ();
        try
        {
            for (SuperTest test: testArr)
            {
                taskMap.put(String.valueOf(Math.random()), test);
            }
            executor.performTasks(taskMap);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        finally
        {
            executor.shutdown();
        }
    }
};
class SuperTest implements Callable < String >
{
    // 一共有9个线程所以 corePoolSize 和 maximumPoolSize得设置成大于等于9的数字 否则不同步
    private static final MainPoolExecutor executor = new MainPoolExecutor(9
        , 9, 3000 L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue < Runnable > (
            10));
    private String name;
    public String getName()
    {
        return name;
    }
    public SuperTest()
    {
        super();
    }
    public SuperTest(String name)
    {
        this.name = name;
    }
    @Override
    public String call()
    {
        Test[] testArr = new Test[]
        {
            new Test(name + " 子线程1")
                , new Test(name + " 子线程2"), new Test(name + " 子线程3")
        };
        Map < String, Callable < String >> taskMap = new HashMap < String, Callable < String >> ();
        for (Test test: testArr)
        {
            taskMap.put(String.valueOf(Math.random()), test);
        }
        try
        {
            executor.performTasks(taskMap);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        finally
        {
            executor.shutdown();
        }
        return null;
    }
}

FutureTask不会阻塞主线程

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * FutureTask线程执行不会阻塞主线程
 * @author Test
 *
 */
public class NoInterruptTest implements Callable < String >
{
    private long longTime;
    // 锁对象 需要放外部 否则
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    public long getLongTime()
    {
        return longTime;
    }
    public void setLongTime(long longTime)
    {
        this.longTime = longTime;
    }
    public NoInterruptTest()
    {
        super();
    }
    public NoInterruptTest(long longTime)
    {
        this.longTime = longTime;
    }
    @Override
    public String call() throws Exception
    {
        // synchronized 用于控制同一个对象的多个线程访问
        lock.writeLock()
            .lock();
        System.out.println(Thread.currentThread()
            .getName() + " 开始!");
        Thread.sleep(longTime);
        System.out.println(Thread.currentThread()
            .getName() + " 结束!");
        lock.writeLock()
            .unlock();
        return "lanweixing";
    }
    public static void main(String[] args) throws Exception
    {
        long n1 = System.currentTimeMillis();
        NoInterruptTest callable1 = new NoInterruptTest(3000);
        NoInterruptTest callable2 = new NoInterruptTest(3000);
        ExecutorService executorService = Executors.newCachedThreadPool();
        FutureTask < String > task1 = new FutureTask < String > (callable1);
        FutureTask < String > task2 = new FutureTask < String > (callable1);
        executorService.execute(task1);
        executorService.execute(task2);
        while (true)
        {
            if (task1.isDone() && task2.isDone())
            {
                // 获得线程执行结果
                System.out.println(task1.get());
                System.out.println("任务已处理。");
                break;
            }
        }
        System.out.println((System.currentTimeMillis() - n1) / 1000);
        executorService.shutdown();
    }
};

更多java实例,请继续来奇Q工具网了解吧。

推荐阅读:

java输入一个数求绝对值,java绝对值怎么写?

java堆排序代码,堆排序实现

java继承例子代码,java类的继承示例