Putting SequenceFile key value into a data frame
Created 04-20-2022 04:05 PM
I saved thousands of small JSON files in SequenceFile format to work around the "small files" problem. I use the following PySpark code to parse the JSON data from the saved sequence files.
# Read (key, value) pairs; both are Hadoop Text, so they arrive as Python strings.
reader = sc.sequenceFile("/mysequencefile_dir", "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text")
# Keep only the value, i.e. the JSON document.
rdd = reader.map(lambda x: x[1])
mydf = spark.read.schema(myschema).json(rdd)
mydf.show(truncate=False)
The code works. However, I do not know how to get the key from the sequence file, which is the original JSON file name, into the mydf DataFrame as a column. Please advise. Thank you.
Regards,
Created 04-20-2022 05:46 PM
I found a workaround: injecting a myfilepath element into the JSON string, right after its opening brace.
rdd = reader.map(lambda x: str(x[1])[0] + '"myfilepath":"' + x[0] + '",' + str(x[1])[1:])
It does not look like a very clean solution. Is there a better one? Thanks.
Regards
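To make the concern concrete, here is a minimal sketch of where the string splice above can break, written in plain Python so it runs without Spark. The helper names `splice` and `is_valid_json` and the sample file name are made up for illustration, and validity is checked with Python's `json.loads` only; Spark's own JSON reader may react differently to the same input.

```python
import json

def splice(pair):
    """Same string splice as the workaround: insert myfilepath after the first character."""
    k, v = pair
    return str(v)[0] + '"myfilepath":"' + k + '",' + str(v)[1:]

def is_valid_json(text):
    """Return True if Python's json module can parse the text."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False

# Hypothetical (key, value) pairs standing in for sequence file records.
typical = splice(("a.json", '{"id": 1}'))   # object with at least one field
empty = splice(("a.json", '{}'))            # empty object leaves a trailing comma
padded = splice(("a.json", ' {"id": 1}'))   # leading space: first character is not '{'

print(is_valid_json(typical))  # True
print(is_valid_json(empty))    # False
print(is_valid_json(padded))   # False
```

The splice only holds up when every value starts with `{` and contains at least one field, which is why parsing the value and adding the field to the parsed object is the sturdier route.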
Created 04-20-2022 10:27 PM
@Seaport ,
Please try the below:
import json

def jsonize(k, v):
    # Parse the JSON value and attach the SequenceFile key as an extra field.
    ret = json.loads(v)
    ret.update({'key': k})
    return ret
...
rdd = reader.map(lambda x: jsonize(*x))
...
You need to make sure your schema includes the added key column.
Cheers,
André
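As a follow-up sketch of the reply above: `spark.read.json` is normally fed an RDD of JSON strings, so one option is to serialize the updated dict back with `json.dumps` before handing it to the reader. The function name `with_key`, the `key_field` parameter, and the sample record below are assumptions for illustration, not part of the original answer. The transformation is shown on a plain tuple so it can be run without a cluster.

```python
import json

def with_key(pair, key_field="key"):
    """Return the JSON value as a string with the SequenceFile key added as a field."""
    k, v = pair
    record = json.loads(v)      # parse the original JSON document
    record[key_field] = k       # attach the file name stored as the key
    return json.dumps(record)   # serialize back to valid JSON text

# Hypothetical (key, value) pair, shaped like one element of the sequenceFile RDD.
sample = ("/data/a.json", '{"id": 1, "ok": true}')
print(with_key(sample))
```

In the pipeline from the question this would presumably slot in as `rdd = reader.map(with_key)` followed by `spark.read.schema(myschema).json(rdd)`, with `myschema` extended by a string field named `key`. The sketch assumes every value is a JSON object; a value that parses to a list or scalar would need a guard.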
Created 04-21-2022 05:07 PM
André,
Thanks for the elegant solution.
Regards,
