Member since
04-17-2015
5
Posts
0
Kudos Received
2
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2424 | 06-29-2015 02:11 AM | |
2667 | 04-17-2015 08:27 AM |
07-02-2015
06:14 AM
This is strange because my example works with many millions of items, now, i had this problem several times, for me the error cause is the bad initialization of a variable but you don't have a new struct variable. Sorry, i tested my last example and it works,
... View more
06-29-2015
02:11 AM
Ok, no problem, this is the final code: #include <algorithm>
#include <list>
#include <sstream>
#include <iostream>
#include <impala_udf/udf.h>
#include <impala_udf/udf-debug.h>
using namespace impala_udf;
using namespace std;
template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val)
{
stringstream ss;
ss << val;
string str = ss.str();
StringVal string_val(context, str.size());
memcpy(string_val.ptr, str.c_str(), str.size());
return string_val;
}
template <>
StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val)
{
if (val.is_null) return StringVal::null();
return ToStringVal(context, val.val);
}
struct t_percent
{
int nb;
std::list<double> *values;
double percent;
int64_t count;
};
void PercentInit(FunctionContext* context, StringVal* val)
{
val->is_null = false;
val->len = sizeof(t_percent);
val->ptr = context->Allocate(val->len);
memset(val->ptr, 0, val->len);
}
void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val)
{
if (input.is_null || percent.is_null || val->is_null) return;
t_percent* percentile = reinterpret_cast<t_percent*>(val->ptr);
if (percentile->values == NULL)
percentile->values = new std::list<double>();
percentile->nb += 1;
percentile->percent = percent.val;
percentile->values->push_back(input.val);
}
void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst)
{
if (src.is_null || dst->is_null) return;
const t_percent* src_percentile = reinterpret_cast<t_percent*>(src.ptr);
t_percent* dst_percentile = reinterpret_cast<t_percent*>(dst->ptr);
if (dst_percentile->values == NULL)
dst_percentile->values = new std::list<double>();
dst_percentile->nb += src_percentile->nb;
dst_percentile->percent = src_percentile->percent;
dst_percentile->values->merge(*src_percentile->values);
}
const StringVal PercentSerialize(FunctionContext* context, const StringVal& val)
{
if (val.is_null)
return (StringVal::null());
StringVal result(context, val.len);
memcpy(result.ptr, val.ptr, val.len);
context->Free(val.ptr);
return result;
}
StringVal PercentFinalize(FunctionContext* context, const StringVal& val)
{
if (val.is_null)
return (StringVal::null());
t_percent* percentile = reinterpret_cast<t_percent*>(val.ptr);
if (percentile->values == NULL || percentile->values->empty())
return (StringVal::null());
StringVal result;
percentile->values->sort();
int n = percentile->percent * (percentile->nb - 1);
std::list<double>::iterator it = percentile->values->begin();
for (int i = 0; i < n ; ++i)
++it;
result = ToStringVal(context, *it);
delete(percentile->values);
context->Free(val.ptr);
return result;
}
... View more
04-17-2015
08:27 AM
I find the error, i forget to initialize a struct's variable (percent) in merge function and when i used a SINGLE NODE level, the executions uses only update and finalize methods.
... View more
04-17-2015
02:41 AM
Hello, I try to debug my UDA percentile function in cpp and i don't understand the impala operation. That is my percentile code : template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val) {
stringstream ss;
ss << val;
string str = ss.str();
StringVal string_val(context, str.size());
memcpy(string_val.ptr, str.c_str(), str.size());
return string_val;
}
template <>
StringVal ToStringVal<DoubleVal>(FunctionContext* context, const DoubleVal& val) {
if (val.is_null) return StringVal::null();
return ToStringVal(context, val.val);
}
struct t_percent
{
int nb;
std::list<double> *values;
double percent;
int64_t count;
};
void PercentInit(FunctionContext* context, StringVal* val)
{
val->is_null = false;
val->len = sizeof(t_percent);
val->ptr = context->Allocate(val->len);
memset(val->ptr, 0, val->len);
}
void PercentUpdate(FunctionContext* context, const DoubleVal& input,const DoubleVal& percent, StringVal* val)
{
if (input.is_null || percent.is_null || val->is_null) return;
t_percent* avg = reinterpret_cast<t_percent*>(val->ptr);
if (avg->values == NULL)
avg->values = new std::list<double>();
avg->nb += 1;
avg->percent = percent.val;
avg->values->push_back(input.val);
}
void PercentMerge(FunctionContext* context, const StringVal& src, StringVal* dst)
{
std::cout << "Merge\n";
if (src.is_null || dst->is_null) return;
t_percent* src_avg = reinterpret_cast<t_percent*>(src.ptr);
t_percent* dst_avg = reinterpret_cast<t_percent*>(dst->ptr);
if (dst_avg->values == NULL)
dst_avg->values = new std::list<double>();
dst_avg->nb += src_avg->nb;
for (std::list<double>::iterator it = src_avg->values->begin(); it != src_avg->values->end(); ++it)
dst_avg->values->push_back(*it);
}
const StringVal PercentSerialize(FunctionContext* context, const StringVal& val) {
std::cout << "Serialize\n";
if (val.is_null)
return (StringVal::null());
StringVal result(context, val.len);
memcpy(result.ptr, val.ptr, val.len);
t_percent* avg = reinterpret_cast<t_percent*>(result.ptr);
t_percent* old = reinterpret_cast<t_percent*>(val.ptr);
avg->values = old->values;
context->Free(val.ptr);
return result;
}
StringVal PercentFinalize(FunctionContext* context, const StringVal& val) {
std::cout << "Finalize\n";
if (val.is_null)
return (StringVal::null());
t_percent* avg = reinterpret_cast<t_percent*>(val.ptr);
if (avg->values == NULL || avg->values->empty())
return (StringVal::null());
StringVal result;
avg->values->sort();
int n = avg->percent * avg->nb;
std::list<double>::iterator it = avg->values->begin();
for (int i = 0; i < n ; ++i)
++it;
std::cout << " | n : " << n << " | it : " << *it << " \n";
result = ToStringVal(context, *it);
delete(avg->values);
context->Free(val.ptr);
return result;
} That is my Test Code : bool TestPercent() {
UdaTestHarness2<StringVal, StringVal, DoubleVal, DoubleVal> test(
PercentInit, PercentUpdate, PercentMerge, PercentSerialize, PercentFinalize);
vector<DoubleVal> input;
vector<DoubleVal> vpercent;
for (int i = 0; i < 1001; ++i) {
input.push_back(DoubleVal(i));
vpercent.push_back(DoubleVal(0.75));
}
if (!test.Execute(input, vpercent, StringVal("750"))) {
cerr << "Avg: " << test.GetErrorMsg() << endl;
return false;
}
return true;
}
int main(int argc, char** argv) {
bool passed = true;
passed &= TestPercent();
cerr << (passed ? "Tests passed." : "Tests failed.") << endl;
return 0;
} That is the result: Finalize
| n : 750 | it : 750
Serialize
Merge
Finalize
| n : 0 | it : 0
Avg: UDA failed running in one level distributed mode with 1 nodes.
Expected: 750 Actual: 0
Tests failed. We can to see, the good results in my first pass of my finalize function, and i don't understand why the program did not finish at that time. And also, my code works with UdaExecutionMode == SINGLE_NODE in parameter of execute test function. If someone can help me in this point, i would appreciate it.
... View more
Labels:
- Labels:
-
Apache Impala