Support Questions
Find answers, ask questions, and share your expertise

UDA function crashes Impala

UDA function crashes Impala

New Contributor

I have implemented a UDA function based on HyperLogLog to calculate cardinality. The UDA  uses Java serialized bytes stored as strings in kudu, which needs to be reconstructed back to a HyperLogLog object and merge multiple of them together. The UDA function is of the type hll_merge(string) -> string which is the cardinality value.

 

All the intermediate values are StringVal. The rough code for doing the same is as below

 

 

IMPALA_UDF_EXPORT
void HllPlusPlusInit(FunctionContext* context, StringVal* val) {
   val->is_null = true;
}

IMPALA_UDF_EXPORT
void HllPlusPlusUpdate(FunctionContext* context, const StringVal& src, StringVal* result){
if (src.is_null) return;
vector<char> srcBytes = BytesFromStringVal(src);
if (result->is_null) {
    string s(srcBytes.begin(), srcBytes.end());
    uint8_t* copy = context->Allocate(s.size());
    if (copy == NULL) return;

    memcpy(copy, s.c_str(), s.size());
    *result = StringVal(copy, s.size());
    result->len = s.size();
    return;
}

vector<char> bytes = StringValToBytes(result);
Builder *b = new Builder(14, 25);
HyperLogLogPlusPlus hll = b->build(bytes);
HyperLogLogPlusPlus hll2 = b->build(srcBytes);
delete b;
hll.addAll(hll2);

vector<char> res = hll.getBytes();
string s(res.begin(), res.end());

result->ptr = context->Reallocate(result->ptr, s.size());
if (result->ptr == NULL) {
    //Allocation failed
    *result = StringVal::null();
    return;
}
memcpy(result->ptr, s.c_str(), s.size());
result->len = s.size();
}

IMPALA_UDF_EXPORT
void HllPlusPlusMerge(FunctionContext* context, const StringVal& src, StringVal* dst){
if (src.is_null) return;

if (dst->is_null) {
    vector<char> bytes = StringValToBytes(src);
    string s(bytes.begin(), bytes.end());
    uint8_t* copy = context->Allocate(s.size());
    if (copy == NULL) return;

    memcpy(copy, s.c_str(), s.size());
    *dst = StringVal(copy, s.size());
    return;
}

vector<char> srcBytes = StringValToBytes(src);
vector<char> bytes = StringValToBytes(dst);
Builder *b = new Builder(14, 25);
HyperLogLogPlusPlus hll = b->build(bytes);
HyperLogLogPlusPlus hll2 = b->build(srcBytes);
delete b;
hll.addAll(hll2);

vector<char> res = hll.getBytes();
string s(res.begin(), res.end());

dst->ptr = context->Reallocate(dst->ptr, s.size());
if (dst->ptr == NULL) {
    //Allocation failed
    *dst = StringVal::null();
    return;
}
memcpy(dst->ptr, s.c_str(), s.size());
dst->len = s.size();
}

IMPALA_UDF_EXPORT
StringVal HllPlusPlusSerialize(FunctionContext* context, const StringVal& src){
    if (src.is_null) {
         return StringVal::null();
    }

    StringVal result(context, src.len);
    memcpy(result.ptr, src.ptr, src.len);
    context->Free(src.ptr);
    return result;
}

IMPALA_UDF_EXPORT
StringVal HllPlusPlusFinalize(FunctionContext* context, const StringVal& src){
if (src.is_null) {
     return ToStringVal(context, 0);
}
Builder *b = new Builder(14, 25);
vector<char> srcBytes = StringValToBytes(src);
HyperLogLogPlusPlus hll = b->build(srcBytes);
delete b;
long cardinality = hll.cardinality();
StringVal result = ToStringVal(context, cardinality);

context->Free(src.ptr);
return result;
}

 

 The above function crashes intermittently due to memory corruption. Considering most of the allocated memory is on the stack, is there a possibility of memory leak or some corrupt memory access? I can provide error logs but they seem to be pointing to memory errors for allocation or access.

 

1 REPLY 1

Re: UDA function crashes Impala

New Contributor

Just an FYI -  the bytes stored in Kudu are ISO-8859-1 encoded and some contain null byte character which might not be the issue. or would it?