Member since
03-21-2019
03-26-2019
12:51 AM
Thanks for your reply, Tim. Does Impala support using third-party shared libraries such as boost_serialization in a UDAF? I tried to use it for serialization, but it caused an error:
libboost_serialization.so.1.53.0: cannot open shared object file: No such file or directory
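For reference, the kind of serialization in question can also be done by hand with only the standard library, which avoids the extra shared-library dependency. The following is a minimal sketch under stated assumptions, not a tested Impala UDA: `TimestampEvent` mirrors the pair type from the funnel code, while `PutU32`, `SerializeEvents`, `DeserializeEvents`, and the byte layout are hypothetical names and choices made for illustration. Copying the resulting bytes into a `StringVal` is not shown.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

// Same shape as the (timestamp, step) pairs kept in the funnel state.
using TimestampEvent = std::pair<std::string, uint8_t>;

// Hypothetical helper: append a 32-bit value in host byte order.
static void PutU32(std::vector<uint8_t>& out, uint32_t v) {
  uint8_t raw[sizeof(v)];
  std::memcpy(raw, &v, sizeof(v));
  out.insert(out.end(), raw, raw + sizeof(v));
}

// Assumed layout: [count] then, per event, [length][timestamp bytes][step].
// The buffer holds no pointers, so it stays valid when copied elsewhere.
std::vector<uint8_t> SerializeEvents(const std::vector<TimestampEvent>& events) {
  std::vector<uint8_t> out;
  PutU32(out, static_cast<uint32_t>(events.size()));
  for (const auto& e : events) {
    PutU32(out, static_cast<uint32_t>(e.first.size()));
    out.insert(out.end(), e.first.begin(), e.first.end());
    out.push_back(e.second);
  }
  return out;
}

// Rebuilds the events from a flat buffer.
// Returns false if the buffer is truncated or has trailing bytes.
bool DeserializeEvents(const uint8_t* data, size_t len,
                       std::vector<TimestampEvent>* events) {
  size_t pos = 0;
  auto read_u32 = [&](uint32_t* v) {
    if (len - pos < sizeof(*v)) return false;
    std::memcpy(v, data + pos, sizeof(*v));
    pos += sizeof(*v);
    return true;
  };
  uint32_t count = 0;
  if (!read_u32(&count)) return false;
  events->clear();
  for (uint32_t i = 0; i < count; ++i) {
    uint32_t n = 0;
    if (!read_u32(&n)) return false;
    // Need n timestamp bytes plus one byte for the step.
    if (len - pos < static_cast<size_t>(n) + 1) return false;
    std::string ts(reinterpret_cast<const char*>(data + pos), n);
    pos += n;
    uint8_t step = data[pos++];
    events->emplace_back(std::move(ts), step);
  }
  return pos == len;
}
```

The lengths are written in host byte order, which assumes every node in the cluster has the same endianness.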
03-24-2019
08:24 AM
Hi, Sahil. Thanks for your reply. Sorry, but according to the policy of my company, I cannot post the whole hs_err.log file. Here is a part of it. I am not sure whether it would be useful.
03-23-2019
08:37 AM
Hi, Sahil. Thanks for your help. It's a small table with only 20 records. I found the problem because Impala will crash immediately if I use the UDA function after an INSERT INTO clause, even with only one record added. I found the hs_err.log today. Does it mean that something went wrong with string values?
03-23-2019
08:03 AM
Hi, Tim. Thanks for your help. I removed that from the code, but Impala still crashes when I use the UDA function.
03-21-2019
11:41 PM
I created a UDA function written in C++. When I test it on some tables, it seems that when the table is stored as several Parquet files, the UDA function causes Impala to crash. The error message is: Socket error 104: Connection reset by peer. However, the UDA function works fine with tables that are stored in only one file. I checked the error logs but didn't find any useful information.

Impala version: 2.11.0-cdh5.14.4

Here is the UDA function:

#include "funnel-uda.h"
#include <sstream>
#include <iostream>
#include <algorithm>
#include <vector>
#include <string>
#include <stdlib.h>
using namespace impala_udf;
using namespace std;
template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val) {
  stringstream ss;
  ss << val;
  string str = ss.str();
  StringVal string_val(context, str.size());
  memcpy(string_val.ptr, str.c_str(), str.size());
  return string_val;
}
struct EventPair {
  std::string firstTimeStamp;
  uint8_t step;
};
bool cmp(const std::pair<std::string, uint8_t> a, const std::pair<std::string, uint8_t> b) {
  return a.first<b.first;
}
struct EventLogs
{
  using TimestampEvent = std::pair<std::string, uint8_t>;
  using TimestampEvents = std::vector<TimestampEvent>;
  std::string windowSize;
  TimestampEvents events_list;
  bool sorted = true;
  size_t size() const
  {
    return events_list.size();
  }
  void add(std::string timestamp, uint8_t step)
  {
    events_list.push_back(std::make_pair(timestamp,step));
  }
  void sort()
  {
    if (!sorted)
    {
      std::stable_sort(std::begin(events_list), std::end(events_list), cmp);
      sorted = true;
    }
  }
  void merge(const EventLogs & other)
  {
    const auto size = events_list.size();
    events_list.insert(events_list.end(), other.events_list.begin(), other.events_list.end());
    /// either sort whole container or do so partially merging ranges afterwards
    if (!sorted && !other.sorted)
      std::stable_sort(std::begin(events_list), std::end(events_list), cmp);
    else
    {
      const auto begin = std::begin(events_list);
      const auto middle = std::next(begin, size);
      const auto end = std::end(events_list);
      if (!sorted)
        std::stable_sort(begin, middle, cmp);
      if (!other.sorted)
        std::stable_sort(middle, end, cmp);
      std::inplace_merge(begin, middle, end, cmp);
    }
    sorted = true;
  }
};
void SplitString(const std::string& s, std::vector<std::string>& v, const std::string& c)
{
  std::string::size_type pos1, pos2;
  pos2 = s.find(c);
  pos1 = 0;
  while(std::string::npos != pos2)
  {
    v.push_back(s.substr(pos1, pos2-pos1));
    pos1 = pos2 + c.size();
    pos2 = s.find(c, pos1);
  }
  if(pos1 != s.length())
    v.push_back(s.substr(pos1));
}
void FunnelInit(FunctionContext* context, StringVal* val) {
  EventLogs* eventLogs = new EventLogs();
  val->ptr = (uint8_t*) eventLogs;
  // Exit on failed allocation. Impala will fail the query after some time.
  if (val->ptr == NULL) {
    *val = StringVal::null();
    return;
  }
  val->is_null = false;
  val->len = sizeof(EventLogs);
}
void FunnelUpdate(FunctionContext* context, const StringVal& ev, const StringVal& timestamp, const StringVal& windowSize, const StringVal& funnelSteps, StringVal* result) {
  if (ev.is_null || timestamp.is_null || windowSize.is_null || funnelSteps.is_null) return;
  std::vector<std::string> funnelStepsVector;
  std::string evString ((char *)ev.ptr, ev.len);
  std::string timeStampString ((char *)timestamp.ptr, timestamp.len);
  std::string funnelStepsString ((char *)funnelSteps.ptr, funnelSteps.len);
  std::string windowSizeString((char *)windowSize.ptr, windowSize.len);
  SplitString(funnelStepsString, funnelStepsVector, ",");
  for (int i = 0; i < funnelStepsVector.size(); i++) {
    std::vector<std::string> mergedFunnelStep;
    SplitString(funnelStepsVector[i], mergedFunnelStep, "|");
    for (int j=0; j< mergedFunnelStep.size(); j++) {
      if (mergedFunnelStep[j] == evString) {
        assert(result->len == sizeof(EventLogs));
        EventLogs* eventLogs = reinterpret_cast<EventLogs*>(result->ptr);
        eventLogs->windowSize = windowSizeString;
        eventLogs->add(timeStampString, i);
      }
    }
  }
}
void FunnelMerge(FunctionContext* context, const StringVal& src, StringVal* dst) {
  if (src.is_null || dst->is_null) return;
  const EventLogs* srcEventList = reinterpret_cast<const EventLogs*>(src.ptr);
  EventLogs* dstEventList = reinterpret_cast<EventLogs*>(dst->ptr);
  dstEventList->windowSize = srcEventList->windowSize.empty() ? dstEventList->windowSize : srcEventList->windowSize;
  dstEventList->merge(*srcEventList);
}
StringVal FunnelFinalize(FunctionContext* context, const StringVal& src) {
  if (src.is_null) return StringVal::null();
  std::vector<std::pair<uint8_t, std::string> > funnelResult;
  const EventLogs* eventList = reinterpret_cast<const EventLogs*>(src.ptr);
  if (eventList -> events_list.empty()) {
    return StringVal::null();
  }
  const std::string windowSize = eventList->windowSize;
  for (const auto & pair : eventList->events_list) {
    std::string timestamp = pair.first;
    uint8_t currentStep = pair.second;
    if (currentStep == 0) {
      funnelResult.push_back(std::make_pair(0, timestamp));
    } else {
      for (auto& eventPair : funnelResult) {
        std::string firstTimeStamp = eventPair.second;
        uint8_t step = eventPair.first;
        if ((currentStep == step + 1) && atol(timestamp.c_str()) <= atol(firstTimeStamp.c_str()) + atol(windowSize.c_str())) {
          eventPair.first = currentStep;
        }
      }
    }
  }
  const auto maxStepPair = std::minmax_element(funnelResult.begin(), funnelResult.end());
  int maxStep = (int)(maxStepPair.second->first);
  delete src.ptr;
  return ToStringVal(context, maxStep);
}

The query is:

select uid,my_funnel(ev, time, '10', 'A,B,C,D') from impala_udf_test group by uid;

The structure of the table:

CREATE TABLE default.impala_udf_test (
uid STRING,
time STRING,
ev STRING )
WITH SERDEPROPERTIES ('serialization.format'='1')
STORED AS PARQUET

Data of the table:

Best wishes,
Kathy
Labels: Apache Impala