10-30-2022 07:56 AM
We have a dataframe with 5 columns and nearly 1.5 billion records. We need to write it out as a single line (a single record) of JSON. We are facing two issues:

1. df.write.format('json') writes everything as a single record, but a second, empty line is appended to the output, which we need to avoid.
2. df.write.format('json').save('somedir_in_HDFS') gives an error.

We want to save the output as a single file so that the downstream application can read it.

Here is the sample code:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *

schema = StructType([
    StructField("author", StringType(), False),
    StructField("title", StringType(), False),
    StructField("pages", IntegerType(), False),
    StructField("email", StringType(), False)
])

# likewise we have 1.5 billion records
data = [
    ["author1", "title1", 1, "author1@gmail.com"],
    ["author2", "title2", 2, "author2@gmail.com"],
    ["author3", "title3", 3, "author3@gmail.com"],
    ["author4", "title4", 4, "author4@gmail.com"]
]

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Test").enableHiveSupport().getOrCreate()
    df = spark.createDataFrame(data, schema)

    dfprocessed = df  # here we are doing lots of joins with other tables
    dfprocessed = dfprocessed.agg(
        f.collect_list(
            f.struct(f.col('author'), f.col('title'), f.col('pages'), f.col('email'))
        ).alias("list-item"))
    dfprocessed = dfprocessed.withColumn("version", f.lit("1.0").cast(IntegerType()))
    dfprocessed.printSchema()
    dfprocessed.write.format("json").mode("overwrite").option("escape", "").save('./TestJson')
    # the above write adds one extra empty line

---------------------------------
spark-submit --conf spark.port.maxRetries=100 --master yarn --deploy-mode cluster \
    --executor-memory 16g --executor-cores 4 --driver-memory 16g \
    --conf spark.driver.maxResultSize=10g \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.maxExecutors=30 \
    --conf spark.dynamicAllocation.minExecutors=15 \
    code.py
---------------------------------

Error occurred in save_daily_report: An error occurred while calling o149.save.
: org.apache.spark.SparkException: Job aborted
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  ...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 211.0 failed 4 times, most recent failure: Lost task 26.3 in stage 211.0: ExecutorLostFailure (executor xxx exited caused by one of the running tasks) Reason: Container marked as failed. Container exited with a non-zero exit code 143. Killed by external signal.
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
------------------
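For reference, this is a minimal sketch of the single-file write we are trying to achieve, assuming that coalescing the one-record dataframe to a single partition is an acceptable approach (the output path below is just a placeholder, not our real HDFS path):

# Minimal sketch (assumption, not our production code): coalesce the
# already-aggregated, single-record dataframe to one partition so that
# the JSON write produces exactly one part file.
single_part = dfprocessed.coalesce(1)
single_part.write \
    .format("json") \
    .mode("overwrite") \
    .save("hdfs:///tmp/TestJson_single")  # placeholder output path

Whether this also avoids the trailing empty line, and whether it holds up once the full 1.5 billion input records are joined and aggregated, is exactly what we are unsure about.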
Labels:
- Apache Spark