Created on 04-17-2015 02:41 AM - edited 09-16-2022 02:26 AM
Hello,
I try to debug my UDA percentile function in cpp and i don't understand the impala operation.
That is my percentile code :
template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val) {
    stringstream ss;
    ss << val;
    string str = ss.str();
    StringVal string_val(context, str.size());
    memcpy(string_val.ptr, str.c_str(), str.size());
    return string_val;
}
template <>
StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val) {
    if (val.is_null) return StringVal::null();
    return ToStringVal(context, val.val);
}
 struct t_percent
 {
    int nb;
    std::list<double> *values;
    double percent;
    int64_t count;
 };
 void PercentInit(FunctionContext* context, StringVal* val)
 {
    val->is_null = false;
    val->len = sizeof(t_percent);
    val->ptr = context->Allocate(val->len);
    memset(val->ptr, 0, val->len);
 }
 void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val)
 {
    if (input.is_null || percent.is_null || val->is_null) return;
    t_percent* avg = reinterpret_cast<t_percent*>(val->ptr);
    if (avg->values == NULL)
        avg->values = new std::list<double>();
    avg->nb += 1;
    avg->percent = percent.val;
    avg->values->push_back(input.val);
 }
 void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst)
 {
    std::cout << "Merge\n";
    if (src.is_null || dst->is_null) return;
    t_percent* src_avg = reinterpret_cast<t_percent*>(src.ptr);
    t_percent* dst_avg = reinterpret_cast<t_percent*>(dst->ptr);
    if (dst_avg->values == NULL)
        dst_avg->values = new std::list<double>();
    dst_avg->nb += src_avg->nb;
    for (std::list<double>::iterator it = src_avg->values->begin(); it != src_avg->values->end(); ++it)
        dst_avg->values->push_back(*it);
 }
const StringVal PercentSerialize(FunctionContext* context, const StringVal& val) {
    std::cout << "Serialize\n";
    if (val.is_null)
        return (StringVal::null());
    StringVal result(context, val.len);
    memcpy(result.ptr, val.ptr, val.len);
    t_percent* avg = reinterpret_cast<t_percent*>(result.ptr);
    t_percent* old = reinterpret_cast<t_percent*>(val.ptr);
    avg->values = old->values;
    context->Free(val.ptr);
    return result;
}
StringVal PercentFinalize(FunctionContext* context, const StringVal& val) {
    std::cout << "Finalize\n";
    if (val.is_null)
        return (StringVal::null());
    t_percent* avg = reinterpret_cast<t_percent*>(val.ptr);
    if (avg->values == NULL || avg->values->empty())
        return (StringVal::null());
    StringVal result;
    avg->values->sort();
    int n = avg->percent * avg->nb;
    std::list<double>::iterator it = avg->values->begin();
    for (int i = 0; i < n ; ++i)
        ++it;
    std::cout <<  " | n : " << n << " | it : " << *it << " \n";
    result = ToStringVal(context, *it);
    delete(avg->values);
    context->Free(val.ptr);
    return result;
}That is my Test Code :
bool TestPercent() {
    UdaTestHarness2<StringVal, StringVal, DoubleVal, DoubleVal> test(
    PercentInit, PercentUpdate, PercentMerge, PercentSerialize, PercentFinalize);
    vector<DoubleVal> input;
    vector<DoubleVal> vpercent;
   for (int i = 0; i < 1001; ++i) {
        input.push_back(DoubleVal(i));
        vpercent.push_back(DoubleVal(0.75));
    }
    if (!test.Execute(input, vpercent, StringVal("750"))) {
        cerr << "Avg: " << test.GetErrorMsg() << endl;
        return false;
    }
    return true;
}
int main(int argc, char** argv) {
bool passed = true;
passed &= TestPercent();
cerr << (passed ? "Tests passed." : "Tests failed.") << endl;
return 0;
}That is the result:
Finalize | n : 750 | it : 750 Serialize Merge Finalize | n : 0 | it : 0 Avg: UDA failed running in one level distributed mode with 1 nodes. Expected: 750 Actual: 0 Tests failed.
We can to see, the good results in my first pass of my finalize function, and i don't understand why the program did not finish at that time.
And also, my code works with UdaExecutionMode == SINGLE_NODE in parameter of execute test function.
If someone can help me in this point, i would appreciate it.
Created 04-17-2015 08:27 AM
I find the error, i forget to initialize a struct's variable (percent) in merge function and when i used a SINGLE NODE level, the executions uses only update and finalize methods.
Created 06-29-2015 02:11 AM
Ok, no problem, this is the final code:
 #include <algorithm>
 #include <list>
 #include <sstream>
 #include <iostream>
 #include <impala_udf/udf.h>
 #include <impala_udf/udf-debug.h>
 using namespace impala_udf;
 using namespace std;
