Here's a sample problem and a custom Accumulator solution in Java; you can use it as a template for your own use case.
Input:
HashMap<String, String>
Output:
HashMap<String, Integer> that contains, for each key, the number of input HashMaps in which that key appears.
Example:
Input HashMaps:
1. {"key1", "Value1"}, {"key2", "Value2"}
2. {"key1", "Value2"}
Output:
{"key1", 2}, {"key2", 1} // key1 appears in 2 of the input HashMaps, key2 in only 1.
Code:
import org.apache.spark.util.AccumulatorV2;
import java.util.HashMap;
/**
 * Spark accumulator that counts, per key, how many of the {@code HashMap<String, String>}
 * inputs passed to {@link #add(HashMap)} contain that key.
 *
 * <p>Only the keys of each added map are inspected; the associated values are ignored.
 *
 * <p>Thread-safety: the Spark {@code AccumulatorV2} documentation states that the output
 * type should be readable thread-safely because it will be read from other threads. A plain
 * {@link HashMap} does not provide that on its own, so every access to the internal counts
 * is synchronized on this instance and {@link #value()} hands out a snapshot instead of the
 * live map.
 */
public class CustomAccumulator extends AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> {

    /** Running occurrence count per key. Guarded by {@code this}; replaced wholesale by {@link #reset()}. */
    private HashMap<String, Integer> outputHashMap;

    /** Creates an accumulator with no counts (its zero value). */
    public CustomAccumulator() {
        this.outputHashMap = new HashMap<>();
    }

    /**
     * @return {@code true} when no key has been counted yet
     */
    @Override
    public synchronized boolean isZero() {
        return this.outputHashMap.isEmpty();
    }

    /**
     * Creates an independent accumulator holding the same counts as this one.
     *
     * @return a new {@code CustomAccumulator} whose counts equal the current counts
     */
    @Override
    public AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> copy() {
        CustomAccumulator customAccumulatorCopy = new CustomAccumulator();
        // merge() into an empty accumulator reproduces this accumulator's counts.
        customAccumulatorCopy.merge(this);
        return customAccumulatorCopy;
    }

    /** Discards all counts, returning this accumulator to its zero value. */
    @Override
    public synchronized void reset() {
        // A fresh map (rather than clear()) leaves any previously stored map instance untouched.
        this.outputHashMap = new HashMap<>();
    }

    /**
     * Increments the count of every key present in {@code v} by one.
     *
     * @param v map whose keys are counted; its values are ignored
     * @throws NullPointerException if {@code v} is {@code null}
     */
    @Override
    public synchronized void add(HashMap<String, String> v) {
        v.forEach((key, ignoredValue) -> this.outputHashMap.merge(key, 1, Integer::sum));
    }

    /**
     * Adds the per-key counts of {@code other} to this accumulator's counts.
     *
     * @param other accumulator whose {@link AccumulatorV2#value() value} is folded into this one
     */
    @Override
    public void merge(AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> other) {
        // Read the other side before taking our own lock so that two accumulators
        // merging into each other concurrently never hold both monitors at once.
        final HashMap<String, Integer> otherCounts = other.value();
        synchronized (this) {
            otherCounts.forEach((key, count) -> this.outputHashMap.merge(key, count, Integer::sum));
        }
    }

    /**
     * @return a snapshot of the current per-key counts; later updates to this accumulator
     *         do not affect the returned map, and changes to the returned map do not
     *         affect this accumulator
     */
    @Override
    public synchronized HashMap<String, Integer> value() {
        return new HashMap<>(this.outputHashMap);
    }
}