Created on
10-10-2019
09:23 AM
- last edited on
10-10-2019
05:39 PM
by
ask_bill_brooks
I have implemented a UDA (user-defined aggregate) function based on HyperLogLog to calculate cardinality. The UDA takes Java-serialized bytes stored as strings in Kudu, reconstructs each value into a HyperLogLog object, and merges them together. Its signature is hll_merge(string) -> string, where the returned string is the cardinality value.
All the intermediate values are StringVal. Rough code for the implementation is below:
// Init step of the aggregate: puts the intermediate value into the NULL state.
//
// context: Impala function context (unused here).
// val:     intermediate value to initialize.
//
// Assigning StringVal::null() rather than only flipping is_null also resets
// ptr and len, so no stale pointer/length is left behind in the intermediate.
IMPALA_UDF_EXPORT
void HllPlusPlusInit(FunctionContext* context, StringVal* val) {
  *val = StringVal::null();
}
// Update step of the aggregate: folds one serialized sketch (src) into the
// running intermediate value (result).
//
// context: Impala function context used to allocate/grow the intermediate buffer.
// src:     serialized sketch bytes for the current row; a NULL src is ignored.
// result:  intermediate value. While it is NULL the first non-NULL input is
//          copied into it; afterwards it holds the serialized bytes of the
//          merged sketch. It is reset to NULL if growing the buffer fails.
IMPALA_UDF_EXPORT
void HllPlusPlusUpdate(FunctionContext* context, const StringVal& src, StringVal* result){
  if (src.is_null) return;

  // NOTE(review): this is the only caller of BytesFromStringVal; every other
  // conversion in this file goes through StringValToBytes. Confirm the two
  // helpers are meant to differ before unifying them.
  vector<char> srcBytes = BytesFromStringVal(src);

  if (result->is_null) {
    // First value seen: seed the intermediate with a copy of the input bytes.
    const int seedLen = static_cast<int>(srcBytes.size());
    uint8_t* copy = context->Allocate(seedLen);
    if (copy == NULL) return;
    // Copy straight from the vector; skip memcpy for an empty input because
    // data() may be NULL in that case.
    if (!srcBytes.empty()) memcpy(copy, srcBytes.data(), srcBytes.size());
    *result = StringVal(copy, seedLen);
    return;
  }

  // Dereference so the same StringVal-reference form is used as elsewhere.
  vector<char> bytes = StringValToBytes(*result);

  // Stack-allocated builder: it is released even if build() throws.
  // NOTE(review): 14 and 25 are presumably the sketch precision parameters
  // and must match the ones used when the bytes were produced - confirm.
  Builder builder(14, 25);
  HyperLogLogPlusPlus hll = builder.build(bytes);
  HyperLogLogPlusPlus hll2 = builder.build(srcBytes);
  hll.addAll(hll2);

  vector<char> res = hll.getBytes();
  const int mergedLen = static_cast<int>(res.size());
  result->ptr = context->Reallocate(result->ptr, mergedLen);
  if (result->ptr == NULL) {
    // Allocation failed
    *result = StringVal::null();
    return;
  }
  if (!res.empty()) memcpy(result->ptr, res.data(), res.size());
  result->len = mergedLen;
}
// Merge step of the aggregate: combines one intermediate value (src) into
// another (dst).
//
// context: Impala function context used to allocate/grow dst's buffer.
// src:     serialized sketch bytes of the incoming intermediate; a NULL src
//          is ignored.
// dst:     intermediate value being accumulated into. While it is NULL it
//          receives a copy of src; otherwise it is replaced by the serialized
//          bytes of the merged sketch. It is reset to NULL if growing the
//          buffer fails.
IMPALA_UDF_EXPORT
void HllPlusPlusMerge(FunctionContext* context, const StringVal& src, StringVal* dst){
  if (src.is_null) return;

  vector<char> srcBytes = StringValToBytes(src);

  if (dst->is_null) {
    // Nothing accumulated yet: dst becomes a copy of src's bytes.
    const int seedLen = static_cast<int>(srcBytes.size());
    uint8_t* copy = context->Allocate(seedLen);
    if (copy == NULL) return;
    // Copy straight from the vector; skip memcpy for an empty input because
    // data() may be NULL in that case.
    if (!srcBytes.empty()) memcpy(copy, srcBytes.data(), srcBytes.size());
    *dst = StringVal(copy, seedLen);
    return;
  }

  // Dereference so the same StringVal-reference form is used as for src.
  vector<char> bytes = StringValToBytes(*dst);

  // Stack-allocated builder: it is released even if build() throws.
  // NOTE(review): 14 and 25 are presumably the sketch precision parameters
  // and must match the ones used when the bytes were produced - confirm.
  Builder builder(14, 25);
  HyperLogLogPlusPlus hll = builder.build(bytes);
  HyperLogLogPlusPlus hll2 = builder.build(srcBytes);
  hll.addAll(hll2);

  vector<char> res = hll.getBytes();
  const int mergedLen = static_cast<int>(res.size());
  dst->ptr = context->Reallocate(dst->ptr, mergedLen);
  if (dst->ptr == NULL) {
    // Allocation failed
    *dst = StringVal::null();
    return;
  }
  if (!res.empty()) memcpy(dst->ptr, res.data(), res.size());
  dst->len = mergedLen;
}
// Serialize step of the aggregate: copies the intermediate bytes into a new
// StringVal created through the context and frees the intermediate buffer.
//
// context: Impala function context used for the new buffer and for Free().
// src:     intermediate value; a NULL src yields a NULL result.
// Returns a StringVal with the same bytes as src, or a NULL StringVal if the
// copy could not be allocated.
IMPALA_UDF_EXPORT
StringVal HllPlusPlusSerialize(FunctionContext* context, const StringVal& src){
  if (src.is_null) {
    return StringVal::null();
  }
  StringVal result(context, src.len);
  // The allocating constructor can hand back a NULL StringVal when the
  // allocation fails; copying into it would write through a NULL pointer.
  if (!result.is_null && src.len > 0) {
    memcpy(result.ptr, src.ptr, src.len);
  }
  // The intermediate buffer is released whether or not the copy succeeded.
  context->Free(src.ptr);
  return result;
}
// Finalize step of the aggregate: rebuilds the sketch from the intermediate
// bytes and returns its cardinality formatted through ToStringVal.
//
// context: Impala function context, passed to ToStringVal and used for Free().
// src:     intermediate value; a NULL src (no non-NULL input rows) yields the
//          string form of 0.
// Returns the cardinality as a StringVal.
IMPALA_UDF_EXPORT
StringVal HllPlusPlusFinalize(FunctionContext* context, const StringVal& src){
  if (src.is_null) {
    return ToStringVal(context, 0);
  }
  vector<char> srcBytes = StringValToBytes(src);

  // Stack-allocated builder: it is released even if build() throws.
  // NOTE(review): 14 and 25 are presumably the sketch precision parameters
  // and must match the ones used when the bytes were produced - confirm.
  Builder builder(14, 25);
  HyperLogLogPlusPlus hll = builder.build(srcBytes);

  long cardinality = hll.cardinality();
  StringVal result = ToStringVal(context, cardinality);
  // The intermediate buffer is no longer needed once the result is built.
  context->Free(src.ptr);
  return result;
}
The above function crashes intermittently due to memory corruption. Considering that most of the allocated memory is on the stack, is there a possibility of a memory leak or a corrupt memory access? I can provide error logs, but they seem to point to memory errors during allocation or access.
Created 10-10-2019 09:32 AM
Just an FYI - the bytes stored in Kudu are ISO-8859-1 encoded and some contain null byte characters. That might not be the issue, or could it be?