Created 06-08-2021 05:36 PM
Hello, I created a Hive UDF in Java and uploaded the jar to the cluster. The UDF queries data from a Hive table through HiveServer2. Executing the UDF from the Hive CLI succeeds, but executing it through beeline fails. Meanwhile, HiveServer2 gets stuck; even "show databases;" gets no response. I don't know why, and there are no exception logs in HiveServer2 or Hive.
package com.account.analyzer.hadoop.udf.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.sql.*;
import java.util.*;

/**
 * Created by zhengtx19755 on 2021/5/31.
 */
public class HiveRead {

    private Logger logger = LoggerFactory.getLogger(HiveRead.class);
    private Configuration configuration = new Configuration();
    private Connection connection = null;

    private static volatile HiveRead instance = null;
    public static HiveRead getInstance() throws Exception {
        if (instance == null) {
            synchronized (HiveRead.class) {
                if (instance == null) {
                    instance = new HiveRead();
                    instance.configurationInit();
                    instance.connectionInit();
                }
            }
        }
        return instance;
    }
    private void kerberosInit(String env) throws Exception {
        Class.forName(configuration.get("dbDriver"));
        String path = "kerberos_" + env;
        if (!new File("/etc/krb5.conf").exists()) {
            throw new Exception("/etc/krb5.conf does not exist; hostname: " + InetAddress.getLocalHost().getHostName());
        }
        if (!new File("/var/kerberos/keytab/apollo.keytab").exists()) {
            throw new Exception("/var/kerberos/keytab/apollo.keytab does not exist; hostname: " + InetAddress.getLocalHost().getHostName());
        }
        // Log in with the Kerberos account
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        logger.info("krb5.conf:" + "/etc/krb5.conf");
        configuration.set("hadoop.security.authentication", "Kerberos");
        UserGroupInformation.setConfiguration(configuration);
        logger.info("keytab:" + "/var/kerberos/keytab/apollo.keytab");
        UserGroupInformation.loginUserFromKeytab("apollo@GYDW.COM",
                "/var/kerberos/keytab/apollo.keytab");
    }
    private void connectionInit() throws Exception {
        // Load the JDBC driver
        Class.forName(configuration.get("dbDriver"));
        // urlSB.append("jdbc:hive2://").append(configuration.get("dbIP")).append(":").append(configuration.get("dbPort"));
        // if (configuration.get("iskrb5", "false").equalsIgnoreCase("true")) {
        //     urlSB.append("/;principal=").append(configuration.get("krb5Principal"));
        // }
        String env = checkIp(configuration.get("ip_list", "10.20.32.117:hsdev"));
        // String env = "gydev";
        String url = "hive_url_" + env;
        String dbURL = configuration.get(url, "");
        logger.info("Hive connection URL: " + dbURL);
        if (dbURL.contains("principal")) {
            kerberosInit(env);
        }
        // connection = DriverManager.getConnection(dbURL);
        connection = DriverManager.getConnection(dbURL, "apollo", "");
    }
    private String checkIp(String ip_list) {
        for (String ipAndDev : ip_list.split(";")) {
            String[] ip_env = ipAndDev.split(":");
            if (IpUtil.ping(ip_env[0])) {
                return ip_env[1];
            }
        }
        return null;
    }
    private void configurationInit() throws IOException {
        // String domain_path = HiveRead.class.getProtectionDomain().getCodeSource().getLocation().getPath();
        // // On the server side, the path obtained via the protection domain ends with the jar file name, so it has to be trimmed
        // if (domain_path.endsWith(".jar")) {
        //     domain_path = domain_path.substring(0, domain_path.lastIndexOf(File.separator));
        //     System.out.println("Absolute path on the server after removing the jar name => " + domain_path);
        // }
        // configuration.addResource(domain_path + File.separator + "application.properties");
        configuration.set("dbDriver", "org.apache.hive.jdbc.HiveDriver");
        configuration.set("ip_list",
                "node131:mydev;" +
                "10.20.32.117:hsdev;" +
                "dev-dw-nn01:gydev;" +
                "gyzq-dw-nn01:gypro");
        configuration.set("hive_url_mydev", "jdbc:hive2://node131:10000");
        configuration.set("hive_url_hsdev", "jdbc:hive2://10.20.32.117:10000");
        // Development environment
        configuration.set("hive_url_gydev", "jdbc:hive2://dev-dw-nn01:10000/;principal=hive/dev-dw-nn01@GYDW.COM");
        // Production environment
        configuration.set("hive_url_gypro", "jdbc:hive2://gyzq-dw-nn01:10000/;principal=hive/gyzq-dw-nn01@GYDW.COM");
        configuration.set("close_table", "apollo_ods_origindb.hs_ods_closedate");
        configuration.set("close_field", "close_date");
    }
    // Query the table, without parameters
    public List<String> getExchangeDays() {
        List<String> closes = new ArrayList<>();
        Map<String, Object> param = new HashMap<>();
        param.put("close_table", configuration.get("close_table"));
        param.put("close_field", configuration.get("close_field"));
        List<Map<String, Object>> resultMap = selectTable("select ${close_field} from ${close_table} order by ${close_field} asc", param);
        for (Map<String, Object> map : resultMap) {
            if (map.containsKey(configuration.get("close_field"))) {
                closes.add(map.get(configuration.get("close_field")).toString());
            }
        }
        return closes;
    }
    // Substitute ${key} placeholders in the SQL with the values from the parameter map
    public String addParam(String sql, Map<String, Object> param) {
        String retSql = sql;
        if (param != null && !param.isEmpty()) {
            for (Map.Entry<String, Object> entry : param.entrySet()) {
                retSql = retSql.replace("${" + entry.getKey() + "}", entry.getValue().toString());
            }
        }
        return retSql;
    }
    // Query the table, with parameters
    public List<Map<String, Object>> selectTable(String sql, Map<String, Object> param) {
        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        Connection conn;
        Statement st;
        ResultSet rs;
        try {
            conn = connection;
            st = conn.createStatement();
            rs = st.executeQuery(param == null ? sql : addParam(sql, param));
            ResultSetMetaData md = rs.getMetaData();
            int columnCount = md.getColumnCount();
            while (rs.next()) {
                Map<String, Object> rowData = new HashMap<>();
                for (int i = 1; i <= columnCount; i++) {
                    rowData.put(md.getColumnName(i), rs.getObject(i));
                }
                list.add(rowData);
            }
            release(conn, st, rs);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return list;
    }
    // Close the ResultSet, Statement, and Connection, swallowing any close errors
    public void release(Connection conn, Statement st, ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (st != null) {
            try {
                st.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
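The UDF class itself is not shown in the post; a minimal sketch of how a simple UDF might wrap HiveRead is below. The class name, return type, and argument handling are assumptions for illustration, not the poster's actual code.

package com.account.analyzer.hadoop.udf;

import com.account.analyzer.hadoop.udf.util.HiveRead;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

import java.util.List;

// Hypothetical wrapper: returns "1" if the given date is an exchange (close-table) day, "0" otherwise.
public class IsExchangeDayUDF extends UDF {

    public Text evaluate(Text day) throws Exception {
        if (day == null) {
            return null;
        }
        // Note: HiveRead opens its own JDBC connection to HiveServer2 (see the hive_url_* settings above)
        // the first time it is used, then reuses the singleton for later calls.
        List<String> exchangeDays = HiveRead.getInstance().getExchangeDays();
        return new Text(exchangeDays.contains(day.toString()) ? "1" : "0");
    }
}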
Created 06-09-2021 07:28 AM
Hi @magicchu
Could you please try the following example?
https://github.com/rangareddy/spark-hive-udf
Build the application, copy the jar, and upload it to HDFS. After that, register it as a function and try the example. If that works, implement your UDF the same way.
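For reference, registering a UDF from a jar on HDFS in beeline typically looks like the following; the function name, class name, and jar path here are placeholders, not values taken from this thread:

-- placeholders: adjust the function name, class, and jar path to your build
CREATE FUNCTION my_db.my_upper AS 'com.example.udf.MyUpperCaseUDF'
USING JAR 'hdfs:///user/hive/udfs/my-hive-udf.jar';

-- quick smoke test once the function is registered
SELECT my_db.my_upper('hello');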
Created 06-10-2021 12:00 AM
I am testing on a CDH cluster with Kerberos and Sentry, so my setup is different from the demo. The test succeeds in the Hive CLI but fails in beeline.
Created 06-22-2021 02:19 AM