Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

How to make hive queries including scala and python functions?

avatar
Rising Star

Hello guys,

I'm following this Twitter streaming spark tutorial

However, instead of Spark I'm using Flume and Hive to store the data.

The problem is that in the tutorial we create 2 functions: one in scala and the other in python.

And when I use the hive interpreter to access the data %hive it doens't recognize the functions created before.

Is there any way to make a bridge between Scala and/or Python so that Hive can recognize these 2 functions?

/* declaring a function in Scala */

def sentiment(s:String) : String = {
    val positive = Array("like", "love", "good", "great", "happy", "cool", "the", "one", "that")
    val negative = Array("hate", "bad", "stupid", "is")

    var st = 0;

    val words = s.split(" ")    
    positive.foreach(p =>
        words.foreach(w =>
            if(p==w) st = st+1
        )
    )

    negative.foreach(p=>
        words.foreach(w=>
            if(p==w) st = st-1
        )
    )
    if(st>0)
        "positivie"
    else if(st<0)
        "negative"
    else
        "neutral"
}

sqlc.udf.register("sentiment", sentiment _)
%pyspark

#declaring a function in Python

import re

def wordcount(a):
  return len(re.split("\W+",a))
  
sqlContext.registerFunction("wordcount", wordcount)

Many thanks in advance.

Best regards

1 ACCEPTED SOLUTION

avatar

@Hugo Felix

One option is to implement these functions as a Hive UDF (written in python).

For example, your new python function (my_py_udf.py) would look something like this:

import sys

for line in sys.stdin:
    
    createdAt, screenName, text = line.replace('\n',' ').split('\t')
    
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice"])
    negative = set(["hate", "bad", "stupid"])
    
    words = text.split()
    word_count = len(words)
    
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
    
    st = sum(positive_matches) + sum(negative_matches)
    
    if st > 0:
        print '\t'.join([text, 'positive', str(word_count)])
    elif st < 0:
        print '\t'.join([text, 'negative', str(word_count)])
    else:
        print '\t'.join([text, 'neutral', str(word_count)])

NOTE: This function combines both of your previous functions into one (since you can calculate wordcount and sentiment in one function).

To call this UDF within Hive, run Hive code similar to this:

ADD FILE /home/hive/my_py_udf.py;
SELECT
TRANSFORM (createdAt, screenName, text)
USING 'python my_py_udf.py'
AS  text, 
    sentiment,
    word_count
FROM tweets;

Hope this helps!

View solution in original post

23 REPLIES 23

avatar

Thanks @Hugo Felix

I'll refer to your other posts to tackle the additional issues. I wonder if there's an issue in the way you are storing the twitter data within Hive. Here's an older post, but details the serde and Hive queries necessary to read the Twitter data on an older version of HDP. You may find this helpful: https://hortonworks.com/blog/howto-use-hive-to-sqlize-your-own-tweets-part-two-loading-hive-sql-quer...

avatar
Rising Star

Hello @Dan Zaratsian

I'm currently at HDP 2.6.1.0

Followed the instructions on the link you've gave me and used json serde 1.3.7 with dependencies (cannot found the 1.1.4)

Created the table and I've used this json captured from flume:

flumedata.zip

Made the query and the same error persists:

java.sql.SQLException: Error while 
processing statement: FAILED: Execution Error, return code 2 from 
org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, 
vertexName=Map 1, vertexId=vertex_1502379039867_0005_1_00, 
diagnostics=[Task failed, taskId=task_1502379039867_0005_1_00_000000, 
diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running 
task:java.lang.RuntimeException: java.lang.RuntimeException: Hive 
Runtime Error while closing operators
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:173)
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:139)
	at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:347)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:194)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:185)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:185)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:181)
	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Hive Runtime Error while closing 
operators
	at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.close(MapRecordProcessor.java:370)
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:164)
	... 14 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 
20003]: An error occurred when trying to close the Operator running your
 custom script.
	at 
org.apache.hadoop.hive.ql.exec.ScriptOperator.close(ScriptOperator.java:560)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.close(MapRecordProcessor.java:346)
	... 15 more
], TaskAttempt 1 failed, info=[Error: Failure while running 
task:java.lang.RuntimeException: java.lang.RuntimeException: Hive 
Runtime Error while closing operators
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:173)
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:139)
	at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:347)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:194)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:185)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:185)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:181)
	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Hive Runtime Error while closing 
operators
	at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.close(MapRecordProcessor.java:370)
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:164)
	... 14 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 
20003]: An error occurred when trying to close the Operator running your
 custom script.
	at 
org.apache.hadoop.hive.ql.exec.ScriptOperator.close(ScriptOperator.java:560)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.close(MapRecordProcessor.java:346)
	... 15 more
], TaskAttempt 2 failed, info=[Error: Failure while running 
task:java.lang.RuntimeException: java.lang.RuntimeException: Hive 
Runtime Error while closing operators
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:173)
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:139)
	at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:347)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:194)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:185)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:185)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:181)
	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Hive Runtime Error while closing 
operators
	at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.close(MapRecordProcessor.java:370)
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:164)
	... 14 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 
20003]: An error occurred when trying to close the Operator running your
 custom script.
	at 
org.apache.hadoop.hive.ql.exec.ScriptOperator.close(ScriptOperator.java:560)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.close(MapRecordProcessor.java:346)
	... 15 more
], TaskAttempt 3 failed, info=[Error: Failure while running 
task:java.lang.RuntimeException: java.lang.RuntimeException: Hive 
Runtime Error while closing operators
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:173)
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:139)
	at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:347)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:194)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:185)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:185)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:181)
	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Hive Runtime Error while closing 
operators
	at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.close(MapRecordProcessor.java:370)
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:164)
	... 14 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 
