I am trying to move data from the table system_releases from Greenplum to Hive in the following manner:
val yearDF = spark.read.format("jdbc")
  .option("url", "urltemplate;MaxNumericScale=30;MaxNumericPrecision=40;")
  .option("dbtable", s"(${execQuery}) as year2016")
  .option("user", "user")
  .option("password", "pwd")
  .option("partitionColumn", "release_number")
  .option("lowerBound", 306)
  .option("upperBound", 500)
  .option("numPartitions", 2)
  .load()
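For reference, the inferred schema shown next can be reproduced with a quick check on the loaded dataframe (just printSchema, or a name:type loop to match the format of the listing below):

// Print the schema Spark inferred from the JDBC source
yearDF.printSchema()
// or as name:type pairs, in the same form as the listing below
yearDF.schema.fields.foreach(f => println(s"${f.name}:${f.dataType.simpleString}"))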
Schema of the dataframe yearDF as inferred by Spark:
description:string
status_date:timestamp
time_zone:string
table_refresh_delay_min:decimal(38,30)
online_patching_enabled_flag:string
release_number:decimal(38,30)
change_number:decimal(38,30)
interface_queue_enabled_flag:string
rework_enabled_flag:string
smart_transfer_enabled_flag:string
patch_number:decimal(38,30)
threading_enabled_flag:string
drm_gl_source_name:string
reverted_flag:string
table_refresh_delay_min_text:string
release_number_text:string
change_number_text:string
I have the same table in Hive with the following datatypes:
val hiveCols = "description:string,status_date:timestamp,time_zone:string,table_refresh_delay_min:double,online_patching_enabled_flag:string,release_number:double,change_number:double,interface_queue_enabled_flag:string,rework_enabled_flag:string,smart_transfer_enabled_flag:string,patch_number:double,threading_enabled_flag:string,drm_gl_source_name:string,reverted_flag:string,table_refresh_delay_min_text:string,release_number_text:string,change_number_text:string"
The columns:
table_refresh_delay_min, release_number, change_number and patch_number
are showing far more decimal places than they actually have in GP. So I saved the dataframe as a CSV file to see how the data is being read by Spark. For example, the maximum value of release_number in GP is 306.00, but in the CSV file saved from the dataframe yearDF the value becomes 306.000000000000000000.
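To confirm it is the declared decimal(38,30) type rather than the data itself, a quick sanity check directly on the dataframe (using the standard max aggregate from org.apache.spark.sql.functions) renders the value with its full declared scale when displayed without truncation:

import org.apache.spark.sql.functions.max
// Display the aggregated value without truncation; a decimal(38,30) column
// is rendered with its full declared scale
yearDF.select(max("release_number")).show(false)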
I took the Hive table schema, converted it to a StructType, and tried to apply that to yearDF as below.
import org.apache.spark.sql.types._

// Map a Hive type name to the corresponding Spark SQL DataType
def convertDatatype(datatype: String): DataType = {
  datatype match {
    case "string"    => StringType
    case "bigint"    => LongType
    case "int"       => IntegerType
    case "double"    => DoubleType
    case "date"      => TimestampType
    case "boolean"   => BooleanType
    case "timestamp" => TimestampType
  }
}

// Build a StructType from the "name:type" pairs in hiveCols
val schemaList = hiveCols.split(",")
val schemaStructType = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))

val newDF = spark.createDataFrame(yearDF.rdd, schemaStructType)
newDF.write.format("csv").save("hdfs/location")
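Before writing, a side-by-side comparison of the inferred schema and the target schema makes the decimal-to-double mismatch visible (a small check, assuming both yearDF and schemaStructType are in scope):

// Compare each inferred field with the corresponding target field
yearDF.schema.fields.zip(schemaStructType.fields).foreach { case (src, tgt) =>
  println(s"${src.name}: ${src.dataType.simpleString} -> ${tgt.dataType.simpleString}")
}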
But I am getting the error:
Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr8$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
  ... 17 more
I then tried to cast the decimal columns to DoubleType in the manner below, but I still get the same exception.
import org.apache.spark.sql.functions.col

val pattern = """DecimalType\(\d+,(\d+)\)""".r

// Cast every column whose type is a decimal with a non-zero scale to Double
val df2 = dataDF.dtypes
  .collect { case (dn, dt) if pattern.findFirstMatchIn(dt).map(_.group(1)).getOrElse("0") != "0" => dn }
  .foldLeft(dataDF)((accDF, c) => accDF.withColumn(c, col(c).cast("Double")))

Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr8$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
  ... 17 more
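For what it's worth, checking the column types of df2 after the fold (a quick check, assuming df2 is the result of the cast loop above) would show whether any decimal columns survived the cast:

// List columns that are still decimals after the cast loop
df2.dtypes.filter { case (_, dt) => dt.startsWith("DecimalType") }.foreach(println)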
I am out of ideas after trying the above two approaches. Could anyone let me know how I can properly cast the columns of a dataframe to the required datatypes?