Saving a TCP stream into Hive using pyspark

Expert Contributor

Hi,

I am receiving data from a TCP socket as a JSON stream using pyspark.

I want to append the incoming messages to minute-based files (each file is named yyyyMMddHHmm, so all messages within one minute go to the corresponding file), and in parallel I want to save the JSON to an ORC Hive table.

I have two questions:

1.

[path = '/folder/file']

When I receive data in the DStream, I flatMap with split("\n") and then call repartition(1).saveAsTextFiles(path, "json"):

lines = ssc.socketTextStream("localhost", 9999)  # DStream of text lines from the TCP socket
flat_map = lines.flatMap(lambda x: x.split("\n"))  # split multi-line payloads into messages
flat_map.repartition(1).saveAsTextFiles(path, "json")  # writes one directory per batch: <path>-<timestamp>.json

The above saves to the given path, but instead of producing one file per minute inside that folder, it creates three folders, each containing a _SUCCESS file and a part-00000 file, which is not what I expected.

Please help me achieve the expected layout: one folder per day, and one file per minute under that folder.

2. If I want to save the JSON to an ORC Hive table, can I do it directly from a DStream? Or do I have to convert the DStream to RDDs and then perform some processing to save them as ORC?

As I am new to pyspark, please help with the above, ideally with some examples.

5 REPLIES

Expert Contributor

@Felix Albani, can you please suggest?


Hi @Mark,

Here are my suggestions:

1. Before saving the RDD, I recommend you transform it to a DataFrame and use the DataFrameWriter:

https://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
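
For example, here is a minimal sketch of that conversion (assumptions: a SparkSession named spark already exists, lines is the DStream from your question, and the batch interval is one minute so each micro-batch maps to one yyyyMMddHHmm directory):

def save_batch(batch_time, rdd):
    if rdd.isEmpty():
        return
    df = spark.read.json(rdd)  # parse the batch of JSON strings into columns
    out = "/folder/" + batch_time.strftime("%Y%m%d%H%M")  # minute-based directory
    df.coalesce(1).write.mode("append").json(out)  # a single part file per batch

lines.foreachRDD(save_batch)  # pyspark passes the batch time as a datetime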

As for your requirement to avoid the directory and part-file names: I believe this is not possible out of the box. You can write a single part file, but the directory will be created by default. You can read more here:

https://community.hortonworks.com/questions/142479/pyspark-creating-directory-when-trying-to-rdd-as-...

One possible solution is to write to a temporary directory, then move the single file into the appropriate folder and rename it (a sketch of the move/rename step follows the example below). You can have a single file created inside the temporary directory by using the coalesce method, like this:

df.coalesce(1).write.format("json").mode("overwrite").save("temp_dir/test.json")
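
The move/rename step could then look like the following sketch, which reaches the Hadoop FileSystem API through Spark's JVM gateway (the _jvm and _jsc attributes are internal but commonly used for this; all paths and the day/minute values below are illustrative):

jvm = spark.sparkContext._jvm
conf = spark.sparkContext._jsc.hadoopConfiguration()
Path = jvm.org.apache.hadoop.fs.Path
fs = jvm.org.apache.hadoop.fs.FileSystem.get(conf)

fs.mkdirs(Path("/folder/20180904"))  # one folder per day
for status in fs.listStatus(Path("temp_dir/test.json")):
    if status.getPath().getName().startswith("part-"):  # the single coalesced part file
        fs.rename(status.getPath(), Path("/folder/20180904/201809041532.json"))  # one file per minute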

2. For saving JSON to an ORC Hive table: unless you plan to store it as a single string column, you will need to parse the JSON and map it to the columns you would like to store. You can review the DataFrameWriter API's saveAsTable method and example:

https://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.sql.DataFrameWriter@saveAs...
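
As a rough sketch (the table name mydb.events_orc and the columns id, message and ts are made up for illustration; the SparkSession needs Hive support enabled):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.read.json("/folder/20180904/201809041532.json")  # or spark.read.json(rdd)
(df.select("id", "message", "ts")  # keep only the columns you want to store
   .write
   .format("orc")
   .mode("append")
   .saveAsTable("mydb.events_orc"))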

Also check out this article, which shows how to append to an ORC table:

http://jugsi.blogspot.com/2017/12/append-data-with-spark-to-hive-oarquet.html

As always, if you found this answer addressed your question, please take a moment to log in and click the "accept" link on the answer.

Thanks!


@Mark Sorry, I just realized you were looking for a pyspark solution and I provided the Scala references instead. Everything I mentioned above also applies to pyspark, and the DataFrameWriter API link is here:

https://spark.apache.org/docs/2.3.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

HTH


@Mark One last thing: you may want to reconsider saving files every minute. If the files are small, you will end up causing a problem for the HDFS NameNode in the long term. This is a known issue:

https://community.hortonworks.com/questions/167615/what-is-small-file-problem-in-hdfs.html

We recommend avoiding lots of small files; instead, try to keep each file at least the size of an HDFS block.
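
One common pattern, sketched below for the one-folder-per-day layout discussed above, is a periodic compaction job: read a day's small files back, coalesce them into a few larger files, and rewrite them (the paths and the coalesce factor are illustrative):

df = spark.read.json("/folder/20180904/*.json")
(df.coalesce(4)  # tune so each output file approaches the HDFS block size
   .write
   .mode("overwrite")
   .json("/folder_compacted/20180904"))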

Expert Contributor
@Felix Albani

Thanks for the helping hand. I will go through them and may ask for suggestions if required.

Thank you.