[UDA] Impala Operation Problem

Explorer

Hello,

 

I am trying to debug my percentile UDA, written in C++, and I don't understand how Impala executes it.

This is my percentile code:

template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val) {
    stringstream ss;
    ss << val;
    string str = ss.str();
    StringVal string_val(context, str.size());
    memcpy(string_val.ptr, str.c_str(), str.size());
    return string_val;
}

template <>
StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val) {
    if (val.is_null) return StringVal::null();
    return ToStringVal(context, val.val);
}

 struct t_percent
 {
    int nb;
    std::list<double> *values;
    double percent;
    int64_t count;
 };

 void PercentInit(FunctionContext* context, StringVal* val)
 {
    val->is_null = false;
    val->len = sizeof(t_percent);
    val->ptr = context->Allocate(val->len);
    memset(val->ptr, 0, val->len);
 }

 void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val)
 {
    if (input.is_null || percent.is_null || val->is_null) return;

    t_percent* avg = reinterpret_cast<t_percent*>(val->ptr);
    if (avg->values == NULL)
        avg->values = new std::list<double>();
    avg->nb += 1;
    avg->percent = percent.val;
    avg->values->push_back(input.val);
 }

 void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst)
 {
    std::cout << "Merge\n";
    if (src.is_null || dst->is_null) return;
    t_percent* src_avg = reinterpret_cast<t_percent*>(src.ptr);
    t_percent* dst_avg = reinterpret_cast<t_percent*>(dst->ptr);
    if (dst_avg->values == NULL)
        dst_avg->values = new std::list<double>();
    dst_avg->nb += src_avg->nb;
    for (std::list<double>::iterator it = src_avg->values->begin(); it != src_avg->values->end(); ++it)
        dst_avg->values->push_back(*it);
 }

const StringVal PercentSerialize(FunctionContext* context, const StringVal& val) {
    std::cout << "Serialize\n";
    if (val.is_null)
        return (StringVal::null());
    StringVal result(context, val.len);
    memcpy(result.ptr, val.ptr, val.len);
    t_percent* avg = reinterpret_cast<t_percent*>(result.ptr);
    t_percent* old = reinterpret_cast<t_percent*>(val.ptr);
    avg->values = old->values;
    context->Free(val.ptr);
    return result;
}

StringVal PercentFinalize(FunctionContext* context, const StringVal& val) {
    std::cout << "Finalize\n";
    if (val.is_null)
        return (StringVal::null());
    t_percent* avg = reinterpret_cast<t_percent*>(val.ptr);
    if (avg->values == NULL || avg->values->empty())
        return (StringVal::null());
    StringVal result;
    avg->values->sort();

    int n = avg->percent * avg->nb;
    std::list<double>::iterator it = avg->values->begin();
    for (int i = 0; i < n ; ++i)
        ++it;
    std::cout <<  " | n : " << n << " | it : " << *it << " \n";
    result = ToStringVal(context, *it);
    delete(avg->values);
    context->Free(val.ptr);
    return result;
}

This is my test code:

bool TestPercent() {
    UdaTestHarness2<StringVal, StringVal, DoubleVal, DoubleVal> test(
    PercentInit, PercentUpdate, PercentMerge, PercentSerialize, PercentFinalize);

    vector<DoubleVal> input;
    vector<DoubleVal> vpercent;

   for (int i = 0; i < 1001; ++i) {
        input.push_back(DoubleVal(i));
        vpercent.push_back(DoubleVal(0.75));
    }

    if (!test.Execute(input, vpercent, StringVal("750"))) {
        cerr << "Avg: " << test.GetErrorMsg() << endl;
        return false;
    }
    return true;
}

int main(int argc, char** argv) {
    bool passed = true;
    passed &= TestPercent();
    cerr << (passed ? "Tests passed." : "Tests failed.") << endl;
    return 0;
}

This is the output:

Finalize
 | n : 750 | it : 750 
Serialize
Merge
Finalize
 | n : 0 | it : 0 
Avg: UDA failed running in one level distributed mode with 1 nodes.
Expected: 750 Actual: 0
Tests failed.

We can see the correct result in the first call to my Finalize function, and I don't understand why the program does not stop at that point.

My code also works when I pass UdaExecutionMode SINGLE_NODE as a parameter to the test's Execute function.

If someone can help me with this, I would appreciate it.

 

5 REPLIES

Explorer

I found the error: I forgot to set one of the struct's fields (percent) in the merge function. In SINGLE_NODE mode the execution only uses the update and finalize methods, so the bug did not show up there.
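
To illustrate the explanation above, here is a minimal sketch that does not use the Impala API at all. `State`, `RunSingleNode`, and `RunOneLevel` are hypothetical stand-ins, and the assumption is that the one-level mode merges the updated state into a freshly initialised (zeroed) one before Finalize runs. Under that assumption, any field that Merge does not copy is still zero when Finalize reads it, which is consistent with the `n : 750` followed by `n : 0` in the output.

```cpp
#include <cassert>

// Hypothetical stand-in for the UDA's intermediate state (not Impala's API).
struct State {
    int nb = 0;
    double percent = 0.0;
};

void Update(State& s, double percent) {
    s.nb += 1;
    s.percent = percent;
}

// Merge as in the question: `percent` is never carried over to dst.
void MergeBuggy(const State& src, State& dst) {
    dst.nb += src.nb;
}

// Merge as in the fix: `percent` is copied along with the count.
void MergeFixed(const State& src, State& dst) {
    dst.nb += src.nb;
    dst.percent = src.percent;
}

// The index the question's Finalize computes: percent * nb, truncated.
int FinalizeIndex(const State& s) {
    return static_cast<int>(s.percent * s.nb);
}

// Single-node path: Init, Update for every row, then Finalize.
int RunSingleNode(int rows, double percent) {
    assert(rows > 0);
    State s;
    for (int i = 0; i < rows; ++i) Update(s, percent);
    return FinalizeIndex(s);
}

// One-level path: the updated state is merged into a fresh, zeroed state,
// and Finalize only ever sees the merged one.
int RunOneLevel(int rows, double percent, bool fixed) {
    assert(rows > 0);
    State partial;
    for (int i = 0; i < rows; ++i) Update(partial, percent);
    State merged;  // fresh Init: all fields are zero
    if (fixed) {
        MergeFixed(partial, merged);
    } else {
        MergeBuggy(partial, merged);
    }
    return FinalizeIndex(merged);
}
```

With 1001 rows and a percent of 0.75, the single-node path and the fixed one-level path both pick index 750, while the buggy one-level path picks index 0.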

New Contributor

Could you share your final code, since you said you found and resolved the problem? Thanks.

Explorer

OK, no problem. This is the final code:

 

#include <algorithm>
#include <cstring>   // memcpy, memset
#include <list>
#include <sstream>
#include <iostream>

#include <impala_udf/udf.h>
#include <impala_udf/udf-debug.h>

using namespace impala_udf;
using namespace std;

template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val)
{
    stringstream ss;
    ss << val;
    string str = ss.str();
    StringVal string_val(context, str.size());
    memcpy(string_val.ptr, str.c_str(), str.size());
    return string_val;
}

template <>
StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val)
{
    if (val.is_null) return StringVal::null();
    return ToStringVal(context, val.val);
}

 struct t_percent
 {
    int nb;
    std::list<double> *values;
    double percent;
    int64_t count;
 };

 void PercentInit(FunctionContext* context, StringVal* val)
 {
    val->is_null = false;
    val->len = sizeof(t_percent);
    val->ptr = context->Allocate(val->len);
    memset(val->ptr, 0, val->len);
 }

 void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val)
 {
    if (input.is_null || percent.is_null || val->is_null) return;

    t_percent* percentile = reinterpret_cast<t_percent*>(val->ptr);
    if (percentile->values == NULL)
        percentile->values = new std::list<double>();
    percentile->nb += 1;
    percentile->percent = percent.val;
    percentile->values->push_back(input.val);
 }

 void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst)
 {
    if (src.is_null || dst->is_null) return;
    const t_percent* src_percentile = reinterpret_cast<t_percent*>(src.ptr);
    t_percent* dst_percentile = reinterpret_cast<t_percent*>(dst->ptr);
    if (dst_percentile->values == NULL)
        dst_percentile->values = new std::list<double>();
    dst_percentile->nb += src_percentile->nb;
    dst_percentile->percent = src_percentile->percent;
    dst_percentile->values->merge(*src_percentile->values);
 }

const StringVal PercentSerialize(FunctionContext* context, const StringVal& val)
{
    if (val.is_null)
        return (StringVal::null());
    StringVal result(context, val.len);
    memcpy(result.ptr, val.ptr, val.len);
    context->Free(val.ptr);
    return result;
}

StringVal PercentFinalize(FunctionContext* context, const StringVal& val)
{
    if (val.is_null)
        return (StringVal::null());
    t_percent* percentile = reinterpret_cast<t_percent*>(val.ptr);
    if (percentile->values == NULL || percentile->values->empty())
        return (StringVal::null());
    StringVal result;
    percentile->values->sort();

    int n = percentile->percent * (percentile->nb - 1);
    std::list<double>::iterator it = percentile->values->begin();
    for (int i = 0; i < n ; ++i)
        ++it;
    result = ToStringVal(context, *it);
    delete(percentile->values);
    context->Free(val.ptr);
    return result;
}
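
The rank used in PercentFinalize above is `percent * (nb - 1)`, truncated to an integer. As a sketch of that arithmetic only, the helper below applies the same rank to a `std::vector` and selects the element with `std::nth_element`. This is a swapped-in technique, not what the posted code does (it sorts a `std::list` and walks an iterator), and `PercentileAt` is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Returns the element at rank floor(percent * (n - 1)) of the sorted order.
// The vector is taken by value because std::nth_element reorders it.
double PercentileAt(std::vector<double> values, double percent) {
    assert(!values.empty());
    assert(percent >= 0.0 && percent <= 1.0);
    std::size_t n = static_cast<std::size_t>(percent * (values.size() - 1));
    // Partially orders the data so that values[n] holds what a full sort
    // would put at position n.
    std::nth_element(values.begin(), values.begin() + n, values.end());
    return values[n];
}
```

For the 1001 test values 0 through 1000 and a percent of 0.75, the rank is 750, so the selected element is 750, matching the expected result in the test code.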

New Contributor

I can run your code from the shell with the -test.cc file without a problem, but when I create a function from it in Impala, it does not work. The only lines I added are:

 87     StringVal result;
 88     percentile->values->sort();
 89 
 90     std::cerr << "Sorted list:" << endl;
 91     for (std::list<double>::iterator it = percentile->values->begin(); it != percentile->values->end(); it++)
 92         std::cerr << *it << ' ';
 93     std::cerr << "\n===Sorted list:" << endl;
 94 
 95     int n = percentile->percent * (percentile->nb - 1);
 96     std::list<double>::iterator it = percentile->values->begin();

 

Lines 90-93 are there to check whether the list is sorted correctly.

 

impalad.INFO

.....

I0701 12:15:43.854324 30959 plan-fragment-executor.cc:190] descriptor table for fragment=7b405b9a021d464a:6b#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007fb42a30ba3c, pid=27281, tid=140411399993088
#
# JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build 1.7.0_67-b01)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# C  [libudapctL.27281.0.so+0x3a3c]  std::list<double, std::allocator<double> >::merge(std::list<double, std::allocator<double> >&)+0x80
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /var/run/cloudera-scm-agent/process/411-impala-IMPALAD/hs_err_pid27281.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.sun.com/bugreport/crash.jsp
#
5b38a813715cac

....

impalad.ERROR

E0701 12:15:45.940914  9620 logging.cc:119] stderr will be logged to this file.
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentSerialize:
call PercentMerge:
call PercentMerge:

My data set has more than 20 elements. It seems to read about a third of them, then starts to serialize and merge the result, but it errors out there. Did you ever have this problem?

 Seems like

Explorer

This is strange, because my example works with many millions of items. I have had this problem several times myself, and for me the cause was a badly initialized variable, but you have not added a new struct variable.

Sorry, I tested my last example and it works.
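
The thread does not settle why the crash happens inside Impala, so the following is only one possible explanation, offered as an assumption. The intermediate struct stores a raw `std::list<double>*`, and PercentSerialize copies that pointer rather than the list elements. Inside the single-process test harness the pointer stays valid, but if the serialized bytes are handed to a different process or host, Merge would dereference an address that means nothing there, which would be consistent with a SIGSEGV in `std::list::merge`. A minimal sketch of a pointer-free layout follows. `PercentState`, `Pack`, and `Unpack` are hypothetical names, and a `std::vector<unsigned char>` stands in for the buffer that a real UDA would hold in a `StringVal`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical in-memory state; the names are illustrative only.
struct PercentState {
    double percent;
    std::vector<double> values;
};

// Byte layout: [percent][v0][v1]... as raw doubles, so the buffer holds
// only values and no pointers.
std::vector<unsigned char> Pack(const PercentState& s) {
    std::vector<double> flat;
    flat.reserve(s.values.size() + 1);
    flat.push_back(s.percent);
    flat.insert(flat.end(), s.values.begin(), s.values.end());
    std::vector<unsigned char> bytes(flat.size() * sizeof(double));
    std::memcpy(bytes.data(), flat.data(), bytes.size());
    return bytes;
}

// Rebuilds the state from the flat buffer produced by Pack.
PercentState Unpack(const std::vector<unsigned char>& bytes) {
    assert(bytes.size() >= sizeof(double));
    assert(bytes.size() % sizeof(double) == 0);
    std::vector<double> flat(bytes.size() / sizeof(double));
    std::memcpy(flat.data(), bytes.data(), bytes.size());
    PercentState s;
    s.percent = flat[0];
    s.values.assign(flat.begin() + 1, flat.end());
    return s;
}
```

In this layout a Serialize step would emit the packed bytes and a Merge step would unpack the source and append its values to the destination. How that maps onto Impala's allocation rules for intermediate values is not covered by this sketch.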