UDA function crashes Impala

I have implemented a UDA based on HyperLogLog to calculate cardinality. The UDA reads Java-serialized bytes stored as strings in Kudu; each value has to be reconstructed into a HyperLogLog object, and the objects are then merged together. The function has the signature hll_merge(string) -> string, where the returned string is the cardinality value.

All the intermediate values are StringVal. The rough code is below.

IMPALA_UDF_EXPORT
void HllPlusPlusInit(FunctionContext* context, StringVal* val) {
    val->is_null = true;
}

IMPALA_UDF_EXPORT
void HllPlusPlusUpdate(FunctionContext* context, const StringVal& src, StringVal* result){
    if (src.is_null) return;
    vector<char> srcBytes = BytesFromStringVal(src);
    if (result->is_null) {
        string s(srcBytes.begin(), srcBytes.end());
        uint8_t* copy = context->Allocate(s.size());
        if (copy == NULL) return;

        memcpy(copy, s.c_str(), s.size());
        *result = StringVal(copy, s.size());
        result->len = s.size();
        return;
    }

    vector<char> bytes = StringValToBytes(result);
    Builder *b = new Builder(14, 25);
    HyperLogLogPlusPlus hll = b->build(bytes);
    HyperLogLogPlusPlus hll2 = b->build(srcBytes);
    delete b;
    hll.addAll(hll2);

    vector<char> res = hll.getBytes();
    string s(res.begin(), res.end());

    result->ptr = context->Reallocate(result->ptr, s.size());
    if (result->ptr == NULL) {
        //Allocation failed
        *result = StringVal::null();
        return;
    }
    memcpy(result->ptr, s.c_str(), s.size());
    result->len = s.size();
}

IMPALA_UDF_EXPORT
void HllPlusPlusMerge(FunctionContext* context, const StringVal& src, StringVal* dst){
    if (src.is_null) return;

    if (dst->is_null) {
        vector<char> bytes = StringValToBytes(src);
        string s(bytes.begin(), bytes.end());
        uint8_t* copy = context->Allocate(s.size());
        if (copy == NULL) return;

        memcpy(copy, s.c_str(), s.size());
        *dst = StringVal(copy, s.size());
        return;
    }

    vector<char> srcBytes = StringValToBytes(src);
    vector<char> bytes = StringValToBytes(dst);
    Builder *b = new Builder(14, 25);
    HyperLogLogPlusPlus hll = b->build(bytes);
    HyperLogLogPlusPlus hll2 = b->build(srcBytes);
    delete b;
    hll.addAll(hll2);

    vector<char> res = hll.getBytes();
    string s(res.begin(), res.end());

    dst->ptr = context->Reallocate(dst->ptr, s.size());
    if (dst->ptr == NULL) {
        //Allocation failed
        *dst = StringVal::null();
        return;
    }
    memcpy(dst->ptr, s.c_str(), s.size());
    dst->len = s.size();
}

IMPALA_UDF_EXPORT
StringVal HllPlusPlusSerialize(FunctionContext* context, const StringVal& src){
    if (src.is_null) {
        return StringVal::null();
    }

    StringVal result(context, src.len);
    memcpy(result.ptr, src.ptr, src.len);
    context->Free(src.ptr);
    return result;
}

IMPALA_UDF_EXPORT
StringVal HllPlusPlusFinalize(FunctionContext* context, const StringVal& src){
    if (src.is_null) {
        return ToStringVal(context, 0);
    }
    Builder *b = new Builder(14, 25);
    vector<char> srcBytes = StringValToBytes(src);
    HyperLogLogPlusPlus hll = b->build(srcBytes);
    delete b;
    long cardinality = hll.cardinality();
    StringVal result = ToStringVal(context, cardinality);

    context->Free(src.ptr);
    return result;
}

 

The above function crashes intermittently due to memory corruption. Considering that most of the allocated memory is on the stack, is there a possibility of a memory leak or a corrupt memory access? I can provide error logs, but they seem to point to memory errors during allocation or access.

 

Re: UDA function crashes Impala

Just an FYI: the bytes stored in Kudu are ISO-8859-1 encoded and some contain null bytes. That might not be the issue, or would it?
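
To make the null-byte concern concrete, here is a minimal standalone sketch of how a length-based copy differs from a C-string-style copy when the payload contains an embedded null byte. CopyWithLength and CopyAsCString are hypothetical names used only for illustration; they are not part of the Impala UDF API or of the code posted above, and the sketch says nothing about how the unshown StringValToBytes or Builder::build behave.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical helper: copies exactly `len` bytes, trusting the explicit
// length (the way a ptr + len pair is meant to be read).
std::vector<char> CopyWithLength(const char* ptr, std::size_t len) {
    return std::vector<char>(ptr, ptr + len);
}

// Hypothetical helper: copies up to the first null byte, as any
// strlen-based or C-string-based API would. The caller must guarantee
// that the buffer contains a null byte somewhere.
std::vector<char> CopyAsCString(const char* ptr) {
    return std::vector<char>(ptr, ptr + std::strlen(ptr));
}

// Round-trips bytes through std::string using iterators and size(),
// the same pattern as `string s(v.begin(), v.end())` followed by
// `memcpy(dst, s.c_str(), s.size())`.
std::vector<char> RoundTripThroughString(const std::vector<char>& in) {
    std::string s(in.begin(), in.end());
    std::vector<char> out(s.size());
    if (!s.empty()) std::memcpy(out.data(), s.c_str(), s.size());
    return out;
}
```

With a five-byte payload such as {'a', 'b', '\0', 'c', 'd'}, the length-based copy and the std::string round trip keep all five bytes, while the C-string-style copy stops after two. The memcpy calls in the posted code use s.size(), which is length-based, so embedded null bytes would only be a problem if some step that is not shown (for example a helper that builds a string from a bare char pointer) drops the length.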
