Skip to content

Latest commit

 

History

History
348 lines (238 loc) · 16.6 KB

谈谈集合.md

File metadata and controls

348 lines (238 loc) · 16.6 KB

[toc]

集合这一块,问就是并发问题,但前提先有总体了解。

清晰uml图

大概

面试官:谈谈集合吧

我:可以的,我们首先要介绍集合顶层接口Collection、Map,而List、Queue、Set实现了Collection接口,List又有ArrayList、LinkedList,Queue又有LinkedList、PriorityQueue,Set又有HashSet、TreeSet、LinkedHashSet等。Map又有HashMap,TreeMap,LinkedHashMap,当然HashTable是继承Dictionary接口,实现了Map。

此时就开始谈JUC下的集合,比如HashMap对应的ConcurrentHashMapConcurrentSkipListMap;比如ArrayList对应CopyOnWriteArrayList,Set对应的CopyOnWriteArraySet等。阻塞队列暂时先不谈哈。

ArrayList和LinkedList的区别

  • ArrayList是实现了基于动态数组的数据结构,LinkedList基于链表的数据结构。
  • ArrayList查找速度快,添加和删除慢
  • LinkedList查找慢,添加和删除快

HashMap和HashTable的区别

  • HashTable是线程安全的,每个方法前加了synchronized,而HashMap是非线程安全的
  • HashTable底层是数组+链表,而HashMap1.8版本是数组+链表or红黑树

HashMap和ConcurrentHashMap

  • HashMap是非线程安全的,ConcurrentHashMap是线程安全的
  • HashMap和ConcurrentHashMap在1.7都是数组+链表,1.8都是数组+链表or红黑树
  • ConcurrentHashMap在1.7是分段锁,1.8是去掉分段锁改成cas+synchronized

ArrayList有哪些并发问题

其实就是size++ 这一步的问题。 越界就是两个线程临界值去扩容都满足,于是一个线程size++导致的,另外一个线程就溢出了,null就是element[size] = e,第一个线程还没来得及size++,第二个线程就在原先的索引上把值给覆盖了,并且在下一个索引为null。

越界

  • 列表大小为9,即size=9
  • 线程A开始进入add方法,这时它获取到size的值为9,调用ensureCapacityInternal方法进行容量判断。
  • 线程B此时也进入add方法,它和获取的size的值也为9,也开始调用ensureCapacityInternal方法。
  • 线程A发现需求大小为10,而elementData的大小就为10,可以容纳。于是它不再扩容,返回。
  • 线程B也发现需要大小为10,也可以容纳,返回。
  • 好了,问题来了哈
  • 线程A开始进行设置值操作,elementData[size++] = e操作。此时size变为10。
  • 线程B也开始进行设置值操作,它尝试设置elementData[10] = e, 而elementData没有进行过扩容,它的下标最大为9
  • 于是此时会报出一个数组越界的异常ArrayIndexOutOfBoundsException

null

  • 列表大小为10,即size=0
  • 线程A开始添加元素,值为A。此时它执行第一条操作,将A放在了elementData下标为0的位置上。也就是说,线程挂在了element[0] = e上。
  • 接着线程B刚好也要开始添加一个值为B的元素,且走到了第一条的操作。此时线程B获取的size的值依然为0,于是它将B也放在了elementData下标为0的位置上。
  • 问题来了,其实上面也是问题,覆盖了。。。
  • 线程A将size的值增加为1
  • 线程B开始将size的值增加为2
  • 当你获取1索引的时候,那不就是null了?

HashMap的参数

  • 为什么容量要是 2 的整数次幂?

因为获取 key 在数组中对应的下标是通过 key 的哈希值与数组长度 -1 进行与运算,如:tab[i = (n - 1) & hash]

  1. n 为 2 的整数次幂,这样 n-1 后之前为 1 的位后面全是 1,这样就能保证 (n-1) & hash 后相应的位数既可能是 0 又可能是 1,这取决于 hash 的值,这样能保证散列的均匀,同时与运算效率高

  2. 如果 n 不是 2 的整数次幂,会造成更多的 hash 冲突

举个例子:如 16:10000, 16-1=15:1111, 1111 再与 hash 做 & 运算的时候,各个位置的取值取决于 hash,如果不是2的整数次幂,必然会有的0的位,这样再进行 & 操作的时候就为 0了,会造成哈希冲突。注意:HashMap的tableSizeFor方法做了处理,能保证n永远都是2次幂

  • 为什么负载因子是0.75?

负载因子过低,频繁扩容,扩容会重新哈希,性能下降;负载因子过高,容易浪费容量.(经验+概率)

  • 为什么红黑树的阈值是8?

在 hash 函数设计合理的情况下,发生 hash 碰撞 8 次的几率为百万分之 6,概率说话。(泊松分布)

  • 为什么退化链表的阈值6?

6是因为如果 hash 碰撞次数在 8 附近徘徊,会一直发生链表和红黑树的转化,为了预防这种情况的发生。

  • hash
static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

