
How to cast Decimal columns of a DataFrame to DoubleType while moving data to Hive using Spark?

Explorer

I am trying to move data from the table system_releases in Greenplum to Hive in the following manner:

val yearDF = spark.read.format("jdbc").option("url", "urltemplate;MaxNumericScale=30;MaxNumericPrecision=40;")
                                      .option("dbtable", s"(${execQuery}) as year2016")
                                      .option("user", "user")
                                      .option("password", "pwd")
                                      .option("partitionColumn","release_number")
                                      .option("lowerBound", 306)
                                      .option("upperBound", 500)
                                      .option("numPartitions",2)
                                      .load()

Schema of the DataFrame yearDF as inferred by Spark:

description:string
status_date:timestamp
time_zone:string
table_refresh_delay_min:decimal(38,30)
online_patching_enabled_flag:string
release_number:decimal(38,30)
change_number:decimal(38,30)
interface_queue_enabled_flag:string
rework_enabled_flag:string
smart_transfer_enabled_flag:string
patch_number:decimal(38,30)
threading_enabled_flag:string
drm_gl_source_name:string
reverted_flag:string
table_refresh_delay_min_text:string
release_number_text:string
change_number_text:string

I have the same table on Hive with the following datatypes:

val hiveCols = "description:string,status_date:timestamp,time_zone:string,table_refresh_delay_min:double,online_patching_enabled_flag:string,release_number:double,change_number:double,interface_queue_enabled_flag:string,rework_enabled_flag:string,smart_transfer_enabled_flag:string,patch_number:double,threading_enabled_flag:string,drm_gl_source_name:string,reverted_flag:string,table_refresh_delay_min_text:string,release_number_text:string,change_number_text:string"

The columns table_refresh_delay_min, release_number, change_number, and patch_number come through with far more decimal places than the source data in GP actually has. To see how Spark was reading the data, I saved the DataFrame as a CSV file. For example, the maximum release_number in GP is 306.00, but in the CSV dump of yearDF the value becomes 306.000000000000000000.
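
For reference, this is roughly the snippet I used for the CSV dump (the output path here is just a placeholder):

yearDF.write.format("csv").option("header", "true").save("/tmp/yearDF_inspect")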


I tried to take the Hive table schema, convert it to a StructType, and apply it to yearDF as below.

import org.apache.spark.sql.types._

// Map a Hive type name to the corresponding Spark SQL DataType.
def convertDatatype(datatype: String): DataType = {
  val convert = datatype match {
    case "string"     => StringType
    case "bigint"     => LongType
    case "int"        => IntegerType
    case "double"     => DoubleType
    case "date"       => TimestampType
    case "boolean"    => BooleanType
    case "timestamp"  => TimestampType
  }
  convert
}

val schemaList        = hiveCols.split(",")
val schemaStructType  = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
val newDF = spark.createDataFrame(yearDF.rdd, schemaStructType)
newDF.write.format("csv").save("hdfs/location")

But I am getting the error:

Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr8$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
    ... 17 more

I tried to cast the decimal columns to DoubleType in the following manner, but I still face the same exception.

import org.apache.spark.sql.functions.col

// Find every DecimalType(precision, scale) column with a non-zero scale and cast it to double.
val pattern = """DecimalType\(\d+,(\d+)\)""".r
val df2 = dataDF.dtypes
  .collect { case (dn, dt) if pattern.findFirstMatchIn(dt).map(_.group(1)).getOrElse("0") != "0" => dn }
  .foldLeft(dataDF)((accDF, c) => accDF.withColumn(c, col(c).cast("Double")))

Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalIfFalseExpr8$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
    ... 17 more

I am out of ideas after trying the above two approaches. Could anyone let me know how I can properly cast the columns of a DataFrame to the required datatypes?

2 REPLIES

Expert Contributor

Can you share 2 sample records from the DataFrame for better understanding?
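
Something like this prints two rows with truncation disabled, so the full decimal values stay visible:

yearDF.show(2, truncate = false)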

Super Collaborator

Hi @Sidhartha 

Could you please try the following sample? The root cause of the error is that createDataFrame(rdd, schema) does not convert the row values to match the schema you pass in, so the rows still contain java.math.BigDecimal where the schema declares double. An explicit cast on the column performs the actual conversion:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Map a Hive type name to the corresponding Spark SQL DataType.
def convertDatatype(datatype: String): DataType = {
  val convert = datatype match {
    case "string"    => StringType
    case "short"     => ShortType
    case "int"       => IntegerType
    case "bigint"    => LongType
    case "float"     => FloatType
    case "double"    => DoubleType
    case "decimal"   => DecimalType(38, 30)
    case "date"      => TimestampType
    case "boolean"   => BooleanType
    case "timestamp" => TimestampType
  }
  convert
}

// Build a DataFrame whose salary column is decimal(38,30), mirroring the Greenplum read.
val input_data = List(Row(1L, "Ranga", 27, BigDecimal(306.000000000000000000)), Row(2L, "Nishanth", 6, BigDecimal(606.000000000000000000)))
val input_rdd = spark.sparkContext.parallelize(input_data)
val hiveCols = "id:bigint,name:string,age:int,salary:decimal"
val schemaList = hiveCols.split(",")
val schemaStructType = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
val myDF = spark.createDataFrame(input_rdd, schemaStructType)
myDF.printSchema()
myDF.show()

// cast() converts the stored values themselves, which createDataFrame alone does not do.
val myDF2 = myDF.withColumn("new_salary", col("salary").cast("double"))
myDF2.printSchema()
myDF2.show()
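
Applied to your case, a sketch along the same lines (assuming yearDF is the DataFrame read from Greenplum) that casts every decimal column to double in one pass:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DecimalType, DoubleType}

// Collect the names of all decimal columns and cast each one to double.
val castedDF = yearDF.schema.fields
  .filter(_.dataType.isInstanceOf[DecimalType])
  .map(_.name)
  .foldLeft(yearDF)((df, c) => df.withColumn(c, col(c).cast(DoubleType)))

castedDF.printSchema()

After the cast the values themselves are doubles, so writing to Hive or CSV should no longer raise the BigDecimal error.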