下面通过几个实例介绍 Java 中 Callable 的用法,一起来看看吧。
使用线程处理项目当中的需求
具体需求:
三个线程并行处理三个SQL语句查询
线程类
@Scope("prototype") //线程类需要注解到spring中 不然无法使用具体DAO操作 @Service public class MainExcutor implements Callable < List < XXXVO >> { // 日志记录 private final static Logger logger = LoggerFactory .getLogger(MainExcutor.class); @Resource private XXXDAO xxxDAO; private XXXVO xxxVO; public XXXVO getXXXVO() { return xxxVO; } public void setXXXVO(XXXVO xxxVO) { this.xxxVO = xxxVO; } MainExcutor() { super(); } @Override public List < XXXVO > call() throws Exception { logger.info(" 线程查询数据中。。。"); List < XXXVO > resultList = new ArrayList < XXXVO > (); resultList = this.xxxDAO .queryXXXData(xxxVO); logger.info(" 线程查询数据结束。。。"); return resultList; } };
XXXDAO代码片段
//三个线程实例 @Resource private MainExcutor mainExcutor1; @Resource private MainExcutor mainExcutor2; @Resource private MainExcutor mainExcutor3; Map < String, Callable < List < XXXVO >>> taskMap = new HashMap < String, Callable < List < XXXVO >>> (); // 三个参数对象各不相同 BeanCopier beanCopier = BeanCopier.create(XXXVO.class , XXXVO.class, false); XXXVO xxxVO02 = new XXXVO(); XXXVO xxxVO03 = new ElectricTopicVO(); beanCopier.copy(xxxVO, xxxVO02, null); beanCopier.copy(xxxVO, xxxVO03, null); // 1 2 3 xxxVO.setType("1"); this.mainExcutor1.setXXXVO(xxxVO); taskMap.put("Task1", this.mainExcutor1); xxxVO02.setType("2"); this.mainExcutor2.setXXXVO(xxxVO02); taskMap.put("Task2", this.mainExcutor2); xxxVO03.setType("3"); this.mainExcutor3.setXXXVO(xxxVO03); taskMap.put("Task3", this.mainExcutor3); MessageThreadPoolExecutor executor = new MessageThreadPoolExecutor(3 , 4, 3000 L, TimeUnit.MILLISECONDS , new ArrayBlockingQueue < Runnable > (4)); try { Map < String, List < XXXVO >> resultMap = executor .performTasks(taskMap); for (String key: resultMap.keySet()) { tempList = resultMap.get(key); resultList.addAll(tempList); } } catch (InterruptedException e1) { e1.printStackTrace(); }
线程池实现类,这个是实现并发执行的关键
/**
 * A {@link ThreadPoolExecutor} that runs a named batch of tasks and collects their results.
 *
 * <p>{@link #performTasks(Map)} shuts the pool down when it finishes, so each instance can
 * process one batch only.
 */
class MessageThreadPoolExecutor extends ThreadPoolExecutor {

    // All superclass constructors are exposed unchanged.

