Created on 03-21-2019 11:41 PM - edited 09-16-2022 07:14 AM
I created a UDA function which was written in C++. When I try to test it on some tables, it seems that when the table is stored as several parquet files, the UDA function will cause impala to crash.
The error message is:
Socket error 104: Connection reset by peer.
However, the UDA function works fine with tables that stored in only one file.
I checked the error logs but didn't find any useful information.
The Version of Impala:
Version 2.11.0-cdh5.14.4
Here is the UDA function:
#include "funnel-uda.h"
#include <sstream>
#include <iostream>
#include <algorithm>
#include <vector>
#include <string>
#include <stdlib.h>
using namespace impala_udf;
using namespace std;
template <typename T>
StringVal ToStringVal(FunctionContext* context, const T& val) {
  stringstream ss;
  ss << val;
  string str = ss.str();
  StringVal string_val(context, str.size());
  memcpy(string_val.ptr, str.c_str(), str.size());
  return string_val;
}
struct EventPair {
    std::string firstTimeStamp;
    uint8_t step;
};
bool cmp(const std::pair<std::string, uint8_t> a, const std::pair<std::string, uint8_t> b) {
    return a.first<b.first;
}
struct EventLogs
{
    using TimestampEvent = std::pair<std::string, uint8_t>;
    using TimestampEvents = std::vector<TimestampEvent>;
    std::string windowSize;
    TimestampEvents events_list;
    bool sorted = true;
    size_t size() const
    {
        return events_list.size();
    }
    void add(std::string timestamp, uint8_t step)
    {
        events_list.push_back(std::make_pair(timestamp,step));
    }
    void sort()
    {
        if (!sorted)
        {
            std::stable_sort(std::begin(events_list), std::end(events_list), cmp);
            sorted = true;
        }
    }
    void merge(const EventLogs & other)
    {
        const auto size = events_list.size();
        events_list.insert(events_list.end(), other.events_list.begin(), other.events_list.end());
        /// either sort whole container or do so partially merging ranges afterwards
        if (!sorted && !other.sorted)
            std::stable_sort(std::begin(events_list), std::end(events_list), cmp);
        else
        {
            const auto begin = std::begin(events_list);
            const auto middle = std::next(begin, size);
            const auto end = std::end(events_list);
            if (!sorted)
                std::stable_sort(begin, middle, cmp);
            if (!other.sorted)
                std::stable_sort(middle, end, cmp);
            std::inplace_merge(begin, middle, end, cmp);
        }
        sorted = true;
    }
};
void SplitString(const std::string& s, std::vector<std::string>& v, const std::string& c)
{
    std::string::size_type pos1, pos2;
    pos2 = s.find(c);
    pos1 = 0;
    while(std::string::npos != pos2)
    {
        v.push_back(s.substr(pos1, pos2-pos1));
        pos1 = pos2 + c.size();
        pos2 = s.find(c, pos1);
    }
    if(pos1 != s.length())
    v.push_back(s.substr(pos1));
}
void FunnelInit(FunctionContext* context, StringVal* val) {
    EventLogs* eventLogs = new EventLogs();
    val->ptr = (uint8_t*) eventLogs;
    // Exit on failed allocation. Impala will fail the query after some time.
    if (val->ptr == NULL) {
        *val = StringVal::null();
        return;
    }
    val->is_null = false;
    val->len = sizeof(EventLogs);
}
void FunnelUpdate(FunctionContext* context, const StringVal& ev, const StringVal& timestamp, const StringVal& windowSize, const StringVal& funnelSteps, StringVal* result) {
    if (ev.is_null || timestamp.is_null || windowSize.is_null || funnelSteps.is_null) return;
    
    std::vector<std::string> funnelStepsVector;
    std::string evString ((char *)ev.ptr, ev.len);
    std::string timeStampString ((char *)timestamp.ptr, timestamp.len); 
    std::string funnelStepsString ((char *)funnelSteps.ptr, funnelSteps.len);
    std::string windowSizeString((char *)windowSize.ptr, windowSize.len);
    SplitString(funnelStepsString, funnelStepsVector, ",");
    for (int i = 0; i < funnelStepsVector.size(); i++) {
        std::vector<std::string> mergedFunnelStep;
        SplitString(funnelStepsVector[i], mergedFunnelStep, "|");
        for (int j=0; j< mergedFunnelStep.size(); j++) {
            if (mergedFunnelStep[j] == evString) {
                assert(result->len == sizeof(EventLogs));
                EventLogs* eventLogs = reinterpret_cast<EventLogs*>(result->ptr);
                eventLogs->windowSize = windowSizeString;
                eventLogs->add(timeStampString, i);
            }
        }
    }
}
void FunnelMerge(FunctionContext* context, const StringVal& src, StringVal* dst) {
    if (src.is_null || dst->is_null) return;
    const EventLogs* srcEventList = reinterpret_cast<const EventLogs*>(src.ptr);
    EventLogs* dstEventList = reinterpret_cast<EventLogs*>(dst->ptr);
    dstEventList->windowSize = srcEventList->windowSize.empty() ? dstEventList->windowSize : srcEventList->windowSize;
    dstEventList->merge(*srcEventList);
}
StringVal FunnelFinalize(FunctionContext* context, const StringVal& src) {
    if (src.is_null) return StringVal::null();
    std::vector<std::pair<uint8_t, std::string> > funnelResult;
    const EventLogs* eventList = reinterpret_cast<const EventLogs*>(src.ptr);
    if (eventList -> events_list.empty()) {
        return StringVal::null();
    }
    const std::string windowSize = eventList->windowSize;
    for (const auto & pair : eventList->events_list) {
        std::string timestamp = pair.first;
        uint8_t currentStep = pair.second;
        if (currentStep == 0) {
            funnelResult.push_back(std::make_pair(0, timestamp));
        } else {
            for (auto& eventPair : funnelResult) {
                std::string firstTimeStamp = eventPair.second;
                uint8_t step = eventPair.first;
                if ((currentStep == step + 1) && atol(timestamp.c_str()) <= atol(firstTimeStamp.c_str()) + atol(windowSize.c_str())) {
                    eventPair.first = currentStep;
                }
            }
        }
    }
    const auto maxStepPair = std::minmax_element(funnelResult.begin(), funnelResult.end());
    int  maxStep = (int)(maxStepPair.second->first);
    delete src.ptr;
    return ToStringVal(context, maxStep);
}The query is:
select uid,my_funnel(ev, time, '10', 'A,B,C,D') from impala_udf_test group by uid;
The structure of the table:
CREATE TABLE default.impala_udf_test (
    uid STRING,
    time STRING,
    ev STRING  )