template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val)
{
    stringstream ss;
    ss << val;
    string str = ss.str();
    StringVal string_val(context, str.size());
    memcpy(string_val.ptr, str.c_str(), str.size());
    return string_val;
}
template <>
StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val)
{
    if (val.is_null) return StringVal::null();
    return ToStringVal(context, val.val);
}
 struct t_percent
 {
    int nb;
    std::list<double> *values;
    double percent;
    int64_t count;
 };
 void PercentInit(FunctionContext* context, StringVal* val)
 {
    val->is_null = false;
    val->len = sizeof(t_percent);
    val->ptr = context->Allocate(val->len);
    memset(val->ptr, 0, val->len);
 }
 void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val)
 {
    if (input.is_null || percent.is_null || val->is_null) return;
    t_percent* percentile = reinterpret_cast<t_percent*>(val->ptr);
    if (percentile->values == NULL)
        percentile->values = new std::list<double>();
    percentile->nb += 1;
    percentile->percent = percent.val;
    percentile->values->push_back(input.val);
 }
 void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst)
 {
    if (src.is_null || dst->is_null) return;
    const t_percent* src_percentile = reinterpret_cast<t_percent*>(src.ptr);
    t_percent* dst_percentile = reinterpret_cast<t_percent*>(dst->ptr);
    if (dst_percentile->values == NULL)
        dst_percentile->values = new std::list<double>();
    dst_percentile->nb += src_percentile->nb;
    dst_percentile->percent = src_percentile->percent;
    dst_percentile->values->merge(*src_percentile->values);
 }
const StringVal PercentSerialize(FunctionContext* context, const StringVal& val)
{
    if (val.is_null)
        return (StringVal::null());
    StringVal result(context, val.len);
    memcpy(result.ptr, val.ptr, val.len);
    context->Free(val.ptr);
    return result;
}
StringVal PercentFinalize(FunctionContext* context, const StringVal& val)
{
    if (val.is_null)
        return (StringVal::null());
    t_percent* percentile = reinterpret_cast<t_percent*>(val.ptr);
    if (percentile->values == NULL || percentile->values->empty())
        return (StringVal::null());
    StringVal result;
    percentile->values->sort();
    int n = percentile->percent * (percentile->nb - 1);
    std::list<double>::iterator it = percentile->values->begin();
    for (int i = 0; i < n ; ++i)
        ++it;
    result = ToStringVal(context, *it);
    delete(percentile->values);
    context->Free(val.ptr);
    return result;
}
					
				
			
			
				
			
			
			
				
			
			
			
			
			
		Created 04-17-2015 08:27 AM
I find the error, i forget to initialize a struct's variable (percent) in merge function and when i used a SINGLE NODE level, the executions uses only update and finalize methods.
Created 06-19-2015 12:47 PM
Could you share your final code since you said that you have found the problem and resolved, thanks.
Created 06-29-2015 02:11 AM
Ok, no problem, this is the final code:
 #include <algorithm>
 #include <list>
 #include <sstream>
 #include <iostream>
 #include <impala_udf/udf.h>
 #include <impala_udf/udf-debug.h>
 using namespace impala_udf;
 using namespace std;