hash 函数是先拿到通过 key 的 hashcode,是 32 位的 int 值,然后让 hashcode 的高 16 位和低 16 位进行异或操作。这个也叫扰动函数,这么设计有二点原因:

  • 一定要尽可能降低 hash 碰撞,越分散越好
  • 算法一定要尽可能高效,因为这是高频操作, 因此采用位运算;

HashMap的put、get和扩容

put

hashmap-put-1.8

  • 判断数组是否为空,为空进行初始化;(初始化)
  • 不为空,计算 k 的 hash 值,通过(n - 1) & hash计算应当存放在数组中的下标 index;(通过hash计算index)
  • 查看 table[index] 是否存在数据,没有数据就构造一个 Node 节点存放在 table[index] 中;(查看数组中是否哈希冲突)
  • 存在数据,说明发生了 hash 冲突(存在二个节点 key 的 hash 值一样), 继续判断 key 是否相等,相等,用新的 value 替换原数据(onlyIfAbsent 为 false);(冲突,判断key是否相等,相等则替换)
  • 如果不相等,判断当前节点类型是不是树型节点,如果是树型节点,创造树型节点插入红黑树中;(判断是否红黑树)
  • 如果不是树型节点,创建普通 Node 加入链表中;判断链表长度是否大于 8, 大于的话链表转换为红黑树;(判断是否转成红黑树)
  • 插入完成之后判断当前节点数是否大于阈值,如果大于开始扩容为原数组的二倍。(扩容)

get

  1. 判断:是否为空,为空,返回null
  2. 不为空,判断第一个位置是否为查询key,是,返回value
  3. 不是,下一个节点继续判断是否为红黑树,是,按树查找
  4. 不是,按链表查找

扩容

先说1.7吧

for (HashMapEntry<K, V> e : table) {
    // 如果这个数组位置上有元素且存在哈希冲突的链表结构则继续遍历链表
    while (null != e) {
        //取当前数组索引位上单向链表的下一个元素
        HashMapEntry<K, V> next = e.next;
        //重新依据hash值计算元素在扩容后数组中的索引位置
        int i = indexFor(e.hash, newCapacity);
        e.next = newTable[i]; // 这一步和下一步就是头插法了,并且这两步出现线程不安全死循环问题
        newTable[i] = e;
        e = next; // 遍历链表
    }
}

1.8 HashMap的扩容使用的是2次幂的扩展(指长度扩为原来2倍),所以,元素的位置要么是在原位置,要么是在原位置再移动2次幂的位置。也就是说省略了重新计算hash值的时间,而且新增的1位是0还是1机会是随机的,因此resize的过程,均匀的把之前的冲突的节点分散到新的bucket了。如果在新表的数组索引位置相同,则链表元素不会倒置。

并发问题

  • HashMap扩容的时候会调用resize()方法,就是这里的并发操作容易在一个桶上形成环形链表
  • 这样当获取一个不存在的key时,计算出的index正好是环形链表的下标就会出现死循环。
  • 但是1.7的头插法造成的问题,1.8改变了插入顺序,就解决了这个问题,但是为了内存可见性等安全性,还是需要ConCurrentHashMap
  • HashTable 是直接在操作方法上加 synchronized 关键字,锁住整个数组,粒度比较大
  • Collections.synchronizedMap 是使用 Collections 集合工具的内部类,通过传入 Map 封装出一个 SynchronizedMap 对象,内部定义了一个对象锁,方法内通过对象锁实现
  • ConcurrentHashMap 使用分段锁,降低了锁粒度,让并发度大大提高。(jdk1.8 CAS+ synchronized)

ConcurrentHashMap

1.7

segment

  • 唯一的区别(和HashMap)就是其中的核心数据如 value ,以及链表都是 volatile 修饰的,保证了获取时的可见性。
  • ConcurrentHashMap 采用了分段锁技术(相对hashTable降低锁的粒度),其中 Segment 继承于 ReentrantLock(可能还会扯AQS)。
  • 不会像HashTable那样不管是put还是get操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。
  • 每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。

put

  • 虽然HashEntry中的value是用volatile关键字修饰的,但是并不能保证并发的原子性,所以put操作仍然需要加锁处理。
  • 首先第一步的时候会尝试获取锁,如果获取失败肯定就是其他线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁
    • 尝试获取自旋锁
    • 如果重试的次数达到了MAX_SCAN_RETRIES 则改为阻塞锁获取,保证能获取成功。

总的来说:

  • 将当前的Segment中的table通过key的hashcode定位到HashEntry
  • 遍历该HashEntry,如果不为空则判断传入的key和当前遍历的key是否相等,相等则覆盖旧的value
  • 不为空则需要新建一个HashEntry并加入到Segment中,同时会先判断是否需要扩容
  • 最后会解除在1中所获取当前Segment的锁。

get

  • 只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。
  • 由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。
  • ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁。

size

在 JDK1.7 中,第一种方案他会使用不加锁的模式去尝试多次计算 ConcurrentHashMap 的 size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的。 第二种方案是如果第一种方案不符合,他就会给每个 Segment 加上锁,然后计算 ConcurrentHashMap 的 size 返回

1.8

1.7 查询遍历链表效率太低(种种原因)。其中抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性(会扯1.6对synchronized的优化)

