Support Questions
Find answers, ask questions, and share your expertise

UDA function causes Impala to crash

New Contributor

I created a UDA (user-defined aggregate function) written in C++. When I test it on some tables, it seems that whenever a table is stored as several Parquet files, the UDA causes Impala to crash.

 

The error message is: 

Socket error 104: Connection reset by peer.

 

However, the UDA works fine on tables that are stored in only one file.

I checked the error logs but didn't find any useful information.

 

Impala version:

Version 2.11.0-cdh5.14.4

 

Here is the UDA function:

#include "funnel-uda.h"

#include <stdlib.h>

#include <algorithm>
#include <cassert>
#include <cstring>
#include <iostream>
#include <iterator>
#include <limits>
#include <memory>
#include <new>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

using namespace impala_udf;
using namespace std;

/// Formats `val` with operator<< and copies the resulting text into a
/// StringVal whose buffer is allocated through `context`.
///
/// If that allocation fails, the returned StringVal is NULL. Per Impala's
/// udf.h, StringVal(context, len) yields a NULL value on allocation failure.
/// This avoids memcpy-ing into a null buffer.
template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val) {
  std::ostringstream ss;
  ss << val;
  const std::string str = ss.str();
  StringVal string_val(context, str.size());
  if (string_val.is_null) return string_val;
  memcpy(string_val.ptr, str.data(), str.size());
  return string_val;
}

/// A (first timestamp, step) record.
/// NOTE(review): not referenced by any code visible here; presumably a
/// leftover from an earlier design and a candidate for removal.
struct EventPair
{
    std::string firstTimeStamp;  // timestamp, kept as the raw input string
    uint8_t step;                // step index
};
bool cmp(const std::pair<std::string, uint8_t>& a, const std::pair<std::string, uint8_t>& b) {
    return a.first < b.first;
}

/// Per-group intermediate state of the funnel UDA: every matching
/// (timestamp, step) event seen so far, plus the window-size argument.
///
/// `sorted` records whether `events_list` is currently ordered by timestamp
/// (string comparison). add() clears it when an event arrives out of order.
/// Without that, sort() would be a no-op and merge() would inplace_merge
/// unsorted ranges, which breaks std::inplace_merge's precondition.
struct EventLogs
{
    using TimestampEvent = std::pair<std::string, uint8_t>;
    using TimestampEvents = std::vector<TimestampEvent>;
    std::string windowSize;
    TimestampEvents events_list;
    bool sorted = true;

    /// Orders events by timestamp only, using the same ordering as the
    /// file-level `cmp`. Stable algorithms keep insertion order for equal
    /// timestamps.
    static bool lessByTimestamp(const TimestampEvent& a, const TimestampEvent& b)
    {
        return a.first < b.first;
    }

    size_t size() const
    {
        return events_list.size();
    }

    /// Appends one event and notes whether the list is still in timestamp order.
    void add(std::string timestamp, uint8_t step)
    {
        if (sorted && !events_list.empty() && timestamp < events_list.back().first)
            sorted = false;
        events_list.emplace_back(std::move(timestamp), step);
    }

    /// Stable-sorts the events by timestamp if they are not already ordered.
    void sort()
    {
        if (!sorted)
        {
            std::stable_sort(events_list.begin(), events_list.end(), lessByTimestamp);
            sorted = true;
        }
    }

    /// Appends `other`'s events and leaves the combined list sorted.
    ///
    /// If neither side is ordered, the whole list is sorted. Otherwise only
    /// the unordered half (if any) is sorted, and the two halves are merged
    /// in place.
    void merge(const EventLogs & other)
    {
        const auto size = events_list.size();
        events_list.insert(events_list.end(), other.events_list.begin(), other.events_list.end());

        if (!sorted && !other.sorted)
        {
            std::stable_sort(events_list.begin(), events_list.end(), lessByTimestamp);
        }
        else
        {
            const auto begin = events_list.begin();
            const auto middle = begin + size;
            const auto end = events_list.end();

            if (!sorted)
                std::stable_sort(begin, middle, lessByTimestamp);

            if (!other.sorted)
                std::stable_sort(middle, end, lessByTimestamp);

            std::inplace_merge(begin, middle, end, lessByTimestamp);
        }

        sorted = true;
    }
};

