03:23 AM
I am trying to exchange a partition from a staging database after merging the incremental data with the existing data, as follows:

1. Created a partitioned staging table:
CREATE TABLE stg.customers_testcontrol_staging (customer_id bigint, customer_name string, customer_number string, status string, attribute_category string, attribute1 string, attribute2 string, attribute3 string, attribute4 string, attribute5 string)
PARTITIONED BY (source_name string)
ROW FORMAT SERDE ''
STORED AS INPUTFORMAT '' OUTPUTFORMAT ''
LOCATION '/apps/hive/warehouse/stg.db/customers_testcontrol_staging'

2. Inserted data into the above table after merging it with the base table data:
INSERT OVERWRITE TABLE finstg.customers_testcontrol_staging PARTITION (source_name)
SELECT t1.* FROM (SELECT * FROM base.customers where source_name='ORACLE' UNION ALL SELECT * FROM external.customers_incremental_data) t1
JOIN (SELECT customer_id,source_name, max(updated_date) max_modified FROM (SELECT * FROM base.customers where source_name='ORACLE' UNION ALL SELECT * FROM external.customers_incremental_data) t2 GROUP BY customer_id,source_name) s
ON t1.customer_id=s.customer_id AND t1.source_name=s.source_name;
The primary keys of the table I am joining on are customer_id and source_name.

3. Exchange partition step:
ALTER TABLE base.customers EXCHANGE PARTITION (source_name = 'ORACLE') WITH TABLE stg.customers_testcontrol_staging;

But the exchange partition step fails with the exception:
Error: Error while compiling statement: FAILED: SemanticException [Error 10118]: Partition already exists [customers(source_name=ORACLE)]

I took the syntax from the Hive Confluence page. Is there anything I missed in the EXCHANGE PARTITION statement? Could anyone let me know what mistake I am making here and how I can fix it?
Apache Hive
Apache Spark
09:31 PM
I am trying to load a dataframe into a Hive table by following the below steps:
1. Read the source table and save the dataframe as a CSV file on HDFS:
val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016").option("user", devUserName).option("password", devPassword).option("partitionColumn","header_id").option("lowerBound", 199199).option("upperBound", 284058).option("numPartitions",10).load()
2. Order the columns as per my Hive table columns. My Hive table columns are present in a string in the format:
val hiveCols = "col1:coldatatype|col2:coldatatype|col3:coldatatype|col4:coldatatype...col200:datatype"
val schemaList = hiveCols.split("\\|")
val hiveColumnOrder = schemaList.map(e => e.split("\\:")).map(e => e(0)).toSeq
val finalDF = yearDF.selectExpr(hiveColumnOrder:_*)
The order of the columns I read in "execQuery" is the same as "hiveColumnOrder"; just to be sure of the order, I select the columns in yearDF once again using selectExpr.
3. Save the dataframe as a CSV file on HDFS:
4. Once the dataframe is saved, I take the same columns from "hiveCols" and prepare a DDL to create a Hive table at the same location, with values being comma separated, as given below:
create table if not exists schema.tablename(col1 coldatatype,col2 coldatatype,col3 coldatatype,col4 coldatatype...col200 datatype)
LOCATION 'hdfs://username/apps/hive/warehouse/database.db/lines_test_data56/';
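For reference, the column-ordering and DDL-building steps above can be sketched in plain Scala without Spark. This is only a rough sketch: HiveColsSketch and its method names are hypothetical, and the ROW FORMAT DELIMITED / STORED AS TEXTFILE clauses are my assumption about what a comma-separated text table would declare; the DDL shown in the post does not include them.

```scala
// Hypothetical helper, not part of the original job.
object HiveColsSketch {
  // Parses "name:type|name:type" into (name, type) pairs, preserving order.
  def parseCols(hiveCols: String): Seq[(String, String)] =
    hiveCols.split("\\|").toSeq.map { entry =>
      val Array(name, dataType) = entry.split(":", 2)
      (name.trim, dataType.trim)
    }

  // Column names in Hive table order, suitable for selectExpr(order: _*).
  def columnOrder(hiveCols: String): Seq[String] =
    parseCols(hiveCols).map(_._1)

  // Builds a CREATE TABLE statement for a comma-delimited text table.
  // Assumption: the delimiter clause is added here for illustration only.
  def createTableDdl(table: String, hiveCols: String, location: String): String = {
    val cols = parseCols(hiveCols).map { case (n, t) => s"$n $t" }.mkString(", ")
    s"""CREATE TABLE IF NOT EXISTS $table ($cols)
       |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
       |STORED AS TEXTFILE
       |LOCATION '$location'""".stripMargin
  }
}
```

Inside the job this would be used as yearDF.selectExpr(HiveColsSketch.columnOrder(hiveCols): _*). Note that a plain comma delimiter only lines up if no column value itself contains a comma or a newline.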
After I load the dataframe into the created table, querying the table returns improper output. For example, if I run the query below on the dataframe before saving it as a file:
select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from tmpTable where header_id=19924598 and line_num=2
I get the output properly. All the values are properly aligned to the columns:
But after saving the dataframe as a CSV file, creating a table on top of it (step4) and running the same query on the created table, I see the data is jumbled and improperly mapped to the columns:
select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from schema.tablename where header_id=19924598 and line_num=2
| header_id | line_num | debit_rate | debit_rate_text | credit_rate | credit_rate_text | activity_amount | activity_amount_text | exchange_rate | exchange_rate_text | amount_cr | amount_cr_text |
| 19924598 | 2 | NULL | | 381761.4 | | 5686.76 | 5686.76 | NULL | -5686.76 | NULL | |
So I tried a different approach, where I created the Hive table upfront and inserted data into it from the dataframe:
Running the DDL in step4 above
spark.sql("insert into schema.table select * from tmpTable")
Even this way fails when I run the aforementioned select query once the job is completed. I tried to refresh the table using
refresh table schema.table and msck repair table schema.table
just to see if there is any problem with the metadata, but nothing seems to work.
Could anyone let me know what is causing this, and whether there is any problem with the way I am handling the data here?
Apache Hive
Apache Spark
12:28 AM
I am trying to move data from the table system_releases from Greenplum to Hive in the following manner:
val yearDF = spark.read.format("jdbc").option("url", "urltemplate;MaxNumericScale=30;MaxNumericPrecision=40;")
.option("dbtable", s"(${execQuery}) as year2016")
.option("user", "user")
.option("password", "pwd")
.option("lowerBound", 306)
.option("upperBound", 500)
.load()
Schema of the dataframe yearDF as inferred by Spark:
description:string
change_number_text:string
I have the same table in Hive with the following datatypes:
val hiveCols=string,status_date:timestamp,time_zone:string,table_refresh_delay_min:double,online_patching_enabled_flag:string,release_number:double,change_number:double,interface_queue_enabled_flag:string,rework_enabled_flag:string,smart_transfer_enabled_flag:string,patch_number:double,threading_enabled_flag:string,drm_gl_source_name:string,reverted_flag:string,table_refresh_delay_min_text:string,release_number_text:string,change_number_text:string
The columns table_refresh_delay_min, release_number, change_number and patch_number show too many decimal places even though there aren't that many in GP. So I saved the dataframe as a CSV file to see how Spark reads the data. For example, the max value of release_number in GP is 306.00, but in the CSV file saved from the dataframe yearDF the value becomes 306.000000000000000000.
I tried to take the Hive table schema, convert it to a StructType and apply it to yearDF as below:
def convertDatatype(datatype: String): DataType = {
val convert = datatype match {
case "string" => StringType
case "bigint" => LongType
case "int" => IntegerType
case "double" => DoubleType
case "date" => TimestampType
case "boolean" => BooleanType
case "timestamp" => TimestampType
}
convert
}
val schemaList = hiveCols.split(",")
val schemaStructType = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
val newDF = spark.createDataFrame(yearDF.rdd, schemaStructType)
newDF.write.format("csv").save("hdfs/location")
But I am getting the error:
Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr8$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
... 17 more
I tried to cast the decimal columns to DoubleType in the manner below, but I still face the same exception:
val pattern = """DecimalType\(\d+,(\d+)\)""".r
val df2 = dataDF.dtypes.
collect{ case (dn, dt) if pattern.findFirstMatchIn(dt).map(_.group(1)).getOrElse("0") != "0" => dn }.
foldLeft(dataDF)((accDF, c) => accDF.withColumn(c, col(c).cast("Double")))
I am out of ideas after trying the above two approaches. Could anyone let me know how I can cast the columns of a dataframe properly to the required datatypes?
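As far as I understand, spark.createDataFrame(rdd, schema) does not convert values; it only declares what the existing row objects are supposed to be, which would explain why a BigDecimal is rejected for a double field. One alternative worth trying is to cast with SQL expressions generated from the same hiveCols string. The snippet below is a minimal sketch in plain Scala that only builds the expression strings; CastExprSketch and castExprs are hypothetical names, and it assumes simple type names without commas (a type such as decimal(10,2) would break the split).

```scala
// Hypothetical helper: builds Spark SQL cast expressions from a
// "name:type,name:type" description of the target Hive table.
object CastExprSketch {
  def castExprs(hiveCols: String): Seq[String] =
    hiveCols.split(",").toSeq.map { entry =>
      val Array(name, dataType) = entry.trim.split(":", 2)
      // e.g. cast(`release_number` as double) as `release_number`
      s"cast(`$name` as $dataType) as `$name`"
    }
}

// Intended use inside the Spark job (not executed in this sketch):
//   val typedDF = yearDF.selectExpr(CastExprSketch.castExprs(hiveCols): _*)
```

Because the cast runs inside Spark, the decimal values are actually converted to double instead of being relabelled, and the column order follows hiveCols.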
Apache Hive
Apache Spark
05:56 AM
I have a dataframe yearDF, created by reading an RDBMS table as below:
val yearDF = spark.read.format("jdbc").option("url", connectionUrl)
.option("dbtable", s"(${query}) as year2017")
.option("user", devUserName)
.option("password", devPassword)
.load()
I have to apply a regex pattern to the above dataframe before ingesting it into a Hive table on HDFS. Below is the regex pattern:
regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ), E'[\\\\r]+', ' ', 'g' ), E'[\\\\t]+', ' ', 'g' ), E'[\\\\cA]+', ' ', 'g' ), E'[\\\\ca]+', ' ', 'g' )
I should apply this regex only to the columns of the dataframe yearDF that are of datatype String. I tried the following:
val regExpr = yearDF.schema.fields.map(x => if(x.dataType == String)
"regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(%s, E'[\\\\n]+', ' ', 'g' ), E'[\\\\r]+', ' ', 'g' ), E'[\\\\t]+', ' ', 'g' ), E'[\\\\cA]+', ' ', 'g' ), E'[\\\\ca]+', ' ', 'g' ) as %s".format(x,x))
yearDF.selectExpr(regExpr:_*)
But it gives me a compilation error: Type mismatch, expected: Seq[String], actual: Array[Any]
I cannot simply map over all the columns, as that would apply the regex to every column, and I am unable to form the logic properly here. Could anyone let me know how I can apply the regex mentioned above to the dataframe yearDF only on the columns that are of String type?
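The Array[Any] in the error most likely comes from the if without an else: the missing branch has type Unit, so the element type widens to Any. A minimal sketch of one way around it, in plain Scala, is to return a String from every branch: the templated expression for string columns and the bare column name otherwise. StringColumnExprSketch is a hypothetical name, the input mirrors the (name, type) pairs returned by DataFrame.dtypes, where string columns are reported as "StringType", and the short trim template in the note below is only a stand-in for the long regexp_replace template from the post.

```scala
// Hypothetical helper: picks a select expression per column based on its type.
object StringColumnExprSketch {
  // dtypes: (columnName, typeName) pairs, shaped like DataFrame.dtypes.
  // template: a format string with two %s slots, e.g. "<expr on %s> as %s".
  def selectExprs(dtypes: Seq[(String, String)], template: String): Seq[String] =
    dtypes.map {
      case (name, "StringType") => template.format(name, name) // rewrite string columns
      case (name, _)            => name                        // pass other columns through
    }
}

// Intended use inside the Spark job (not executed in this sketch):
//   val exprs = StringColumnExprSketch.selectExprs(yearDF.dtypes.toSeq, template)
//   val cleanedDF = yearDF.selectExpr(exprs: _*)
```

With the stand-in template "trim(%s) as %s", the pairs ("a", "StringType") and ("b", "IntegerType") produce the expressions "trim(a) as a" and "b", so the result is a Seq[String] that selectExpr accepts.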
Apache Spark
11:58 PM
I am trying to set up Spark 2 on my Cloudera cluster. For that, I have JDK 1.8:
I have installed Scala 2.11.8 using the rpm file:
I have downloaded and extracted Spark 2.2.0 into my home directory: /home/cloudera.
I made changes to the PATH variable in .bashrc as below:
But when I try to execute spark-shell from the home directory /home/cloudera, it fails with "No such file or directory", as can be seen below:
[cloudera@quickstart ~]$ spark-shell
/home/cloudera/spark/bin/spark-class: line 71: /usr/java/jdk1.7.0_67-cloudera/bin/java: No such file or directory
[cloudera@quickstart ~]$
Could anyone let me know how I can fix the problem and configure it properly?
Apache Spark
Quickstart VM
07:03 AM
I have an HDFS cluster on Linux CentOS which ships with Spark 1.6.0 by default. Since that is an old version, I updated Spark to 2.2.0 and Scala to 2.11.8. In Spark 2, using the spark object, I am trying to load a file with the steps below:
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder().getOrCreate()
18/07/09 01:25:27 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@9ae0794
I don't see any problem up to this point. But when I load a text file into Spark, I see the exception below:
scala> val input = spark.read.textFile("input")
18/07/09 01:26:04 ERROR Schema: Failed initialising database.
Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@46bff388, see the next exception for details.
at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedConnection.bootDatabase(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedConnection.<init>(Unknown Source)
at org.apache.derby.jdbc.InternalDriver$ Source)
at org.apache.derby.jdbc.InternalDriver$ Source)
at Method)
at org.apache.derby.jdbc.InternalDriver.getNewEmbedConnection(Unknown Source)
at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
at java.sql.DriverManager.getConnection(
at java.sql.DriverManager.getConnection(
at com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(
at com.jolbox.bonecp.BoneCP.<init>(
at com.jolbox.bonecp.BoneCPDataSource.getConnection(
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
at java.lang.reflect.Constructor.newInstance(
at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(
at org.datanucleus.plugin.PluginManager.createExecutableExtension(
at org.datanucleus.NucleusContext.createStoreManagerForProperties(
at org.datanucleus.NucleusContext.initialise(
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at javax.jdo.JDOHelper$
at Method)
at javax.jdo.JDOHelper.invoke(
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(
at javax.jdo.JDOHelper.getPersistenceManagerFactory(
at javax.jdo.JDOHelper.getPersistenceManagerFactory(
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(
at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(
at org.apache.hadoop.hive.metastore.ObjectStore.initialize(
at org.apache.hadoop.hive.metastore.ObjectStore.setConf(
at org.apache.hadoop.util.ReflectionUtils.setConf(
at org.apache.hadoop.util.ReflectionUtils.newInstance(
at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(
at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(
at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
at java.lang.reflect.Constructor.newInstance(
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(
at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(
at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(
at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(
at org.apache.hadoop.hive.ql.session.SessionState.start(
at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:191)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
at java.lang.reflect.Constructor.newInstance(
at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:264)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:362)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:266)
at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:66)
at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:65)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:194)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:193)
at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:105)
at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:93)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:39)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSessionStateBuilder.scala:54)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:52)
at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateBuilder.scala:35)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1050)
at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:130)
at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSession.scala:130)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:129)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:689)
at $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:20)
at $line15.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:25)
at $line15.$read$$iw$$iw$$iw$$iw.<init>(<console>:27)
at $line15.$read$$iw$$iw$$iw.<init>(<console>:29)
at $line15.$read$$iw$$iw.<init>(<console>:31)
at $line15.$read$$iw.<init>(<console>:33)
at $line15.$read.<init>(<console>:35)
at $line15.$read$.<init>(<console>:39)
at $line15.$read$.<clinit>(<console>)
at $line15.$eval$.$print$lzycompute(<console>:7)
at $line15.$eval$.$print(<console>:6)
at $line15.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
at org.apache.spark.repl.Main$.doMain(Main.scala:70)
at org.apache.spark.repl.Main$.main(Main.scala:53)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: ERROR XJ040: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@46bff388, see the next exception for details.
at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
at org.apache.derby.impl.jdbc.SQLExceptionFactory.wrapArgsForTransportAcrossDRDA(Unknown Source)
... 147 more
Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /home/cloudera/metastore_db.
at org.apache.derby.impl.jdbc.EmbedConnection$ Source)
at Method)
at org.apache.derby.impl.jdbc.EmbedConnection.startPersistentService(Unknown Source)
... 144 more
org.datanucleus.exceptions.NucleusDataStoreException: Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@46bff388, see the next exception for details.
I checked whether any other instances were running using the command ps -ef | grep spark-shell and found a couple of instances running. So I killed all of them and started again. But the same exception occurs when I repeat the same steps to load a text file.
I checked for solutions in similar questions and tried to fix it, in vain. Could anyone let me know how I can fix this problem?
Apache Spark
07:40 AM
I am trying to retrieve records from Hive tables. For that, I have written the following connection code in Java:
public static Connection getHiveConnection() throws IOException, SQLException {
System.out.println("Preparing Hive connection1");
Configuration conf = new Configuration();
conf.set("", "Kerberos");
System.setProperty("", "ip-x-x-x-x.bc9.internal");
System.setProperty("", "ABCDEV.COM");
UserGroupInformation.loginUserFromKeytab("username@ABCDEV.COM", "/home/username/username.keytab");
// UserGroupInformation.setConfiguration(conf);
System.out.println("Kerberos Connected");
// Hive Connection
try {
if((hiveConnection == null) || hiveConnection.isClosed()) {
hiveConnection = DriverManager.getConnection("jdbc:hive2://x.x.x.x.x:10500/schema;principal=hive/_HOST@ABCDEV.COM", "user", "pwd");
return hiveConnection;
} else {
return hiveConnection;
}
} catch (ClassNotFoundException e) {
return null;
} catch (SQLException e) {
return null;
}
}
When I try to use the connection from this class in another class, as below:
Connection hiveCon = (Connection) DbManager.getHiveConnection();
PreparedStatement hive_pstmnt = hiveCon.prepareStatement(hiveCountQuery);
ResultSet hiveRs = hive_pstmnt.executeQuery();
I am getting the exception:
Preparing Hive connection1
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.
Kerberos Connected
java.sql.SQLException: Could not open client transport with JDBC Uri: jdbc:hive2://x.x.x.x.x:10500/schema;principal=hive/_HOST@ABCDEV.COM: GSS initiate failed
Caused by: org.apache.thrift.transport.TTransportException: GSS initiate failed
In the code, I have commented out the line UserGroupInformation.setConfiguration(conf); if I uncomment it and execute the code, I get a different exception:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at org.eclipse.jdt.internal.jarinjarloader.JarRsrcLoader.main(
Caused by: java.lang.IllegalArgumentException: Invalid attribute value for of Kerberos
at com.dbconnections.gphive.DbManager.getHiveConnection(
I have run kinit and generated the ticket, just to make sure the ticket has a valid time period. Could anyone let me know what mistake I am making here so that I can correct the code?
Apache Hive
10:30 PM
The files will not be in a specific order. Is this a solution: load all the files into Spark and create a dataframe out of them, then split this main dataframe into smaller ones using the delimiter ("...") which is present at the end of each file. Once this is done, map the dataframes by checking whether the third line of each file contains the words "SEVERE: Error" and group/merge them together. Follow the same approach for the other cases, and finally have three separate dataframes, one exclusive to each case. Is this approach viable, or is there a better way I can follow?
03:22 AM
In a directory, I have subdirectories which are created every day. My requirement is to work on the files that were created yesterday. To do that, I came up with logic that gets the latest dirs, in my case yesterday's dirs. I was able to do it using the code below:
import java.io.File
import java.time.{Duration, Instant}
val simpDate = new java.text.SimpleDateFormat("yyyy-MM-dd")
val currDate = simpDate.format(new java.util.Date())
val now = Instant.now // Gets current date in the format:2017-12-13T09:40:29.920Z
val today = now.toEpochMilli
val yesterday = now.minus(Duration.ofDays(1))
val yesterdayMilliSec = yesterday.toEpochMilli
val todaySimpDate = t(today)
val yesterdaySimpDate = t(yesterdayMilliSec)
val local:String = "file://"
val folders = getFileTree(new File("dailylogs")).filterNot(_.getName.endsWith(".log")) // Gets the date of dir
val folderCrtDateDesc = folders.map(y => (y,y.lastModified)).sortBy(-_._2)
val latestFolder = folderCrtDateDesc.map(y=>(y._1,t(y._2)))
val folderToday = latestFolder.filter(y => y._2==todaySimpDate)
Now I have the latest dir in folderToday, which looks like "dailylogs/auditlogsdec27". Using the above code I can load the whole dir into Spark, which in turn loads all the files into a single dataframe. Each file starts with the record "JobID" and ends with the record "[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper..."
There are 3 kinds of status in the files in that directory: error, success and failure. The status 'error' can be identified from the third line. For 'success' and 'failure', the status is found on the sixth line of the file.

file1: status: error
JobID: 454
[Wed Dec 27 05:38:47 UTC 2017] INFO: Starting Auditing for : baseTable1
[Wed Dec 27 05:38:49 UTC 2017] SEVERE: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 1:261 Invalid table alias or column
[Wed Dec 27 05:38:49 UTC 2017] INFO: Completed Auditing for : baseTable1
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...

file2: status: success
JobID: 455
[Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable2
[Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 16
[Wed Dec 27 05:38:20 UTC 2017] INFO: Success
Completed Auditing for : baseTable2
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...

file3: status: failure
JobID: 547
[Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable3
[Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 5
[Wed Dec 27 05:38:20 UTC 2017] INFO: Failed. Invalid data found.
Completed Auditing for : baseTable3
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...

I know how to load a single file into Spark and work on that dataframe. Since there is a huge number of files in the dir every day, I want to load the whole dir into a single dataframe and then work on the data inside it, rather than open and read every small file. I want to split the dataframe using the last record as the delimiter (in this case, each file ends with ...) and create three separate dataframes, one each for error, success and failure. Can anyone tell me how I can implement that?
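The classification rule described above (third line for 'error', sixth line for 'success' and 'failure') can be sketched in plain Scala on (path, content) pairs. This is only a sketch under assumptions: AuditLogSketch and its members are hypothetical names, and it assumes each file is available as one string, which is the shape that SparkContext.wholeTextFiles produces, so no splitting on the "..." delimiter would be needed.

```scala
// Hypothetical helper: classifies whole audit-log files by status.
object AuditLogSketch {
  sealed trait Status
  case object Error extends Status
  case object Success extends Status
  case object Failure extends Status
  case object Unknown extends Status

  // Looks at the third line for an error and the sixth line for success/failure.
  def classify(content: String): Status = {
    val lines = content.split("\\r?\\n").toVector.map(_.trim).filter(_.nonEmpty)
    def lineHas(idx: Int, marker: String): Boolean =
      lines.lift(idx).exists(_.contains(marker))
    if (lineHas(2, "SEVERE: Error")) Error
    else if (lineHas(5, "INFO: Success")) Success
    else if (lineHas(5, "INFO: Failed")) Failure
    else Unknown
  }

  // Groups file paths by the status of their content.
  def groupByStatus(files: Seq[(String, String)]): Map[Status, Seq[String]] =
    files.groupBy { case (_, content) => classify(content) }
      .map { case (status, group) => status -> group.map(_._1) }
}

// Intended use inside the Spark job (not executed in this sketch):
//   val files = spark.sparkContext.wholeTextFiles("dailylogs/auditlogsdec27")
//   val errors = files.filter { case (_, c) => AuditLogSketch.classify(c) == AuditLogSketch.Error }
```

Filtering the same (path, content) RDD three times, once per status, would give the three separate datasets; whether one record per file is acceptable depends on how large the individual log files are.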
Apache Spark
04:36 AM
I am trying to load a dataset into a Hive table using Spark. But when I try to load the file from an HDFS directory into Spark, I get the exception:
org.apache.spark.sql.AnalysisException: Path does not exist: file:/home/cloudera/partfile;
These are the steps before loading the file:
val wareHouseLocation = "file:${system:user.dir}/spark-warehouse"
val sparkSession = SparkSession.builder.master("spark://localhost:7077")
.appName("SparkHive")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode","nonstrict")
.config("hive.metastore.warehouse.dir","/user/hive/warehouse")
.getOrCreate()
import sparkSession.implicits._
val partf = sparkSession.read.textFile("partfile")
Exception for the above statement:
org.apache.spark.sql.AnalysisException: Path does not exist: file:/home/cloudera/partfile;
But I have the file in my HDFS home directory:
hadoop fs -ls
Found 1 items
-rw-r--r-- 1 cloudera cloudera 58 2017-06-30 02:23 partfile
My Spark version is 2.0.2. Could anyone tell me how to fix it?
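The exception shows the relative path "partfile" being resolved against the local filesystem (file:/home/cloudera/partfile) rather than HDFS. One way to check whether that resolution is the problem is to pass a fully qualified HDFS URI. The sketch below only builds such a URI in plain Scala; HdfsPathSketch is a hypothetical name, and the NameNode address used in the example call is a placeholder assumption that has to be replaced with the cluster's real one.

```scala
// Hypothetical helper: builds a fully qualified HDFS URI for a file
// in a user's HDFS home directory (/user/<name> by default).
object HdfsPathSketch {
  def qualify(nameNode: String, user: String, relativePath: String): String =
    s"hdfs://$nameNode/user/$user/$relativePath"
}

// Example call; "namenode-host:8020" is a placeholder, not a value from the post.
//   val path = HdfsPathSketch.qualify("namenode-host:8020", "cloudera", "partfile")
//   val partf = sparkSession.read.textFile(path)
```

If the fully qualified path loads, the session is probably not picking up the Hadoop configuration that sets the default filesystem.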
Apache Spark