How to cast Decimal columns of a DataFrame to DoubleType while moving data to Hive using Spark?

I am trying to move data from the table system_releases in Greenplum to Hive as follows:

val yearDF = spark.read.format("jdbc").option("url", "urltemplate;MaxNumericScale=30;MaxNumericPrecision=40;")
                                      .option("dbtable", s"(${execQuery}) as year2016")
                                      .option("user", "user")
                                      .option("password", "pwd")
                                      .option("partitionColumn","release_number")
                                      .option("lowerBound", 306)
                                      .option("upperBound", 500)
                                      .option("numPartitions",2)
                                      .load()

Schema that Spark inferred for the DataFrame yearDF:

description:string
status_date:timestamp
time_zone:string
table_refresh_delay_min:decimal(38,30)
online_patching_enabled_flag:string
release_number:decimal(38,30)
change_number:decimal(38,30)
interface_queue_enabled_flag:string
rework_enabled_flag:string
smart_transfer_enabled_flag:string
patch_number:decimal(38,30)
threading_enabled_flag:string
drm_gl_source_name:string
reverted_flag:string
table_refresh_delay_min_text:string
release_number_text:string
change_number_text:string

The same table exists in Hive with the following data types:

val hiveCols = "description:string,status_date:timestamp,time_zone:string,table_refresh_delay_min:double,online_patching_enabled_flag:string,release_number:double,change_number:double,interface_queue_enabled_flag:string,rework_enabled_flag:string,smart_transfer_enabled_flag:string,patch_number:double,threading_enabled_flag:string,drm_gl_source_name:string,reverted_flag:string,table_refresh_delay_min_text:string,release_number_text:string,change_number_text:string"

The columns table_refresh_delay_min, release_number, change_number and patch_number come through with far more decimal places than they have in Greenplum (GP). To see how Spark reads the data, I saved the DataFrame as a CSV file. For example, the maximum release_number in GP is 306.00, but in the CSV file written from yearDF the value becomes 306.000000000000000000.

I then took the Hive table schema, converted it to a StructType, and tried to apply it to yearDF as below.

def convertDatatype(datatype: String): DataType = {
  val convert = datatype match {
    case "string"     => StringType
    case "bigint"     => LongType
    case "int"        => IntegerType
    case "double"     => DoubleType
    case "date"       => TimestampType
    case "boolean"    => BooleanType
    case "timestamp"  => TimestampType
  }
  convert
}

val schemaList        = hiveCols.split(",")
val schemaStructType  = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
val newDF = spark.createDataFrame(yearDF.rdd, schemaStructType)
newDF.write.format("csv").save("hdfs/location")

But I am getting the error:

Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr8$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
    ... 17 more

I also tried casting the decimal columns to DoubleType as shown below, but I still get the same exception.

val pattern = """DecimalType\(\d+,(\d+)\)""".r
val df2 = dataDF.dtypes.
  collect{ case (dn, dt) if pattern.findFirstMatchIn(dt).map(_.group(1)).getOrElse("0") != "0" => dn }.
  foldLeft(dataDF)((accDF, c) => accDF.withColumn(c, col(c).cast("Double")))

Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr8$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
    ... 17 more

I am out of ideas after trying the two approaches above. Could anyone tell me how to cast the columns of a DataFrame properly to the required data types?

2 REPLIES

Could you share two sample records from the DataFrame so the problem is easier to understand?

Hi @Sidhartha 

Could you please try the following sample:

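  // Imports needed when running this outside spark-shell
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.functions.col
  import org.apache.spark.sql.types._

  def convertDatatype(datatype: String): DataType = {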
    val convert = datatype match {
      case "string" => StringType
      case "short" => ShortType
      case "int" => IntegerType
      case "bigint" => LongType
      case "float" => FloatType
      case "double" => DoubleType
      case "decimal" => DecimalType(38,30)
      case "date" => TimestampType
      case "boolean" => BooleanType
      case "timestamp" => TimestampType
    }
    convert
  }

  val input_data = List(Row(1L, "Ranga", 27, BigDecimal(306.000000000000000000)), Row(2L, "Nishanth", 6, BigDecimal(606.000000000000000000)))
  val input_rdd = spark.sparkContext.parallelize(input_data)
  val hiveCols="id:bigint,name:string,age:int,salary:decimal"
  val schemaList = hiveCols.split(",")
  val schemaStructType = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
  val myDF = spark.createDataFrame(input_rdd, schemaStructType)
  myDF.printSchema()
  myDF.show()

  val myDF2 = myDF.withColumn("new_salary", col("salary").cast("double"))
  myDF2.printSchema()
  myDF2.show()
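
The same cast can be applied to every column at once, driven by a "name:type" string like hiveCols in the question. As far as I know, spark.createDataFrame(rdd, schema) only attaches the schema and does not convert the row values, which would explain why a BigDecimal value is rejected for a double field, whereas Column.cast performs the conversion. The following is a minimal sketch, not a definitive implementation: castToSpec, sampleDF and sampleCols are hypothetical names, the local SparkSession is only for illustration, and it assumes each type name in the spec is a string that Column.cast accepts and that the column names contain no dots or other special characters.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Local session for illustration only; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[1]").appName("cast-to-hive-types").getOrCreate()

// Hypothetical helper: cast each column listed in a "name:type,..." spec to that type.
// The type names go straight to Column.cast, so they must be type strings Spark can parse.
def castToSpec(df: DataFrame, spec: String): DataFrame = {
  val casted = spec.split(",").map { entry =>
    val Array(name, typeName) = entry.trim.split(":")
    col(name).cast(typeName).as(name)
  }
  df.select(casted: _*)
}

// Stand-in for yearDF, using three columns from the question's schema.
val sourceSchema = StructType(Seq(
  StructField("description", StringType, nullable = true),
  StructField("release_number", DecimalType(38, 30), nullable = true),
  StructField("patch_number", DecimalType(38, 30), nullable = true)
))
val sourceRows = Seq(
  Row("sample", new java.math.BigDecimal("306.00"), new java.math.BigDecimal("12"))
)
val sampleDF = spark.createDataFrame(spark.sparkContext.parallelize(sourceRows), sourceSchema)

// Shortened spec in the same format as hiveCols.
val sampleCols = "description:string,release_number:double,patch_number:double"
val castedDF = castToSpec(sampleDF, sampleCols)
castedDF.printSchema()
```

With the real data this would be called as castToSpec(yearDF, hiveCols) before writing to Hive. Note that the select keeps only the columns named in the spec, in the spec's order.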