Created 10-30-2022 07:56 AM
We have a DataFrame with 5 columns and nearly 1.5 billion records.
We need to write it out as a single line (a single record) of JSON.
We are facing two issues:
1. df.write.format('json') writes everything as a single line (a single record), but a second, empty line is appended, which we need to avoid.
2. df.write.format('json').save('somedir_in_HDFS') raises an error.
We want to save the output as a single file so that the downstream application can read it.
Here is the sample code.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *

schema = StructType([
    StructField("author", StringType(), False),
    StructField("title", StringType(), False),
    StructField("pages", IntegerType(), False),
    StructField("email", StringType(), False)
])

# likewise we have 1.5 billion records
data = [
    ["author1", "title1", 1, "author1@gmail.com"],
    ["author2", "title2", 2, "author2@gmail.com"],
    ["author3", "title3", 3, "author3@gmail.com"],
    ["author4", "title4", 4, "author4@gmail.com"]
]

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Test").enableHiveSupport().getOrCreate()
    df = spark.createDataFrame(data, schema)
    dfprocessed = df  # here we are doing lots of joins with other tables
    dfprocessed = dfprocessed.agg(
        f.collect_list(f.struct(f.col('author'), f.col('title'), f.col('pages'), f.col('email'))).alias("list-item"))
    dfprocessed = dfprocessed.withColumn("version", f.lit("1.0").cast(IntegerType()))
    dfprocessed.printSchema()
    dfprocessed.write.format("json").mode("overwrite").option("escape", "").save('./TestJson')
    # the above write adds one extra empty line
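Since the aggregation above collapses everything into a single row, one way to avoid the stray empty line (and end up with a single file) is to serialize that one record on the driver instead of going through df.write. A minimal sketch, assuming the collected record fits in driver memory; the dict contents and paths below are illustrative stand-ins, not real Spark output:

```python
import json

# Hedged sketch: this dict stands in for the single aggregated row
# (e.g. what dfprocessed.toJSON().first() would return as a string).
# Values are illustrative only.
row_dict = {
    "list-item": [
        {"author": "author1", "title": "title1",
         "pages": 1, "email": "author1@gmail.com"}
    ],
    "version": 1,
}

payload = json.dumps(row_dict, separators=(",", ":"))

# Write exactly one line: no "\n" is appended, so no empty second line
with open("/tmp/single_record.json", "w") as fh:
    fh.write(payload)

# The local file could then be pushed to HDFS, e.g.:
#   hdfs dfs -put -f /tmp/single_record.json /somedir_in_HDFS/
```

Because the record is written with a plain file handle, the layout is fully under your control, and there is only one output file by construction.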
---------------------------------
spark-submit --conf spark.port.maxRetries=100 --master yarn --deploy-mode cluster \
  --executor-memory 16g --executor-cores 4 --driver-memory 16g \
  --conf spark.driver.maxResultSize=10g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.maxExecutors=30 \
  --conf spark.dynamicAllocation.minExecutors=15 \
  code.py
---------------------------------
"Error occurred in save_daily_report : An error occurred while calling o149.save.
:org.apache.spark.sparkException:Job aborted
at org.apache.spark.sql.execution.datasource.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasource.InsertIntoHadoopFSRelationCommand.run(InsertIntoHadoopFSRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(command.scala:104)
...
..
..
caused by org.apache.spark.sparkException:Job aborted due to stage Failure:Task 26 in stage 211.0 failed 4 times,most recent
failure:Lost task 26.3 in stage 211.0
ExecutorLostFailure(executor xxx exited caused by one the the running task)Reason:Container maked as failed
Container exited with a non-zero exit code 143
Killed by external signal
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failedJobAndIndependent
Stages (DAGScheduler.scala:1890)
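As context for the trace above: exit code 143 is 128 + SIGTERM, which on YARN usually means the container was killed, most often for exceeding its memory limit. A common first step is to raise the executor memory overhead. A sketch with illustrative sizes — the overhead value is an assumption, not a known-good setting for this job:

```shell
spark-submit \
  --master yarn --deploy-mode cluster \
  --executor-memory 16g --executor-cores 4 --driver-memory 16g \
  --conf spark.executor.memoryOverhead=4g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=15 \
  --conf spark.dynamicAllocation.maxExecutors=30 \
  code.py
```

Note also that collect_list over 1.5 billion records pulls the whole dataset into a single task, which by itself can exhaust one executor's memory regardless of overhead settings.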
------------------
Created 02-09-2024 04:05 AM
Make sure the dfprocessed DataFrame doesn't contain any empty rows.
In Spark, you can identify and filter out empty rows in a DataFrame using the filter operation. Empty rows typically have null or empty values across all columns.
// Identify and filter out empty rows
val nonEmptyRowsDF = df.filter(not(df.columns.map(col(_).isNull).reduce(_ || _)))
This code builds a condition that is true when any column in a row is null, then uses the filter operation together with the not function to remove those rows, keeping only rows in which every column is non-null.
If you want to check for emptiness based on specific columns, you can specify those columns in the condition:
val columnsToCheck = Array("column1", "column2", "column3")
val nonEmptyRowsDF = df.filter(not(columnsToCheck.map(col(_).isNull).reduce(_ || _)))
Adjust the column names based on your DataFrame structure. The resulting nonEmptyRowsDF will contain only rows that have no null values in the specified columns.