
Hive UDF Twitter stream giving ROW error

Expert Contributor

I'm trying to convert this Zeppelin / Spark twitter stream tutorial into one which uses Flume and Hive to store tweets.

A friend here helped me out and provided this Python code, which replaces the two functions (Scala and Python) presented in the tutorial:

    import sys

    # Word lists for a naive bag-of-words sentiment score.
    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice"])
    negative = set(["hate", "bad", "stupid"])

    # Hive TRANSFORM streams each selected row to stdin as one
    # tab-separated, newline-terminated line.
    for line in sys.stdin:
        created_at, screen_name, text = line.rstrip('\n').split('\t')

        words = text.split()
        word_count = len(words)

        positive_matches = [1 for word in words if word in positive]
        negative_matches = [-1 for word in words if word in negative]

        st = sum(positive_matches) + sum(negative_matches)

        if st > 0:
            print('\t'.join([text, 'positive', str(word_count)]))
        elif st < 0:
            print('\t'.join([text, 'negative', str(word_count)]))
        else:
            print('\t'.join([text, 'neutral', str(word_count)]))

However, when I run the following query in Hive:

ADD JAR /tmp/json-serde-1.3.8-jar-with-dependencies.jar;
ADD FILE /tmp/my_py_udf.py;

SELECT
    TRANSFORM (created_at, user.screen_name, text)
    USING 'python my_py_udf.py'
    AS  text,
        sentiment,
        word_count
FROM tweets;
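Before debugging inside Hive, it can help to replay locally what TRANSFORM actually does: it streams each selected row to the script as one tab-separated, newline-terminated line on stdin. A minimal sketch of that protocol (the sample row is made up for illustration):

```python
import io

# Hypothetical sample row, formatted the way Hive TRANSFORM would
# stream it: fields separated by tabs, row terminated by a newline.
sample_rows = "Tue Jul 11 10:36:04 +0000 2017\thugo\tlove this great tutorial\n"

positive = {"love", "good", "great", "happy", "cool", "best", "awesome", "nice"}
negative = {"hate", "bad", "stupid"}

# io.StringIO stands in for sys.stdin, so the UDF logic can be
# exercised without a Hive cluster.
for line in io.StringIO(sample_rows):
    created_at, screen_name, text = line.rstrip("\n").split("\t")
    words = text.split()
    score = sum(1 for w in words if w in positive) - sum(1 for w in words if w in negative)
    label = "positive" if score > 0 else "negative" if score < 0 else "neutral"
    print("\t".join([text, label, str(len(words))]))
```

If the script behaves on such hand-built lines but fails under Hive, the problem is likely in the data Hive is feeding it rather than in the sentiment logic itself.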

I get this row error:

java.lang.Exception: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Map 1, vertexId=vertex_1500301290070_0001_1_00, diagnostics=[Task failed, taskId=task_1500301290070_0001_1_00_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row {"id":884722952981426178,"created_at":"Tue Jul 11 10:36:04 +0000 2017","source":"<a href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter for iPhone</a>","favorited":false,"retweeted_status":{"text":"We're hiring! https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon #gamedev","user":{"screen_name":"tiagomploureiro","name":"Tiago Loureiro"},"retweet_count":1},"entities":{"urls":[{"expanded_url":"https://collide.rocks/careers/"}],"user_mentions":[{"screen_name":"tiagomploureiro","name":"Tiago Loureiro"},{"screen_name":"CollideRocks","name":"Collide"}],"hashtags":[{"text":"jobs"},{"text":"onsite"},{"text":"lisbon"},{"text":"gamedev"}]},"lang":"en","retweet_count":0,"text":"RT @tiagomploureiro: We're hiring! https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon #gamedev","user":{"screen_name":"diogostuart","name":"Diogo Vasconcelos","friends_count":1976,"followers_count":2954,"statuses_count":5122,"verified":false,"utc_offset":3600,"time_zone":"Lisbon"},"datehour":2017071111}

Can anyone help?

Many thanks in advance.


@Hugo Felix

The issue seems to be a mismatch between the incoming data and the table's schema.
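One way such a mismatch can surface: if the serialized text field itself contains a tab (or Hive emits more columns than the script unpacks), a bare split('\t') produces extra fields and the three-way unpack raises ValueError, which Hive then reports as a row-processing error. A hypothetical illustration:

```python
# Hypothetical row whose text field itself contains a tab: splitting on
# every tab now yields four fields, not the three the unpack expects.
line = "Tue Jul 11 10:36:04 +0000 2017\thugo\tWe're hiring!\t#jobs\n"

error = None
try:
    created_at, screen_name, text = line.rstrip("\n").split("\t")
except ValueError as exc:
    # The unpack fails before any sentiment logic runs.
    error = str(exc)

print(error)
```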

Expert Contributor

@Sindhu

Many thanks for your help.

Found the error:

createdAt, screenName, text = line.replace('\n',' ').split('\t')

It only works when I unpack a single variable; with more than one, it crashes.

Is there any alternative to the split('\t') ?
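One alternative (a sketch, not from the thread) is to keep split('\t') but cap it with maxsplit, so only the first N tabs act as field separators and any stray tabs remain inside the final field:

```python
# Hypothetical row with an extra tab inside the tweet text.
line = "Tue Jul 11 10:36:04 +0000 2017\thugo\tWe're hiring!\t#jobs\n"

# With maxsplit=2, only the first two tabs separate fields; the tab
# between "hiring!" and "#jobs" stays inside the text field.
created_at, screen_name, text = line.rstrip("\n").split("\t", 2)

print(created_at)
print(screen_name)
print(text)
```

This keeps the unpack at exactly three names no matter how many tabs the text contains, though an embedded newline in the text would still break the one-row-per-line protocol.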

Expert Contributor

@Sindhu

I'm still stuck on this problem... the query only crashes when I select two or more columns together with the text column.

If I leave out the text column, or select it on its own, it works...
Do you have any suggestions, please?
Many thanks in advance.

@Hugo Felix

I don't think unpacking multiple variables should cause the query to crash or fail.

If you can share the complete use case, then we could validate the same.

Expert Contributor

@Sindhu

No problem, here it is. The table is stored in Hive (populated via Flume and Oozie); I don't know how I can send it...

The code seems to be OK and runs well locally...

import sys

for line in sys.stdin:

    # Trim surrounding whitespace, then split on the first tab only so
    # any tabs inside the tweet text stay in the text field.
    line = line.strip()
    lang, text = line.split('\t', maxsplit=1)

    positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice", "helpful", "enjoyed"])
    negative = set(["hate", "bad", "stupid", "terrible", "unhappy"])

    words = text.split()
    word_count = len(words)

    positive_matches = [1 for word in words if word in positive]
    negative_matches = [-1 for word in words if word in negative]

    st = sum(positive_matches) + sum(negative_matches)

    if st > 0:
        print('\t'.join([lang, text, 'positive', str(word_count)]))
    elif st < 0:
        print('\t'.join([lang, text, 'negative', str(word_count)]))
    else:
        print('\t'.join([lang, text, 'neutral', str(word_count)]))
 