Created 02-04-2022 10:41 AM
I have a Spark v2.4.5 Spark-Streaming application running on CDP 7.1.7 that listens to relatively small JSON messages from a Kafka topic, de-serializes them, and then writes to an appropriate unpartitioned Hive table with parquet records. This issue also occurred with CDH 5.16 and CDH 6.3.4.
The code looks similar to this:
def processStream(stream: DStream[ConsumerRecord[String, KafkaMessageWrapper]]): Unit = {
  stream.foreachRDD(rdd => {
    // Peek at the message to extract the event type and raw payload
    val rowRdd = rdd.flatMap(MessageProcessor.processRecord)
    val rootDF: DataFrame = rowRdd
      .toDF("topic", "partition", "offset", "eventName", "version", "payload")
    rootDF.createOrReplaceTempView("data")

    // Process only the USER records
    spark.sql(s"select eventName, version, payload from data where eventName = 'USER_CREATE'")
      .na.drop()
      // Map the records to the expected schema
      // This flattens the incoming JSON and does some validation / transformations
      .withColumn("parsed", processUserUDF($"payload"))
      .select("parsed.*")
      .withColumn("batch_date", lit(BATCH_DATE.format(OffsetDateTime.now(ZoneOffset.UTC))))
      .write
      .mode(SaveMode.Append)
      .format("hive")
      .saveAsTable(s"schema_name.users")

    // Omitted kafka offset commit
  })
}
Data collects in the destination as expected and then we regularly compact the written parquets as the bursty nature of the topic lends itself to relatively small files.
As the job accumulates data in the destination, processing stalls about once an hour. The additional processing time is not accounted for in the Spark UI / History: all stages / tasks finish and then the job hangs. The stall duration seems to grow linearly with the number of files in the destination directories (hence the compaction); however, it also occurs with no-op batches, and there are many batches where all records are discarded because they aren't relevant.
For example, I have a job that reported a total Op Duration of 5.5 min for handling 120 records that were ultimately discarded, but the sum of all stages is 4 sec. Subsequent jobs are stalled and report scheduling delays. There is no resource contention and none of the containers report any excessive garbage collection / IO tasks.
I suspect the driver is performing some task related to the destination hive tables (caching HDFS path leaf nodes, cleaning up lineage?) not accounted for in the UI, but I can't find any logs that correlate to anything specific.
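One way to narrow this down might be to time the driver-side call directly and compare it with the stage durations shown in the UI. The following is only a minimal sketch: `DriverTiming`, `timed`, and `unaccountedMs` are hypothetical helper names, not part of Spark, and the figures in the usage note are the ones quoted above (5.5 min total, 4 sec of stages).

```scala
object DriverTiming {
  /** Runs `block` and returns its result together with the elapsed wall-clock time in ms. */
  def timed[A](block: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = block
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    (result, elapsedMs)
  }

  /** Wall-clock time that is not explained by the stage durations reported in the UI. */
  def unaccountedMs(wallClockMs: Long, stageMs: Seq[Long]): Long =
    wallClockMs - stageMs.sum
}
```

Wrapping the whole write chain (up to and including `saveAsTable`) in `DriverTiming.timed { ... }` and logging the elapsed time per batch would show whether the stall happens inside that call. For the batch described above, `DriverTiming.unaccountedMs(330000L, Seq(4000L))` gives 326000 ms of driver-side time that no stage accounts for.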
Created 02-07-2022 11:14 AM
Hi @CRowlett ,
As I understand it, the individual Spark jobs within a single Spark Streaming application run quickly, but there are some unaccounted delays between them.
Whenever you see such symptoms, you need to check what the Spark driver is doing. The driver may not log every operation it performs, but raising the log level to DEBUG may help you understand it.
In general, you can expect the driver to do the following additional work after the Spark executors finish their jobs:
- committing files - after new inserts / save operations
- refreshing the HDFS file list / file details of the table into which data has been inserted/saved.
- altering the table - updating Hive Metastore table partitions if the table is partitioned, or updating statistics
The first two of the above involve HDFS NameNode communication; the third involves HMS communication.
To troubleshoot further, either enable DEBUG level logging or collect "jstacks" from the Spark driver. Collecting jstacks is less intrusive, as you do not need to modify any job configuration. From the jstacks you will be able to see what the driver was doing while it was in the "hung" state.
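If running the jstack tool against the driver process is not convenient, a similar snapshot can be taken from inside the driver JVM with the standard `java.lang.management` API. This is only a sketch of that alternative, not a replacement for jstack; `DriverThreadDump` is a hypothetical name, and where and how often to call it is left as an assumption.

```scala
import java.lang.management.ManagementFactory

object DriverThreadDump {
  /** Returns a jstack-like text dump of all live threads in the current JVM. */
  def dump(): String = {
    val threadBean = ManagementFactory.getThreadMXBean
    // false, false: skip locked-monitor and synchronizer details to keep the call cheap
    val infos = threadBean.dumpAllThreads(false, false)
    infos.map { info =>
      val header = "\"" + info.getThreadName + "\" state=" + info.getThreadState
      val frames = info.getStackTrace.map(frame => "    at " + frame)
      (header +: frames.toSeq).mkString("\n")
    }.mkString("\n\n")
  }
}
```

Logging `DriverThreadDump.dump()` from a background thread on the driver whenever a batch runs longer than expected would capture the stack the driver is stuck in during the stall.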
Based on your description, it seems to me that the "too many files" / "small files" are causing the delays, as the Spark driver has to refresh the file listing after inserts to make sure it still has the latest "catalog" information about the table, so that it can reliably continue working with the table.
For Spark Streaming / "hot data" you may want to consider saving your data into HBase or Kudu instead, as they are more suitable for those use cases.
Best regards
Miklos
Created 02-07-2022 06:27 PM
Hi @mszurap ,
Thanks for taking the time to respond. I will explore both the jstacks and debug logging on the driver to obtain some more information. I believe you are correct as there seems to be a linear correlation between the number of files in the destinations and the pause duration. I am thrown off by the interval of the pause, as it is rather rhythmic, occurring about once an hour. I'd imagined that some cache expires, resulting in some refresh of the HDFS file list, as you describe. I wonder if there's a way to tune this as I do not perform any reads from the table, only writes.
RE:
- committing files - after new inserts / save operations
- refreshing the HDFS file list / file details of the table into which data has been inserted/saved.
Do you have any general information about the types of tasks that the driver performs after a job completes?
I would agree that HBase / Kudu would likely be better suited. Perhaps even adding a partition or writing parquets directly without using the hive format would help alleviate the issue.
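As a rough sketch of the "write parquets directly" idea, the tail of the write chain could use the DataFrame writer's `parquet` sink with a partition column instead of `format("hive")` and `saveAsTable`. Whether this actually avoids the stall is untested here. `DirectParquetSink` and the output path are hypothetical; `batch_date` is the column already added in the original code.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object DirectParquetSink {
  // Hypothetical output location, not taken from the original job.
  val UsersPath = "/data/schema_name/users_parquet"

  /** Appends `df` as Parquet files under `path`, one sub-directory per batch_date value. */
  def append(df: DataFrame, path: String = UsersPath): Unit =
    df.write
      .mode(SaveMode.Append)
      .partitionBy("batch_date")
      .parquet(path)
}
```

Note that files written this way are not registered through the Hive table, so readers would need an external table over the path (with its partitions kept up to date) or would have to read the path directly.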
Will update shortly after gathering more information.
Created 02-08-2022 01:00 AM
Hi,
RE: Do you have any general information about the types of tasks that the driver performs after a job completes?
I do not have a comprehensive list of such tasks; this is just what we usually observe through cases and slowness reports.
Of course, the driver may also perform completely different tasks: any custom Spark code that does not involve parallel execution / data processing might run on the Spark driver side only, for example connecting to an external system through JDBC, or doing some computation (not with RDDs or DataFrames).