06-04-2016
07:49 AM
Thank you Sean, but the link that you have provided is returning this message: "The core node you are trying to access was not found, it may have been deleted. Please refresh your original page and try the operation again." EDIT: I have now noticed that there is an extra ")".
06-03-2016
02:40 AM
Hi Sean, doing this causes spark-worker to stop running, and Hue stops working properly; I think it internally needs quickstart.cloudera. If I restart the VMware image and redo what I did within /etc/init.d/cloudera-quickstart-init with the call to cloudera-quickstart-ip, I can log in to Hue again, and spark-history-server runs properly. I noticed something when running ps xa | grep spark:
[cloudera@quickstart ~]$ ps xa | grep spark
6330 ? Sl 0:03 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/parquet/lib/*:/usr/lib/avro/lib/* -Dspark.deploy.defaultCores=4 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master
6499 ? Sl 0:04 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /usr/lib/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/parquet/lib/*:/usr/lib/avro/lib/* -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker spark://quickstart.cloudera:7077
6674 ? Sl 0:05 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /etc/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.6.0-cdh5.7.0-hadoop2.6.0-cdh5.7.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/parquet/lib/*:/usr/lib/avro/lib/* -Dspark.history.fs.logDirectory=hdfs:///user/spark/applicationHistory -Dspark.history.ui.port=18088 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.history.HistoryServer
6915 pts/0 R+ 0:00 grep spark
As you can see, spark-master runs with 4 cores (-Dspark.deploy.defaultCores=4) and no cores dedicated to the worker; is that normal?
[cloudera@quickstart ~]$ sudo service spark-worker status
Spark worker is running [ OK ]
[cloudera@quickstart ~]$ sudo service spark-master status
Spark master is running [ OK ]
[cloudera@quickstart ~]$ sudo service spark-history-server status
Spark history-server is running [ OK ]
As you can see, it all looks normal, but examining http://quickstart.cloudera:18080/ , the spark-master UI, I can see this:
URL: spark://192.168.30.137:7077
REST URL: spark://192.168.30.137:6066 (cluster mode)
Alive Workers: 0
Cores in use: 0 Total, 0 Used
Memory in use: 0.0 B Total, 0.0 B Used
Applications: 0 Running, 0 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE
Zero cores in use out of zero in total, and no memory! That's strange, because spark-master tries to use 4 cores (-Dspark.deploy.defaultCores=4) and 1 GB (-Xms1g -Xmx1g -XX:MaxPermSize=256m). Then you can see this output from spark-worker:
ID: worker-20160603111341-192.168.30.137-7078
Master URL:
Cores: 4 (0 Used)
Memory: 6.7 GB (0.0 B Used)
The master URL is not set up, and the worker reports 4 cores and 6.7 GB even though spark-worker is running with this setup: -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker spark://quickstart.cloudera:7077
What do you think? What can I do in order to continue developing my project? That is what I want: to use this VMware image for development, loading a tiny file (only 16 MB) from HDFS. What annoys me most is that the code works perfectly in the spark-shell of the virtual image, but when I try to run it programmatically, creating the unix launcher with the sbt-pack command, it does not work.
Regards, Alonso
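PS: in case it helps to debug the empty master page, here is a minimal check I could run, assuming the standalone master also serves its status as JSON at /json on the same port as the UI shown above:

import scala.io.Source

// fetch the master's status as JSON; the "workers" array should list the
// registered worker, and the UI above suggests it is currently empty
val json = Source.fromURL("http://quickstart.cloudera:18080/json").mkString
println(json)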
06-03-2016
02:11 AM
Hi Sean, that change does not have any effect: spark-worker doesn't run, and even if I try to restart it manually with sudo service spark-worker restart, it has failed as soon as I ask for its status. Hue does not work either; I think that happens because it internally uses quickstart.cloudera in order to talk to other components... I am starting to think that this VMware image is useless for developing anything related to Spark; I cannot run anything...
06-03-2016
01:44 AM
Hi Sean, this is how /etc/hosts looks by default, when the image is restarted:
[cloudera@quickstart ~]$ cat /etc/hosts
127.0.0.1 quickstart.cloudera quickstart localhost localhost.domain
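Just to illustrate what that line implies, a tiny check using only the JDK (nothing assumed beyond the hostname):

import java.net.InetAddress

// with the default /etc/hosts above, the name resolves to the loopback
// address, which is why the failures show quickstart.cloudera/127.0.0.1
println(InetAddress.getByName("quickstart.cloudera").getHostAddress)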
06-02-2016
11:56 AM
I meant to say: I do not know what to do next. I have read this blog post, http://www.datastax.com/dev/blog/common-spark-troubleshooting, and from it I figured out that I could programmatically modify this variable:
.set("spark.cores.max", "2")
and modify /etc/spark/conf/spark-env.sh to put this export:
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"
Is there anything I can do in order to run a simple Spark job with a 16 MB file? Another thing I noticed is that I HAVE to set this programmatically in the SparkConf in order to run the job:
.set("spark.driver.allowMultipleContexts", "true")
Please help...
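For reference, a minimal sketch of why that flag is usually needed and how it could be avoided: if a StreamingContext is built from a SparkConf while a SparkContext already exists, Spark tries to create a second context. Building the StreamingContext from the existing SparkContext avoids that (the master URL here is just the one from this thread):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("AmazonKafkaConnector")
  .setMaster("spark://quickstart.cloudera:7077")
val sc = new SparkContext(conf)
// reuse the existing SparkContext instead of creating a second one from the
// conf, so spark.driver.allowMultipleContexts is no longer necessary
val ssc = new StreamingContext(sc, Seconds(2))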
06-02-2016
11:51 AM
Hi Sean, I am adding more research about this issue. I have programmatically added this line to SparkConf:
.set("spark.cores.max", "2")
and this line to /etc/spark/conf/spark-env.sh:
##ADDED!!
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"
With no luck. If I check the /var/log/spark-worker.out file, these are the last lines:
16/06/02 20:47:43 INFO worker.Worker: Retrying connection to master (attempt # 11)
16/06/02 20:47:43 INFO worker.Worker: Connecting to master quickstart.cloudera:7077...
16/06/02 20:47:43 WARN worker.Worker: Failed to connect to master quickstart.cloudera:7077
java.io.IOException: Failed to connect to quickstart.cloudera/127.0.0.1:7077
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:199)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:186)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: quickstart.cloudera/127.0.0.1:7077
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
The worker cannot talk to the master. I am starting to think that this VMware image, running with 8 GB and 4 cores, is not enough to run a simple Spark job programmatically, but then why can I run the same code within a spark-shell? Please help, I do not know what to do...
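A quick way to reproduce the worker's failure outside of Spark, sketched with plain JDK sockets (same host name and port the worker uses):

import java.net.{InetSocketAddress, Socket}

val sock = new Socket()
try {
  // quickstart.cloudera resolves to 127.0.0.1 here, so this connect targets
  // the loopback interface, where nothing is listening on port 7077
  sock.connect(new InetSocketAddress("quickstart.cloudera", 7077), 2000)
  println("connected to " + sock.getRemoteSocketAddress)
} catch {
  case e: java.io.IOException => println("failed: " + e.getMessage)
} finally {
  sock.close()
}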
06-02-2016
09:45 AM
Hi Sean, if I update /etc/hosts, removing the line with 127.0.0.1 and using a line with the assigned IP instead, I am getting this exception:
Caused by: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1223)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:65)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1220)
I am pretty sure that I can reach the VMware image from my host machine and the host from the VMware image.
host -> vmware image:
MacBook-Pro-Retina-de-Alonso:my-recommendation-spark-engine aironman$ ping 192.168.30.139
PING 192.168.30.139 (192.168.30.139): 56 data bytes
64 bytes from 192.168.30.139: icmp_seq=0 ttl=64 time=0.462 ms
64 bytes from 192.168.30.139: icmp_seq=1 ttl=64 time=0.353 ms
64 bytes from 192.168.30.139: icmp_seq=2 ttl=64 time=0.311 ms
64 bytes from 192.168.30.139: icmp_seq=3 ttl=64 time=0.310 ms
64 bytes from 192.168.30.139: icmp_seq=4 ttl=64 time=0.273 ms
vmware image -> host:
[cloudera@quickstart bin]$ ping 192.168.1.35
PING 192.168.1.35 (192.168.1.35) 56(84) bytes of data.
64 bytes from 192.168.1.35: icmp_seq=1 ttl=128 time=0.272 ms
64 bytes from 192.168.1.35: icmp_seq=2 ttl=128 time=0.416 ms
64 bytes from 192.168.1.35: icmp_seq=3 ttl=128 time=0.348 ms
64 bytes from 192.168.1.35: icmp_seq=4 ttl=128 time=0.367 ms
64 bytes from 192.168.1.35: icmp_seq=5 ttl=128 time=0.432 ms
It is not a problem of the software trying to talk to the Kafka server located on the host machine; I can telnet from the VM to the host machine on the Kafka port:
[cloudera@quickstart bin]$ telnet 192.168.1.35 9092
Trying 192.168.1.35...
Connected to 192.168.1.35.
Escape character is '^]'.
Connection closed by foreign host.
Please help me Sean, I am desperate; I cannot run anything in this VMware image...
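One idea from my research, offered only as an assumption: a failure to fetch broadcast blocks can mean the executors cannot reach the driver, and spark.driver.host controls the address the driver advertises to them. A sketch, where 192.168.30.139 (the VM address from the ping above) is my guess for the right value:

import org.apache.spark.SparkConf

// both addresses are assumptions based on the outputs earlier in this thread
val conf = new SparkConf()
  .setAppName("AmazonKafkaConnector")
  .setMaster("spark://192.168.30.139:7077")
  .set("spark.driver.host", "192.168.30.139") // address executors use to reach the driver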
06-01-2016
08:50 AM
Updating the thread with a relevant update (06/01/2016): I have a VMware Cloudera image, cdh-5.7 running with CentOS 6.8; I am using OS X as my development machine and the CDH image to run the code.
UPDATE
This is the build.sbt that I am currently using; I have just updated the Spark version from the official one (1.6.1) to 1.6.0-cdh5.7.0:
[cloudera@quickstart awesome-recommendation-engine]$ cat build.sbt
name := "my-recommendation-spark-engine"
version := "1.0-SNAPSHOT"
scalaVersion := "2.10.4"
val sparkVersion = "1.6.0-cdh5.7.0"
val akkaVersion = "2.3.11" // override Akka to be this version to match the one in Spark
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.10" % "0.8.1"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri"),
// HTTP client to request data to Amazon
"net.databinder.dispatch" %% "dispatch-core" % "0.11.1",
// HTML parser
"org.jodd" % "jodd-lagarto" % "3.5.2",
"com.typesafe" % "config" % "1.2.1",
"com.typesafe.play" % "play-json_2.10" % "2.4.0-M2",
"org.scalatest" % "scalatest_2.10" % "2.2.1" % "test",
"org.twitter4j" % "twitter4j-core" % "4.0.2",
"org.twitter4j" % "twitter4j-stream" % "4.0.2",
"org.codehaus.jackson" % "jackson-core-asl" % "1.6.1",
"org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.0-cdh5.7.0",
"org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.0",
"org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.0",
"org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.0",
"org.apache.spark" % "spark-mllib_2.10" % "1.6.0-cdh5.7.0",
"com.google.code.gson" % "gson" % "2.6.2",
"commons-cli" % "commons-cli" % "1.3.1",
"com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
// Akka
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
// MongoDB
"org.reactivemongo" %% "reactivemongo" % "0.10.0"
)
packAutoSettings
resolvers ++= Seq(
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
"Spray Repository" at "http://repo.spray.cc/",
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
"Akka Repository" at "http://repo.akka.io/releases/",
"Twitter4J Repository" at "http://twitter4j.org/maven2/",
"Apache HBase" at "https://repository.apache.org/content/repositories/releases",
"Twitter Maven Repo" at "http://maven.twttr.com/",
"scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
"Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
"Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
"Mesosphere Public Repository" at "http://downloads.mesosphere.io/maven",
Resolver.sonatypeRepo("public")
)
This is my /etc/hosts file located in the cdh image, with a line like this:
127.0.0.1 quickstart.cloudera quickstart localhost localhost.domain
The cloudera version that I am running is:
[cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties
# Autogenerated build properties
version=2.6.0-cdh5.7.0
git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a
cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a
cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1
cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76
cloudera.base-branch=cdh5-base-2.6.0
cloudera.build-branch=cdh5-2.6.0_5.7.0
cloudera.pkg.version=2.6.0+cdh5.7.0+1280
cloudera.pkg.release=1.cdh5.7.0.p0.92
cloudera.cdh.release=cdh5.7.0
cloudera.build.time=2016.03.23-18:30:29GMT
I can do an ls command in the VMware machine:
[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv
-rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 /user/cloudera/ratings.csv
I can read its content:
[cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l
568454
The code is quite simple, just trying to map its content:
val ratingFile="hdfs://192.168.1.40:8020/user/cloudera/ratings.csv"
//where 192.168.1.40 is the eth0 assigned ip of cloudera image
case class AmazonRating(userId: String, productId: String, rating: Double)
val NumRecommendations = 10
val MinRecommendationsPerUser = 10
val MaxRecommendationsPerUser = 20
val MyUsername = "myself"
val NumPartitions = 20
println("Using this ratingFile: " + ratingFile)
// first create an RDD out of the rating file
val rawTrainingRatings = sc.textFile(ratingFile).map {
line =>
val Array(userId, productId, scoreStr) = line.split(",")
AmazonRating(userId, productId, scoreStr.toDouble)
}
// only keep users that have rated between MinRecommendationsPerUser and MaxRecommendationsPerUser products
val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => MinRecommendationsPerUser <= r._2.size && r._2.size < MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache()
println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of ${rawTrainingRatings.count()}") I am getting this message: **16/06/01 17:20:04 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources** because if i run the exact code within the spark-shell, i got this message: Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 73279 ratings out of 568454 Why is it working fine within the spark-shell but it is not programmatically running in the vmware image? UPDATE I am running the code using sbt-pack plugin to generate unix commands and run them within the vmware image which has the spark pseudocluster, This is the code i use to instantiate the sparkconf: val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
.setMaster("spark://192.168.1.40:7077") .set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sparkConf, Seconds(2))
//this checkpointdir should be in a conf file, for now it is hardcoded!
val streamingCheckpointDir = "/home/cloudera/my-recommendation-spark-engine/checkpoint"
ssc.checkpoint(streamingCheckpointDir)
I think that this must be a misconfiguration in a Cloudera configuration file, but which one?
UPDATE2 06/01/2016
Ok, using the IP (192.168.1.40) instead of the fully qualified name (quickstart.cloudera) now eliminates the previous exception, but now this warning arises:
**16/06/01 17:20:04 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources**
If I run the next commands:
[cloudera@quickstart awesome-recommendation-engine]$ sudo service spark-master status
Spark master is running [ OK ]
[cloudera@quickstart awesome-recommendation-engine]$ sudo service spark-worker status
Spark worker is running [ OK ]
I can see that spark-master and spark-worker are running, but when I check 192.168.1.40:18081, the web page that shows spark-worker status, I see:
ID: worker-20160601173323-192.168.1.40-7078
Master URL:
Cores: 1 (0 Used)
Memory: 2.7 GB (0.0 B Used)
Back to Master
Running Executors (0)
ExecutorID Cores State Memory Job Details Logs
Nothing to show! No ExecutorID, no Cores, no State, no Memory, nothing at all! In fact, after a few minutes or less, when I run the next status command:
[cloudera@quickstart awesome-recommendation-engine]$ sudo service spark-worker status
Spark worker is dead and pid file exists [FAILED]
[cloudera@quickstart awesome-recommendation-engine]$ sudo service spark-master status
Spark master is running [ OK ]
I guess that I have to give the VMware image more resources, more RAM and maybe another available core from the host machine, don't I? Thank you very much for reading this far.
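One more thought: since the worker page above advertises only 1 core and 2.7 GB, a hedged guess is that the job is asking for more than the single worker can offer, which would match the "Initial job has not accepted any resources" warning. A sketch capping the request to fit:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("AmazonKafkaConnector")
  .setMaster("spark://192.168.1.40:7077")
  .set("spark.cores.max", "1")        // no more than the single core the worker offers
  .set("spark.executor.memory", "1g") // comfortably under the 2.7 GB the worker advertises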
06-01-2016
02:35 AM
Hi Sean, thanks for the answer, but what exactly are you suggesting I do? I mean, I have a VMware process running a Cloudera image simulating a Spark cluster inside an OS X host. On this host I am running a single instance of Kafka, and the Spark process has no problem reaching that Kafka process when I use setMaster("local[*]"), but the software crashes quickly when I try to use setMaster("spark://quickstart.cloudera:7077"). An exception arises...
java.io.IOException: Failed to connect to quickstart.cloudera/127.0.0.1:7077
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:200)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:183)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: quickstart.cloudera/127.0.0.1:7077
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
I was thinking that this could be related to the dependencies in use; this is my build.sbt:
[cloudera@quickstart awesome-recommendation-engine]$ cat build.sbt
name := "my-recommendation-spark-engine"
version := "1.0-SNAPSHOT"
scalaVersion := "2.10.4"
val sparkVersion = "1.6.1"
val akkaVersion = "2.3.11" // override Akka to be this version to match the one in Spark
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.10" % "0.8.1"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri"),
//not working play module!! check
//jdbc,
//anorm,
//cache,
// HTTP client
"net.databinder.dispatch" %% "dispatch-core" % "0.11.1",
// HTML parser
"org.jodd" % "jodd-lagarto" % "3.5.2",
"com.typesafe" % "config" % "1.2.1",
"com.typesafe.play" % "play-json_2.10" % "2.4.0-M2",
"org.scalatest" % "scalatest_2.10" % "2.2.1" % "test",
"org.twitter4j" % "twitter4j-core" % "4.0.2",
"org.twitter4j" % "twitter4j-stream" % "4.0.2",
"org.codehaus.jackson" % "jackson-core-asl" % "1.6.1",
"org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1",
"org.apache.spark" % "spark-core_2.10" % "1.6.1",
"org.apache.spark" % "spark-streaming_2.10" % "1.6.1",
"org.apache.spark" % "spark-sql_2.10" % "1.6.1",
"org.apache.spark" % "spark-mllib_2.10" % "1.6.1",
"com.google.code.gson" % "gson" % "2.6.2",
"commons-cli" % "commons-cli" % "1.3.1",
"com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1",
// Akka
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
// MongoDB
"org.reactivemongo" %% "reactivemongo" % "0.10.0"
)
packAutoSettings
resolvers ++= Seq(
"JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
"Spray Repository" at "http://repo.spray.cc/",
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
"Akka Repository" at "http://repo.akka.io/releases/",
"Twitter4J Repository" at "http://twitter4j.org/maven2/",
"Apache HBase" at "https://repository.apache.org/content/repositories/releases",
"Twitter Maven Repo" at "http://maven.twttr.com/",
"scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
"Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
"Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
"Mesosphere Public Repository" at "http://downloads.mesosphere.io/maven",
Resolver.sonatypeRepo("public")
)
Could it be that I have to use the Spark jars from Cloudera? With this build.sbt, I am getting that exception...
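If the answer is yes, this is roughly the change I would sketch in build.sbt, pinning the Spark entries to the CDH build served by the Cloudera Repository resolver already listed above:

val sparkVersion = "1.6.0-cdh5.7.0"

libraryDependencies ++= Seq(
  // build against the same Spark the VM actually runs, instead of vanilla 1.6.1
  "org.apache.spark" % "spark-core_2.10" % sparkVersion,
  "org.apache.spark" % "spark-streaming_2.10" % sparkVersion,
  "org.apache.spark" % "spark-sql_2.10" % sparkVersion,
  "org.apache.spark" % "spark-mllib_2.10" % sparkVersion,
  "org.apache.spark" % "spark-streaming-kafka_2.10" % sparkVersion
)

When submitting with spark-submit rather than a packed launcher, the core Spark entries could additionally be marked % "provided" so the cluster's own jars are used; with sbt-pack that would leave them off the generated classpath, so I leave them unscoped here.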
05-31-2016
02:20 AM
Updating the thread with better info. Why can't I edit the previous post, like on StackOverflow? I have a VMware Cloudera image, cdh-5.7 running with CentOS 6.8; I am using OS X as my development machine and the CDH image to run the code. I have modified the /etc/hosts file located in the CDH image so it contains lines like these:
127.0.0.1 quickstart.cloudera quickstart localhost localhost.domain
192.168.30.138 quickstart.cloudera quickstart localhost localhost.domain
The cloudera version that I am running is:
[cloudera@quickstart bin]$ cat /usr/lib/hadoop/cloudera/cdh_version.properties
# Autogenerated build properties
version=2.6.0-cdh5.7.0
git.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a
cloudera.hash=c00978c67b0d3fe9f3b896b5030741bd40bf541a
cloudera.cdh.hash=e7465a27c5da4ceee397421b89e924e67bc3cbe1
cloudera.cdh-packaging.hash=8f9a1632ebfb9da946f7d8a3a8cf86efcdccec76
cloudera.base-branch=cdh5-base-2.6.0
cloudera.build-branch=cdh5-2.6.0_5.7.0
cloudera.pkg.version=2.6.0+cdh5.7.0+1280
cloudera.pkg.release=1.cdh5.7.0.p0.92
cloudera.cdh.release=cdh5.7.0
cloudera.build.time=2016.03.23-18:30:29GMT
I can do an ls command in the VMware machine:
[cloudera@quickstart ~]$ hdfs dfs -ls /user/cloudera/ratings.csv
-rw-r--r-- 1 cloudera cloudera 16906296 2016-05-30 11:29 /user/cloudera/ratings.csv
I can read its content:
[cloudera@quickstart ~]$ hdfs dfs -cat /user/cloudera/ratings.csv | wc -l
568454
The code is quite simple, just trying to map its content:
val ratingFile="hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv"
case class AmazonRating(userId: String, productId: String, rating: Double)
val NumRecommendations = 10
val MinRecommendationsPerUser = 10
val MaxRecommendationsPerUser = 20
val MyUsername = "myself"
val NumPartitions = 20
println("Using this ratingFile: " + ratingFile)
// first create an RDD out of the rating file
val rawTrainingRatings = sc.textFile(ratingFile).map {
line =>
val Array(userId, productId, scoreStr) = line.split(",")
AmazonRating(userId, productId, scoreStr.toDouble)
}
// only keep users that have rated between MinRecommendationsPerUser and MaxRecommendationsPerUser products
val trainingRatings = rawTrainingRatings.groupBy(_.userId).filter(r => MinRecommendationsPerUser <= r._2.size && r._2.size < MaxRecommendationsPerUser).flatMap(_._2).repartition(NumPartitions).cache()
println(s"Parsed $ratingFile. Kept ${trainingRatings.count()} ratings out of ${rawTrainingRatings.count()}") I am getting this message: Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 0 ratings out of 568454 because if i run the exact code within the spark-shell, i got this message: Parsed hdfs://quickstart.cloudera:8020/user/cloudera/ratings.csv. Kept 73279 ratings out of 568454 Why is it working fine within the spark-shell but it is not programmatically running in the vmware image? UPDATE I am running the code using sbt-pack plugin to generate unix commands and run them within the vmware image which has the spark pseudocluster, This is the code i use to instantiate the sparkconf: val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
.setMaster("local[4]") .set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sparkConf, Seconds(2))
//this checkpointdir should be in a conf file, for now it is hardcoded!
val streamingCheckpointDir = "/home/cloudera/my-recommendation-spark-engine/checkpoint"
ssc.checkpoint(streamingCheckpointDir)
I have tried this way of setting the Spark master, but an exception is raised; I suspect that this is symptomatic of my problem:
//.setMaster("spark://quickstart.cloudera:7077")
This is the exception when I try to use the fully qualified domain name, .setMaster("spark://quickstart.cloudera:7077"):
java.io.IOException: Failed to connect to quickstart.cloudera/127.0.0.1:7077
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:200)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:183)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: quickstart.cloudera/127.0.0.1:7077
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
I can ping quickstart.cloudera in the Cloudera terminal, so why can't I use .setMaster("spark://quickstart.cloudera:7077") instead of .setMaster("local[*]")?
[cloudera@quickstart bin]$ ping quickstart.cloudera
PING quickstart.cloudera (127.0.0.1) 56(84) bytes of data.
64 bytes from quickstart.cloudera (127.0.0.1): icmp_seq=1 ttl=64 time=0.019 ms
64 bytes from quickstart.cloudera (127.0.0.1): icmp_seq=2 ttl=64 time=0.026 ms
64 bytes from quickstart.cloudera (127.0.0.1): icmp_seq=3 ttl=64 time=0.026 ms
64 bytes from quickstart.cloudera (127.0.0.1): icmp_seq=4 ttl=64 time=0.028 ms
64 bytes from quickstart.cloudera (127.0.0.1): icmp_seq=5 ttl=64 time=0.026 ms
64 bytes from quickstart.cloudera (127.0.0.1): icmp_seq=6 ttl=64 time=0.020 ms
And port 7077 is listening for incoming connections:
[cloudera@quickstart bin]$ netstat -nap | grep 7077
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 192.168.30.138:7077 0.0.0.0:* LISTEN
[cloudera@quickstart bin]$ ping 192.168.30.138
PING 192.168.30.138 (192.168.30.138) 56(84) bytes of data.
64 bytes from 192.168.30.138: icmp_seq=1 ttl=64 time=0.023 ms
64 bytes from 192.168.30.138: icmp_seq=2 ttl=64 time=0.026 ms
64 bytes from 192.168.30.138: icmp_seq=3 ttl=64 time=0.028 ms
^C
--- 192.168.30.138 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2810ms
rtt min/avg/max/mdev = 0.023/0.025/0.028/0.006 ms
[cloudera@quickstart bin]$ ifconfig
eth2 Link encap:Ethernet HWaddr 00:0C:29:6F:80:D2
inet addr:192.168.30.138 Bcast:192.168.30.255 Mask:255.255.255.0
UP BROADCAST RUNNING MULTICAST MTU:1500 Metric:1
RX packets:8612 errors:0 dropped:0 overruns:0 frame:0
TX packets:8493 errors:0 dropped:0 overruns:0 carrier:0
collisions:0 txqueuelen:1000
RX bytes:2917515 (2.7 MiB) TX bytes:849750 (829.8 KiB)
lo Link encap:Local Loopback
inet addr:127.0.0.1 Mask:255.0.0.0
UP LOOPBACK RUNNING MTU:65536 Metric:1
RX packets:57534 errors:0 dropped:0 overruns:0 frame:0
TX packets:57534 errors:0 dropped:0 overruns:0 carrier:0
collisions:0 txqueuelen:0
RX bytes:44440656 (42.3 MiB) TX bytes:44440656 (42.3 MiB)
I think that this must be a misconfiguration in a Cloudera configuration file, but which one? Thank you very much for reading this far.
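One hedged reading of the outputs above: /etc/hosts lists 127.0.0.1 first for quickstart.cloudera, so the name resolves to loopback (as the ping shows), while netstat shows the master listening only on 192.168.30.138:7077; connecting by name therefore hits an address where nothing listens. A sketch of what I could try, using the eth2 address directly:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("AmazonKafkaConnector")
  // the address netstat shows the master bound to, instead of a name that
  // resolves to loopback (an assumption to verify)
  .setMaster("spark://192.168.30.138:7077")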