put

  • 根据key计算出hashcode
  • 判断是否需要进行初始化
  • 如果f为null,说明table中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject方法插入Node节点。
    • 如果CAS成功,说明Node节点已经插入,随后addCount(1L, binCount)方法会检查当前容量是否需要进行扩容。
    • 如果CAS失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。
  • 如果f的hash值为-1,说明当前f是ForwardingNode节点,意味有其它线程正在扩容,则一起进行扩容操作。
  • 如果都不满足,则利用synchronized锁写入数据
  • 如果数量大于TREEIFY_THRESHOLD 则要转换为红黑树。

get

  • 根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
  • 如果是红黑树那就按照树的方式获取值。
  • 就不满足那就按照链表的方式遍历获取值。

size

ConcurrentHashMap 提供了 baseCount、counterCells 两个辅助变量和一个 CounterCell 辅助内部类。sumCount() 就是迭代 counterCells 来统计 sum 的过程。 put 操作时,肯定会影响 size(),在 put() 方法最后会调用 addCount() 方法。

在addCount()方法中:

  • 如果 counterCells == null, 则对 baseCount 做 CAS 自增操作。
  • 如果并发导致 baseCount CAS 失败了使用 counterCells。
  • 如果counterCells CAS 失败了,在 fullAddCount 方法中,会继续死循环操作,直到成功。
  • CounterCell使用了 @sun.misc.Contended 标记的类

缓存系统中是以缓存行(cache line)为单位存储的。缓存行是2的整数幂个连续字节,一般为32-256个字节。最常见的缓存行大小是64个字节。当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享

实际上:

  • JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。
  • JDK 8 推荐使用mappingCount 方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值。

https://zhuanlan.zhihu.com/p/40627259

HashSet底层

HashSet中不允许有重复元素,这是因为HashSet是基于HashMap实现的,HashSet中的元素都存放在HashMap的key上面,而value中的值都是统一的一个private static final Object PRESENT = new Object();。 HashSet跟HashMap一样,都是一个存放链表的数组。这样遇到重复元素就可以返回object了(意味着不是null),如果value是null的话,发现重复,那么就返回上一个value值null,那么不符合源码:

private static final Object PRESENT = new Object();
public boolean add(E e) {
    return map.put(e, PRESENT)==null;
}
System.out.println(map.put(1, o)); // null
System.out.println(map.put(1, o));// java.lang.Object@610455d6
// 所以重复就false

TreeSet底层

TreeSet底层则采用NavigableMap这个接口来保存TreeSet集合,而实际上NavigableMap只是一个接口,实际上TreeSet还是用TreeMap来保存set元素。

TreeSet(NavigableMap<E,Object> m) {
        this.m = m;
}

 public TreeSet() {
        this(new TreeMap<E,Object>());

 }

TreeMap采用一种被称为“红黑树”的排序二叉树来保存Map中的的每个Entry——每个Entry都被当做红黑树的一个节点来对待;TreeMap实现SortedMap接口,能够把它保存的记录根据键排序,默认是按键值的升序排序,也可以指定排序的比较器,当用Iterator遍历TreeMap时,得到的记录是排过序的。如果使用排序的映射,建议使用TreeMap。在使用TreeMap时,key必须实现Comparable接口或者在构造TreeMap传入自定义的Comparator,否则会在运行时抛出java.lang.ClassCastException类型的异常。

LinkedHashMap

可以参考

一般说完这个,可能让你手撸LRU,你可以撸个伪代码即可。

// 双向链表+HashMap,Java中的LinkedHashMap就实现了该算法。
// get
public int get(int key) {
    if (map.containsKey(key)) {
        Node n = map.get(key); // 获取内存中存在的值,比如A
        remove(n); //使用链表的方法,移除该节点
        setHead(n); //依然使用链表的方法,将该节点放入头部
        return n.value;
    } 
    return -1;
}
// put
public void set(int key, int value) {
    if (map.containsKey(key)) {
        Node old = map.get(key);
        old.value = value;
        remove(old); // 移除旧节点
        setHead(old); // 放到队头
    } else {
        Node created = new Node(key, value);
        if (map.size() >= capacity) {
            map.remove(end.key); // clear该key
            remove(end); //链表也是依次
            setHead(created); // 将created放入队头
        } else {
            setHead(created); // 如果没满,直接放入队头
        }
        map.put(key,created);
    }
}
//lc: 146. LRU缓存机制
class LRUCache {
    private int cap;
    private Map<Integer, Integer> map = new LinkedHashMap<>();
    public LRUCache(int capacity) {
        this.cap = capacity;
    }
    
    public int get(int key) {
        if (map.containsKey(key)) {
            int value = map.get(key);
            // 查一次,就将查到到仍在队尾
            map.remove(key);
            map.put(key,value);
            return value;
        }
        return -1;
    }
    
    public void put(int key, int value) {
        if (map.containsKey(key)) {
            map.remove(key);
        } else if (map.size() == cap) {
            // 满了
            Iterator<Map.Entry<Integer, Integer>> iterator = map.entrySet().iterator();
            iterator.next();
            iterator.remove();
        }
        map.put(key, value);
    }
}