Created 09-06-2016 03:02 PM
Please be conscious of answers... We're very new to HortonWorks. We've got 3 x 6 VERY LARGE files that need to be joined and stored so that we can query the result as an EXTERNAL table from Hive. The cluster is 6 compute nodes, with 8 cores and 128 GB RAM on each node, and a 5-node Isilon feeding the HDFS filesystem. Each of the compute nodes has 3 partitions of 1 TB dedicated to /, /data, and /var. We're running Ambari, and I'm running this through an enabled Pig View, though not on Tez because I don't want to struggle with the configuration to get Pig enabled on Tez at this moment.
We've got 3 directories, each with 6 partitions of fixed-width files. The 3 directories each have their own fixed-width format. Total size of the 3 directories: 120 GB, 210 GB, 700 GB. Yes, this is Big Data, and I thought that's what HortonWorks is supposed to be able to manage. The layout of each directory generally looks like this:
-rwxrwxrwx 3 root 1000000 19332000000 2016-07-14 17:31 file01.txt
-rwxrwxrwx 3 root 1000000 19332000000 2016-07-14 17:42 file02.txt
...
I'm following the suggestions in this website: http://deathbytape.com/articles/2015/08/21/pig-hive-query-data.html
The Pig script is a bit of a monster in terms of length, but it's relatively simple. Pseudocode:
1) Define 3 schemas using LOAD and org.apache.pig.piggybank.storage.FixedWidthLoader().
2) Join 3 files on their respective keys.
3) Describe final table and Dump 5 lines to ensure everything looks alright.
4) STORE final table [USING OrcStorage(); as suggested by the hyperlink for speed and efficiency] in a new directory so that we can eventually build a Hive EXTERNAL table to call the schema we've defined.
Again:
data1 = LOAD 'hdfs://fileshare:8020/directory1' USING org.apache.pig.piggybank.storage.FixedWidthLoader(
'1-10 ,11-27 ,28-31 ,32-35 ,36-39 ,40-47 ,48-50 ,51-54 ,55-58 ... ', '', 'col1a long, col2a int, ... etc');
data2 = LOAD 'hdfs://fileshare:8020/directory2' USING org.apache.pig.piggybank.storage.FixedWidthLoader(
'1-10 ,11-19 , ... ', '', 'col1b long, col2b int, ... etc');
data3 = LOAD 'hdfs://fileshare:8020/directory3' USING org.apache.pig.piggybank.storage.FixedWidthLoader(
'1-10 ,11-27 , ... ', '', 'col1c long, col2c int, ... etc');
ALLDATA = JOIN data1 BY encrypted_keya, data2 BY encrypted_keyb, data3 BY encrypted_keyc;
DESCRIBE ALLDATA;
alias_lim = LIMIT ALLDATA 5;
DUMP alias_lim;
STORE ALLDATA INTO 'hdfs://fileshare:8020/pig_schema' USING OrcStorage();
This BROKE two of the compute nodes... it filled them both to capacity. A total of 120 + 210 + 700 GB = 1.03 TB of input put 3+ TB on each of two compute nodes. This implies 6 TB of junk created for running a join meant to create a 1 TB table.
So now I have a lot of questions.
A) How is this robust? Hadoop is now almost a decade old, and in the little bit of learning I've done, it is supposed to be running things like YARN, which I've learned is a "Resource Management" workhorse... but the empirical results here demonstrate that it doesn't know how to manage resources.
B) Why would 6 TB be created to manage the simple instructions required to do an inner join of approximately 1 TB of data?
C) HOW DO I GET MY COMPUTE NODES BACK TO A FUNCTIONAL STATE? There are now 6+ terabytes of junk that I have to clean from the partitions, since the two VMs running these functions have been crippled by the query. Let's say I reboot the VMs. Now what?? Can I just "rm -rf" everything in a particular directory?? Which one? Will this break the entire HortonWorks install? This seems to be so much more trouble than it's worth.
D) Last question from a bird's eye view: In general, how fragile is Hadoop? In a normal ETL environment, it seems that testing needs to be done to determine the most efficient way of ingesting and transforming data. If I'm going to test multiple methods of ingesting a TB of data, I can't take a week of time to tiptoe around 19 levels of configurations before feeling "safe" enough to run some commands because it might just blow up the cluster and force a reinstallation of the entire configuration, HortonWorks distro, and permissions, etc. Who has time for this ridiculous amount of overhead??
Created 09-28-2016 07:23 PM
Thank you for your detailed description of your issues.
There is a single overarching theme to my answer: your cluster is not properly sized for the processing you are doing. Big Data on Hadoop leverages horizontal scaling and, like all data processing, it can hit resource constraints under a given implementation.
I had a similar situation happen to me the first time I worked on Hadoop. I had 2.53 billion records, each holding 57 columns, that I bulk loaded into HBase. I was on an 8-node cluster, and the first time I bulk loaded to HBase it brought ZooKeeper, HBase and the cluster to its knees and then to a groaning death. The ultimate root cause was that the number of ZooKeeper connections was configured way too low (for the extreme workload I threw at it). I had to raise that limit, and then I bulk loaded in separate chunks as opposed to one shot. Things were still not ideal because HBase major compaction ran for hours afterwards, stressing CPU and memory on all of the nodes.
I eventually resized the cluster (added more nodes) to accommodate the load that I was throwing at the cluster.
To answer your question: you are throwing too much load at your cluster, given its size. Hadoop is famously robust, but only when properly sized. Regarding your local directories filling up: Pig runs MapReduce jobs under the covers, and intermediate (temporary) data is written to local disk between the map and reduce steps. The large amount of intermediate data (produced by your triple join of a TB of data) is spread among so few nodes in your cluster that it exceeds capacity on some of them.
My suggestion is to start with lower loads on your given cluster and learn how to optimize your jobs. For example, one common optimization is to compress your intermediate data. See this link on optimizing Pig: https://community.hortonworks.com/questions/57449/fine-tune-the-pig-job.html#comment-58059
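As a minimal sketch of the intermediate-data compression suggested above, lines like the following could go at the top of the Pig script. The property names are standard Pig and Hadoop 2 settings, but the codec choices here are assumptions: Snappy needs its native libraries installed on every node, and the codecs accepted by pig.tmpfilecompression.codec depend on the Pig version, so check both against your HDP release before relying on them.

```pig
-- Compress the temporary files Pig writes between chained MapReduce jobs.
SET pig.tmpfilecompression true;
SET pig.tmpfilecompression.codec gz;  -- assumption: gz; lzo is the other long-supported value

-- Compress map output before it is spilled to local disk and shuffled to the reducers.
SET mapreduce.map.output.compress true;
-- Assumption: Snappy native libraries are available on all compute nodes.
SET mapreduce.map.output.compress.codec 'org.apache.hadoop.io.compress.SnappyCodec';
```

Compression trades CPU for disk and network, and it shrinks the shuffle data rather than eliminating it, so it is still worth watching local disk usage on a first run against a smaller input.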
Next suggestion (after learning to optimize) is to add more data nodes to your cluster to horizontally scale the load. You could simply add nodes and not optimize ... but we always want to optimize to use resources more wisely. See this link for help on sizing your cluster: http://info.hortonworks.com/SizingGuide.html