189 8069 5689

vb.net阻塞队列 编程简单实现一个阻塞队列

阻塞队列和线程池原理

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

专注于为中小企业提供成都网站设计、网站建设服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业福田免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了1000+企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

阻塞队列在jdk中有个专门的接口,BlockingQueue。但BlockingQueue的方法并不都是阻塞的方法:

add()插入元素,remove()拿取元素。add()往一个满的队列中插元素会插不进,会抛出异常;而remove()从空队列拿,也会抛出异常。

offer()和poll(),往一个满的队列,返回一个false;poll()往一个空队列中取元素,返回一个null

take()、put(),真正体现BlockingQueue的阻塞

take()往一个满的队列中插元素会阻塞;put()往一个空队列中取元素,也会阻塞

以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的。

是一个用数组实现的有界阻塞队列。此队列按照先进先出的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置

是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。

缓存系统的设计: 可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

多了tryTransfer和transfer方法,

(1)transfer方法

如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer()可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer()会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。

(2)tryTransfer方法

tryTransfer()是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer()的区别是,tryTransfer()无论消费者是否接收,方法立即返回。而transfer()是必须等到消费者消费了才返回。

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。

多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来;

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;

如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;

如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize

线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用

keepAliveTime的时间单位

workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。

一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。

1)当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。

2)由于1,使用无界队列时maximumPoolSize将是一个无效参数。

3)由于1和2,使用无界队列时keepAliveTime将是一个无效参数。

4)更重要的,使用无界queue可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。

创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。

Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”。

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

(1)AbortPolicy:直接抛出异常,默认策略;

(2)CallerRunsPolicy:用调用者所在的线程来执行任务;

(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

(4)DiscardPolicy:直接丢弃任务;

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务。

4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。

submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断 所有没有正在执行任务的线程 。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

要想合理地配置线程池,就必须首先分析任务特性

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

性质不同的任务可以用不同规模的线程池分开处理。

CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。

混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。

建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。

如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。

阻塞队列

阻塞队列常用于生产者和消费者的场景,生产者就是往队列中放入元素,消费者就是从队列中获取元素,阻塞队列就是生产者存放元素的容器,而消费者也从该容器中拿元素。

阻塞队列有两种常见的阻塞场景,满足这两种阻塞场景的队列就是阻塞队列,分别如下:

Java中提供了7个阻塞队列,分别如下:

ArrayBlockingQueue和LinkedBlockingQueue一般为常用的阻塞队列。

接下来通过一个Demo演示阻塞队列的用法。

这里维护了一个ArrayBlockingQueue,并指定其大小为10,创建了一个生产者线程和一个消费者线程,生产者线程在生产5个事件后睡两秒钟,消费者线程在消费完“事件 - 5”后由于从队列中拿不到元素,就会自动阻塞,等待生产者往队列中放入元素,只要队列中有生产者放入元素,就会立即唤醒消费者线程继续获取元素,详见以下Log:

下面通过分析ArrayBlockingQueue的原理加深对阻塞队列的理解。

在生产者消费模型中,生产数据和消费数据的速率不一致,如果生产数据速度快一些,消费不过来,就会导致数据丢失,这时候我们就可以使用阻塞队列来解决这个问题。

阻塞队列是一个队列,我们使用单线程生产数据,使用多线程消费数据。由于阻塞队列的特点:队列为空的时候消费者端阻塞,队列满的时候生产者端阻塞。多线程消费数据起到了加速消费的作用,使得生产的数据不会在队列里积压过多,而生产的数据也不会丢失处理。

阻塞队列--概述

首先通过接口类BlockingQueue中的注释来简单了解阻塞队列。

阻塞队列是一个支持附加操作的特殊队列:在队列为空时回收元素会阻塞等待直到队列非空,或在队列已满时插入元素,会阻塞等待直到队列不满。

阻塞队列的方法提供了四种不同的处理方式:抛异常、返回特殊值(null或false)、阻塞当前线程直到操作成功以及阻塞一段时间,超时退出。这四种处理方式分别对应不同的函数接口:

是一个用数组实现的有界阻塞队列,按先进先出的原则对元素进行排序。put和take方法分别为添加和删除的阻塞方法。默认情况下不保证线程公平。

ArrayBlockingQueue内部使用一把重入锁ReentrantLock来保证多个线程之间的插入删除元素的同步;同时使用两个条件对象Condition来实现阻塞逻辑,调用其await和signal方法来实现线程的等待和唤醒。

由于使用了ReentrantLock,所以ArrayBlockingQueue存在线程公平与不公平两种选择。

插入删除元素的具体执行逻辑: ArrayBlockingQueue

PS:这7个阻塞队列本来想着一个一个解析的,但看了下源码,逻辑其实没有很复杂,所以后面几个就只记一下内部实现的主要点。

用链表实现的有界阻塞队列,默认和最大长度为Integer.MAX_VALUE,队列按照先进先出的原则对元素排序。

插入元素在表尾,删除元素在表头。

LinkedBlockingQueue内部使用了两把重入锁ReentrantLock,分别用来保护插入操作和删除操作。

同样也是使用两个条件对象来实现阻塞逻辑。

支持优先级的无界阻塞队列,默认按元素自然顺序升序排列,可通过自定义类实现compareTo方法或指定构造参数Comparator来指定元素排序规则,不保证同优先级元素的顺序。

PriorityBlockingQueue内部使用的是数组对象来存储元素,且数组容量初始化为11。

其内部只使用了一把重入锁ReentrantLock,和一个条件对象Condition,只用于阻塞和唤醒删除元素操作的线程。

当插入元素时,若此时数组已满,也不需要等待,它会尝试扩容,因此插入操作也不会有阻塞的可能。

PriorityBlockingQueue内部还有一个allocationSpinLock自旋锁,用于扩容时的同步保护,在执行扩容操作前,需先自旋尝试将allocationSpinLock置为1,设置成功后才能继续往下执行。

一个不存储元素的阻塞队列,每一个put的线程会阻塞到直到有一个take线程取走元素为止,每一个take的线程会阻塞到直到有一个put的线程放入元素为止。

由于SynchronousQueue不存储元素,所以类似peek操作或者迭代器操作都是无效的。

支持公平访问队列,默认情况下线程采用非公平性策略访问队列。

SynchronousQueue只是一个对外的封装层,其真正的实现逻辑在其类型为Transferer的成员变量transferer的transfer方法中;抽象类Transferer有两个具体的实现类:TransferStack和TransferQueue,分别在非公平和公平的模式下使用。

Transferer类内部是通过自旋锁及CAS操作实现多个线程间的同步。

SynchronousQueue可当做一个传递中介,负责将生产者线程处理的数据直接传递给消费者线程,适用于传递性场景,吞吐量高。

其内部的具体实现逻辑可参考: SynchronousQueue

LinkedTransferQueue内部是通过自旋以及CAS操作来实现线程间的同步。

有链表结构组成的双向阻塞队列,即可以从队列的两端插入或移除元素。

其内部同样拥有一把重入锁ReentrantLock,两个条件对象notEmpty和notFull。整体逻辑同LinkedBlockingQueue相似。

在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀,双向阻塞队列可以运用在“工作窃取”模式中。


分享名称:vb.net阻塞队列 编程简单实现一个阻塞队列
本文地址:http://cdxtjz.cn/article/dogidci.html

其他资讯