    public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public MessageThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
                handler);
    }

    /**
     * Submits every task in {@code taskMap}, waits for all of them, and returns the results
     * keyed by task name. The pool is shut down before this method returns or throws.
     *
     * <p>Results are returned in the iteration order of {@code taskMap}. A task that fails
     * with an exception is reported on {@code System.err} and left out of the result map.
     *
     * @param taskMap task name to task; must be non-null and non-empty
     * @param <T> result type of the tasks
     * @return task name to result, for every task that completed normally
     * @throws NullPointerException if {@code taskMap} is null or empty
     * @throws InterruptedException if the calling thread is interrupted while waiting; tasks
     *         are then cancelled
     */
    public <T> Map<String, T> performTasks(Map<String, Callable<T>> taskMap)
            throws InterruptedException {
        // No tasks to run.
        if (taskMap == null || taskMap.isEmpty()) {
            throw new NullPointerException("taskMap must not be null or empty");
        }
        // Insertion-ordered maps keep results in the order the tasks were supplied.
        Map<String, Future<T>> futureMap = new java.util.LinkedHashMap<String, Future<T>>();
        Map<String, T> messageMap = new java.util.LinkedHashMap<String, T>();
        boolean done = false;
        try {
            // Submit everything first so the tasks can run concurrently.
            for (Map.Entry<String, Callable<T>> task : taskMap.entrySet()) {
                futureMap.put(task.getKey(), submit(task.getValue()));
            }
            for (Map.Entry<String, Future<T>> pending : futureMap.entrySet()) {
                try {
                    messageMap.put(pending.getKey(), pending.getValue().get());
                } catch (ExecutionException e) {
                    // Best effort: report the failed task and keep collecting the others.
                    System.err.println("Task '" + pending.getKey() + "' failed: " + e.getCause());
                }
            }
            done = true;
            return messageMap;
        } finally {
            if (!done) {
                // Interrupted or rejected part-way: cancel whatever was submitted.
                for (Future<T> pending : futureMap.values()) {
                    pending.cancel(true);
                }
            }
            this.shutdown();
        }
    }
}
多个线程实现累加
package com.thread; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author:xxx * @TODO:使用callable进行自增 */ public class AddCallable { public static void main(String[] args) { long begin = System.currentTimeMillis(); ExecutorService threadPool = Executors.newCachedThreadPool(); AddTask task01 = new AddTask("Task-01", 1, 50); AddTask task02 = new AddTask("Task-02", 51, 100); //得到的结果集 Future < Long > resultSet01 = threadPool.submit(task01); Future < Long > resultSet02 = threadPool.submit(task02); try { System.out.println("最后的结果是:" + (resultSet01.get() + resultSet02.get())); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally { threadPool.shutdown(); } System.out.println(System.currentTimeMillis() - begin); } }; /** * @author:xxx * @TODO:泛型中的Long表示返回的类型 */ class AddTask implements Callable < Long > { private String name; private long begin; private long end; public AddTask(String name, long begin, long end) { this.name = name; this.begin = begin; this.end = end; } @Override public Long call() throws Exception { System.out.println(Thread.currentThread() .getName()); System.out.println(name + " 执行中......."); long sum = 0; for (long i = begin; i <= end; i++) { sum += i; } return sum; } };
/** * */ package com.comtop.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @ProjectName:Skeleton * @PackageName:com.comtop.test * @Verson :0.1 * @CreateUser :lanweixing * @CreateDate :2014-9-3上午9:41:16 * @UseFor : */ public class DoThread { public static void main(String[] args) { List < Num > list = new ArrayList < Num > (); List < Message > result1 = new ArrayList < Message > (); List < Message > result = new ArrayList < Message > (); // ExecutorService pool = Executors.newCachedThreadPool() ; //使用线程池处理 ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 4, 3000 L , TimeUnit.MILLISECONDS, new ArrayBlockingQueue < Runnable > (2)); for (int i = 0; i < 1000000; i++) { list.add(new Num(i)); } long begin = System.currentTimeMillis(); try { //1000000条数据5个线程跑 每个线程跑集合中的一部分 for (int i = 0; i < 5; i++) { result1 = pool.submit( new Main4Thread(list, i * 200000, (i + 1) * 200000)) .get(); result.addAll(result1); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } pool.shutdown(); System.out.println("所用时间" + (System.currentTimeMillis() - begin)); System.out.println(result.size()); } }; class Main4Thread implements Callable < List < Message >> { private List < Num > num; private int begin; private int end; /** * @param num */ Main4Thread(List < Num > num, int begin, int end) { super(); this.num = num; this.begin = begin; this.end = end; } @Override public List < Message > call() throws Exception { List < Message > msgList = new ArrayList < Message > (); for (int i = begin; i < end; i++) { if (num.get(i) .getAge() >= 500000) { //线程逻辑处理 Message msg = new Message(); msg.setMsg(num.get(i) .getAge() + " Too old"); msgList.add(msg); } } System.out.println(Thread.currentThread() .getName()); 
return msgList; } }; class Message { private String msg; public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } }; class Num { private int age; /** * @param age */ Num(int age) { super(); this.age = age; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } };
多线程调用多线程
import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; public class Test implements Callable < String > { private static final MainPoolExecutor executor = new MainPoolExecutor(3 , 4, 3000 L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue < Runnable > ( 10)); private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } public Test() { super(); } public Test(String name) { this.name = name; } @Override public String call() throws Exception { System.out.println(name + " 开始!"); Thread.sleep(1000); System.out.println(name + " 结束!"); return null; } public static void main(String[] args) { SuperTest[] testArr = new SuperTest[] { new SuperTest("父线程1") , new SuperTest("父线程2"), new SuperTest("父线程3") }; Map < String, Callable < String >> taskMap = new HashMap < String, Callable < String >> (); try { for (SuperTest test: testArr) { taskMap.put(String.valueOf(Math.random()), test); } executor.performTasks(taskMap); } catch (InterruptedException e) { e.printStackTrace(); } finally { executor.shutdown(); } } }; class SuperTest implements Callable < String > { // 一共有9个线程所以 corePoolSize 和 maximumPoolSize得设置成大于等于9的数字 否则不同步 private static final MainPoolExecutor executor = new MainPoolExecutor(9 , 9, 3000 L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue < Runnable > ( 10)); private String name; public String getName() { return name; } public SuperTest() { super(); } public SuperTest(String name) { this.name = name; } @Override public String call() { Test[] testArr = new Test[] { new Test(name + " 子线程1") , new Test(name + " 子线程2"), new Test(name + " 子线程3") }; Map < String, Callable < String >> taskMap = new HashMap < String, Callable < String >> (); for (Test test: testArr) { taskMap.put(String.valueOf(Math.random()), test); } try { executor.performTasks(taskMap); } catch (InterruptedException e) { 
e.printStackTrace(); } finally { executor.shutdown(); } return null; } }
FutureTask不会阻塞主线程
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Shows that running a {@link FutureTask} on an executor does not block the main thread:
 * the main thread polls {@code isDone()} while the tasks run.
 *
 * @author Test
 */
