Member since: 09-19-2017
Posts: 10
Kudos Received: 5
Solutions: 1
My Accepted Solutions
Title | Views | Posted |
---|---|---|
| 5763 | 09-27-2017 12:01 PM |
06-05-2018
04:54 PM
Abdelkrim Hadjidj, yes, you can do it if you know what you want to extract. This code helps when a user wants to load attributes from a JSON file, so that the attribute values are not hardcoded in the flow.xml. Often some values are specific to an environment (e.g., Dev, Test, Prod), and these can be separated out into a JSON file that does not change with updates to the flow.xml. With the latest version of NiFi (variable registry), this is no longer required; my intention is just to show the need for it.
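For illustration (hypothetical file name and keys), one such per-environment file might look like this, with a dev.json, test.json, and prod.json variant kept outside the flow.xml:

{
  "env": "dev",
  "hdfs_input_dir": "/data/dev/in",
  "ftp_host": "ftp.dev.example.com"
}

Each environment gets its own copy with the same keys but different values, and the flow references the resulting attributes (e.g., ${ftp_host}) instead of hardcoded strings.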
06-04-2018
05:43 PM
Load the JSON into the flowfile content, then feed it to an ExecuteScript processor with the code below. Note that this code assumes the JSON has no nested elements. Hope this helps.

import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (flowFile == null) {
    return
}

def slurper = new groovy.json.JsonSlurper()
def attrs = [:] as Map<String,String>

// Read the flowfile content and collect every top-level key/value pair
session.read(flowFile, { inputStream ->
    def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    def obj = slurper.parseText(text)
    obj.each { k, v ->
        attrs[k] = v.toString()
    }
} as InputStreamCallback)

// Replace the content with an empty string; the values now live in attributes
flowFile = session.write(flowFile, { outputStream ->
    outputStream.write(''.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)

flowFile = session.putAllAttributes(flowFile, attrs)
session.transfer(flowFile, REL_SUCCESS)
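To illustrate the effect, assume an incoming flowfile whose content is (hypothetical values): {"batch_id": "42", "target_dir": "/tmp/out"}. The script would emit a flowfile with empty content and the attributes batch_id=42 and target_dir=/tmp/out, which downstream processors can reference via expression language such as ${target_dir}.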
06-04-2018
05:37 PM
Many people have asked me this question on how to load attributes from JSON.
Labels:
- Apache NiFi
01-29-2018
04:48 AM
Step 1: Check the service status. This should use a GET request: curl -u admin:admin -H "X-Requested-By: ambari" -i -X GET http://sandbox.hortonworks.com:8080/api/v1/clusters/Sandbox/services/NIFI
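If you only need the service state rather than the full JSON payload, the Ambari REST API also supports partial responses via a fields query parameter (a minimal sketch, same host and credentials as above): curl -s -u admin:admin -H "X-Requested-By: ambari" -X GET "http://sandbox.hortonworks.com:8080/api/v1/clusters/Sandbox/services/NIFI?fields=ServiceInfo/state"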
09-27-2017
04:01 PM
One more point: use ListHDFS and FetchHDFS rather than GetHDFS. If you are using TDE, there is a bug in GetHDFS that can surface in rare scenarios.
09-27-2017
03:59 PM
1 Kudo
I would love to make this flow more complicated. > What if one PutFTP succeeds and another one fails? 🙂 Do you want one to succeed and the other to fail, or do you want to handle it like a retry?
09-27-2017
03:13 PM
Try:

                           /--- UpdateAttribute(filename) --- PutFTP
ListHDFS --- FetchHDFS ---+
                           \--- MergeContent --- PutFTP
09-27-2017
12:01 PM
2 Kudos
Here I used a validation_table attribute to carry the table name in the flowfile. Create your own logic to count the rows from Oracle and Hive, then merge the two flows using a MergeContent processor. I created one process group to count the Oracle table and another to count the Hive table; they add an oracle_cnt and a hive_cnt attribute with the result. The results are merged into a single flowfile by correlating on the correlation attribute name. Also set the attribute strategy to "Keep All Unique Attributes".
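As a rough sketch (property names from the standard MergeContent processor; validation_table is the correlation attribute described above), the merge step would be configured like this:

Merge Strategy: Bin-Packing Algorithm
Correlation Attribute Name: validation_table
Minimum Number of Entries: 2
Maximum Number of Entries: 2
Attribute Strategy: Keep All Unique Attributes

With these settings, each bin waits for exactly two flowfiles sharing the same validation_table value, so the merged flowfile ends up carrying both oracle_cnt and hive_cnt for that table.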
09-27-2017
11:42 AM
Sure, you can do that with the MergeContent processor. If you are merging only a source and a target, you can set the processor properties Minimum Number of Entries and Maximum Number of Entries to 2, and also specify a correlation attribute to drive the merge.
09-27-2017
11:33 AM
2 Kudos
@Mohammed If your application can tolerate near-real-time processing (i.e., 500 ms to 2 s of latency), then you can use Spark Streaming. If your application needs complex processing (like joining a few extracted values with another stream of data to conclude some result) and also needs true real-time processing, then you should go with Storm. If you just want to ingest the data and do some simple transformation, then you can go with NiFi. Irrespective of the above, if you want to handle reliability, congestion control, and back pressure, it is good to use Kafka, because the stream data will first be put into Kafka and then NiFi/Spark/Storm can pull from Kafka for processing. I hope I could explain each component's usage 🙂