Member since
03-31-2018
9
Posts
0
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
3961 | 04-14-2018 08:14 PM |
06-12-2019
10:44 AM
@shu, Thanks for the solution. It worked for me.
... View more
05-29-2019
11:14 AM
Hi @shu, Thanks for the input.
... View more
04-14-2018
08:14 PM
@shu, I have managed to write a groovy script to extract primary keys and paritioned columns information form flow file attributes and successfully ingested the data in valid and invalid tables. Script is given below for reference. def flowFile = session.get() if(!flowFile) return def fieldstructure= flowFile.getAttribute("metadata.table.feedFieldStructure") def fieldpartition= flowFile.getAttribute("metadata.table.partitionSpecs") def primaryKeys= "" def partitioncolumn="" def partitioncolumns def partitioncolumnsline1 def partitioncolumnsline2 def partitioncolumnlist = [] def count=0 if(fieldstructure!=null) { def lines= fieldstructure.tokenize('\n') for(line in lines) { def column= line.tokenize('|') if(column[2]=='1') {
count=count+1
if (count > 1) { primaryKeys= " and " + primaryKeys + " and " + column[0] + " is not null" } else { primaryKeys= " and " + column[0] + " is not null" }
}
}
} else{
primaryKeys=null
} if(fieldpartition!=null) { def partitoned = fieldpartition.tokenize('\n') for(fieldpartitionline in partitoned)
{ def partitioncolumnsline=fieldpartitionline.tokenize('|') if(partitioncolumnsline[2].contains('('))
{ partitioncolumnsline1=partitioncolumnsline[2].tokenize('(')
partitioncolumnsline2=partitioncolumnsline1[1].tokenize(')') partitioncolumns = partitioncolumnsline2[0]
} else{
partitioncolumns = partitioncolumnsline1[2]
} partitioncolumnlist.add(partitioncolumns) partitioncolumnlist=partitioncolumnlist.unique(false)
} for(String partition in partitioncolumnlist ) { if(partitioncolumnlist.size()>1)
{
partitioncolumn= " and " + partitioncolumn + " and " + partition + " is not null"
} else {
partitioncolumn=" and " + partition + " is not null"
}
}
} else
{
partitioncolumn = null
} flowFile = session.putAttribute(flowFile,"PartitionColumns",partitioncolumn)
flowFile = session.putAttribute(flowFile,"PrimaryKey",primaryKeys)
session.transfer(flowFile, REL_SUCCESS)
... View more