Created 01-16-2017 10:03 AM
Hello All,
I need to import and parse XML files in Hadoop.
I have an old Pig 'REGEX_EXTRACT' parser script that works fine but takes some time to run, around 10-15 minutes.
In the last 6 months I have started to use Spark, with great success in improving run times. So I am trying to move the old Pig script to Spark using the Databricks XML parser mentioned in the following posts:
http://community.hortonworks.com/questions/71538/parsing-xml-in-spark-rdd.html
http://community.hortonworks.com/questions/66678/how-to-convert-spark-dataframes-into-xml-files.html
The version used is: http://github.com/databricks/spark-xml/tree/branch-0.3
The script I try to run is similar to:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._
import org.apache.hadoop.fs._
import com.databricks.spark
import com.databricks.spark.xml
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// drop table
val dfremove = hiveContext.sql("DROP TABLE FileExtract")

// Create schema
val xmlSchema = StructType(Array(
  StructField("Text1", StringType, nullable = false),
  StructField("Text2", StringType, nullable = false),
  StructField("Text3", StringType, nullable = false),
  StructField("Text4", StringType, nullable = false),
  StructField("Text5", StringType, nullable = false),
  StructField("Num1", IntegerType, nullable = false),
  StructField("Num2", IntegerType, nullable = false),
  StructField("Num3", IntegerType, nullable = false),
  StructField("Num4", IntegerType, nullable = false),
  StructField("Num5", IntegerType, nullable = false),
  StructField("Num6", IntegerType, nullable = false),
  StructField("AnotherText1", StringType, nullable = false),
  StructField("Num7", IntegerType, nullable = false),
  StructField("Num8", IntegerType, nullable = false),
  StructField("Num9", IntegerType, nullable = false),
  StructField("AnotherText2", StringType, nullable = false)
))

// Read file
val df = hiveContext.read
  .format("com.databricks.spark.xml")
  .option("rootTag", "File")
  .option("rowTag", "row")
  .schema(xmlSchema)
  .load("hdfs://MyCluster/RawXMLData/RecievedToday/File/Files.tar.gz")

// select
val selectedData = df.select(
  "Text1", "Text2", "Text3", "Text4", "Text5",
  "Num1", "Num2", "Num3", "Num4", "Num5", "Num6",
  "AnotherText1", "Num7", "Num8", "Num9", "AnotherText2"
)

selectedData.write.format("orc").mode(SaveMode.Overwrite).saveAsTable("FileExtract")
The xml file looks similar to:
<?xml version="1.0"?>
<File>
  <row>
    <Text1>something here</Text1>
    <Text2>something here</Text2>
    <Text3>something here</Text3>
    <Text4>something here</Text4>
    <Text5>something here</Text5>
    <Num1>2</Num1>
    <Num2>1</Num2>
    <Num3>1</Num3>
    <Num4>0</Num4>
    <Num5>1</Num5>
    <Num6>0</Num6>
    <AnotherText1>something here</AnotherText1>
    <Num7>2</Num7>
    <Num8>0</Num8>
    <Num9>0</Num9>
    <AnotherText2>something here</AnotherText2>
  </row>
  <row>
    <Text1>something here</Text1>
    <Text2>something else here</Text2>
    <Text3>something new here</Text3>
    <Text4>something here</Text4>
    <Text5>something here</Text5>
    <Num1>2</Num1>
    <Num2>1</Num2>
    <Num3>1</Num3>
    <Num4>0</Num4>
    <Num5>1</Num5>
    <Num6>0</Num6>
    <AnotherText1>something here</AnotherText1>
    <Num7>2</Num7>
    <Num8>0</Num8>
    <Num9>0</Num9>
    <AnotherText2>something here</AnotherText2>
  </row>
  ...
  ...
</File>
Many XML files are archived together, hence the tar.gz file.
This runs. However, for a 400MB file it takes 50 minutes to finish.
Does anyone have an idea why it is so slow, or how I may speed it up? I am running on a 7 machine cluster with about 120GB Yarn memory, with hortonworks HDP-2.5.3.0 and spark 1.6.2.
Many thanks in Advance!
Created 01-16-2017 12:00 PM
One problem may be partitioning: the Spark app may not know how to divide the processing of the .tar.gz amongst many workers, so it hands the whole file to one. That's a problem with .gz files in general.
I haven't done any XML/tar processing work in Spark myself, so I am not confident about where to begin. You could look at the history server to see how the work was split up. Otherwise, try throwing the work at Spark as a directory full of XML files (maybe individually gzipped), rather than a single .tar.gz. If that speeds things up, it could be a sign that partitioning is the problem. It would then become a matter of working out how to split those original 400MB source files into a set of smaller files (e.g. 20 x 20MB files), and seeing if that parallelizes better.
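As a rough way to check the partitioning theory from the shell rather than the history server, something like the following could be run against the DataFrame that was loaded. This is only a sketch: the helper names `partitionCount` and `rowsPerPartition` are made up for illustration, and it assumes the Spark 1.6 DataFrame API.

```scala
import org.apache.spark.sql.DataFrame

// Number of partitions backing the DataFrame, i.e. how many tasks can read it in parallel.
// A value of 1 for the .tar.gz input would support the "one worker does everything" theory.
def partitionCount(df: DataFrame): Int = df.rdd.partitions.length

// Row count per partition, to spot a single partition that holds almost all of the data.
// Note: this triggers a full read of the input.
def rowsPerPartition(df: DataFrame): Array[(Int, Long)] =
  df.rdd
    .mapPartitionsWithIndex((idx, rows) => Iterator((idx, rows.size.toLong)))
    .collect()
```

For example, `partitionCount(df)` on the DataFrame from the original script shows how many tasks the read stage can be split into.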
Created 01-18-2017 10:19 AM
Please see my post below
Created 01-16-2017 12:27 PM
I gave it a quick try and created 50 XML files matching your structure, each 60MB in size. Tested on 3 workers (7 cores and 26GB per worker).
1) The tar.gz file was 450MB and took 14 min with 1 (!) executor. Since it is a single tar.gz file, only one executor reads the file.
2) Putting all the files as individual xml.gz files in one folder and starting the job again, I had 3 executors involved and the job finished in under 5 min (roughly 14 min / 3, since no shuffle is required).
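A minimal sketch of the second variant, assuming the same spark-xml options as in the question. The function name `readXmlDirectory` and the `dir` argument are placeholders for illustration, not something from my actual test:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.StructType

// Read every (individually gzipped) XML file in a folder instead of one tar.gz.
// Each .gz file is still read by a single task, but different files can go to different executors.
def readXmlDirectory(sqlContext: SQLContext, schema: StructType, dir: String): DataFrame =
  sqlContext.read
    .format("com.databricks.spark.xml")
    .option("rowTag", "row")
    .schema(schema)
    .load(dir) // dir is a hypothetical HDFS folder holding the *.xml.gz files
```

A HiveContext can be passed as the `sqlContext` argument, since it extends SQLContext.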
So I see two issues here:
1) Don't use tar.gz
2) 50 min compared to 14 min: How fast is your machine (cores, ...)?
Created 01-18-2017 10:19 AM
Please see my post below
Created 01-18-2017 10:04 AM
Thanks for a quick reply.
I am using a mixed environment (for dev):
The reason we tar.gz the files is that we receive many small XML files, about 25,000. Loading these files into Hadoop individually takes over 4 hours. Using tar.gz reduces the load time to around 10 minutes and the size from 14GB to 0.4GB.
I have tried removing the tar.gz; the run time becomes 1h45. This is likely the result of having many small files.
To add, the Pig parser may be faster because the XML structure is hardcoded. We want to avoid that because we have seen machines change the way the XML is produced, so the Spark parsing is more robust.
Ideally, we would like to use the more robust Spark parser, with the load into Hadoop at around 10 minutes and the processing time at around 10 minutes.
Any ideas? One idea is to tar.gz into multiple files, i.e. 25,000 files into 10 archives. The load time would be ~10 minutes and the processing time somewhere between 10 and 50 minutes.
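To make the "25,000 files into 10 archives" idea concrete, here is a rough sketch of how the files could be assigned to archives so that each archive ends up with a similar total size. It is plain Scala with no Spark involved; `SourceFile` and `planArchives` are hypothetical names, and the actual tar/gzip step is left out.

```scala
// Hypothetical description of one input file: its name and size in bytes.
final case class SourceFile(name: String, sizeBytes: Long)

// Greedy assignment: take the largest files first and always add the next file
// to the archive that currently holds the fewest bytes.
def planArchives(files: Seq[SourceFile], archiveCount: Int): Vector[Vector[SourceFile]] = {
  require(archiveCount > 0, "archiveCount must be positive")
  val empty = Vector.fill(archiveCount)((0L, Vector.empty[SourceFile]))
  files.sortBy(f => -f.sizeBytes).foldLeft(empty) { (bins, file) =>
    val lightest = bins.indices.minBy(i => bins(i)._1)
    val (total, members) = bins(lightest)
    bins.updated(lightest, (total + file.sizeBytes, members :+ file))
  }.map(_._2)
}
```

Each inner `Vector` is then the list of files to put into one tar.gz, so no single archive should end up much larger than the others.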
Does anyone have:
Created 01-20-2017 02:08 PM
There are also HAR files ("Hadoop archive files"), which are a halfway house between a tar file and unexpanded files: everything lives in a single .har file, but the analytics code sees the contents as independent XML files, so the work can still be split up.
see Har files
Created 01-25-2017 11:12 AM
So I have changed the way I tar.gz the files.
At first I tried to create archives of about 128MB each (about 4 files), then 64MB (about 8-10 files), and then 1MB (100+).
Obviously, this alters the number of tasks that run. The smaller the file, the faster the tasks run, except one!
One task always takes ~50mins.
Why does this happen? How do I speed up this task?
Created 01-26-2017 09:05 AM
Hi Antin,
For XML files I would recommend using the SequenceFile container format. Create one big SequenceFile containing all your XML files. A SequenceFile works similarly to a map, with entries consisting of a key and a value. The key of each entry in your SequenceFile could be, for example, the "filename + date", and the value the content of the XML file. The advantage of using a file container such as SequenceFile, Avro or Parquet is that it stays splittable when compressed with gzip. However, to improve compression and decompression speed I would recommend the Snappy compression codec.
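As a rough illustration, and assuming the XML files are already reachable from Spark (for example in an HDFS folder), the packing could be sketched from Spark itself rather than from a hand-written Java job. The function names and paths below are placeholders, and Snappy needs its native libraries to be available on the cluster:

```scala
import org.apache.hadoop.io.compress.SnappyCodec
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Pack every file under inputDir into a Snappy-compressed SequenceFile.
// Key = path of the original file, value = its full XML content.
// The output is a directory of part files rather than one physical file.
def packToSequenceFile(sc: SparkContext, inputDir: String, outputPath: String): Unit =
  sc.wholeTextFiles(inputDir)
    .saveAsSequenceFile(outputPath, Some(classOf[SnappyCodec]))

// Read the packed data back as (original path, XML content) pairs.
def readPackedXml(sc: SparkContext, path: String): RDD[(String, String)] =
  sc.sequenceFile[String, String](path)
```

Here the key is just the file path; a "filename + date" key as described above would need an extra `map` before saving.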
The following Stackoverflow discussion thread will also clarify things.
Mark
Created 03-20-2017 02:45 PM
Thanks Mark. I have looked into your suggestions.
Which has led me to LZO compression:
http://blog.cloudera.com/blog/2009/11/hadoop-at-twitter-part-1-splittable-lzo-compression/
I think this may be something I try next. Do you have any suggestions on this? Doesn't HDP already come with LZO? The link is a good few years old. Should I try something else before I spend a few hours on this? My company is not keen on me spending a few hours writing a Java SequenceFile jar.