WITH SERDEPROPERTIES ('serialization.format'='1')                                          
STORED AS PARQUETdata of the table:
Best wishes,
Kathy
Created 03-25-2019 12:36 AM
void FunnelInit(FunctionContext* context, StringVal* val) {
    EventLogs* eventLogs = new EventLogs();
    val->ptr = (uint8_t*) eventLogs;
    // Exit on failed allocation. Impala will fail the query after some time.
    if (val->ptr == NULL) {
        *val = StringVal::null();
        return;
    }
    val->is_null = false;
    val->len = sizeof(EventLogs);
}I did another scan and the memory management in the above function is also slightly problematic - the memory attached to the intermediate StringVal would be better allocated from the Impala UDF interface so that Impala can track the memory consumption. E.g. see https://github.com/cloudera/impala-udf-samples/blob/bc70833/uda-sample.cc#L76 .
I think the real issue though is the EventLogs data structure and lack of a Serialize() function. It's a somewhat complex nested structure with the string and vector. In order for the UDA to work, you need to have a Serialize() function that flattens out the intermediate result into a single StringVal. This is pretty unavoidable since Impala needs to be able to send the intermediate values over the network and/or write it to disk, and Impala doesn't know enough about your data structure to do it automatically. Our docs do mention this here https://www.cloudera.com/documentation/enterprise/latest/topics/impala_udf.html#udafs.
Putting it into practice is a bit tricky. One working example is the implementation of reservoir sampling in Impala itself. Unfortunately I think it's a little over-complicated:https://github.com/apache/impala/blob/df53ec/be/src/exprs/aggregate-functions-ir.cc#L1067
The general pattern for complex intermediate values is to have a "header" that lets your determine whether the intermediate value is currently serialized, then either the deserialized representation, or the serialized representation after the "header" using a flexible array member or similar - https://en.wikipedia.org/wiki/Flexible_array_member. The Serialize() function will convert the representation by packing any nested structures into a single StringVal() with the header in front. Then other functions can switch back to the deserialized representation. Or in some cases, you can be clever and avoid the conversion in some case (that's what the reservoir sample function above is doing, and part of why it's overly complex). Anyway, a really rough illustration of the idea is as follows:
struct DeserializedValue {
 ...
}
struct IntermediateValue {
  bool serialized;
  union {
    DeserializedValue val;
     char buf[0];
   };
   StringVal Serialize() {
     if (serialized) {
       // Just copy serialized representation to output StringVal
     } else {
        // Flatten val into an output StringVal
     }
   }
   void DeserializeIfNeeded() {
       if (serialized) {
         // Unpack buf into val.  
      }
    }
};
Just as a side note, the use of C++ builtin vector and string in the intermediate value can be problematic if they're large, since Impala doesn't account for the memory involved. But that's very much a second-order problem compared to it not working at all.
Created 03-22-2019 10:08 AM
The "Connection reset by peer" message should have just been printed by the client (e.g. Hue, impala-shell, etc.). If the impalad itself crashed it should have printed some exit message or other logs that indicated why the impalad crashed (logs should be in impalad.INFO). If the impalad logs don't have anything useful, then it should have written either a minidump or core dump which can be parsed to see what caused it to crash.
How big are your input files? Is it possible that with only one input file, only a single scan range was used to read the table, and that with more input files, multiple scan ranges were spawned? If that is the case, it sounds like there might be a race condition somewhere .
Created 03-22-2019 08:38 PM
delete src.ptr;
<-- that is a bug that will definitely cause Impala to crash if you run the UDA enough times. Impala manages that memory and it's definitely not valid to free it yourself! The Impala runtime will automatically manage memory for StringVal inputs.
Created 03-23-2019 08:03 AM
Hi, Tim. Thanks for your help. I removed that from the code, but Impala still crashed when I use the UDA function.
Created 03-23-2019 08:37 AM
Hi, Sahil. Thanks for your help. It's a small table with only 20 records. I found the problem because Impala will crash immediately if I use the UDA function after an INSERT INTO clause, even with only one record added. I found the hs_err.log today.
 
