I appreciate this may be hard to answer for my specific use case, but any information on how to go about solving this problem, or experience with a similar situation, would be greatly appreciated.
I have a Spark Streaming app that reads data from Kafka: three streams, each with a 5-minute batch interval. The data is processed and written to both Elasticsearch and HDFS. I am deploying the app in YARN cluster mode.
If I run the app with no messages on Kafka (i.e. micro-batches of 0 events), the time taken to process each batch slowly but steadily increases, even though no events are being processed.
Below is an example of the Spark Web UI when I run the app:
As you can see, the processing time gradually increases over time. I would expect it to stay relatively flat, given that no data is being processed in any of the micro-batches.
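A simple way to check that the growth is a steady trend rather than noise is to fit a straight line to the per-batch processing times and look at the slope. Below is a minimal least-squares sketch in plain Python; the numbers are made up, and in practice they would be copied from the Streaming tab of the UI or read from the metrics sink.

```python
from statistics import mean


def trend_slope(times_s):
    """Least-squares slope of processing time (seconds) versus batch index.

    A value near 0 means roughly flat processing times; a clearly positive
    value means each batch tends to take longer than the previous one.
    """
    n = len(times_s)
    if n < 2:
        raise ValueError("need at least two batches to estimate a trend")
    xs = range(n)  # batch index 0, 1, 2, ...
    x_bar = mean(xs)
    y_bar = mean(times_s)
    num = sum((x - x_bar) * (y - y_bar) for x, y in zip(xs, times_s))
    den = sum((x - x_bar) ** 2 for x in xs)
    return num / den


if __name__ == "__main__":
    # Hypothetical processing times (seconds) for consecutive empty batches.
    flat = [5.0, 5.2, 4.9, 5.1, 5.0]
    growing = [5.0, 6.0, 7.0, 8.0, 9.0]
    print(round(trend_slope(flat), 3), round(trend_slope(growing), 3))
```

A slope that stays clearly above zero across many empty batches is the pattern described here; comparing the slope before and after a configuration change is one way to tell whether the change helped.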
I compared two different batches in the Spark UI, as can be seen below:
The duration columns show the time taken to process each of the three streams. In the second batch the durations have clearly increased (5s -> 13s, 7s -> 13s, 4s -> 9s). What is not clear, however, is which tasks are taking longer. The job times do increase slightly, but not enough to account for the increase from 5 seconds to 13 seconds, for example.
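Since the job times alone don't explain the growth, it can help to split each batch's duration into time spent inside jobs and time spent outside them (driver-side work between jobs). A rough sketch, assuming the jobs of one batch run one after another, which I believe is the default (`spark.streaming.concurrentJobs` defaults to 1); the durations are hypothetical values of the kind shown on the batch details page.

```python
def split_batch_time(batch_duration_s, job_durations_s):
    """Split one batch's duration into time inside jobs and time outside them.

    Assumes the batch's jobs ran sequentially (no overlap), so their
    durations can simply be summed. If jobs did overlap, the "outside"
    part is underestimated; a negative remainder is clamped to 0.
    """
    in_jobs = sum(job_durations_s)
    outside_jobs = max(0.0, batch_duration_s - in_jobs)
    return in_jobs, outside_jobs


if __name__ == "__main__":
    # Hypothetical numbers for an early and a later batch of the same stream.
    early = split_batch_time(5.0, [1.5, 1.0, 0.5])
    late = split_batch_time(13.0, [2.0, 1.5, 0.5])
    print("inside jobs: ", early[0], "->", late[0])
    print("outside jobs:", early[1], "->", late[1])
```

If most of the increase lands in the "outside jobs" part, the driver is probably the better place to profile than the executors.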
So how do I go about finding out what exactly is causing the total processing time of my streaming app to increase? I am currently attempting some JVM profiling to see whether there are any obvious memory leaks or long GC pauses that could be causing the increased processing time.
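For the GC side, a low-effort first step is to turn on GC logging for both the driver and the executors at submit time, then look for long or increasingly frequent pauses in the YARN container logs. The sketch below only builds the spark-submit argument list. It assumes a HotSpot Java 7/8 JVM (the flags differ on Java 9+), and the jar name and main class are placeholders. Setting `spark.executor.extraJavaOptions` replaces any value already configured, so existing options would need to be merged in.

```python
import shlex

# GC logging flags for a HotSpot Java 7/8 JVM. Java 9+ replaced them with
# unified logging (-Xlog:gc*), so adjust for the JVM on the cluster.
GC_LOG_FLAGS = "-XX:+PrintGCDetails -XX:+PrintGCDateStamps"


def gc_logging_conf(flags=GC_LOG_FLAGS):
    """Spark properties that enable GC logging on the driver and executors.

    In YARN cluster mode the driver JVM is already running when user code
    starts, so these have to be passed at submit time (--conf or
    spark-defaults.conf) rather than set on SparkConf inside the app.
    """
    return {
        "spark.driver.extraJavaOptions": flags,
        "spark.executor.extraJavaOptions": flags,
    }


def submit_args(conf, app_jar, main_class):
    """Build a spark-submit argument list for YARN cluster mode."""
    args = ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster",
            "--class", main_class]
    for key, value in sorted(conf.items()):
        # One "--conf key=value" pair per property; passing a list rather
        # than a shell string keeps the spaces inside the value intact.
        args += ["--conf", f"{key}={value}"]
    args.append(app_jar)
    return args


if __name__ == "__main__":
    # Placeholder jar and class names, not taken from the question.
    args = submit_args(gc_logging_conf(), "streaming-app.jar",
                       "com.example.StreamingApp")
    print(shlex.join(args))  # correctly quoted command line
```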
Any information on how to go about solving such an issue would be greatly appreciated.
We're running Spark v1.6.0 and using direct streams. One issue we have recently noticed is that when writing partitioned data in append mode to HDFS in Parquet format, our streaming app slows down over time as more and more partitions are created. A simple fix was to write to a different directory in HDFS. This may not explain the issue above, however, as no data was going through the app at the time.
There's checkpointing to consider too: a checkpoint saves all the state as serialized objects to the filesystem. The more state you have to save, the longer each checkpoint takes.
It should be relatively straightforward to check whether this is the problem: increase the checkpoint interval and see whether it has any effect.
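If the app uses stateful DStreams with data checkpointing, the interval can be set per DStream. The Spark Streaming programming guide suggests a checkpoint interval of 5 to 10 sliding intervals as a starting point, and, as far as I know, Spark rejects an interval that is not a multiple of the DStream's slide duration when the context starts. The helper below only does that arithmetic; the result would then be passed to `DStream.checkpoint(...)` (seconds in PySpark, a `Duration` such as `Minutes(25)` in Scala). To my understanding, metadata checkpoints are still written every batch regardless of this setting.

```python
def checkpoint_intervals(batch_interval_s, low=5, high=10):
    """Candidate data-checkpoint intervals for a DStream, in seconds.

    Every candidate is a whole multiple of the batch interval, between
    `low` and `high` batches (the 5-10x rule of thumb).
    """
    if batch_interval_s <= 0:
        raise ValueError("batch interval must be positive")
    return [batch_interval_s * m for m in range(low, high + 1)]


def is_valid_interval(checkpoint_s, batch_interval_s):
    """True if checkpoint_s is a positive whole multiple of the batch interval."""
    return checkpoint_s > 0 and checkpoint_s % batch_interval_s == 0


if __name__ == "__main__":
    batch = 5 * 60  # the 5-minute batch interval from the question
    # Print the candidates in minutes.
    print([c // 60 for c in checkpoint_intervals(batch)])
```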
I am currently facing a very similar issue with a long-running Spark Streaming application in YARN cluster mode. I have observed that the processing times slowly increase over time, irrespective of the events published. I am also doing some JVM profiling to understand whether the GC settings need to be changed or whether memory is the problem.
Specifically, do you have any insight into whether old objects or metadata get retained and accumulate over time, leading to higher processing times?
Any pointers on how to profile Spark Streaming applications for this purpose? Any tools you can suggest would help. I appreciate the response.
Hi @Prerna Rao.
I'm not sure what your processing pipeline looks like, but in my case I was writing Parquet files to HDFS. After each write, Spark also wrote a summary metadata file to HDFS; that file kept growing in size and was slowing down my long-running streaming app.
The fix for me was to disable the writing of this metadata file e.g.
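To check whether the same thing is happening in another app, the size of the summary files can be compared a few batches apart. Parquet's output committer writes them as `_metadata` and `_common_metadata` in the output directory. The sketch below assumes the output of `hdfs dfs -du <output dir>` (size first, path last on each line) has been captured as text; the paths are hypothetical. If `_metadata` keeps growing, the Parquet property I would look at is `parquet.enable.summary-metadata`, which can be set to `false` on the Hadoop configuration, e.g. with `--conf spark.hadoop.parquet.enable.summary-metadata=false` at submit time.

```python
SUMMARY_FILES = ("_metadata", "_common_metadata")


def summary_file_sizes(du_output):
    """Map each Parquet summary file in `hdfs dfs -du` output to its size in bytes.

    Each line is expected to start with the size and end with the path
    (some Hadoop versions print an extra disk-space column in between,
    which is ignored). Header lines and paths with spaces are skipped.
    """
    sizes = {}
    for line in du_output.splitlines():
        parts = line.split()
        if len(parts) < 2 or not parts[0].isdigit():
            continue
        size, path = int(parts[0]), parts[-1]
        name = path.rstrip("/").rsplit("/", 1)[-1]
        if name in SUMMARY_FILES:
            sizes[path] = size
    return sizes


def growth(before, after):
    """Bytes each summary file grew between two snapshots."""
    return {path: after[path] - before.get(path, 0) for path in after}


if __name__ == "__main__":
    # Hypothetical snapshots of `hdfs dfs -du /data/events`, a few batches apart.
    snap1 = "1024  3072  /data/events/_common_metadata\n204800  614400  /data/events/_metadata"
    snap2 = "1024  3072  /data/events/_common_metadata\n409600  1228800  /data/events/_metadata"
    print(growth(summary_file_sizes(snap1), summary_file_sizes(snap2)))
```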
Thanks for the response, @Forbes Ingram. In my processing pipeline, the Spark Streaming applications read data from Kafka via a DStream and, after processing, write to Cassandra. We tried tools like Java Flight Recorder and flame graphs to analyze CPU time, and we used the Spark metrics CSV sink to see whether there were spikes in memory usage.
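For the CSV sink data, a quick way to look for spikes in one metric is to load its file and compare each sample with the median of the samples just before it. This sketch assumes the sink is enabled in `metrics.properties` (e.g. `*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink`) and that gauge files use the two-column `t,value` layout written by the underlying Codahale CsvReporter. Which file to read (for example a `jvm.heap.used` gauge, if the JVM source is enabled) depends on the setup, and the window and threshold are arbitrary.

```python
import csv
import io
from statistics import median


def read_gauge_csv(text):
    """Parse a gauge file with a `t,value` header into (t, value) pairs."""
    reader = csv.DictReader(io.StringIO(text))
    return [(int(row["t"]), float(row["value"])) for row in reader]


def find_spikes(samples, window=5, factor=1.5):
    """Return samples whose value exceeds `factor` times the median of the
    previous `window` values. Window and factor are arbitrary defaults.
    """
    spikes = []
    for i in range(window, len(samples)):
        baseline = median(v for _, v in samples[i - window:i])
        t, v = samples[i]
        if baseline > 0 and v > factor * baseline:
            spikes.append((t, v))
    return spikes


if __name__ == "__main__":
    # Hypothetical contents of one gauge file; in practice use open(path).read().
    values = [100, 102, 99, 101, 100, 250, 103]
    text = "t,value\n" + "\n".join(f"{1000 + 60 * i},{v}" for i, v in enumerate(values))
    print(find_spikes(read_gauge_csv(text)))
```

Running `trend_slope`-style analysis on the same samples would show a slow climb, while this check only catches sudden jumps; both patterns are worth looking for.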
We eventually noticed that most of our executor nodes had spinning disks for storage, and we figured that might have been the cause of the delays, although we use HDFS only for checkpointing and for no other write operations.