template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val)
{
    stringstream ss;
    ss << val;
    string str = ss.str();
    StringVal string_val(context, str.size());
    memcpy(string_val.ptr, str.c_str(), str.size());
    return string_val;
}
template <>
StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val)
{
    if (val.is_null) return StringVal::null();
    return ToStringVal(context, val.val);
}
 struct t_percent
 {
    int nb;
    std::list<double> *values;
    double percent;
    int64_t count;
 };
 void PercentInit(FunctionContext* context, StringVal* val)
 {
    val->is_null = false;
    val->len = sizeof(t_percent);
    val->ptr = context->Allocate(val->len);
    memset(val->ptr, 0, val->len);
 }
 void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val)
 {
    if (input.is_null || percent.is_null || val->is_null) return;
    t_percent* percentile = reinterpret_cast<t_percent*>(val->ptr);
    if (percentile->values == NULL)
        percentile->values = new std::list<double>();
    percentile->nb += 1;
    percentile->percent = percent.val;
    percentile->values->push_back(input.val);
 }
 void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst)
 {
    if (src.is_null || dst->is_null) return;
    const t_percent* src_percentile = reinterpret_cast<t_percent*>(src.ptr);
    t_percent* dst_percentile = reinterpret_cast<t_percent*>(dst->ptr);
    if (dst_percentile->values == NULL)
        dst_percentile->values = new std::list<double>();
    dst_percentile->nb += src_percentile->nb;
    dst_percentile->percent = src_percentile->percent;
    dst_percentile->values->merge(*src_percentile->values);
 }
const StringVal PercentSerialize(FunctionContext* context, const StringVal& val)
{
    if (val.is_null)
        return (StringVal::null());
    StringVal result(context, val.len);
    memcpy(result.ptr, val.ptr, val.len);
    context->Free(val.ptr);
    return result;
}
StringVal PercentFinalize(FunctionContext* context, const StringVal& val)
{
    if (val.is_null)
        return (StringVal::null());
    t_percent* percentile = reinterpret_cast<t_percent*>(val.ptr);
    if (percentile->values == NULL || percentile->values->empty())
        return (StringVal::null());
    StringVal result;
    percentile->values->sort();
    int n = percentile->percent * (percentile->nb - 1);
    std::list<double>::iterator it = percentile->values->begin();
    for (int i = 0; i < n ; ++i)
        ++it;
    result = ToStringVal(context, *it);
    delete(percentile->values);
    context->Free(val.ptr);
    return result;
}
					
				
			
			
				
			
			
			
			
			
			
			
		Created 07-01-2015 12:27 PM
I can run your code in the shell with -test.cc without the a problem, but when I created a function with it in impala, it is not working, only lines I added are:
87 StringVal result;
88 percentile->values->sort();
89
90 std::cerr << "Sorted list:" << endl;
91 for (std::list<double>::iterator it = percentile->values->begin(); it != percentile->values->end(); it++)
92 std::cerr << *it << ' ';
93 std::cerr << "\n===Sorted list:" << endl;
94
95 int n = percentile->percent * (percentile->nb - 1);
96 std::list<double>::iterator it = percentile->values->begin();
from 90-93, to see if the list sorted correctly.
impalad.INFO
.....
I0701 12:15:43.854324 30959 plan-fragment-executor.cc:190] descriptor table for fragment=7b405b9a021d464a:6b#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007fb42a30ba3c, pid=27281, tid=140411399993088
#
# JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build 1.7.0_67-b01)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# C [libudapctL.27281.0.so+0x3a3c] std::list<double, std::allocator<double> >::merge(std::list<double, std::allocator<double> >&)+0x80
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /var/run/cloudera-scm-agent/process/411-impala-IMPALAD/hs_err_pid27281.log
#
# If you would like to submit a bug report, please visit:
# http://bugreport.sun.com/bugreport/crash.jsp
#
5b38a813715cac
....
imalapd.ERROR
E0701 12:15:45.940914 9620 logging.cc:119] stderr will be logged to this file.
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentSerialize:
call PercentMerge:
call PercentMerge:
My data set have more the 20 elements, seems like to read up 1/3 then start to serialize it, and merge the result, but error out there. Did you ever had this problem?
Seems like
Created on 07-02-2015 06:14 AM - edited 07-02-2015 06:55 AM
This is strange because my example works with many millions of items, now, i had this problem several times, for me the error cause is the bad initialization of a variable but you don't have a new struct variable.
Sorry, i tested my last example and it works,
 
					
				
				
			
		
