Created 03-12-2018 12:28 PM
I am working on a Spark Streaming job in which the incoming stream is joined with an existing Hive table. I have created a singleton HiveContext. When the HiveContext fetches the table data from Hive, Spark logs a warning, and after a few days the warning turns into an error.
18/03/10 15:55:28 INFO parquet.ParquetRelation$anonfun$buildInternalScan$1$anon$1: Input split: ParquetInputSplit{part: hdfs://nameservice1/user/hive/warehouse/iot.db/iotdevice/part-r-00000-931d1d81-af03-41a4-b659-81a883131289.gz.parquet start: 0 end: 5695 length: 5695 hosts: []}
18/03/10 15:55:28 WARN security.UserGroupInformation: PriviledgedActionException as:svc-ra-iotloaddev (auth:SIMPLE) cause:org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
18/03/10 15:55:28 WARN kms.LoadBalancingKMSClientProvider: KMS provider at [https://iotserver9009.kd.iotserver.com:16000/kms/v1/] threw an IOException [org.apache.hadoop.security.authentication.client.AuthenticationException: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]!!
This eventually stops the job after a few days.
Here is the code that creates the HiveContext:
import java.security.PrivilegedExceptionAction
import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.hive.HiveContext

@transient private var instance: HiveContext = _

def getHiveContext(sparkContext: SparkContext,
                   propertiesBroadcast: Broadcast[Properties]): HiveContext = {
  synchronized {
    val configuration = new Configuration
    configuration.addResource("/etc/hadoop/conf/hdfs-site.xml")
    UserGroupInformation.setConfiguration(configuration)
    UserGroupInformation.getCurrentUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS)

    val secure = propertiesBroadcast.value.getProperty("kerberosSecurity").toBoolean

    if (instance == null) {
      // First call: log in from the keytab and build the singleton HiveContext as that user.
      UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        propertiesBroadcast.value.getProperty("hadoop.kerberos.principal"),
        sparkContext.getConf.get("spark.yarn.keytab"))
        .doAs(new PrivilegedExceptionAction[HiveContext]() {
          override def run(): HiveContext = {
            System.setProperty("hive.metastore.uris",
              propertiesBroadcast.value.getProperty("hive.metastore.uris"))
            if (secure) {
              System.setProperty("hive.metastore.sasl.enabled", "true")
              System.setProperty("hive.metastore.kerberos.keytab.file",
                sparkContext.getConf.get("spark.yarn.keytab"))
              System.setProperty("hive.security.authorization.enabled", "false")
              System.setProperty("hive.metastore.kerberos.principal",
                propertiesBroadcast.value.getProperty("hive.metastore.kerberos.principal"))
              System.setProperty("hive.metastore.execute.setugi", "true")
            }
            instance = new HiveContext(sparkContext)
            instance.setConf("spark.sql.parquet.writeLegacyFormat", "true")
            instance.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
            instance.setConf("hive.exec.dynamic.partition", "true")
            instance.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
            instance
          }
        })
    }

    // Subsequent calls: return the existing instance, again inside a doAs from the keytab login.
    UserGroupInformation.loginUserFromKeytabAndReturnUGI(
      propertiesBroadcast.value.getProperty("hadoop.kerberos.principal"),
      sparkContext.getConf.get("spark.yarn.keytab"))
      .doAs(new PrivilegedExceptionAction[HiveContext]() {
        override def run(): HiveContext = instance
      })
  }
}
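For context, this is roughly how the getter is invoked from the streaming side. This is only a sketch, not the actual job: DeviceEvent, eventStream, the deviceId join key and the iot.enriched_events sink table are hypothetical; only iot.iotdevice comes from the log above.

// Hypothetical event type for the incoming stream
case class DeviceEvent(deviceId: String, payload: String)

eventStream.foreachRDD { rdd =>
  // Re-use the singleton HiveContext for every micro-batch instead of creating a new one
  val hiveContext = getHiveContext(rdd.sparkContext, propertiesBroadcast)
  import hiveContext.implicits._

  val events  = rdd.toDF()                          // incoming micro-batch
  val devices = hiveContext.table("iot.iotdevice")  // existing Hive table (see log above)
  val joined  = events.join(devices, Seq("deviceId"))
  joined.write.mode("append").saveAsTable("iot.enriched_events")
}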
Created 03-12-2018 02:18 PM
1. How long is the ticket valid?
2. You need to renew the ticket before it expires.
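A minimal sketch of point 2, assuming the UGI from the keytab login is kept in a field rather than recreated on every call (principal and keytabPath are placeholders):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

// Log in from the keytab once and keep the resulting UGI
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath)

// Before each batch: re-login from the keytab only if the TGT is close to expiring,
// then run the Hive/HDFS work as that user so the fresh credentials are actually used.
ugi.checkTGTAndReloginFromKeytab()
ugi.doAs(new PrivilegedExceptionAction[Unit] {
  override def run(): Unit = {
    // per-batch Hive/HDFS access goes here
  }
})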
Created 03-12-2018 05:31 PM
I already call UserGroupInformation.loginUserFromKeytabAndReturnUGI in every 5-minute batch to renew the ticket (see the getHiveContext code above). Do you have any reference?
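Not an answer from this thread, but for reference: on YARN, Spark can keep the Kerberos login and delegation tokens renewed itself when the principal and keytab are supplied at submit time (via --principal/--keytab or the spark.yarn.principal/spark.yarn.keytab settings the code above already reads). A minimal sketch, with a hypothetical principal and keytab path:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("iot-stream-join")
  // With these set, YARN mode re-logins from the keytab and renews tokens periodically.
  .set("spark.yarn.principal", "svc-ra-iotloaddev@EXAMPLE.COM")   // hypothetical realm
  .set("spark.yarn.keytab", "/path/to/svc-ra-iotloaddev.keytab")  // hypothetical keytab path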