您现在的位置是:首页 > 正文

JDK8系列:阻塞队列 之 DelayQueue(延迟队列)源码分析

2024-02-01 04:27:00阅读 2

1、DelayQueue 简介

DelayQueue 是一个支持延时获取元素的无界阻塞队列队列使用PriorityQueue来实现队列中的元素必须实现Delayed接口在创建元素时可以指定多久才能从队列中获取当前元素,只有在延时期满时才能从队列中提取元素列头的元素是最先“到期”的元素

PriorityQueue 是一种优先级的队列,队列中的元素会按照优先级进行排序,我们在前面讲过PriorityBlockingQueue,PriorityQueue其实现原理也是使用的二叉堆,因此这次将不会再分析PriorityQueue的原理了,可以参考Java 并发 — 阻塞队列之PriorityBlockingQueue 中关于对二叉堆的分析。

但是这里有一点需要注意,DelayQueue的元素都必须继承Delayed接口(!!!)。同时也可以从这里初步理清楚DelayQueue内部实现的机制了:以支持优先级无界队列的PriorityQueue作为一个容器,容器里面的元素都应该实现Delayed接口,在每次往优先级队列中添加元素时以元素的过期时间作为排序条件最先过期的元素放在优先级最高

DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据为空的操作(消费者)才会被阻塞

leader:用于优化内部阻塞通知的线程,第一个阻塞等待的线程。用于减少线程的竞争,表示当前有其它线程正在获取队头元素。

leader是等待获取队列元素的线程,应用主从式设计减少不必要的等待。如果leader不等于空,表示已经有线程在等待获取队列的元素。所以,通过await()方法让出当前线程等待信号。如果leader等于空,则把当前线程设置为leader,当一个线程为leader,它会使用awaitNanos()方法让当前线程等待接收信号或等待delay时间。 

推论1:有leader线程等待的时候,新的线程要取,必然加入await队列排队。所有已经await的线程,唤醒了非leader线程就会继续await。每次唤醒一个处于await的线程。同时之能一个线程取就是leader去取。
await释放锁别的线程可以执行,singal不会释放锁别的线程不执行。
推论2:把take(),put()方法全部锁住,只能一个线程调用take或put。里面有await就不行了,await后别的线程就可以进来take,put方法,只要不是执行take里面的await方法,那么就只能一个线程执行take,put方法。
推论3:如果都不设置等待时间,那么就没有优先级,只有等到放的时候  或者  成功取到的线程唤醒,都设置等待时间,时间到了一起抢。
推论4:取的时候为空等待,时间到了直接返回,有元素但是时间没到,就设置leader。
推论5:等待取的线程,有一个是leader,就是由第一优先权的线程也是第一个尝试获取的线程。所有的等待取的线程中,下一次不一定是ledaer去取,如果唤醒了非leader线程,那也不行,要继续等到,一定要leader线程唤醒后去取,其他线程才能去取,然后成为新的leader

 

2、成员变量 和 构造方法

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {

    // 可重入锁
    private final transient ReentrantLock lock = new ReentrantLock();

    // 存储元素的优先级队列
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    // 用于优化内部阻塞通知的线程,第一个阻塞等待的线程
    private Thread leader = null;

    // 条件控制,表示是否可以从队列中取数据
    private final Condition available = lock.newCondition();

/***************     下面是构造方法     *****************/
    public DelayQueue() {}

    // 调用addAll()方法将集合c中的元素全部放到queue中
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }
}

 

3、方法介绍

add(E e) 、put(E e)、offer(E e, long timeout, TimeUnit unit) 都是直接调用的 offer(E e)方法。这4个方法完全等效

删除数据时,再以前的基础上,还要加上first是否到期的判断。只有到期的数据才能倍访问到。其真实的删除操作都是调用了优先级队列相应的方法

访问数据分为:

          peek():访问队头元素,不管到期没

          peekExpired() :访问队头元素,如果没有到期,返回null

将queue中数据输出到其他数据结构中:

          toArray():将全部数据都复制到数组中

          drainTo:只迁移到期数据到 集合c中

关于迭代器,比 ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue  少了 可分割迭代器

3.1、增加数据

add(E e)

// 实际上就是调用的 offer() 方法
public boolean add(E e) {
    return offer(e);
}

offer(E e) 

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();   // 加锁
    try {
        q.offer(e);   //调用PriorityQueue的offer()方法
        // peek 是获取的队头元素,唤醒阻塞在available 条件上的一个线程,表示可以从队列中取数据了
        // 说明,原来queue为空,所有取数据的线程都阻塞了。现在放了数据,自然要唤醒取数据的线程
        if (q.peek() == e) { 
            leader = null;   // leader 线程为空
            // 其他线程唤醒,如果队首元素是刚插入的元素,则设置leader为null,并唤醒阻塞在available上的线程
            available.signal();   
        }
        return true;
    } finally {
        lock.unlock();
    }
}

