Member since
01-16-2018
25
Posts
2
Kudos Received
0
Solutions
04-01-2019
07:04 AM
I have created a MapReduce program which reads some files from the local filesystem and stores them in HDFS after processing. When I try to run it on an HDP cluster, it reads a null value once all files in the folder have been processed and throws a NullPointerException. This problem occurs only on the AWS HDP cluster; in our local dev environment, where all the components are installed separately, the same program runs perfectly. We are using HDP 2.6.2, and all the components in the dev environment are also at HDP 2.6.2 versions.
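For reference, a minimal sketch of the kind of defensive check I mean, assuming the job lists the local input directory with java.io.File before submitting files (the class name and directory path below are made up): listFiles() returns null, not an empty array, when the path is missing or unreadable, which is one common source of exactly this NullPointerException on a new cluster.

import java.io.File;

public class LocalInputScan {
    public static void main(String[] args) {
        // Hypothetical local staging directory; replace with the real input path.
        File inputDir = new File("/data/staging");

        // listFiles() returns null (not an empty array) if the path does not exist,
        // is not a directory, or cannot be read on this node.
        File[] files = inputDir.listFiles();
        if (files == null) {
            throw new IllegalStateException(
                    "Cannot list " + inputDir + " - check that the path exists on this cluster node");
        }
        if (files.length == 0) {
            System.out.println("No files left to process in " + inputDir);
            return; // nothing to do; avoid dereferencing a missing entry
        }
        for (File f : files) {
            System.out.println("Would process: " + f.getName());
        }
    }
}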
... View more
Labels:
03-25-2019
05:43 AM
I am writing a MapReduce program to process DICOM images. The purpose of the program is to process each DICOM image, extract its metadata, index it to Solr, and finally, in the reducer phase, save the raw image to HDFS. I want to save the same file, unchanged, in HDFS as the reducer output. I have achieved most of the functionality, but storing the file in HDFS in the reducer phase does not work: a DICOM image viewer reports the processed file as corrupted, and the processed file is also slightly larger than the original. For example, the original DICOM file is 628 KB, and after the reducer saves it to HDFS its size changes to 630 KB. I have tried the solutions from these links, but none of them gives the expected result.

Here is the code for reading a DICOM file as a single record (without splitting it):

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
Custom RecordReader:

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            System.out.println("Inside nextKeyvalue");
            System.out.println(fileSplit.getLength());
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }
}
Mapper class. The mapper works exactly as we need.

public class MapClass {

    public static class Map extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

        @Override
        protected void map(NullWritable key, BytesWritable value,
                Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
                throws IOException, InterruptedException {
            value.setCapacity(value.getLength());
            InputStream in = new ByteArrayInputStream(value.getBytes());
            ProcessDicom.metadata(in); // process the DICOM image and extract metadata from it
            Text keyOut = getFileName(context);
            context.write(keyOut, value);
        }

        private Text getFileName(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context) {
            InputSplit spl = context.getInputSplit();
            Path filePath = ((FileSplit) spl).getPath();
            String fileName = filePath.getName();
            Text text = new Text(fileName);
            return text;
        }

        @Override
        protected void setup(Mapper<NullWritable, BytesWritable, Text, BytesWritable>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
        }
    }
}
Reducer class:

public class ReduceClass {

    public static class Reduce extends Reducer<Text, BytesWritable, BytesWritable, BytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<BytesWritable> value,
                Reducer<Text, BytesWritable, BytesWritable, BytesWritable>.Context context)
                throws IOException, InterruptedException {
            Iterator<BytesWritable> itr = value.iterator();
            while (itr.hasNext()) {
                BytesWritable wr = itr.next();
                wr.setCapacity(wr.getLength());
                // write the value already taken from the iterator; calling itr.next()
                // a second time here would skip every other record
                context.write(new BytesWritable(key.copyBytes()), wr);
            }
        }
    }
}
Main class:

public class DicomIndexer {

    public static void main(String[] argss) throws Exception {
        String args[] = {"file:///home/b3ds/storage/dd", "hdfs://192.168.38.68:8020/output"};
        run(args);
    }

    public static void run(String[] args) throws Exception {
        // Initialize the Hadoop job and set the jar as well as the name of the job
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(DicomIndexer.class);
        // job.getConfiguration().set("mapreduce.output.basename", "hi");
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(Map.class);
        // job.setCombinerClass(Reduce.class); // disabled: Reduce emits (BytesWritable, BytesWritable),
        // which does not match the map output types (Text, BytesWritable) a combiner must produce
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
So I am completely clueless about what to do. Some of the links say it is not possible because MapReduce works on <key, value> pairs, and some say to use NullWritable. So far I have tried NullWritable and SequenceFileOutputFormat, but neither works.
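One alternative I am considering (a rough sketch, not my current code): have the reducer write the bytes straight to HDFS with the FileSystem API instead of going through SequenceFileOutputFormat, which wraps the raw bytes in its own header, sync markers and per-record framing. Only value.getLength() bytes are written, and the map-side key is reused as the file name; the /dicom/raw output directory is a placeholder.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RawCopyReducer extends Reducer<Text, BytesWritable, NullWritable, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        // Hypothetical target directory; the map-side key carries the original file name.
        Path out = new Path("/dicom/raw/" + key.toString());
        for (BytesWritable value : values) {
            try (FSDataOutputStream os = fs.create(out, true)) {
                // Write only the valid bytes, not the writable's whole backing buffer.
                os.write(value.getBytes(), 0, value.getLength());
            }
        }
        // Nothing is emitted to the job's output format; the job could use
        // NullOutputFormat, or the (empty) reducer output directory can be ignored.
    }
}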
... View more
Labels:
09-29-2018
05:31 AM
Is there any way to do the same task using ZooKeeper?
... View more
09-28-2018
12:24 PM
We are working on an application that uses Spark, HDFS, and Kafka.
We want to deploy this application on an existing HDP cluster, so what would be the best approach to deploy it on HDP in the least amount of time? What I want to do is create a script that coordinates with Ambari and finds out which components are already installed on the existing HDP cluster. For example, if an HDP cluster does not contain Spark, the script should automatically download Spark (from the Hortonworks repo) and configure it on that cluster; otherwise it should simply load all the tasks/jobs.
Can I use ZooKeeper to detect which services are installed and to detect their state (running/stopped/maintenance)?
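A rough sketch of what I mean by coordinating with Ambari, using its REST API rather than ZooKeeper (the host, cluster name, credentials, and service names below are placeholders): GET /api/v1/clusters/{cluster}/services/{SERVICE} should return 404 when the service is not installed, otherwise a JSON body whose ServiceInfo/state field reports STARTED, INSTALLED, and so on.

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Scanner;

public class AmbariServiceCheck {
    public static void main(String[] args) throws IOException {
        // Placeholder Ambari host, cluster name, and credentials.
        String ambari = "http://ambari-host:8080";
        String cluster = "mycluster";
        String auth = Base64.getEncoder()
                .encodeToString("admin:admin".getBytes(StandardCharsets.UTF_8));

        for (String service : new String[] {"SPARK2", "HDFS", "KAFKA"}) {
            URL url = new URL(ambari + "/api/v1/clusters/" + cluster + "/services/" + service);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestProperty("Authorization", "Basic " + auth);

            int status = conn.getResponseCode();
            if (status == HttpURLConnection.HTTP_NOT_FOUND) {
                System.out.println(service + " is NOT installed on this cluster");
            } else if (status == HttpURLConnection.HTTP_OK) {
                try (InputStream in = conn.getInputStream();
                     Scanner sc = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
                    // The JSON body contains ServiceInfo/state (e.g. STARTED, INSTALLED).
                    System.out.println(service + " is installed; response: " + (sc.hasNext() ? sc.next() : ""));
                }
            } else {
                System.out.println(service + " check returned HTTP " + status);
            }
            conn.disconnect();
        }
    }
}

The same check could equally be scripted with curl; the point is only that Ambari, not ZooKeeper, is the authoritative source for which services are installed and in what state.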
... View more
Labels:
09-13-2018
05:08 AM
I want to register a Java function as a UDF in Spark. I am using Java to build the Spark application.

The Java class that contains the function:

public class Decryption {
    public String decrypt(final String encryptedText) {
        // placeholder: the actual decryption logic is omitted in this post
        final String decryptedText = encryptedText;
        return new String(decryptedText);
    }
}

Spark code — I am trying to do something like this:

spark.udf().register("decrypt", decryption.decrypt(encryptedText))
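For comparison, a sketch of the registration I believe the Spark 2.x Java API expects, wrapping the method in a UDF1 with an explicit return type instead of passing the result of calling decrypt(); the table and column names in the usage query are made up.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class RegisterDecryptUdf {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("decrypt-udf").getOrCreate();

        // Wrap the plain Java method in a UDF1; the lambda captures nothing,
        // so it serializes cleanly to the executors.
        spark.udf().register("decrypt",
                (UDF1<String, String>) encrypted -> new Decryption().decrypt(encrypted),
                DataTypes.StringType);

        // Hypothetical usage: "events" and "encrypted_col" are placeholder names.
        Dataset<Row> df = spark.sql("SELECT decrypt(encrypted_col) AS clear_text FROM events");
        df.show();
    }
}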
... View more
Labels:
06-19-2018
08:56 AM
I'd seen that already, but it's just for reading, not for changing. I need to read the variable and then manipulate its value.
... View more
06-19-2018
05:10 AM
I want to access a process group variable from ExecuteScript and then change its value. I am using Python. I have read the article that describes how to access flowfile attributes, but not variables. My requirement is that when a job completes successfully, a value should be stored in a variable.
... View more
Labels:
06-05-2018
05:07 AM
I am trying to read a zip file in the NiFi ExecuteScript processor, and I am using Python as the scripting language. When I run the script it throws "no viable alternative at input" at line 25 (flowFile = session.get()). What is the real cause behind this? Here is my script:

from zipfile import ZipFile
from org.apache.nifi.processor.io import InputStreamCallback
import java.io
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ReadVersion(InputStreamCallback):
    def __init__(self):
        self.flowFile = None
        self.version = ''
        self.error = ''

    def process(self, inputStream):
        try:
            zipname = self.flowFile.getAttribute('filename')
            zippath = self.flowFile.getAttribute('absolute.path')
            with ZipFile(zippath + zipname, 'r') as zip:
                pageview = json.loads(zip.read('pageview.json').decode("utf-8"))
                pam = json.loads(zip.read('pam.json').decode("utf-8"))
                # version extraction from the parsed JSON is omitted here
        except Exception:
            self.error = 'error'

flowFile = session.get()
if flowFile is not None:
    callback = ReadVersion()
    callback.flowFile = flowFile
    session.read(flowFile, callback)
    if callback.version != '':
        flowFile = session.putAttribute(flowFile, 'MSVersion', callback.version)
        session.transfer(flowFile, REL_SUCCESS)
    if callback.error == 'error':
        session.transfer(flowFile, REL_FAILURE)
... View more
Labels:
06-04-2018
07:23 AM
I am trying to create some tables in Hive from Apache NiFi, but I didn't find an exact processor for that. However, I found a processor named PutHiveQL that can be used for DDL/DML operations, but I didn't find any property in which I can write the query. If it is the right processor for this purpose, how can it be used in my case?
... View more
Labels:
05-28-2018
12:14 PM
I want to identify two flowfiles coming from two different processors inside an ExecuteScript processor. Suppose one processor is a GetFile processor reading a file and the second is a GetMongo processor; now I want to do some operation on both flowfiles in the same ExecuteScript processor. Let's say I want to compare the elements in both flowfiles.
... View more
Labels:
05-11-2018
10:19 AM
I am trying to create an external table in Hive backed by the Druid storage handler. When I try to fetch data from Hive, it throws the exception below.

Failed with exception java.io.IOException:java.io.IOException: org.apache.hive.druid.com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: org.apache.hive.druid.com.metamx.http.client.io.AppendableByteArrayInputStream@4b916cc2; line: 1, column: 2]
at org.apache.hive.druid.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
at org.apache.hive.druid.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
at org.apache.hive.druid.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462)
at org.apache.hive.druid.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2613)
at org.apache.hive.druid.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:841)
at org.apache.hive.druid.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:737)
at org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3776)
at org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3721)
at org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2803)
at org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat.distributeSelectQuery(DruidQueryBasedInputFormat.java:227)
at org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat.getInputSplits(DruidQueryBasedInputFormat.java:160)
at org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat.getSplits(DruidQueryBasedInputFormat.java:104)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextSplits(FetchOperator.java:372)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:304)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:459)
at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:428)
at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:147)
at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:2208)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:253)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:821)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:759)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:686)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
The datasource is present in HDFS, and I can query it using Superset. I created the table in Hive with the statement below:

create external table sample stored by 'org.apache.hadoop.hive.druid.DruidStorageHandler' TBLPROPERTIES ("druid.datasource" = "demo");
... View more
Labels:
04-09-2018
06:30 AM
I have a query in SQL Server that I want to rewrite so that it is Hive compatible. This is the query in SQL Server (T-SQL) form:

SELECT session_id,
       Substring((SELECT (';' + tag_name)
                  FROM session_tag st2
                  WHERE st2.session_id = st.session_id
                  FOR xml path ('')), 2, 1000) AS tags
FROM session_tag st
GROUP BY session_id;

So I tried to simplify the query to get the same result, but the query below returns nothing:

SELECT SUBSTRING(';' + tag_name, 2, 1000) AS tag
FROM session_tag st1
WHERE st1.session_id = (SELECT st2.session_id FROM session_tag st2 WHERE st1.session_id = st2.session_id FOR xml path (''))
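For reference, a sketch of what I believe the Hive-compatible equivalent would look like, assuming the goal is a semicolon-separated list of tags per session: Hive has no FOR XML PATH, but concat_ws over collect_list performs the same string aggregation (collect_set would additionally drop duplicate tags).

SELECT session_id,
       concat_ws(';', collect_list(tag_name)) AS tags
FROM session_tag
GROUP BY session_id;

Note that the ordering of tags inside the concatenated string is not guaranteed.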
... View more
Labels:
04-06-2018
11:54 AM
I am trying to split an array of records using the SplitJson processor, but it fails to split them. I am unable to find the correct JsonPath expression for my JSON. Here is my JSON:

[
{"id":"30fb76fa-acbe-463b-830e-66f203bb0911","session_id":"804e8d5b-c266-92b7-4a1d-eed3650d3b4a","tag_name":"call"},
{"id":"23986d19-c91f-4d98-8cfd-08fb26c5ff85","session_id":"804e8d5b-c266-92b7-4a1d-eed3650d3b4a","tag_name":"direct-call"},
{"id":"7c374ae9-b96a-4383-85ce-6d45cbc5f8a4","session_id":"804e8d5b-c266-92b7-4a1d-eed3650d3b4a","tag_name":"homepage"},
{"id":"599bf3e0-2c76-4d38-8349-91cc04e34c33","session_id":"b8f17ef9-d7df-dec0-71e3-3ed28991d396","tag_name":"bounce"},
{"id":"55791f8a-3243-48b3-bb4a-70404a21148d","session_id":"b8f17ef9-d7df-dec0-71e3-3ed28991d396","tag_name":"homepage"}
]
I want to split each record into a separate flowfile, meaning there should be five flowfiles. What is the correct JsonPath expression?
... View more
Labels:
04-05-2018
11:30 AM
I am building a job in which I have to validate phone numbers, and we want to use the 'google-libphonenumber' npm package. I am using JavaScript in the ExecuteScript processor. What is the correct way to include the npm package?
... View more
Labels:
04-02-2018
03:04 PM
I am creating a job in Apache NiFi that loads data from MongoDB in JSON format and then converts it to CSV. I am using ExecuteScript to manipulate some of the JSON, so the data comes from a GetMongo processor and is then passed to ExecuteScript. I am using JavaScript to manipulate the data. Problem: when I leave the "Results Per FlowFile" property at its default, it throws an error that a certain property of the flowfile content is undefined, but if I change it to 1 or more it runs successfully. For example, with the script below it throws "cannot read property packageName from undefined". This is my JavaScript:

var flowFile = session.get();
if (flowFile != null) {
    var newFlowFile = [];
    var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
    var IOUtils = Java.type("org.apache.commons.io.IOUtils");
    var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
    var index = 0;
    flowFile = session.write(flowFile, new StreamCallback(function (inputStream, outputStream) {
        var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        // parse the incoming JSON text (JSON.parse(JSON.stringify(text)) would just return the string)
        var obj = JSON.parse(text);
        for (var i in obj) {
            newFlowFile[i] = obj[i];
            newFlowFile[i]["id"] = obj[i]._id;
            newFlowFile[i]["package_name"] = obj[i].interceptionInfos.packagename || null;
            newFlowFile[i]["package_version"] = obj[i].interceptionInfos.packageVersionName || null;
            newFlowFile[i]["sdk_version"] = obj[i].interceptionInfos.sdkVersionName || null;
            newFlowFile[i]["app_id"] = obj[i].appId || null;
            delete newFlowFile[i]._id;
            delete newFlowFile[i].interceptionInfos;
        }
        newFlowFile = JSON.parse(JSON.stringify(newFlowFile));
        outputStream.write(JSON.stringify(newFlowFile).getBytes(StandardCharsets.UTF_8));
    }));
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0] + '_translated.json');
    session.transfer(flowFile, REL_SUCCESS);
}

This is the screenshot of the GetMongo processor settings. Remember, if I set the "Results Per FlowFile" property to 1 or more, it runs successfully. Any idea why this is happening?
... View more
Labels:
04-02-2018
06:08 AM
1 Kudo
I am using Apache NiFi to convert JSON to CSV, and I want to change the headers of the generated CSV. Is there any specific processor for this? I know how to achieve it with the ExecuteScript processor, but is there an easier approach? For example, changing "_id", "name", "time" to "id", "browser_name", "duration".
... View more
- Tags:
- apache-nifi
- csv
Labels:
03-29-2018
09:19 AM
Thanks, setting the Max Bin Age property works.
... View more
03-29-2018
09:07 AM
No, I am not getting any error. It just hangs and does nothing. I mean the flowfiles reach the MergeRecord processor after the record conversion, but after that nothing happens. You can see it in the new image.
... View more
03-29-2018
08:56 AM
@Abdelkrim Hadjidj I tried. In this case it results in failure.
... View more
03-29-2018
08:12 AM
I am using Apache NiFi to retrieve data in bulk from MongoDB in JSON format and convert it to CSV, but the problem is that a separate CSV is generated for each JSON record. How can I merge all the CSVs in NiFi? I have tried the MergeRecord processor, but multiple CSVs are still generated. I am not sure whether all my MergeRecord settings are valid.
... View more
- Tags:
- apache-nifi
- csv
Labels:
03-28-2018
07:47 AM
I am developing a JSON-to-CSV converter job in NiFi, and I have to generate a UUID for each JSON element and add the generated UUID to the flowfile. I can't find any suitable processor for generating a UUID. My question is: how can I generate a UUID for each incoming flowfile?
... View more
Labels:
03-23-2018
01:29 AM
1 Kudo
@Rahul Soni Yes, Mr. Soni, thanks, it's working. Thanks for your time.
... View more
03-21-2018
10:16 AM
I am trying to fetch some data from MongoDB using NiFi, and I am using the GetMongo processor. I am trying to write a query inside GetMongo, but I don't have any idea how to write a query in this processor. This is my sample query:

db.Person.find({
    $and: [
        {"createdAt": {$ne: null}},
        {"updatedAt": {$ne: null}}
    ]
})

When I enter this query, an exclamation mark appears over the processor saying "Query validated against .... is invalid because JSON was expecting a value but found db." Help me with the correct expression; at least a good sample would be helpful.
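For reference, the shape I suspect the Query property wants, assuming GetMongo expects only the JSON filter document (the db.Person.find(...) shell wrapper is what the JSON validator rejects) and that the collection is set separately in the processor's Mongo Collection property:

{
    "$and": [
        { "createdAt": { "$ne": null } },
        { "updatedAt": { "$ne": null } }
    ]
}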
... View more
Labels:
03-20-2018
11:24 AM
I want to convert JSON coming from MongoDB to CSV, and I am using the ConvertRecord processor for this purpose. I have configured all the required controller services: I am using the AvroSchemaRegistry controller service to validate the JSON and the JsonTreeReader controller service to read it, but when I run the job it throws an error that the schema was not found. I have two questions: (1) what is the proper way to do this job, i.e. converting JSON to CSV, and (2) how do I pass the schema to the JsonTreeReader? This is the ConvertRecord processor setting. This is my JSON file demo:

{"id":1001, "name":"vivek"}
... View more
Labels: