精巧好用的DelayQueue

2013-3-12 雨辰 遇到的一些问题

延迟队列:

我们谈一下实际的场景吧。我们在开发中,有如下场景

a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

一种笨笨的办法就是,使用一个后台线程,遍历所有对象,挨个检查。这种笨笨的办法简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,多小则存在效率问题。而且做不到按超时的时间顺序处理。

这场景,使用DelayQueue最适合了。

DelayQueue是java.util.concurrent中提供的一个很有意思的类。很巧妙,非常棒!但是java doc和Java SE 5.0的source中都没有提供Sample。我最初在阅读ScheduledThreadPoolExecutor源码时,发现DelayQueue的妙用。随后在实际工作中,应用在session超时管理,网络应答通讯协议的请求超时处理。

本文将会对DelayQueue做一个介绍,然后列举应用场景。并且提供一个Delayed接口的实现和Sample代码。

DelayQueue是一个BlockingQueue,其特化的参数是Delayed。(不了解BlockingQueue的同学,先去了解BlockingQueue再看本文)
Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。

DelayQueue = BlockingQueue +PriorityQueue + Delayed

DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。

他们的基本定义如下
publicinterfaceComparable<T>{
publicintcompareTo(T o);
}
publicinterfaceDelayedextendsComparable<Delayed>{
longgetDelay(TimeUnit unit);
}
publicclassDelayQueue<EextendsDelayed>implementsBlockingQueue<E>{
privatefinalPriorityQueue<E>q=newPriorityQueue<E>();
}
DelayQueue内部的实现使用了一个优先队列。当调用DelayQueue的offer方法时,把Delayed对象加入到优先队列q中。如下:
publicbooleanoffer(E e) {
finalReentrantLock lock=this.lock;
lock.lock();
try{
E first=q.peek();
q.offer(e);
if(first==null||e.compareTo(first)<0)
available.signalAll();
returntrue;
}finally{
lock.unlock();
}
}
DelayQueue的take方法,把优先队列q的first拿出来(peek),如果没有达到延时阀值,则进行await处理。如下:
publicE take()throwsInterruptedException {
finalReentrantLock lock=this.lock;
lock.lockInterruptibly();
try{
for(;;) {
E first=q.peek();
if(first==null) {
available.await();
}else{
longdelay=first.getDelay(TimeUnit.NANOSECONDS);
if(delay>0) {
longtl=available.awaitNanos(delay);
}else{
E x=q.poll();
assertx!=null;
if(q.size()!=0)
available.signalAll();//wake up other takers
returnx;

}
}
}
}finally{
lock.unlock();
}
}
-------------------

以下是Sample,是一个缓存的简单实现。共包括三个类Pair、DelayItem、Cache。如下:

publicclassPair<K, V>{
publicK first;

publicV second;

publicPair() {}

publicPair(K first, V second) {
this.first=first;
this.second=second;
}
}
--------------
以下是Delayed的实现
importjava.util.concurrent.Delayed;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.atomic.AtomicLong;

publicclassDelayItem<T>implementsDelayed {
/**Base of nanosecond timings, to avoid wrapping*/
privatestaticfinallongNANO_ORIGIN=System.nanoTime();

/**
* Returns nanosecond time offset by origin
*/
finalstaticlongnow() {
returnSystem.nanoTime()-NANO_ORIGIN;
}

/**
* Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied
* entries.
*/
privatestaticfinalAtomicLong sequencer=newAtomicLong(0);

/**Sequence number to break ties FIFO*/
privatefinallongsequenceNumber;

/**The time the task is enabled to execute in nanoTime units*/
privatefinallongtime;

privatefinalT item;

publicDelayItem(T submit,longtimeout) {
this.time=now()+timeout;
this.item=submit;
this.sequenceNumber=sequencer.getAndIncrement();
}

publicT getItem() {
returnthis.item;
}

publiclonggetDelay(TimeUnit unit) {
longd=unit.convert(time-now(), TimeUnit.NANOSECONDS);
returnd;
}

publicintcompareTo(Delayed other) {
if(other==this)//compare zero ONLY if same object
return0;
if(otherinstanceofDelayItem) {
DelayItem x=(DelayItem) other;
longdiff=time-x.time;
if(diff<0)
return-1;
elseif(diff>0)
return1;
elseif(sequenceNumber<x.sequenceNumber)
return-1;
else
return1;
}
longd=(getDelay(TimeUnit.NANOSECONDS)-other.getDelay(TimeUnit.NANOSECONDS));
return(d==0)?0: ((d<0)?-1:1);
}
}



以下是Cache的实现,包括了put和get方法,还包括了可执行的main函数。
importjava.util.concurrent.ConcurrentHashMap;
importjava.util.concurrent.ConcurrentMap;
importjava.util.concurrent.DelayQueue;
importjava.util.concurrent.TimeUnit;
importjava.util.logging.Level;
importjava.util.logging.Logger;

publicclassCache<K, V>{
privatestaticfinalLogger LOG=Logger.getLogger(Cache.class.getName());

privateConcurrentMap<K, V>cacheObjMap=newConcurrentHashMap<K, V>();

privateDelayQueue<DelayItem<Pair<K, V>>>q=newDelayQueue<DelayItem<Pair<K, V>>>();

privateThread daemonThread;

publicCache() {

Runnable daemonTask=newRunnable() {
publicvoidrun() {
daemonCheck();
}
};

daemonThread=newThread(daemonTask);
daemonThread.setDaemon(true);
daemonThread.setName("Cache Daemon");
daemonThread.start();
}

privatevoiddaemonCheck() {

if(LOG.isLoggable(Level.INFO))
LOG.info("cache service started.");

for(;;) {
try{
DelayItem<Pair<K, V>>delayItem=q.take();
if(delayItem!=null) {
//超时对象处理
Pair<K, V>pair=delayItem.getItem();
cacheObjMap.remove(pair.first, pair.second);//compare and remove
}
}catch(InterruptedException e) {
if(LOG.isLoggable(Level.SEVERE))
LOG.log(Level.SEVERE, e.getMessage(), e);
break;
}
}

if(LOG.isLoggable(Level.INFO))
LOG.info("cache service stopped.");
}

//添加缓存对象
publicvoidput(K key, V value,longtime, TimeUnit unit) {
V oldValue=cacheObjMap.put(key, value);
if(oldValue!=null)
q.remove(key);

longnanoTime=TimeUnit.NANOSECONDS.convert(time, unit);
q.put(newDelayItem<Pair<K, V>>(newPair<K, V>(key, value), nanoTime));
}

publicV get(K key) {
returncacheObjMap.get(key);
}

//测试入口函数
publicstaticvoidmain(String[] args)throwsException {
Cache<Integer, String>cache=newCache<Integer, String>();
cache.put(1,"aaaa",3, TimeUnit.SECONDS);

Thread.sleep(1000*2);
{
String str=cache.get(1);
System.out.println(str);
}

Thread.sleep(1000*2);
{
String str=cache.get(1);
System.out.println(str);
}
}
}
运行Sample,main函数执行的结果是输出两行,第一行为aaa,第二行为null。

标签: 游戏业务架构设计 技术问题

发表评论:

雨辰 joyimp|@2011-2018 京ICP备16030765号