Support Questions

Find answers, ask questions, and share your expertise

How to make Hive queries that include Scala and Python functions?

Contributor

Hello guys,

I'm following this Twitter streaming spark tutorial

However, instead of Spark I'm using Flume and Hive to store the data.

The problem is that the tutorial creates two functions: one in Scala and the other in Python.

When I use the Hive interpreter (%hive) to access the data, it doesn't recognize the functions created earlier.

Is there any way to bridge Scala and/or Python so that Hive can recognize these two functions?

/* declaring a function in Scala */

def sentiment(s:String) : String = {
    val positive = Array("like", "love", "good", "great", "happy", "cool", "the", "one", "that")
    val negative = Array("hate", "bad", "stupid", "is")

    var st = 0;

    val words = s.split(" ")    
    positive.foreach(p =>
        words.foreach(w =>
            if(p==w) st = st+1
        )
    )

    negative.foreach(p=>
        words.foreach(w=>
            if(p==w) st = st-1
        )
    )
    if(st>0)
        "positivie"
    else if(st<0)
        "negative"
    else
        "neutral"
}

sqlc.udf.register("sentiment", sentiment _)

%pyspark

#declaring a function in Python

import re

def wordcount(a):
  return len(re.split(r"\W+", a))
  
sqlContext.registerFunction("wordcount", wordcount)

Many thanks in advance.

Best regards

1 ACCEPTED SOLUTION

@Hugo Felix

One option is to implement these functions as a Hive UDF (written in python).

For example, your new python function (my_py_udf.py) would look something like this:

import sys

for line in sys.stdin:
    
    createdAt, screenName, text = line.replace('\n',' ').split('\t')
    
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice"])
    negative = set(["hate", "bad", "stupid"])
    
    words = text.split()
    word_count = len(words)
    
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
    
    st = sum(positive_matches) + sum(negative_matches)
    
    if st > 0:
        print '\t'.join([text, 'positive', str(word_count)])
    elif st < 0:
        print '\t'.join([text, 'negative', str(word_count)])
    else:
        print '\t'.join([text, 'neutral', str(word_count)])

NOTE: This function combines both of your previous functions into one (since you can calculate wordcount and sentiment in one function).

To call this UDF within Hive, run Hive code similar to this:

ADD FILE /home/hive/my_py_udf.py;
SELECT
TRANSFORM (createdAt, screenName, text)
USING 'python my_py_udf.py'
AS  text, 
    sentiment,
    word_count
FROM tweets;
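Before wiring the script into Hive, it can help to sanity-check it locally by piping tab-separated sample rows into it, the same way TRANSFORM streams rows to stdin. A minimal sketch (the sentiment logic mirrors my_py_udf.py above; the sample rows are made up for illustration):

```python
import io

def process(stream):
    """Mirror of my_py_udf.py: read tab-separated rows, emit text/sentiment/word_count."""
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice"])
    negative = set(["hate", "bad", "stupid"])
    out = []
    for line in stream:
        created_at, screen_name, text = line.replace('\n', ' ').split('\t')
        words = text.split()
        word_count = len(words)
        st = sum(1 for w in words if w in positive) - sum(1 for w in words if w in negative)
        label = 'positive' if st > 0 else 'negative' if st < 0 else 'neutral'
        out.append('\t'.join([text, label, str(word_count)]))
    return out

# Simulate two rows exactly as Hive would stream them (tab-separated, newline-terminated)
sample = io.StringIO(
    "Tue Jul 11\tuser1\tthis is great and awesome\n"
    "Tue Jul 11\tuser2\tthis is stupid\n"
)
for row in process(sample):
    print(row)
```

(The sketch uses Python 3 print calls for local testing; the UDF in the thread targets the Python 2 that ships with HDP 2.4, so keep the statement form there.)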

Hope this helps!


23 REPLIES


Contributor

@Dan Zaratsian

Many many thanks for your great answer!

I ran the code you gave me but encountered a first error, which was due to the fact that I had security enabled on Hive. I disabled it and now I get this one:

java.sql.SQLException: Error while 
processing statement: FAILED: Execution Error, return code 2 from 
org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, 
vertexName=Map 1, vertexId=vertex_1499773736316_0013_1_00, 
diagnostics=[Task failed, taskId=task_1499773736316_0013_1_00_000000, 
diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running 
task:java.lang.RuntimeException: java.lang.RuntimeException: 
org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error 
while processing row {"id":884722952981426178,"created_at":"Tue Jul 11 
10:36:04 +0000 2017","source":"<a 
href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter 
for 
iPhone</a>","favorited":false,"retweeted_status":{"text":"We're 
hiring! https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon 
#gamedev","user":{"screen_name":"tiagomploureiro","name":"Tiago 
Loureiro"},"retweet_count":1},"entities":{"urls":[{"expanded_url":"https://collide.rocks/careers/"}],"user_mentions":[{"screen_name":"tiagomploureiro","name":"Tiago
 
Loureiro"},{"screen_name":"CollideRocks","name":"Collide"}],"hashtags":[{"text":"jobs"},{"text":"onsite"},{"text":"lisbon"},{"text":"gamedev"}]},"lang":"en","retweet_count":0,"text":"RT
 @tiagomploureiro: We're hiring! https://t.co/jGOOgyxuzo via 
@colliderocks #jobs #onsite #lisbon 
#gamedev","user":{"screen_name":"diogostuart","name":"Diogo 
Vasconcelos","friends_count":1976,"followers_count":2954,"statuses_count":5122,"verified":false,"utc_offset":3600,"time_zone":"Lisbon"},"datehour":2017071111}
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:173)
	at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:139)
	at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:344)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:181)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:172)
	at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:168)
	at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: 
org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error 
while processing row {"id":884722952981426178,"created_at":"Tue Jul 11 
10:36:04 +0000 2017","source":"<a 
href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter 
for 
iPhone</a>","favorited":false,"retweeted_status":{"text":"We're 
hiring! https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon 
#gamedev","user":{"screen_name":"tiagomploureiro","name":"Tiago 
Loureiro"},"retweet_count":1},"entities":{"urls":[{"expanded_url":"https://collide.rocks/careers/"}],"user_mentions":[{"screen_name":"tiagomploureiro","name":"Tiago
 
Loureiro"},{"screen_name":"CollideRocks","name":"Collide"}],"hashtags":[{"text":"jobs"},{"text":"onsite"},{"text":"lisbon"},{"text":"gamedev"}]},"lang":"en","retweet_count":0,"text":"RT
 @tiagomploureiro: We're hiring! https://t.co/jGOOgyxuzo via 
@colliderocks #jobs #onsite #lisbon 
#gamedev","user":{"screen_name":"diogostuart","name":"Diogo 
Vasconcelos","friends_count":1976,"followers_count":2954,"statuses_count":5122,"verified":false,"utc_offset":3600,"time_zone":"Lisbon"},"datehour":2017071111}
	at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:91)
	at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
	at 
(...)



I think this is an out of memory error... What do you think?

I've always had a lot of difficulties configuring the memory on this VM.

Many thanks in advance. Kind regards

@Hugo Felix

It might be an out of memory issue, but it could also be a variable/column mismatch. Can you share your python function and the Hive query so that I can review?

Contributor

@Dan Zaratsian

I'm using what you gave me for now:

    import sys
     
    for line in sys.stdin:
        
        created_at, user.screen_name, text = line.split('\t')
        
        positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice"])
        negative = set(["hate", "bad", "stupid"])
        
        words = text.split()
        word_count = len(words)
        
        positive_matches = [1 for word in words if word in positive]
        negative_matches = [-1 for word in words if word in negative]
        
        st = sum(positive_matches) + sum(negative_matches)
        
        if st > 0:
            print '\t'.join([text, 'positive', str(word_count)])
        elif st < 0:
            print '\t'.join([text, 'negative', str(word_count)])
        else:
            print '\t'.join([text, 'neutral', str(word_count)])

And the Hive query:

ADD JAR /tmp/json-serde-1.3.8-jar-with-dependencies.jar;
ADD FILE /tmp/my_py_udf.py;

SELECT
    TRANSFORM (created_at, user.screen_name, text)
    USING 'python my_py_udf.py'
    AS  text, 
        sentiment,
        word_count
    FROM tweets;

Contributor

@Dan Zaratsian

Can you please help one more time?

Many thanks in advance.

@Hugo Felix What happens if you replace

created_at, user.screen_name, text = line.split('\t')

with

created_at, screen_name, text = line.split('\t')

I do not believe Python will be able to parse the user.screen_name variable in the context you are writing it. In my example, the function accepts a tab-delimited line, and split('\t') parses it out into X number of variables. The names of the assigned variables (such as created_at, screen_name, text) are arbitrary (you could name them x, y, z if you wanted, but you would have to make sure the rest of the Python script used the x, y, z variable names). Give that a try and let me know if it helps. Thanks.

Contributor

Hello @Dan Zaratsian

It happens exactly the same...

I'm still wondering if this is a problem of Hive or some other configuration on Ambari.

I ask this because I've made this simple UDF:

import sys
for line in sys.stdin:
    print '\t'.join([line])

And ran this query:

ADD JAR /tmp/json-serde-1.3.8-jar-with-dependencies.jar;
ADD FILE /tmp/test.py;

SELECT
    TRANSFORM (text)
    USING 'python test.py'
    FROM tweets;

And I'm getting the exact same error...

@Hugo Felix Yeah, that was the next test I had in mind for you. Thanks for sharing the results. Can you share the environment you are working in?

Are you using Spark 1.6.x or Spark 2.x?

Also, what version of HDP are you using?

Contributor

@Dan Zaratsian

I'm not using Spark at the moment, since I'm running the job directly on Hive for troubleshooting.

Spark ver - 1.6.x.2.4

Hive - 1.2.1.2.4

HDP - 2.4.0.0

@Hugo Felix

Here's a test that should help determine whether your syntax is off or your environment is misconfigured.

First, create a test Hive table and populate it with data:

CREATE TABLE IF NOT EXISTS testtable
    (id string, text string)
    STORED AS ORC;

INSERT INTO TABLE testtable VALUES 
    ('1111', 'The service was great, and the agent was very helpful'), 
    ('2222', 'I enjoyed the event but the food was terrible'),
    ('3333', 'Unhappy with the organization of the event');

Then create a file called "my_py_udf.py" (as shown below). It can be placed anywhere, but in my example I placed it at /tmp/my_py_udf.py.

import sys
 
for line in sys.stdin:
    
    id, text = line.replace('\n',' ').split('\t')
    
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice", "helpful", "enjoyed"])
    negative = set(["hate", "bad", "stupid", "terrible", "unhappy"])
    
    words = text.split()
    word_count = len(words)
    
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
    
    st = sum(positive_matches) + sum(negative_matches)
    
    if st > 0:
        print '\t'.join([text, 'positive', str(word_count)])
    elif st < 0:
        print '\t'.join([text, 'negative', str(word_count)])
    else:
        print '\t'.join([text, 'neutral', str(word_count)])

Then from within Hive, execute the following commands:

ADD FILE my_py_udf.py;

SELECT
TRANSFORM (id, text)
USING 'python my_py_udf.py'
AS  (text, sentiment, word_count)
FROM testtable;

Your resulting output should look like this:

The service was great, and the agent was very helpful   positive    10
I enjoyed the event but the food was terrible           neutral     9
Unhappy with the organization of the event              neutral     7

Contributor

@Dan Zaratsian

I was able to reproduce the code you gave me. It runs OK.

The problem with the twitter table persists.

If I add the JSON SerDe in the query, I get the row-processing error; if not, it hangs for a long time and returns "map operator initialized". I think I have to add the SerDe, and therefore the problem is not there.

Here's the code:

import sys

for line in sys.stdin:
    text = line.split('\t')

    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice", "helpful", "enjoyed"])
    negative = set(["hate", "bad", "stupid", "terrible", "unhappy"])

    words = text.split()
    word_count = len(words)

    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]

    st = sum(positive_matches) + sum(negative_matches)

    if st > 0:
        print ('\t'.join([text, 'positive', str(word_count)]))
    elif st < 0:
        print ('\t'.join([text, 'negative', str(word_count)]))
    else:
        print ('\t'.join([text, 'neutral', str(word_count)]))

I was able to run against the tweets table with this test:

import sys
     
for line in sys.stdin:

    print ('\t'.join([line]))

And the query:

ADD JAR /tmp/json-serde-1.3.8-jar-with-dependencies.jar;

ADD FILE /tmp/teste.py;

SELECT
    TRANSFORM (text)
    USING 'python teste.py'
    FROM tweets;

@Hugo Felix

Nice! Glad to see that you got it working. If you upgrade to Spark 2.x, then you should not need to add the SerDe (just something to keep in mind). If you're all set, can you please mark the thread as accepted? Thanks!

Contributor

@Dan Zaratsian

I only got your 2nd example working, the one to check whether the problem was with the syntax or a system misconfiguration.

With the function you gave me I cannot query the tweets table; it gives the row error.

Can you please check this code one last time?

import sys
     
for line in sys.stdin:

    id, text = line.replace('\n',' ').split('\t')
        
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice", "helpful", "enjoyed"])
    negative = set(["hate", "bad", "stupid", "terrible", "unhappy"])

    words = text.split()
    word_count = len(words)
        
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
        
    st = sum(positive_matches) + sum(negative_matches)
        
    if st > 0:
        print ('\t'.join([text, 'positive', str(word_count)]))
    elif st < 0:
        print ('\t'.join([text, 'negative', str(word_count)]))
    else:
        print ('\t'.join([text, 'neutral', str(word_count)]))

Best regards

Contributor
@Dan Zaratsian

Found the error:

createdAt, screenName, text = line.replace('\n',' ').split('\t')

It only works when I have only 1 variable. With more than 1 it crashes.

Is there any alternative to the split('\t') ?
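One possible workaround (a sketch, assuming the free-text column is the last one): a bare three-way unpack raises ValueError whenever the tweet text itself contains a tab, or a row arrives with a different column count. Splitting with maxsplit keeps any embedded tabs inside the last column, and a length check skips malformed rows instead of crashing the whole task. The helper name parse_row is made up for illustration.

```python
def parse_row(line, expected_cols=3):
    """Split a TRANSFORM input row, tolerating tabs inside the last (text) column."""
    # maxsplit = expected_cols - 1 so extra tabs stay in the final field
    fields = line.rstrip('\n').split('\t', expected_cols - 1)
    if len(fields) != expected_cols:
        return None  # malformed row: wrong number of columns, skip it
    return fields

# Embedded tab stays inside the text column instead of raising ValueError:
print(parse_row("2017-07-11\tsomeuser\thello\tworld"))

# A row with too few columns is skipped rather than crashing the task:
print(parse_row("only-one-column"))
```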

Contributor

@Dan Zaratsian

After some days I've reached the conclusion that the problem must be in the JSON SerDe, because when I load your table into Hive it works OK.

I'm currently using

json-serde-1.3.8-jar-with-dependencies.jar ...

Many thanks in advance.

Best regards

@Hugo Felix

Thanks for the update! Hortonworks is currently on HDP 2.6, so if you have the option, it sounds like it would be beneficial to upgrade.

Also as a quick reference, the most recent Hortonworks Sandbox can be downloaded from here: https://hortonworks.com/downloads/#sandbox

Here's a link to the documentation as well: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/index.html

Contributor

@Dan Zaratsian

I don't see how updating HDP can solve my problem.

I think the problem may be with the SerDe when creating the table.

I'm currently using this:

ADD JAR hdfs://192.168.0.73:8020/user/admin/oozie-workflows/lib/json-serde-1.3.8-jar-with-dependencies.jar;
CREATE EXTERNAL TABLE tweets (
  id bigint,
  created_at string,
  source string,
  favorited boolean,
  retweeted_status STRUCT<
    text:STRING,
    user:STRUCT<screen_name:STRING, name:STRING>,
    retweet_count:INT>,
  entities STRUCT<
    urls:ARRAY<STRUCT<expanded_url:STRING>>,
    user_mentions:ARRAY<STRUCT<screen_name:STRING, name:STRING>>,
    hashtags:ARRAY<STRUCT<text:STRING>>>,
  lang string,
  retweet_count int,
  text string,
  user STRUCT<
    screen_name:STRING,
    name:STRING,
    friends_count:INT,
    followers_count:INT,
    statuses_count:INT,
    verified:BOOLEAN,
    utc_offset:INT,
    time_zone:STRING>
)
PARTITIONED BY (datehour int)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
WITH SERDEPROPERTIES ("ignore.malformed.json" = "true")
LOCATION 'hdfs://192.168.0.73:8020/user/flume/tweets';

@Hugo Felix

It could solve the problem because SerDes are built-in and updated along with Hive updates. It works fine for me in recent versions of HDP, so that is why I wanted to mention it.

I saw that you opened another question specifically for the Hive serde issue.
Your original question "How to make hive queries include scala and python functions" is answered as part of my posts, so when you get a chance, could you please accept the best answer? There are a lot of responses in this thread, so that may help someone else out.

I do have one other thought to debug your JSON serde error. It could be that the way that you stored JSON within Hive is incorrect. If that is the case, then when you try to execute a python UDF against that Hive record, it isn't able to find the right structure. If you execute a "select *" against your hive table, how does the output look?
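One pattern worth ruling out (a generic sketch with made-up data, not your SerDe): the line-oriented JSON SerDes expect one complete JSON document per physical line. If a raw newline from a tweet's text ever reaches the file unescaped, a single record is split across two lines and neither line parses on its own, which surfaces as an error only when the text column is read.

```python
import json

record = {"id": 1, "text": "line one\nline two", "lang": "en"}

# A correct writer escapes the newline, so the record stays on one line:
good = json.dumps(record)
assert '\n' not in good

# Simulate an unescaped newline leaking into the file:
bad = good.replace('\\n', '\n')
first_physical_line = bad.split('\n')[0]
try:
    json.loads(first_physical_line)
except json.JSONDecodeError:
    print("first line alone is not valid JSON -> 'error processing row'")
```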

Contributor

Hello @Dan Zaratsian and many thanks once more.
Sorry you're right, I've forgotten to select best answer.
I can make queries against the Hive table with this SerDe, except when I query the text column.

For instance, if I use your script with the ID and Lang columns it runs smoothly. If I query the text column, it gives an error.

I'll try to update HDP to version 2.6.1.0 and then let you know.
Best regards

Thanks @Hugo Felix

I'll refer to your other posts to tackle the additional issues. I wonder if there's an issue in the way you are storing the twitter data within Hive. Here's an older post, but details the serde and Hive queries necessary to read the Twitter data on an older version of HDP. You may find this helpful: https://hortonworks.com/blog/howto-use-hive-to-sqlize-your-own-tweets-part-two-loading-hive-sql-quer...
