Member since
06-08-2017
1049
Posts
517
Kudos Received
312
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
9891 | 04-15-2020 05:01 PM | |
5929 | 10-15-2019 08:12 PM | |
2413 | 10-12-2019 08:29 PM | |
9568 | 09-21-2019 10:04 AM | |
3505 | 09-19-2019 07:11 AM |
09-27-2017
03:20 PM
1 Kudo
@Foivos A, Generate flowfile processor won't accept any incoming connections, Use one GetHDFS processor and connect same success relation to Two-PutFTP processor and keep the same file name. The next success relation give to UpdateAttribute in that add another property to change file name of the flowfile. First PutFTP is storing the HDFS files, storing as is into your remote location Second PutFTP parsing the HDFS files and changing the name and storing to your remote location. UpdateAttribute:- Add new property to update attribute as below ${filename}.parsed This update attribute changes the filename to samefilename(as we are having before) with .parsed as the extension to the filename by using this method you can easily find which file has been parsed and which is not parsed. UpdataAttribute Configs:- Just Remainder GetHDFS processor by default deletes the HDFS files once it get those files, if you dont want to delete the files then change the property Keep source files to True(by default this property is set to false). it's better to use List and Fetch HDFS processors to fetch the files from hdfs as listHDFS processor stores the state in it, it will catches the changes in the directory by it self. ***FYI*** if you are thinking to use ListHDFS and FetchHDFS processors then use the below link i have explained how to configure them https://community.hortonworks.com/questions/139391/hello-all-what-are-the-differences-between-service.html?childToView=139415#answer-139415 **** Flow:- Hope this will helps ...!!!
... View more
09-27-2017
01:16 PM
1 Kudo
Hi @Xtr yarhid, In Apache NiFi, Controller Services are shared services that can be used by Processors, i.e let's take if you are thinking to get the data or store the data to Hbase Hive tables then these processors need Hive,Hbase controller services First we needs to enable there services and then use them in the processors. Coming back to your question, Use ListHDFS processor this processor will store the state and run this processor for 15 mins using cron (or) timer driven if there is no changes made to the directory (or) file it won't list the flowfile, If there is any change in the directory or file then this processor gives only the new file that got changed in the directory and updates the state of processor with new file created timestamp Configure the directory property. In this way ListHDFS processor gives an flowfile with path and filename attributes which are used by FetchHDFS processor to fetch the data from HDFS directory. This processor won't do any fetching of files it will do just listing all the available files in the directory and FetchHDFS processor will do actual fetching of files. ListHDFS Configs:- in this processor i have given /user/yashu/del_test as the directory property and this process runs for every 900 sec on Timer driven, for the first time this processor lists all the files that are in del_test directory and filename,path will attributes with the flow file(if you are having 2 files in the directory then there will be 2 flowfiles each ff will have file name and path attribute to it). if you want to see the state in ListHDFS processor right click on processor and click on view state button. FetchHDFS:- Then use FetchHDFS processor and leave that with default configs as this processor gets attributes ${path}/${filename} from ListHDFS processor. in this processor fetches actual data from HDFS as list hdfs processor only lists the files that are changed in last 15 mins. In addition, this way after ListHDFS processor you can use Site-to-site processor, S2S will distributes the work across the cluster and use FetchHDFS we can do actual fetching the data. Hope this helps..!!
... View more
09-27-2017
02:41 AM
1 Kudo
@Simran Kaur FlowScreenshot:- This flow demonstrates how to do things with both method1 and method2. in place of update attributes in the screenshot you can use PutHDFS,PutS3..etc. Keep in mind don't connect all the available relations(like success,failure,matched,unmatched,etc) to the next processor, take the above screenshot as reference just connect the required relations to next processor and for other relations that are not shown in this screenshot Auto terminate them in processor Settings.
... View more
09-27-2017
02:22 AM
2 Kudos
@Simran Kaur, you can do this in 2 methods 1. by using GenerateTableFetch Processor 2. by using QueryDatabaseTable Processor Method1:- GenerateTableFetch processor gives incremental sql statements based on partition size In my configuration i have given PartitionSize 10000 so the output of the table fetch processor will be SELECT * FROM Table_Name ORDER BY Incremental_column_name LIMIT 10000
If you are using NiFi 1.3 then the Fetch statements would be in correct syntax as i'm using 1.1 i don't have any DatabaseType property i need to select only Generic it will result the above fetch statements. i need to use replace text processor to correct these fetch statements before passing statements to ExecuteSql processor. But in new versions we are having Specific Database Type which will generates correct syntax fetch statements, you dont need to use replace text processors. Just connect Success relation of Tablefetch processor to ExecuteSQL. ExecuteSQL Processor:- Drag and drop an executesql processor and don't mention any sql statements in it, as this processor executes all the sql statements that got generated by GenerateTableFetch processor. In nifi every executesql,querydatabase processor results default format is avro. Method2:- QueryDatabaseTable Processor does the both two steps(generating table fetch and executing sql) and stores the state as GenerateTableFetch processor does and as output we will have data in Avro format(default in NiFi) Configs:- As a result from both the methods above we will have Avro format data and having not more than 10000(we mentioned in Processor) records in them. We cannot extract text from Avro format so we need to convert avro format to Json. ConvertAvroToJSON processor:- in new version of NiFi there is Convert record processor which will convert Avro format data to CSV directly but in my version i dont have this processor. This processor will converts all the avro to json data once we get json data then you can store the json data directly to HDFS... etc. If you are thinking to store the data as text format not as Json then you need to Split the json array as individual records Split Array using SplitJson Processor:- We are having array of json message so use splitjson processor and property for JsonPath Expression as $.* It will splits the json array to individual records so each flowfile will have one record data in it. Input to SplitJson:-
[{"id":1,"first_name":"Jeanette","last_name":"Penddreth","email":"jpenddreth0@census.gov","gender":"Female"},{"id":2,"first_name":"Jte","last_name":"ght","email":"ddreth0@census.gov","gender":"male"}] Output:- Flowfile1:- {"id":1,"first_name":"Jeanette","last_name":"Penddreth","email":"jpenddreth0@census.gov","gender":"Female"} flowfile2:- {"id":2,"first_name":"Jte","last_name":"ght","email":"ddreth0@census.gov","gender":"male"} Once we splitted all the json array then use EvaluateJsonPath processor configs:- use the Destination property to flowfile-attribute,then add the list required of attributes that you need to be extracted as attributes from the content. in this screenshot i am extracting all the contents as attributes to the flow file, but you can extract only the required content values as attributes to the flow file. Use ReplaceText Processor:- in this processor keep all the attributes in it to extract only the values of the attributes. we are having all the listed attributes below as flowfile attributes so i am extracting the values from flowfile. change the Replacement Strategy property to AlwaysReplace. Output from this processor:- Flowfile1 Content:- 1,Jeanette,Penddreth,jpenddreth0@census.gov,Female Right now we dont have any json data, if you want to merge the contents of flow files then use MergeContent processor :- Configure this processor to how many flowfiles you want to merge, increase Minimum Group Size to some number Default it is 0 B then this processor will wait to reach Minimum group size and merges all the contents flowfiles into one. ***Note:-*** MergeContent causes memory issues follow instructions in the below link, there are lot of questions on community you can refer to them before configuring this processor https://community.hortonworks.com/questions/64337/apache-nifi-merge-content.html * i have reached max attachements for this answer,i'm unable to attach the flow screenshot,i'm going to attach flow ss in comments*. Hope this Helps...!
... View more
09-27-2017
12:48 AM
1 Kudo
Hi @Sanaz Janbakhsh, Sure, if you are following Method1 then use AttributesToJson processor as we are having all the json message data as attributes of the flowfile. In AttributesList property give all the list of attributes that you are thinking to make them as json message. AttributesToJson processor configs:- once you have given all the required attributes in Attributes List property then the success relation of the processor to ConvertJSONToSQL processor. The output of AttributesToJson should be a flatten json message as this processor converts all the attributes that are listed to a flatten json message. Once we got flatten json message then ConvertJSONtoSQL processor will prepare INSERT/UPDATE statements to it. Output:-
{ "nodeName":"XXXX", "fPort":"1", "data":"IA==", "fCnt":"7", "ri_loRaSNR":"XXX", "devEUI":"XX", "ri_time":"2017-07-19T21:39:35Z", "ri_name":"SaroshGateway", "dRbandwidth":"125", "ri_latitude":"51.0216123", "dRmodulation":"LORA", "dRspreadFactor":"10", "ri_altitude":"0", "tI-frequency":"902300000", "tI-codeRate":"4/5", "tI-adr":"true", "ri_mac":"647fdafffe00416b", "applicationID":"9", "ri_rssi":"-1", "applicationName":"Sensors", "ri_longitude":"-114.039802" } *validate your attributes to json processor output with above output message. If you are following Method2:- let's have a look on replace text processor configs, as i attached screenshots in the above answer to prepare INSERT/UPDATE statements and replacing all the attribute values in sql statements.
... View more
09-26-2017
06:58 PM
Hi @sally sally, if you are extracting only one value to attribute then its easy to use ExtractText processor:- by adding new property to it by adding regex like below. <count>(.*)<\/count> ExtractText Processor configs:- This regex only captures the value in <count></count> message and adds an attribute count to the flowfile.
... View more
09-26-2017
02:53 AM
@sally sally Route on content is to match the contents of ff we cannot compare attribute inside this processor. But we can acheive by using RouteOnAttribute Processor:- as we are having ${message.body} as attribute then add a property ${message.body:contains('<person>')} it will look for message.body attribute contains <person> or not. if yes then it routes to person relation and if ff not having <person> it will routes to unmatched relation. RouteOnContent Processor:- As you are having $message.body as attribute we are not able to use RouteOnContent processor, because it will look the content of ff to find match. E.g:-How RouteOnContent works? Input:- <?xml version="1.0" encoding="UTF-8"?>
<note>
<person>Tove</to>
<from>Jani</from>
<heading>Reminder</heading>
<body>Don't forget me this weekend!</body>
</note> If we are having the above input as the ff content then in Route on content processor we need to give property as It will look for is the content of ff having <person> or not In our case it is yes then it will route to person relation.
... View more
09-26-2017
01:55 AM
2 Kudos
Hi @Sanaz Janbakhsh Prepare
INSERT/UPDATE statements:- As you
are having array inside your json message,if you have array in your json file
then you need to flatten out json message before ConvertJSONToSQL processor as
it expects flat json message to prepare INSERT/UPDATE statements.I tried
with your data There
are 2 ways to prepare INSERT/UPDATE statements for PutSQL processor. Method1:-
Using JSONToSQL to prepare INSERT/UPDATE:- Input:- {
"applicationID": "9",
"applicationName": "Sensors",
"nodeName": "XXXX",
"devEUI": "XX",
"rxInfo": [
{
"mac": "647fdafffe00416b",
"time": "2017-07-19T21:39:35Z",
"rssi": -1,
"loRaSNR": XXX,
"name": "SaroshGateway",
"latitude": 51.0216123,
"longitude": -114.039802,
"altitude": 0
}
],
"txInfo": {
"frequency": 902300000,
"dataRate": {
"modulation": "LORA",
"bandwidth": 125,
"spreadFactor": 10
},
"adr": true,
"codeRate": "4/5"
},
"fCnt": 7,
"fPort": 1,
"data": "IA=="
} Flattening Base
Level Json data:- First we need to flatten out the base fields which are not
in Array. To flatten Json message in nifi we are having JoltTransformJSON
processor which will do flattening the json file. You can test out flattening json on http://jolt-demo.appspot.com Use the Jolt Specification below to flatten the base
fields:- [
{
"operation": "shift",
"spec": {
"*": "&",
"txInfo": {
"*": "tI-&",
"dataRate": {
"*": "dR&"
}
}
}
}
]
Output:- {
"applicationID": "9",
"applicationName": "Sensors",
"nodeName": "XXXX",
"devEUI": "XX",
"rxInfo": [{
"mac": "647fdafffe00416b",
"time": "2017-07-19T21:39:35Z",
"rssi": -1,
"loRaSNR": "XXX",
"name": "SaroshGateway",
"latitude": 51.0216123,
"longitude": -114.039802,
"altitude": 0
}],
"tI-frequency": 902300000,--there is tI before frequency because in jolt i have given "*": "tI-&"
"dRmodulation": "LORA",--same with dR also i have given "*": "dR&"
"dRbandwidth": 125,
"dRspreadFactor": 10,
"tI-adr": true,
"tI-codeRate": "4/5",
"fCnt": 7,
"fPort": 1,
"data": "IA=="
} In this step we have flatten all the base fields in json
message except array. jolt configs:- Extract Attributes
for Flatten Json Fileds using EvaluateJsonPath:- In this step use the Destination property as flowfile-attribute so that we can
extract the content of the json message to attributes. Give all the attribute names which are not part of array "rxInfo": [{
"mac": "647fdafffe00416b",
"time": "2017-07-19T21:39:35Z",
"rssi": -1,
"loRaSNR": "XXX",
"name": "SaroshGateway",
"latitude": 51.0216123,
"longitude": -114.039802,
"altitude": 0
}] For all the other data in json message add properties to the
by clicking + symbol on top right Add properties like below applicationID as $. applicationID Do
the same thing for all the other attributes except for array elements. EvaluateJson configs:- Split Array using
SplitJson Processor:- We are having array which we haven’t transformed yet so use
splitjson processor and property for JsonPath Expression as $.rxInfo. It will splits the json array
and flatten json message we are having as a flowfile now. Once we got flatten json message extract all the values of
message to attributes by using evaluate json path processor as we did before. AttributesToJSON
Processor: For every flowfile we are having all your json message data
as attributes now so use Attributestojson processor to get all the attributes
into json message as this processor will result flatten json message. In AttributesList property give all the list of attributes
that you are thinking to make them as json message. nodeName,fPort,data,fCnt,devEUI,ri_altitude,tI-frequency,applicationID,applicationName,ri_loRaSNR,ri_time,ri_name,dRbandwidth,ri_latitude,dRmodulation,dRspreadFactor,tI-codeRate,tI-adr,ri_mac,ri_rssi,ri_longitude Output:- {
"nodeName": "XXXX",
"fPort": "1",
"data": "IA==",
"fCnt": "7",
"ri_loRaSNR": "XXX",
"devEUI": "XX",
"ri_time": "2017-07-19T21:39:35Z",
"ri_name": "SaroshGateway",
"dRbandwidth": "125",
"ri_latitude": "51.0216123",
"dRmodulation": "LORA",
"dRspreadFactor": "10",
"ri_altitude": "0",
"tI-frequency": "902300000",
"tI-codeRate": "4/5",
"tI-adr": "true",
"ri_mac": "647fdafffe00416b",
"applicationID": "9",
"ri_rssi": "-1",
"applicationName": "Sensors",
"ri_longitude": "-114.039802"
} ConvertJSONToSQL
processor:- Right now we are having flatten json message we can make use JSONToSQL processor to prepare INSERT/UPDATE statements, as
keep in mind I have renamed the array and some other message data you can
change the attribute value in evaluate json path (or) in jolt transform
processors. Method2:- Replace
text to prepare INSERT/UPDATE statements As my flow screenshot shows Use Replace text processor after you got all the message
contents as flowfile attributes(i.e after Extract Attributes for Json Array
data) processor Replace text Processor Configs:- We are have all the contents of json message to attributes
for each flow file so use those attributes inside replace text processor and
change the following properties Replacement Value property to INSERT INTO table_name (nodeName,fPort,....) VALUES (${nodeName},${fPort},....) Here we are preparing insert sql statement and replacing with attribute values as ${nodeName}
returns the value associated with the flowfile. Output:- INSERT INTO table_name (nodeName,fPort,....) VALUES (XXXX,
1,....) ReplaceText Config:- Right now we have prepared insert and update statements and
replaced all the attributes with values associated with the ff. PutSQL:- From both methods will prepare INSERT/UPDATE statements then use PutSQL processor and Configure the connection pool and auto terminate
success message. Flow screenshot:- Take reference as my screenshot and make sure you have given Correct Relations(like success,split,matched...) to the next processors. Hope this helps..!
... View more
09-24-2017
11:42 PM
1 Kudo
@Shailesh Nookala as you have given Inferavroschema processor Success relation to convertAvrotoJson processor then you need to Except success relation Auto terminate the following relations failure,original,unsupported content i.e click on the check boxes for these 3 relations. Make sure you haven't selected success relation. Click on Apply button why we haven't Autoterminated Success Relation? Reason is we have connected success relation to convertavrotojson processor. Once you are done with this do the same steps for all the other processors auto terminate all the relations Except the relation that you have connected to the next processor. For the final processor(PutSQL) Auto Terminate success and reconnect failure and retry to the same processor.
... View more
09-24-2017
12:23 PM
@sally sally, i think if you need to merge files based on filename then you have to use those many merge content processors(e.g if 100 filenames you need to have 100 merge contents). Can you please share more details about your merging strategy on what basis you are merging the flow files is it based on size(or) some thing else? and can also share us the configs that you are using now to merge content based on filename?
... View more