<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question In PySpark, dataframe.write.json() adds an extra empty line at the end. Is it a good idea to write more records using df.write.json? in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/In-pyspark-dataframe-write-json-adds-extra-empty-line-at-the/m-p/356466#M237297</link>
    <description>&lt;P&gt;Support Questions thread: in PySpark, df.write.json() writes a DataFrame of nearly 1.5 billion records as a single JSON record but adds an extra empty line at the end, and saving to HDFS fails with container exit code 143; the poster wants the output as a single file.&lt;/P&gt;</description>
    <pubDate>Sun, 30 Oct 2022 14:56:42 GMT</pubDate>
    <dc:creator>CodeHeaven</dc:creator>
    <dc:date>2022-10-30T14:56:42Z</dc:date>
    <item>
      <title>In PySpark, dataframe.write.json() adds an extra empty line at the end. Is it a good idea to write more records using df.write.json?</title>
      <link>https://community.cloudera.com/t5/Support-Questions/In-pyspark-dataframe-write-json-adds-extra-empty-line-at-the/m-p/356466#M237297</link>
      <description>&lt;P&gt;We have a DataFrame with 5 columns and nearly 1.5 billion records.&lt;/P&gt;&lt;P&gt;We need to write it out as a single line (a single record) of JSON.&lt;/P&gt;&lt;P&gt;We are facing two issues:&lt;/P&gt;&lt;P&gt;df.write.format('json') writes everything as a single line (a single record), but an extra empty line appears at the end, which we need to avoid.&lt;BR /&gt;df.write.format('json').save('somedir_in_HDFS') is giving an error.&lt;BR /&gt;We want to save the output as a single file so that the downstream application can read it.&lt;/P&gt;&lt;P&gt;Here is the sample code:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *

schema = StructType([
    StructField("author", StringType(), False),
    StructField("title", StringType(), False),
    StructField("pages", IntegerType(), False),
    StructField("email", StringType(), False)
])

# likewise we have 1.5 billion records
data = [
    ["author1", "title1", 1, "author1@gmail.com"],
    ["author2", "title2", 2, "author2@gmail.com"],
    ["author3", "title3", 3, "author3@gmail.com"],
    ["author4", "title4", 4, "author4@gmail.com"]
]

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Test").enableHiveSupport().getOrCreate()
    df = spark.createDataFrame(data, schema)
    dfprocessed = df  # here we are doing lots of joins with other tables
    dfprocessed = dfprocessed.agg(
        f.collect_list(f.struct(f.col('author'), f.col('title'), f.col('pages'), f.col('email'))).alias("list-item"))
    dfprocessed = dfprocessed.withColumn("version", f.lit("1.0").cast(IntegerType()))
    dfprocessed.printSchema()
    dfprocessed.write.format("json").mode("overwrite").option("escape", "").save('./TestJson')
    # the above write adds one extra empty line&lt;/LI-CODE&gt;&lt;P&gt;The spark-submit command:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;spark-submit --conf spark.port.maxRetries=100 --master yarn --deploy-mode cluster \
  --executor-memory 16g --executor-cores 4 --driver-memory 16g \
  --conf spark.driver.maxResultSize=10g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.maxExecutors=30 \
  --conf spark.dynamicAllocation.minExecutors=15 \
  code.py&lt;/LI-CODE&gt;&lt;P&gt;The error:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;Error occurred in save_daily_report : An error occurred while calling o149.save.
: org.apache.spark.SparkException: Job aborted
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  ...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 211.0 failed 4 times, most recent failure: Lost task 26.3 in stage 211.0
ExecutorLostFailure (executor xxx exited caused by one of the running tasks) Reason: Container marked as failed
Container exited with a non-zero exit code 143
Killed by external signal

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)&lt;/LI-CODE&gt;
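&lt;P&gt;For reference, here is a minimal sketch of the single-file write we are after. coalesce(1) is an illustrative assumption rather than tested advice: it forces the whole write through one task, which can be slow or memory-heavy at this scale.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# Sketch only: coalesce to a single partition so Spark emits one part file
single = dfprocessed.coalesce(1)
single.write.format("json").mode("overwrite").save("somedir_in_HDFS")

# Note: Spark's JSON writer produces JSON Lines output and terminates every
# record with '\n', so the file always ends with a newline; many editors
# display that final terminator as an extra empty last line.&lt;/LI-CODE&gt;</description>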
      <pubDate>Sun, 30 Oct 2022 14:56:42 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/In-pyspark-dataframe-write-json-adds-extra-empty-line-at-the/m-p/356466#M237297</guid>
      <dc:creator>CodeHeaven</dc:creator>
      <dc:date>2022-10-30T14:56:42Z</dc:date>
    </item>
    <item>
      <title>Re: In PySpark, dataframe.write.json() adds an extra empty line at the end. Is it a good idea to write more records using df.write.json?</title>
      <link>https://community.cloudera.com/t5/Support-Questions/In-pyspark-dataframe-write-json-adds-extra-empty-line-at-the/m-p/383374#M244882</link>
      <description>&lt;P&gt;Make sure the&amp;nbsp;&lt;SPAN&gt;dfprocessed &lt;STRONG&gt;DataFrame&lt;/STRONG&gt; doesn't contain any empty rows.&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;In Spark, you can identify and filter out empty rows in a DataFrame using the filter operation. Empty rows typically have null or empty values across all columns.&lt;/P&gt;&lt;LI-CODE lang="scala"&gt;import org.apache.spark.sql.functions.{col, not}

// Identify and filter out rows where every column is null
val nonEmptyRowsDF = df.filter(not(df.columns.map(col(_).isNull).reduce(_ &amp;&amp; _)))&lt;/LI-CODE&gt;&lt;P&gt;This code uses the filter operation together with the not function and a condition that checks whether every column in a row is null; it removes the rows where all columns are null.&lt;/P&gt;&lt;P&gt;If you want to check for emptiness based on specific columns, you can specify those columns in the condition:&lt;/P&gt;&lt;LI-CODE lang="scala"&gt;val columnsToCheck = Array("column1", "column2", "column3")
val nonEmptyRowsDF = df.filter(not(columnsToCheck.map(col(_).isNull).reduce(_ || _)))&lt;/LI-CODE&gt;&lt;P&gt;Adjust the column names based on your DataFrame structure. The resulting nonEmptyRowsDF will contain only the rows that have no null values in the specified columns.&lt;/P&gt;
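&lt;P&gt;Since the thread is about PySpark, here is a rough PySpark equivalent of the first snippet (a sketch under the same all-columns-null definition of an empty row; substitute your own DataFrame for df):&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from functools import reduce
from pyspark.sql import functions as F

# Boolean Column that is True only when every column in the row is null
all_null = reduce(lambda a, b: a &amp; b, [F.col(c).isNull() for c in df.columns])

# Keep the rows that are not entirely null
non_empty_rows_df = df.filter(~all_null)&lt;/LI-CODE&gt;&lt;P&gt;The same pattern works with an explicit list of column names in place of df.columns.&lt;/P&gt;</description>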
      <pubDate>Fri, 09 Feb 2024 12:05:29 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/In-pyspark-dataframe-write-json-adds-extra-empty-line-at-the/m-p/383374#M244882</guid>
      <dc:creator>ggangadharan</dc:creator>
      <dc:date>2024-02-09T12:05:29Z</dc:date>
    </item>
  </channel>
</rss>