Member since: 11-11-2016
Posts: 43
Kudos Received: 4
Solutions: 2
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 721 | 08-22-2017 09:21 AM |
 | 690 | 02-25-2017 03:18 AM |
10-30-2018
10:27 AM
Here is the command that I use to import data from RDS into S3:

sqoop import \
-D mapreduce.map.memory.mb=6144 -D mapreduce.map.java.opts=-Xmx1024m \
--connect jdbc:mysql://a205067-pcfp-rds-abcd.dfgfdg.us-east-1.rds.amazonaws.com/tprdb \
--username tpruser \
--password Welcome12345 \
--query 'SELECT d.* from DnB_WB_UniverseMaster d join DnB_WB_UniverseMaster_Incr c on d.DunsNumber = c.DunsNumber where $CONDITIONS' \
--boundary-query "SELECT * FROM
(
SELECT
MIN( DunsNumber ) min_
from
DnB_WB_UniverseMaster
) v1,
(
SELECT
MAX( DunsNumber ) max_
from
DnB_WB_UniverseMaster
) v2" \
--split-by d.DunsNumber \
--num-mappers 100 \
--fields-terminated-by '|' \
--lines-terminated-by '\n' \
--target-dir s3://12345-pcfp-latest-new/output/processing/APCFP/IMPORT

When I run it, the last mapper gets stuck at 99% and does not move at all; it always hangs at 99%. When I do the same import for a single table, it works fine. Can someone suggest what to change?
10-06-2017
05:38 AM
This is how I load my CSV file into a Spark DataFrame:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://MAIN")
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
val df1Final = df1result.withColumn("DataPartition", lit(null: String))

This is an example of one of my input file names:
Fundamental.FinancialLineItem.FinancialLineItem.SelfSourcedPrivate.CUS.1.2017-09-07-1056.Full

Now I want to read this file name, split it on the "." separator, and add CUS as a new column in place of DataPartition.
08-22-2017
09:21 AM
So I will answer my own question.
Here is what was needed to make it work.
Because we use HBase to store our data and this reducer outputs its result to an HBase table, Hadoop is telling us that it does not know how to serialize our data, so we need to help it. Inside setUp, set the io.serializations variable:

hbaseConf.setStrings("io.serializations", new String[]{hbaseConf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName()});
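For completeness, here is that call with the imports it needs, wrapped in a small standalone helper so it compiles on its own. This is only a sketch: the SerializationConfig class name is hypothetical, and only the setStrings call itself is the actual fix.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;

public class SerializationConfig {
    // Returns a Configuration that keeps the serializations already registered
    // and appends the HBase ones, so Result/Mutation values can be (de)serialized.
    public static Configuration withHBaseSerializations() {
        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.setStrings("io.serializations",
                hbaseConf.get("io.serializations"),
                MutationSerialization.class.getName(),
                ResultSerialization.class.getName());
        return hbaseConf;
    }
}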
08-21-2017
05:56 AM
Hi Jay, thanks for responding. I don't have such a property in core-site.xml. Here are the details:

<property>
<name>fs.defaultFS</name>
<value>hdfs://quickstart.cloudera:8020</value>
</property>
<!-- OOZIE proxy user setting -->
<property>
<name>hadoop.proxyuser.oozie.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.groups</name>
<value>*</value>
</property>
<!-- HTTPFS proxy user setting -->
<property>
<name>hadoop.proxyuser.httpfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.groups</name>
<value>*</value>
</property>
<!-- Llama proxy user setting -->
<property>
<name>hadoop.proxyuser.llama.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.llama.groups</name>
<value>*</value>
</property>
<!-- Hue proxy user setting -->
<property>
<name>hadoop.proxyuser.hue.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hue.groups</name>
<value>*</value>
</property>
</configuration>
08-21-2017
05:23 AM
I have taken an HBase table backup using the HBase Export utility tool.
All the data was transferred into HDFS correctly, in sequence file format.
Now I want to run MapReduce to read the key/value pairs from those output files, but I get the exception below:
java.lang.Exception: java.io.IOException: Could not find a deserializer for the Value class: 'org.apache.hadoop.hbase.client.Result'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:406)
Caused by: java.io.IOException: Could not find a deserializer for the Value class: 'org.apache.hadoop.hbase.client.Result'. Please ensure that the configuration 'io.serializations' is properly configured, if you're using custom serialization.
at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1964)
at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1811)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1760)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1774)
at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.initialize(SequenceFileRecordReader.java:50)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:478)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:671)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
Here is my driver code:

package SEQ;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SeqDriver extends Configured implements Tool
{
public static void main(String[] args) throws Exception{
int exitCode = ToolRunner.run(new SeqDriver(), args);
System.exit(exitCode);
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s needs two arguments files\n",
getClass().getSimpleName());
return -1;
}
String outputPath = args[1];
FileSystem hfs = FileSystem.get(getConf());
Job job = new Job();
job.setJarByClass(SeqDriver.class);
job.setJobName("SequenceFileReader");
HDFSUtil.removeHdfsSubDirIfExists(hfs, new Path(outputPath), true);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Result.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(MySeqMapper.class);
job.setNumReduceTasks(0);
int returnValue = job.waitForCompletion(true) ? 0:1;
if(job.isSuccessful()) {
System.out.println("Job was successful");
} else if(!job.isSuccessful()) {
System.out.println("Job was not successful");
}
return returnValue;
}
}

Here is my mapper code:

package SEQ;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MySeqMapper extends Mapper <ImmutableBytesWritable, Result, Text, Text>{
@Override
public void map(ImmutableBytesWritable row, Result value,Context context)
throws IOException, InterruptedException {
}
}
06-03-2017
07:04 PM
Solved it after using the correct path.

Create the snapshot:

snapshot 'FundamentalAnalytic','FundamentalAnalyticSnapshot'

Export the snapshot to local HDFS:

hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot FundamentalAnalyticSnapshot -copy-to /tmp -mappers 16

Driver job configuration to run MapReduce on the HBase snapshot:
String snapshotName="FundamentalAnalyticSnapshot";
Path restoreDir = new Path("hdfs://quickstart.cloudera:8020/tmp");
String hbaseRootDir = "hdfs://quickstart.cloudera:8020/hbase";
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, // snapshot name
scan, // Scan instance to control CF and attribute selection
DefaultMapper.class, // mapper class
NullWritable.class, // mapper output key
Text.class, // mapper output value
job,
true,
restoreDir);

Also, running MapReduce against an HBase snapshot skips the scan on the HBase table itself, so there is no impact on the region servers.
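Putting those pieces together, a minimal driver sketch could look like the code below. This is only an illustration, not my exact driver: the SnapshotScanDriver class name, job name, Scan settings, and output-path argument are assumptions, while the snapshot name, restore directory, and the initTableSnapshotMapperJob call are the ones shown above (DefaultMapper is the TableMapper referenced there).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SnapshotScanDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "FundamentalAnalyticSnapshotScan"); // placeholder job name
        job.setJarByClass(SnapshotScanDriver.class);

        Scan scan = new Scan();
        scan.setCacheBlocks(false); // the block cache is of no use for a one-off scan over snapshot files

        String snapshotName = "FundamentalAnalyticSnapshot";
        Path restoreDir = new Path("hdfs://quickstart.cloudera:8020/tmp");

        TableMapReduceUtil.initTableSnapshotMapperJob(
                snapshotName,        // snapshot name
                scan,                // Scan instance to control CF and attribute selection
                DefaultMapper.class, // mapper class
                NullWritable.class,  // mapper output key
                Text.class,          // mapper output value
                job,
                true,                // add HBase dependency jars to the job
                restoreDir);         // directory where the snapshot is restored for reading

        job.setNumReduceTasks(0);
        FileOutputFormat.setOutputPath(job, new Path(args[0])); // output path passed on the command line
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}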
06-01-2017
03:54 PM
I did not get your point about the snapshot needing to exist in the HBase installation. Do I have to move the snapshot somewhere? When I take a snapshot, isn't it automatically available in the HBase directory? Also, I changed the restorePath to hdfs://quickstart.cloudera:8020/hbase and got:

java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: hdfs://quickstart.cloudera:8020bd03e5d6-bb0a-46ac-b900-65ae0fe0a439
06-01-2017
09:28 AM
In order to avoid a full table scan on an HBase table, I thought of running MapReduce on an HBase table snapshot. I created a snapshot of my table using the command below:

snapshot 'FundamentalAnalytic','FundamentalAnalyticSnapshot'

After that, to run MapReduce I have to transfer it to my local HDFS, so I ran the export command as follows to copy it to the /tmp dir:

hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot FundamentalAnalyticSnapshot -copy-to /tmp/ -mappers 16

It was copied successfully. Then I ran a MapReduce job whose driver code looks like this:

String snapshotName="FundamentalAnalyticSnapshot";
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan, DefaultMapper.class, NullWritable.class, Text.class, job, true, new Path("/tmp"));

But it throws this error:

org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException: Couldn't read snapshot info from:file:/tmp/hbase-cloudera/hbase/.hbase-snapshot/FundamentalAnalyticSnapshot/.snapshotinfo
at org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils.readSnapshotInfo(SnapshotDescriptionUtils.java:294)
at org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper.copySnapshotForScanner(RestoreSnapshotHelper.java:818)
at org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.setInput(TableSnapshotInputFormatImpl.java:355)
at org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.setInput(TableSnapshotInputFormat.java:204)
at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableSnapshotMapperJob(TableMapReduceUtil.java:335)
at com.thomsonretuers.hbase.HBaseToFileDriver.run(HBaseToFileDriver.java:128)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.thomsonretuers.hbase.HBaseToFileDriver.main(HBaseToFileDriver.java:75)
Caused by: java.io.FileNotFoundException: File file:/tmp/hbase-cloudera/hbase/.hbase-snapshot/FundamentalAnalyticSnapshot/.snapshotinfo does not exist

I know I am making some mistake by not exporting the snapshot to the correct dir. Please help me. Thanks, Sudarshan
05-24-2017
04:30 AM
1 Kudo
Hi, I have developed an application where I initially have to store about 5 TB of data, and then apply roughly 20 GB of monthly incremental insert/update/delete operations, delivered as XML, on top of that 5 TB.
Finally, on request, I have to generate a full snapshot of all the data and create 5K text files based on the logic, so that the respective data ends up in the respective files.
I have done this project using HBase. I created 35 tables in HBase, each with between 10 and 500 regions.
I have my data in HDFS, and using MapReduce I bulk load it into the respective HBase tables.
After that I have a SAX parser application written in Java to parse all incoming incremental XML files and update the HBase tables. The frequency of the XML files is approximately 10 files per minute, with a total of about 2000 updates.
The incremental messages are strictly in order.
Finally, on request, I run my last MapReduce application to scan all the HBase tables, create the 5K text files, and deliver them to the client. All three steps work fine, but when I went to deploy the application on the production server, which is a shared cluster, the infrastructure team would not allow us to run it because I do a full table scan on HBase.
I use a 94-node cluster, and the biggest HBase table I have holds approximately 2 billion records. All the other tables hold less than a million records each.
The total time for the MapReduce job to scan the tables and create the text files is 2 hours.
Now I am looking for some other solution to implement this.
I cannot use plain Hive, because I have record-level inserts/updates/deletes, and those have to be applied very precisely.
I have also integrated the HBase and Hive tables, so that the HBase table is used for incremental data and Hive is used for the full table scan. But because Hive uses the HBase storage handler, I cannot create partitions on the Hive table, and that is why the Hive full table scan becomes very slow, even 10 times slower than the HBase full table scan.
I cannot think of any solution right now and am kind of stuck. Please help me with some other solution where HBase is not involved.
Can I use Avro or Parquet files in this use case? I am not sure how Avro would support record-level updates.
04-19-2017
11:52 AM
Hi, I have an application that reads records from HBase and writes them into text files. The HBase table has 200 regions.
I am using MultipleOutputs in the mapper class to write into multiple files, and I build the file names from the incoming records. I produce 40 unique file names. I get the records properly, but my problem is that when the MapReduce job finishes, it creates the 40 files plus about 2K extra files with the proper names but suffixed with m-000 and so on. This is because I have 200 regions and MultipleOutputs creates a file per mapper, so with 200 mappers and 40 unique files per mapper it ends up creating 40*200 files.
I don't know how to avoid this situation without a custom partitioner.
Is there any way to force records to be written only into the files they belong to, rather than being split across multiple files? I have used a custom partitioner class and it works fine, but I don't want to use it, since I am only reading from HBase and not doing any reducer work. Also, if I ever have to add an extra file name, I would have to change my code as well.
Here is my mapper code:

public class DefaultMapper extends TableMapper<NullWritable, Text> {
private Text text = new Text();
MultipleOutputs<NullWritable, Text> multipleOutputs;
String strName = "";
@Override()
public void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {
multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException, InterruptedException {
        String FILE_NAME = new String(value.getValue(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
                Bytes.toBytes(HbaseBulkLoadMapperConstants.FILE_NAME)));
        multipleOutputs.write(NullWritable.get(), new Text(text.toString()), FILE_NAME);
    }
}

There is no reducer class. This is how my output looks; ideally only one Japan.BUS.gz file should be created. The other files are also very small:
Japan.BUS-m-00193.gz
Japan.BUS-m-00194.gz
Japan.BUS-m-00195.gz
Japan.BUS-m-00196.gz
04-13-2017
10:36 AM
I have replaced multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
with
context.write()
and I got the correct output.
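Concretely, the change inside the combiner's reduce method is roughly the following; this is only a sketch, with the per-record formatting from my combiner simplified to a plain append.

@Override
public void reduce(NullWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text value : values) {
        StringBuilder sb = new StringBuilder();
        sb.append(value.toString());   // the real code builds sb from the split record fields
        // previously: multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
        // writing to the context sends the data on to the reducers instead of creating
        // a separate MultipleOutputs file per map task
        context.write(NullWritable.get(), new Text(sb.toString()));
    }
}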
04-13-2017
04:57 AM
Hi, I am running an application which reads records from HBase and writes them into text files. I use a combiner in my application and a custom partitioner as well. I use 41 reducers because I need to create 40 reducer output files that satisfy the conditions in my custom partitioner class. Everything works fine, but when I use the combiner it creates a map output file per region. For example, if I have 40 regions it creates 40*41 map output files. The data in the files is correct, but the number of files increases. Any idea how I can get only 40 files?

// Reducer Class
job.setCombinerClass(CommonReducer.class);
job.setReducerClass(CommonReducer.class); // reducer class
Below are my job details:

Submitted: Mon Apr 10 09:42:55 CDT 2017
Started: Mon Apr 10 09:43:03 CDT 2017
Finished: Mon Apr 10 10:11:20 CDT 2017
Elapsed: 28mins, 17sec
Diagnostics:
Average Map Time 6mins, 13sec
Average Shuffle Time 17mins, 56sec
Average Merge Time 0sec
Average Reduce Time 0sec

Here is the Reducer/Combiner code:

import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class CommonCombiner extends Reducer<NullWritable, Text, NullWritable, Text> {
private Logger logger = Logger.getLogger(CommonCombiner.class);
private MultipleOutputs<NullWritable, Text> multipleOutputs;
String strName = "";
private static final String DATA_SEPERATOR = "\\|\\!\\|";
public void setup(Context context) {
logger.info("Inside Combiner.");
multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}
@Override
public void reduce(NullWritable Key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
final String valueStr = value.toString();
StringBuilder sb = new StringBuilder();
if ("".equals(strName) && strName.length() == 0) {
String[] strArrFileName = valueStr.split(DATA_SEPERATOR);
String strFullFileName[] = strArrFileName[1].split("\\|\\^\\|");
strName = strFullFileName[strFullFileName.length - 1];
String strArrvalueStr[] = valueStr.split(DATA_SEPERATOR);
if (!strArrvalueStr[0].contains(HbaseBulkLoadMapperConstants.FF_ACTION)) {
sb.append(strArrvalueStr[0] + "|!|");
}
multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
context.getCounter(Counters.FILE_DATA_COUNTER).increment(1);
continue;
}
String strArrvalueStr[] = valueStr.split(DATA_SEPERATOR);
}
}
public void cleanup(Context context) throws IOException, InterruptedException {
multipleOutputs.close();
}
}
03-31-2017
05:52 AM
The customer needs the data in the proper file, even if one file only has 10 KB of data.
03-31-2017
05:51 AM
No, I cannot go for Pig now; my full application is developed on MapReduce.
03-09-2017
03:19 AM
Hi, I am getting the same error too. I did kinit, but the exception still persists.
03-04-2017
03:48 AM
My MapReduce job has to read records from HBase and write them into zip files. The client has specifically asked that the reducer output files should be .zip files only.
For this I have written a ZipFileOutputFormat wrapper to compress the records and write them into zip files.
All seems OK, but there is one problem:
1. A zip file is created for each key.
Inside my output directory I can see many output files, one separate file per row key.
I don't know how to combine them inside a single zip file.
Here is my implementation in ZipFileOutputFormat.java:
@Override
public void write(K key, V value) throws IOException {
    String fname = null;
    if (key instanceof BytesWritable) {
        BytesWritable bk = (BytesWritable) key;
        fname = new String(bk.getBytes(), 0, bk.getLength());
    } else {
        fname = key.toString();
    }
    ZipEntry ze = new ZipEntry(fname);
    zipOut.closeEntry();
    zipOut.putNextEntry(ze);
    if (value instanceof BytesWritable) {
        zipOut.write(((BytesWritable) value).getBytes(), 0, ((BytesWritable) value).getLength());
    } else {
        zipOut.write(value.toString().getBytes());
    }
}
03-02-2017
06:49 AM
I was able to do it explicitly after my job finishes, and that's OK for me; there is no delay in the job.
if (b) {
    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HHmm");
    Calendar cal = Calendar.getInstance();
    String strDate = dateFormat.format(cal.getTime());
    FileSystem hdfs = FileSystem.get(getConf());
    FileStatus fs[] = hdfs.listStatus(new Path(args[1]));
    if (fs != null) {
        for (FileStatus aFile : fs) {
            if (!aFile.isDir()) {
                hdfs.rename(aFile.getPath(), new Path(aFile.getPath().toString() + ".txt"));
            }
        }
    }
}
03-02-2017
05:15 AM
Yes, but even this way the r-00000 part is not removed.
03-02-2017
05:04 AM
I am able to rename my reducer output file correctly, but the r-00000 suffix still persists.
I have used MultipleOutputs in my reducer class.
Here are the details. I am not sure what I am missing or what extra I have to do.
public class MyReducer extends Reducer<NullWritable, Text, NullWritable, Text> {

    private Logger logger = Logger.getLogger(MyReducer.class);
    private MultipleOutputs<NullWritable, Text> multipleOutputs;
    String strName = "";

    public void setup(Context context) {
        logger.info("Inside Reducer.");
        multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    public void reduce(NullWritable Key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            final String valueStr = value.toString();
            StringBuilder sb = new StringBuilder();
            sb.append(strArrvalueStr[0] + "|!|");
            multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
        }
    }

    public void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}
02-28-2017
05:09 AM
I have an application where I read from HBase and write records into files. The final output should be in .zip compressed format, not a Hadoop-supported compression format. For this I have used a custom ZipFileOutputFormat to get the records into .zip files. Here is how I wire it up:

ZipFileOutputFormat.setOutputPath(job, new Path(args[1]));

These are the details of the ZipFileOutputFormat class:

public class ZipFileOutputFormat extends FileOutputFormat<NullWritable, Text> {
@Override
public RecordWriter<NullWritable, Text> getRecordWriter(
TaskAttemptContext job) throws IOException, InterruptedException {
Path file = getDefaultWorkFile(job, ".zip");
FileSystem fs = file.getFileSystem(job.getConfiguration());
return new ZipRecordWriter(fs.create(file, false));
}
public static class ZipRecordWriter extends
RecordWriter<NullWritable, Text> {
protected ZipOutputStream zos;
public ZipRecordWriter(FSDataOutputStream os) {
zos = new ZipOutputStream(os);
}
@Override
public void write(NullWritable key, Text value) throws IOException,
InterruptedException {
// TODO: create new ZipEntry & add to the ZipOutputStream (zos)
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
zos.close();
}
}
}
I am not getting any error, but my output is still in r-000001 format. Am I missing any configuration here?
02-25-2017
03:18 AM
Finally I managed to resolve it. I just used

multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);

inside the for loop, and that solved my problem. I have tested it with a very huge data set (a 19 GB file) and it worked fine for me. This is my final solution. Initially I thought it might create many objects, but it works fine for me, and the MapReduce job also completes very fast.
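For anyone hitting the same heap issue, the shape of the fixed reduce method is roughly this; it is only a sketch, with the value-trimming logic from my reducer elided and the output file name hard-coded as in my example.

@Override
public void reduce(NullWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text value : values) {
        // build and write one record per incoming value instead of accumulating the
        // whole group in a single StringBuilder, which is what blew up the heap
        StringBuilder sb = new StringBuilder();
        sb.append(value.toString());   // the real code trims the value based on its type here
        multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), "MyFileName");
    }
}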
02-13-2017
05:24 AM
@Artem Ervits I have edited my code and posted it. Can you please have a look?
02-12-2017
06:01 PM
How can I change that? If I define the StringBuilder in the setup() method, will it be accessible in the reduce() method? Do I have to pass it as a parameter?
02-12-2017
12:38 PM
I am getting a Java heap space error in my reducer phase. I use 41 reducers in my application and also a custom partitioner class. Below is my reducer code, which throws the error shown further down.

public class MyReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
private Logger logger = Logger.getLogger(MyReducer.class);
StringBuilder sb = new StringBuilder();
private MultipleOutputs<NullWritable, Text> multipleOutputs;
public void setup(Context context) {
logger.info("Inside Reducer.");
multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}
@Override
public void reduce(NullWritable Key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
final String valueStr = value.toString();
if (valueStr.contains("Japan")) {
sb.append(valueStr.substring(0, valueStr.length() - 20));
} else if (valueStr.contains("SelfSourcedPrivate")) {
sb.append(valueStr.substring(0, valueStr.length() - 29));
} else if (valueStr.contains("SelfSourcedPublic")) {
sb.append(value.toString().substring(0, valueStr.length() - 29));
} else if (valueStr.contains("ThirdPartyPrivate")) {
sb.append(valueStr.substring(0, valueStr.length() - 25));
}
}
multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), "MyFileName");
}
public void cleanup(Context context) throws IOException, InterruptedException {
multipleOutputs.close();
}
}
17/02/12 05:26:45 INFO mapreduce.Job: map 98% reduce 0%
17/02/12 05:28:02 INFO mapreduce.Job: map 100% reduce 0%
17/02/12 05:28:09 INFO mapreduce.Job: map 100% reduce 17%
17/02/12 05:28:10 INFO mapreduce.Job: map 100% reduce 39%
17/02/12 05:28:11 INFO mapreduce.Job: map 100% reduce 46%
17/02/12 05:28:12 INFO mapreduce.Job: map 100% reduce 51%
17/02/12 05:28:13 INFO mapreduce.Job: map 100% reduce 54%
17/02/12 05:28:14 INFO mapreduce.Job: map 100% reduce 56%
17/02/12 05:28:15 INFO mapreduce.Job: map 100% reduce 88%
17/02/12 06:21:45 INFO mapreduce.Job: map 78% reduce 0%
17/02/12 06:21:46 INFO mapreduce.Job: map 82% reduce 0%
17/02/12 06:21:47 INFO mapreduce.Job: map 85% reduce 0%
17/02/12 06:21:48 INFO mapreduce.Job: map 87% reduce 0%
17/02/12 06:21:49 INFO mapreduce.Job: map 88% reduce 0%
17/02/12 06:21:50 INFO mapreduce.Job: map 93% reduce 0%
17/02/12 06:21:51 INFO mapreduce.Job: map 94% reduce 0%
17/02/12 06:21:53 INFO mapreduce.Job: map 95% reduce 0%
17/02/12 06:21:58 INFO mapreduce.Job: map 96% reduce 0%
17/02/12 06:21:59 INFO mapreduce.Job: map 97% reduce 0%
17/02/12 06:22:02 INFO mapreduce.Job: map 98% reduce 0%
17/02/12 06:23:46 INFO mapreduce.Job: map 99% reduce 0%
17/02/12 06:23:50 INFO mapreduce.Job: map 100% reduce 0%
17/02/12 06:23:54 INFO mapreduce.Job: map 100% reduce 12%
17/02/12 06:23:55 INFO mapreduce.Job: map 100% reduce 32%
17/02/12 06:23:56 INFO mapreduce.Job: map 100% reduce 46%
17/02/12 06:23:57 INFO mapreduce.Job: map 100% reduce 51%
17/02/12 06:23:58 INFO mapreduce.Job: map 100% reduce 54%
17/02/12 06:23:59 INFO mapreduce.Job: map 100% reduce 59%
17/02/12 06:24:00 INFO mapreduce.Job: map 100% reduce 88%
17/02/12 06:24:01 INFO mapreduce.Job: map 100% reduce 90%
17/02/12 06:24:03 INFO mapreduce.Job: map 100% reduce 93%
17/02/12 06:24:06 INFO mapreduce.Job: map 100% reduce 95%
17/02/12 06:24:06 INFO mapreduce.Job: Task Id : attempt_1486663266028_2715_r_000020_0, Status : FAILED
Error: Java heap space
17/02/12 06:24:07 INFO mapreduce.Job: map 100% reduce 93%
17/02/12 06:24:10 INFO mapreduce.Job: Task Id : attempt_1486663266028_2715_r_000021_0, Status : FAILED
Error: Java heap space
17/02/12 06:24:11 INFO mapreduce.Job: map 100% reduce 91%
17/02/12 06:24:11 INFO mapreduce.Job: Task Id : attempt_1486663266028_2715_r_000027_0, Status : FAILED
Error: Java heap space
17/02/12 06:24:12 INFO mapreduce.Job: map 100% reduce 90%
02-12-2017
07:12 AM
I have changed the code. Now I am passing both the key and the value as Text, but I am still getting the same error.
02-12-2017
06:08 AM
I have a requirement where I only need the value from HBase, not the row key, written to the output file.
For that I have used NullWritable.class as my map-output key type.
Now I have to partition my output data based on the column values. But as we know, a custom partitioner works on the key, and because of that I am getting the exception below. This is where the exception is raised:

if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
}

Caused by: java.io.IOException: Illegal partition for (null) (40)
Here is the driver code that I am using:

TableMapReduceUtil.initTableMapperJob(args[0], // input table
scan, // Scan instance to control CF and attribute selection
DefaultMapper.class, // mapper class
NullWritable.class, // mapper output key
Text.class, // mapper output value
job);
This is my partitioner code:

public class FinancialLineItemPartioner extends Partitioner<NullWritable, Text> {
public int getPartition(NullWritable key, Text value, int setNumRedTask) {
String str = key.toString();
if (str.contains("Japan|^|BUS")) {
return 0;
} else if (str.contains("Japan|^|CAS")) {
return 1;
} else if (str.contains("Japan|^|CUS")) {
return 2;
}else {
return 3;
}
}
}

Please suggest. Note: if I interchange the map-output key/value parameters, my reducer does not work.
12-21-2016
10:50 AM
2 Kudos
Hi, I have implemented a custom partitioner based on my logic, and I am able to get the files properly. But because of the conditions, some of the reducers receive very large amounts of data, and that delays the reduce phase. So is there any way to create many small files inside one reducer output file? Here is my custom partitioner:

public class MyPartioner extends Partitioner<Text, IntWritable> {
public int getPartition(Text key, IntWritable value, int setNumRedTask) {
String str = key.toString();
if (str.contains("Japan|2014")) {
return 0;
} else if (str.contains("Japan|2013")) {
return 1;
} else if (str.contains("Japan|2012")) {
return 2;
} else if (str.contains("Japan|2011")) {
return 3;
} else
return 4;
}
}

The first condition has a very large amount of data, around 20 GB, but the last one has only 12 MB.
12-01-2016
05:08 AM
Phoenix is not installed on my cluster, as it is not bundled in the CDH 5 distribution. Any other idea?
11-30-2016
07:43 AM
Hi, I have a Hive table on HBase that has 200 GB of records.
I am running a simple Hive query to fetch 20 GB of records.
But this takes around 4 hours.
I cannot create partitions on the Hive table because it is integrated with HBase.
Please suggest any ideas to improve performance.
This is my Hive query:
INSERT OVERWRITE LOCAL DIRECTORY '/hadoop/user/m6034690/FSDI/FundamentalAnalytic/FundamentalAnalytic_2014.txt'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
select * from hbase_table_FundamentalAnalytic where FilePartition='ThirdPartyPrivate' and FilePartitionDate='2014';
11-29-2016
09:17 AM
I have 95 live nodes in my cluster, and I am passing REGIONS_COUNT as 90.