Support Questions

Find answers, ask questions, and share your expertise

Conversion of a file(with pipe(|), comma(,) and inverted commas(")) to avro format

avatar

I have a flat file with column names separated by comma(,) and column values spearated by pipe(,) and comma(,).

can someone help how can I convert this file to avro file/format.

 

for example:

 

"EMP-CO","EMP-ID" - column names

|ABC|,|123456| - Values.

 

thanks in advance.

1 ACCEPTED SOLUTION

avatar
Mentor

Should be doable in Spark using the CSV and Avro reader/writer.

 

Your header is quite odd with quoting characters surrounding its column names, so it cannot be understood directly ('"' is an illegal character for an avro field name). We could have the Spark CSV reader ignore this line as a comment since no other line should start with a '"' character.

 

Your data is expressed as quoted values with the quoted character being '|'.

 

Something like the below can achieve a conversion, for CDH5:

~> spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,com.databricks:spark-avro_2.10:4.0.0

> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

> // Manual schema declaration of the 'co' and 'id' column names and types
> val customSchema = StructType(Array(
StructField("co", StringType, true),
StructField("id", IntegerType, true)))

> val df = sqlContext.read.format("com.databricks.spark.csv").option("comment", "\"").option("quote", "|").schema(customSchema).load("/tmp/file.txt")

> df.write.format("com.databricks.spark.avro").save("/tmp/avroout")

> // Note: /tmp/file.txt is input file/dir, and /tmp/avroout is the output dir

 

View solution in original post

8 REPLIES 8

avatar
Mentor

Should be doable in Spark using the CSV and Avro reader/writer.

 

Your header is quite odd with quoting characters surrounding its column names, so it cannot be understood directly ('"' is an illegal character for an avro field name). We could have the Spark CSV reader ignore this line as a comment since no other line should start with a '"' character.

 

Your data is expressed as quoted values with the quoted character being '|'.

 

Something like the below can achieve a conversion, for CDH5:

~> spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,com.databricks:spark-avro_2.10:4.0.0

> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

> // Manual schema declaration of the 'co' and 'id' column names and types
> val customSchema = StructType(Array(
StructField("co", StringType, true),
StructField("id", IntegerType, true)))

> val df = sqlContext.read.format("com.databricks.spark.csv").option("comment", "\"").option("quote", "|").schema(customSchema).load("/tmp/file.txt")

> df.write.format("com.databricks.spark.avro").save("/tmp/avroout")

> // Note: /tmp/file.txt is input file/dir, and /tmp/avroout is the output dir

 

avatar

thanks for the reply.

 

can you please provide the steps in detailed.

avatar

I am getting below error after trying...

 

scala> df.write.format("com.databricks.spark.avro").save("C:/Users/madan/Downloads/Avro/out/")
java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
at scala.util.Try.orElse(Try.scala:82)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:219)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC.<init>(<console>:46)
at $iwC.<init>(<console>:48)
at <init>(<console>:50)
at .<init>(<console>:54)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 68 more


scala>

 

 

avatar

Hi Harsha,

 

can you please tell me the /tmp location (are you refering the tmp folder under root or different one)

 

because I have given in same way but I am getting below error.

 

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:

 

 

avatar
Mentor
The /tmp/file.txt is a HDFS path, not a local path. It could be any HDFS
path if your Spark is configured to use HDFS as a default FS - I used /tmp
just for illustration.

The same should work on local FS modes too, but I've not tried it.

avatar

yes spark is configured to use HDFS as a default FS.

 

could you please let me know where exaactly I should keep the input file.

 

I tried many ways but did not suceed. I am new to Hadoop

avatar

@Harsh thanks for providing solution.

 

I am able to generate avro files, but for 10kb flat file I am getting two avro files(part1 and part2).

 

but all my flat files are of more than 50mb, in this case I will get many number of .avro files which difficult to maintain. so is there a away to generate one avro file even if the flat file is large.

avatar

@Harsh J is there a way to avoid giving the column names manually...

 

beacuse I have 150 columns per table and more than 200 tables which is a huge number.