Member since: 02-04-2022 | Posts: 2 | Kudos Received: 0 | Solutions: 0
02-07-2022 06:27 PM
Hi @mszurap, thanks for taking the time to respond. I will explore both the jstacks and the debug logging on the driver to obtain some more information.

I believe you are correct, as there seems to be a linear correlation between the number of files in the destinations and the pause duration. I am thrown off by the interval of the pause, though, as it is rather rhythmic, occurring about once an hour. I had imagined that some cache expires, resulting in a refresh of the HDFS file list, as you describe. I wonder if there is a way to tune this, since I do not perform any reads from the table, only writes.

RE:
- committing files after new inserts / save operations
- refreshing the HDFS file list / file details of the table into which data has been ...

Do you have any general information about the types of tasks that the driver performs after a job completes?

I would agree that HBase / Kudu would likely be better suited. Perhaps even adding a partition, or writing parquets directly without using the hive format (rough sketch at the end of this post), would help alleviate the issue. Will update shortly after gathering more information.
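To make that last point concrete, here is a minimal sketch of what I mean by writing the parquets directly, bypassing the hive format writer. The helper name, the DataFrame variable, and the warehouse path are placeholders for illustration only, and an external, non-transactional table defined over that location would still be needed for the data to stay queryable:

    import org.apache.spark.sql.{DataFrame, SaveMode}

    // Hypothetical alternative to .format("hive").saveAsTable(...):
    // append plain parquet files under the table's HDFS location and let an
    // external table definition over that path expose them.
    def writeDirect(parsedDF: DataFrame): Unit =
      parsedDF.write
        .mode(SaveMode.Append)
        .parquet("hdfs:///warehouse/tablespace/external/hive/schema_name.db/users")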
02-04-2022 10:41 AM
I have a Spark Streaming application (Spark v2.4.5) running on CDP 7.1.7 that listens to relatively small JSON messages on a Kafka topic, deserializes them, and writes them as parquet records to an appropriate unpartitioned Hive table. The issue also occurred on CDH 5.16 and CDH 6.3.4. The code looks similar to this:

// Assumes a SparkSession `spark` is in scope and `import spark.implicits._`
// has been applied so that .toDF and the $ column syntax are available.
def processStream(stream: DStream[ConsumerRecord[String, KafkaMessageWrapper]]): Unit = {
  stream.foreachRDD(rdd => {
    // Peek at the message to extract the event type and raw payload
    val rowRdd = rdd.flatMap(MessageProcessor.processRecord)
    val rootDF: DataFrame = rowRdd
      .toDF("topic", "partition", "offset", "eventName", "version", "payload")
    rootDF.createOrReplaceTempView("data")

    // Process only the USER records
    spark.sql("select eventName, version, payload from data where eventName = 'USER_CREATE'")
      .na.drop()
      // Map the records to the expected schema.
      // This flattens the incoming JSON and does some validation / transformations.
      .withColumn("parsed", processUserUDF($"payload"))
      .select("parsed.*")
      .withColumn("batch_date", lit(BATCH_DATE.format(OffsetDateTime.now(ZoneOffset.UTC))))
      .write
      .mode(SaveMode.Append)
      .format("hive")
      .saveAsTable("schema_name.users")

    // Omitted kafka offset commit
  })
}

Data collects in the destination as expected, and we regularly compact the written parquets because the bursty nature of the topic lends itself to relatively small files.

As the job accumulates data in the destination, processing stalls about once an hour. The additional processing time is not accounted for in the Spark UI / History: all stages / tasks finish and then the job hangs. The stall seems to grow linearly with the number of files in the destination directories (hence the compaction), yet it also occurs on no-op batches; there are many batches where all records are discarded as irrelevant. For example, one job reported a total Op Duration of 5.5 min for handling 120 records that were ultimately discarded, while the sum of all its stages was 4 sec. Subsequent jobs are stalled and report scheduling delays. There is no resource contention, and none of the containers report excessive garbage collection / IO.

I suspect the driver is performing some task related to the destination Hive tables (caching HDFS path leaf nodes, cleaning up lineage?) that is not accounted for in the UI, but I can't find any logs that correlate to anything specific.
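In case it helps anyone hitting the same thing, below is a rough sketch of how I plan to capture jstack-style thread dumps from inside the driver while a batch is stalled, so the hang can be correlated with whatever the driver is doing. The object name, the dump interval, and the use of println rather than a proper logger are my own placeholders, not part of the actual job:

    import java.util.concurrent.{Executors, TimeUnit}
    import scala.collection.JavaConverters._

    // Rough sketch: periodically log a jstack-style dump of the driver's threads
    // so a stalled batch can be matched to whatever the driver is busy with.
    object DriverThreadDumper {
      private val pool = Executors.newSingleThreadScheduledExecutor()

      def start(intervalSeconds: Long = 60): Unit =
        pool.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = {
            val dump = Thread.getAllStackTraces.asScala.map { case (t, frames) =>
              s"${t.getName} (state=${t.getState})\n" +
                frames.map(f => s"    at $f").mkString("\n")
            }.mkString("\n\n")
            println(s"==== driver thread dump ====\n$dump")
          }
        }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS)
    }

Calling DriverThreadDumper.start() once on the driver before the streaming context starts should be enough; running jstack against the driver PID from the gateway host would give the same information without any code change.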
Labels:
- Apache Spark