聊聊flink的DualKeyMap

【聊聊flink的DualKeyMap】本文主要研究一下flink的DualKeyMap
实例

@Test public void testKeySets() { final Random random = new Random(); final int capacity = 10; final Set> keys = new HashSet<>(capacity); for (int i = 0; i < capacity; i++) { int keyA = random.nextInt(); int keyB = random.nextInt(); keys.add(Tuple2.of(keyA, keyB)); }final DualKeyMap dualKeyMap = new DualKeyMap<>(capacity); for (Tuple2 key : keys) { dualKeyMap.put(key.f0, key.f1, "foobar"); }assertThat(dualKeyMap.keySetA(), Matchers.equalTo(keys.stream().map(t -> t.f0).collect(Collectors.toSet()))); assertThat(dualKeyMap.keySetB(), Matchers.equalTo(keys.stream().map(t -> t.f1).collect(Collectors.toSet()))); }

  • DualKeyMap有两个key,put值的时候,需要指定keyA及keyB
DualKeyMap flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
public class DualKeyMap {private final HashMap> aMap; private final HashMap bMap; private transient Collection values; public DualKeyMap(int initialCapacity) { this.aMap = new HashMap<>(initialCapacity); this.bMap = new HashMap<>(initialCapacity); }public int size() { return aMap.size(); }public V getKeyA(A aKey) { final Tuple2 value = https://www.it610.com/article/aMap.get(aKey); if (value != null) { return value.f1; } else { return null; } }public V getKeyB(B bKey) { final A aKey = bMap.get(bKey); if (aKey != null) { return aMap.get(aKey).f1; } else { return null; } }public V put(A aKey, B bKey, V value) { Tuple2 aValue = https://www.it610.com/article/aMap.put(aKey, Tuple2.of(bKey, value)); bMap.put(bKey, aKey); if (aValue != null) { return aValue.f1; } else { return null; } }public boolean containsKeyA(A aKey) { return aMap.containsKey(aKey); }public boolean containsKeyB(B bKey) { return bMap.containsKey(bKey); }public V removeKeyA(A aKey) { Tuple2 aValue = https://www.it610.com/article/aMap.remove(aKey); if (aValue != null) { bMap.remove(aValue.f0); return aValue.f1; } else { return null; } }public V removeKeyB(B bKey) { A aKey = bMap.remove(bKey); if (aKey != null) { Tuple2 aValue = https://www.it610.com/article/aMap.remove(aKey); if (aValue != null) { return aValue.f1; } else { return null; } } else { return null; } }public Collection values() { Collection vs = values; if (vs == null) { vs = new Values(); values = vs; }return vs; }public Set keySetA() { return aMap.keySet(); }public Set keySetB() { return bMap.keySet(); }public void clear() { aMap.clear(); bMap.clear(); }// ----------------------------------------------------------------------- // Inner classes // -----------------------------------------------------------------------/** * Collection which contains the values of the dual key map. */ private final class Values extends AbstractCollection {@Override public Iterator iterator() { return new ValueIterator(); }@Override public int size() { return aMap.size(); } }/** * Iterator which iterates over the values of the dual key map. */ private final class ValueIterator implements Iterator {private final Iterator> iterator = aMap.values().iterator(); @Override public boolean hasNext() { return iterator.hasNext(); }@Override public V next() { Tuple2 value = https://www.it610.com/article/iterator.next(); return value.f1; } } }

  • DualKeyMap定义了三个泛型,分别是A,B,V,即keyA,keyB,value的泛型;它维护了两个HashMap,其中aMap的key为keyA,value为Tuple2;bMap的key为keyB,value为keyA
  • DualKeyMap提供了getKeyA、getKeyB、containsKeyA、containsKeyB、removeKeyA、removeKeyB、keySetA、keySetB、size、put、values、clear方法
  • values方法返回的是Values,它继承了AbstractCollection,它的iterator方法返回的是ValueIterator;ValueIterator实现了Iterator接口,其内部使用的是aMap.values().iterator()
小结
  • DualKeyMap定义了三个泛型,分别是A,B,V,即keyA,keyB,value的泛型;它维护了两个HashMap,其中aMap的key为keyA,value为Tuple2;bMap的key为keyB,value为keyA
  • DualKeyMap提供了getKeyA、getKeyB、containsKeyA、containsKeyB、removeKeyA、removeKeyB、keySetA、keySetB、size、put、values、clear方法;put值的时候,需要指定keyA及keyB
  • values方法返回的是Values,它继承了AbstractCollection,它的iterator方法返回的是ValueIterator;ValueIterator实现了Iterator接口,其内部使用的是aMap.values().iterator()
doc
  • DualKeyMap

    推荐阅读