public class NoInterruptTest implements Callable<String> {

    private long longTime;

    // Instance-level lock: calls on the same instance from different threads run one at a time.
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    public NoInterruptTest() {
        super();
    }

    public NoInterruptTest(long longTime) {
        this.longTime = longTime;
    }

    public long getLongTime() {
        return longTime;
    }

    public void setLongTime(long longTime) {
        this.longTime = longTime;
    }

    /**
     * Holds this instance's write lock while printing a start line, sleeping for
     * {@code longTime} milliseconds, and printing an end line.
     *
     * @return the fixed string {@code "lanweixing"}
     * @throws InterruptedException if the thread is interrupted while sleeping; the lock is
     *         still released
     */
    @Override
    public String call() throws Exception {
        lock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 开始!");
            Thread.sleep(longTime);
            System.out.println(Thread.currentThread().getName() + " 结束!");
        } finally {
            // Always release, otherwise an interrupted sleep leaves the lock held forever.
            lock.writeLock().unlock();
        }
        return "lanweixing";
    }

    /**
     * Runs two tasks that wrap the same callable, waits for both, and prints the first
     * result and the elapsed seconds.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        long n1 = System.currentTimeMillis();
        // Both tasks wrap the same callable instance, so they contend on its lock.
        NoInterruptTest callable1 = new NoInterruptTest(3000);
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            FutureTask<String> task1 = new FutureTask<String>(callable1);
            FutureTask<String> task2 = new FutureTask<String>(callable1);
            executorService.execute(task1);
            executorService.execute(task2);
            // Poll without blocking on get(); the short sleep avoids a hot spin.
            while (!(task1.isDone() && task2.isDone())) {
                Thread.sleep(50);
            }
            // Fetch the task result.
            System.out.println(task1.get());
            System.out.println("任务已处理。");
            System.out.println((System.currentTimeMillis() - n1) / 1000);
        } finally {
            executorService.shutdown();
        }
    }
}
更多java实例,请继续来奇Q工具网了解吧。
推荐阅读: