Java并发容器,实现java中的缓存

2019-10-08 14:22 来源:未知

使用DelayQueue、ConcurrentHashMap、FutureTask实现的缓存工具类。

Java程序员进行并发编程时,相比于其他语言的程序员而言要倍感幸福,因为并发编程大师Doug Lea不遗余力地为Java开发者提供了非常多的并发容器和框架。如果使用C语言进行开发的话,首先需要自己定义好底层数据结构(也可以说是容器,比如链表/哈希/树/...),然后结合使用mutex/cond等底层并发API来构建并发安全的容器;如果使用C++的话可以使用STL,但是STL中容器并未对并发安全有很好的支持。

DelayQueue是一个支持延时获取元素的无界阻塞队列。DelayQueue内部队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

1 ConcurrentHashMap

在并发编程中使用HashMap可能导致程序死循环。而使用线程安全的HashTable效率又非常低下,基于以上两个原因,便有了ConcurrentHashMap的登场机会,ConcurrentHashMap是并发安全且高效的HashMap。

(1) 线程不安全的HashMap
在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。

public static void main(String[] args) {
    final HashMap<String, String> map = new HashMap<String, String>(2);
    Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        map.put(UUID.randomUUID().toString(), "");
                    }
                }, "ftf" + i).start();
            }
        }
    }, "ftf");
    t.start();
    t.join();
}

执行以上代码就会导致死循环,HashMap在并发执行put操作时会引起死循环(具体来说就是put操作中rehash时可能导致链表形成循环结构),是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。

(2) 效率低下的HashTable
HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。

(3) ConcurrentHashMap的锁分段技术提高并发访问效率
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。(注意:JDK1.8中舍弃了分段锁技术,改用CAS+Synchronized机制。以下ConcurrentHashMap讲解以JDK1.8以例)

ConcurrentHashMap中一个重要的类就是Node,该类存储键值对,所有插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义很相似,但是但是有一些差别它对value和next属性设置了volatile同步锁,它不允许调用setValue方法直接改变Node的value域,它增加了find方法辅助map.get()方法。可在get方法返回的结果中更改对应的value值。

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }
    ...
}

ConcurrentHashMap定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。

@SuppressWarnings("unchecked")  
//获得在i位置上的Node节点  
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {  
   return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);  
}  
//利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少  
//在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改  
//因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果  有点类似于SVN  
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,  
                                   Node<K,V> c, Node<K,V> v) {  
   return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);  
}  
//利用volatile方法设置节点位置的值  
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {  
   U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);  
}  

put方法
ConcurrentHashMap最常用的方法就是put和get方法了,put方法依然沿用了HashMap的put方法的思想,据hash值计算这个新插入的点在table中的位置i,如果i位置是空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾。ConcurrentHashMap中依然沿用这个思想,有一个最重要的不同点就是ConcurrentHashMap不允许key或value为null值。另外由于涉及到多线程,put方法就要复杂一点。在多线程中可能有以下两种情况:

  • 如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容;
  • 如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比hashTable的synchronized要好得多。

get方法
get方法比较简单,给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。get方法是不要加锁的,因为Node节点中value数据类型时volatile的,保证了内存可见性。

DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。

2 ConcurrentLinkedQueue

线程安全的队列有2种实现方式,一种是非阻塞的,使用CAS算法实现;另一种是阻塞的,使用锁机制来实现。ConcurrentLinkedQueue是一种非阻塞型基于链接节点的无界线程安全队列,它采用先进先出的规
则对节点进行排序。

ConcurrentLinkedQueue结构类图:

图片 1

ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

入队列
入队列就是将如对队节点添加到队列尾部,入队列主要做两件事情:多线程同时入队列操作时,使用CAS来保证操作。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。

图片 2

从源代码角度来看,整个入队过程确实做两件事情:第一是定位出尾节点;第二是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试。

定位尾节点
tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点。尾节点可能是tail节点,也可能是tail节点的next节点。代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点可能是尾节点。获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加节点,所以需要返回head节点。获取p节点的next节点代码如下。

图片 3

出队列
出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。与入队列类似,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。

图片 4

首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

  1. 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  2. 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

3 ConcurrentBlockingQueue

阻塞队列就是支持阻塞插入和删除的一个队列。
支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
在阻塞队列不可用时(添加时队列已满,移除时队列为空),这两个附加操作提供了4种处理方式:

图片 5

注意:如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,该方法永远返回true。

Java中的阻塞队列
JDK7提供了以下7种队列:

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列,默认最大长度为Integer.MAX_VALUE。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列,注意不能保证同优先级的相对顺序。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

ArrayBlockingQueue
一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。
默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。

DelayQueue
DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。
● ·缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
● ·定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

