NiFi processor to copy data between Hive tables. Any available processor?

Explorer

Hi, I am creating a reusable flow where I need to copy data between Hive tables. The source and target table definitions will be the same, but for each unique execution the number of columns and partition columns will be different. Is there any NiFi processor available to do this?

Thanks in advance.


4 REPLIES

Master Guru

@Omer Shahzad

The PutHiveQL processor executes Hive DDL/DML statements. Since you need to copy data between Hive tables, prepare the Hive statements and execute them with the PutHiveQL processor.

Example:-

Let's say I need to copy 5 source tables in Hive to a target database in Hive. For this case, keep all your insert/insert overwrite statements in a GenerateFlowFile processor.

GenerateFlowFile processor configs:-

I have 5 target tables with different partition columns (dt, yr, ts), and the source tables in Hive have the same partition columns. In the statements below I select a specific partition and insert its data into the final table for that partition.

Hive DML statements as follows:-

insert into target_db_name.target_table_name1 partition(dt) select * from source_db_name.tablename1 where dt='partition_value'
insert into target_db_name.target_table_name2 partition(yr) select * from source_db_name.tablename2 where yr='partition_value'
insert into target_db_name.target_table_name3 partition(dt) select * from source_db_name.tablename3 where dt='partition_value'
insert into target_db_name.target_table_name4 partition(ts) select * from source_db_name.tablename4 where ts='partition_value'
insert into target_db_name.target_table_name5 partition(dt) select * from source_db_name.tablename5 where dt='partition_value'

Schedule this processor as either cron driven or timer driven to run daily, or based on your use case.

If you want the partition column value to change based on time, use NiFi Expression Language to set the value dynamically.

example:-

dt='${now():format("yyyy-MM-dd")}'    evaluates to, for example, dt='2018-04-01'
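To shift the same expression back one day (a sketch; 86400000 is one day in milliseconds, adjust to your own scheduling needs):

dt='${now():toNumber():minus(86400000):format("yyyy-MM-dd")}'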

Refer to this link for more date manipulations in NiFi.

(Screenshot: GenerateFlowFile processor configuration, 64974-gff.png)
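As a rough sketch of the GenerateFlowFile configuration behind that screenshot (standard processor properties; the values are assumptions for this example):

Custom Text      : the five insert statements above, one per line
Data Format      : Text
Unique FlowFiles : false
Scheduling tab   : cron driven, e.g. 0 0 1 * * ? to run daily at 1 AM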

Then use the SplitText processor:-

Since the GenerateFlowFile processor above puts all the statements into a single flowfile, use the SplitText processor to split the flowfile content line by line.

(Screenshot: SplitText processor configuration, 64975-splittext.png)

This splits the flowfile content so that each line becomes a new flowfile; since we have 5 insert statements, we get 5 flowfiles, each containing one insert statement.
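A minimal SplitText configuration for this step could look like the following (standard SplitText properties; values assumed for this example):

Line Split Count         : 1
Header Line Count        : 0
Remove Trailing Newlines : true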

PutHiveQL processor:-

The PutHiveQL processor expects the content of an incoming flowfile to be the HiveQL command to execute.

Since our flowfile content is an insert statement, connect the splits relationship of the SplitText processor to the PutHiveQL processor; each statement will then be executed by PutHiveQL.
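A sketch of the PutHiveQL configuration (standard properties; the connection details are assumptions for your environment):

Hive Database Connection Pooling Service : a HiveConnectionPool controller service,
                                           e.g. Database Connection URL jdbc:hive2://<hiveserver2-host>:10000/default
Batch Size                               : 100
Statement Delimiter                      : ;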

With this approach, if you need to add a new table to copy from source to target, just add the appropriate insert statement to the GenerateFlowFile processor and reuse the same flow to copy data from source to final.

In addition, if you want to distribute the flowfiles across the cluster, use a Remote Process Group after the SplitText processor.

The following link explains how to configure a Remote Process Group:

https://community.hortonworks.com/articles/16461/nifi-understanding-how-to-use-process-groups-and-r....

Flow:-

(Screenshot: overall flow, 64976-flow.png)

Explorer

@shu,

Thanks for your reply. I have managed to do this using the PutHiveQL processor, but I have another question. Is there any processor or property available to get the primary key columns of Hive tables? Some junk/duplicate null rows that are not in the source file are being inserted into the table, so I want to remove them before moving the data to the next stage by filtering on the primary key columns being not null.

Thanks

Master Guru
@Omer Shahzad

Check the data before storing it into the staging area:-

You can use the ValidateRecord processor with the Strict Type Checking property set to true to check the primary key column data before storing the data into the staging area.

Please refer to the links below for the usage of the ValidateRecord processor:

https://community.hortonworks.com/articles/147198/using-apache-nifi-to-validate-records-adhere-to-a....

https://community.hortonworks.com/articles/147200/using-apache-nifi-to-validate-that-records-adhere....
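As a rough sketch of such a ValidateRecord setup (the reader/writer services are assumptions; nullability is enforced by the schema you validate against):

Record Reader        : e.g. a CSVReader matching the incoming staging data
Record Writer        : a matching writer, e.g. CSVRecordSetWriter
Strict Type Checking : true

Records that satisfy the schema are routed to the valid relationship and the rest to invalid, so the two streams can be loaded into the valid and invalid tables separately.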

(or)

Check the data before moving it from the staging area to the next stage:-

In our Hive statements we can add a filter to check that the primary key columns are not null and that length(col-name) != 0 (not equal to zero) before moving the data to the next stage.

Ex:-

insert into target_db_name.target_table_name1 partition(dt) select * from source_db_name.tablename1 where dt='partition_value' and <pk-col-name> is not null and length(<pk-col-name>) != 0

Using the above Hive statement, only the rows whose primary key column is not null are ingested from the source db into the target db.

Explorer

@shu,

I have managed to write a Groovy script that extracts the primary key and partition column information from flowfile attributes, and I have successfully ingested the data into the valid and invalid tables. The script is given below for reference.

def flowFile = session.get()
if (!flowFile) return

// Attributes set upstream that describe the table's columns and partition spec
def fieldstructure = flowFile.getAttribute("metadata.table.feedFieldStructure")
def fieldpartition = flowFile.getAttribute("metadata.table.partitionSpecs")

def primaryKeys = ""
def partitioncolumn = ""

// Build "<col> is not null" predicates for every primary key column.
// Each field structure line is pipe-delimited and its third field is '1'
// when the column is part of the primary key.
if (fieldstructure != null) {
    for (line in fieldstructure.tokenize('\n')) {
        def column = line.tokenize('|')
        if (column[2] == '1') {
            primaryKeys = primaryKeys + " and " + column[0] + " is not null"
        }
    }
}

// Build the same kind of predicates for the partition columns.
// Each partition spec line is pipe-delimited; the third field holds the
// partition expression, possibly wrapping the column name in parentheses,
// e.g. "to_date(created_ts)" -> "created_ts".
if (fieldpartition != null) {
    def partitioncolumnlist = []
    for (fieldpartitionline in fieldpartition.tokenize('\n')) {
        def partitioncolumnsline = fieldpartitionline.tokenize('|')
        def partitioncolumns
        if (partitioncolumnsline[2].contains('(')) {
            partitioncolumns = partitioncolumnsline[2].tokenize('(')[1].tokenize(')')[0]
        } else {
            partitioncolumns = partitioncolumnsline[2]
        }
        partitioncolumnlist.add(partitioncolumns)
    }
    for (String partition in partitioncolumnlist.unique(false)) {
        partitioncolumn = partitioncolumn + " and " + partition + " is not null"
    }
}

// putAttribute does not accept null values, so the attributes are left as
// empty strings when the metadata attributes are missing.
flowFile = session.putAttribute(flowFile, "PartitionColumns", partitioncolumn)
flowFile = session.putAttribute(flowFile, "PrimaryKey", primaryKeys)
session.transfer(flowFile, REL_SUCCESS)
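Downstream of the ExecuteScript processor, the PrimaryKey and PartitionColumns attributes can be referenced with Expression Language when the Hive statement is built (for example in a ReplaceText processor feeding PutHiveQL). Both values already start with " and ", so they append cleanly to an existing where clause. A hypothetical statement template along the lines of the earlier examples:

insert into target_db_name.target_table_name1 partition(dt) select * from source_db_name.tablename1 where dt='partition_value' ${PrimaryKey} ${PartitionColumns}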