各种阻塞队列的特点与应用

一共有7个阻塞队列:

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

ArrayBlockingQueue

  • 有界
  • 内部是用Object数组维护的
  • 默认非公平,可以在构造指定
public ArrayBlockingQueue(int capacity, boolean fair)

公平性用ReentrantLock实现的,公平也会降低并发效率。

LinkedBlockingQueue

  • 有界
  • 内部是链表维护
  • 默认最大长度是Integer.MAX_VALUE,可以在构造里面指定

数组和链表实现的区别跟基础数据结构里边的数组和链表的区别一样,但是对队都是在首尾操作,感觉区别也不大

PriorityBlockingQueue

  • 无界
  • 支持优先级
  • 可以实现元素的compareTo方法指定排序规则,也可以在构造时传入Comparator, 这和PriorityQueue是一样一样的

DelayQueue

  • 无界
  • 支持延时获取元素
  • 对列元素要实现Delayed接口,指定多久才能获取元素,只有到期的元素才能出队

使用场景

  • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  • 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

我写了一个很垃圾的例子,搞个定时任务, 严格来讲并不是一个定时任务

class Schedule {
    private DelayQueue<Task> delayQueue = new DelayQueue<>();
    private Thread thread;
    private boolean shoudRun = true;

    public Schedule() {
        thread = new Thread(() -> {
            while (shoudRun) {
                try {
                    Task task = delayQueue.take();
                    System.out.println("我开始处理任务了" + task.getTaskName() + "-时间=" + new Date());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public void start() {
        thread.start();
    }

    public void submit(Task task) {
        delayQueue.put(task);
    }

    // 这样正确的终止线程
    public void stop() {
        shoudRun = false;
    }
}

class Task implements Delayed {
    private AtomicInteger sequencer = new AtomicInteger(0);
    private String taskName;
    private int sequencerNumber = 0;
    private long time;

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    public Task(String taskName, long second) {
        this.sequencerNumber = sequencer.getAndIncrement();
        this.taskName = taskName;
        this.time = System.currentTimeMillis() + (second > 0 ? TimeUnit.SECONDS.toMillis(second) : 0);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // <=0就是到期了
        return unit.convert(time - System.currentTimeMillis(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof Task) {
            Task x = (Task) o;
            long diff = time - x.time;
            if (diff < 0) {
                return -1;
            } else if (diff > 0) {
                return 1;
            } else if (sequencerNumber < x.sequencerNumber) {
                return -1;
            } else {
                return 1;
            }
        }
        long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        if (d == 0) {
            return 0;
        } else if (d < 0) {
            return -1;
        } else {
            return 1;
        }
    }
}

public class Demo_05_02_1_DelayQueue {
    public static void main(String[] args) throws InterruptedException {
        Schedule schedule = new Schedule();
        System.out.println("开始加任务了"+new Date());
        Task task = new Task("欢迎关注微信公众号“大雄和你一起学编程”1", 2);
        Task task2 = new Task("欢迎关注微信公众号“大雄和你一起学编程”2", 4);
        schedule.start();
        schedule.submit(task2);
        schedule.submit(task);
        schedule.submit(task); // 再提交一次
        Thread.sleep(10000);
        schedule.stop();
    }
}

// 执行结果大概就是下边这样:
// 开始加任务了Tue May 05 18:20:13 CST 2020
// 我开始处理任务了欢迎关注微信公众号“大雄和你一起学编程”1-时间=Tue May 05 18:20:15 CST 2020
// 我开始处理任务了欢迎关注微信公众号“大雄和你一起学编程”1-时间=Tue May 05 18:20:15 CST 2020
// 我开始处理任务了欢迎关注微信公众号“大雄和你一起学编程”2-时间=Tue May 05 18:20:17 CST 2020

这里最重要的点就是【欢迎关注微信公众号“大雄和你一起学编程”】, 呸呸呸,应该是时间,看秒的部分发现是符合预期的。

总结下使用DelayQueue的步骤

  1. 首先就是要定义你的队列元素,这个类需要实现Delayed接口,需要实现里边的两个方法

  2. long getDelay(TimeUnit unit)

这个表示,如果返回的值<=0就表示到期了,否则没到期

  • public int compareTo(Delayed o)

这个玩意儿是Comparable接口里面的,因为Delayed继承了Comparable

他就是定义排序规则,在DelayQueue里边,我们一般喜欢把快要过期的、甚至已经过期的老元素放到前面,其实你也可以用Comparator接口,会更灵活

然后你会发现我们的compareTo用了一个sequencerNumber,这个呢是个原子自增的,用于处理两个过期时间一样的任务如何排序的问题,我的规则是谁先创建谁排前面

  1. 生产队列元素和消费队列元素,这和使用一个普通队列几乎没两样了

SynchronousQueue

  • 无界,不存储元素
  • 每一个put操作必须等待一个take操作,否则不能继续添加元素,每一个take也必须等待一个put,因为没有元素
  • 支持公平与非公平
  • 非常适合传递性场景
  • 吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue

LinkedTransferQueue

  • 无界
  • 链表
  • 多了tryTransfer和transfer方法

transfer方法

如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。

如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回

(原文引用)

tryTransfer方法

tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。

和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。

(原文引用)

LinkedBlockingDeque

  • 有界,默认为Integer.MAX_VALUE
  • 链表
  • 双向

他的特点就是从队首队尾插入都行,从队首队尾出队都行

results matching ""

    No results matching ""