/// Splits `s` on every occurrence of delimiter `c` and appends the pieces to
/// `v`. Existing contents of `v` are kept.
///
/// Empty pieces between adjacent delimiters are kept ("A,,B" -> "A", "", "B").
/// A trailing empty piece is dropped ("A," -> "A"; "" -> nothing).
/// An empty delimiter appends `s` as a single piece (if `s` is non-empty).
/// Previously an empty delimiter looped forever, because find("") never
/// advances.
void SplitString(const std::string& s, std::vector<std::string>& v, const std::string& c)
{
    if (c.empty())
    {
        if (!s.empty())
            v.push_back(s);
        return;
    }

    std::string::size_type start = 0;
    for (std::string::size_type hit = s.find(c); hit != std::string::npos; hit = s.find(c, start))
    {
        v.push_back(s.substr(start, hit - start));
        start = hit + c.size();
    }
    if (start != s.length())
        v.push_back(s.substr(start));
}

/// Init step: allocates an empty EventLogs and stores its address in the
/// intermediate StringVal. FunnelFinalize deletes it.
///
/// NOTE(review): the intermediate is a raw pointer to a heap object whose
/// members (std::string, std::vector) own further heap memory. Impala may copy
/// the intermediate's `len` bytes when it moves a STRING intermediate between
/// plan fragments, as can happen when the scan is split across several files
/// or nodes. If so, that copy is shallow, and later merge/delete calls can
/// crash. A SERIALIZE_FN that flattens EventLogs is likely needed; confirm
/// against the Impala UDA documentation.
void FunnelInit(FunctionContext* context, StringVal* val) {
    // nothrow: plain `new` throws std::bad_alloc and never returns NULL, which
    // made the check below unreachable. With nothrow, a failed allocation is
    // reported as a NULL intermediate instead of an exception escaping the UDA.
    EventLogs* eventLogs = new (std::nothrow) EventLogs();
    if (eventLogs == nullptr) {
        // Exit on failed allocation. Impala will fail the query after some time.
        *val = StringVal::null();
        return;
    }
    val->ptr = reinterpret_cast<uint8_t*>(eventLogs);
    val->is_null = false;
    val->len = sizeof(EventLogs);
}

/// Update step: for one input row, records (timestamp, step index) in the
/// intermediate for every funnel step that lists event `ev`.
///
/// `funnelSteps` is split on ',' into steps, and each step is split on '|'
/// into alternative event names (e.g. 'A,B|C,D'). The 0-based position of a
/// matching step becomes its step index.
/// Rows with any NULL argument are ignored. So are calls on a NULL
/// intermediate, which is what FunnelInit produces when allocation fails.
void FunnelUpdate(FunctionContext* context, const StringVal& ev, const StringVal& timestamp, const StringVal& windowSize, const StringVal& funnelSteps, StringVal* result) {

    if (ev.is_null || timestamp.is_null || windowSize.is_null || funnelSteps.is_null) return;
    // A NULL intermediate has no EventLogs behind its ptr; do not dereference it.
    if (result->is_null) return;

    const std::string evString(reinterpret_cast<const char*>(ev.ptr), ev.len);
    const std::string timeStampString(reinterpret_cast<const char*>(timestamp.ptr), timestamp.len);
    const std::string funnelStepsString(reinterpret_cast<const char*>(funnelSteps.ptr), funnelSteps.len);
    const std::string windowSizeString(reinterpret_cast<const char*>(windowSize.ptr), windowSize.len);

    std::vector<std::string> funnelStepsVector;
    SplitString(funnelStepsString, funnelStepsVector, ",");

    // Step indices are stored as uint8_t. Ignore steps beyond what it can hold,
    // rather than letting the index wrap around to a wrong step.
    const size_t maxSteps = static_cast<size_t>(std::numeric_limits<uint8_t>::max()) + 1;
    const size_t stepCount = std::min<size_t>(funnelStepsVector.size(), maxSteps);

    for (size_t i = 0; i < stepCount; ++i) {
        std::vector<std::string> alternatives;
        SplitString(funnelStepsVector[i], alternatives, "|");
        // Record the event at most once per step, even if a step lists the
        // same alternative more than once.
        if (std::find(alternatives.begin(), alternatives.end(), evString) != alternatives.end()) {
            assert(result->len == sizeof(EventLogs));
            EventLogs* eventLogs = reinterpret_cast<EventLogs*>(result->ptr);
            eventLogs->windowSize = windowSizeString;
            eventLogs->add(timeStampString, static_cast<uint8_t>(i));
        }
    }
}