参考资料:
1《Java并发编程的艺术》
2 JAVA HASHMAP的死循环 - 酷壳
3 ConcurrentHashMap源码分析(JDK8版本)

  1. ConcurrentHashMap 原理解析
  2. FutureTask 源码分析
  1. 支持缓存多长时间,单位毫秒。
  2. 支持多线程并发。比如:有一个比较耗时的操作,此时缓冲中没有此缓存值,一个线程开始计算这个耗时操作,而再次进来线程就不需要再次进行计算,只需要等上一个线程计算完成后(使用FutureTask)返回该值即可。
import java.util.concurrent.Callable;import java.util.concurrent.CancellationException;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.FutureTask;import java.util.concurrent.TimeUnit;/** * @author jijs * @date 2017/08/04 */public class CacheBean<V> { // 缓存计算的结果 private final static ConcurrentMap<String, Future<Object>> cache = new ConcurrentHashMap<>(); // 延迟队列来判断那些缓存过期 private final static DelayQueue<DelayedItem<String>> delayQueue = new DelayQueue<>(); // 缓存时间 private final int ms; static { // 定时清理过期缓存 Thread t = new Thread() { @Override public void run() { dameonCheckOverdueKey(); } }; t.setDaemon; t.start(); } private final Computable<V> c; /** * @param c Computable */ public CacheBean(Computable<V> c) { this(c, 60 * 1000); } /** * @param c Computable * @param ms 缓存多少毫秒 */ public CacheBean(Computable<V> c, int ms) { this.c = c; this.ms = ms; } public V compute(final String key) throws InterruptedException { while  { //根据key从缓存中获取值 Future<V> f = (Future<V>) cache.get; if (f == null) { Callable<V> eval = new Callable<V>() { public V call() { return  c.compute; } }; FutureTask<V> ft = new FutureTask<>; //如果缓存中存在此可以,则返回已存在的value f = (Future<V>) cache.putIfAbsent(key, (Future<Object>) ft); if (f == null) { //向delayQueue中添加key,并设置该key的存活时间 delayQueue.put(new DelayedItem<>; f = ft; ft.run(); } } try { return f.get(); } catch (CancellationException e) { cache.remove; } catch (ExecutionException e) { e.printStackTrace(); } } } /** * 检查过期的key,从cache中删除 */ private static void dameonCheckOverdueKey() { DelayedItem<String> delayedItem; while  { try { delayedItem = delayQueue.take(); if (delayedItem != null) { cache.remove(delayedItem.getT; System.out.println(System.nanoTime() + " remove " + delayedItem.getT() + " from cache"); } } catch (InterruptedException e) { e.printStackTrace(); } } }}class DelayedItem<T> implements Delayed { private T t; private long liveTime; private long removeTime; public DelayedItem(T t, long liveTime) { this.setT; this.liveTime = liveTime; this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { if (o == null) return 1; if (o == this) return 0; if (o instanceof DelayedItem) { DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o; if (liveTime > tmpDelayedItem.liveTime) { return 1; } else if (liveTime == tmpDelayedItem.liveTime) { return 0; } else { return -1; } } long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return diff > 0 ? 1 : diff == 0 ? 0 : -1; } @Override public long getDelay(TimeUnit unit) { return unit.convert(removeTime - System.currentTimeMillis; } public T getT() { return t; } public void setT { this.t = t; } @Override public int hashCode() { return t.hashCode(); } @Override public boolean equals(Object object) { if (object instanceof DelayedItem) { return object.hashCode() == hashCode() ? true : false; } return false; }}

/** * @author jijs * @date 2017/08/04 */public interface Computable<V> { V compute;}

/** * @author jijs * @date 2017/08/04 */public class FutureTaskDemo { public static void main(String[] args) throws InterruptedException { // 子线程 Thread t = new Thread -> { CacheBean<String> cb = new CacheBean<>(k -> { try { System.out.println("模拟计算数据,计算时长2秒。key=" + k); TimeUnit.SECONDS.sleep; } catch (InterruptedException e) { e.printStackTrace(); } return "你好:" + k; }, 5000); try { while  { System.out.println("thead2:" + cb.compute; TimeUnit.SECONDS.sleep; } } catch (InterruptedException e) { e.printStackTrace; t.start(); // 主线程 while  { CacheBean<String> cb = new CacheBean<>(k -> { try { System.out.println("模拟计算数据,计算时长2秒。key=" + k); TimeUnit.SECONDS.sleep; } catch (InterruptedException e) { e.printStackTrace(); } return "你好:" + k; }, 5000); System.out.println("thead1:" + cb.compute; TimeUnit.SECONDS.sleep; } }}

执行结果:

图片 6Paste_Image.png

两个线程同时访问同一个key的缓存。从执行结果发现,每次缓存失效后,同一个key只执行一次计算,而不是多个线程并发执行同一个计算然后缓存。

想了解更多精彩内容请关注我的公众号

图片 7

TAG标签:
版权声明:本文由金沙澳门唯一官网发布于编程教学,转载请注明出处:Java并发容器,实现java中的缓存