Created on 03-21-2019 11:41 PM - edited 09-16-2022 07:14 AM
I created a UDA function which was written in C++. When I try to test it on some tables, it seems that when the table is stored as several parquet files, the UDA function will cause impala to crash.
The error message is:
Socket error 104: Connection reset by peer.
However, the UDA function works fine with tables that stored in only one file.
I checked the error logs but didn't find any useful information.
The Version of Impala:
Version 2.11.0-cdh5.14.4
Here is the UDA function:
#include "funnel-uda.h" #include <sstream> #include <iostream> #include <algorithm> #include <vector> #include <string> #include <stdlib.h> using namespace impala_udf; using namespace std; template <typename T> StringVal ToStringVal(FunctionContext* context, const T& val) { stringstream ss; ss << val; string str = ss.str(); StringVal string_val(context, str.size()); memcpy(string_val.ptr, str.c_str(), str.size()); return string_val; } struct EventPair { std::string firstTimeStamp; uint8_t step; }; bool cmp(const std::pair<std::string, uint8_t> a, const std::pair<std::string, uint8_t> b) { return a.first<b.first; } struct EventLogs { using TimestampEvent = std::pair<std::string, uint8_t>; using TimestampEvents = std::vector<TimestampEvent>; std::string windowSize; TimestampEvents events_list; bool sorted = true; size_t size() const { return events_list.size(); } void add(std::string timestamp, uint8_t step) { events_list.push_back(std::make_pair(timestamp,step)); } void sort() { if (!sorted) { std::stable_sort(std::begin(events_list), std::end(events_list), cmp); sorted = true; } } void merge(const EventLogs & other) { const auto size = events_list.size(); events_list.insert(events_list.end(), other.events_list.begin(), other.events_list.end()); /// either sort whole container or do so partially merging ranges afterwards if (!sorted && !other.sorted) std::stable_sort(std::begin(events_list), std::end(events_list), cmp); else { const auto begin = std::begin(events_list); const auto middle = std::next(begin, size); const auto end = std::end(events_list); if (!sorted) std::stable_sort(begin, middle, cmp); if (!other.sorted) std::stable_sort(middle, end, cmp); std::inplace_merge(begin, middle, end, cmp); } sorted = true; } }; void SplitString(const std::string& s, std::vector<std::string>& v, const std::string& c) { std::string::size_type pos1, pos2; pos2 = s.find(c); pos1 = 0; while(std::string::npos != pos2) { v.push_back(s.substr(pos1, pos2-pos1)); pos1 = pos2 + c.size(); pos2 = s.find(c, pos1); } if(pos1 != s.length()) v.push_back(s.substr(pos1)); } void FunnelInit(FunctionContext* context, StringVal* val) { EventLogs* eventLogs = new EventLogs(); val->ptr = (uint8_t*) eventLogs; // Exit on failed allocation. Impala will fail the query after some time. if (val->ptr == NULL) { *val = StringVal::null(); return; } val->is_null = false; val->len = sizeof(EventLogs); } void FunnelUpdate(FunctionContext* context, const StringVal& ev, const StringVal& timestamp, const StringVal& windowSize, const StringVal& funnelSteps, StringVal* result) { if (ev.is_null || timestamp.is_null || windowSize.is_null || funnelSteps.is_null) return; std::vector<std::string> funnelStepsVector; std::string evString ((char *)ev.ptr, ev.len); std::string timeStampString ((char *)timestamp.ptr, timestamp.len); std::string funnelStepsString ((char *)funnelSteps.ptr, funnelSteps.len); std::string windowSizeString((char *)windowSize.ptr, windowSize.len); SplitString(funnelStepsString, funnelStepsVector, ","); for (int i = 0; i < funnelStepsVector.size(); i++) { std::vector<std::string> mergedFunnelStep; SplitString(funnelStepsVector[i], mergedFunnelStep, "|"); for (int j=0; j< mergedFunnelStep.size(); j++) { if (mergedFunnelStep[j] == evString) { assert(result->len == sizeof(EventLogs)); EventLogs* eventLogs = reinterpret_cast<EventLogs*>(result->ptr); eventLogs->windowSize = windowSizeString; eventLogs->add(timeStampString, i); } } } } void FunnelMerge(FunctionContext* context, const StringVal& src, StringVal* dst) { if (src.is_null || dst->is_null) return; const EventLogs* srcEventList = reinterpret_cast<const EventLogs*>(src.ptr); EventLogs* dstEventList = reinterpret_cast<EventLogs*>(dst->ptr); dstEventList->windowSize = srcEventList->windowSize.empty() ? dstEventList->windowSize : srcEventList->windowSize; dstEventList->merge(*srcEventList); } StringVal FunnelFinalize(FunctionContext* context, const StringVal& src) { if (src.is_null) return StringVal::null(); std::vector<std::pair<uint8_t, std::string> > funnelResult; const EventLogs* eventList = reinterpret_cast<const EventLogs*>(src.ptr); if (eventList -> events_list.empty()) { return StringVal::null(); } const std::string windowSize = eventList->windowSize; for (const auto & pair : eventList->events_list) { std::string timestamp = pair.first; uint8_t currentStep = pair.second; if (currentStep == 0) { funnelResult.push_back(std::make_pair(0, timestamp)); } else { for (auto& eventPair : funnelResult) { std::string firstTimeStamp = eventPair.second; uint8_t step = eventPair.first; if ((currentStep == step + 1) && atol(timestamp.c_str()) <= atol(firstTimeStamp.c_str()) + atol(windowSize.c_str())) { eventPair.first = currentStep; } } } } const auto maxStepPair = std::minmax_element(funnelResult.begin(), funnelResult.end()); int maxStep = (int)(maxStepPair.second->first); delete src.ptr; return ToStringVal(context, maxStep); }
The query is:
select uid,my_funnel(ev, time, '10', 'A,B,C,D') from impala_udf_test group by uid;
The structure of the table:
CREATE TABLE default.impala_udf_test ( uid STRING, time STRING, ev STRING ) WITH SERDEPROPERTIES ('serialization.format'='1') STORED AS PARQUET
data of the table:
Best wishes,
Kathy
Created 03-25-2019 12:36 AM
void FunnelInit(FunctionContext* context, StringVal* val) { EventLogs* eventLogs = new EventLogs(); val->ptr = (uint8_t*) eventLogs; // Exit on failed allocation. Impala will fail the query after some time. if (val->ptr == NULL) { *val = StringVal::null(); return; } val->is_null = false; val->len = sizeof(EventLogs); }
I did another scan and the memory management in the above function is also slightly problematic - the memory attached to the intermediate StringVal would be better allocated from the Impala UDF interface so that Impala can track the memory consumption. E.g. see https://github.com/cloudera/impala-udf-samples/blob/bc70833/uda-sample.cc#L76 .
I think the real issue though is the EventLogs data structure and lack of a Serialize() function. It's a somewhat complex nested structure with the string and vector. In order for the UDA to work, you need to have a Serialize() function that flattens out the intermediate result into a single StringVal. This is pretty unavoidable since Impala needs to be able to send the intermediate values over the network and/or write it to disk, and Impala doesn't know enough about your data structure to do it automatically. Our docs do mention this here https://www.cloudera.com/documentation/enterprise/latest/topics/impala_udf.html#udafs.
Putting it into practice is a bit tricky. One working example is the implementation of reservoir sampling in Impala itself. Unfortunately I think it's a little over-complicated:https://github.com/apache/impala/blob/df53ec/be/src/exprs/aggregate-functions-ir.cc#L1067
The general pattern for complex intermediate values is to have a "header" that lets your determine whether the intermediate value is currently serialized, then either the deserialized representation, or the serialized representation after the "header" using a flexible array member or similar - https://en.wikipedia.org/wiki/Flexible_array_member. The Serialize() function will convert the representation by packing any nested structures into a single StringVal() with the header in front. Then other functions can switch back to the deserialized representation. Or in some cases, you can be clever and avoid the conversion in some case (that's what the reservoir sample function above is doing, and part of why it's overly complex). Anyway, a really rough illustration of the idea is as follows:
struct DeserializedValue { ... } struct IntermediateValue { bool serialized; union { DeserializedValue val; char buf[0]; }; StringVal Serialize() { if (serialized) { // Just copy serialized representation to output StringVal } else { // Flatten val into an output StringVal } } void DeserializeIfNeeded() { if (serialized) { // Unpack buf into val. } } };
Just as a side note, the use of C++ builtin vector and string in the intermediate value can be problematic if they're large, since Impala doesn't account for the memory involved. But that's very much a second-order problem compared to it not working at all.
Created 03-26-2019 10:19 AM
Impala expect your UDF code and dependencies to be in a single .so, so you'd have to statically link any libraries you depend on.