
Hive query needs to run on a single JVM

I have source data that is imported by an ETL process and loaded into a Hive staging table; a sample is shown below. The data grows incrementally every day, and my business team needs a unique id for each customer, so I wrote a Hive UDF to generate it, as the requirement suggested.

 

 

Hive staging table data:

Name|Id|Address|phno|city
Alan,101,JamesStreet,0122898767,Newyork
Bob,102,CentralStreet,0122098560,Newyork
. . .
Rock,199098,Centralstreet,0133456787,Newyork

 

In this table I need to prefix the Id column with an incrementing value so that my business team can uniquely identify each customer; I have written a Hive UDF for this. After applying the UDF, the column should look like this:

 

Name|Id|Address|phno|city
Alan,1_101,JamesStreet,0122898767,Newyork
Bob,2_102,CentralStreet,0122098560,Newyork
. . .
Rock,10000_199098,Centralstreet,0133456787,Newyork
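To make the expected output concrete, here is a minimal sketch in plain Java with no Hive dependency. The class and method names (PrefixedIdExample, prefixIds) are hypothetical and exist only for this illustration; it shows the result I expect when all rows are processed in order by a single counter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Hive: shows the id format I expect.
public class PrefixedIdExample {

    // Prefixes each id with a running counter that starts at startValue.
    public static List<String> prefixIds(long startValue, List<Long> ids) {
        List<String> out = new ArrayList<>();
        long counter = startValue;
        for (Long id : ids) {
            out.add(counter + "_" + id);
            counter++;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(101L, 102L);
        System.out.println(prefixIds(1, ids)); // prints [1_101, 2_102]
    }
}
```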

The next day's incremental data should continue from the last value assigned on the previous day, with the counter prepended to the existing id in the staging table as shown below, and so on as data keeps being appended to the Hive staging table.

 

Name|Id|Address|phno|city
Smith,10001_2897778,HighLine,0122899997,Newyork
Warner,10002_2345678,HighLine,0122098560,Newyork
. . .
Cristian,2018844_334896543,Centralstreet,0133412345,Newyork
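The continuation rule could be sketched roughly as follows, again in plain Java. The names NextStartExample and nextStartValue are hypothetical, and the sketch assumes the last id assigned on the previous day is already known; how to fetch it from the target table is not shown.

```java
// Hypothetical helper, not part of Hive: derives the next day's start value.
public class NextStartExample {

    // Takes the last assigned id, e.g. "10000_199098", and returns the
    // counter part plus one, which is where the next load should start.
    public static long nextStartValue(String lastAssignedId) {
        int sep = lastAssignedId.indexOf('_');
        if (sep <= 0) {
            throw new IllegalArgumentException("Expected <counter>_<id>: " + lastAssignedId);
        }
        return Long.parseLong(lastAssignedId.substring(0, sep)) + 1;
    }

    public static void main(String[] args) {
        System.out.println(nextStartValue("10000_199098")); // prints 10001
    }
}
```

Note that the counter part has to be compared as a number; taking the maximum of the ids as strings would not give the latest one.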

 

Hive UDF Code:

 

package com.XXXX.hive.hiveudf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;

@UDFType(deterministic = false) // , stateful = true)
public class XXXXAutoIncrementIdUDF extends UDF {

    // Next counter value to prepend; set from the argument on the first call
    long Incremental_Start_Value = 0;

    // Stays 0 until the start value has been set once in this JVM
    static long flag = 0;

    // Takes the employee id and the start value as parameters, e.g. 123456789, 101
    public String evaluate(Long input_Emp_Id, Long incremental_Value) {
        String result;

        if (flag == 0) { // First call: take the start value from the argument
            Incremental_Start_Value = incremental_Value;
            flag++;
        }

        // Concatenate the counter and the employee id
        result = Incremental_Start_Value + "_" + input_Emp_Id;
        Incremental_Start_Value++;

        return result; // Final value, e.g. 101_123456789
    }
}

Hive query that uses the UDF:

 

create function INC_NUMBERID as 'com.xxx.hive.hiveudf.XXXXAutoIncrementIdUDF';

 

set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set mapred.map.tasks = 01;

 

INSERT INTO TABLE mydb.tmp_encounter_table SELECT INC_NUMBERID(cast(id as BIGINT), 101) as udf_encounter_id from hivemydb.stagingtable;

The Hive query runs fine on a small amount of data, but once the number of records grew to millions it stopped working as expected: the counter part of the generated id (incrementvalue_rowid) no longer matches the row count of the table. I also tried setting the number of mappers to 1, but still no luck :-(. We then came to understand that Hadoop executes this query in multiple JVMs, which causes the increment value to diverge from the table row count.
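To illustrate what I think is happening, here is a small plain-Java simulation with no Hive dependency. The names SplitCounterExample, LocalCounter, runSplits and distinctPrefixes are hypothetical, and the sample ids are made up for the illustration. It assumes each task gets its own copy of the counter, which is only my understanding of the cause; the real behaviour depends on how Hive plans the job.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation, not Hive code: each split gets its own counter.
public class SplitCounterExample {

    // Stand-in for one UDF instance living in its own task/JVM.
    static class LocalCounter {
        private long next;

        LocalCounter(long start) {
            next = start;
        }

        String evaluate(long id) {
            return (next++) + "_" + id;
        }
    }

    // Runs every split through a fresh counter, as separate tasks would.
    public static List<String> runSplits(long start, long[][] splits) {
        List<String> out = new ArrayList<>();
        for (long[] split : splits) {
            LocalCounter counter = new LocalCounter(start);
            for (long id : split) {
                out.add(counter.evaluate(id));
            }
        }
        return out;
    }

    // Counts how many distinct counter prefixes were produced.
    public static int distinctPrefixes(List<String> ids) {
        Set<String> seen = new HashSet<>();
        for (String s : ids) {
            seen.add(s.substring(0, s.indexOf('_')));
        }
        return seen.size();
    }

    public static void main(String[] args) {
        long[][] splits = { {1001, 1002, 1003}, {2001, 2002, 2003} };
        List<String> ids = runSplits(101, splits);
        System.out.println(ids);
        System.out.println(ids.size() + " rows, " + distinctPrefixes(ids) + " distinct prefixes");
    }
}
```

With two splits of three rows each, both counters start at 101, so the six rows share only three distinct prefixes. That matches the mismatch between the increment value and the row count that we see on the large table.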

Kindly help me complete this functionality; I am happy to take any suggestion that resolves the issue.

How can I make my Hive UDF query run on a single JVM in a Hadoop environment to achieve the desired output? Thanks in advance.
