Support Questions


In PySpark, dataframe.write.json() adds an extra empty line at the end. Is it a good idea to write more records using df.write.json?

New Contributor

We have a DataFrame with 5 columns and nearly 1.5 billion records.

We need to write it out as a single line (a single record) of JSON.

We are facing two issues:

df.write.format('json') writes everything as a single line (a single record), but a second, empty line is added at the end, which we need to avoid.
df.write.format('json').save('somedir_in_HDFS') throws an error.
We want to save the output as a single file so that the downstream application can read it.

Here is the sample code.

 

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *


schema = StructType([
    StructField("author", StringType(), False),
    StructField("title", StringType(), False),
    StructField("pages", IntegerType(), False),
    StructField("email", StringType(), False)
])

# likewise we have 1.5 billion records
data = [
    ["author1", "title1", 1, "author1@gmail.com"],
    ["author2", "title2", 2, "author2@gmail.com"],
    ["author3", "title3", 3, "author3@gmail.com"],
    ["author4", "title4", 4, "author4@gmail.com"]
]

 


if __name__ == "__main__":
    spark = SparkSession.builder.appName("Test").enableHiveSupport().getOrCreate()
    df = spark.createDataFrame(data, schema)
    dfprocessed = df  # here we are doing lots of joins with other tables
    dfprocessed = dfprocessed.agg(
        f.collect_list(f.struct(f.col('author'), f.col('title'), f.col('pages'), f.col('email'))).alias("list-item"))
    dfprocessed = dfprocessed.withColumn("version", f.lit("1.0").cast(IntegerType()))
    dfprocessed.printSchema()
    dfprocessed.write.format("json").mode("overwrite").option("escape", "").save('./TestJson')
    # the above write adds one extra empty line

---------------------------------
spark-submit --conf spark.port.maxRetries=100 --master yarn --deploy-mode cluster \
--executor-memory 16g --executor-cores 4 --driver-memory 16g \
--conf spark.driver.maxResultSize=10g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=30 \
--conf spark.dynamicAllocation.minExecutors=15 \
code.py
---------------------------------
"Error occurred in save_daily_report : An error occurred while calling o149.save.
:org.apache.spark.sparkException:Job aborted
at org.apache.spark.sql.execution.datasource.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasource.InsertIntoHadoopFSRelationCommand.run(InsertIntoHadoopFSRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(command.scala:104)
...
..
..
caused by org.apache.spark.sparkException:Job aborted due to stage Failure:Task 26 in stage 211.0 failed 4 times,most recent
failure:Lost task 26.3 in stage 211.0
ExecutorLostFailure(executor xxx exited caused by one the the running task)Reason:Container maked as failed
Container exited with a non-zero exit code 143
Killed by external signal

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failedJobAndIndependent
Stages (DAGScheduler.scala:1890)

------------------

1 REPLY

Expert Contributor

Make sure the dfprocessed DataFrame doesn't contain any empty rows.

In Spark, you can identify and filter out empty rows in a DataFrame using the filter operation. Empty rows typically have null or empty values across all columns.

 

import org.apache.spark.sql.functions.{col, not}

// Identify and filter out rows where every column is null
val nonEmptyRowsDF = df.filter(not(df.columns.map(col(_).isNull).reduce(_ && _)))

 


This code builds a condition that is true only when every column in a row is null, then uses the filter operation together with the not function to drop those rows, so only rows with at least one non-null value are kept.
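Since the question is in PySpark, here is a rough Python equivalent of the same all-columns-null check. This is a minimal sketch assuming the df variable from the question; functools.reduce plays the role of the Scala reduce:

from functools import reduce
import pyspark.sql.functions as f

# A row counts as "empty" only when every column is null.
all_null = reduce(lambda a, b: a & b, [f.col(c).isNull() for c in df.columns])

# Keep rows that have at least one non-null column.
non_empty_rows_df = df.filter(~all_null)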


If you want to check for emptiness based on specific columns, you can specify those columns in the condition:

 

val columnsToCheck = Array("column1", "column2", "column3")

// Here a null in any of the listed columns drops the row
val nonEmptyRowsDF = df.filter(not(columnsToCheck.map(col(_).isNull).reduce(_ || _)))

 

Adjust the column names based on your DataFrame structure. The resulting nonEmptyRowsDF will contain only rows that have no null values in the specified columns.
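The column-restricted variant translates to PySpark the same way. The sketch below is untested and uses column names taken from the sample schema in the question purely as an illustration; as in the Scala snippet above, a null in any of the listed columns drops the row:

from functools import reduce
import pyspark.sql.functions as f

# Illustrative names taken from the question's schema
columns_to_check = ["author", "title", "email"]

# Drop rows where any of the listed columns is null.
any_null = reduce(lambda a, b: a | b, [f.col(c).isNull() for c in columns_to_check])
non_empty_rows_df = df.filter(~any_null)

# Optional sanity check before the collect_list aggregation:
# df.count() - non_empty_rows_df.count() gives the number of rows dropped.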