Hive is very powerful, but sometimes you need to add some procedural code for a special circumstance, such as complex parsing of a field. Hive makes it easy to create User Defined Table Functions (UDTFs), which let you transform your Hive results, pass them through the UDTF, and get back a set of rows that can be used like any other Hive result set. These can be written in Java or Python, and we will use Python for this article; however, the techniques here apply to both with some syntax changes.
There are a lot of great articles on building these such as https://community.hortonworks.com/articles/72414/how-to-create-a-custom-udf-for-hive-using-python.ht....
These pretty much work as advertised, but they don't get into how to debug or troubleshoot your code. However, once you get into any significant logic (and sometimes not so significant!), you are likely to create a few bugs. This can be a problem in your parsing logic or, in the case of Python, even a syntax error in your code! So we are going to look at two techniques that can be used to debug these UDTFs.
The problem
When Hive encounters an error in the UDTF, it simply blows up with a pretty confusing error. The underlying error is hidden, and you are left scratching your head. The error will probably look something like:
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20003]: An error occurred when trying to close the Operator running your custom script.
Or from a Yarn perspective:
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
NOT VERY HELPFUL!!!
Our test scenario
We need to parse JSON text stored in a column of a Hive table into columns. Hive SerDes can handle most JSON formats, but there are still a few outlier situations where you need a more flexible JSON parser.
Quick UDTF Review
First, two important things to remember about Hive UDTFs:
1. Hive passes the columns listed in the TRANSFORM clause to your script on stdin, one row per line, with the columns separated by tabs.
2. Hive reads your script's stdout as the result set, and expects it to contain only tab-separated output rows.
So, this means that if you are getting an error in your UDTF, you can't just print debug statements to stdout. Hive is expecting its output there, so it won't show them as debug messages; instead, they will cause a format error.
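To make that contract concrete, here is a minimal, hypothetical pass-through script (not part of this article's example) that simply echoes its input columns back to Hive and sends diagnostics to stderr:

#!/usr/bin/python
# Minimal sketch of the UDTF contract: read tab-separated columns from stdin,
# write tab-separated result rows to stdout, and keep debug output on stderr.
import sys

for line in sys.stdin:
    cols = line.rstrip('\n').split('\t')                       # columns listed in the TRANSFORM clause
    sys.stderr.write('DEBUG: got %d columns\n' % len(cols))    # debug output goes to stderr
    print u'\t'.join(cols)                                     # result rows go to stdout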
The Table
CREATE EXTERNAL TABLE `default.events` (`json_text` string)
STORED AS TEXTFILE
LOCATION '/tmp/events';
The Data
{ "deviceId": "13a46b21-9528-4eb1-93bd-303a3b3e6b6a", "events": [ { "Process_Started": { "timestamp": "2017-06-01T18:26:24.444Z" } }, { "Process_Stopped": { "timestamp": "2017-06-01T18:26:24.444Z", "errorReason": "-1", "errorMsg": "The operation couldn’t be completed." } } ] }{ "deviceId": "9cd57d50-4d0e-457e-9fd3-05b9e56644e6", "events": [ { "Process_Started": { "timestamp": "2017-06-02T00:20:20.400Z" } }, { "Process_Completed": { "timestamp": "2017-06-02T02:20:29.020" } } ] }
The Query
We will save this in select_json.hql
DELETE FILE /home/<your id>/parse_events.py;
ADD FILE /home/<your id>/parse_events.py;

SELECT TRANSFORM (json_text)
USING 'python parse_events.py'
AS deviceId, eventType, eventTime, errorReason, errorMsg
FROM default.events;
The UDTF
#!/usr/bin/python
##################################################################################################
# Hive UDTF to parse json data
##################################################################################################
import sys
import json

reload(sys)
sys.setdefaultencoding('utf8')

def parse_json(json_string):
    j = json.loads(json_string)
    deviceId = j["deviceId"]
    events = j["events"]
    # Force a stupid error!
    x = 1
    y = 0
    z = x / y
    # Flatten Events Array
    for evt in events:
        try:
            eventType = evt.keys()[0]
            e = evt[eventType]
            edata = []
            edata.append(deviceId)  # output columns must line up with the AS clause in the query
            edata.append(eventType)
            edata.append(e.get("timestamp", u''))
            edata.append(e.get("errorReason", u''))
            edata.append(e.get("errorMsg", u''))
            # Send a tab-separated string back to Hive
            print u'\t'.join(edata)
        except Exception as ex:
            sys.stderr.write('AN ERROR OCCURRED IN PYTHON UDTF\n %s\n' % ex.message)

def main(argv):
    # Parse each line sent from Hive (note we are only receiving 1 column, so no split needed)
    for line in sys.stdin:
        parse_json(line)

if __name__ == "__main__":
    main(sys.argv[1:])
Let's Run It!
Here's a hint: Python should throw the error "ZeroDivisionError: integer division or modulo by zero". Assuming you have saved the query in select_json.hql, the run goes something like this:
hive -f select_json.hql
...
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
...
Task with the most failures(4):
-----
Task ID:
  task_1514310228021_3433_m_000000
-----
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: Hive Runtime Error while closing operators
    at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.close(ExecMapper.java:210)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20003]: An error occurred when trying to close the Operator running your custom script.
    at org.apache.hadoop.hive.ql.exec.ScriptOperator.close(ScriptOperator.java:560)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:631)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:631)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:631)
    at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.close(ExecMapper.java:192)
    ... 8 more
Nothing about a divide by zero anywhere. Ugh!
TECHNIQUE 1 - Forget Hive!
You are writing a Hive UDTF, but you are also just writing a program that reads from stdin and writes to stdout. So it is a great idea to develop your logic completely outside of Hive; once you have tested it adequately, you can plug it in and continue development. The easiest way to do this, which also lets you test later with no changes, is to pull out the data that Hive would send to your UDTF and feed it to stdin. Given our table, it could be done like this:
hive -e "INSERT OVERWRITE LOCAL DIRECTORY '/tmp/events' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' SELECT json_text FROM default.events;" -bash-4.1$ ls -l /tmp/events total 4 -rw-r--r-- 1 screamingweasel screamingweasel 486 Dec 31 23:02 000000_0 cat /tmp/events/* { "deviceId": "13a46b21-9528-4eb1-93bd-303a3b3e6b6a", "events": [ { "Process_Started": { "timestamp": "2017-06-01T18:26:24.444Z" } }, { "Process_Stopped": { "timestamp": "2017-06-01T18:26:24.444Z", "errorReason": "-1", "errorMsg": "The operation couldn’t be completed." } } ] } { "deviceId": "9cd57d50-4d0e-457e-9fd3-05b9e56644e6", "events": [ { "Process_Started": { "timestamp": "2017-06-02T00:20:20.400Z" } }, { "Process_Completed": { "timestamp": "2017-06-02T02:20:29.020" } } ] } # DO THE ACTUAL TEST (note there may be >1 file in the directory) cat /tmp/events/* | python parse_events.py Traceback (most recent call last): File "parse_events.py", line 42, in <module> main(sys.argv[1:]) File "parse_events.py", line 39, in main parse_json(line) File "parse_events.py", line 18, in parse_json z=x/y ZeroDivisionError: integer division or modulo by zero
Simple as that! Export the columns you will be passing to the UDTF to a tab-separated file and pipe it into your UDTF. This will simulate Hive calling your UDTF, but doesn't bury any error messages. In addition, you can print whatever debug messages you like to stdout or stderr to help in debugging.
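A handy variation, once the export exists, is to pipe just a single record through the script so the output stays readable while you iterate. For example (the file name 000000_0 comes from the export above and may differ in your environment):

# Test the UDTF against only the first exported row
head -n 1 /tmp/events/000000_0 | python parse_events.py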
TECHNIQUE 2 - stderr is your friend!
As noted, Hive expects the results from the UDTF to be in stdout. The stderr file is fair game for writing debug statements. This is pretty old school debugging, but it's still effective. Print out values and locations in your code to help you determine where the error occurs or what values are in variables at certain times. For example, you might add the following to the UDTF script to help identify where the issue is happening:
sys.stderr.write("Before stupid error\n") x=1 y=0 z=x/y sys.stderr.write("After stupid error!\n")
The trick is to find these in the logs when running on a YARN cluster. This example runs as a MapReduce job, which makes it a little easier: you find the YARN job, drill down into one of the failed containers, and examine its stderr. Attached are some screen prints from the YARN ResourceManager showing this process.
Winner, winner! Here are our debugging statements!
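If you would rather not click through the ResourceManager UI, the same container logs can usually be pulled from the command line once the job has finished. The application id below is inferred from the failed task id shown earlier (task_1514310228021_3433 becomes application_1514310228021_3433), so substitute your own; the grep pattern simply matches the debug text we wrote:

# Pull the aggregated logs and show a little context around our stderr messages
yarn logs -applicationId application_1514310228021_3433 | grep -B 2 -A 2 "stupid error"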
SUPPORTING FILES FOR THIS ARTICLE ARE AVAILABLE ON GITHUB AT
https://github.com/screamingweasel/articles/tree/master/udtf_debugging