In PySpark, dataframe.write.json() adds an extra empty line at the end. Is it a good idea to write this many records using df.write.json?

We have a DataFrame with 5 columns and nearly 1.5 billion records.

We need to write all of it as a single line (a single record) of JSON.

We are facing two issues:

df.write.format('json') writes everything as a single line (a single record), but a second, empty line follows it, and we need to avoid that.
df.write.format('json').save('somedir_in_HDFS') fails with the error shown further down.
We want to save the output as a single file so that the downstream application can read it.

Here is the sample code.

 

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType, StringType, StructField, StructType


schema = StructType([
    StructField("author", StringType(), False),
    StructField("title", StringType(), False),
    StructField("pages", IntegerType(), False),
    StructField("email", StringType(), False)
])

# sample rows; the real dataset has about 1.5 billion records like these
data = [
    ["author1", "title1", 1, "author1@gmail.com"],
    ["author2", "title2", 2, "author2@gmail.com"],
    ["author3", "title3", 3, "author3@gmail.com"],
    ["author4", "title4", 4, "author4@gmail.com"]
]

 


if __name__ == "__main__":
    spark = SparkSession.builder.appName("Test").enableHiveSupport().getOrCreate()
    df = spark.createDataFrame(data, schema)
    dfprocessed = df  # in the real job we do lots of joins with other tables here
    # collapse all rows into one array column, so the result is a single record
    dfprocessed = dfprocessed.agg(
        f.collect_list(
            f.struct(f.col('author'), f.col('title'), f.col('pages'), f.col('email'))
        ).alias("list-item")
    )
    dfprocessed = dfprocessed.withColumn("version", f.lit("1.0").cast(IntegerType()))
    dfprocessed.printSchema()
    dfprocessed.write.format("json").mode("overwrite").option("escape", "").save('./TestJson')
    # the write above adds one extra empty line
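
A note on the "extra empty line": it may simply be the newline that a line-delimited JSON writer puts after every record, rather than a second, empty record. Below is a minimal pure-Python sketch of that situation (no Spark involved; the file name and the record contents are made up for illustration), showing that a reader splitting on lines still sees exactly one record:

```python
import json
import os
import tempfile

# Hypothetical single record, shaped like the aggregated row in the sample code.
record = {
    "list-item": [
        {"author": "author1", "title": "title1", "pages": 1, "email": "author1@gmail.com"}
    ],
    "version": 1,
}

# Write it the way a line-delimited JSON writer would: the record, then "\n".
# The file name is an assumption chosen for this sketch.
path = os.path.join(tempfile.mkdtemp(), "part-00000.json")
with open(path, "w", encoding="utf-8") as fh:
    fh.write(json.dumps(record) + "\n")

with open(path, encoding="utf-8") as fh:
    text = fh.read()

ends_with_newline = text.endswith("\n")
lines = text.splitlines()  # the trailing newline does not produce an extra element
parsed = [json.loads(line) for line in lines if line.strip()]
```

If the downstream application really cannot tolerate a trailing newline, stripping it after the write is one option, but that is a separate step and not something this sketch covers.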

---------------------------------
spark-submit --conf spark.port.maxRetries=100 --master yarn --deploy-mode cluster \
  --executor-memory 16g --executor-cores 4 --driver-memory 16g \
  --conf spark.driver.maxResultSize=10g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.maxExecutors=30 \
  --conf spark.dynamicAllocation.minExecutors=15 \
  code.py
---------------------------------
"Error occurred in save_daily_report : An error occurred while calling o149.save.
: org.apache.spark.SparkException: Job aborted
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(command.scala:104)
...
..
..
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 211.0 failed 4 times, most recent failure: Lost task 26.3 in stage 211.0
ExecutorLostFailure (executor xxx exited caused by one of the running tasks) Reason: Container marked as failed
Container exited with a non-zero exit code 143
Killed by external signal

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)

------------------
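
On reading the error: exit code 143 follows the common Unix convention of 128 plus a signal number, so it most likely means the container received SIGTERM (15), which matches the "Killed by external signal" line. On YARN this is often, though not always, a container killed for exceeding its memory limit, which would be plausible when collect_list gathers every row into a single record. A small sketch of the decoding (the helper name describe_exit_code is made up for illustration):

```python
import signal


def describe_exit_code(code: int) -> str:
    """Map a shell-style exit code to a signal name when code > 128."""
    if code > 128:
        try:
            # By convention, 128 + N means "terminated by signal N".
            return signal.Signals(code - 128).name
        except ValueError:
            return "unknown signal"
    return "normal exit status"


result = describe_exit_code(143)
```

This only decodes the number; the executor and YARN NodeManager logs would be needed to confirm why the signal was sent.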
