java队列处理高并发怎么实现?

TheDisguiser 2020-08-26 17:03:49 java常见问答 5690

在java里,实际上队列的用法是多变的,不需要太僵硬的使用,下面我们就来看看队列是如何处理高并发的。

实例代码:

/**
 * @author fuguangli
 * @description 前沿消费者类
 * @Create date:    2017/3/7
 * @using   EXAMPLE
 */
public class Customer implements Runnable
{
    /**
     *         抛出异常    特殊值        阻塞         超时
     插入    add(e)    offer(e)    put(e)    offer(e, time, unit)
     移除    remove()    poll()    take()    poll(time, unit)
     检查    element()    peek()    不可用    不可用
     */
    private BlockingQueue blockingQueue;
    private AtomicInteger count = new AtomicInteger();
    public Customer(BlockingQueue blockingQueue)
    {
        this.blockingQueue = blockingQueue;
    }
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p/>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run()
    {
        System.out.println("消费者线程启动...");
        LockFlag.setCustomerRunningFlag(true);
        try
        {
            while (LockFlag.getProducerRunningFlag())
            {
                System.out.println(Thread.currentThread()
                    .getId() + "I'm Customer.Queue current size=" + blockingQueue.size());
                String data = (String) blockingQueue.poll(10, TimeUnit.SECONDS);
                if (data != null)
                {
                    System.out.println(Thread.currentThread()
                        .getId() + "*************正在消费数据 data=" + data);
                }
                else
                {
                    //表示超过取值时间,视为生产者不再生产数据
                    System.out.println(Thread.currentThread()
                        .getId() + "队列为空无数据,请检查生产者是否阻塞");
                }
                Thread.sleep(50);
            }
            System.err.println("消费者程序执行完毕");
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
            System.err.println("消费者程序退出");
            LockFlag.setCustomerRunningFlag(false); //异常退出线程
            Thread.currentThread()
                .interrupt();
        }
    }
}
package com.qysxy.framework.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @author fuguangli
 * @description 队列生产者类
 * @Create date:    2017/3/7
 * @using       EXAMPLE
 */
public class Producer implements Runnable
{
    /**
     *         抛出异常    特殊值        阻塞         超时
     插入    add(e)    offer(e)    put(e)    offer(e, time, unit)
     移除    remove()    poll()    take()    poll(time, unit)
     检查    element()    peek()    不可用    不可用
     */
    private BlockingQueue blockingQueue;
    private AtomicInteger count = new AtomicInteger();
    public Producer(BlockingQueue blockingQueue)
    {
        this.blockingQueue = blockingQueue;
    }
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p/>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run()
    {
        System.out.println("生产者线程启动...");
        LockFlag.setProducerRunningFlag(true);
        try
        {
            while (LockFlag.getProducerRunningFlag())
            {
                String data = "data:" + count.incrementAndGet();
                if (blockingQueue.offer(data, 10, TimeUnit.SECONDS))
                {
                    //返回true表示生产数据正确
                    System.out.println("^^^^^^^^^^^^^^正在生产数据 data=" + data);
                }
                else
                {
                    //表示阻塞时间内还没有生产者生产数据
                    System.out.println("生产者异常,无法生产数据");
                }
                Thread.sleep(50);
            }
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
            System.err.println("生产者程序退出");
            LockFlag.setProducerRunningFlag(false); //异常退出线程
            Thread.currentThread()
                .interrupt();
        }
    }
}
package com.qysxy.framework.queue;
/**
 * @author fuguangli
 * @description 前沿生产者消费者模型的锁类
 * @Create date:    2017/3/7
 */
public class LockFlag
{
    /**
     * 生产者互斥锁
     */
    private static Boolean producerRunningFlag = false;
    /**
     * 消费者互斥锁
     */
    private static Boolean customerRunningFlag = false;
    public static Boolean getProducerRunningFlag()
    {
        return producerRunningFlag;
    }
    public static void setProducerRunningFlag(Boolean producerRunningFlag)
    {
        LockFlag.producerRunningFlag = producerRunningFlag;
    }
    public static Boolean getCustomerRunningFlag()
    {
        return customerRunningFlag;
    }
    public static void setCustomerRunningFlag(Boolean customerRunningFlag)
    {
        LockFlag.customerRunningFlag = customerRunningFlag;
    }
}
package com.qysxy.framework.queue;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Queue;
import java.util.concurrent.*;
/**
 * @author fuguangli
 * @description 前沿队列实用类,用于大量并发用户
 * @Create date:    2017/3/7
 */
public class BlockingQueueHelper
{
    private static final Integer maxQueueSize = 1000;
    private static BlockingQueue blockingQueue = new LinkedBlockingQueue(maxQueueSize);
    private static ExecutorService threadPool = Executors.newCachedThreadPool();
    public static BlockingQueue getBlockingQueue()
    {
        if (blockingQueue == null)
        {
            blockingQueue = new LinkedBlockingQueue(maxQueueSize);
        }
        return blockingQueue;
    }
    /**
     * @param o 队列处理对象(包含request,response,data)
     */
    public static void requestQueue(Object o)
    {
        //检测当前的队列大小
        if (blockingQueue != null && blockingQueue.size() < maxQueueSize)
        {
            //可以正常进入队列
            if (blockingQueue.offer(o))
            {
                //添加成功,检测数据处理线程是否正常
                if (LockFlag.getCustomerRunningFlag())
                {
                    //说明处理线程类正常运行
                }
                else
                {
                    //说明处理线程类停止,此时,应重新启动线程进行数据处理
                    LockFlag.setCustomerRunningFlag(true);
                    //example:run
                    Customer customer = new Customer(blockingQueue);
                    threadPool.execute(customer);
                }
            }
            else
            {
                //进入队列失败,做出相应的处理,或者尝试重新进入队列
            }
        }
        else
        {
            //队列不正常,或队列大小已达上限,做出相应处理
        }
    }
}

以上就是本篇文章的所有内容,如若还有其他不懂的java编程常见问题,请记得关注我们寻找答案。

推荐阅读:

java队列和栈的区别有哪些?

java队列详解,队列的用法是?

java队列入门解析