Member since
08-01-2017
65
Posts
3
Kudos Received
4
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
26630 | 01-22-2018 10:19 AM | |
3108 | 01-22-2018 10:18 AM | |
2876 | 07-05-2017 02:33 PM | |
3312 | 05-26-2017 09:01 AM |
07-17-2017
05:31 PM
Hello @Dan Zaratsian
It happens exactly the same... I'm still wondering if this is a problem of Hive or some other configuration on Ambari. I ask this because I've made this simple UDF: import sys
for line in sys.stdin:
print '\t'.join([line])
And ran this query: ADD JAR /tmp/json-serde-1.3.8-jar-with-dependencies.jar;
ADD FILE /tmp/test.py;
SELECT
TRANSFORM (text)
USING 'python test.py'
FROM tweets
And I'm getting the exact same error...
... View more
07-17-2017
09:20 AM
@Dan Zaratsian Can you please help one more time? Many thanks in advance.
... View more
07-13-2017
08:42 AM
@Dan Zaratsian
I'm using the stuff you've gave me for now: import sys
for line in sys.stdin:
created_at, user.screen_name, text = line.split('\t')
positive = set(["love", "good", "great", "happy", "cool", "best", "awesome", "nice"])
negative = set(["hate", "bad", "stupid"])
words = text.split()
word_count = len(words)
positive_matches = [1 for word in words if word in positive]
negative_matches = [-1 for word in words if word in negative]
st = sum(positive_matches) + sum(negative_matches)
if st > 0:
print '\t'.join([text, 'positive', str(word_count)])
elif st < 0:
print '\t'.join([text, 'negative', str(word_count)])
else:
print '\t'.join([text, 'neutral', str(word_count)])
ADD JAR /tmp/json-serde-1.3.8-jar-with-dependencies.jar;
ADD FILE /tmp/my_py_udf.py;
SELECT
TRANSFORM (created_at, user.screen_name, text)
USING 'python my_py_udf.py'
AS text,
sentiment,
word_count
FROM tweets
... View more
07-11-2017
01:17 PM
@Dan Zaratsian
Many many thanks for your great answer! I ran the code you gave me but encounter a first error which was due to the fact that I had security enabled on hive. Disabled it and now I get this one: java.sql.SQLException: Error while
processing statement: FAILED: Execution Error, return code 2 from
org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed,
vertexName=Map 1, vertexId=vertex_1499773736316_0013_1_00,
diagnostics=[Task failed, taskId=task_1499773736316_0013_1_00_000000,
diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running
task:java.lang.RuntimeException: java.lang.RuntimeException:
org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error
while processing row {"id":884722952981426178,"created_at":"Tue Jul 11
10:36:04 +0000 2017","source":"<a
href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter
for
iPhone</a>","favorited":false,"retweeted_status":{"text":"We're
hiring! https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon
#gamedev","user":{"screen_name":"tiagomploureiro","name":"Tiago
Loureiro"},"retweet_count":1},"entities":{"urls":[{"expanded_url":"https://collide.rocks/careers/"}],"user_mentions":[{"screen_name":"tiagomploureiro","name":"Tiago
Loureiro"},{"screen_name":"CollideRocks","name":"Collide"}],"hashtags":[{"text":"jobs"},{"text":"onsite"},{"text":"lisbon"},{"text":"gamedev"}]},"lang":"en","retweet_count":0,"text":"RT
@tiagomploureiro: We're hiring! https://t.co/jGOOgyxuzo via
@colliderocks #jobs #onsite #lisbon
#gamedev","user":{"screen_name":"diogostuart","name":"Diogo
Vasconcelos","friends_count":1976,"followers_count":2954,"statuses_count":5122,"verified":false,"utc_offset":3600,"time_zone":"Lisbon"},"datehour":2017071111}
at
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:173)
at
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:139)
at
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:344)
at
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:181)
at
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:172)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:172)
at
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:168)
at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException:
org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error
while processing row {"id":884722952981426178,"created_at":"Tue Jul 11
10:36:04 +0000 2017","source":"<a
href=\"http://twitter.com/download/iphone\" rel=\"nofollow\">Twitter
for
iPhone</a>","favorited":false,"retweeted_status":{"text":"We're
hiring! https://t.co/jGOOgyxuzo via @colliderocks #jobs #onsite #lisbon
#gamedev","user":{"screen_name":"tiagomploureiro","name":"Tiago
Loureiro"},"retweet_count":1},"entities":{"urls":[{"expanded_url":"https://collide.rocks/careers/"}],"user_mentions":[{"screen_name":"tiagomploureiro","name":"Tiago
Loureiro"},{"screen_name":"CollideRocks","name":"Collide"}],"hashtags":[{"text":"jobs"},{"text":"onsite"},{"text":"lisbon"},{"text":"gamedev"}]},"lang":"en","retweet_count":0,"text":"RT
@tiagomploureiro: We're hiring! https://t.co/jGOOgyxuzo via
@colliderocks #jobs #onsite #lisbon
#gamedev","user":{"screen_name":"diogostuart","name":"Diogo
Vasconcelos","friends_count":1976,"followers_count":2954,"statuses_count":5122,"verified":false,"utc_offset":3600,"time_zone":"Lisbon"},"datehour":2017071111}
at
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:91)
at
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
at
(...)
I think this is an out o memory error... What do you think? I've always had a lot of difficulties configuring the memory on this VM. Many thanks in advance. Kind regards
... View more
07-10-2017
11:13 AM
1 Kudo
Hello guys, I'm following this Twitter streaming spark tutorial
However, instead of Spark I'm using Flume and Hive to store the data. The problem is that in the tutorial we create 2 functions: one in scala and the other in python. And when I use the hive interpreter to access the data %hive it doens't recognize the functions created before. Is there any way to make a bridge between Scala and/or Python so that Hive can recognize these 2 functions? /* declaring a function in Scala */
def sentiment(s:String) : String = {
val positive = Array("like", "love", "good", "great", "happy", "cool", "the", "one", "that")
val negative = Array("hate", "bad", "stupid", "is")
var st = 0;
val words = s.split(" ")
positive.foreach(p =>
words.foreach(w =>
if(p==w) st = st+1
)
)
negative.foreach(p=>
words.foreach(w=>
if(p==w) st = st-1
)
)
if(st>0)
"positivie"
else if(st<0)
"negative"
else
"neutral"
}
sqlc.udf.register("sentiment", sentiment _)
%pyspark
#declaring a function in Python
import re
def wordcount(a):
return len(re.split("\W+",a))
sqlContext.registerFunction("wordcount", wordcount) Many thanks in advance. Best regards
... View more
Labels:
- Labels:
-
Apache Hive
-
Apache Zeppelin
07-06-2017
09:17 AM
@Dinesh Chitlangia
Many thanks once more. I'm not using spark-shell... I'm using the Zeppelin view and loading the jars with z.load. However the spark-streaming jar is not on central (is hortonworks) and therefore I can't add it with z.load ... Tried with the old jar that I had 2_10:1.4.1 and I'm getting the same error on Zeppelin 0.6 (Ambari) when I run the first sql comand of the tutorial: %sql select * from tweets where text like '%girl%' limit 10
Can it be from the jar? How I add it on Zeppelin since it's not on Central? Many thanks once more.
... View more
07-05-2017
03:47 PM
@Dinesh Chitlangia
First of all, many thanks for your answer. Yes I've added that value myself since that with no value (infinite) I got the same error and I've tried to give it a value just to check... but the problem persists. My spark version is 1.6.x.2.4 and the tutorial is this one: https://zeppelin.apache.org/docs/0.5.5-incubating/tutorial/tutorial.html Many thanks once more. Best regards
... View more
07-05-2017
02:38 PM
I'm still trying to get the Zepellin's tutorial working. With a few tweaks I was able to get the stream working but I can't make queries... Here's the code: import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._
/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
val configs = new HashMap[String, String] ++= Seq(
"apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
println("Configuring Twitter OAuth")
configs.foreach{ case(key, value) =>
if (value.trim.isEmpty) {
throw new Exception("Error setting authentication - value for " + key + " not set")
}
val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
System.setProperty(fullKey, value.trim)
println("\tProperty " + fullKey + " set as [" + value.trim + "]")
}
println()
}
// Configure Twitter credentials
val apiKey = "wsb68B0Xv7JMm9y0GI8UWVv9M"
val apiSecret = "LxNt80GDB0noKzJOrgjJfldgZvJHTRPox8hpeidkEzD7jbJ5wk"
val accessToken = "361298122-W2KrbTiAtmtydVhbNeAEHvIHy1VbMcTgitbRrQcW"
val accessTokenSecret = "n6hjL3CXONlxHT37lbQ2PnjCfGiIMlSxxNiNbgIWHSaA9"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
import org.apache.spark.streaming.twitter._
val sc = new SparkConf().setMaster("local[2]").setAppName("tweets").set("spark.cleaner.ttl","2000")
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))
case class Tweet(createdAt:Long, text:String)
twt.map(status=>
Tweet(status.getCreatedAt().getTime()/1000, status.getText())
).foreachRDD { rdd=>
// Below line works only in spark 1.3.0.
// For spark 1.1.x and spark 1.2.x,
// use rdd.registerTempTable("tweets") instead.
//rdd.toDF().registerAsTable("tweets")
//val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext._
import sqlContext.implicits._
rdd.toDF.registerTempTable("tweets")
}
twt.print
ssc.start()
And the error I'm getting: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of broadcast_2
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1219)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Can anyone help? Many thanks!
... View more
Labels:
- Labels:
-
Apache Spark
-
Apache Zeppelin
07-05-2017
02:33 PM
Now it's working... import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import org.apache.spark.sql.DataFrame
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._
/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
val configs = new HashMap[String, String] ++= Seq(
"apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
println("Configuring Twitter OAuth")
configs.foreach{ case(key, value) =>
if (value.trim.isEmpty) {
throw new Exception("Error setting authentication - value for " + key + " not set")
}
val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
System.setProperty(fullKey, value.trim)
println("\tProperty " + fullKey + " set as [" + value.trim + "]")
}
println()
}
// Configure Twitter credentials
val apiKey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
val apiSecret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
val accessToken = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
val accessTokenSecret = "XXXXXXXXXXXXXXXXXXXXXXX"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
import org.apache.spark.streaming.twitter._
val sc = new SparkConf().setMaster("local[2]").setAppName("tweets").set("spark.cleaner.ttl","2000")
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))
case class Tweet(createdAt:Long, text:String)
twt.map(status=>
Tweet(status.getCreatedAt().getTime()/1000, status.getText())
).foreachRDD { rdd=>
// Below line works only in spark 1.3.0.
// For spark 1.1.x and spark 1.2.x,
// use rdd.registerTempTable("tweets") instead.
//rdd.toDF().registerAsTable("tweets")
//val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext._
import sqlContext.implicits._
rdd.toDF.registerTempTable("tweets")
}
twt.print
ssc.start()
... View more
06-30-2017
03:25 PM
I'm trying to run the following twitter tutorial: import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
/** Configures the Oauth Credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
val configs = new HashMap[String, String] ++= Seq(
"apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
println("Configuring Twitter OAuth")
configs.foreach{ case(key, value) =>
if (value.trim.isEmpty) {
throw new Exception("Error setting authentication - value for " + key + " not set")
}
val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
System.setProperty(fullKey, value.trim)
println("\tProperty " + fullKey + " set as [" + value.trim + "]")
}
println()
}
// Configure Twitter credentials
val apiKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"
val apiSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
val accessToken = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
val accessTokenSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
import org.apache.spark.streaming.twitter._
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))
case class Tweet(createdAt:Long, text:String)
twt.map(status=>
Tweet(status.getCreatedAt().getTime()/1000, status.getText())
).foreachRDD(rdd=>
// Below line works only in spark 1.3.0.
// For spark 1.1.x and spark 1.2.x,
// use rdd.registerTempTable("tweets") instead.
rdd.toDF().registerAsTable("tweets")
)
twt.print
ssc.start()
However I get the following error: <console>:504: error: value toDF is not a member of org.apache.spark.rdd.RDD[Tweet]
rdd.toDF().registerAsTable("tweets") I have Spark 1.6.x.2.4 and Zeppelin 0.6
... View more
Labels:
- Labels:
-
Apache Spark
-
Apache Zeppelin