Member since
09-24-2015
38
Posts
41
Kudos Received
6
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
4406 | 10-18-2017 01:27 PM | |
28770 | 04-22-2016 09:23 PM | |
1603 | 12-22-2015 02:41 PM | |
2237 | 12-22-2015 12:54 PM | |
4264 | 12-08-2015 03:44 PM |
10-18-2017
02:31 PM
That is correct @Rajesh Reddy, think of HDFS as a performance layer for workloads to do the inbetween work, and S3 as the place for datasets to live long term. You can then reduce the storage on your datanodes becuase its only for intermetidate processing.
... View more
10-18-2017
02:21 PM
No your clear, you cannot do what your saying. the defaultFS cannot be replaced by the S3 connector as your attempting to describe. You still need a cluster with HDFS deployed even if its a much lower volume of HDFS space. And then your jobs can 'target' the larger datasets you have stored on S3. But HDFS is still required as all the things above and as the documentation states the connectors cannot be used as an replacment for HDFS. @Rajesh Reddy Even if no data is stored in HDFS and everything is stored S3 you will still require a defaultFS for the API layer and for how processing engines work today. Drop-in replacements are block storage not object like S3 and include products like Isilon and Spectrum Scale.
... View more
10-18-2017
02:03 PM
@Rajesh Reddy explained in my above comment HDFS is used for intermediate storage of many datasets between stages depending on the workload engines being used. Additionally if these engines ie MapReduce or spark make use of a distributed cache for jars, etc they will be pushed to HDFS not S3. The link you provided talks about using S3 as a source and sink for a dataset, it does not describe replacing the entire defaultFS for HDFS with s3. The first page of the guide you linked also states this. - "These connectors are not a replacement for HDFS and cannot be used as a replacement for HDFS defaultFS ." https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.2/bk_cloud-data-access/content/intro.html
... View more
10-18-2017
01:27 PM
1 Kudo
@Rajesh Reddy At this time S3 cannot be used as an outright replacement for an HDFS deployment; so your data-lifecycle you describe would be required to be scripted and such. Today jobs that have multiple map-reduce stages will output data to HDFS for the second stage. S3 can be used as a Source and Sink for input and final datasets but is not used for the intermediate data. Also Rack-Awareness is a HDFS Block based behaviour and would not apply to S3. If you want your dataset in S3 to also be located in another AWS region you could setup S3 Cross-Region Replication. https://aws.amazon.com/blogs/aws/new-cross-region-replication-for-amazon-s3/
... View more
10-18-2017
12:31 PM
1 Kudo
@Rajesh Reddy That is correct, Replication Factor is an HDFS specific setting. When you load your data into S3 your utilising however S3 performs robust durability measures. In the document linked below they claim a "99.999999999% of durability for objects
stored within a given region." S3 still redundantly stores multiple copies over multiple regions for durability and performs many of the same actions that HDFS does in terms of detecting corrupt replica's and replacing them. https://d0.awsstatic.com/whitepapers/protecting-s3-against-object-deletion.pdf http://docs.aws.amazon.com/AmazonS3/latest/dev/DataDurability.html
... View more
04-22-2016
09:23 PM
14 Kudos
Single String vs Nested Partitions
When creating partitions by date it is almost always more effective to partition by a single string of ‘YYYY-MM-DD’ rather than use a multi-depth partition with The Year, Months, and Days all as their own. The advantage to using the single string approach allows for more SQL operators to be utilized such as LIKE, IN, and BETWEEN which cannot be as easily used if one utilizes nested partitions. Example: Dates to Select: 2015-01-01, 2015-02-03,2016-01-01 Tables Table A, Partitioned by DateStamp String as YYYY-MM-DD Table B, Partitioned by YEAR INT, Month INT, Day INT Queries on Table A All Dates SELECT * FROM TableA WHERE DateStamp IN (‘2015-01-01’, ‘2015-02-03’, ‘2016-01-01’) Only 2015 SELECT * FROM TableA WHERE DateStamp LIKE ‘2015-%’ Only 2015 and February SELECT * FROM TableA WHERE DateStamp LIKE ‘2015-02-%’ All Days that start/end with a 5 SELECT * FROM TableA WHERE DateStamp LIKE ‘%-%-%5’ All Days Between 2015-01-01 and 2015-03-01 SELECT * FROM TableA WHERE DateStamp BETWEEN ‘2015-01-01’ AND ‘2015-03-01’ Queries on Table B All Dates SELECT * FROM TableB WHERE (YEAR=2015 AND MONTH=01 AND DAY=01) OR (YEAR=2015 AND MONTH=02 AND DAY=03) OR (YEAR=2016 AND MONTH=01 AND DAY=01) Only 2015 SELECT * FROM TableB WHERE YEAR=2015 Only 2015 and February SELECT * FROM TableB WHERE YEAR=2015 and MONTH=02 All Days that start/end with a 5 SELECT * FROM TableB WHERE DAY LIKE ‘%5’ All Days Between 2015-01-01 and 2015-03-01 SELECT * FROM TableA WHERE YEAR=2015 and ((MONTH=01 OR MONTH=02) OR (MONTH =03 and DAY =01))
... View more
03-22-2016
03:45 PM
Partitions are important just to reduce the dataset size that your joining on, if your table is triple petabytes and you have to scan it each time to ingest data thats not a very smart design. That said columns do not have to be partitions!! We have a column which is the hash, we could have any number of extra columns here as well! We dont partition on the hash column at all, if your usecase can handle scanning the entire dataset each time to remove dupes then dont worry about them. One way or another you need a way to test for unquieness of the record... A left outter join does this easily with a test for null to see which records are in the 'ingest' dataset which are not in the master dataset... insert into table master_table select id, col1, col2, colN FROM ( select s.id, col1, col2, colN, m.id from Staging_table s LEFT JOIN Master_table m on (s.id == m.id) WHERE m.id is null; )
... View more
03-21-2016
09:31 PM
4 Kudos
TL DR; Use Hive on Tez... Tez can do Dynamic Partition Pruning witch lets you join partitions without putting them in your WHERE clause. IT DOES NOT WORK WITH MAPREDUCE ONLY TEZ!!! #Longer Version So this has been done before, some people look to HBase for this but the prospect of needing dedicated infrastructure for deduplication is painful at scale and may not allow for a wide enough temporal window to be stored to dedupe against. For truly big data applications where you are potentially storing into the triple digit petabytes you will want to keep it inside YARN not out of but you already assumed that much. In the applications I have worked with we have a HASHID in the record already which is ready to be ingested into a master table that we can join against. We also have partitions.. The Partitions are very important, as they let you reduce the subset of data you are going to check against we have to do our due diligence to avoid hitting the entire dataset just to be able to ingest deduped records. Using Tez we have something called Dynamic Partition Pruning which lets us drop partitions which are not doing to be used in the target 'master' table. So using a temp table we can ingest data from a staging table deduping it against the partition it resides in and ingesting it should it not exist.
PS - Left outerjoin and test for null in the WHERE is probably better for scaling then UNION DISTINCT if you are worried about a reducer problem. Same join syntax as the example below...
PS 2- We have found a fun case where if you try to use this to dedupe or clean existing master data (like a full table cleanup, no ingestion), and that's if a partition has a single record which is invalid according to new filtering logic that it will not be removed unless new records are being inserted into that partition. This is actually expected behavior because a partition is not overwritten unless its inserted into, so by filtering out the only row which would have made its way back into the table we effectively do nothing to that partition. But again, if you insert anything new which isnt filtered out, or have a good record in there then it will work.
Example - #### #### create table stage_new ( hash string, line string, p1 string, p2 string, p3 string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’ STORED AS TEXTFILE; /apps/hive/warehouse/stage_new create table master_base ( hash string, line string ) PARTITIONED BY ( p1 string, p2 string, p3 string ); set hive.exec.dynamic.partition.mode=nonstrict; INSERT OVERWRITE TABLE master_base PARTITION(p1, p2, p3) SELECT * FROM stage_new; ##test data (initaly loaded to prepopulat emaster) 12,THEDUPE,1,1,1 11,test2,1,1,2 10,test3,2,1,1 9,test4,2,1,1 ##data with dupe (put into the stage_new table after cleaning it out from the inital test data and loading it) 12,THEDUPE,1,1,1 8,test1,1,1,1 7,test2,1,1,2 6,test3,3,1,1 ##Populate set hive.exec.dynamic.partition.mode=nonstrict; create temporary table new_parts AS select p1, p2, p3 from stage_new group by p1, p2, p3; ##DEDUPE and INSERT INSERT OVERWRITE TABLE master_base PARTITION(p1, p2, p3) select hash, line, p1, p2, p3 from stage_new UNION DISTINCT select hash, line, mb.p1, mb.p2, mb.p3 from master_base mb JOIN new_parts np ON (mb.p1 = np.p1 AND mb.p2 = np.p2 AND mb.p3 = np.p3); ##master_base BEFORE Insert of dedupe required data## hive> select * from master_base; OK 12 THEDUPE 1 1 1 11 test2 1 1 2 10 test3 2 1 1 9 test4 2 1 1 ##master_base AFTER Insert## hive> select * from master_base; OK 12 THEDUPE 1 1 1 8 test1 1 1 1 11 test2 1 1 2 7 test2 1 1 2 10 test3 2 1 1 9 test4 2 1 1 6 test3 3 1 1
... View more
01-28-2016
06:19 PM
4 Kudos
Map tasks are controlled in MapReduce by default by the number of blocks, you will get 1 mapper for 1 block. This can be configured so that you can take more or less data into a single map task with the below configs, in the below case we are taking in ~1gb - 1.5gb to the maptask rather then the default block of 128mb. Reduces can be configured as some of the other comments but you cannot have more reduces then you have distinct Keys emitted from the Maps. mapreduce.input.fileinputformat.split.minsize=1000000000 mapreduce.input.fileinputformat.split.maxsize=1500000000 mapreduce.job.reduces=10
... View more
01-27-2016
02:06 AM
First we need the MR Classes, we can get that by running the job once via the CLI, then navigate to the RM UI and access the Job History, and select configuration on the left. It should take you to a table with multiple pages, on the far right is a search. You want to look for the following properties: mapreduce.job.map.class, mapreduce.job.reduce.class, mapreduce.job.combine.class, mapreduce.job.partitioner.class The value of these will need to be provided for Oozie to use the MapReduce action. If you want an example of the Oozie MapReduce action see my github here. https://github.com/josephxsxn/hdp2WordCountOozie/blob/master/ooziewc/workflow.xml The behaviour of your ShellAction sounds more like your clicking on the MApReduce job for the 'ShellAction' and not the job that the ShellAction is launching, same with the JavaAction your running the Driver/Launcher program in that MapReduce Map of the Oozie Launcher and then its launching another MapReduce job with another ApplicationID... This is why you want the MapReduce action so that you dont have redundant containers running on the cluster.
... View more