Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Sqoop import table as parquet file then read it with Spark gives error of mr-parquet versioning

Solved Go to solution

Sqoop import table as parquet file then read it with Spark gives error of mr-parquet versioning

Explorer

I have imported a table from DB2 using Sqoop 1.4.6.2 to HDFS in parquet format

sqoop import -Dmapreduce.job.user.classpath.first=true -Dhadoop.security.credential.provider.path=jceks://hdfs/user/toto/creds.jceks -Dorg.apache.sqoop.splitter.allow_text_splitter=true --connect jdbc:db2://myserver:myport/db:currentSchema=myschema;" --username <username> --password-alias <password> -m 2 --as-parquetfile --outdir /tmp/java --driver com.ibm.db2.jcc.DB2Driver --target-dir /user/toto/db/mytable --delete-target-dir --table mytable

I have my table directly imported into HDFS.

Then i try to read it using Spark 2.1.1.2 and Scala 2.11.8 and change the column type which come from DB2 as TIMESTAMP and imported by Sqoop as BIGINT. I have to modify the column type using Spark then erase the old parquet file with the new one:

spark-shell
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("/user/toto/db/mytable")

df.withColumn("col_bigint", ($"col_bigint / 1000).cast(TimestampType))
df.show(5, false)

Throwing a WARNING

WARN CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr
org.apache.parquet.VersionParsers$VersionParseException: Could not parse created_by: parquet-mr using format: (.+) version ((.*))?\(build ?(.*)\)

With Sqoop, it's impossible to have directly the desire column format in TIMESTAMP. It automatically put it in a BIGINT type. So i want to read and to perform some transformation on these parquets file to have the right format.

I already try to add every parquet-* JAR file into /usr/hdp/current/sqoop-cli/lib/

  • parquet-column-1.8.1.jar
  • parquet-common-1.8.1.jar
  • parquet-encoding-1.8.1.jar
  • parquet-format-2.3.0-incubating.jar
  • parquet-generator-1.8.1.jar
  • parquet-hadoop-1.8.1.jar
  • parquet-hadoop-bundle-1.6.0.jar
  • parquet-jackson-1.8.1.jar
  • parquet-avro-1.6.0.jar

If i change the parquet-avro-1.6.0.jar by the parquet-avro-1.8.1.jar, Sqoop couldn't process it, because he can't find the method AvroWriter

Initially, each JARs files in the Sqoop-CLI library were in version 1.6.0 but i change it and put them with the same version of my spark2 jar folder.

If anyone can find a way to make it work, I will be very grateful

1 ACCEPTED SOLUTION

Accepted Solutions
Highlighted

Re: Sqoop import table as parquet file then read it with Spark gives error of mr-parquet versioning

Explorer

A solution to import your data as parquet file and be able to treat the TIMESTAMP and DATE format which come from RDBMS such as IBM DB2 or MySQL is to import using the sqoop import --as-parquet command and map each field using --map-column-java which are TIMESTAMP and DATE to a String Java type.

After that, you should be able to interrogate the Hive database though a SparkSession by changing the configuration of the actual Spark Session and set spark.sql.hive.convertMetastoreParquet to false. SparkSQL will use the Hive SerDe for reading parquet tables instead of the built in support.

spark.sql.hive.convertMetastoreParquet false
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
.appName("test interrogate Hive parquet file using Spark")
.config("spark.sql.parquet.compression.codec", "snappy")
.config("spark.sql.warehouse.dir","/apps/hive/warehouse")
.config("hive.metastore.uris","thrift://sdsl-hdp-01.mycluster:9083")
.config("spark.sql.hive.convertMetastoreParquet", false)
.enableHiveSupport()
.getOrCreate()

import spark.implicits._
import spark.sql
 
val df = sql("SELECT CAST(COL1 AS TIMESTAMP), COL2, COL3, CAST(COL4 AS TIMESTAMP), COL5 FROM db.mytable")
df.printSchema
root
 |-- COL1: timestamp (nullable = true)
 |-- COL2: string (nullable = true)
 |-- COL3: string (nullable = true)
 |-- COL4: timestamp (nullable = true)
 |-- COL5: integer (nullable = true)

df.show(5, false)
+--------------------------+--------+--------+--------------------------+------+
|COL1                	   |COL2    |COL3    |COL4                      |COL5|
+--------------------------+--------+--------+--------------------------+------+
|2003-01-01 00:00:00.100001|        |00001   |2003-01-01 00:00:00.10361 |1     |
|2003-01-01 00:00:00.100002|        |00002   |2003-01-01 00:00:00.100002|2     |
|2003-01-01 00:00:00.100003|        |00003   |2003-01-01 00:00:00.100003|3     |
|2003-01-01 00:00:00.100004|        |00004   |2003-01-01 00:00:00.100004|4     |
|2003-01-01 00:00:00.100005|        |00005   |2003-01-01 00:00:00.100005|5     |
+--------------------------+--------+--------+--------------------------+------+
only showing top 5 row

View solution in original post

7 REPLIES 7
Highlighted

Re: Sqoop import table as parquet file then read it with Spark gives error of mr-parquet versioning

Expert Contributor
@Thomas Bazzucchi,

I just did a quick test and can see that the sqoop does pick up the datetime correctly,

18/03/20 23:36:29 DEBUG manager.SqlManager: Found column create_time of type DATETIME

18/03/20 23:36:29 DEBUG manager.SqlManager: Found column update_time of type DATETIME

18/03/20 23:36:29 DEBUG manager.SqlManager: Found column added_by_id of type BIGINT

18/03/20 23:36:29 DEBUG manager.SqlManager: Found column upd_by_id of type BIGINT

Can you enable verbose on the sqoop command which can provide some lead on this, apart from this if you can share the ddl along with some sample data that would also help in getting where the issue would lie.

Also out of curiosity, have you tried using --map-column-java fieldname=TIMESTAMP like this?

Highlighted

Re: Sqoop import table as parquet file then read it with Spark gives error of mr-parquet versioning

Explorer

Sqoop picks up the DATETIME correctly but when you import data from database as parquet file format, with Sqoop you'll have issues regarding the parquet schema saved in Hive/HDFS.

The structure of the table is the following:

TAG002_AGENT

  • SDINTERMED (TIMESTAMP PK FK)
  • CSTATRED (CHAR(3))
  • VIDAGENT (TIMESTAMP Nullable FK)
  • CONTAG (SMALLINT)

I also add options --verbose and --map-column-java SDINTERMED=Timestamp (and also try SDINTERMED=java.sql.Timestamp)

This is the following command that i launch:

sqoop import -Dmapreduce.job.user.classpath.first=true -Dhadoop.security.credential.provider.path=jceks://hdfs/user/airflow/credentials.jceks -Dorg.apache.sqoop.splitter.allow_text_splitter=true --connect "jdbc:db2://MVSTST1.lefoyer.lu:5047/DB2B:currentSchema=DEVB;" --username TAPBDB2B --password-alias db2.password -m 2 --as-parquetfile --outdir /tmp/java --driver com.ibm.db2.jcc.DB2Driver --target-dir /user/airflow/db2b/TAG002_AGENT --delete-target-dir --table TAG002_AGENT --map-column-java SDAGRESP=Timestamp,SDINTERMED=Timestamp --verbose
Highlighted

Re: Sqoop import table as parquet file then read it with Spark gives error of mr-parquet versioning

Explorer

Actual STDOUT output

[2018-03-23 09:07:48,433] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG sqoop.ConnFactory: Loaded manager factory: org.apache.sqoop.manager.oracle.OraOopManagerFactory
[2018-03-23 09:07:48,445] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG sqoop.ConnFactory: Loaded manager factory: com.cloudera.sqoop.manager.DefaultManagerFactory
[2018-03-23 09:07:48,445] {bash_operator.py:101} INFO - 18/03/23 09:07:48 WARN sqoop.ConnFactory: Parameter --driver is set to an explicit driver however appropriate connection manager is not being set (via --connection-manager). Sqoop is going to fall back to org.apache.sqoop.manager.GenericJdbcManager. Please specify explicitly which connection manager should be used next time.
[2018-03-23 09:07:48,457] {bash_operator.py:101} INFO - 18/03/23 09:07:48 INFO manager.SqlManager: Using default fetchSize of 1000
[2018-03-23 09:07:48,457] {bash_operator.py:101} INFO - 18/03/23 09:07:48 INFO tool.CodeGenTool: Will generate java class as codegen_TAG002_AGENT
[2018-03-23 09:07:48,494] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Execute getColumnInfoRawQuery : SELECT t.* FROM TAG002_AGENT AS t WHERE 1=0
[2018-03-23 09:07:48,555] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: No connection paramenters specified. Using regular API for making connection.
[2018-03-23 09:07:48,920] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Using fetchSize for next query: 1000
[2018-03-23 09:07:48,920] {bash_operator.py:101} INFO - 18/03/23 09:07:48 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM TAG002_AGENT AS t WHERE 1=0
[2018-03-23 09:07:48,959] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Found column SDINTERMED of type [93, 26, 6]
[2018-03-23 09:07:48,959] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Found column CSTATRED of type [1, 3, 0]
[2018-03-23 09:07:48,959] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Found column VIDAGENT of type [1, 5, 0]
[2018-03-23 09:07:48,959] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Found column SDAGRESP of type [93, 26, 6]
[2018-03-23 09:07:48,959] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Found column CONTAG of type [5, 5, 0]
[2018-03-23 09:07:48,960] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Using fetchSize for next query: 1000
[2018-03-23 09:07:48,960] {bash_operator.py:101} INFO - 18/03/23 09:07:48 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM TAG002_AGENT AS t WHERE 1=0
[2018-03-23 09:07:48,967] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Found column SDINTERMED
[2018-03-23 09:07:48,967] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Found column CSTATRED
[2018-03-23 09:07:48,967] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Found column VIDAGENT
[2018-03-23 09:07:48,968] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Found column SDAGRESP
[2018-03-23 09:07:48,968] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG manager.SqlManager: Found column CONTAG
[2018-03-23 09:07:48,968] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG orm.ClassWriter: selected columns:
[2018-03-23 09:07:48,968] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG orm.ClassWriter:   SDINTERMED
[2018-03-23 09:07:48,968] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG orm.ClassWriter:   CSTATRED
[2018-03-23 09:07:48,968] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG orm.ClassWriter:   VIDAGENT
[2018-03-23 09:07:48,968] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG orm.ClassWriter:   SDAGRESP
[2018-03-23 09:07:48,969] {bash_operator.py:101} INFO - 18/03/23 09:07:48 DEBUG orm.ClassWriter:   CONTAG
[2018-03-23 09:07:48,982] {bash_operator.py:101} INFO - 18/03/23 09:07:48 INFO orm.ClassWriter: Overriding type of column SDINTERMED to Timestamp
[2018-03-23 09:07:48,985] {bash_operator.py:101} INFO - 18/03/23 09:07:48 INFO orm.ClassWriter: Overriding type of column SDINTERMED to Timestamp
[2018-03-23 09:07:48,985] {bash_operator.py:101} INFO - 18/03/23 09:07:48 ERROR orm.ClassWriter: No ResultSet method for Java type Timestamp
[2018-03-23 09:07:48,985] {bash_operator.py:101} INFO - 18/03/23 09:07:48 INFO orm.ClassWriter: Overriding type of column SDAGRESP to Timestamp
[2018-03-23 09:07:48,985] {bash_operator.py:101} INFO - 18/03/23 09:07:48 ERROR orm.ClassWriter: No ResultSet method for Java type Timestamp
[2018-03-23 09:07:48,986] {bash_operator.py:101} INFO - 18/03/23 09:07:48 INFO orm.ClassWriter: Overriding type of column SDINTERMED to Timestamp
[2018-03-23 09:07:48,986] {bash_operator.py:101} INFO - 18/03/23 09:07:48 ERROR tool.ImportTool: Imported Failed: No ResultSet method for Java type Timestamp
Highlighted

Re: Sqoop import table as parquet file then read it with Spark gives error of mr-parquet versioning

@Thomas Bazzucchi

That's why ORC should be your preferred file format :)

Ok, jokes apart, here is a similar issue reported already! This one talks about access issue to the log files and the user was able to fix it. Please have a look and let know if that fix your issue.

Highlighted

Re: Sqoop import table as parquet file then read it with Spark gives error of mr-parquet versioning

Explorer

@Rahul Soni I totally agree with you! Especially when you're working with Hive tables. But the customer is working with Spark and require parquet files as input data for the Spark jobs.

I already check the reported issue and made the necessary modification on the /tmp/parquet-0.log and /tmp/parquet-0.log.lock access for my user (which is not hive in my case).

Highlighted

Re: Sqoop import table as parquet file then read it with Spark gives error of mr-parquet versioning

@Thomas Bazzucchi

Were you able to fix the issue?

Highlighted

Re: Sqoop import table as parquet file then read it with Spark gives error of mr-parquet versioning

Explorer

A solution to import your data as parquet file and be able to treat the TIMESTAMP and DATE format which come from RDBMS such as IBM DB2 or MySQL is to import using the sqoop import --as-parquet command and map each field using --map-column-java which are TIMESTAMP and DATE to a String Java type.

After that, you should be able to interrogate the Hive database though a SparkSession by changing the configuration of the actual Spark Session and set spark.sql.hive.convertMetastoreParquet to false. SparkSQL will use the Hive SerDe for reading parquet tables instead of the built in support.

spark.sql.hive.convertMetastoreParquet false
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
.appName("test interrogate Hive parquet file using Spark")
.config("spark.sql.parquet.compression.codec", "snappy")
.config("spark.sql.warehouse.dir","/apps/hive/warehouse")
.config("hive.metastore.uris","thrift://sdsl-hdp-01.mycluster:9083")
.config("spark.sql.hive.convertMetastoreParquet", false)
.enableHiveSupport()
.getOrCreate()

import spark.implicits._
import spark.sql
 
val df = sql("SELECT CAST(COL1 AS TIMESTAMP), COL2, COL3, CAST(COL4 AS TIMESTAMP), COL5 FROM db.mytable")
df.printSchema
root
 |-- COL1: timestamp (nullable = true)
 |-- COL2: string (nullable = true)
 |-- COL3: string (nullable = true)
 |-- COL4: timestamp (nullable = true)
 |-- COL5: integer (nullable = true)

df.show(5, false)
+--------------------------+--------+--------+--------------------------+------+
|COL1                	   |COL2    |COL3    |COL4                      |COL5|
+--------------------------+--------+--------+--------------------------+------+
|2003-01-01 00:00:00.100001|        |00001   |2003-01-01 00:00:00.10361 |1     |
|2003-01-01 00:00:00.100002|        |00002   |2003-01-01 00:00:00.100002|2     |
|2003-01-01 00:00:00.100003|        |00003   |2003-01-01 00:00:00.100003|3     |
|2003-01-01 00:00:00.100004|        |00004   |2003-01-01 00:00:00.100004|4     |
|2003-01-01 00:00:00.100005|        |00005   |2003-01-01 00:00:00.100005|5     |
+--------------------------+--------+--------+--------------------------+------+
only showing top 5 row

View solution in original post

Don't have an account?
Coming from Hortonworks? Activate your account here