在java里,实际上队列的用法是多变的,不需要太僵硬的使用,下面我们就来看看队列是如何处理高并发的。
实例代码:
/** * @author fuguangli * @description 前沿消费者类 * @Create date: 2017/3/7 * @using EXAMPLE */ public class Customer implements Runnable { /** * 抛出异常 特殊值 阻塞 超时 插入 add(e) offer(e) put(e) offer(e, time, unit) 移除 remove() poll() take() poll(time, unit) 检查 element() peek() 不可用 不可用 */ private BlockingQueue blockingQueue; private AtomicInteger count = new AtomicInteger(); public Customer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p/> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { System.out.println("消费者线程启动..."); LockFlag.setCustomerRunningFlag(true); try { while (LockFlag.getProducerRunningFlag()) { System.out.println(Thread.currentThread() .getId() + "I'm Customer.Queue current size=" + blockingQueue.size()); String data = (String) blockingQueue.poll(10, TimeUnit.SECONDS); if (data != null) { System.out.println(Thread.currentThread() .getId() + "*************正在消费数据 data=" + data); } else { //表示超过取值时间,视为生产者不再生产数据 System.out.println(Thread.currentThread() .getId() + "队列为空无数据,请检查生产者是否阻塞"); } Thread.sleep(50); } System.err.println("消费者程序执行完毕"); } catch (InterruptedException e) { e.printStackTrace(); System.err.println("消费者程序退出"); LockFlag.setCustomerRunningFlag(false); //异常退出线程 Thread.currentThread() .interrupt(); } } }
package com.qysxy.framework.queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author fuguangli * @description 队列生产者类 * @Create date: 2017/3/7 * @using EXAMPLE */ public class Producer implements Runnable { /** * 抛出异常 特殊值 阻塞 超时 插入 add(e) offer(e) put(e) offer(e, time, unit) 移除 remove() poll() take() poll(time, unit) 检查 element() peek() 不可用 不可用 */ private BlockingQueue blockingQueue; private AtomicInteger count = new AtomicInteger(); public Producer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p/> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { System.out.println("生产者线程启动..."); LockFlag.setProducerRunningFlag(true); try { while (LockFlag.getProducerRunningFlag()) { String data = "data:" + count.incrementAndGet(); if (blockingQueue.offer(data, 10, TimeUnit.SECONDS)) { //返回true表示生产数据正确 System.out.println("^^^^^^^^^^^^^^正在生产数据 data=" + data); } else { //表示阻塞时间内还没有生产者生产数据 System.out.println("生产者异常,无法生产数据"); } Thread.sleep(50); } } catch (InterruptedException e) { e.printStackTrace(); System.err.println("生产者程序退出"); LockFlag.setProducerRunningFlag(false); //异常退出线程 Thread.currentThread() .interrupt(); } } }
package com.qysxy.framework.queue; /** * @author fuguangli * @description 前沿生产者消费者模型的锁类 * @Create date: 2017/3/7 */ public class LockFlag { /** * 生产者互斥锁 */ private static Boolean producerRunningFlag = false; /** * 消费者互斥锁 */ private static Boolean customerRunningFlag = false; public static Boolean getProducerRunningFlag() { return producerRunningFlag; } public static void setProducerRunningFlag(Boolean producerRunningFlag) { LockFlag.producerRunningFlag = producerRunningFlag; } public static Boolean getCustomerRunningFlag() { return customerRunningFlag; } public static void setCustomerRunningFlag(Boolean customerRunningFlag) { LockFlag.customerRunningFlag = customerRunningFlag; } }
package com.qysxy.framework.queue; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.util.Queue; import java.util.concurrent.*; /** * @author fuguangli * @description 前沿队列实用类,用于大量并发用户 * @Create date: 2017/3/7 */ public class BlockingQueueHelper { private static final Integer maxQueueSize = 1000; private static BlockingQueue blockingQueue = new LinkedBlockingQueue(maxQueueSize); private static ExecutorService threadPool = Executors.newCachedThreadPool(); public static BlockingQueue getBlockingQueue() { if (blockingQueue == null) { blockingQueue = new LinkedBlockingQueue(maxQueueSize); } return blockingQueue; } /** * @param o 队列处理对象(包含request,response,data) */ public static void requestQueue(Object o) { //检测当前的队列大小 if (blockingQueue != null && blockingQueue.size() < maxQueueSize) { //可以正常进入队列 if (blockingQueue.offer(o)) { //添加成功,检测数据处理线程是否正常 if (LockFlag.getCustomerRunningFlag()) { //说明处理线程类正常运行 } else { //说明处理线程类停止,此时,应重新启动线程进行数据处理 LockFlag.setCustomerRunningFlag(true); //example:run Customer customer = new Customer(blockingQueue); threadPool.execute(customer); } } else { //进入队列失败,做出相应的处理,或者尝试重新进入队列 } } else { //队列不正常,或队列大小已达上限,做出相应处理 } } }
以上就是本篇文章的所有内容,如若还有其他不懂的java编程常见问题,请记得关注我们寻找答案。
推荐阅读: