Member since 06-08-2017
1049 Posts
518 Kudos Received
312 Solutions

My Accepted Solutions
| Title | Views | Posted | 
|---|---|---|
| | 11187 | 04-15-2020 05:01 PM |
| | 7084 | 10-15-2019 08:12 PM |
| | 3084 | 10-12-2019 08:29 PM |
| | 11381 | 09-21-2019 10:04 AM |
| | 4284 | 09-19-2019 07:11 AM |

04-17-2018 01:34 PM

@Matt Clarke, that seems like quite a refined approach. Happy to see your response.

04-17-2018 02:01 PM

I am working on NIFI-4456, which will allow the JSON reader/writer to support the "one JSON per line" format as well as the "JSON array" format for both input and output, so you will be able to read in one JSON object per line and output a JSON array using ConvertRecord (or any other record-aware processor). In the meantime, you can use the following crude script in an ExecuteGroovyScript processor to process your entire file (avoiding the Split/Merge pattern); it should get you what you want:

def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, {inStream, outStream ->
  outStream.write('['.bytes)
  inStream.eachLine { line, i ->
    if(i > 1) outStream.write(','.bytes)
    outStream.write(line.bytes)
  }
  outStream.write(']'.bytes)
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)

The script just adds array brackets around the whole document and separates the lines with commas. I did the crude version because it doesn't need to load the entire input content into memory. If you need more control over the JSON objects, you could iterate over the lines (still with eachLine), use JsonSlurper to deserialize each string into a JSON object, add each object to an array, then use JsonOutput to serialize the whole thing back to a string. However, that involves having the entire content in memory and could get unwieldy for large input flow files.
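
For illustration only, here is a rough sketch of that in-memory JsonSlurper/JsonOutput variant (not from the original post; the line-parsing details are assumptions and untested):

import groovy.json.JsonSlurper
import groovy.json.JsonOutput

def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, {inStream, outStream ->
  def slurper = new JsonSlurper()
  def objects = []
  // parse each non-empty line as its own JSON object
  inStream.eachLine { line, i ->
    if(line.trim()) objects << slurper.parseText(line)
  }
  // serialize the collected objects as a single JSON array;
  // note the entire content is held in memory at this point
  outStream.write(JsonOutput.toJson(objects).bytes)
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)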
						
					

04-16-2018 04:47 AM

I too tried something in the meanwhile. Here is the screenshot of the flow. In this flow I simply split them using a regular expression and then extracted what was needed using the success connectors. Will surely try one of the methods you mentioned as well and get back here. @Shu

04-17-2018 05:19 AM

I got the issue resolved. The issue is that we cannot disable the service immediately after stopping the process group via the REST API. I'm sleeping for 2 minutes and then sending a request to disable the service, and it's working fine. Thanks a lot for responding.
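
For reference, a minimal standalone sketch of that sequence (stop the group, wait, then disable the service). The base URL, IDs, endpoint paths, and revision handling below are assumptions, not taken from the original post, and the exact paths vary by NiFi version:

import groovy.json.JsonOutput

def nifiApi = 'http://localhost:8080/nifi-api'   // assumed base URL
def pgId = 'your-process-group-id'               // placeholder
def csId = 'your-controller-service-id'          // placeholder

// small helper that PUTs a JSON body and returns the HTTP status code
def putJson = { url, body ->
  def conn = new URL(url.toString()).openConnection()
  conn.requestMethod = 'PUT'
  conn.doOutput = true
  conn.setRequestProperty('Content-Type', 'application/json')
  conn.outputStream.withWriter { it << JsonOutput.toJson(body) }
  conn.responseCode
}

// 1. ask NiFi to stop all components in the process group (assumed endpoint)
putJson("${nifiApi}/flow/process-groups/${pgId}", [id: pgId, state: 'STOPPED'])

// 2. give the processors time to actually finish stopping
sleep(2 * 60 * 1000)

// 3. disable the controller service (assumed endpoint; the revision must match the current one)
putJson("${nifiApi}/controller-services/${csId}/run-status", [revision: [version: 0], state: 'DISABLED'])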
						
					

10-04-2018 03:58 PM

Hello @Shu, I am wondering how I can use the same technique but replace the order by clause with nothing. I would like to not use order by at all. Thanks.

04-14-2018 07:40 PM

@Laurie McIntosh If you have more than one file to process, then use a MergeContent processor after the GetFile processor; MergeContent merges multiple files into one.

Flow:

GetFile --> MergeContent --> Truncate Table --> insert csv into Table --> clean up

By using the MergeContent processor we process one file at a time even though you have more than one file. PutDatabaseRecord (like all record-based processors) is pretty powerful in NiFi and can handle millions of records. An illustrative MergeContent configuration is sketched at the end of this reply.

Please refer to the links below on how to configure the MergeContent processor:

https://community.hortonworks.com/questions/64337/apache-nifi-merge-content.html
https://community.hortonworks.com/questions/161827/mergeprocessor-nifi-using-the-correlation-attribut.html
https://community.hortonworks.com/questions/149047/nifi-how-to-handle-with-mergecontent-processor.html

In addition, there are Wait and Notify processors in NiFi. Wait routes incoming FlowFiles to the 'wait' relationship until a matching release signal is stored in the distributed cache by a corresponding Notify processor. When a matching release signal is identified, the waiting FlowFile is routed to the 'success' relationship.

http://ijokarumawak.github.io/nifi/2017/02/02/nifi-notify-batch/
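
As a rough illustration (the values are made up for this example, not taken from the original post), a MergeContent configuration for batching incoming CSV files might look like:

| Property | Example value |
|---|---|
| Merge Strategy | Bin-Packing Algorithm |
| Merge Format | Binary Concatenation |
| Minimum Number of Entries | 100 |
| Maximum Number of Entries | 1000 |
| Max Bin Age | 5 min |

Tune the entry counts and bin age to how many files arrive together; the processor releases a merged FlowFile when a bin fills up or its age limit is reached.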
						
					

04-06-2018 12:50 PM

@umang s If you have the url as an attribute of the flowfile, then you can use an UpdateAttribute processor with the replaceFirst function to replace the first 5 characters with an empty string.

Example: I have a url attribute on the flowfile with the value

https://community.hortonworks.com/questions/182384/how-to-remove-just-only-5-characters-from-url.html?childToView=183771#answer-183771

Now I'm using an UpdateAttribute processor and added a new property to it:

url
${url:replaceFirst('\w{5}', '')}  (or)  ${url:replaceAll('^.{0,5}', '')}

Now we have removed the first 5 characters of the url attribute value by using the UpdateAttribute processor.
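
To illustrate what those expressions do (a hypothetical standalone check, not part of the original answer), the same regexes behave like this in plain Groovy:

def url = 'https://community.hortonworks.com/questions/182384/'
// \w{5} matches the first five word characters ("https") and removes them
assert url.replaceFirst(/\w{5}/, '') == '://community.hortonworks.com/questions/182384/'
// ^.{0,5} removes up to the first five characters from the start of the string
assert url.replaceAll(/^.{0,5}/, '') == '://community.hortonworks.com/questions/182384/'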
						
					

04-03-2018 10:51 AM

@Faheem Shoukat The Additional Where Clause property was introduced in NiFi 1.4.0+. As you are using NiFi 1.3.0, that is why you cannot find the Additional Where Clause property in your QueryDatabaseTable processor.

Possible ways to achieve where-clause functionality:

You can upgrade your NiFi instance to a newer version
(or)
Use an ExecuteSQL processor (add the where clause to your query), store the state in Hive/HBase and pull the state again for the incremental run using the ExecuteSQL processor
(or)
Use a combination of GenerateTableFetch (supports the Additional Where Clause property) + RemoteProcessGroup + ExecuteSQL processors.

NiFi 1.4 QueryDatabaseTable processor configs: once you have NiFi 1.4, click Configure on the QueryDatabaseTable processor and you will see the Additional Where Clause property.

Below is the reference link for the Jira ticket about the Additional Where Clause property:

https://issues.apache.org/jira/browse/NIFI-4257

04-14-2018 08:14 PM

@shu, I have managed to write a Groovy script to extract the primary keys and partitioned columns information from the flow file attributes, and successfully ingested the data into the valid and invalid tables. The script is given below for reference.

def flowFile = session.get()
if(!flowFile) return

def fieldstructure = flowFile.getAttribute("metadata.table.feedFieldStructure")
def fieldpartition = flowFile.getAttribute("metadata.table.partitionSpecs")
def primaryKeys = ""
def partitioncolumn = ""
def partitioncolumns
def partitioncolumnsline1
def partitioncolumnsline2
def partitioncolumnlist = []
def count = 0

if(fieldstructure != null) {
    def lines = fieldstructure.tokenize('\n')
    for(line in lines) {
        def column = line.tokenize('|')
        if(column[2] == '1') {
            count = count + 1
            if(count > 1) {
                primaryKeys = " and " + primaryKeys + " and " + column[0] + " is not null"
            } else {
                primaryKeys = " and " + column[0] + " is not null"
            }
        }
    }
} else {
    primaryKeys = null
}

if(fieldpartition != null) {
    def partitoned = fieldpartition.tokenize('\n')
    for(fieldpartitionline in partitoned) {
        def partitioncolumnsline = fieldpartitionline.tokenize('|')
        if(partitioncolumnsline[2].contains('(')) {
            partitioncolumnsline1 = partitioncolumnsline[2].tokenize('(')
            partitioncolumnsline2 = partitioncolumnsline1[1].tokenize(')')
            partitioncolumns = partitioncolumnsline2[0]
        } else {
            partitioncolumns = partitioncolumnsline1[2]
        }
        partitioncolumnlist.add(partitioncolumns)
        partitioncolumnlist = partitioncolumnlist.unique(false)
    }
    for(String partition in partitioncolumnlist) {
        if(partitioncolumnlist.size() > 1) {
            partitioncolumn = " and " + partitioncolumn + " and " + partition + " is not null"
        } else {
            partitioncolumn = " and " + partition + " is not null"
        }
    }
} else {
    partitioncolumn = null
}

flowFile = session.putAttribute(flowFile, "PartitionColumns", partitioncolumn)
flowFile = session.putAttribute(flowFile, "PrimaryKey", primaryKeys)
session.transfer(flowFile, REL_SUCCESS)

04-26-2019 08:36 AM

@Shu This solution doesn't work if I have multiple arrays in one JSON message. Example:

{
  "TxnMessage": {
    "HeaderData": { "EventCatg": "F" },
    "PostInrlTxn": {
      "Key": { "Acnt": "1234567890", "Date": "20181018" },
      "Id": "3456",
      "AdDa": { "Area": [ { "HgmId": "", "HntAm": 0 }, { "HgmId": "", "HntAm": 0 } ] },
      "escTx": "Reload",
      "seTb": { "seEnt": [ { "seId": "CAKE", "rCd": 678 }, { "seId": "", "rCd": 0 } ] },
      "Bal": 6766
    }
  }
}

Could you help with analysis and/or pointers on how each element of the arrays in such input can be split into separate rows?