Created 04-20-2022 04:05 PM
I saved thousands of small JSON files in SequenceFile format to work around the "small file" issue. I use the following PySpark code to parse the JSON data from the saved sequence files.
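For context, here is a minimal sketch of how such sequence files might be written, assuming each record is a (file path, JSON string) pair; the input directory here is illustrative, not from my actual setup:
# Read each small JSON file as a (path, content) pair,
# then save the pairs as a Text/Text SequenceFile
pairs = sc.wholeTextFiles("/my_json_dir")
pairs.saveAsSequenceFile("/mysequencefile_dir")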
# Read the SequenceFile as (key, value) pairs; both key and value are Hadoop Text
reader = sc.sequenceFile("/mysequencefile_dir", "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text")
# Keep only the values, i.e. the JSON strings
rdd = reader.map(lambda x: x[1])
mydf = spark.read.schema(myschema).json(rdd)
mydf.show(truncate=False)
The code worked. However, I do not know how to get the key from the sequence file, which is actually the original JSON file name, into the mydf DataFrame. Please advise. Thank you.
Regards,
Created 04-20-2022 05:46 PM
As a workaround, I injected a myfilepath element into the JSON string.
# Splice '"myfilepath":"<key>",' in right after the opening brace of the JSON string
rdd = reader.map(lambda x: str(x[1])[0] + '"myfilepath":"' + x[0] + '",' + str(x[1])[1:])
It does not look like a very clean solution. Is there a better one? Thanks.
Regards
Created 04-20-2022 10:27 PM
@Seaport,
Please try the below:
import json

# Parse the JSON value, add the sequence-file key, and re-serialize,
# since spark.read.json expects an RDD of JSON strings rather than dicts
def jsonize(k, v):
    ret = json.loads(v)
    ret.update({'key': k})
    return json.dumps(ret)
...
rdd = reader.map(lambda x: jsonize(*x))
...
You need to make sure your schema includes the added key column.
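For example, here is a minimal sketch of a schema with the key column included; the other field names are hypothetical stand-ins for whatever is in your original JSON:
from pyspark.sql.types import StructType, StructField, StringType
myschema = StructType([
    StructField("key", StringType()),     # the sequence-file key (the original file name)
    StructField("field1", StringType()),  # hypothetical field from the original JSON
    StructField("field2", StringType()),  # hypothetical field from the original JSON
])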
Cheers,
André
Created 04-21-2022 05:07 PM
André,
Thanks for the elegant solution.
Regards,