How to create a custom HashMap accumulator in Spark using Java?

Hi All,
I want to create a custom HashMap accumulator in Spark for one of my use cases. I have already gone through the accumulator code at the link below and implemented it, but I have not found an end-to-end example. I am trying to do this in Java.
https://www.brosinski.com/post/extending-spark-accumulators/


Problem:
How can I increment the value for each key in the same accumulator within a foreach() action?


Here's a sample problem and a custom accumulator solution in Java; you can use it as a template for your own use case.

Input:

HashMap<String, String>

Output:

HashMap<String, Integer> containing, for each key, the number of input HashMaps in which that key appears.

Example:

Input HashMaps:

1. {"key1", "Value1"}, {"key2", "Value2"}

2. {"key1", "Value2"}

Output:

{"key1", 2}, {"key2", 1}   // key1 appears in both input HashMaps
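
Before wiring anything into Spark, it can help to pin down the expected result independently. The sketch below computes the same per-key counts with a plain Java stream: each input map contributes 1 for every key it contains, and the values are ignored. KeyCountSpec and countKeys are hypothetical names used only for illustration (they are not part of Spark or of the accumulator code), and the sketch assumes Java 9+ for List.of/Map.of.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spark: computes the expected per-key counts without an accumulator.
public class KeyCountSpec {

    // Each input map contributes 1 for every key it contains; the map values are ignored.
    public static Map<String, Integer> countKeys(List<Map<String, String>> inputs) {
        return inputs.stream()
                .flatMap(m -> m.keySet().stream())
                .collect(Collectors.toMap(k -> k, k -> 1, Integer::sum));
    }

    public static void main(String[] args) {
        // The two example input maps from the problem statement.
        Map<String, String> first = Map.of("key1", "Value1", "key2", "Value2");
        Map<String, String> second = Map.of("key1", "Value2");

        Map<String, Integer> counts = countKeys(List.of(first, second));
        System.out.println(counts.get("key1") + " " + counts.get("key2")); // prints "2 1"
    }
}
```

Comparing this reference result with the accumulator's value on a small test dataset is a quick sanity check.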

Code:

```java
import org.apache.spark.util.AccumulatorV2;
import java.util.HashMap;

// IN  = one input record (HashMap<String, String>)
// OUT = per-key occurrence counts (HashMap<String, Integer>)
public class CustomAccumulator extends AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> {

    private HashMap<String, Integer> outputHashMap;

    public CustomAccumulator() {
        this.outputHashMap = new HashMap<>();
    }

    // The accumulator is "zero" when no key has been counted yet.
    @Override
    public boolean isZero() {
        return outputHashMap.isEmpty();
    }

    // Returns a new accumulator holding the same counts as this one.
    @Override
    public AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> copy() {
        CustomAccumulator customAccumulatorCopy = new CustomAccumulator();
        customAccumulatorCopy.merge(this);
        return customAccumulatorCopy;
    }

    // Clears all counts so that isZero() returns true again.
    @Override
    public void reset() {
        this.outputHashMap = new HashMap<>();
    }

    // Called from task code: adds 1 for every key present in the input map (values are ignored).
    @Override
    public void add(HashMap<String, String> v) {
        v.keySet().forEach(key -> this.outputHashMap.merge(key, 1, Integer::sum));
    }

    // Sums another accumulator's counts into this one, key by key
    // (Spark uses this on the driver to combine the per-task copies).
    @Override
    public void merge(AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> other) {
        other.value().forEach((key, count) -> this.outputHashMap.merge(key, count, Integer::sum));
    }

    // Current per-key counts; read this on the driver after the action has finished.
    @Override
    public HashMap<String, Integer> value() {
        return this.outputHashMap;
    }
}
```
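
To see how this class answers the foreach() question, it helps to know what Spark does with it: each task receives a zeroed copy of the registered accumulator (produced via copyAndReset()), calls add() on that copy for each of its records, and the driver merges the task copies back into the registered instance. The sketch below replays that sequence in plain Java so it runs without Spark on the classpath. AccumulatorLifecycleDemo and LocalKeyCounter are hypothetical stand-ins that mirror CustomAccumulator's add/merge logic, not Spark classes; the partitioning is simulated, and Java 9+ is assumed for List.of/Map.of.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AccumulatorLifecycleDemo {

    // Hypothetical stand-in mirroring CustomAccumulator, without extending Spark's AccumulatorV2.
    static class LocalKeyCounter {
        private final Map<String, Integer> counts = new HashMap<>();

        // Same rule as CustomAccumulator.add: +1 for every key in the record.
        void add(Map<String, String> record) {
            record.keySet().forEach(key -> counts.merge(key, 1, Integer::sum));
        }

        // Same rule as CustomAccumulator.merge: sum counts key by key.
        void merge(LocalKeyCounter other) {
            other.counts.forEach((key, count) -> counts.merge(key, count, Integer::sum));
        }

        // Spark's default copyAndReset() is copy() followed by reset(); an empty counter is equivalent.
        LocalKeyCounter copyAndReset() {
            return new LocalKeyCounter();
        }

        Map<String, Integer> value() {
            return counts;
        }
    }

    // Simulates foreach over partitions: each "task" fills its own copy, then the "driver" merges it.
    public static Map<String, Integer> run(List<List<Map<String, String>>> partitions) {
        LocalKeyCounter registered = new LocalKeyCounter();
        for (List<Map<String, String>> partition : partitions) {
            LocalKeyCounter taskCopy = registered.copyAndReset();
            partition.forEach(taskCopy::add);
            registered.merge(taskCopy);
        }
        return registered.value();
    }

    public static void main(String[] args) {
        Map<String, String> first = Map.of("key1", "Value1", "key2", "Value2");
        Map<String, String> second = Map.of("key1", "Value2");
        // Put the two example maps in different simulated partitions.
        Map<String, Integer> result = run(List.of(List.of(first), List.of(second)));
        System.out.println(result.get("key1") + " " + result.get("key2")); // prints "2 1"
    }
}
```

In an actual Spark job the pattern would be (variable names here are illustrative): create the accumulator on the driver, register it with `spark.sparkContext().register(acc, "keyCounts")` (or `jsc.sc().register(acc, "keyCounts")` from a `JavaSparkContext`), call `acc.add(record)` inside `rdd.foreach(...)`, and read `acc.value()` on the driver once the action has returned. Because foreach is an action, Spark applies each task's update only once even if a task is restarted; updates made inside transformations such as map() may be applied more than once if tasks or stages are re-executed.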