Created 07-17-2017 03:09 PM
I'm trying to convert this Zeppelin / Spark Twitter stream tutorial into one that uses Flume and Hive to store tweets.
A friend here helped me and gave me this Python code, which replaces the two functions (one in Scala, one in Python) presented in the tutorial:
import sys

for line in sys.stdin:
    created_at, user.screen_name, text = line.split('\t')
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice"])
    negative = set(["hate", "bad", "stupid"])
    words = text.split()
    word_count = len(words)
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
    st = sum(positive_matches) + sum(negative_matches)
    if st > 0:
        print '\t'.join([text, 'positive', str(word_count)])
    elif st < 0:
        print '\t'.join([text, 'negative', str(word_count)])
    else:
        print '\t'.join([text, 'neutral', str(word_count)])
However, when I run the following query in Hive:
ADD JAR /tmp/json-serde-1.3.8-jar-with-dependencies.jar;
ADD FILE /tmp/my_py_udf.py;

SELECT TRANSFORM (created_at, user.screen_name, text)
USING 'python my_py_udf.py'
AS text, sentiment, word_count
FROM tweets
I get this row error:
java.lang.Exception: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Map 1, vertexId=vertex_1500301290070_0001_1_00, diagnostics=[Task failed, taskId=task_1500301290070_0001_1_00_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row {"id":884722952981426178,"created_at":"Tue Jul 11 10:36:04 +0000 2017","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","favorited":false,"retweeted_status":{"text":"We're hiring! https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon #gamedev","user":{"screen_name":"tiagomploureiro","name":"Tiago Loureiro"},"retweet_count":1},"entities":{"urls":[{"expanded_url":"https://collide.rocks/careers/"}],"user_mentions":[{"screen_name":"tiagomploureiro","name":"Tiago Loureiro"},{"screen_name":"CollideRocks","name":"Collide"}],"hashtags":[{"text":"jobs"},{"text":"onsite"},{"text":"lisbon"},{"text":"gamedev"}]},"lang":"en","retweet_count":0,"text":"RT @tiagomploureiro: We're hiring! https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon #gamedev","user":{"screen_name":"diogostuart","name":"Diogo Vasconcelos","friends_count":1976,"followers_count":2954,"statuses_count":5122,"verified":false,"utc_offset":3600,"time_zone":"Lisbon"},"datehour":2017071111}
Can anyone help?
Many thanks in advance.
Created 07-26-2017 05:37 AM
The issue seems to be a mismatch between the incoming data and the schema of the table.
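One way to check for such a mismatch is to count how many tab-separated fields each row actually has. The sketch below assumes a few rows have been dumped to a local text file in the same tab-separated form the script receives; the helper name `field_count_histogram` is made up for illustration and is not part of Hive or the tutorial.

```python
from collections import Counter


def field_count_histogram(lines, sep='\t'):
    """Return a Counter mapping 'number of fields' -> 'number of rows'.

    Rows whose field count differs from the number of columns passed to
    TRANSFORM are the ones a fixed-size unpacking would fail on.
    """
    return Counter(len(line.rstrip('\n').split(sep)) for line in lines)


# Hypothetical sample: two well-formed rows, one short row, one long row.
sample = ["a\tb\tc\n", "a\tb\n", "a\tb\tc\td\n", "x\ty\tz\n"]
hist = field_count_histogram(sample)
```

If every row reports the expected count, the field count is probably not the problem and the cause lies elsewhere.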
Created 07-26-2017 08:54 AM
Many thanks for your help.
Found the error:
createdAt, screenName, text = line.replace('\n',' ').split('\t')
It only works when I have only 1 variable. With more than 1 it crashes.
Is there any alternative to the split('\t') ?
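A possible alternative, sketched here under assumptions: keep `split('\t')`, but limit the number of splits and pad short rows, so that unpacking into a fixed number of names cannot raise `ValueError`. The helper name `parse_row` is hypothetical. Whether extra tabs or missing columns actually occur in this table is not confirmed by the error message above.

```python
def parse_row(line, expected=3):
    """Split one tab-separated row into exactly `expected` fields.

    Extra tabs stay inside the last field; short rows are padded with
    empty strings, so the result always has length `expected`.
    """
    # The split limit is passed positionally, which works in Python 2 and 3.
    fields = line.rstrip('\n').split('\t', expected - 1)
    fields += [''] * (expected - len(fields))
    return fields


created_at, screen_name, text = parse_row("Tue Jul 11\tdiogostuart\tWe're hiring!\textra\n")
```

Padding silently hides malformed rows, so skipping or logging them instead may be the better choice depending on what the downstream table should contain.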
Created 10-04-2017 09:56 AM
Created 07-27-2017 09:08 AM
I don't think unpacking into multiple variables should, by itself, make the query crash or fail.
If you can share the complete use case, we can try to validate it.
Created 07-27-2017 09:25 AM
No problem, here it is. The table is stored in Hive (loaded via Flume and Oozie), so I don't know how I can send it...
The code seems to be ok and runs well locally....
import sys
import hashlib

for line in sys.stdin:
    line = line.strip()
    line = line.replace('\n',' ')
    lang, text = line.split('\t',maxsplit=2)
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice", "helpful", "enjoyed"])
    negative = set(["hate", "bad", "stupid", "terrible", "unhappy"])
    words = text.split()
    word_count = len(words)
    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]
    st = sum(positive_matches) + sum(negative_matches)
    if st > 0:
        print ('\t'.join([lang, text, 'positive', str(word_count)]))
    elif st < 0:
        print ('\t'.join([lang, text, 'negative', str(word_count)]))
    else:
        print ('\t'.join([lang, text, 'neutral', str(word_count)]))
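Two details in this script could explain "runs locally, fails in Hive". First, `str.split` accepts `maxsplit` as a keyword argument only in Python 3; under Python 2 it raises `TypeError`. Second, `maxsplit=2` can return three parts, which cannot be unpacked into two names. Which interpreter `python` resolves to on the cluster nodes is not stated in the thread, so the Python 2 explanation is an assumption. The sketch below is a variant that behaves the same under both versions; the function names `classify` and `transform` are illustrative.

```python
from __future__ import print_function  # makes print() a function on Python 2 too

POSITIVE = set(["love", "good", "great", "happy", "cool", "best",
                "awesome", "nice", "helpful", "enjoyed"])
NEGATIVE = set(["hate", "bad", "stupid", "terrible", "unhappy"])


def classify(text):
    """Return (label, word_count) using the same word lists as the script above."""
    words = text.split()
    score = (sum(1 for w in words if w in POSITIVE)
             - sum(1 for w in words if w in NEGATIVE))
    if score > 0:
        label = 'positive'
    elif score < 0:
        label = 'negative'
    else:
        label = 'neutral'
    return label, len(words)


def transform(line):
    """Turn one 'lang<TAB>text' input row into an output row, or None if malformed."""
    # Positional split limit of 1 yields at most two parts on Python 2 and 3.
    parts = line.rstrip('\n').split('\t', 1)
    if len(parts) != 2:
        return None  # skip rows without exactly two fields instead of crashing
    lang, text = parts
    label, word_count = classify(text)
    return '\t'.join([lang, text, label, str(word_count)])
```

In the script handed to Hive, this would be driven by a loop over `sys.stdin` that prints every result of `transform(line)` that is not `None`.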