Member since: 06-08-2017
Posts: 1049
Kudos Received: 510
Solutions: 312

My Accepted Solutions
Title | Views | Posted
---|---|---
| 5674 | 04-15-2020 05:01 PM
| 2969 | 10-15-2019 08:12 PM
| 1171 | 10-12-2019 08:29 PM
| 6098 | 09-21-2019 10:04 AM
| 1998 | 09-19-2019 07:11 AM
09-30-2017
08:51 PM
Hi @Simon Jespersen, you can use an ExtractText processor to get the status value onto the flowfile as an attribute, convert the CSV to Avro, and then use a RouteOnAttribute processor to split the data into 2 routes:

GetFile ---> SplitText ---> ExtractText ---> InferAvroSchema ---> ConvertCSVToAvro ---> RouteOnAttribute ---> (status=6),(status=7)

SplitText processor: to extract the status value as an attribute we first split the file so that each record becomes a separate flowfile; the input to the ExtractText processor is then one record per flowfile. Connect the splits relationship to the ExtractText processor. With LineSplitCount set to 1 in the processor configs, the output of SplitText is one record per flowfile.

Input:
id=10,age=10,status=6,salary=90000
id=11,age=11,status=7,salary=100000
id=12,age=12,status=8,salary=110000

Output:
ff1: id=10,age=10,status=6,salary=90000
ff2: id=11,age=11,status=7,salary=100000
ff3: id=12,age=12,status=8,salary=110000

ExtractText processor: it evaluates one or more regular expressions against the content of a flowfile and assigns the results to flowfile attributes. We need to extract the status value from the flowfile content as an attribute using a regex. Config: add a new property with the regex below to extract the status value into an attribute.

status=(\d*)

This processor puts the status value on every flowfile as an attribute. Then use the InferAvroSchema and ConvertCSVToAvro processors, and finally a RouteOnAttribute processor to split the flowfiles with the properties below:

status=6   ${status:equals("6")}
status=7   ${status:equals("7")}

Then use these two relationships to connect to the downstream processors (see the sketch below). Flow: make sure that in your flow you connect only the same relationships shown in the screenshot to the next processors. Hope this helps!
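Outside NiFi, the same extract-and-route logic looks roughly like this Python sketch (the records are the sample input above; the route names are only illustrative):

import re

records = [
    "id=10,age=10,status=6,salary=90000",
    "id=11,age=11,status=7,salary=100000",
    "id=12,age=12,status=8,salary=110000",
]

routes = {"status=6": [], "status=7": [], "unmatched": []}

for record in records:                           # SplitText: one record per flowfile
    match = re.search(r"status=(\d*)", record)   # ExtractText property: status=(\d*)
    status = match.group(1) if match else ""
    if status == "6":                            # RouteOnAttribute ${status:equals("6")}
        routes["status=6"].append(record)
    elif status == "7":                          # RouteOnAttribute ${status:equals("7")}
        routes["status=7"].append(record)
    else:
        routes["unmatched"].append(record)

print(routes)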
09-29-2017
10:57 PM
1 Kudo
@manisha jain, inside your process group there are an ExecuteProcess processor and an Output Port, and the Output Port pushes the data out of the group. To receive the data pushed out of the Output Port you need one of:
1. a processor
2. another process group
3. a remote process group

An Output Port cannot transmit data outside a process group on its own; it must be used inside the process group to push data out of it. Here I used an Output Port inside Process Group 1 and connected it to:
1. an UpdateAttribute processor (you can use any other processor that accepts incoming connections in its place),
2. another process group (PG2), which needs an Input Port inside it to receive the data from the Output Port in PG1,
3. a remote process group.

These are the only ways to receive the data pushed from an Output Port; if the Output (or Input) Ports are not inside the process group, no data will be transmitted or received.
09-29-2017
08:26 PM
@manisha jain, an Output Port only transfers data from a process group to outside processors or process groups. Add a process group, keep these two processors inside it, and then you can enable transmission on the Output Port. Flow: keep all your processors inside the process group.
09-29-2017
03:45 PM
@Yacine Belhoul, per the Apache Hadoop model we cannot create directories whose names contain the / or : characters: HDFS path elements MUST NOT contain the characters {'/', ':'}. The only option is to replace the colon (:) with %3A:

hadoop fs -mkdir /2017-09-28\ 12%3A00%3A09.0

If you do dynamic partitioning on a timestamp field, Hive likewise stores the colon (:) as %3A in the Hadoop directories. My dynamic-partition timestamp values are:

2011-07-07 02:04:51.0
2011-07-07 02:04:52.0
2013-01-30 08:27:16.0

Once I am done creating the dynamic partitions for the table, listing the directories shows %3A in place of (:).

Hadoop directories for the dynamic partitions:

/apps/hive/warehouse/test_fac/dat=2011-07-07 02%3A04%3A51.0
/apps/hive/warehouse/test_fac/dat=2011-07-07 02%3A04%3A52.0
/apps/hive/warehouse/test_fac/dat=2013-01-30 08%3A27%3A16.0

Show partitions for the dynamically partitioned table: if you list the partitions in the table, Hive shows them with %3A as the replacement for the colon (:).

show partitions test_fac;
+--------------------------------+--+
| partition |
+--------------------------------+--+
| dat=2011-07-07 02%3A04%3A51.0 |
| dat=2011-07-07 02%3A04%3A52.0 |
| dat=2013-01-30 08%3A27%3A16.0 |
+--------------------------------+--+

I also tried adding a partition to the table:

alter table test_fac add partition(dat='2017-09-29 90:00:00');

Hive still replaces the colon (:) with %3A (a small encoding sketch follows at the end of this post).

show partitions test_fac;
+--------------------------------+--+
| partition |
+--------------------------------+--+
| dat=2017-09-29 90%3A00%3A00 |
+--------------------------------+--+

But on the local file system we can create directories with colon (:) characters in them. Example:

[~]$ mkdir 2017-09-28\ 12\:00\:09
[~]$ ls
2017-09-28 12:00:09
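The substitution Hive makes is ordinary percent-encoding of the illegal path character; a quick illustrative Python sketch (not Hive's actual code path):

from urllib.parse import quote

partition_value = "2011-07-07 02:04:51.0"
# Hive escapes characters that are illegal in HDFS path elements; ':' becomes '%3A'.
encoded = partition_value.replace(":", quote(":"))
print(encoded)   # 2011-07-07 02%3A04%3A51.0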
09-29-2017
02:30 AM
Hi @Kiem Nguyen, you can use the Wait processor if you are on NiFi 1.2 or later. The links below cover solutions for this kind of issue:

https://community.hortonworks.com/questions/89048/how-can-i-wait-on-two-processors-in-nifi.html
http://ijokarumawak.github.io/nifi/2017/02/02/nifi-notify-batch/
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors.standard.Wait/index.html
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors.standard.Notify/index.html
09-29-2017
01:57 AM
@sally sally, that looks like complicated logic, and I don't think there is a way to delete only the first 10 flowfiles unless you name them so they can be identified uniquely before filtering them in RouteOnAttribute.
09-28-2017
11:06 PM
Hi @Sanaz Janbakhsh, this behaviour is definitely caused by the Destination property of your AttributesToJSON processor: the default is flowfile-attribute, but you need to change it to flowfile-content. AttributesToJSON configs screenshot: change the highlighted property. With flowfile-attribute the processor writes the list of attributes back onto the flowfile as an attribute; we need all the attributes written into the flowfile content instead, as sketched below. Hope this helps!
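Roughly, the difference between the two Destination settings looks like this outside NiFi (a loose sketch; the attribute names and values are invented for illustration):

import json

# Hypothetical flowfile attributes (made-up names, not from a real flow).
attributes = {"filename": "sensor-42.json", "status": "6", "site": "plant-a"}

# Destination = flowfile-attribute: the JSON lands in a single attribute, content is untouched.
as_attribute = {"JSONAttributes": json.dumps(attributes)}

# Destination = flowfile-content: the JSON replaces the flowfile content itself.
as_content = json.dumps(attributes)

print(as_attribute)
print(as_content)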
09-28-2017
01:25 AM
Adrian Oprea, Flow:-
09-28-2017
01:23 AM
1 Kudo
@Adrian Oprea, hi, I used the same input JSON as yours in a GenerateFlowFile processor to test the entire flow:

{
"ARTGEntryJsonResult": {
"AnnualChargeExemptWaverDate": "null",
"Conditions": [
""
],
"ConsumerInformation": {
"DocumentLink": ""
},
"EntryType": "Medicine",
"LicenceClass": "",
"LicenceId": "152567"
},
"Products": [
{
"AdditionalInformation": [],
"Components": [
{
"DosageForm": "Drug delivery system, transdermal",
"RouteOfAdministration": "Transdermal",
"VisualIdentification": "Dull, homogenous"
}
],
"Containers": [
{
"Closure": "",
"Conditions": [
"Store at room temperature"
],
"LifeTime": "2 Years",
"Material": null,
"Temperature": "Store below 25 degrees Celsius",
"Type": "Sachet"
}
],
"EffectiveDate": "2017-09-18",
"GMDNCode": "",
"GMDNTerm": "",
"Ingredients": [
{
"Name": "Fentanyl",
"Strength": "6.3000 mg"
}
],
"Name": "FENTANYL SANDOZ ",
"Packs": [
{
"PoisonSchedule": "(S8) Controlled Drug",
"Size": "1"
},
{
"PoisonSchedule": "(S8) Controlled Drug",
"Size": "10"
},
{
"PoisonSchedule": "(S8) Controlled Drug",
"Size": "2"
},
{
"PoisonSchedule": "(S8) Controlled Drug",
"Size": "3"
},
{
"PoisonSchedule": "(S8) Controlled Drug",
"Size": "4"
},
{
"PoisonSchedule": "(S8) Controlled Drug",
"Size": "5"
},
{
"PoisonSchedule": "(S8) Controlled Drug",
"Size": "7"
},
{
"PoisonSchedule": "(S8) Controlled Drug",
"Size": "8"
}
],
"SpecificIndications": [
"Management of chronic pain requiring opioid analgesia."
],
"StandardIndications": [],
"Type": "Single Medicine Product",
"Warnings": []
}
]
}

JoltTransformJSON: I use this processor because we need to pull the Name attribute out of the Products array. Jolt specification:

[
{
"operation": "shift",
"spec": {
"*": "&",
"Products": {
"*": "Products"
}
}
}
]

After this processor we get output like the following:

{
"ARTGEntryJsonResult" : {
"AnnualChargeExemptWaverDate" : "null",
"Conditions" : [ "" ],
"ConsumerInformation" : {
"DocumentLink" : ""
},
"EntryType" : "Medicine",
"LicenceClass" : "",
"LicenceId" : "152567"
},
"Products" : {
"AdditionalInformation" : [ ],
"Components" : [ {
"DosageForm" : "Drug delivery system, transdermal",
"RouteOfAdministration" : "Transdermal",
"VisualIdentification" : "Dull, homogenous"
} ],
"Containers" : [ {
"Closure" : "",
"Conditions" : [ "Store at room temperature" ],
"LifeTime" : "2 Years",
"Material" : null,
"Temperature" : "Store below 25 degrees Celsius",
"Type" : "Sachet"
} ],
"EffectiveDate" : "2017-09-18",
"GMDNCode" : "",
"GMDNTerm" : "",
"Ingredients" : [ {
"Name" : "Fentanyl",
"Strength" : "6.3000 mg"
} ],
"Name" : "FENTANYL SANDOZ ",
"Packs" : [ {
"PoisonSchedule" : "(S8) Controlled Drug",
"Size" : "1"
}, {
"PoisonSchedule" : "(S8) Controlled Drug",
"Size" : "10"
}, {
"PoisonSchedule" : "(S8) Controlled Drug",
"Size" : "2"
}, {
"PoisonSchedule" : "(S8) Controlled Drug",
"Size" : "3"
}, {
"PoisonSchedule" : "(S8) Controlled Drug",
"Size" : "4"
}, {
"PoisonSchedule" : "(S8) Controlled Drug",
"Size" : "5"
}, {
"PoisonSchedule" : "(S8) Controlled Drug",
"Size" : "7"
}, {
"PoisonSchedule" : "(S8) Controlled Drug",
"Size" : "8"
} ],
"SpecificIndications" : [ "Management of chronic pain requiring opioid analgesia." ],
"StandardIndications" : [ ],
"Type" : "Single Medicine Product",
"Warnings" : [ ]
}
}

Without Products as an array it is now easy to get the Name attribute from the JSON message (Jolt configs screenshot).

EvaluateJsonPath: to extract the LicenceId and Name values from the content I added the properties below (EvaluateJsonPath configs screenshot):

licenceid as $.ARTGEntryJsonResult.LicenceId
name as $.Products.Name

Split the Packs array using a SplitJson processor: set the JsonPath Expression property to $.Products.Packs. Once you split the Packs array, every element of the array becomes a separate flowfile; my input has 8 elements in Packs, so we get 8 flowfiles, each carrying the licenceid and name attributes (SplitJson config screenshot).

Extract PoisonSchedule and Size using EvaluateJsonPath: each flowfile content now looks like

{
"PoisonSchedule" : "(S8) Controlled Drug",
"Size" : "1"
}

so add 2 properties to the processor and change the Destination property to flowfile-attribute:

PoisonSchedule as $.PoisonSchedule
Size as $.Size

Now all the desired message content is available as attributes, so you can use a ReplaceText processor (if you need text output) or an AttributesToJSON processor (if you want a JSON message).

ReplaceText processor: we have the attributes ${licenceid},${name},${PoisonSchedule},${Size}. I used a comma as the separator, but you can use whatever you like; put that expression in the Replacement Value property and change Replacement Strategy to Always Replace (ReplaceText configs screenshot).

Output:
flowfile1: 152567,FENTANYL SANDOZ ,(S8) Controlled Drug,4
flowfile2: 152567,FENTANYL SANDOZ ,(S8) Controlled Drug,3

AttributesToJSON: if you want the results as JSON documents, use this processor with the Attributes List property set to licenceid,name,PoisonSchedule,Size; it converts those attributes into a JSON message.

Output:
{"Size":"10","PoisonSchedule":"(S8) Controlled Drug","name":"FENTANYL SANDOZ ","licenceid":"152567"}

If you want to merge these flowfiles together, use a MergeContent processor and adjust its properties to your requirements. The flow screenshot is attached in the comments; a Python sketch of the same extraction logic is below. Hope this helps!
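To sanity-check the extraction logic outside NiFi, here is a rough Python sketch of what the flow does end to end (plain json/dict code, not NiFi's implementation; artg_entry.json is an assumed file name holding the sample message above):

import json

with open("artg_entry.json") as f:          # the sample message shown above (assumed file name)
    doc = json.load(f)

product = doc["Products"][0]                 # the Jolt shift effectively pulls out the single product
licence_id = doc["ARTGEntryJsonResult"]["LicenceId"]   # $.ARTGEntryJsonResult.LicenceId
name = product["Name"]                       # $.Products.Name after the shift

# SplitJson on $.Products.Packs: one "flowfile" per pack entry,
# then ReplaceText with Always Replace builds comma-separated values.
for pack in product["Packs"]:
    print(f"{licence_id},{name},{pack['PoisonSchedule']},{pack['Size']}")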
09-27-2017
09:12 PM
1 Kudo
@Foivos A, new flow: in this screenshot I used a GetHDFS processor, but you can replace it with ListHDFS --> FetchHDFS --> PutFTP (files without parsing) --> UpdateAttribute (files that got parsed) --> PutFTP.

Once you have listed all the files in the HDFS directory, use a PutFTP processor and connect the success relationship of this first PutFTP processor to the next UpdateAttribute processor. I looped the failure and reject relationships back to the same PutFTP processor, so from the first PutFTP we only continue with files that were actually stored in the remote location; all files that were rejected or failed to store are routed back to retry once more with the same processor. Only the files that were successfully stored in the remote location are routed to the UpdateAttribute processor, where you can parse and rename the files before storing them in the remote location with the second PutFTP processor.

With this method we only process files that the first PutFTP stored successfully, since only its success relationship feeds UpdateAttribute; the other relationships (rejected, failed) you can either auto-terminate or retry.

Note: because failure and reject are looped back to the same processor, any files stuck in those relationships will keep retrying, which puts extra load on the cluster.
09-27-2017
03:20 PM
1 Kudo
@Foivos A, the GenerateFlowFile processor doesn't accept incoming connections. Use one GetHDFS processor, connect its success relationship to the first PutFTP processor, and keep the same file name; give the next success relationship to an UpdateAttribute processor that adds a property to change the flowfile's filename. The first PutFTP stores the HDFS files as-is in your remote location; the second PutFTP stores the parsed, renamed files in your remote location.

UpdateAttribute: add a new property that sets the filename as below.

${filename}.parsed

This changes the filename to the same name as before with .parsed appended as the extension, so you can easily tell which files have been parsed and which have not (UpdateAttribute configs screenshot).

Just a reminder: the GetHDFS processor deletes the HDFS files by default once it fetches them; if you don't want the files deleted, change the Keep Source File property to true (by default it is false). It's better to use the List and Fetch HDFS processors to fetch files from HDFS, since ListHDFS stores state and picks up changes in the directory by itself.

FYI: if you are thinking of using ListHDFS and FetchHDFS, I have explained how to configure them in the link below.
https://community.hortonworks.com/questions/139391/hello-all-what-are-the-differences-between-service.html?childToView=139415#answer-139415

Flow: hope this helps!
09-27-2017
01:16 PM
1 Kudo
Hi @Xtr yarhid, in Apache NiFi, Controller Services are shared services that can be used by processors. For example, if you want to read data from, or store data to, HBase or Hive tables, those processors need Hive/HBase controller services; we first enable the services and then use them in the processors.

Coming back to your question: use a ListHDFS processor. This processor stores state; run it every 15 minutes using a CRON or timer-driven schedule. If no changes were made to the directory or files, it won't emit a flowfile; if a file was added or changed, it emits only that new file and updates the processor state with the new file's creation timestamp. Configure the Directory property. In this way ListHDFS emits a flowfile with path and filename attributes, which the FetchHDFS processor uses to fetch the data from the HDFS directory. ListHDFS does no fetching of files itself; it only lists the available files in the directory, and FetchHDFS does the actual fetching (see the sketch below).

ListHDFS configs: in this processor I set the Directory property to /user/yashu/del_test and scheduled it to run every 900 seconds, timer driven. On the first run it lists all the files in the del_test directory, and each flowfile carries filename and path attributes (if the directory has 2 files, there will be 2 flowfiles, each with its own filename and path). To see the state of the ListHDFS processor, right-click on the processor and click the View state button.

FetchHDFS: use a FetchHDFS processor with its default configs, as it reads the ${path}/${filename} attributes set by ListHDFS and fetches the actual data from HDFS; ListHDFS only lists the files changed in the last 15 minutes.

In addition, with this approach you can put a Site-to-Site connection after the ListHDFS processor; S2S distributes the work across the cluster, and FetchHDFS then does the actual fetching of the data. Hope this helps!
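As a loose illustration (not NiFi code), the list-then-fetch pattern with stored state behaves roughly like this Python sketch against a local directory:

import os

# Loose sketch of ListHDFS-style incremental listing against a local directory.
# The directory path and the "state" variable are illustrative; NiFi stores this
# state itself and works against HDFS rather than the local filesystem.
DIRECTORY = "/user/yashu/del_test"
last_seen = 0.0  # modification time of the newest file already listed (the processor state)

def list_new_files():
    """Return path/filename attributes only for files changed since the last run."""
    global last_seen
    listed = []
    for name in os.listdir(DIRECTORY):
        mtime = os.path.getmtime(os.path.join(DIRECTORY, name))
        if mtime > last_seen:
            listed.append({"path": DIRECTORY, "filename": name})  # attributes FetchHDFS would use
            last_seen = max(last_seen, mtime)
    return listed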
09-27-2017
02:41 AM
1 Kudo
@Simran Kaur, flow screenshot: this flow demonstrates both Method 1 and Method 2. In place of the UpdateAttribute processors in the screenshot you can use PutHDFS, PutS3, etc. Keep in mind not to connect all the available relationships (success, failure, matched, unmatched, etc.) to the next processor; take the screenshot as a reference, connect only the required relationships, and auto-terminate the others in the processor Settings.
09-27-2017
02:22 AM
2 Kudos
@Simran Kaur, you can do this in 2 ways:
1. with a GenerateTableFetch processor
2. with a QueryDatabaseTable processor

Method 1: the GenerateTableFetch processor produces incremental SQL statements based on the Partition Size. In my configuration I set Partition Size to 10000, so the output of the GenerateTableFetch processor is

SELECT * FROM Table_Name ORDER BY Incremental_column_name LIMIT 10000

If you are using NiFi 1.3, the fetch statements are generated in the correct syntax for your database. I am on 1.1, which has no specific Database Type property (only Generic), so it produces the statement above and I need a ReplaceText processor to correct the fetch statements before passing them to ExecuteSQL. Newer versions have a specific Database Type property that generates the correct fetch syntax, so you don't need ReplaceText; just connect the success relationship of GenerateTableFetch to ExecuteSQL. A sketch of the generated statements follows this post.

ExecuteSQL processor: drag in an ExecuteSQL processor and don't put any SQL statement in it; it executes the SQL statements generated by GenerateTableFetch. In NiFi, every ExecuteSQL/QueryDatabaseTable processor produces Avro by default.

Method 2: the QueryDatabaseTable processor does both steps (generating the table fetch and executing the SQL), stores state the same way GenerateTableFetch does, and outputs the data in Avro format (NiFi's default). Configs screenshot.

With either method we end up with Avro data containing at most the 10000 records we configured in the processor. We cannot extract text from Avro, so we need to convert it to JSON.

ConvertAvroToJSON processor: newer versions of NiFi have a ConvertRecord processor that can convert Avro data directly to CSV, but my version does not. This processor converts all the Avro data to JSON; once you have JSON you can store it directly to HDFS, etc. If you want to store the data as text rather than JSON, you need to split the JSON array into individual records.

Split the array using a SplitJson processor: we have an array of JSON messages, so use a SplitJson processor with the JsonPath Expression property set to $.*. It splits the JSON array into individual records, so each flowfile holds one record.

Input to SplitJson:

[{"id":1,"first_name":"Jeanette","last_name":"Penddreth","email":"jpenddreth0@census.gov","gender":"Female"},{"id":2,"first_name":"Jte","last_name":"ght","email":"ddreth0@census.gov","gender":"male"}]

Output:

flowfile1: {"id":1,"first_name":"Jeanette","last_name":"Penddreth","email":"jpenddreth0@census.gov","gender":"Female"}
flowfile2: {"id":2,"first_name":"Jte","last_name":"ght","email":"ddreth0@census.gov","gender":"male"}

Once the JSON array is split, use an EvaluateJsonPath processor: set the Destination property to flowfile-attribute, then add the attributes you need extracted from the content. In my screenshot I extract all the content values as attributes, but you can extract only the ones you require.

Use a ReplaceText processor: reference those flowfile attributes to write out only their values, and change the Replacement Strategy property to Always Replace.

Output from this processor, flowfile1 content:

1,Jeanette,Penddreth,jpenddreth0@census.gov,Female

At this point we no longer have JSON data. If you want to merge the contents of the flowfiles, use a MergeContent processor: configure how many flowfiles you want to merge and increase the Minimum Group Size (default 0 B) so the processor waits until the minimum group size is reached and then merges all the flowfile contents into one.

Note: MergeContent can cause memory issues; follow the instructions in the link below. There are lots of questions on the community you can refer to before configuring this processor.
https://community.hortonworks.com/questions/64337/apache-nifi-merge-content.html

I have reached the maximum attachments for this answer, so I am unable to attach the flow screenshot here; I will attach it in the comments. Hope this helps!
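As a rough illustration of what GenerateTableFetch produces for a 10000-row partition size (the table and column names are placeholders, and the exact paging syntax depends on the Database Type property):

TABLE = "Table_Name"
INC_COL = "Incremental_column_name"
PARTITION_SIZE = 10000

def fetch_statements(total_rows):
    """Yield one SELECT per partition of the table, page by page."""
    for offset in range(0, total_rows, PARTITION_SIZE):
        yield (f"SELECT * FROM {TABLE} ORDER BY {INC_COL} "
               f"LIMIT {PARTITION_SIZE} OFFSET {offset}")

for sql in fetch_statements(25000):
    print(sql)   # three statements covering 25000 rows in 10000-row pages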
09-27-2017
01:03 AM
@Simran Kaur, if the flowfiles are being overwritten, use ${UUID()} as the object key instead of ${filename}. The UUID is generated uniquely for each flowfile, so you will not have overwriting issues anymore.
09-27-2017
12:48 AM
1 Kudo
Hi @Sanaz Janbakhsh, sure. If you are following Method 1, use an AttributesToJSON processor, since all of the JSON message data is already on the flowfile as attributes. In the Attributes List property give the full list of attributes you want turned into the JSON message (AttributesToJSON processor configs screenshot). Once you have listed all the required attributes, connect the success relationship of this processor to the ConvertJSONToSQL processor. The output of AttributesToJSON should be a flat JSON message, because the processor converts all the listed attributes into a single flat JSON document; ConvertJSONToSQL then prepares the INSERT/UPDATE statements from it.

Output:

{ "nodeName":"XXXX", "fPort":"1", "data":"IA==", "fCnt":"7", "ri_loRaSNR":"XXX", "devEUI":"XX", "ri_time":"2017-07-19T21:39:35Z", "ri_name":"SaroshGateway", "dRbandwidth":"125", "ri_latitude":"51.0216123", "dRmodulation":"LORA", "dRspreadFactor":"10", "ri_altitude":"0", "tI-frequency":"902300000", "tI-codeRate":"4/5", "tI-adr":"true", "ri_mac":"647fdafffe00416b", "applicationID":"9", "ri_rssi":"-1", "applicationName":"Sensors", "ri_longitude":"-114.039802" }

Validate your AttributesToJSON output against the message above. If you are following Method 2, have a look at the ReplaceText processor configs; the screenshots attached to my earlier answer show how to prepare the INSERT/UPDATE statements and substitute the attribute values into the SQL.
09-26-2017
10:20 PM
@sally sally, we can do that with a PutHDFS processor, but first can you give me more details about how you want to delete or update the flowfile in the directories? Let's assume a file already exists in directory1 with the same file name as the flowfile: are you going to delete that file, or update it with the new flowfile contents? My question is how you detect which flowfile to delete and which to update. Give me your logic for deleting or updating flowfiles in directories 1 and 2 so that I can help you.
09-26-2017
06:58 PM
Hi @sally sally, if you are extracting only one value into an attribute, then it's easiest to use an ExtractText processor: add a new property with a regex like the one below.

<count>(.*)<\/count>

ExtractText processor configs: this regex captures only the value inside the <count></count> element and adds a count attribute to the flowfile, as in the sketch below.
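Outside NiFi, the capture behaves roughly like this (the sample content is invented):

import re

# Hypothetical flowfile content containing a <count> element.
content = "<result><count>42</count></result>"

match = re.search(r"<count>(.*)</count>", content)
if match:
    count = match.group(1)   # this value becomes the "count" attribute on the flowfile
    print(count)             # 42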
09-26-2017
02:53 AM
@sally sally, RouteOnContent matches against the contents of the flowfile; we cannot compare an attribute inside that processor. We can achieve this with a RouteOnAttribute processor instead: since we have ${message.body} as an attribute, add a property

${message.body:contains('<person>')}

It checks whether the message.body attribute contains <person>; if yes, the flowfile routes to the person relationship, and if it does not contain <person>, it routes to the unmatched relationship.

RouteOnContent processor: because $message.body is an attribute, we cannot use RouteOnContent, which looks at the content of the flowfile to find a match.

Example of how RouteOnContent works. Input:

<?xml version="1.0" encoding="UTF-8"?>
<note>
<person>Tove</person>
<from>Jani</from>
<heading>Reminder</heading>
<body>Don't forget me this weekend!</body>
</note>

If the above XML is the flowfile content, then in the RouteOnContent processor we add a property that checks whether the content contains <person>. In our case it does, so the flowfile routes to the person relationship.
09-26-2017
01:55 AM
2 Kudos
Hi @Sanaz Janbakhsh,

Preparing INSERT/UPDATE statements: since your JSON message contains an array, you need to flatten the message before the ConvertJSONToSQL processor, because it expects a flat JSON message to prepare the INSERT/UPDATE statements. I tried this with your data. There are 2 ways to prepare INSERT/UPDATE statements for the PutSQL processor.

Method 1: using ConvertJSONToSQL to prepare INSERT/UPDATE

Input:

{
"applicationID": "9",
"applicationName": "Sensors",
"nodeName": "XXXX",
"devEUI": "XX",
"rxInfo": [
{
"mac": "647fdafffe00416b",
"time": "2017-07-19T21:39:35Z",
"rssi": -1,
"loRaSNR": XXX,
"name": "SaroshGateway",
"latitude": 51.0216123,
"longitude": -114.039802,
"altitude": 0
}
],
"txInfo": {
"frequency": 902300000,
"dataRate": {
"modulation": "LORA",
"bandwidth": 125,
"spreadFactor": 10
},
"adr": true,
"codeRate": "4/5"
},
"fCnt": 7,
"fPort": 1,
"data": "IA=="
}

Flattening the base-level JSON data: first we need to flatten the base fields that are not inside the array. To flatten a JSON message in NiFi we have the JoltTransformJSON processor. You can test the flattening at http://jolt-demo.appspot.com. Use the Jolt specification below to flatten the base fields:

[
{
"operation": "shift",
"spec": {
"*": "&",
"txInfo": {
"*": "tI-&",
"dataRate": {
"*": "dR&"
}
}
}
}
]
Output:- {
"applicationID": "9",
"applicationName": "Sensors",
"nodeName": "XXXX",
"devEUI": "XX",
"rxInfo": [{
"mac": "647fdafffe00416b",
"time": "2017-07-19T21:39:35Z",
"rssi": -1,
"loRaSNR": "XXX",
"name": "SaroshGateway",
"latitude": 51.0216123,
"longitude": -114.039802,
"altitude": 0
}],
"tI-frequency": 902300000,   (the "tI-" prefix comes from the Jolt spec entry "*": "tI-&")
"dRmodulation": "LORA",      (likewise the "dR" prefix comes from "*": "dR&")
"dRbandwidth": 125,
"dRspreadFactor": 10,
"tI-adr": true,
"tI-codeRate": "4/5",
"fCnt": 7,
"fPort": 1,
"data": "IA=="
}

In this step we have flattened all the base fields in the JSON message except the array (Jolt configs screenshot).

Extract attributes for the flattened JSON fields using EvaluateJsonPath: set the Destination property to flowfile-attribute so that we can extract the content of the JSON message into attributes. Give all the attribute names that are not part of the array

"rxInfo": [{
"mac": "647fdafffe00416b",
"time": "2017-07-19T21:39:35Z",
"rssi": -1,
"loRaSNR": "XXX",
"name": "SaroshGateway",
"latitude": 51.0216123,
"longitude": -114.039802,
"altitude": 0
}]

For all the other data in the JSON message, add properties to the processor by clicking the + symbol at the top right. Add properties like:

applicationID as $.applicationID

Do the same for all the other attributes except the array elements (EvaluateJsonPath configs screenshot).

Split the array using a SplitJson processor: we still have an array that we haven't transformed, so use a SplitJson processor with the JsonPath Expression property set to $.rxInfo. It splits the JSON array so that each element becomes a separate flowfile with flat JSON content. Once we have the flattened JSON message, extract all the values into attributes with an EvaluateJsonPath processor as we did before.

AttributesToJSON processor: every flowfile now carries all of your JSON message data as attributes, so use an AttributesToJSON processor to turn those attributes back into a JSON message; this processor produces a flat JSON message. In the Attributes List property give all the attributes you want in the JSON message:

nodeName,fPort,data,fCnt,devEUI,ri_altitude,tI-frequency,applicationID,applicationName,ri_loRaSNR,ri_time,ri_name,dRbandwidth,ri_latitude,dRmodulation,dRspreadFactor,tI-codeRate,tI-adr,ri_mac,ri_rssi,ri_longitude

Output:

{
"nodeName": "XXXX",
"fPort": "1",
"data": "IA==",
"fCnt": "7",
"ri_loRaSNR": "XXX",
"devEUI": "XX",
"ri_time": "2017-07-19T21:39:35Z",
"ri_name": "SaroshGateway",
"dRbandwidth": "125",
"ri_latitude": "51.0216123",
"dRmodulation": "LORA",
"dRspreadFactor": "10",
"ri_altitude": "0",
"tI-frequency": "902300000",
"tI-codeRate": "4/5",
"tI-adr": "true",
"ri_mac": "647fdafffe00416b",
"applicationID": "9",
"ri_rssi": "-1",
"applicationName": "Sensors",
"ri_longitude": "-114.039802"
}

ConvertJSONToSQL processor: now that we have a flat JSON message, we can use the ConvertJSONToSQL processor to prepare the INSERT/UPDATE statements. Keep in mind that I renamed the array fields and some other message data; you can change the attribute names in the EvaluateJsonPath or JoltTransformJSON processors.

Method 2: ReplaceText to prepare INSERT/UPDATE statements

As my flow screenshot shows, use a ReplaceText processor after all the message content is available as flowfile attributes (i.e. after the processor that extracts attributes for the JSON array data).

ReplaceText processor configs: we have all the contents of the JSON message as attributes on each flowfile, so use those attributes inside the ReplaceText processor and set the Replacement Value property to

INSERT INTO table_name (nodeName,fPort,....) VALUES (${nodeName},${fPort},....)

Here we are preparing the INSERT SQL statement and substituting the attribute values; ${nodeName} returns the value associated with the flowfile.

Output:

INSERT INTO table_name (nodeName,fPort,....) VALUES (XXXX, 1,....)

Now we have prepared the INSERT/UPDATE statements and replaced all the attribute references with the values from the flowfile (ReplaceText config screenshot).

PutSQL: both methods produce INSERT/UPDATE statements; use a PutSQL processor, configure the connection pool, and auto-terminate the success relationship.

Flow screenshot: use my screenshot as a reference and make sure you connect the correct relationships (success, split, matched, ...) to the next processors. A Python sketch of the flattening and statement-building logic follows below. Hope this helps!
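For reference, a rough Python sketch of the same flatten-then-INSERT logic (the table name and the input file name are placeholders; this is only the transformation the flow performs, not NiFi code):

import json

with open("sensor_message.json") as f:   # the input JSON shown above (assumed file name)
    message = json.load(f)

flat = {}
for key, value in message.items():
    if key == "txInfo":                              # Jolt "*": "tI-&" and "*": "dR&" prefixes
        for k, v in value.items():
            if k == "dataRate":
                flat.update({f"dR{dk}": dv for dk, dv in v.items()})
            else:
                flat[f"tI-{k}"] = v
    elif key == "rxInfo":                            # SplitJson on $.rxInfo, then ri_ renaming
        for k, v in value[0].items():
            flat[f"ri_{k}"] = v
    else:                                            # base-level fields copied as-is
        flat[key] = value

columns = ",".join(flat)
values = ",".join(f"'{v}'" for v in flat.values())
print(f"INSERT INTO table_name ({columns}) VALUES ({values})")  # what ReplaceText/ConvertJSONToSQL produce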
09-25-2017
08:12 PM
@sally sally, if you want to delete the same flowfile from both folder1 and folder2, we can do that in NiFi by connecting the success relationship of one DeleteHDFS processor to another DeleteHDFS processor, deleting the fetched flowfile from both directories.

Here is the example I tried. folder1 and folder2 contain the same 2 files:

/user/yashu/folder2/part1.txt
/user/yashu/folder2/part1_sed.txt

In my flow I fetch from folder2. Once I have listed the files present in the folder2 directory, I use a FetchHDFS processor to fetch them. Because I fetched from folder2, the flowfile carries attribute values for the directory and filename, so I use an expression to read the filename attribute from the flowfile (attributes in the flowfile screenshot). I then give the success relationship to a DeleteHDFS processor that deletes the same files from folder1 (DeleteHDFS config for folder1). Once the files are deleted from folder1, I connect its success relationship to another DeleteHDFS processor that deletes the same files from folder2 (DeleteHDFS config for folder2). So the first DeleteHDFS processor deletes the folder1 files and the second DeleteHDFS deletes the folder2 files. Flow:-
09-25-2017
01:02 PM
Hi @sally sally, the GetHDFS processor doesn't store state: when it runs, it fetches the files from the directory and deletes them from HDFS, which is its default behaviour. If you don't want the files deleted, change the Keep Source File property to true, i.e. fetch the file but keep the source file in the HDFS directory. When the GetHDFS processor runs again it fetches the same file again, because the processor doesn't remember which files it has already fetched.

Use a ListHDFS processor instead: it stores state, and if no changes were made to the directory or files it won't emit a flowfile. If a file is added or changed in the directory, the processor emits only that new file and updates its state with the new file's creation timestamp. Configure the Directory property. In this way ListHDFS emits a flowfile with path and filename attributes, which the FetchHDFS processor uses to fetch the data from the HDFS directory. ListHDFS does no fetching itself; it only lists the available files in the directory, and FetchHDFS does the actual fetching.

FetchHDFS: use a FetchHDFS processor with its default configs, as it reads the ${path}/${filename} attributes set by ListHDFS.

Flow: in addition, with this approach you can put a Site-to-Site connection after the ListHDFS processor; S2S distributes the work across the cluster, and FetchHDFS then does the actual fetching of the data.
09-24-2017
11:42 PM
1 Kudo
@Shailesh Nookala, since you connected the InferAvroSchema processor's success relationship to the ConvertAvroToJSON processor, auto-terminate every relationship except success: failure, original, and unsupported content, i.e. tick the check boxes for those 3 relationships. Make sure you have not selected success, then click Apply. Why don't we auto-terminate the success relationship? Because we connected it to the next processor. Once you are done with this, repeat the same steps for all the other processors: auto-terminate every relationship except the one you connected to the next processor. For the final processor (PutSQL), auto-terminate success and route failure and retry back to the same processor.
09-24-2017
11:00 PM
1 Kudo
Hi @Mamta Chawla, you have a finpolicy1 table with 4 columns (batchid, color, suit, ds(partition)). For finpolicy2 you added another column, so it has 5 (batchid, color, suit, PIA, ds(partition)). Both tables point to the same HDFS location, /user/mamta.chawla/fin/avro, with different schemas. When you insert data into finpolicy2 from the text table with the static partition value ds='Q1', it creates the partition directory below, which contains the new finpolicy2 (5-column) data:

drwxr-xr-x - cloudera supergroup 0 2017-09-24 14:54 /user/mamta.chawla/fin/avro/ds=Q1

Reason no record is returned: we only inserted records from the text table into finpolicy2. If you want to see the records in finpolicy1 as well, run

msck repair table finpolicy1;
(or)
alter table finpolicy1 add partition(ds='Q1') location '/user/mamta.chawla/fin/avro/ds=Q1';

Keep in mind that finpolicy1 has only 4 columns, so if you did

insert into table finpolicy1 partition(ds='Q1') select batchid,color,suit from text_table;

you are only taking batchid, color, suit from the text table and the PIA column is missing; after

msck repair table finpolicy2;

the finpolicy2 result will have NULL in the PIA column. It's better to load the data into finpolicy2, since it has all the columns that finpolicy1 uses:

insert into table finpolicy2 partition(ds='Q1') select * from text_table;

then run

msck repair table finpolicy1;
select * from finpolicy1;

Now finpolicy1 will show all the data for the batchid, color, suit columns, because those columns are available in the finpolicy2 data.
09-24-2017
12:23 PM
@sally sally, I think if you need to merge files based on filename, then you would need that many MergeContent processors (e.g. 100 filenames would need 100 MergeContent processors). Can you share more details about your merging strategy? On what basis are you merging the flowfiles: size or something else? Can you also share the configs you are using now to merge content based on filename?
09-24-2017
04:55 AM
Hi @Biswajit Chakrabort, since you have daily rolling logs, I tried the TailFile processor with the File(s) to Tail property set as follows: /my/path/directory/my-app-${now():format("yyyy-MM-dd")}.log. The expression resolves to today's file, e.g. my-app-2017-09-24.log, in /my/path/directory and tails it if the file is present in that directory (rough Python equivalent below).
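The expression resolves the date at run time, roughly like this Python equivalent:

from datetime import datetime

# Rough equivalent of the NiFi expression ${now():format("yyyy-MM-dd")};
# the directory and file prefix are the ones from the post.
log_path = f"/my/path/directory/my-app-{datetime.now().strftime('%Y-%m-%d')}.log"
print(log_path)   # e.g. /my/path/directory/my-app-2017-09-24.log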
09-24-2017
02:36 AM
@Shailesh Nookala, I don't think the header is causing these issues; I tried it the same way as you and it works as expected. But I noticed that in your flow you connected all the available relationships of each processor to the next processor, and that is what is causing problems. I also see a PutFile processor instead of PutSQL in your flow: are you storing the prepared records to a file? I thought you were pushing the prepared records to SQL Server; is my assumption wrong?

Connect only the following relationships to the next processors:

GetFile(Success) --> InferAvroSchema(Success) --> ConvertCSVToAvro(Success) --> ConvertAvroToJSON(Success) --> SplitJson(Split) --> EvaluateJsonPath(Matched) --> ReplaceText(Success) --> PutSQL(auto-terminate Success)

I have attached my template, putsql.xml; use it and adjust it to your needs.

Input:

name,adresse,zip
1,hi,8000
InferAvroSchema configs: infer-avro.png

ConvertCSVToAvro configs: csvtoavro.png

Output:

{
"name" : 1,
"adresse" : "hi",
"zip" : 8000
}
09-22-2017
08:04 PM
@Ravi Koppula, I think you haven't removed the run schedules you used to trigger the ExecuteStreamCommand processors before adding the GenerateFlowFile processor. Take off all those run schedules and set them back to the defaults (Timer driven, Run Schedule 0 sec), or use Event Driven so the processor is triggered whenever a flowfile arrives.
09-22-2017
06:35 PM
1 Kudo
Hi @Ravi Koppula, I don't think there is a way to schedule process groups themselves, but there is a way to schedule everything in one shot.

1. Since you are using ExecuteStreamCommand processors, they accept incoming connections, and processors can be scheduled as Timer driven, CRON driven, or Event driven.
2. Use a GenerateFlowFile processor, connect its success relationship to all the ExecuteStreamCommand processors, and schedule GenerateFlowFile on either a CRON or timer-driven schedule.
3. GenerateFlowFile then runs at the scheduled time and emits a trigger flowfile that runs all the other ExecuteStreamCommand processors.

Sample flow:-
09-22-2017
01:57 PM
Hi @Simon Jespersen, in your EvaluateJsonPath processor the Path Not Found Behavior property is set to warn, i.e. it generates a warning whenever a JSON path expression is not found, and some of the records in your CSV file simply have no data for zip. The warning does not affect the flowfile: it still routes to the success relationship, all the available content is extracted to attributes, and attributes with no content get an empty string. If you don't want to see those warnings on the processor, change Path Not Found Behavior to ignore (the default), which skips any path whose content is not found.

Example: I recreated the same WARN message with the JSON document below.

{
"name" : "else",
"adresse" : "route 66",
"by" : "Hadoop City"
}

With the ignore property: feeding this JSON document to the EvaluateJsonPath processor with Path Not Found Behavior set to ignore, the processor returns no warning, since it ignores any JSON path expression with no matching content. With the warn property: if you change Path Not Found Behavior to warn, the processor returns the same WARN message you are seeing in your question. Both cases produce the same output: the zip attribute value is an empty string and the flowfile routes to the success relationship (see the sketch below).
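A loose sketch of that extraction behaviour, using top-level keys to stand in for the JSON path expressions:

import json

# The sample document from above; "zip" is intentionally missing.
doc = json.loads('{"name": "else", "adresse": "route 66", "by": "Hadoop City"}')

paths = ["name", "adresse", "zip"]
# Missing paths become an empty string, like NiFi's "Empty String Set" behaviour;
# with warn you would additionally see a bulletin, with ignore you would not.
attributes = {p: doc.get(p, "") for p in paths}
print(attributes)   # {'name': 'else', 'adresse': 'route 66', 'zip': ''}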