Member since: 10-16-2016
Posts: 46
Kudos Received: 9
Solutions: 0
02-28-2017
07:02 PM
Thanks for your answer @Dan Zaratsian. I implemented a solution similar to what you suggested but am still facing a performance issue. Could you please take a look at https://community.hortonworks.com/questions/85745/spark-writing-data-to-amazondynamodb-is-slow.html
02-24-2017
04:37 AM
1 Kudo
We have a Spark Streaming job writing data to Amazon DynamoDB using foreachRDD, but it is very slow: our consumption rate is 10,000 records/sec, yet writing 10,000 records takes 35 minutes. This is the code piece. From research I learned that using foreachPartition and creating a connection per partition would help, but I'm not sure how to go about writing the code for it, and I would greatly appreciate it if someone could help with this. Any other suggestions to speed up the writes are also greatly appreciated.

tempRequestsWithState.foreachRDD { rdd =>
  if ((rdd != null) && (rdd.count() > 0) && (!rdd.isEmpty())) {
    rdd.foreachPartition {
      case (topicsTableName, hashKeyTemp, attributeValueUpdate) => {
        val client = new AmazonDynamoDBClient()
        val request = new UpdateItemRequest(topicsTableName, hashKeyTemp, attributeValueUpdate)
        try client.updateItem(request)
        catch {
          case se: Exception => println("Error executing updateItem!\nTable ", se)
        }
      }
      case null =>
    }
  }
}
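For reference, here is a minimal sketch of the per-partition-connection pattern being asked about, assuming the RDD elements are the same (topicsTableName, hashKeyTemp, attributeValueUpdate) tuples and the same AWS SDK classes (AmazonDynamoDBClient, UpdateItemRequest) as in the snippet above; it illustrates the idea and is not a tested drop-in replacement:

tempRequestsWithState.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.foreachPartition { partition =>
      // One DynamoDB client per partition, reused for every record in it
      // (instead of creating a client per record)
      val client = new AmazonDynamoDBClient()
      partition.foreach { case (topicsTableName, hashKeyTemp, attributeValueUpdate) =>
        val request = new UpdateItemRequest(topicsTableName, hashKeyTemp, attributeValueUpdate)
        try client.updateItem(request)
        catch {
          case e: Exception => println("Error executing updateItem on table " + topicsTableName + ": " + e)
        }
      }
      client.shutdown() // release the client once the partition is done
    }
  }
}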
Labels:
- Apache Spark
02-24-2017
01:34 AM
@Bikas thanks a lot for your suggestion! The link you provided doesn't seem to be working. Could you please provide some sample code showing how to write foreachPartition, as I'm not familiar with it?
02-22-2017
06:24 AM
2 Kudos
We have a Spark Streaming application where we receive a DStream from Kafka and need to store it to DynamoDB. I'm experimenting with the two ways to do it described in the code below; requestsWithState is a DStream.

Code Snippet 1 with foreachRDD:

requestsWithState.foreachRDD { rdd =>
  println("Data being populated to Pulsar")
  rdd.foreach { case (id, eventStream) =>
    println("id is " + id + " Event is " + eventStream)
    val dynamoConnection = setupDynamoClientConnection()
    DBUtils.putItem(dynamoConnection, id, eventStream.toString())
  }
}
Code Snippet 2 with map:

requestsWithState.map (rdd => { rdd match {
    case (id, eventStream) => {
      println("id is " + id + " Event is " + eventStream)
      val dynamoConnection = setupDynamoClientConnection()
      DBUtils.putItem(dynamoConnection, id, eventStream.toString())
    }
  }
})
requestsWithState.print(1)
Code Snippet 1 works fine and populates the database; the second code snippet doesn't work. We would love to know the reason behind it and how we can make it work. The reason we are experimenting (we know map is a transformation and foreachRDD is an action) is that foreachRDD is very slow for our use case under heavy load on the cluster, and we found that map is much faster if we can get it working. Please help us get the map code working.
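A note on why the second snippet likely does nothing: map is a lazy transformation, so the mapped DStream is only computed when an output operation consumes it. Here print(1) is called on requestsWithState itself, not on the result of map, so the mapped DStream has no output operation at all and the putItem side effects never run. Below is a minimal sketch that keeps the write inside an output operation while opening one connection per partition instead of one per record, reusing the setupDynamoClientConnection() and DBUtils.putItem helpers from the snippets above:

requestsWithState.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One connection per partition, shared by all records in it
    val dynamoConnection = setupDynamoClientConnection()
    partition.foreach { case (id, eventStream) =>
      DBUtils.putItem(dynamoConnection, id, eventStream.toString())
    }
  }
}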
Labels:
- Apache Spark
02-02-2017
07:11 PM
@Geoffrey Shelton Okot we are not using Ambari or Cloudera Manager; we are using Apache Hadoop 2.7.3, and all changes I'm making are through the CLI. I have added additional info to the question; please let me know if any more info is required.
02-02-2017
04:56 PM
@Divakar Annapureddy We are not using Ambari for management; we are using Apache Hadoop 2.7.3.
02-02-2017
01:17 PM
We have an application managed by YARN. When we change yarn-site.xml, those changes are not reflected; the application is still running with the old configuration. We are new to YARN, so any help in this regard would be appreciated. Note: we have already tried restarting YARN using stop-yarn.sh and start-yarn.sh, and also restarted DFS using start-dfs.sh and stop-dfs.sh. We are using Hadoop 2.7.3. This is what YARN shows: the maximum memory is configured to only 16 GB, as shown in the picture, but the actual configuration is 22 GB per yarn-site.xml. This is the yarn-site.xml:
<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hdfs-name-node</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>21528</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>6</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>21528</value>
</property>
<property>
<name>yarn.nodemanager.local-dirs</name>
<value>file:///tmp/hadoop/data/nm-local-dir,file:///tmp/hadoop/data/nm-local-dir/filecache,file:///tmp/hadoop/data/nm-local-dir/usercache</value>
</property>
<property>
<name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
<value>500</value>
</property>
<property>
<name>yarn.nodemanager.localizer.cache.target-size-mb</name>
<value>512</value>
</property>
</configuration>
This is the node configuration:
1 Master/Driver Node: Memory: 24 GB, Cores: 8
4 Worker Nodes: Memory: 24 GB, Cores: 8
Labels:
- Apache Hadoop
- Apache YARN
01-09-2017
02:59 PM
1 Kudo
We are running a Spark Streaming job with YARN as the resource manager and are noticing that the NodeManager local directories, in particular /tmp/hadoop/data/nm-local-dir/filecache, are filling up on the data nodes; we run out of space after only a couple of minutes of running. These directories are not getting cleared automatically. From my research I found that the yarn.nodemanager.localizer.cache.cleanup.interval-ms property needs to be set, but even after setting it, the directories are not cleared automatically. Any help will be greatly appreciated.

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>

  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>hdfs-name-node</value>
  </property>

  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>16384</value>
  </property>

  <property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>6</value>
  </property>

  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>16384</value>
  </property>

  <property>
    <name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
    <value>3000</value>
  </property>

  <!-- Needs to be explicitly set as part of a workaround for YARN-367.
       If changing this property, you must also change the
       hadoop.tmp.dir property in hdfs-site.xml. This location must always
       be a subdirectory of the location specified in hadoop.tmp.dir. This
       affects all versions of Yarn 2.0.0 through 2.7.3+. -->
  <property>
    <name>yarn.nodemanager.local-dirs</name>
    <value>file:///tmp/hadoop/data/nm-local-dir</value>
  </property>

</configuration>
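As a hedged configuration sketch, not a confirmed fix: the NodeManager only trims the localized filecache once it grows beyond yarn.nodemanager.localizer.cache.target-size-mb, which defaults to roughly 10 GB and is not set in the file above, and it never deletes resources still in use by running containers. Lowering that target alongside the cleanup interval may be worth trying; the 512 below is only an illustrative value, not a recommendation:

<property>
  <!-- Illustrative value only: the filecache is trimmed once it exceeds this size (in MB) -->
  <name>yarn.nodemanager.localizer.cache.target-size-mb</name>
  <value>512</value>
</property>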
Labels:
- Apache Hadoop
- Apache Spark
- Apache YARN
12-25-2016
05:33 PM
java.lang.NullPointerException
at DevMain$anonfun$5.apply(DevMain.scala:2)
at (DevMain.scala:2)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:389)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
at org.apache.spark.rdd.RDD$anonfun$count$1.apply(RDD.scala:1157)
at org.apache.spark.rdd.RDD$anonfun$count$1.apply(RDD.scala:1157)
at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
at DevMain$anonfun$10.apply(DevMain.scala:2)
at DevMain$anonfun$10.apply(DevMain.scala:2)
at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at DevMain$anonfun$5.apply(DevMain.scala:2)
at (DevMain.scala:2)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:389)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
at org.apache.spark.rdd.RDD$anonfun$count$1.apply(RDD.scala:1157)
at org.apache.spark.rdd.RDD$anonfun$count$1.apply(RDD.scala:1157)
at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
12-25-2016
05:23 PM
Thanks for pointing that out, Constantin; I have corrected it. Sure, I have added an exception block. We are running on a 4-node cluster.