offer(E e, long timeout, TimeUnit unit) 

// 就是直接调用的 offer()
public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

 put(E e) 

// 起始也是调用的 offer()
public void put(E e) {
    offer(e);
}

 

3.2、删除数据

poll() 

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();   //加锁
    try {
        E first = q.peek();  //将队头元素赋给first
        // 如果还没到期,则返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        // 如果到期了,则删除队头元素
        else
            return q.poll();
    } finally {
        lock.unlock();  //释放锁
    }
}

poll(long timeout, TimeUnit unit) 

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();   //加锁,可中断
    try {
        // 死循环
        for (;;) {
            E first = q.peek();  //取得queue的队头元素

            // 如果first==null,则等待;如果超时,则返回null
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {  //到这里说明,first!=null
                long delay = first.getDelay(NANOSECONDS);

                // 如果first到期,则进行删除操作
                if (delay <= 0)  
                    return q.poll();

                // 到这里说明,first还没有到期
                // 如果超时,则返回null
                if (nanos <= 0)
                    return null;
                first = null; // 等待期间不要保持引用

                // 如果超时等待时间 < 剩余到期时间,或者leader线程为空,则继续超时等待
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                // 否则
                else {
                    // 将本线程设置成leader线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待直到first到期
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;  //计算超时时间
                    } finally {
                        // 如果本线程是leader线程,则将leader线程置空
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果leader线程为空 并且  queue非空,则唤醒其他线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

take() 

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();   //加锁,可中断
    try {
        // 死循环
        for (;;) {
            E first = q.peek();  //取出queue的队头

            // 如果queue为空,则进入等待队列
            if (first == null)
                 available.await();

            // 如果queue非空
            else {
                long delay = first.getDelay(NANOSECONDS);

                // 如果first到期,则取出元素,返回
                if (delay <= 0)
                    return q.poll();

                // 到这里,说明first还未到期
                first = null; // 等待期间不要持有引用

                // 如果leader线程非空,则进入等待队列
                if (leader != null)
                    available.await();
                // 如果leader空,则将本线程设置成leader线程,并等待到first线程到期
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        // 如果leader是本线程,则清理
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果leader线程为空,并且queue非空,则唤醒其他线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

remove(Object o) 

// 删除元素o
public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();   //加锁
    try {
        return q.remove(o);
    } finally {
        lock.unlock();
    }
}

removeEQ(Object o) 

// 删除元素o(同一个,而不仅仅是值相等)
void removeEQ(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();  //加锁
    try {
        for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
            if (o == it.next()) {   //区别在此
                it.remove();
                break;
            }
        }
    } finally {
        lock.unlock();
    }
}

 

3.3、访问数据

peek()

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();    //加锁
    try {
        return q.peek();
    } finally {
        lock.unlock();
    }
}

peekExpired() 

// 仅当第一个元素过期时才返回该元素
// 仅由drainTo使用。只在持有锁的情况下才调用本方法
private E peekExpired() {
    // assert lock.isHeldByCurrentThread();
    E first = q.peek();
    return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : first;
}

 

3.4、查询

size()

// 返回queue的元素数量
public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.size();
    } finally {
        lock.unlock();
    }
}

remainingCapacity()  

// 返回剩余容量
// 优先级队列虽然是数组,但是由于添加元素时,可以扩容,因此容量无限
public int remainingCapacity() {
    return Integer.MAX_VALUE;
}

 

3.5、将queue的数据输出到其他数据结构中

toArray()     没到期的也复制到数组

// 将queue中的数据 复制到 数组(没到期的也复制)
public Object[] toArray() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.toArray();
    } finally {
        lock.unlock();
    }
}

toArray(T[] a) 

// 将queue中的所有数据 复制到 指定类型的数组
public <T> T[] toArray(T[] a) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.toArray(a);
    } finally {
        lock.unlock();
    }
}

 drainTo(Collection<? super E> c)    只迁移到期数据

// 将queue的 到期数据 迁移到 集合c中
public int drainTo(Collection<? super E> c) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    final ReentrantLock lock = this.lock;
    lock.lock();  //加锁
    try {
        int n = 0;
        // 将queue的到期数据 迁移到c中
        for (E e; (e = peekExpired()) != null;) {
            c.add(e);       // In this order, in case add() throws.
            q.poll();
            ++n;
        }
        return n;
    } finally {
        lock.unlock();
    }
}

