Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 11193 | 04-15-2020 05:01 PM |
|  | 7091 | 10-15-2019 08:12 PM |
|  | 3088 | 10-12-2019 08:29 PM |
|  | 11401 | 09-21-2019 10:04 AM |
|  | 4297 | 09-19-2019 07:11 AM |
01-13-2018
04:59 AM
2 Kudos
@Surendra Shringi We can do this parsing inside NiFi. Example: let's say your CSV file has n rows in it, such as

Surendra,24,"{"city":"Chennai","state":"TN","zipcode":"600345"}"
Surendra,25,"{"city":"Chennai","state":"TN","zipcode":"609345"}"

We need to split this file so that each record ends up in its own flowfile. For the splitting, use the SplitText processor with the config below:

Line Split Count
1

So if our input CSV has 2 lines, SplitText splits it into 2 flowfiles, each holding one line. Once each record is in its own flowfile, use the ExtractText processor to extract the content of the flowfile by adding new properties to the processor as below.

Address_city
"city":"(.*?)"
Address_state
"state":"(.*?)"
Address_zipcode
"zipcode":"(.*?)"
Age
,(.*?),
Name
^(.*?),

In this processor we extract pieces of the flowfile content and keep them as flowfile attributes by adding a matching regex for each property. To create and test a regex, click here. You need to change the Maximum Buffer Size value (default is 1 MB) based on your flowfile size.

ReplaceText configs: in the previous step we extracted the fields of the flowfile into attributes; in the ReplaceText processor we now build a new CSV record with a comma delimiter (you can use any delimiter you want) by changing the properties below and adding the Replacement Value property as follows.

Search Value
(?s)(^.*$)
Replacement Value
${Name},${Age},${Address_city},${Address_state},${Address_zipcode}
Maximum Buffer Size
1 MB
Replacement Strategy
Always Replace
Evaluation Mode
Entire text

So the output of the ReplaceText processor would be

Surendra,24,Chennai,TN,600345
Surendra,25,Chennai,TN,609345

We now have CSV records without the embedded JSON, but we end up with 2 output CSV flowfiles (because the input had 2 lines); if your input file has 1000 lines, you will end up with 1000 output CSV files. If you don't want separate output files and want them merged into one file, use the MergeContent processor with the configs below. Change all the highlighted properties as per your requirements; my configs use a Max Bin Age of 1 min, so the processor waits for 1 minute and then merges all the queued flowfiles into one file. Set Delimiter Strategy to Text (the default is Filename) because the content of each flowfile needs to be added as a new line in the merged file, and set the Demarcator property to Shift+Enter (this puts each flowfile's content on a new line).

Output: 1 file having both records in it

Surendra,24,Chennai,TN,600345
Surendra,25,Chennai,TN,609345
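As a quick check outside NiFi, the ExtractText patterns and the ReplaceText rebuild above can be mimicked with a few lines of Python; this is only a sketch of the regex behaviour, not part of the NiFi flow:

```python
import re

# Sample rows in the same shape as the input CSV above
rows = [
    'Surendra,24,"{"city":"Chennai","state":"TN","zipcode":"600345"}"',
    'Surendra,25,"{"city":"Chennai","state":"TN","zipcode":"609345"}"',
]

# Same patterns as the ExtractText properties
patterns = {
    "Name": r"^(.*?),",
    "Age": r",(.*?),",
    "Address_city": r'"city":"(.*?)"',
    "Address_state": r'"state":"(.*?)"',
    "Address_zipcode": r'"zipcode":"(.*?)"',
}

for row in rows:
    # ExtractText: the first match of each pattern becomes a flowfile attribute
    attrs = {name: re.search(p, row).group(1) for name, p in patterns.items()}
    # ReplaceText: rebuild the record from the attributes
    print("{Name},{Age},{Address_city},{Address_state},{Address_zipcode}".format(**attrs))

# Prints:
# Surendra,24,Chennai,TN,600345
# Surendra,25,Chennai,TN,609345
```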
I highly suggest you refer to the link below to get familiar with all the properties of the MergeContent processor: https://community.hortonworks.com/questions/149047/nifi-how-to-handle-with-mergecontent-processor.html . I'm attaching the xml to this post; you can save the xml, import it into NiFi, and adjust it accordingly: parse-file-nifi-159780.xml . If the answer helped to resolve your issue, click on the Accept button below to accept the answer; that would be a great help to community users looking for a quick solution to these kinds of errors.
01-11-2018
04:26 PM
3 Kudos
@Daniel Müller Can you create an alias for the aggregate column, i.e. sum(menge) as menge? Then the processor will work. Change the SQL query to: Select sum(menge) as menge from <table>
01-10-2018
03:10 PM
@Ravikiran Dasari Create a sqoop job for your import as

sqoop job --create <job-name> -- import --connect "jdbc:sqlserver://10.21.29.15:1433;database=db;username=ReportingServices;password=ReportingServices" --check-column batchid --incremental append -m 1 --hive-table mmidwpresentation.journeypositions_archive --table JourneyPositions --hive-import --schema safedrive

Once you create the sqoop job, sqoop stores the last value of batchid (the --check-column argument); whenever you run the job again (sqoop job --exec <job-name>), sqoop pulls only the new records added after that last saved value.

Sqoop job arguments:

$ sqoop job
--create <job-name>  Define a new saved job with the specified job-id (name). A second sqoop command line, separated by a --, should be specified; this defines the saved job.
--delete <job-name>  Delete a saved job.
--exec <job-name>  Given a job defined with --create, run the saved job.
--show <job-name>  Show the parameters for a saved job.
--list  List the saved jobs.

These are all the arguments you can use with the sqoop job command to create, execute, list, delete jobs, etc. Use the --password-file option to set the path of a file containing the authentication password when creating sqoop jobs.
01-09-2018
09:48 AM
2 Kudos
@Shakeel Ahmad You need to use the InvokeHTTP processor to get the response back from the API. InvokeHTTP configs: change the Remote URL property value to the URL that returns the response you need, and change the Username and Password property values as well. InvokeHTTP gets the JSON response, and then you need to use the EvaluateJsonPath processor: if you are going to extract only one attribute, add a new property that matches the JSON attribute. Example:

1. If your JSON response is flat, like below:

{
"id": "1",
"Certificate": "HCC"
}

Then the output of the EvaluateJsonPath processor with the above configs would be HCC.

2. If your JSON response is like below:

{
"data": {
"id": "1",
"Certificate": "HCC"
}
}
Configs of the EvaluateJsonPath processor: as you can see in the screenshot above, I changed the Certificate property value because in this JSON response Certificate is nested inside the data object, so I matched the Certificate attribute using $.data.Certificate, which results in the output HCC. You need to change the Certificate property value as per your JSON response (a quick Python sketch of this lookup follows after the flow below). Once you get output from the EvaluateJsonPath processor, you can store the resulting file using PutFile (or) PutHDFS, etc.

Flow:

1. GenerateFlowFile // schedule this to trigger the InvokeHTTP processor
2. InvokeHTTP // change the URL, Username, Password property values and connect the Response relation to EvaluateJsonPath
3. EvaluateJsonPath // add the Certificate matching property and connect the matched relation to the next processor
If you want to store the result in HDFS or locally, then:
4. PutHDFS (or) PutFile // to store the output file from EvaluateJsonPath in local storage (or) HDFS
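For reference, outside NiFi the $.data.Certificate lookup amounts to the small Python sketch below. The URL is a placeholder (not the asker's actual API), and this only mimics what InvokeHTTP and EvaluateJsonPath do in the flow:

```python
import json
import urllib.request

# Placeholder URL -- substitute your own API endpoint; the real flow also
# supplies the Username/Password properties on the InvokeHTTP processor.
url = "https://example.com/api/certificate"

# Rough equivalent of InvokeHTTP: fetch the JSON response
with urllib.request.urlopen(url) as resp:
    body = json.load(resp)

# Rough equivalent of EvaluateJsonPath with $.data.Certificate:
# step into the nested "data" object and read "Certificate"
certificate = body["data"]["Certificate"]
print(certificate)  # "HCC" for the sample response shown above
```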
01-09-2018
08:51 AM
2 Kudos
@Ravikiran Dasari, Do

Insert into table <historical-table-name> select * from <new-table-name>;

An insert into statement appends to the existing data. If the table structure of the historical and new tables is the same, you can use select * from the new table. If the structure is not the same, you need to list the columns so they match the historical table's structure when you select from the new table (for example, insert into table <historical-table-name> select col_a, col_b, null as col_c from <new-table-name>; with the columns arranged in the historical table's order). In addition, if you want to overwrite the existing data, just change into to overwrite:

Insert overwrite table <historical-table-name> select * from <new-table-name>;
01-05-2018
10:43 PM
1 Kudo
@Aadil M There is no predefined property we can set on a NiFi processor that truncates the table for us before loading into it, but there are ways to do it with NiFi processors.

Going through your flow, I suggest you remove the SplitJson processor before ConvertJSONtoSQL, because ConvertJSONtoSQL already splits records: if you have a JSON array with 1000 records in it, ConvertJSONtoSQL prepares SQL statements for all 1000 records and emits 1000 individual flowfiles. There is no need for a SplitJson processor in front of it.

To truncate the table, use an ExecuteSQL processor before ConvertJSONtoSQL.

Method 1: ExecuteSQL processor configs: change the property below.

SQL select query
truncate table <table-name>

When the JSON file arrives from the ConvertAvroToJSON processor, ExecuteSQL uses it to trigger the truncate query, and we connect the failure relationship of ExecuteSQL to ConvertJSONtoSQL. Why the failure relationship? ExecuteSQL is meant to execute select statements and return results, but in our case we are not getting results back as a flowfile; we are just executing a truncate statement, which is not what ExecuteSQL is meant to do. It still executes the statement and truncates the data in the table, but then routes to failure (because the statement returns nothing). Connecting the failure relationship to ConvertJSONtoSQL passes our JSON file along, so the load runs every time after the table has been truncated.

Flow: ListSFTP -> FetchSFTP -> InferAvroSchema -> ConvertCSVToAvro -> ConvertAvroToJSON -> ExecuteSQL // executes the truncate table statement; connect the failure relation to the next processor -> ConvertJSONtoSQL -> PutSQL

In the above method you execute the truncate statement inside your NiFi flow itself.

(or)

Method 2: keep the truncate step as a separate flow: GenerateFlowFile processor // holds the truncate table statement --> PutSQL processor // executes the truncate table statement. Before loading data, trigger this GenerateFlowFile flow first (i.e. method 2), then trigger your actual flow (i.e. the one in your question) separately. Schedule the two flows at non-overlapping times so the table is truncated first and the load runs afterwards.

If the answer helped to resolve your issue, click on the Accept button below to accept the answer; that would be a great help to community users looking for a quick solution to these kinds of issues.
01-04-2018
04:19 AM
1 Kudo
@dhieru singh Can you make sure the Search Value property has no spaces in it?

Search Value
^//no space $(.*)//no space

Configs: compare your ReplaceText processor configs with the screenshot configs above, don't keep any spaces after the ^ and the $(.*) in the Search Value property, and rerun the processor. If you are still having issues, please attach your input file contents...!!
01-04-2018
02:39 AM
1 Kudo
@Kevin Vasko If you know there are 13 columns before the string, then you can use a ReplaceText processor that matches up to the 13th comma and then matches everything after the 13th comma up to the last comma (not including the last comma). ReplaceText processor configs: change the properties below.

Search Value
^((?:[^,]*,){13})(.*), //capture group 1 until 13th comma and capture group 2 after 13th comma until last comma
Replacement Value
$1"$2" //keep group 1 as is and wrap group 2 in double quotes
Maximum Buffer Size
1 MB //if your flowfile content is more than 1 MB then you need to change this value
Replacement Strategy
Regex Replace
Evaluation Mode
Line-by-Line

Configs: with the above Search Value we keep everything up to the 13th comma as is, match the whole content after the 13th comma up to the last comma, and enclose that content in double quotes ("). If you are worried the regex will blow up on a big file with this ReplaceText configuration, you can put a SplitText processor before ReplaceText to split the big CSV file so that each flowfile has 1 line; ReplaceText then works on one small flowfile at a time, and afterwards you can use a MergeContent processor (if you want to merge the small flowfiles back into 1 big file).
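To illustrate the replacement, here is a minimal Python sketch of the same regex; the sample line is made up (13 plain columns followed by a free-text field that itself contains a comma, ending with a trailing comma), not taken from the asker's data:

```python
import re

# Made-up line: 13 leading columns, then a free-text field containing a comma,
# terminated by a trailing comma.
line = "a1,a2,a3,a4,a5,a6,a7,a8,a9,a10,a11,a12,a13,Command Measure, Targets complete,"

# Group 1: everything up to and including the 13th comma.
# Group 2: everything after that, up to (but not including) the last comma.
pattern = r"^((?:[^,]*,){13})(.*),"

# Same idea as ReplaceText with Regex Replace and replacement value $1"$2"
quoted = re.sub(pattern, r'\1"\2"', line)
print(quoted)
# a1,a2,a3,a4,a5,a6,a7,a8,a9,a10,a11,a12,a13,"Command Measure, Targets complete"
```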
01-03-2018
08:57 PM
1 Kudo
@dhieru singh I think just adding a capture-all group after the $, as shown in the screenshot below, will work.

Search Value
^ $(.*)
Replacement Value
$1 Replacement Strategy
Regex Replace
Evaluation Mode
Line-by-Line

ReplaceText configs:
01-03-2018
07:43 PM
1 Kudo
@Kevin Vasko
Yes, we can handle your case using NiFi processors without any external scripts. Here is what I tried. First, extract the date from the filename and keep it as an attribute on the flowfile by using an UpdateAttribute processor: add a new property

date
${filename:substringAfter('_'):substringBefore('.')}

Configs: then use a ReplaceText processor to prepend the extracted date attribute to the existing CSV content of the flowfile. Change the Replacement Value to
${date} **keep a space after ${date} in the value above
Maximum Buffer Size
1 MB //if your flowfile content is more than 1 MB then you need to change this value.
Replacement Strategy
Prepend
Evaluation Mode
Line-by-Line

Configs: in this processor we are just prepending the date attribute extracted by the UpdateAttribute processor, and because the Evaluation Mode is Line-by-Line the date value is prepended to every line. Then use another ReplaceText processor to wrap the content spanning the last two fields in double quotes (") and drop the trailing comma:

Search Value
(.*),(.*),(.*),
Replacement Value
$1,"$2,$3"
Maximum Buffer Size
1 MB //if your flowfile content is more than 1 MB then you need to change this value
Replacement Strategy
Regex Replace
Evaluation Mode
Line-by-Line
Configs: in this processor we capture everything before the last two fields as group 1 and keep it as is, enclose groups 2 and 3 (the data between the last two commas) in double quotes, and remove the trailing comma. Output:

2017-09-20 23:49:38.637,162929511757,$009389BF,36095,,,,,,,,,,"Failed to fit max attempts (1=>3), fit failing entirely (Fit Failure=True)"
2017-09-20 23:49:38.638,162929512814,$008EE9F6,-16777208,,,,,,,,,,"Command Measure, Targets complete - Elapsed: 76064 ms"
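For reference, here is a minimal Python sketch of the two ReplaceText steps above; the filename and sample record are illustrative stand-ins (not the asker's actual data), and the expression-language substring calls are mimicked with plain string splits:

```python
import re

# Illustrative filename: ${filename:substringAfter('_'):substringBefore('.')}
# takes the part between the '_' and the '.', i.e. the date.
filename = "Event_2017-09-20.csv"
date = filename.split("_", 1)[1].split(".", 1)[0]   # "2017-09-20"

# One illustrative record before processing, ending with a trailing comma.
line = "23:49:38.638,162929512814,$008EE9F6,-16777208,,,Command Measure, Targets complete - Elapsed: 76064 ms,"

# ReplaceText #1 (Prepend, Line-by-Line): put the date and a space in front of each line.
prepended = f"{date} {line}"

# ReplaceText #2 (Regex Replace, Line-by-Line): (.*),(.*),(.*),  ->  $1,"$2,$3"
# wraps the last two fields in double quotes and drops the trailing comma.
quoted = re.sub(r"(.*),(.*),(.*),", r'\1,"\2,\3"', prepended)
print(quoted)
# 2017-09-20 23:49:38.638,162929512814,$008EE9F6,-16777208,,,"Command Measure, Targets complete - Elapsed: 76064 ms"
```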
I have attached the xml file; you can use the same xml and change the configs as per your needs. Flow: Reference xml: date-155661.xml. Let me know if you have any issues..!! If the answer helped to resolve your issue, click on the Accept button below to accept the answer; that would be a great help to community users looking for a quick solution to these kinds of errors.