How to create custom hashmap accumulator in spark using java?

Hi All,
I want to create a custom HashMap accumulator in Spark for one of my use cases. I have already read the post linked below and implemented an accumulator based on its code, but I have not found an end-to-end example. I am trying to do this in Java.
https://www.brosinski.com/post/extending-spark-accumulators/


Problem:
How can I increment the value for each key in the same accumulator inside foreach()?


Reply:

Here's a sample problem and a custom accumulator solution in Java. You can adapt it to your own use case.

Input:

A set of HashMap<String, String> objects.

Output:

A HashMap<String, Integer> that holds, for each key, the number of input HashMaps in which that key appears.

Example:

Input HashMaps:

1. {"key1", "Value1"}, {"key2", "Value2"}

2. {"key1", "Value2"}

Output:

{"key1", 2}, {"key2", 1} // key1 appears in both input maps, key2 in only one.

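Code:

```java
import org.apache.spark.util.AccumulatorV2;
import java.util.HashMap;

// Counts, for every key, how many input HashMaps contained it.
// IN  = HashMap<String, String>  (one input map per add() call)
// OUT = HashMap<String, Integer> (key -> number of maps the key appeared in)
public class CustomAccumulator extends AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> {

    private HashMap<String, Integer> outputHashMap;

    public CustomAccumulator() {
        this.outputHashMap = new HashMap<>();
    }

    // True when no keys have been counted yet (the "zero" value).
    @Override
    public boolean isZero() {
        return outputHashMap.isEmpty();
    }

    // Returns an independent accumulator holding the same counts.
    @Override
    public AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> copy() {
        CustomAccumulator customAccumulatorCopy = new CustomAccumulator();
        customAccumulatorCopy.merge(this);
        return customAccumulatorCopy;
    }

    // Clears all counts back to the zero value.
    @Override
    public void reset() {
        this.outputHashMap = new HashMap<>();
    }

    // Called once per record (e.g. from foreach() on the executors):
    // every key in the input map adds 1 to its count; the values are ignored.
    @Override
    public void add(HashMap<String, String> v) {
        v.keySet().forEach(key -> this.outputHashMap.merge(key, 1, Integer::sum));
    }

    // Combines another accumulator's counts into this one by summing per key.
    // Spark calls this on the driver for each finished task.
    @Override
    public void merge(AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> other) {
        other.value().forEach((key, count) -> this.outputHashMap.merge(key, count, Integer::sum));
    }

    // Current counts; read this on the driver after the action has finished.
    @Override
    public HashMap<String, Integer> value() {
        return this.outputHashMap;
    }
}
```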
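To tie this back to the foreach() question: in a real job you would register an instance with the driver's SparkContext (for example `jsc.sc().register(acc, "keyCounts")`, assuming a `JavaSparkContext` named `jsc`), call `acc.add(map)` inside `rdd.foreach(...)`, and read `acc.value()` on the driver once the action has finished. The sketch below does not use Spark at all. It is a plain-Java simulation, with hypothetical class and method names, of what Spark does with the accumulator in such a job: each task starts from an empty (zero) copy, counts keys for the records in its partition using the same rule as `add`, and the driver combines the per-task results using the same rule as `merge`. It reproduces the example output above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Spark dependency) of how Spark drives the accumulator:
// each task counts keys into its own empty map, then the driver merges the partial maps.
// Class and method names are hypothetical, chosen for illustration only.
public class AccumulatorLifecycleSketch {

    // Same rule as CustomAccumulator.add: each key in an input map adds 1 to its count.
    static void add(Map<String, Integer> counts, Map<String, String> input) {
        input.keySet().forEach(key -> counts.merge(key, 1, Integer::sum));
    }

    // Same rule as CustomAccumulator.merge: partial counts for the same key are summed.
    static void merge(Map<String, Integer> total, Map<String, Integer> partial) {
        partial.forEach((key, count) -> total.merge(key, count, Integer::sum));
    }

    // Each inner list stands in for one partition, processed by one task.
    public static Map<String, Integer> run(List<List<Map<String, String>>> partitions) {
        Map<String, Integer> driverTotal = new HashMap<>();
        for (List<Map<String, String>> partition : partitions) {
            Map<String, Integer> taskCounts = new HashMap<>(); // zero value for this task
            for (Map<String, String> record : partition) {
                add(taskCounts, record); // what acc.add(record) does inside foreach()
            }
            merge(driverTotal, taskCounts); // what the driver does when the task finishes
        }
        return driverTotal;
    }

    // The two input maps from the example, placed in separate partitions.
    public static Map<String, Integer> exampleCounts() {
        Map<String, String> first = new HashMap<>();
        first.put("key1", "Value1");
        first.put("key2", "Value2");
        Map<String, String> second = new HashMap<>();
        second.put("key1", "Value2");

        List<List<Map<String, String>>> partitions = new ArrayList<>();
        partitions.add(Arrays.asList(first));
        partitions.add(Arrays.asList(second));
        return run(partitions);
    }

    public static void main(String[] args) {
        // TreeMap only sorts the keys so the printed order is stable.
        System.out.println(new TreeMap<>(exampleCounts())); // prints {key1=2, key2=1}
    }
}
```

Calling `add` from foreach() (an action) rather than from a transformation such as map() matters: the Spark programming guide states that accumulator updates made inside actions are applied only once per task, while updates made inside transformations may be applied more than once if a task or stage is re-executed.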