Created on 04-17-2015 02:41 AM - edited 09-16-2022 02:26 AM
Hello,
I am trying to debug my percentile UDA, written in C++, and I don't understand how Impala executes it.
That is my percentile code :
// Percentile UDA as originally posted (all definitions are on the single line below).
//
// ToStringVal<T>: streams `val` into a stringstream and memcpy's the resulting
//   characters into a StringVal of the same size constructed with `context`.
// ToStringVal<DoubleVal>: returns StringVal::null() when the input is null,
//   otherwise formats `val.val` through the generic template.
// t_percent: aggregation state -- `nb` (incremented once per accepted row),
//   `values` (pointer to a std::list<double> created with `new`), `percent`
//   (copied from the second argument on every update). `count` is neither read
//   nor written by any function in this snippet.
// PercentInit: marks the intermediate non-null, allocates sizeof(t_percent)
//   bytes with context->Allocate and zero-fills them, so `values` starts NULL
//   and `percent` starts at 0.
// PercentUpdate: ignores the row if either input or the state is null; creates
//   the list on first use, then bumps `nb`, overwrites `percent`, and appends
//   the input value.
// PercentMerge: prints "Merge", adds src's `nb` to dst and appends every src
//   value to dst's list (creating dst's list if needed).
//   NOTE(review): `percent` is never copied from src to dst here, so a dst
//   that only ever saw Init + Merge keeps percent == 0 from the memset.
//   NOTE(review): src_avg->values is dereferenced without a NULL check.
// PercentSerialize: prints "Serialize", copies the val.len bytes of the state
//   into a new StringVal, re-assigns the same `values` pointer, frees val.ptr
//   and returns the copy.
//   NOTE(review): only the pointer to the list is copied, not the list
//   contents -- the result is usable only where that address is still valid.
template <typename T> StringVal ToStringVal(FunctionContext* context, const T& val) { stringstream ss; ss << val; string str = ss.str(); StringVal string_val(context, str.size()); memcpy(string_val.ptr, str.c_str(), str.size()); return string_val; } template <> StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val) { if (val.is_null) return StringVal::null(); return ToStringVal(context, val.val); } struct t_percent { int nb; std::list<double> *values; double percent; int64_t count; }; void PercentInit(FunctionContext* context, StringVal* val) { val->is_null = false; val->len = sizeof(t_percent); val->ptr = context->Allocate(val->len); memset(val->ptr, 0, val->len); } void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val) { if (input.is_null || percent.is_null || val->is_null) return; t_percent* avg = reinterpret_cast<t_percent*>(val->ptr); if (avg->values == NULL) avg->values = new std::list<double>(); avg->nb += 1; avg->percent = percent.val; avg->values->push_back(input.val); } void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst) { std::cout << "Merge\n"; if (src.is_null || dst->is_null) return; t_percent* src_avg = reinterpret_cast<t_percent*>(src.ptr); t_percent* dst_avg = reinterpret_cast<t_percent*>(dst->ptr); if (dst_avg->values == NULL) dst_avg->values = new std::list<double>(); dst_avg->nb += src_avg->nb; for (std::list<double>::iterator it = src_avg->values->begin(); it != src_avg->values->end(); ++it) dst_avg->values->push_back(*it); } const StringVal PercentSerialize(FunctionContext* context, const StringVal& val) { std::cout << "Serialize\n"; if (val.is_null) return (StringVal::null()); StringVal result(context, val.len); memcpy(result.ptr, val.ptr, val.len); t_percent* avg = reinterpret_cast<t_percent*>(result.ptr); t_percent* old = reinterpret_cast<t_percent*>(val.ptr); avg->values = old->values; context->Free(val.ptr); return result; } 
// PercentFinalize (as originally posted): prints "Finalize", then turns the
// aggregation state in `val` into the result string.
//   - Returns StringVal::null() when `val` is null, or when the value list is
//     NULL or empty. NOTE(review): these early returns skip the delete /
//     context->Free performed at the end of the function.
//   - Otherwise sorts the list, computes n = percent * nb (truncated to int),
//     steps an iterator n positions from begin(), prints n and the selected
//     value, and converts that value with ToStringVal.
//     NOTE(review): with percent == 1.0 (or nb larger than the list size) the
//     iterator reaches end() before it is dereferenced.
//   - Finally deletes the list and frees val.ptr through the context.
StringVal PercentFinalize(FunctionContext* context, const StringVal& val) { std::cout << "Finalize\n"; if (val.is_null) return (StringVal::null()); t_percent* avg = reinterpret_cast<t_percent*>(val.ptr); if (avg->values == NULL || avg->values->empty()) return (StringVal::null()); StringVal result; avg->values->sort(); int n = avg->percent * avg->nb; std::list<double>::iterator it = avg->values->begin(); for (int i = 0; i < n ; ++i) ++it; std::cout << " | n : " << n << " | it : " << *it << " \n"; result = ToStringVal(context, *it); delete(avg->values); context->Free(val.ptr); return result; }
That is my Test Code :
bool TestPercent() { UdaTestHarness2<StringVal, StringVal, DoubleVal, DoubleVal> test( PercentInit, PercentUpdate, PercentMerge, PercentSerialize, PercentFinalize); vector<DoubleVal> input; vector<DoubleVal> vpercent; for (int i = 0; i < 1001; ++i) { input.push_back(DoubleVal(i)); vpercent.push_back(DoubleVal(0.75)); } if (!test.Execute(input, vpercent, StringVal("750"))) { cerr << "Avg: " << test.GetErrorMsg() << endl; return false; } return true; } int main(int argc, char** argv) { bool passed = true; passed &= TestPercent(); cerr << (passed ? "Tests passed." : "Tests failed.") << endl; return 0; }
That is the result:
Finalize | n : 750 | it : 750 Serialize Merge Finalize | n : 0 | it : 0 Avg: UDA failed running in one level distributed mode with 1 nodes. Expected: 750 Actual: 0 Tests failed.
We can see the correct result in the first pass through my finalize function, and I don't understand why the program does not stop at that point.
Also, my code works when I pass UdaExecutionMode == SINGLE_NODE as a parameter to the test's Execute function.
If someone can help me with this, I would appreciate it.
Created 04-17-2015 08:27 AM
I found the error: I forgot to set one of the struct's fields (percent) in the merge function. In SINGLE_NODE mode the execution only calls the update and finalize methods, so the merge function is never exercised there.
Created 06-29-2015 02:11 AM
Ok, no problem, this is the final code:
#include <algorithm> #include <list> #include <sstream> #include <iostream> #include <impala_udf/udf.h> #include <impala_udf/udf-debug.h> using namespace impala_udf; using namespace std; template <typename T> StringVal ToStringVal(FunctionContext* context, const T& val) { stringstream ss; ss << val; string str = ss.str(); StringVal string_val(context, str.size()); memcpy(string_val.ptr, str.c_str(), str.size()); return string_val; } template <> StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val) { if (val.is_null) return StringVal::null(); return ToStringVal(context, val.val); } struct t_percent { int nb; std::list<double> *values; double percent; int64_t count; }; void PercentInit(FunctionContext* context, StringVal* val) { val->is_null = false; val->len = sizeof(t_percent); val->ptr = context->Allocate(val->len); memset(val->ptr, 0, val->len); } void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val) { if (input.is_null || percent.is_null || val->is_null) return; t_percent* percentile = reinterpret_cast<t_percent*>(val->ptr); if (percentile->values == NULL) percentile->values = new std::list<double>(); percentile->nb += 1; percentile->percent = percent.val; percentile->values->push_back(input.val); } void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst) { if (src.is_null || dst->is_null) return; const t_percent* src_percentile = reinterpret_cast<t_percent*>(src.ptr); t_percent* dst_percentile = reinterpret_cast<t_percent*>(dst->ptr); if (dst_percentile->values == NULL) dst_percentile->values = new std::list<double>(); dst_percentile->nb += src_percentile->nb; dst_percentile->percent = src_percentile->percent; dst_percentile->values->merge(*src_percentile->values); } const StringVal PercentSerialize(FunctionContext* context, const StringVal& val) { if (val.is_null) return (StringVal::null()); StringVal result(context, val.len); memcpy(result.ptr, val.ptr, 
val.len); context->Free(val.ptr); return result; } StringVal PercentFinalize(FunctionContext* context, const StringVal& val) { if (val.is_null) return (StringVal::null()); t_percent* percentile = reinterpret_cast<t_percent*>(val.ptr); if (percentile->values == NULL || percentile->values->empty()) return (StringVal::null()); StringVal result; percentile->values->sort(); int n = percentile->percent * (percentile->nb - 1); std::list<double>::iterator it = percentile->values->begin(); for (int i = 0; i < n ; ++i) ++it; result = ToStringVal(context, *it); delete(percentile->values); context->Free(val.ptr); return result; }
Created 04-17-2015 08:27 AM
I found the error: I forgot to set one of the struct's fields (percent) in the merge function. In SINGLE_NODE mode the execution only calls the update and finalize methods, so the merge function is never exercised there.
Created 06-19-2015 12:47 PM
Could you share your final code, since you said that you found and resolved the problem? Thanks.
Created 06-29-2015 02:11 AM
Ok, no problem, this is the final code:
#include <algorithm> #include <list> #include <sstream> #include <iostream> #include <impala_udf/udf.h> #include <impala_udf/udf-debug.h> using namespace impala_udf; using namespace std; template <typename T> StringVal ToStringVal(FunctionContext* context, const T& val) { stringstream ss; ss << val; string str = ss.str(); StringVal string_val(context, str.size()); memcpy(string_val.ptr, str.c_str(), str.size()); return string_val; } template <> StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val) { if (val.is_null) return StringVal::null(); return ToStringVal(context, val.val); } struct t_percent { int nb; std::list<double> *values; double percent; int64_t count; }; void PercentInit(FunctionContext* context, StringVal* val) { val->is_null = false; val->len = sizeof(t_percent); val->ptr = context->Allocate(val->len); memset(val->ptr, 0, val->len); } void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val) { if (input.is_null || percent.is_null || val->is_null) return; t_percent* percentile = reinterpret_cast<t_percent*>(val->ptr); if (percentile->values == NULL) percentile->values = new std::list<double>(); percentile->nb += 1; percentile->percent = percent.val; percentile->values->push_back(input.val); } void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst) { if (src.is_null || dst->is_null) return; const t_percent* src_percentile = reinterpret_cast<t_percent*>(src.ptr); t_percent* dst_percentile = reinterpret_cast<t_percent*>(dst->ptr); if (dst_percentile->values == NULL) dst_percentile->values = new std::list<double>(); dst_percentile->nb += src_percentile->nb; dst_percentile->percent = src_percentile->percent; dst_percentile->values->merge(*src_percentile->values); } const StringVal PercentSerialize(FunctionContext* context, const StringVal& val) { if (val.is_null) return (StringVal::null()); StringVal result(context, val.len); memcpy(result.ptr, val.ptr, 
val.len); context->Free(val.ptr); return result; } StringVal PercentFinalize(FunctionContext* context, const StringVal& val) { if (val.is_null) return (StringVal::null()); t_percent* percentile = reinterpret_cast<t_percent*>(val.ptr); if (percentile->values == NULL || percentile->values->empty()) return (StringVal::null()); StringVal result; percentile->values->sort(); int n = percentile->percent * (percentile->nb - 1); std::list<double>::iterator it = percentile->values->begin(); for (int i = 0; i < n ; ++i) ++it; result = ToStringVal(context, *it); delete(percentile->values); context->Free(val.ptr); return result; }
Created 07-01-2015 12:27 PM
I can run your code from the shell with the -test.cc harness without a problem, but when I create a function from it in Impala, it does not work. The only lines I added are:
87 StringVal result;
88 percentile->values->sort();
89
90 std::cerr << "Sorted list:" << endl;
91 for (std::list<double>::iterator it = percentile->values->begin(); it != percentile->values->end(); it++)
92 std::cerr << *it << ' ';
93 std::cerr << "\n===Sorted list:" << endl;
94
95 int n = percentile->percent * (percentile->nb - 1);
96 std::list<double>::iterator it = percentile->values->begin();
Lines 90-93 are the ones I added, to check whether the list is sorted correctly.
impalad.INFO
.....
I0701 12:15:43.854324 30959 plan-fragment-executor.cc:190] descriptor table for fragment=7b405b9a021d464a:6b#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007fb42a30ba3c, pid=27281, tid=140411399993088
#
# JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build 1.7.0_67-b01)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# C [libudapctL.27281.0.so+0x3a3c] std::list<double, std::allocator<double> >::merge(std::list<double, std::allocator<double> >&)+0x80
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /var/run/cloudera-scm-agent/process/411-impala-IMPALAD/hs_err_pid27281.log
#
# If you would like to submit a bug report, please visit:
# http://bugreport.sun.com/bugreport/crash.jsp
#
5b38a813715cac
....
impalad.ERROR
E0701 12:15:45.940914 9620 logging.cc:119] stderr will be logged to this file.
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentSerialize:
call PercentMerge:
call PercentMerge:
My data set has more than 20 elements. It seems to read about a third of them, then start to serialize and merge the results, but it crashes there. Did you ever have this problem?
Seems like
Created on 07-02-2015 06:14 AM - edited 07-02-2015 06:55 AM
This is strange, because my example works with many millions of items. I have had this problem several times myself; for me the cause was a badly initialized variable, but you have not added a new struct variable.
Sorry, i tested my last example and it works,