20003]: An error occurred when trying to close the Operator running your
 custom script.
	at 
org.apache.hadoop.hive.ql.exec.ScriptOperator.close(ScriptOperator.java:560)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:634)
	at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.close(MapRecordProcessor.java:346)
	... 15 more
]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 
killedTasks:0, Vertex vertex_1502379039867_0005_1_00 [Map 1] 
killed/failed due to:OWN_TASK_FAILURE]DAG did not succeed due to 
VERTEX_FAILURE. failedVertices:1 killedVertices:0

avatar
Rising Star

@Dan Zaratsian,

Hope you're doing great.

After a fresh install of HDP 2.6.1.0 I've tried to make the query.

First I've made the external tweets table:

ADD JAR /tmp/json-serde-1.3.8-jar-with-dependencies.jar;
CREATE EXTERNAL TABLE tweets (
  id bigint, 
  created_at string,
  source STRING,
   favorited BOOLEAN,
   retweeted_status STRUCT<
     text:STRING,
     user:STRUCT<screen_name:STRING,name:STRING>,
     retweet_count:INT>,
   entities STRUCT<
     urls:ARRAY<STRUCT<expanded_url:STRING>>,
     user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
     hashtags:ARRAY<STRUCT<text:STRING>>>,
  lang string,
  retweet_count int,
  text string,
  user STRUCT<
     screen_name:STRING,
     name:STRING,
     friends_count:INT,
     followers_count:INT,
     statuses_count:INT,
     verified:BOOLEAN,
     utc_offset:INT,
     time_zone:STRING>
       )
PARTITIONED BY (datehour int)
ROW FORMAT SERDE 
'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ( "ignore.malformed.json" = "true")
LOCATION
  'hdfs://192.168.1.11:8020/user/flume/tweets'

The hive query gives this error on hiveserver2:

2017-09-28 15:22:39,577 ERROR [HiveServer2-Background-Pool: Thread-1161]: SessionState (SessionState.java:printError(993)) - Status: Failed
2017-09-28 15:22:39,578 ERROR [HiveServer2-Background-Pool: Thread-1161]: SessionState (SessionState.java:printError(993)) - Vertex failed, vertexName=Map 1, vertexId=vertex_1506521964877_0091_1_00, diagnostics=[Task failed, taskId=task_1506521964877_0091_1_00_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row {"id":913124962290069505,"created_at":"Wed Sep 27 19:35:30 +0000 2017","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","retweeted_status":{"text":"Mãe de @Cristiano, Dolores Aveiro no seu perfil de Instagram: \"Vim apoiar o meu @Sporting_CP\" #UCL #DiaDeSporting https://t.co/GnPPkt2CYG","user":{"screen_name":"sportingfanspt","name":"SPORTING FANS"},"retweet_count":43},"lang":"pt","retweet_count":0,"text":"RT @sportingfanspt: Mãe de @Cristiano, Dolores Aveiro no seu perfil de Instagram: \"Vim apoiar o meu @Sporting_CP\" #UCL #DiaDeSporting https…","user":{"screen_name":"ladraodoapito","name":"Arbitro com Voucher","friends_count":568,"followers_count":319,"statuses_count":1668,"verified":false,"utc_offset":3600,"time_zone":"Lisbon"},"datehour":2017092720}
    at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:173)
    at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:139)
    at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:347)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:194)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:185)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:185)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:181)
    at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row {"id":913124962290069505,"created_at":"Wed Sep 27 19:35:30 +0000 2017","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","retweeted_status":{"text":"Mãe de @Cristiano, Dolores Aveiro no seu perfil de Instagram: \"Vim apoiar o meu @Sporting_CP\" #UCL #DiaDeSporting https://t.co/GnPPkt2CYG","user":{"screen_name":"sportingfanspt","name":"SPORTING FANS"},"retweet_count":43},"lang":"pt","retweet_count":0,"text":"RT @sportingfanspt: Mãe de @Cristiano, Dolores Aveiro no seu perfil de Instagram: \"Vim apoiar o meu @Sporting_CP\" #UCL #DiaDeSporting https…","user":{"screen_name":"ladraodoapito","name":"Arbitro com Voucher","friends_count":568,"followers_count":319,"statuses_count":1668,"verified":false,"utc_offset":3600,"time_zone":"Lisbon"},"datehour":2017092720}
    at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:91)
    at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
    at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:325)
    at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:150)
    ... 14 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row {"id":913124962290069505,"created_at":"Wed Sep 27 19:35:30 +0000 2017","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","retweeted_status":{"text":"Mãe de @Cristiano, Dolores Aveiro no seu perfil de Instagram: \"Vim apoiar o meu @Sporting_CP\" #UCL #DiaDeSporting https://t.co/GnPPkt2CYG","user":{"screen_name":"sportingfanspt","name":"SPORTING FANS"},"retweet_count":43},"lang":"pt","retweet_count":0,"text":"RT @sportingfanspt: Mãe de @Cristiano, Dolores Aveiro no seu perfil de Instagram: \"Vim apoiar o meu @Sporting_CP\" #UCL #DiaDeSporting https…","user":{"screen_name":"ladraodoapito","name":"Arbitro com Voucher","friends_count":568,"followers_count":319,"statuses_count":1668,"verified":false,"utc_offset":3600,"time_zone":"Lisbon"},"datehour":2017092720}
    at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:565)
    at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:83)
    ... 17 more

Can you help?

I'm only getting this error when I query the text column....

avatar
Rising Star

@Dan Zaratsian
I'm still around this problem... the query only crashes when I query 2 or more columns with the text column.

If I don't query the text column, or query it alone it works...
Do you have any suggestion, please?
Many thanks in advance.