drainTo(Collection<? super E> c, int maxElements) 

// 将queue中的到期数据(最多maxElements个) 迁移到 集合c中
public int drainTo(Collection<? super E> c, int maxElements) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final ReentrantLock lock = this.lock;
    lock.lock();   //加锁
    try {
        int n = 0;
        // 和前面一个方法相比,多了1个约束条件n < maxElements
        for (E e; n < maxElements && (e = peekExpired()) != null;) {
            c.add(e);       // In this order, in case add() throws.
            q.poll();
            ++n;
        }
        return n;
    } finally {
        lock.unlock();
    }
}

 

3.6、其他方法

clear()

// 清空
public void clear() {
    final ReentrantLock lock = this.lock;
    lock.lock();  //加锁
    try {
        q.clear();
    } finally {
        lock.unlock();
    }
}

iterator() 

// 返回 迭代器
public Iterator<E> iterator() {
    return new Itr(toArray());
}

 

4、内部类

4.1、Itr

网站文章

  • Python这些操作,逆天且实用

    1. 显示WiFi密码 我们经常忘记wifi的密码,可是每当家里来了亲戚朋友问起WiFi密码,却又无从下手。 这里有一个技巧,我们可以列出所有的设备和它们的密码。 import subprocess ...

    2024-02-01 04:26:53
  • AS3动画效果常用公式

    【转】AS3动画效果常用公式 (2012-06-13 12:21:07)转载▼ 标签: as3 效果公式 常用处理公式 基本运动公式 三角公式 it 分类: Flash/Flex/As3.0 在网上找到的,未整理。 AS3缓动公式: sprite.x += (targetX – spri...

    2024-02-01 04:26:24
  • JSP前后台数据交互

    JSP前后台数据交互

    1.一般方法:在页面加载时载入数据 首先,新建一个包,在包内创建一个类,类实现接口Servlet-javax.servlet 然后,在web.xml文件里配置如下

    2024-02-01 04:26:16
  • 魅蓝note3联通卡显示无服务器,魅蓝Note3手机卡(SIM卡)安装方法真机图文详解

    魅蓝note3联通卡显示无服务器,魅蓝Note3手机卡(SIM卡)安装方法真机图文详解

    魅蓝note3采用与或卡托设计,靠近内部卡槽位为SIM1且只能安装Nano-SIM(迷你卡);而靠近外部的卡槽为为SIM2/TF卡,这个卡槽也支持TF卡扩展,在安装单卡或者双卡的时候,需要按照以下的S...

    2024-02-01 04:26:10
  • linux查看网卡百兆还是千兆

    【代码】linux查看网卡百兆还是千兆。

    2024-02-01 04:25:41
  • 释放C盘空间,优化电脑性能:如何正确清理和管理C盘文件? 最新发布

    释放C盘空间,优化电脑性能:如何正确清理和管理C盘文件? 最新发布

    文件管理————释放C盘空间,优化电脑性能:如何正确清理和管理C盘文件

    2024-02-01 04:25:34
  • React Native应用中的样式

    React Native应用中的样式RN中样式与CSS名称类似,但是本质上不同 —— RN应用中没有浏览器内核!!!例如:RN中的组件不分“行内”、“行内块”、“块级”;所有的容器默认都使用“弹性布局...

    2024-02-01 04:25:27
  • vue3 模版语法

    vue3 模版语法

    的 属性 可能和一般的 HTML 属性 看起来不太一样,但它的确是合法的 attribute 名称字符,并且所有支持 Vue 的浏览器都能正确解析它。作为前缀,表明它们是一些由 Vue 提供的特殊 a...

    2024-02-01 04:24:57
  • LeetCode193——有效电话号码

    LeetCode193——有效电话号码

    我的LeetCode代码仓:https://github.com/617076674/LeetCode 原题链接:https://leetcode-cn.com/problems/valid-phone-numbers/description/ 题目描述: 知识点:Linux常用指令、正则表达式 思路一:grep命令 grep命令用于查找文件里符合条件的字符串,其中-P选项可以让gr...

    2024-02-01 04:24:50
  • springboot连接阿里云服务器redis集群搭建详细介绍 | 避坑 | 绝对成功(全网最详细搭建最简洁)

    springboot连接阿里云服务器redis集群搭建详细介绍 | 避坑 | 绝对成功(全网最详细搭建最简洁)

    springboot连接redis集群显示内网IPspringboot连接超时执行redis-cli --cluster create 卡住一直在waiting for the cluster to ...

    2024-02-01 04:24:43