Created on 04-17-2015 02:41 AM - edited 09-16-2022 02:26 AM
Hello,
I try to debug my UDA percentile function in cpp and i don't understand the impala operation.
That is my percentile code :
template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val) {
stringstream ss;
ss << val;
string str = ss.str();
StringVal string_val(context, str.size());
memcpy(string_val.ptr, str.c_str(), str.size());
return string_val;
}
template <>
StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val) {
if (val.is_null) return StringVal::null();
return ToStringVal(context, val.val);
}
struct t_percent
{
int nb;
std::list<double> *values;
double percent;
int64_t count;
};
void PercentInit(FunctionContext* context, StringVal* val)
{
val->is_null = false;
val->len = sizeof(t_percent);
val->ptr = context->Allocate(val->len);
memset(val->ptr, 0, val->len);
}
void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val)
{
if (input.is_null || percent.is_null || val->is_null) return;
t_percent* avg = reinterpret_cast<t_percent*>(val->ptr);
if (avg->values == NULL)
avg->values = new std::list<double>();
avg->nb += 1;
avg->percent = percent.val;
avg->values->push_back(input.val);
}
void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst)
{
std::cout << "Merge\n";
if (src.is_null || dst->is_null) return;
t_percent* src_avg = reinterpret_cast<t_percent*>(src.ptr);
t_percent* dst_avg = reinterpret_cast<t_percent*>(dst->ptr);
if (dst_avg->values == NULL)
dst_avg->values = new std::list<double>();
dst_avg->nb += src_avg->nb;
for (std::list<double>::iterator it = src_avg->values->begin(); it != src_avg->values->end(); ++it)
dst_avg->values->push_back(*it);
}
const StringVal PercentSerialize(FunctionContext* context, const StringVal& val) {
std::cout << "Serialize\n";
if (val.is_null)
return (StringVal::null());
StringVal result(context, val.len);
memcpy(result.ptr, val.ptr, val.len);
t_percent* avg = reinterpret_cast<t_percent*>(result.ptr);
t_percent* old = reinterpret_cast<t_percent*>(val.ptr);
avg->values = old->values;
context->Free(val.ptr);
return result;
}
StringVal PercentFinalize(FunctionContext* context, const StringVal& val) {
std::cout << "Finalize\n";
if (val.is_null)
return (StringVal::null());
t_percent* avg = reinterpret_cast<t_percent*>(val.ptr);
if (avg->values == NULL || avg->values->empty())
return (StringVal::null());
StringVal result;
avg->values->sort();
int n = avg->percent * avg->nb;
std::list<double>::iterator it = avg->values->begin();
for (int i = 0; i < n ; ++i)
++it;
std::cout << " | n : " << n << " | it : " << *it << " \n";
result = ToStringVal(context, *it);
delete(avg->values);
context->Free(val.ptr);
return result;
}That is my Test Code :
bool TestPercent() {
UdaTestHarness2<StringVal, StringVal, DoubleVal, DoubleVal> test(
PercentInit, PercentUpdate, PercentMerge, PercentSerialize, PercentFinalize);
vector<DoubleVal> input;
vector<DoubleVal> vpercent;
for (int i = 0; i < 1001; ++i) {
input.push_back(DoubleVal(i));
vpercent.push_back(DoubleVal(0.75));
}
if (!test.Execute(input, vpercent, StringVal("750"))) {
cerr << "Avg: " << test.GetErrorMsg() << endl;
return false;
}
return true;
}
int main(int argc, char** argv) {
bool passed = true;
passed &= TestPercent();
cerr << (passed ? "Tests passed." : "Tests failed.") << endl;
return 0;
}That is the result:
Finalize | n : 750 | it : 750 Serialize Merge Finalize | n : 0 | it : 0 Avg: UDA failed running in one level distributed mode with 1 nodes. Expected: 750 Actual: 0 Tests failed.
We can to see, the good results in my first pass of my finalize function, and i don't understand why the program did not finish at that time.
And also, my code works with UdaExecutionMode == SINGLE_NODE in parameter of execute test function.
If someone can help me in this point, i would appreciate it.
Created 04-17-2015 08:27 AM
I find the error, i forget to initialize a struct's variable (percent) in merge function and when i used a SINGLE NODE level, the executions uses only update and finalize methods.
Created 06-29-2015 02:11 AM
Ok, no problem, this is the final code:
#include <algorithm>
#include <list>
#include <sstream>
#include <iostream>
#include <impala_udf/udf.h>
#include <impala_udf/udf-debug.h>
using namespace impala_udf;
using namespace std;
template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val)
{
stringstream ss;
ss << val;
string str = ss.str();
StringVal string_val(context, str.size());
memcpy(string_val.ptr, str.c_str(), str.size());
return string_val;
}
template <>
StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val)
{
if (val.is_null) return StringVal::null();
return ToStringVal(context, val.val);
}
struct t_percent
{
int nb;
std::list<double> *values;
double percent;
int64_t count;
};
void PercentInit(FunctionContext* context, StringVal* val)
{
val->is_null = false;
val->len = sizeof(t_percent);
val->ptr = context->Allocate(val->len);
memset(val->ptr, 0, val->len);
}
void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val)
{
if (input.is_null || percent.is_null || val->is_null) return;
t_percent* percentile = reinterpret_cast<t_percent*>(val->ptr);
if (percentile->values == NULL)
percentile->values = new std::list<double>();
percentile->nb += 1;
percentile->percent = percent.val;
percentile->values->push_back(input.val);
}
void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst)
{
if (src.is_null || dst->is_null) return;
const t_percent* src_percentile = reinterpret_cast<t_percent*>(src.ptr);
t_percent* dst_percentile = reinterpret_cast<t_percent*>(dst->ptr);
if (dst_percentile->values == NULL)
dst_percentile->values = new std::list<double>();
dst_percentile->nb += src_percentile->nb;
dst_percentile->percent = src_percentile->percent;
dst_percentile->values->merge(*src_percentile->values);
}
const StringVal PercentSerialize(FunctionContext* context, const StringVal& val)
{
if (val.is_null)
return (StringVal::null());
StringVal result(context, val.len);
memcpy(result.ptr, val.ptr, val.len);
context->Free(val.ptr);
return result;
}
StringVal PercentFinalize(FunctionContext* context, const StringVal& val)
{
if (val.is_null)
return (StringVal::null());
t_percent* percentile = reinterpret_cast<t_percent*>(val.ptr);
if (percentile->values == NULL || percentile->values->empty())
return (StringVal::null());
StringVal result;
percentile->values->sort();
int n = percentile->percent * (percentile->nb - 1);
std::list<double>::iterator it = percentile->values->begin();
for (int i = 0; i < n ; ++i)
++it;
result = ToStringVal(context, *it);
delete(percentile->values);
context->Free(val.ptr);
return result;
}
Created 04-17-2015 08:27 AM
I find the error, i forget to initialize a struct's variable (percent) in merge function and when i used a SINGLE NODE level, the executions uses only update and finalize methods.
Created 06-19-2015 12:47 PM
Could you share your final code since you said that you have found the problem and resolved, thanks.
Created 06-29-2015 02:11 AM
Ok, no problem, this is the final code:
#include <algorithm>
#include <list>
#include <sstream>
#include <iostream>
#include <impala_udf/udf.h>
#include <impala_udf/udf-debug.h>
using namespace impala_udf;
using namespace std;
template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val)
{
stringstream ss;
ss << val;
string str = ss.str();
StringVal string_val(context, str.size());
memcpy(string_val.ptr, str.c_str(), str.size());
return string_val;
}
template <>
StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val)
{
if (val.is_null) return StringVal::null();
return ToStringVal(context, val.val);
}
struct t_percent
{
int nb;
std::list<double> *values;
double percent;
int64_t count;
};
void PercentInit(FunctionContext* context, StringVal* val)
{
val->is_null = false;
val->len = sizeof(t_percent);
val->ptr = context->Allocate(val->len);
memset(val->ptr, 0, val->len);
}
void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val)
{
if (input.is_null || percent.is_null || val->is_null) return;
t_percent* percentile = reinterpret_cast<t_percent*>(val->ptr);
if (percentile->values == NULL)
percentile->values = new std::list<double>();
percentile->nb += 1;
percentile->percent = percent.val;
percentile->values->push_back(input.val);
}
void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst)
{
if (src.is_null || dst->is_null) return;
const t_percent* src_percentile = reinterpret_cast<t_percent*>(src.ptr);
t_percent* dst_percentile = reinterpret_cast<t_percent*>(dst->ptr);
if (dst_percentile->values == NULL)
dst_percentile->values = new std::list<double>();
dst_percentile->nb += src_percentile->nb;
dst_percentile->percent = src_percentile->percent;
dst_percentile->values->merge(*src_percentile->values);
}
const StringVal PercentSerialize(FunctionContext* context, const StringVal& val)
{
if (val.is_null)
return (StringVal::null());
StringVal result(context, val.len);
memcpy(result.ptr, val.ptr, val.len);
context->Free(val.ptr);
return result;
}
StringVal PercentFinalize(FunctionContext* context, const StringVal& val)
{
if (val.is_null)
return (StringVal::null());
t_percent* percentile = reinterpret_cast<t_percent*>(val.ptr);
if (percentile->values == NULL || percentile->values->empty())
return (StringVal::null());
StringVal result;
percentile->values->sort();
int n = percentile->percent * (percentile->nb - 1);
std::list<double>::iterator it = percentile->values->begin();
for (int i = 0; i < n ; ++i)
++it;
result = ToStringVal(context, *it);
delete(percentile->values);
context->Free(val.ptr);
return result;
}
Created 07-01-2015 12:27 PM
I can run your code in the shell with -test.cc without the a problem, but when I created a function with it in impala, it is not working, only lines I added are:
87 StringVal result;
88 percentile->values->sort();
89
90 std::cerr << "Sorted list:" << endl;
91 for (std::list<double>::iterator it = percentile->values->begin(); it != percentile->values->end(); it++)
92 std::cerr << *it << ' ';
93 std::cerr << "\n===Sorted list:" << endl;
94
95 int n = percentile->percent * (percentile->nb - 1);
96 std::list<double>::iterator it = percentile->values->begin();
from 90-93, to see if the list sorted correctly.
impalad.INFO
.....
I0701 12:15:43.854324 30959 plan-fragment-executor.cc:190] descriptor table for fragment=7b405b9a021d464a:6b#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007fb42a30ba3c, pid=27281, tid=140411399993088
#
# JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build 1.7.0_67-b01)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# C [libudapctL.27281.0.so+0x3a3c] std::list<double, std::allocator<double> >::merge(std::list<double, std::allocator<double> >&)+0x80
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /var/run/cloudera-scm-agent/process/411-impala-IMPALAD/hs_err_pid27281.log
#
# If you would like to submit a bug report, please visit:
# http://bugreport.sun.com/bugreport/crash.jsp
#
5b38a813715cac
....
imalapd.ERROR
E0701 12:15:45.940914 9620 logging.cc:119] stderr will be logged to this file.
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentUpdate:
call PercentSerialize:
call PercentMerge:
call PercentMerge:
My data set have more the 20 elements, seems like to read up 1/3 then start to serialize it, and merge the result, but error out there. Did you ever had this problem?
Seems like
Created on 07-02-2015 06:14 AM - edited 07-02-2015 06:55 AM
This is strange because my example works with many millions of items, now, i had this problem several times, for me the error cause is the bad initialization of a variable but you don't have a new struct variable.
Sorry, i tested my last example and it works,