How to make hive queries including scala and python functions?
Labels: Apache Hive, Apache Zeppelin
Created 07-10-2017 11:13 AM
Hello guys,
I'm following this Twitter streaming Spark tutorial.
However, instead of Spark I'm using Flume and Hive to store the data.
The problem is that the tutorial creates two functions: one in Scala and the other in Python.
When I access the data with the Hive interpreter (%hive), it doesn't recognize the functions created earlier.
Is there any way to bridge Scala and/or Python to Hive so that Hive can recognize these two functions?
/* declaring a function in Scala */
def sentiment(s: String): String = {
  val positive = Array("like", "love", "good", "great", "happy", "cool", "the", "one", "that")
  val negative = Array("hate", "bad", "stupid", "is")
  var st = 0
  val words = s.split(" ")
  positive.foreach(p => words.foreach(w => if (p == w) st = st + 1))
  negative.foreach(p => words.foreach(w => if (p == w) st = st - 1))
  if (st > 0) "positive" else if (st < 0) "negative" else "neutral"
}
sqlc.udf.register("sentiment", sentiment _)

%pyspark
# declaring a function in Python
import re
def wordcount(a):
    return len(re.split(r"\W+", a))
sqlContext.registerFunction("wordcount", wordcount)
Many thanks in advance.
Best regards
Created 07-10-2017 03:36 PM
One option is to implement these functions as a Hive UDF written in Python (strictly, a streaming script that Hive calls through TRANSFORM).
For example, your new Python script (my_py_udf.py) would look something like this:
import sys

positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice"])
negative = set(["hate", "bad", "stupid"])

# Hive streams each row to stdin as one tab-separated line.
for line in sys.stdin:
    createdAt, screenName, text = line.rstrip('\n').split('\t')
    words = text.split()
    word_count = len(words)
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
    st = sum(positive_matches) + sum(negative_matches)
    # Emit tab-separated output columns: text, sentiment, word_count.
    if st > 0:
        print('\t'.join([text, 'positive', str(word_count)]))
    elif st < 0:
        print('\t'.join([text, 'negative', str(word_count)]))
    else:
        print('\t'.join([text, 'neutral', str(word_count)]))
NOTE: This function combines both of your previous functions into one (since you can calculate wordcount and sentiment in one function).
To call this UDF within Hive, run Hive code similar to this:
ADD FILE /home/hive/my_py_udf.py;

SELECT TRANSFORM (createdAt, screenName, text)
USING 'python my_py_udf.py'
AS text, sentiment, word_count
FROM tweets;
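Before adding a script like this to Hive, it can help to check the scoring logic locally on a few tab-separated lines. The following is only a minimal sketch, not part of the original answer: the function name `score_line` and the sample row are made up for illustration, and it assumes exactly three tab-separated input fields, as in the query above.

```python
POSITIVE = {"love", "good", "great", "happy", "cool", "best", "awesome", "nice"}
NEGATIVE = {"hate", "bad", "stupid"}


def score_line(line):
    """Return (text, sentiment, word_count) for one tab-separated row.

    Hypothetical helper for local testing; assumes exactly three fields.
    """
    created_at, screen_name, text = line.rstrip("\n").split("\t")
    words = text.split()
    # Each positive word adds 1 to the score, each negative word subtracts 1.
    st = sum(1 for w in words if w in POSITIVE) - sum(1 for w in words if w in NEGATIVE)
    if st > 0:
        sentiment = "positive"
    elif st < 0:
        sentiment = "negative"
    else:
        sentiment = "neutral"
    return text, sentiment, len(words)


# Made-up sample row in the layout Hive would stream to the script.
sample = "Tue Jul 11 10:36:04 +0000 2017\tsome_user\tI love this great tutorial\n"
text, sentiment, word_count = score_line(sample)
print("\t".join([text, sentiment, str(word_count)]))
```

If the function behaves as expected on a handful of hand-written rows, any remaining failure is more likely to come from the Hive side (input format, environment) than from the scoring logic.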
Hope this helps!
Created 07-11-2017 01:17 PM
Many many thanks for your great answer!
I ran the code you gave me and hit a first error, which was because I had security enabled on Hive. I disabled it, and now I get this one:
java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Map 1, vertexId=vertex_1499773736316_0013_1_00, diagnostics=[Task failed, taskId=task_1499773736316_0013_1_00_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row {"id":884722952981426178,"created_at":"Tue Jul 11 10:36:04 +0000 2017","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","favorited":false,"retweeted_status":{"text":"We're hiring! https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon #gamedev","user":{"screen_name":"tiagomploureiro","name":"Tiago Loureiro"},"retweet_count":1},"entities":{"urls":[{"expanded_url":"https://collide.rocks/careers/"}],"user_mentions":[{"screen_name":"tiagomploureiro","name":"Tiago Loureiro"},{"screen_name":"CollideRocks","name":"Collide"}],"hashtags":[{"text":"jobs"},{"text":"onsite"},{"text":"lisbon"},{"text":"gamedev"}]},"lang":"en","retweet_count":0,"text":"RT @tiagomploureiro: We're hiring! 
https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon #gamedev","user":{"screen_name":"diogostuart","name":"Diogo Vasconcelos","friends_count":1976,"followers_count":2954,"statuses_count":5122,"verified":false,"utc_offset":3600,"time_zone":"Lisbon"},"datehour":2017071111} at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:173) at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:139) at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:344) at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:181) at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:172) at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:168) at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row {"id":884722952981426178,"created_at":"Tue Jul 11 10:36:04 +0000 2017","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","favorited":false,"retweeted_status":{"text":"We're hiring! 
https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon #gamedev","user":{"screen_name":"tiagomploureiro","name":"Tiago Loureiro"},"retweet_count":1},"entities":{"urls":[{"expanded_url":"https://collide.rocks/careers/"}],"user_mentions":[{"screen_name":"tiagomploureiro","name":"Tiago Loureiro"},{"screen_name":"CollideRocks","name":"Collide"}],"hashtags":[{"text":"jobs"},{"text":"onsite"},{"text":"lisbon"},{"text":"gamedev"}]},"lang":"en","retweet_count":0,"text":"RT @tiagomploureiro: We're hiring! https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon #gamedev","user":{"screen_name":"diogostuart","name":"Diogo Vasconcelos","friends_count":1976,"followers_count":2954,"statuses_count":5122,"verified":false,"utc_offset":3600,"time_zone":"Lisbon"},"datehour":2017071111} at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:91) at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68) at (...)
I think this is an out-of-memory error... What do you think?
I've always had a lot of difficulty configuring the memory on this VM.
Many thanks in advance. Kind regards
Created 07-12-2017 08:01 PM
It might be an out of memory issue, but it could also be a variable/column mismatch. Can you share your python function and the Hive query so that I can review?
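One way to make a column mismatch visible is to check the field count before unpacking. In plain Python, unpacking a line with the wrong number of fields raises a ValueError, which would stop the script partway through its input. The following is a minimal sketch under assumptions: the helper names `parse_row` and `process` are hypothetical, and the expected count of three fields matches a three-column TRANSFORM input.

```python
def parse_row(line, expected_fields=3):
    """Split one tab-separated row; return None if the field count is off.

    Hypothetical helper: expected_fields=3 is an assumption that matches
    a three-column TRANSFORM input.
    """
    fields = line.rstrip("\n").split("\t")
    if len(fields) != expected_fields:
        return None
    return fields


def process(lines):
    """Separate well-formed rows from malformed ones instead of crashing."""
    good, bad = [], []
    for line in lines:
        row = parse_row(line)
        if row is None:
            bad.append(line)
        else:
            good.append(row)
    return good, bad


# Made-up input: one well-formed row and one row with a single field.
lines = ["2017-07-11\tsome_user\thello world\n", "only one field\n"]
good, bad = process(lines)
print(good)
print(bad)
```

In a real streaming script, `lines` would be `sys.stdin`, and the malformed rows could be written to `sys.stderr` so they do not mix with the output columns.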
Created 07-13-2017 08:42 AM
I'm using what you gave me for now:
import sys
for line in sys.stdin:
    created_at, user.screen_name, text = line.split('\t')
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice"])
    negative = set(["hate", "bad", "stupid"])
    words = text.split()
    word_count = len(words)
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
    st = sum(positive_matches) + sum(negative_matches)
    if st > 0:
        print '\t'.join([text, 'positive', str(word_count)])
    elif st < 0:
        print '\t'.join([text, 'negative', str(word_count)])
    else:
        print '\t'.join([text, 'neutral', str(word_count)])

ADD JAR /tmp/json-serde-1.3.8-jar-with-dependencies.jar;
ADD FILE /tmp/my_py_udf.py;

SELECT TRANSFORM (created_at, user.screen_name, text)
USING 'python my_py_udf.py'
AS text, sentiment, word_count
FROM tweets
Created 07-17-2017 09:20 AM
Created 07-17-2017 04:21 PM
@Hugo Felix What happens if you replace
created_at, user.screen_name, text = line.split('\t')
with
created_at, screen_name, text = line.split('\t')
I do not believe Python will be able to handle the user.screen_name variable in the context you are writing it. In my example, the script receives each row as a tab-delimited line, and split('\t') then parses that line into X number of variables. The names of the assigned variables (such as created_at, screen_name, text) are arbitrary (you could name them x, y, z if you wanted, but you would have to make sure the rest of the Python script used the x, y, z variable names). Give that a try and let me know if it helps. Thanks.
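A short sketch of the point about variable names, using a made-up input line. Strictly speaking, a dotted name such as `user.screen_name` is valid syntax as an assignment target, but Python treats it as setting an attribute on an existing object called `user`. Since no such object exists in the script, the assignment fails at run time with a NameError.

```python
# Made-up row in the tab-separated layout the script receives.
line = "Tue Jul 11 10:36:04 +0000 2017\tsome_user\tWe're hiring!\n"

# Any plain names work as unpacking targets; they are local to the script
# and do not need to match the Hive column names.
x, y, z = line.rstrip("\n").split("\t")
print(y)

# A dotted target is an attribute assignment, so Python first looks up
# an object named `user`, which is not defined here.
try:
    created_at, user.screen_name, text = line.rstrip("\n").split("\t")
    error_name = None
except NameError as exc:
    error_name = type(exc).__name__
print(error_name)
```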
Created 07-17-2017 05:31 PM
Hello @Dan Zaratsian
Exactly the same thing happens...
I'm still wondering whether this is a problem with Hive or with some other configuration in Ambari.
I ask because I made this simple UDF:
import sys
for line in sys.stdin:
    print '\t'.join([line])
And ran this query:
ADD JAR /tmp/json-serde-1.3.8-jar-with-dependencies.jar;
ADD FILE /tmp/test.py;

SELECT TRANSFORM (text)
USING 'python test.py'
FROM tweets
And I'm getting the exact same error...
Created 07-17-2017 08:02 PM
@Hugo Felix Yeah, that was the next test I had in mind for you to try. Thanks for sharing the results. Can you share the environment you are working in?
Are you using Spark 1.6.x or Spark 2.x?
Also, what version of HDP are you using?
Created 07-18-2017 08:24 AM
I'm not using Spark at the moment, since I'm running the job directly on Hive for troubleshooting.
Spark ver - 1.6.x.2.4
Hive - 1.2.1.2.4
HDP - 2.4.0.0