How to create a custom HashMap accumulator in Spark using Java?
Labels: Apache Spark
Created ‎06-08-2019 09:27 AM
Hi All,
I want to create a custom HashMap accumulator in Spark for one of my use cases. I have implemented an accumulator following the code at the link below, but I have not found an end-to-end example. I am trying to do this in Java.
https://www.brosinski.com/post/extending-spark-accumulators/
Problem:
How can I increment the value for each key in the same accumulator from inside foreach()?
Created ‎01-03-2021 07:36 AM
Here is a sample problem and a custom accumulator solution in Java; you can adapt it to your own use case.
Input:
HashMap<String, String>
Output:
HashMap<String, Integer> that contains, for each key, the number of input HashMaps in which that key appears.
Example:
Input HashMaps:
1. {"key1", "Value1"}, {"key2", "Value2"}
2. {"key1", "Value2"}
Output:
{"key1", 2}, {"key2", 1} // key1 appears in two of the input maps.
Code:
import org.apache.spark.util.AccumulatorV2;
import java.util.HashMap;

// Input type: HashMap<String, String>; output type: HashMap<String, Integer> (count per key).
public class CustomAccumulator extends AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> {

    private HashMap<String, Integer> outputHashMap;

    public CustomAccumulator() {
        this.outputHashMap = new HashMap<>();
    }

    // The accumulator is "zero" while no key has been counted.
    @Override
    public boolean isZero() {
        return outputHashMap.isEmpty();
    }

    // Create a new accumulator holding the same counts as this one.
    @Override
    public AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> copy() {
        CustomAccumulator customAccumulatorCopy = new CustomAccumulator();
        customAccumulatorCopy.merge(this);
        return customAccumulatorCopy;
    }

    @Override
    public void reset() {
        this.outputHashMap = new HashMap<>();
    }

    // Increment the count of every key in the input map by 1; the values are ignored.
    @Override
    public void add(HashMap<String, String> v) {
        v.forEach((key, value) -> this.outputHashMap.merge(key, 1, Integer::sum));
    }

    // Add the per-key counts of another accumulator to this one.
    @Override
    public void merge(AccumulatorV2<HashMap<String, String>, HashMap<String, Integer>> other) {
        other.value().forEach((key, value) -> this.outputHashMap.merge(key, value, Integer::sum));
    }

    @Override
    public HashMap<String, Integer> value() {
        return this.outputHashMap;
    }
}
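
The counting logic in add() and merge() can be tried without a Spark cluster. Below is a minimal sketch in plain Java that replays the example input from above: each input map is counted into its own partial result (as if handled by a separate task) and the partial results are then combined. The class name KeyCountSketch and the helper methods addTo and mergeInto are hypothetical names used only for this illustration; they are not part of Spark.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; it mimics the accumulator's logic without Spark.
public class KeyCountSketch {

    // Mirrors CustomAccumulator.add(): count each key of one input map once.
    public static void addTo(Map<String, Integer> counts, Map<String, String> input) {
        input.keySet().forEach(key -> counts.merge(key, 1, Integer::sum));
    }

    // Mirrors CustomAccumulator.merge(): fold another partial result into the target.
    public static void mergeInto(Map<String, Integer> target, Map<String, Integer> other) {
        other.forEach((key, count) -> target.merge(key, count, Integer::sum));
    }

    public static void main(String[] args) {
        Map<String, String> first = new HashMap<>();
        first.put("key1", "Value1");
        first.put("key2", "Value2");
        Map<String, String> second = new HashMap<>();
        second.put("key1", "Value2");

        // Pretend each input map was processed by a different task.
        Map<String, Integer> task1 = new HashMap<>();
        addTo(task1, first);
        Map<String, Integer> task2 = new HashMap<>();
        addTo(task2, second);

        // Combine the partial results, as the driver would.
        Map<String, Integer> total = new HashMap<>();
        mergeInto(total, task1);
        mergeInto(total, task2);

        System.out.println(total.get("key1") + " " + total.get("key2")); // prints "2 1"
    }
}
```

In a Spark job the same flow would typically look like this: create one CustomAccumulator on the driver, register it with SparkContext.register(accumulator, "some name") (from Java, the SparkContext is reachable through JavaSparkContext.sc()), call accumulator.add(map) inside rdd.foreach(...), and read accumulator.value() on the driver once the action has finished. Each task updates its own copy of the accumulator and Spark merges the copies through merge(), so a single accumulator is enough to count every key.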
