Support Questions

Find answers, ask questions, and share your expertise

Spark transformation becomes very slow at times.

Contributor

Hi,

 

We have a small 6-node cluster with 3 masters (2 HA and 1 with CM services) and 3 data nodes. All nodes have 16 cores and 128GB RAM. Each data node has 6 SSD disks of 2TB each for HDFS, so 12TB per node and 36TB in total.

 

About half of the HDFS space (18TB) is currently in use, and we ingest around 100GB per day with Flume.

There is also a Spark transformation, scheduled through Oozie, that operates on the newly ingested data; afterwards the data is stored in Impala tables.

Lastly, there are clients that frequently query Impala for this data.

 

The problem is that cluster performance varies during the day: at times the Spark transformation finishes in 5 to 10 minutes, but at other times it takes 20 to 60 minutes, which is a problem for us.

 

We've done tests and saw that the same Spark transformation finishes in different times even when run on the same amount of data. We also don't see heavy load on CPU/Mem/Disk IO or Network, which would have indicated a possible bottleneck.

 

We also did performance tuning on the cluster, which sped up the overall Spark transformation from an initial 20 minutes to around 5. This included adjusting the block size, Flume file size, HDFS balancer settings, memory heaps and YARN limits. However, as said, the problem is that the runtime is not constant but varies between executions.

 

Could it be that we have hit a performance limit given the amount of data we have?

Is our cluster too small to handle this amount of data, or is Hadoop just meant to be slow?

Why does the transformation take longer when we are not seeing any saturation of system resources (i.e. the hardware is not fully utilized)?

 

There aren't any errors in Cloudera Manager. We are running CDH 5.10.1-1.cdh5.10.1.p0.10 Express.

 

1 ACCEPTED SOLUTION

Contributor

So we shut down the cluster in order to upgrade the network cards to 10Gbit on all machines. After the cluster came back up, everything worked flawlessly. The same transformation now takes around 5 minutes every time and the load on the cluster is mild.

 

Not sure whether the problem was the mismatch in network speeds between the nodes or some sort of limbo state the cluster was in; maybe it was even a combination of the two. It now works as expected. Thanks for the help @mbigelow


7 REPLIES

Champion

Have you examined the job metrics in depth? Where is it spending more time: on a specific stage, or all stages? I know you mentioned it operates on the same amount of data, but it can still end up shuffling more data around. Is it? Is it spending more time in GC? That should help narrow down where on the cluster you should be looking.

 

You just call it a Spark transformation, but what does it actually do? Just the high-level work done during each stage. What services does it interact with, i.e. is it just reading from and writing to HDFS?

Contributor
We have done our best to avoid shuffling.
These are the actions we have in Spark (a rough code sketch follows the list):
1. read some Impala tables and create Scala maps
2. read files from HDFS, apply the maps and create a DataFrame
3. cache the DataFrame
4. filter out invalid data and write it to the Hive metastore
5. cache the validated DataFrame
6. transform and write the data into multiple Hive tables
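
For reference, a minimal sketch of what that pipeline looks like in code, assuming Spark 2.x with Hive support enabled (the table names, paths and mapping logic below are made up for illustration; with the Spark 1.6 bundled in CDH 5.10 the same steps would go through HiveContext):

// Sketch of the scheduled transformation; identifiers are hypothetical.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("flume-ingest-transform")
  .enableHiveSupport()
  .getOrCreate()

// 1. Read a lookup table (shared with Impala via the Hive metastore) and collect it as a Scala map.
val lookup: Map[String, String] = spark.table("ref_db.lookup_table")
  .select("key", "value")
  .collect()
  .map(r => r.getString(0) -> r.getString(1))
  .toMap
val lookupB = spark.sparkContext.broadcast(lookup)
val applyMap = udf((k: String) => lookupB.value.getOrElse(k, "UNKNOWN"))

// 2. Read the newly ingested files from HDFS, apply the map and build a DataFrame.
val raw = spark.read.text("hdfs:///data/flume/ingest/current")
val df = raw.withColumn("mapped", applyMap(col("value")))

// 3. Cache the DataFrame so the following actions reuse it.
df.cache()

// 4. Filter out invalid rows and write the valid ones to a Hive-managed table.
val valid = df.filter(col("mapped") =!= "UNKNOWN")
valid.write.mode("append").saveAsTable("etl_db.valid_events")

// 5. Cache the validated DataFrame, then 6. fan it out into multiple Hive tables.
valid.cache()
valid.filter(col("mapped") === "typeA").write.mode("append").saveAsTable("etl_db.type_a")
valid.filter(col("mapped") === "typeB").write.mode("append").saveAsTable("etl_db.type_b")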

When the job runs slow, usually all the actions are slow.
Sometimes only the HDFS read is slow.
At other times only the first steps after caching are too slow.

Contributor

We suspect two reasons for the jobs being slow:

1. HDFS writes being slow (we got the errors posted below).

 

2. The Spark executors are lost from the driver due to heartbeat timeouts.

 

What could be the reason for these errors?

Also note that due to the ongoing network upgrade we currently have 10Gbit between two of the nodes, but only 1Gbit between the rest. Could this be playing any role?
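
In case reason 2 is simply the executors timing out while the DataNodes are stalled, one thing we are considering is relaxing the Spark timeouts, roughly like this (the property names are standard Spark settings; the values are guesses we would still have to validate):

// Hypothetical tuning sketch for the executor heartbeat timeouts, applied when building the SparkConf.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.network.timeout", "300s")            // default 120s; how long the driver tolerates a silent executor
  .set("spark.executor.heartbeatInterval", "30s")  // default 10s; must stay well below spark.network.timeout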

 

2017-05-27 18:44:00,650 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write packet to mirror took 718ms (threshold=300ms)
2017-05-27 18:44:01,788 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write packet to mirror took 955ms (threshold=300ms)
2017-05-27 18:44:02,134 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write packet to mirror took 311ms (threshold=300ms)
2017-05-27 18:44:02,468 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write packet to mirror took 317ms (threshold=300ms)
2017-05-27 18:44:02,784 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write packet to mirror took 310ms (threshold=300ms)
2017-05-27 18:44:09,732 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write data to disk cost:5551ms (threshold=300ms)
2017-05-27 18:44:10,871 INFO org.apache.hadoop.util.JvmPauseMonitor: Detected pause in JVM or host machine (eg GC): pause of approximately 3118ms
No GCs detected
2017-05-27 18:44:14,690 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow PacketResponder send ack to upstream took 1024ms (threshold=300ms), PacketResponder: BP-144179543-ip-1487684967444:blk_1074786991_1047037, type=HAS_DOWNSTREAM_IN_PIPELINE, replyAck=seqno: 1857 reply: 0 reply: 0 reply: 0 downstreamAckTimeNanos: 4225727141
2017-05-27 18:44:15,691 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write data to disk cost:327ms (threshold=300ms)
2017-05-27 18:44:22,140 INFO org.apache.hadoop.util.JvmPauseMonitor: Detected pause in JVM or host machine (eg GC): pause of approximately 6275ms
No GCs detected
2017-05-27 18:44:23,727 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write data to disk cost:305ms (threshold=300ms)
2017-05-27 18:44:24,973 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write data to disk cost:538ms (threshold=300ms)
2017-05-27 18:44:25,331 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write data to disk cost:344ms (threshold=300ms)
2017-05-27 18:44:28,210 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write data to disk cost:2215ms (threshold=300ms)
2017-05-27 18:44:29,108 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write data to disk cost:378ms (threshold=300ms)
2017-05-27 18:44:29,396 INFO org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService: Scheduling blk_1074786998_1047044 file /d
2017-05-27 05:10:22,247 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Exception for BP-144179543-ip-1487684967444:blk_1074767297_1027324
java.io.IOException: Premature EOF from inputStream
    at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:201)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
    at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:500)
    at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:896)
    at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:808)
    at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:169)
    at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:106)
    at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:246)
    at java.lang.Thread.run(Thread.java:745)
2017-05-27 05:10:24,865 INFO org.apache.hadoop.util.JvmPauseMonitor: Detected pause in JVM or host machine (eg GC): pause of approximately 1117ms
No GCs detected
2017-05-27 05:10:22,966 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write data to disk cost:322ms (threshold=300ms)

 


Champion
I'll look at this some more later, but it at least seems like your DN heap may be too low for the workload. What is it set to? How many total blocks do you have on the cluster? How many blocks worth of data are touched by this job on both the read and write side?

You could also just try increasing the heap to see if the GC pauses disappear and the performance stabilizes. Those pauses could be the source, as they are 3-, 6-, and 1-second pauses where nothing else was occurring. That isn't all of the difference, but it does indicate that HDFS performance was degraded overall.

New Contributor

The DataNode Java heap is set to 1.5 GB.

We have 59,686 blocks on the cluster.

The Spark script is doing an ETL operation on a constant ingestion from Flume.

So the input always varies, let us say 4 to 6GB per hour, and we have a block size of 256MB.
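
For context on scale, at a 256MB block size the hourly ingest only creates a handful of new blocks; a rough back-of-the-envelope calculation (assuming the default 3x replication) looks like this:

// Rough estimate of new HDFS blocks created per hour by the Flume ingest.
val blockSizeMB = 256   // HDFS block size
val replication = 3     // assumed default replication factor
for (gb <- Seq(4, 6)) {
  val blocks = math.ceil(gb * 1024.0 / blockSizeMB).toInt
  println(s"$gb GB/hour -> ~$blocks new blocks (~${blocks * replication} replicas across the cluster)")
}
// 4 GB/hour -> ~16 new blocks (~48 replicas across the cluster)
// 6 GB/hour -> ~24 new blocks (~72 replicas across the cluster)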

Thanks for your advice on heap size. We will try that out.

 

But we have another finding on which a comment would be appreciated.

Out of the 3 data nodes, only one is showing more CPU load.

All the slow performance is caused by this one slow node. 

How do you suggest we tackle this behaviour?

 

Champion
What process is using up more CPU on the one node? Is it the DN, NM, or YARN containers?

I think it is likely an imbalance of data or blocks that is causing it. That would cause more containers to run on that node compared to the rest.

Running hdfs dfsadmin -report will give you the info to determine whether it is balanced on the data front. The best way to check the block distribution is escaping me right now (I think CM has a graph somewhere).
