01-03-2021
07:36 AM
Here's a sample problem and a custom Accumulator solution in Java; you can adapt it to your own use case.

Input: HashMap<String, String>
Output: HashMap<String, Integer> containing, for each key, the number of input HashMaps in which that key appears.

Example:
Input HashMaps:
1. {"key1", "Value1"}, {"key2", "Value2"}
2. {"key1", "Value2"}
Output: {"key1", 2}, {"key2", 1} // key1 appears in both input maps

Code:
import org.apache.spark.util.AccumulatorV2;
import java.util.HashMap;

// Counts, for each key, how many of the added input maps contained that key.
public class CustomAccumulator extends AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> {

    private HashMap<String, Integer> outputHashMap;

    public CustomAccumulator() {
        this.outputHashMap = new HashMap<>();
    }

    // The accumulator is "zero" while no key has been counted.
    @Override
    public boolean isZero() {
        return outputHashMap.isEmpty();
    }

    // Returns a new accumulator with its own map holding the same counts.
    @Override
    public AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> copy() {
        CustomAccumulator customAccumulatorCopy = new CustomAccumulator();
        customAccumulatorCopy.merge(this);
        return customAccumulatorCopy;
    }

    // Discards all counts by starting over with an empty map.
    @Override
    public void reset() {
        this.outputHashMap = new HashMap<>();
    }

    // Adds 1 to the count of every key in the input map; the values are ignored.
    @Override
    public void add(HashMap<String, String> v) {
        v.keySet().forEach(key -> this.outputHashMap.merge(key, 1, Integer::sum));
    }

    // Folds another accumulator's counts into this one, summing per key.
    @Override
    public void merge(AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> other) {
        other.value().forEach((key, value) -> this.outputHashMap.merge(key, value, Integer::sum));
    }

    // Exposes the current counts.
    @Override
    public HashMap<String, Integer> value() {
        return this.outputHashMap;
    }
}
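
To check the counting logic without a Spark cluster, the add and merge steps can be reproduced with plain maps. The sketch below is only an illustration: KeyCountSketch and its methods are hypothetical names that do not appear in the class above, and the two "partition" maps merely stand in for the per-task accumulator copies that Spark would merge on the driver.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Spark-free helper that mirrors the accumulator's add() and merge() logic.
class KeyCountSketch {

    // Mirrors add(): every key present in the input map bumps its counter by one.
    static void add(Map<String, Integer> counts, Map<String, String> input) {
        input.keySet().forEach(key -> counts.merge(key, 1, Integer::sum));
    }

    // Mirrors merge(): partial counts from another task are summed key by key.
    static void merge(Map<String, Integer> counts, Map<String, Integer> other) {
        other.forEach((key, value) -> counts.merge(key, value, Integer::sum));
    }

    // Runs the example from the post, counting each input map in its own "partition".
    static Map<String, Integer> example() {
        Map<String, String> first = new HashMap<>();
        first.put("key1", "Value1");
        first.put("key2", "Value2");

        Map<String, String> second = new HashMap<>();
        second.put("key1", "Value2");

        // Assumption: each input map is processed by a different task.
        Map<String, Integer> partitionA = new HashMap<>();
        add(partitionA, first);
        Map<String, Integer> partitionB = new HashMap<>();
        add(partitionB, second);

        // Combine the partial results, as the driver-side merge would.
        Map<String, Integer> total = new HashMap<>();
        merge(total, partitionA);
        merge(total, partitionB);
        return total;
    }

    public static void main(String[] args) {
        // key1 maps to 2 and key2 maps to 1; HashMap print order is not guaranteed.
        System.out.println(example());
    }
}
```

Because both steps rely on HashMap.merge with Integer::sum, the total is the same whether the input maps are counted in one task or split across several, which is the property an accumulator needs.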