Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11158 | 04-15-2020 05:01 PM |
| | 7064 | 10-15-2019 08:12 PM |
| | 3080 | 10-12-2019 08:29 PM |
| | 11327 | 09-21-2019 10:04 AM |
| | 4232 | 09-19-2019 07:11 AM |
04-17-2018
01:34 PM
@Matt Clarke, that seems like quite a refined approach. Happy to see your response.
04-17-2018
02:01 PM
I am working on NIFI-4456, which will allow the JSON reader/writer to support the "one JSON per line" format as well as the "JSON array" format for input and output, so you will be able to read in one JSON object per line and output a JSON array using ConvertRecord (or any other record-aware processor). In the meantime, you can use the following crude script in an ExecuteGroovyScript processor to process your entire file (avoiding the Split/Merge pattern); it should get you what you want:

import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, {inStream, outStream ->
    outStream.write('['.bytes)
    // eachLine's counter starts at 1, so a comma is written before every line after the first
    inStream.eachLine { line, i ->
        if(i > 1) outStream.write(','.bytes)
        outStream.write(line.bytes)
    }
    outStream.write(']'.bytes)
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)

The script just adds array brackets around the whole document and separates the lines with commas. I did the crude version because it doesn't need to load the entire input content into memory. If you need more control over the JSON objects, you could iterate over the lines (still with eachLine), use JsonSlurper to deserialize each string into a JSON object, add each object to an array, then use JsonOutput to serialize the whole thing back to a string. However, that involves having the entire content in memory and could get unwieldy for large input flow files.
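For reference, a minimal sketch of that in-memory JsonSlurper/JsonOutput variant might look like the following (untested, and as noted it buffers the whole content in memory):

import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()
if(!flowFile) return
def slurper = new JsonSlurper()
flowFile = session.write(flowFile, {inStream, outStream ->
    def objects = []
    // Parse each non-empty line as its own JSON object
    inStream.eachLine { line ->
        if(line.trim()) objects << slurper.parseText(line)
    }
    // Serialize the accumulated list back out as a single JSON array
    outStream.write(JsonOutput.toJson(objects).bytes)
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)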
04-16-2018
04:47 AM
I too tried something in the meanwhile. Here is a screenshot of the flow. In this flow I simply split them using a regular expression and then extracted what was needed using the success connectors. I will surely try one of the methods you mentioned as well and get back here. @Shu
04-17-2018
05:19 AM
I got the issue resolved. The issue is that we cannot disable the service immediately after stopping the process group via the REST API. I'm sleeping for 2 minutes and then sending the request to disable the service, and it's working fine. Thanks a lot for responding.
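For anyone else hitting this, here is a rough Groovy sketch of that sequence. The host, the <pg-id> and <cs-id> placeholders, and the revision version are all assumptions, and the endpoint paths should be verified against your NiFi version's REST API docs:

// Rough sketch; host, IDs, and revision version below are placeholders.
def nifi = 'http://localhost:8080/nifi-api'

def put = { String path, String body ->
    def conn = new URL(nifi + path).openConnection()
    conn.requestMethod = 'PUT'
    conn.doOutput = true
    conn.setRequestProperty('Content-Type', 'application/json')
    conn.outputStream.withWriter { it << body }
    conn.responseCode
}

// 1. Stop all components in the process group
put('/flow/process-groups/<pg-id>', '{"id":"<pg-id>","state":"STOPPED"}')

// 2. Give the components time to fully stop before touching the service
sleep(2 * 60 * 1000)

// 3. Disable the controller service (the revision version must match the current one)
put('/controller-services/<cs-id>/run-status',
    '{"revision":{"version":0},"state":"DISABLED"}')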
10-04-2018
03:58 PM
Hello @Shu, I am wondering how I can use the same technique but replace the ORDER BY clause with nothing; I would like to not use ORDER BY at all. Thanks.
04-14-2018
07:40 PM
@Laurie McIntosh If you have more than one file to process, then use a MergeContent processor after the GetFile processor; MergeContent merges more than one file into one file.

Flow:

GetFile --> MergeContent --> Truncate Table --> insert CSV into Table --> clean up

By using the MergeContent processor we process one file at a time, even when you have more than one file. PutDatabaseRecord (like all record-based processors) is pretty powerful in NiFi and can handle millions of records. Please refer to the links below for how to configure the MergeContent processor:

https://community.hortonworks.com/questions/64337/apache-nifi-merge-content.html
https://community.hortonworks.com/questions/161827/mergeprocessor-nifi-using-the-correlation-attribut.html
https://community.hortonworks.com/questions/149047/nifi-how-to-handle-with-mergecontent-processor.html

In addition, there are Wait and Notify processors in NiFi: Wait routes incoming FlowFiles to the 'wait' relationship until a matching release signal is stored in the distributed cache by a corresponding Notify processor. When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship.

http://ijokarumawak.github.io/nifi/2017/02/02/nifi-notify-batch/
04-06-2018
12:50 PM
@umang s

If the URL is an attribute of the flowfile, then you can use an UpdateAttribute processor with the replaceFirst function to replace the first 5 characters with an empty string.

Example: I have a url attribute on the flowfile with the value https://community.hortonworks.com/questions/182384/how-to-remove-just-only-5-characters-from-url.html?childToView=183771#answer-183771

Now I use an UpdateAttribute processor and add a new property:

url
${url:replaceFirst('\w{5}', '')} (or) ${url:replaceAll('^.{0,5}', '')}

Output flowfile attribute value: the first 5 characters of the url attribute value have been removed by the UpdateAttribute processor.
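As a quick illustration of what those two expressions do, here is a plain Groovy sketch of the underlying Java regex behavior (not NiFi Expression Language itself; the URL is a shortened placeholder):

def url = 'https://example.com/page'

// \w{5} matches the first run of five word characters ("https" here)
assert url.replaceFirst(/\w{5}/, '') == '://example.com/page'

// ^.{0,5} anchors at the start and strips up to five characters of any kind
assert url.replaceAll(/^.{0,5}/, '') == '://example.com/page'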
04-03-2018
10:51 AM
@Faheem Shoukat The Additional Where Clause property was introduced in NiFi 1.4.0. As you are using NiFi 1.3.0, that is why you cannot find the Additional Where Clause property on your QueryDatabaseTable processor.

Possible ways to achieve the where-clause functionality:

1. Upgrade your NiFi instance to a newer version, or
2. Use an ExecuteSQL processor (add the where clause to your query), store the state in Hive/HBase, and pull the state again for the incremental run using ExecuteSQL, or
3. Use a combination of GenerateTableFetch (which supports the Additional Where Clause property) + Remote Process Group + ExecuteSQL processors.

NiFi 1.4 QueryDatabaseTable processor configs: once you have NiFi 1.4, click Configure on the QueryDatabaseTable processor and you will have the Additional Where Clause property. Below is the reference link for the Jira ticket covering the Additional Where Clause property:

https://issues.apache.org/jira/browse/NIFI-4257
04-14-2018
08:14 PM
@shu, I have managed to write a Groovy script to extract the primary key and partition column information from flowfile attributes, and successfully ingested the data into the valid and invalid tables. The script is given below for reference:

def flowFile = session.get()
if(!flowFile) return

def fieldstructure = flowFile.getAttribute("metadata.table.feedFieldStructure")
def fieldpartition = flowFile.getAttribute("metadata.table.partitionSpecs")
def primaryKeys = ""
def partitioncolumn = ""

// Build an " and <column> is not null" condition for each primary key column,
// i.e. each '|'-delimited field line whose third token is '1'
if(fieldstructure != null) {
    fieldstructure.tokenize('\n').each { line ->
        def column = line.tokenize('|')
        if(column[2] == '1') {
            primaryKeys += " and " + column[0] + " is not null"
        }
    }
}
// else: leave primaryKeys empty (session.putAttribute rejects null values)

// Collect the distinct partition columns; a spec like "part(col)" yields "col",
// otherwise the third '|'-delimited token is used as-is
if(fieldpartition != null) {
    def partitioncolumnlist = []
    fieldpartition.tokenize('\n').each { fieldpartitionline ->
        def partitioncolumnsline = fieldpartitionline.tokenize('|')
        def partitioncolumns
        if(partitioncolumnsline[2].contains('(')) {
            partitioncolumns = partitioncolumnsline[2].tokenize('(')[1].tokenize(')')[0]
        } else {
            partitioncolumns = partitioncolumnsline[2]
        }
        partitioncolumnlist.add(partitioncolumns)
    }
    partitioncolumnlist.unique(false).each { partition ->
        partitioncolumn += " and " + partition + " is not null"
    }
}
// else: leave partitioncolumn empty for the same reason

flowFile = session.putAttribute(flowFile, "PartitionColumns", partitioncolumn)
flowFile = session.putAttribute(flowFile, "PrimaryKey", primaryKeys)
session.transfer(flowFile, REL_SUCCESS)
04-26-2019
08:36 AM
@Shu This solution doesn't work if I have multiple arrays in one JSON message. Example:

{"TxnMessage": {"HeaderData": {"EventCatg": "F"},"PostInrlTxn": {"Key": {"Acnt": "1234567890","Date": "20181018"},"Id": "3456","AdDa": {"Area": [{"HgmId": "","HntAm": 0},{"HgmId": "","HntAm": 0}]},"escTx": "Reload","seTb": {"seEnt": [{"seId": "CAKE","rCd": 678},{"seId": "","rCd": 0}]},"Bal": 6766}}}

Could you help with analysis and/or pointers on how each element of the arrays in such input can be split into separate rows?