Created 08-13-2018 02:14 PM
Hi,
I am receiving data over TCP as a JSON stream using pyspark.
I want to save the data to files by appending, with one file per minute named like yyyyMMddHHmm (so all messages received in one minute go to the corresponding file), and in parallel I want to save the JSON to an ORC Hive table.
I have two questions:
1.
*[path : '/folder/file']
When I receive data in the DStream, I flatMap and split("\n"), and then call repartition(1).saveAsTextFiles(path, "json"):
lines = ssc.socketTextStream("localhost", 9999)
flat_map = lines.flatMap(lambda x: x.split("\n"))
flat_map.repartition(1).saveAsTextFiles(path, "json")
The above saves to the given path, but instead of writing one single file per minute into the folder, it creates three folders, each with a _SUCCESS file and a part-00000 file, which is not what I expected.
Please help me get the expected layout: basically one folder per day and one file per minute under that folder.
2. If I want to save the JSON to an ORC Hive table, can I do it from a DStream? Or do I have to convert the DStream to an RDD and then do some processing to save it as ORC?
As I am new to pyspark, please help with the above or point me to some examples.
Created 08-14-2018 12:29 PM
Hi @Mark,
Here are my suggestions:
1. Before saving the RDD, I recommend you convert it to a DataFrame and use the DataFrameWriter:
https://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
As for your requirement to avoid the directory and part file names, I believe this is not possible out of the box. You can write a single part file, but the directory will be created by default. You can read more here:
One possible solution is to write to a temporary directory and then move the single file into the appropriate folder, renaming it on the way. You can have a single file created inside the temporary directory by using the coalesce method like this:
df.coalesce(1).write.format("json").mode("overwrite").save("temp_dir/test.json")
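The temporary-directory approach above could look roughly like this on a local filesystem. This is a minimal sketch, not a definitive implementation: `minute_target` and `append_part_file` are hypothetical helper names, the layout `<base_dir>/<yyyyMMdd>/<yyyyMMddHHmm>.json` is an assumption based on the question, and it uses plain Python file operations, so on HDFS you would need the Hadoop filesystem tools instead. It appends rather than renames, so that several batches falling in the same minute end up in one file.

```python
import glob
import os
import shutil
from datetime import datetime


def minute_target(base_dir, ts):
    """Build '<base_dir>/<yyyyMMdd>/<yyyyMMddHHmm>.json' for a timestamp."""
    day = ts.strftime("%Y%m%d")
    minute = ts.strftime("%Y%m%d%H%M")
    return os.path.join(base_dir, day, minute + ".json")


def append_part_file(temp_dir, base_dir, ts):
    """Append the part file(s) Spark wrote in temp_dir to the minute file.

    Assumes temp_dir is on a local filesystem (hypothetical helper).
    """
    # 'part-*' skips _SUCCESS and the hidden .crc checksum files.
    parts = sorted(glob.glob(os.path.join(temp_dir, "part-*")))
    if not parts:
        return None  # empty batch, nothing to append
    target = minute_target(base_dir, ts)
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "ab") as out:
        for part in parts:
            with open(part, "rb") as src:
                shutil.copyfileobj(src, out)
    shutil.rmtree(temp_dir)  # clean up the temporary output directory
    return target
```

After each write like the one above, you would call something like `append_part_file("temp_dir/test.json", "/folder", batch_time)`, where `batch_time` is a `datetime` for the batch. Note that `temp_dir/test.json` is the directory Spark creates, not a file.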
2. To save the JSON to an ORC Hive table, unless you plan to store it as a single string column, you will need to parse the JSON and use flatMap to extract the columns you want to store. You can review the DataFrameWriter API's saveAsTable method and example:
And also check out this article that shows how to append to an orc table:
http://jugsi.blogspot.com/2017/12/append-data-with-spark-to-hive-oarquet.html
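To illustrate point 2, here is a rough sketch of how a DStream batch might be parsed and appended to an ORC table. It is a sketch under stated assumptions, not the only way to do it: `parse_record` and `save_batch_as_orc` are hypothetical names, the table name `default.events_orc` is made up, and it assumes every message is a flat JSON object with the same keys so that Spark can infer a schema.

```python
import json


def parse_record(line):
    """Return a one-element list with the parsed dict, or [] for bad input.

    Returning a list lets flatMap silently drop lines that are not JSON objects.
    """
    try:
        record = json.loads(line)
    except ValueError:
        return []
    return [record] if isinstance(record, dict) else []


def save_batch_as_orc(time, rdd, table="default.events_orc"):
    """Append one micro-batch to an ORC-backed Hive table (hypothetical name)."""
    # Imported lazily so the parsing helper can be used without Spark installed.
    from pyspark.sql import Row, SparkSession

    # Assumption: records are flat and share the same keys.
    rows = rdd.flatMap(parse_record).map(lambda d: Row(**d))
    if rows.isEmpty():
        return  # nothing arrived in this batch
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    df = spark.createDataFrame(rows)
    df.write.format("orc").mode("append").saveAsTable(table)
```

With the DStream from the question, this would be wired in with `flat_map.foreachRDD(save_batch_as_orc)` before starting the streaming context, so there is no need to leave the DStream API: `foreachRDD` hands you each batch as an RDD.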
As always, if you found this answer addressed your question, please take a moment to log in and click the "accept" link on the answer.
Thanks!
Created 08-14-2018 08:46 AM
@Felix Albani, can you please suggest something?
Created 08-14-2018 12:36 PM
@Mark Sorry, I just realized you were looking for a pyspark solution and I provided the Scala references instead. Everything I mentioned above also applies to pyspark, and the DataFrameWriter API link is here:
https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter
HTH
Created 08-14-2018 12:42 PM
@Mark One last thing: you may want to reconsider saving a file every minute. If the files are small, you will end up causing problems for the HDFS NameNode in the long term. This is a known issue:
https://community.hortonworks.com/questions/167615/what-is-small-file-problem-in-hdfs.html
We recommend avoiding lots of small files and instead trying to keep each file at least the size of an HDFS block.
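To put rough numbers on that, here is a small back-of-the-envelope calculation. The 128 MiB block size is an assumption (a common default); check `dfs.blocksize` on your cluster. The function names are just for illustration.

```python
MINUTES_PER_DAY = 24 * 60
# Assumption: 128 MiB block size; check dfs.blocksize on your cluster.
HDFS_BLOCK_BYTES = 128 * 1024 * 1024


def files_created(days, files_per_minute=1):
    """Number of files written if every minute produces its own file(s)."""
    return days * MINUTES_PER_DAY * files_per_minute


def bytes_per_second_to_fill_block(block_bytes=HDFS_BLOCK_BYTES, seconds=60):
    """Sustained input rate needed for a one-minute file to reach one block."""
    return block_bytes / seconds


print(files_created(1))    # 1440 files per day
print(files_created(365))  # 525600 files per year
# Input rate in MiB per second needed to fill one block every minute (about 2.13)
print(round(bytes_per_second_to_fill_block() / 1024 ** 2, 2))
```

So unless the stream sustains a couple of MiB per second, one file per minute will stay well below the block size, and writing larger files less often (for example hourly or daily) is likely the safer layout.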
Created 08-14-2018 03:17 PM
Thanks for the helping hand. I will go through these and may ask for further suggestions if required.
Thank you.