/// Merge step: folds the events of intermediate `src` into `dst`.
///
/// Does nothing when either side is NULL. `dst` takes `src`'s window size when
/// `src` has one; otherwise `dst` keeps its own.
/// NOTE(review): `src` is not freed here. Whether Impala later finalizes or
/// otherwise releases a merged-from intermediate should be confirmed, or its
/// EventLogs may leak.
void FunnelMerge(FunctionContext* context, const StringVal& src, StringVal* dst) {
    if (!src.is_null && !dst->is_null) {
        const EventLogs& from = *reinterpret_cast<const EventLogs*>(src.ptr);
        EventLogs& into = *reinterpret_cast<EventLogs*>(dst->ptr);
        if (!from.windowSize.empty()) {
            into.windowSize = from.windowSize;
        }
        into.merge(from);
    }
}

/// Finalize step: computes how far through the funnel any attempt got within
/// the window, and frees the EventLogs allocated by FunnelInit.
///
/// Every step-0 event starts a new attempt. A later event at step k advances
/// each attempt currently at step k-1, provided the event's timestamp is at
/// most `windowSize` after the attempt's start. Timestamps and window size are
/// both parsed with atol.
/// Returns the largest step index reached (0-based) as a decimal string.
/// Returns NULL when no event matched, or when no step-0 event ever started
/// an attempt.
StringVal FunnelFinalize(FunctionContext* context, const StringVal& src) {
    if (src.is_null) return StringVal::null();

    // Own the intermediate so it is freed on every return path, and delete it
    // with its real type. `delete src.ptr` deleted through uint8_t*, which is
    // undefined behaviour and skips ~EventLogs.
    // NOTE(review): assumes src.ptr is still the object FunnelInit allocated,
    // not a byte copy made by Impala; confirm.
    std::unique_ptr<EventLogs> eventList(reinterpret_cast<EventLogs*>(src.ptr));
    if (eventList->events_list.empty()) {
        return StringVal::null();
    }

    // The attempt tracking below relies on events being in timestamp order.
    eventList->sort();

    const long window = atol(eventList->windowSize.c_str());
    // Each attempt is (deepest step reached, numeric start timestamp).
    std::vector<std::pair<uint8_t, long> > attempts;
    for (const auto& event : eventList->events_list) {
        const long eventTime = atol(event.first.c_str());
        const uint8_t currentStep = event.second;
        if (currentStep == 0) {
            attempts.emplace_back(0, eventTime);
            continue;
        }
        for (auto& attempt : attempts) {
            if (currentStep == attempt.first + 1 && eventTime <= attempt.second + window) {
                attempt.first = currentStep;
            }
        }
    }

    // Events matched but none was step 0, so no attempt exists. Taking the
    // max of an empty vector would yield end(), which must not be dereferenced.
    if (attempts.empty()) {
        return StringVal::null();
    }

    const auto deepest = std::max_element(attempts.begin(), attempts.end(),
        [](const std::pair<uint8_t, long>& a, const std::pair<uint8_t, long>& b) {
            return a.first < b.first;
        });
    return ToStringVal(context, static_cast<int>(deepest->first));
}

The query is:

select uid,my_funnel(ev, time, '10', 'A,B,C,D') from impala_udf_test group by uid;

 

The structure of the table:

CREATE TABLE default.impala_udf_test (
    uid STRING,
    time STRING,
    ev STRING  )
WITH SERDEPROPERTIES ('serialization.format'='1')                                          
STORED AS PARQUET

Data in the table:

image.png

 

Best wishes,

Kathy

 

 

 

10 REPLIES 10

Impala expects your UDF code and its dependencies to be in a single .so, so you have to statically link any libraries you depend on.