Member since 10-16-2016 · 46 Posts · 9 Kudos Received · 0 Solutions
03-16-2017
04:26 AM
We have a Spark Streaming application where we consume events from Kafka. We want to aggregate the events over a period of time by the traceid in each event, create an aggregate event for that traceid, and write the aggregated event to a database. Our events look like this:

traceid: 123
{
info: abc;
}
traceid: 123
{
info:bcd;
}
What we want to achieve is to create an aggregate event over a period of time, say 2 minutes, and write the aggregated event to the database instead of the individual events:

traceid: 123
{
info:abc,bcd
}
We used mapWithState and came up with this code:

val stateSpec = StateSpec.function(trackStateFunc _).timeout(Minutes(2))
val requestsWithState = tempLines.mapWithState(stateSpec)
requestsWithState.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => { record match {
case (accountId, enrichedId, ets, attributeMap) =>
if (validateRecordForStorage(accountId, enrichedId, ets, attributeMap)) {
val ds = new DBDataStore(connection)
ds.saveEnrichedEvent(accountId, enrichedId, ets, attributeMap)
//val r = scala.util.Random
} else {
/*logError("Discarded record [enrichedId=" + enrichedId
+ ", accountId=" + accountId
+ ", ets=" + ets
+ ", attributes=" + attributeMap.toString() + "]")*/
println("Discarded record [enrichedId=" + enrichedId
+ ", accountId=" + accountId
+ ", ets=" + ets
+ "]")
null
}
case default => {
logInfo("You gave me: " + default)
null
}
}
}
)
}
}
The mapWithState aggregation works fine, but our understanding was that it should start writing to the database only after 2 minutes. Instead, we notice that it starts writing immediately without waiting for the 2 minutes, so our understanding is clearly not right. If someone could guide us in achieving our goal of writing to the database only after aggregating for 2 minutes, it would greatly help.
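From what we have read, the StateSpec timeout only controls when idle keys are evicted, not when output is emitted, so one alternative we are sketching is to aggregate over an explicit 2-minute window with reduceByKeyAndWindow and write each windowed result once. This is only a sketch: it assumes the stream can be reduced to simple (traceid, info) string pairs, and saveAggregate is a hypothetical stand-in for the database write.

import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

// Sketch only: (traceid, info) pairs, aggregated per traceid over a 2-minute window.
val perTrace: DStream[(String, String)] = tempLines.map { case (k, v) => (k.toString, v.toString) }  // assumed shape
val aggregated = perTrace.reduceByKeyAndWindow(
  (a: String, b: String) => a + "," + b,  // merge the info fields for a traceid
  Minutes(2),                             // window length
  Minutes(2))                             // slide = window, so each traceid is written once per 2 minutes
aggregated.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val connection = createNewConnection()           // existing helper
    records.foreach { case (traceId, infos) =>
      saveAggregate(connection, traceId, infos)      // hypothetical DB write
    }
  }
}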
03-10-2017
07:54 AM
We have a Spark Streaming application that receives data from Kafka, parses it, and saves it to a database using the code below:

val lines: DStream[MetricTypes.InputStreamType] = myConsumer.createDefaultStream()
val keyDeLines = lines.filter(x => ((null != x) && !(x.equals(null) && (x._1 != null) && (x._2 != null) ))).map(x => {
val lmeMap: RawMetricsExtractor = new JsonExtractor[HttpEvent]()
try lmeMap.aParser(x)
catch {
case ase: Exception =>
logError("Error parsing item + "+ new GsonBuilder().setPrettyPrinting().create.toJson(x._2) + "]", ase)
None
}
//lmeMap.aParser(x)
})
val tempLines = keyDeLines.filter(x => ((null != x) && !(x.equals(null)))).filter(_.isDefined).map(_.get).map(x => {
val keyExtractMap: KeyExtractor[MetricTypes.EnrichedKeyType] = new EnrichedEventExtractor()
val eventExtractMap: KeyExtractor[MetricTypes.EventKeyType] = new TopicIdExtractor()
(eventExtractMap.getKey(x), keyExtractMap.getKey(x))
})
val stateSpec = StateSpec.function(trackStateFunc _).timeout(Minutes(2))
// val requestsWithState = pKeyDlines.mapWithState(stateSpec)
val requestsWithState = tempLines.mapWithState(stateSpec)
requestsWithState.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => { record match {
case (accountId, enrichedId, ets, attributeMap) =>
if (validateRecordForStorage(accountId, enrichedId, ets, attributeMap)) {
val ds = new DynamoDBDataStore(connection)
ds.saveEnrichedEvent(accountId, enrichedId, ets, attributeMap)
//val r = scala.util.Random
} else {
/*logError("Discarded record [enrichedId=" + enrichedId
+ ", accountId=" + accountId
+ ", ets=" + ets
+ ", attributes=" + attributeMap.toString() + "]")*/
println("Discarded record [enrichedId=" + enrichedId
+ ", accountId=" + accountId
+ ", ets=" + ets
+ "]")
null
}
case default => {
logInfo("You gave me: " + default)
null
}
}
}
)
}
}
private def createNewConnection():AmazonDynamoDBClient = {
val amazonAWSAccessKey = "abcd"
val amazonAWSSecretKey = "1234"
val amazonDynamoDBEndpoint = "http:endpoint:9090"
val client = new AmazonDynamoDBClient(new BasicAWSCredentials(amazonAWSAccessKey, amazonAWSSecretKey))
client.setEndpoint(amazonDynamoDBEndpoint)
return client;
}
This is how much time each task is taking. We are consuming at 10,000 events/sec but writing only 65,000 records per 8.4 minutes. We would greatly appreciate it if someone could point out why our job is so slow.
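One small change we are going to try, sketched below, assuming a DynamoDBDataStore instance can safely be reused across records: build the connection and the data store once per partition instead of once per record.

requestsWithState.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()          // one client per partition (existing helper)
    val ds = new DynamoDBDataStore(connection)      // reused for every record in this partition
    partitionOfRecords.foreach {
      case (accountId, enrichedId, ets, attributeMap)
          if validateRecordForStorage(accountId, enrichedId, ets, attributeMap) =>
        ds.saveEnrichedEvent(accountId, enrichedId, ets, attributeMap)
      case other =>
        println("Discarded record: " + other)
    }
  }
}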
03-07-2017
03:10 AM
We have a streaming application with a count action; tempRequestsWithState is a DStream:
tempRequestsWithState.foreachRDD { rdd =>
print (rdd.count())
}
The count action is very slow, taking about 30 minutes. We would greatly appreciate it if anyone could suggest a way to speed up this action, as we are consuming about 10,000 events/sec. We also noticed that we have 54 partitions per RDD.
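In case it is useful, this is a sketch of the approximate variant we are looking at, assuming an estimated figure is acceptable for our purposes: countApprox returns after the given timeout with an estimate and confidence bounds instead of blocking until every partition is fully counted.

tempRequestsWithState.foreachRDD { rdd =>
  // Returns within ~10 s with an estimate and 95% confidence bounds,
  // instead of waiting for an exact count over every partition.
  val approx = rdd.countApprox(10000L, 0.95)
  println("Approximate record count: " + approx.initialValue.mean)
}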
03-06-2017
05:28 AM
As per the Spark documentation at http://spark.apache.org/docs/latest/streaming-programming-guide.html#other-points-to-remember: "DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like dstream.foreachRDD() without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it." We have a Spark Streaming application with a map operation followed by a DStream output operation. As per the documentation, we have an RDD action inside foreachRDD, namely rdd.first(), but still nothing happens. tempRequestsWithState is a DStream:
tempRequestsWithState.foreachRDD { rdd =>
rdd.first()
}

Interestingly, if we use rdd.foreach inside foreachRDD, the application runs perfectly:

tempRequestsWithState.foreachRDD { rdd =>
rdd.foreach {
}
}

In our case rdd.foreach is a very slow operation and we would like to avoid it, since we are dealing with a heavy load of 10,000 events/sec, and we do need the foreachRDD. Please let us know if we are missing anything and whether we can use any other RDD action inside foreachRDD.
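For what it's worth, this is the variant we are considering to force every partition without any per-record work of our own. Our understanding, which we would like confirmed, is that rdd.first() only computes enough partitions to return one element, so most of the batch is never processed.

tempRequestsWithState.foreachRDD { rdd =>
  // Visits every partition and drains its iterator, forcing the whole
  // batch to be computed without any per-record logic.
  rdd.foreachPartition { iter =>
    while (iter.hasNext) iter.next()
  }
}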
03-06-2017
05:26 AM
We are new to Spark and are looking for an alternative RDD action, similar to foreach, that applies an operation to each element.
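For context, this is roughly the per-partition variant we have come across (createConnection and process are hypothetical placeholders); it still reaches every element, but per-partition setup happens once per partition instead of once per element.

rdd.foreachPartition { elements =>
  val connection = createConnection()      // hypothetical one-off setup per partition
  elements.foreach { element =>
    process(connection, element)           // hypothetical per-element work
  }
  connection.close()
}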
03-03-2017
03:14 PM
We have a Spark Streaming application which ingests data at 10,000 events/sec.
We use the foreachRDD output operation on our DStream (since Spark doesn't execute anything unless it finds an output operation on the DStream), so we have to use foreachRDD like this. It takes up to 3 hours to write a single batch of data (10,000 records), which is slow. requestsWithState is a DStream.

Code Snippet 1:

requestsWithState.foreachRDD { rdd =>
rdd.foreach {
case (topicsTableName, hashKeyTemp, attributeValueUpdate) => {
val client = new AmazonDynamoDBClient()
val request = new UpdateItemRequest(topicsTableName, hashKeyTemp, attributeValueUpdate)
try client.updateItem(request)
catch {
case se: Exception => println("Error executing updateItem!\nTable ", se)
}
}
case null =>
}
}
So I thought the code inside foreachRDD might be the problem, so I commented it out to see how much time it takes. To my surprise, even with no code inside the foreachRDD it still runs for 3 hours.

Code Snippet 2:

requestsWithState.foreachRDD {
rdd => rdd.foreach {
// No code here still takes a lot of time ( there used to be code but removed it to see if it's any faster without code) //
}
}

Please let us know if we are missing anything, or if there is an alternative way to do this, as I understand a Spark Streaming application will not run without an output operation on the DStream; at this time I can't use other output operations. Note: to isolate the problem and make sure that the DynamoDB code is not the problem, I ran with an empty loop. It looks like foreachRDD is slow on its own when iterating over a huge record set coming in at 10,000 events/sec, and not the DynamoDB code, since the empty foreachRDD and the one with DynamoDB code took the same time.
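To narrow this down further, we are planning to time the upstream computation separately from the iteration itself with a sketch like the one below (assuming it is acceptable to cache one batch and that the batch fits in the cache): the first pass computes the full lineage (Kafka read, parsing, state update) and fills the cache, while the second pass only iterates the cached data.

requestsWithState.foreachRDD { rdd =>
  rdd.cache()
  val t0 = System.nanoTime()
  rdd.foreachPartition(iter => while (iter.hasNext) iter.next())  // computes the lineage, fills the cache
  val t1 = System.nanoTime()
  rdd.foreachPartition(iter => while (iter.hasNext) iter.next())  // iterates the cached data only
  val t2 = System.nanoTime()
  println("first pass (lineage): " + (t1 - t0) / 1e6 + " ms, " +
          "second pass (cached data): " + (t2 - t1) / 1e6 + " ms")
  rdd.unpersist()
}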
02-28-2017
07:02 PM
Thanks for your answer @Dan Zaratsian. I implemented a solution similar to what you suggested but am still facing a performance issue. Could you please take a look at https://community.hortonworks.com/questions/85745/spark-writing-data-to-amazondynamodb-is-slow.html
02-26-2017
07:26 AM
1 Kudo
We are using Spark for our streaming application, where we receive data and write it to a DynamoDB table, but it is very slow: we are consuming at 10,000 msgs/sec but writing only at about 160/sec. We are using

val awssdk = "com.amazonaws" % "aws-java-sdk" % "1.11.60"

This is the piece of code we are using to write to DynamoDB; tempRequestsWithState is the DStream:
tempRequestsWithState.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val client = new AmazonDynamoDBClient(new BasicAWSCredentials(amazonAWSAccessKey, amazonAWSSecretKey))
client.setEndpoint(amazonDynamoDBEndpoint)
partitionOfRecords.foreach {
case (topicsTableName, hashKeyTemp, attributeValueUpdate) => {
val request = new UpdateItemRequest(topicsTableName, hashKeyTemp, attributeValueUpdate)
try client.updateItem(request)
catch {
case se: Exception => println("Error executing updateItem!\nTable ", se)
}
}
case null =>
}
}
}
We are following the guidelines from the Spark documentation at https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams. Please let us know if there is an alternative way to do this so that we can increase the write throughput. We appreciate your help in this regard.
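Since the per-partition client is already in place, one thing we are going to experiment with is the number of concurrent writer tasks, sketched below. The repartition count of 54 is illustrative, and whether this actually helps also depends on available executor cores and the table's provisioned write capacity.

tempRequestsWithState.foreachRDD { rdd =>
  // More partitions => more tasks writing to DynamoDB in parallel,
  // provided executors/cores and the table's write capacity allow it.
  rdd.repartition(54).foreachPartition { partitionOfRecords =>
    val client = new AmazonDynamoDBClient(new BasicAWSCredentials(amazonAWSAccessKey, amazonAWSSecretKey))
    client.setEndpoint(amazonDynamoDBEndpoint)
    partitionOfRecords.foreach {
      case (topicsTableName, hashKeyTemp, attributeValueUpdate) =>
        try client.updateItem(new UpdateItemRequest(topicsTableName, hashKeyTemp, attributeValueUpdate))
        catch { case se: Exception => println("Error executing updateItem!\nTable ", se) }
      case null =>
    }
  }
}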
02-24-2017
04:37 AM
1 Kudo
We have a Spark Streaming job writing data to Amazon DynamoDB using foreachRDD, but it is very slow: our consumption rate is 10,000/sec and writing 10,000 records takes 35 minutes. This is the code piece. From research we learnt that using foreachPartition and creating a connection per partition should help, but we are not sure how to go about writing the code for it; we would greatly appreciate it if someone could help with this. Any other suggestion to speed up writing is also greatly appreciated.

tempRequestsWithState.foreachRDD { rdd =>
if ((rdd != null) && (rdd.count() > 0) && (!rdd.isEmpty()) ) {
rdd.foreachPartition {
case (topicsTableName, hashKeyTemp, attributeValueUpdate) => {
val client = new AmazonDynamoDBClient()
val request = new UpdateItemRequest(topicsTableName, hashKeyTemp, attributeValueUpdate)
try client.updateItem(request)
catch {
case se: Exception => println("Error executing updateItem!\nTable ", se)
}
}
case null =>
}
}
}
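For reference, this is the shape of the foreachPartition version we think we are after, using the same (topicsTableName, hashKeyTemp, attributeValueUpdate) tuples as above; the client construction is illustrative. foreachPartition hands each task an Iterator over that partition's records, so the client is created once per partition and the per-record pattern match moves inside it. We believe the rdd.count() guard can also be dropped, since it forces an extra full pass over the batch.

tempRequestsWithState.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>   // Iterator over this partition's records
    val client = new AmazonDynamoDBClient()      // one client per partition
    partitionOfRecords.foreach {
      case (topicsTableName, hashKeyTemp, attributeValueUpdate) =>
        val request = new UpdateItemRequest(topicsTableName, hashKeyTemp, attributeValueUpdate)
        try client.updateItem(request)
        catch { case se: Exception => println("Error executing updateItem!\nTable ", se) }
      case null =>
    }
  }
}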
02-24-2017
01:34 AM
@Bikas thanks a lot for your suggestion! The link you provided doesn't seem to be working. Could you please provide some sample code showing how to write foreachPartition, as I'm not familiar with it?
02-22-2017
06:24 AM
2 Kudos
We have a Spark Streaming application where we receive a DStream from Kafka and need to store it to DynamoDB. I'm experimenting with two ways to do it, as described in the code below; requestsWithState is a DStream.

Code Snippet 1 with foreachRDD:

requestsWithState.foreachRDD { rdd =>
println("Data being populated to Pulsar")
rdd.foreach { case (id, eventStream) =>
println("id is " + id + " Event is " + eventStream)
val dynamoConnection = setupDynamoClientConnection()
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
Code Snippet 2 with map:

requestsWithState.map (rdd => { rdd match {
case (id, eventStream) => {
println("id is " + id + " Event is " + eventStream)
val dynamoConnection = setupDynamoClientConnection()
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
})
requestsWithState.print(1)
Code Snippet 1 works fine and populates the database; the second code snippet doesn't work. We would love to know the reason behind it and how we can make it work. The reason we are experimenting (we know map is a transformation and foreachRDD is an action) is that foreachRDD is very slow for our use case under heavy load on the cluster, and we found that map is much faster if we can get it working. Please help us get the map code working.
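Our current working theory, which we would like confirmed, is that map is lazy and requestsWithState.print(1) only evaluates enough of each batch to print one element, so most records are never written in Snippet 2 (which would also explain why it looks fast). The sketch below keeps the write inside map but adds an output operation that drains every partition, so every record is actually pulled through the map.

val written = requestsWithState.map { case (id, eventStream) =>
  // Side-effecting map: runs only for elements that an action actually pulls through it.
  val dynamoConnection = setupDynamoClientConnection()
  DBUtils.putItem(dynamoConnection, id, eventStream.toString())
  id
}
// Output operation that consumes every element, forcing the map above to run for all records.
written.foreachRDD { rdd =>
  rdd.foreachPartition(iter => while (iter.hasNext) iter.next())
}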
02-03-2017
03:03 AM
@Satish Bomma we are using Apache Hadoop 2.7.3, so we will not be able to make the change you suggested.
02-02-2017
10:38 PM
1 Kudo
We have a 5-node cluster managed by YARN and running Hadoop:
1 master name node with 8 vcores and 24 GB memory
4 data nodes, each with 8 vcores and 24 GB memory
When I look at the YARN configuration in the UI, as highlighted in the picture below, it is only using 16 GB and 6 vcores. Our application is using all of the 16 GB, so we want to increase the memory since it is available (24 GB minus 2 GB for the OS leaves 22 GB available). Where do I need to configure this 22 GB instead of 16 GB? From research I found that yarn-site.xml might be the place, so I went ahead and updated it and restarted YARN, but it is still showing 16 GB. We would greatly appreciate it if any of the experts in the community could help out, as we are new to YARN.
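For reference, these are the yarn-site.xml properties we believe control what each NodeManager advertises (the values below are illustrative, roughly the 22 GB and 6 vcores described above); our understanding is that the file has to be updated on every node manager and the node managers restarted.

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>21528</value>  <!-- memory (MB) each NodeManager offers to containers -->
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>6</value>  <!-- vcores each NodeManager offers to containers -->
</property>
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>21528</value>  <!-- largest single container the scheduler will grant -->
</property>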
02-02-2017
07:11 PM
@Geoffrey Shelton Okot we are not using Ambari or Cloudera Manager; we are using Apache Hadoop 2.7.3 and all changes I'm making are through the CLI. I have added additional info to the question; please let me know if any more info is required.
02-02-2017
04:56 PM
@Divakar Annapureddy We are not using Ambari for management; we are using Apache Hadoop 2.7.3.
02-02-2017
01:17 PM
We have an application managed by YARN. When we change yarn-site.xml, those changes are not reflected; the application is still running with the old configuration. We are new to YARN, so any help in this regard would be helpful. Note: we have already tried restarting YARN using stop-yarn.sh and start-yarn.sh, and also restarted DFS using stop-dfs.sh and start-dfs.sh. We are using Hadoop 2.7.3. This is what YARN looks like, with max memory showing as only 16 GB in the picture, while the actual configuration is 22 GB as per yarn-site.xml. This is the yarn-site.xml:
<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hdfs-name-node</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>21528</value>
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>6</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>21528</value>
</property>
<property>
<name>yarn.nodemanager.local-dirs</name>
<value>file:///tmp/hadoop/data/nm-local-dir,file:///tmp/hadoop/data/nm-local-dir/filecache,file:///tmp/hadoop/data/nm-local-dir/usercache</value>
</property>
<property>
<name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
<value>500</value>
</property>
<property>
<name>yarn.nodemanager.localizer.cache.target-size-mb</name>
<value>512</value>
</property>
</configuration>
This is the node configuration:
1 Master/Driver Node: Memory 24 GB, Cores 8
4 Worker Nodes: Memory 24 GB, Cores 8 each
02-01-2017
07:59 PM
We are running a Spark Streaming application and it has batches queued up, but it is not using all of the executors configured for it. It is configured to use 24 executors, but it is actually only using 16 and batches are queuing up. How can we make it use all 24 executors and stop the batches from queuing up?
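For reference, these are the executor settings we believe control this on the Spark side, sketched with illustrative values mirroring the 24-executor setup; whether YARN can actually grant all of them also depends on the per-node memory and vcore limits in yarn-site.xml.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-app")              // illustrative name
  .set("spark.executor.instances", "24")    // number of executors requested from YARN
  .set("spark.executor.cores", "2")         // cores per executor (illustrative)
  .set("spark.executor.memory", "3g")       // memory per executor (illustrative)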
01-18-2017
06:43 PM
I tried changing it; still no effect.
01-18-2017
06:07 PM
We are using Hadoop 2.7.3. We changed hdfs-site.xml to point to a new directory and set permissions on the new directory too, then ran stop-dfs.sh and start-dfs.sh on the name node, but the changes are not taking effect; it still points to the old directory. Am I missing anything while making the configuration changes? And how can we make sure the new directory is used?
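For reference, this is the shape of what we are editing (the paths below are placeholders; as we understand it, dfs.name.dir and dfs.data.dir are the older names for the same settings, and the file has to be updated on each node that should pick up the change before restarting DFS).

<property>
  <name>dfs.namenode.name.dir</name>
  <value>file:///new/path/hdfs/namenode</value>  <!-- placeholder path on the name node -->
</property>
<property>
  <name>dfs.datanode.data.dir</name>
  <value>file:///new/path/hdfs/datanode</value>  <!-- placeholder path on each data node -->
</property>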
01-18-2017
05:28 PM
Yes, it exists.
01-18-2017
05:27 PM
I ran it; this is the output.
01-18-2017
03:54 PM
We have a 4-datanode HDFS cluster. There is a large amount of space available on each data node, about 98 GB, but when I look at the datanode information it is only using about 10 GB and running out of space. How can we make it use all of the 98 GB and not run out of space, as indicated in the image? This is the disk space configuration. This is the hdfs-site.xml on the name node:

<property>
<name>dfs.name.dir</name>
<value>/test/hadoop/hadoopinfra/hdfs/namenode</value>
</property>
This is the hdfs-site.xml on the data nodes:

<property>
<name>dfs.data.dir</name>
<value>/test/hadoop/hadoopinfra/hdfs/datanode</value>
</property>
Even though /test has 98 GB and HDFS is configured to use it, it is not using it. Am I missing anything in the configuration changes? And how can we make sure the 98 GB is used?
01-18-2017
02:43 PM
Thanks for the comment, Yukti. I have updated the question with hdfs-site.xml and the disk space distribution; please take a look and let us know if we are missing anything. The scheduler is YARN.
01-18-2017
02:42 PM
Thanks for the comment, Geoffrey. I have updated the question with hdfs-site.xml and the disk space distribution; please take a look and let us know if we are missing anything. These are VMs.
01-18-2017
02:41 PM
Thanks for the comment, Hardik. I have updated the question with hdfs-site.xml and the disk space distribution; please take a look and let us know if we are missing anything.
01-17-2017
11:01 PM
We have a 4-datanode HDFS cluster. There is a large amount of space available on each data node, about 98 GB, but when I look at the datanode information it is only using about 10 GB. How can we make it use all of the 98 GB and not run out of space, as indicated in the image? This is the hdfs-site.xml on the name node:

<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.name.dir</name>
<value>file:///test/hadoop/hadoopinfra/hdfs/namenode</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>file:///tmp/hadoop/data</value>
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>2368709120</value>
</property>
<property>
<name>dfs.datanode.fsdataset.volume.choosing.policy</name>
<value>org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy</value>
</property>
<property>
<name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction</name>
<value>1.0</value>
</property>
</configuration>
This is the hdfs-site.xml on the data nodes:

<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>file:///test/hadoop/hadoopinfra/hdfs/datanode</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>file:///tmp/hadoop/data</value>
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>2368709120</value>
</property>
<property>
<name>dfs.datanode.fsdataset.volume.choosing.policy</name>
<value>org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy</value>
</property>
<property>
<name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction</name>
<value>1.0</value>
</property>
</configuration>

The 98 GB is under /test.
01-14-2017
09:49 PM
We have a Spark Streaming job that reads data from Kafka, running on a 4-node cluster and using a checkpoint directory on HDFS. We had an I/O error where we ran out of space, so we had to go in and delete a few HDFS folders to free up some space; now we have bigger disks mounted and want to restart cleanly, with no need to preserve the checkpoint data or Kafka offsets. We are getting this error:

Application application_1482342493553_0077 failed 2 times due to AM Container for appattempt_1482342493553_0077_000002 exited with exitCode: -1000
For more detailed output, check application tracking page:http://hdfs-name-node:8088/cluster/app/application_1482342493553_0077Then, click on links to logs of each attempt.
Diagnostics: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-1266542908-96.118.179.119-1479844615420:blk_1073795938_55173 file=/user/hadoopuser/streaming_2.10-1.0.0-SNAPSHOT.jar
Failing this attempt. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1484420770001
final status: FAILED
tracking URL: http://hdfs-name-node:8088/cluster/app/application_1482342493553_0077
user: hadoopuser
From the error, what I can make out is that it is still looking for old HDFS blocks that we deleted. From research I found that changing the checkpoint directory should help; I tried changing it and pointing to a new directory, but it still isn't helping Spark restart on a clean slate and it keeps giving the same block exception. Are we missing anything in the configuration changes? And how can we make sure that Spark starts from a clean slate?
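One detail we noticed is that the missing block in the error belongs to the application jar (/user/hadoopuser/streaming_2.10-1.0.0-SNAPSHOT.jar) rather than to checkpoint data, so re-uploading that jar to HDFS may be part of the fix. On the Spark side, this is the clean-start sketch we are trying, assuming the existing sparkConf and batch interval and that the checkpoint really is disposable: point getOrCreate at a brand-new, empty checkpoint directory so there is nothing to recover and a fresh context is built.

import org.apache.spark.streaming.{Seconds, StreamingContext}

val freshCheckpointDir = "hdfs:///user/hadoopuser/checkpoints/streaming-v2"  // new, empty path (placeholder)

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(10))  // existing sparkConf and batch interval assumed
  // ... build the Kafka DStream and the rest of the pipeline here ...
  ssc.checkpoint(freshCheckpointDir)
  ssc
}

// With an empty directory there is nothing to recover, so a brand-new context is created.
val ssc = StreamingContext.getOrCreate(freshCheckpointDir, createContext _)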
01-11-2017
08:13 PM
Yes, I updated yarn-site.xml on all nodes.
01-11-2017
06:53 PM
Thanks a lot for your answer, Bill. On the master node I ran stop-yarn.sh; do you suggest running the same on the worker nodes, or is there another way I'm missing? Please help.
01-11-2017
03:10 PM
We are trying to locate yarn-default.xml in hadoop-2.7.3. Could someone please point out where it may be located? I'm able to find yarn-site.xml but not yarn-default.xml, and we would greatly appreciate it if someone could point out where it is.