Does it mean that something went wrong with string values?
Created 03-23-2019 12:21 PM
Could you attach the full hs_err.log file? There should be a stack trace in the hs_err.log file of the thread that crashed. The impalad.INFO files would probably help as well.
Created 03-24-2019 08:24 AM
Hi, Sahil. Thanks for your reply. Sorry, but according to the policy of my company, I cannot post the whole hs_err.log file. Here is a part of it. I am not sure whether it would be useful.
Created 03-24-2019 09:16 AM
Is the stack trace the same every time this crashes? If so, there is likely a bug somewhere in your UDAF, it's hard for me to say exactly where the bug is, but I would recommend trying to reproduce the issue outside Impala (perhaps run the UDAF in a test harness). If you can reproduce the crash outside Impala, standard C++ debugging tools should help you pin down the issue.
Created 03-25-2019 12:36 AM
void FunnelInit(FunctionContext* context, StringVal* val) {
    EventLogs* eventLogs = new EventLogs();
    val->ptr = (uint8_t*) eventLogs;
    // Exit on failed allocation. Impala will fail the query after some time.
    if (val->ptr == NULL) {
        *val = StringVal::null();
        return;
    }
    val->is_null = false;
    val->len = sizeof(EventLogs);
}I did another scan and the memory management in the above function is also slightly problematic - the memory attached to the intermediate StringVal would be better allocated from the Impala UDF interface so that Impala can track the memory consumption. E.g. see https://github.com/cloudera/impala-udf-samples/blob/bc70833/uda-sample.cc#L76 .
I think the real issue though is the EventLogs data structure and lack of a Serialize() function. It's a somewhat complex nested structure with the string and vector. In order for the UDA to work, you need to have a Serialize() function that flattens out the intermediate result into a single StringVal. This is pretty unavoidable since Impala needs to be able to send the intermediate values over the network and/or write it to disk, and Impala doesn't know enough about your data structure to do it automatically. Our docs do mention this here https://www.cloudera.com/documentation/enterprise/latest/topics/impala_udf.html#udafs.
Putting it into practice is a bit tricky. One working example is the implementation of reservoir sampling in Impala itself. Unfortunately I think it's a little over-complicated:https://github.com/apache/impala/blob/df53ec/be/src/exprs/aggregate-functions-ir.cc#L1067
The general pattern for complex intermediate values is to have a "header" that lets your determine whether the intermediate value is currently serialized, then either the deserialized representation, or the serialized representation after the "header" using a flexible array member or similar - https://en.wikipedia.org/wiki/Flexible_array_member. The Serialize() function will convert the representation by packing any nested structures into a single StringVal() with the header in front. Then other functions can switch back to the deserialized representation. Or in some cases, you can be clever and avoid the conversion in some case (that's what the reservoir sample function above is doing, and part of why it's overly complex). Anyway, a really rough illustration of the idea is as follows:
struct DeserializedValue {
 ...
}
struct IntermediateValue {
  bool serialized;
  union {
    DeserializedValue val;
     char buf[0];
   };
   StringVal Serialize() {
     if (serialized) {
       // Just copy serialized representation to output StringVal
     } else {
        // Flatten val into an output StringVal
     }
   }
   void DeserializeIfNeeded() {
       if (serialized) {
         // Unpack buf into val.  
      }
    }
};
Just as a side note, the use of C++ builtin vector and string in the intermediate value can be problematic if they're large, since Impala doesn't account for the memory involved. But that's very much a second-order problem compared to it not working at all.
Created 03-26-2019 12:51 AM
Thanks for your reply, Tim. Does Impala support using third-party shared libraries such as boost_serialization in UDAF? I tried to use that to do serialization but it caused an error.
libboost_serialization.so.1.53.0: cannot open shared object file: No such file or directory
 
					
				
				
			
		
