Explorer
Posts: 13
Registered: ‎10-21-2018
Accepted Solution

Conversion of a file(with pipe(|), comma(,) and inverted commas(")) to avro format

I have a flat file in which the column names are wrapped in inverted commas (") and separated by commas (,), and the column values are wrapped in pipes (|) and separated by commas (,).

Can someone help me convert this file to Avro format?

 

For example:

 

"EMP-CO","EMP-ID" - column names

|ABC|,|123456| - Values.

 

Thanks in advance.

Posts: 1,754
Kudos: 371
Solutions: 279
Registered: ‎07-31-2013

Re: Conversion of a file(with pipe(|), comma(,) and inverted commas(")) to avro format

This should be doable in Spark using the CSV and Avro readers/writers.

 

Your header is unusual: its column names are wrapped in quote characters, so it cannot be used directly ('"' is an illegal character in an Avro field name). We can have the Spark CSV reader skip this line as a comment, since no other line should start with a '"' character.

 

Your data is expressed as quoted values, with '|' as the quote character.
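
To make the quoting rule concrete, here is a minimal plain-Scala sketch of how a CSV reader treats '|' as the quote character. The object name `PipeQuoted` is hypothetical, and the sketch assumes values never contain an escaped '|'; it is an illustration of the idea, not the parser spark-csv actually uses.

```scala
object PipeQuoted {
  // Split one line on `sep`, treating `quote` as the quoting character.
  // Simplification (assumption): quote characters are never escaped inside a value.
  def split(line: String, sep: Char = ',', quote: Char = '|'): Vector[String] = {
    val fields = Vector.newBuilder[String]
    val current = new StringBuilder
    var inQuotes = false
    for (c <- line) {
      if (c == quote) {
        inQuotes = !inQuotes            // toggle quoting and drop the quote itself
      } else if (c == sep && !inQuotes) {
        fields += current.toString      // a separator outside quotes ends the field
        current.clear()
      } else {
        current.append(c)               // ordinary character, or a separator inside quotes
      }
    }
    fields += current.toString          // the last field has no trailing separator
    fields.result()
  }
}
```

With the sample row from the question, `PipeQuoted.split("|ABC|,|123456|")` yields the two values `ABC` and `123456`, and a comma inside the pipes stays part of the value.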

 

Something like the following can do the conversion on CDH5:

~> spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,com.databricks:spark-avro_2.10:4.0.0

> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

> // Manual schema declaration of the 'co' and 'id' column names and types
> val customSchema = StructType(Array(
    StructField("co", StringType, true),
    StructField("id", IntegerType, true)))

> val df = sqlContext.read.format("com.databricks.spark.csv").option("comment", "\"").option("quote", "|").schema(customSchema).load("/tmp/file.txt")

> df.write.format("com.databricks.spark.avro").save("/tmp/avroout")

> // Note: /tmp/file.txt is input file/dir, and /tmp/avroout is the output dir

 

Explorer
Posts: 13
Registered: ‎10-21-2018

Re: Conversion of a file(with pipe(|), comma(,) and inverted commas(")) to avro format


Thanks for the reply.

 

Can you please provide the steps in detail?

Explorer
Posts: 13
Registered: ‎10-21-2018

Re: Conversion of a file(with pipe(|), comma(,) and inverted commas(")) to avro format

I am getting the error below after trying this:

 

scala> df.write.format("com.databricks.spark.avro").save("C:/Users/madan/Downloads/Avro/out/")
java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
at scala.util.Try.orElse(Try.scala:82)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:219)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC.<init>(<console>:46)
at $iwC.<init>(<console>:48)
at <init>(<console>:50)
at .<init>(<console>:54)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 68 more


scala>
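
A note on reading this trace: the frames mention `ResolvedDataSource`, which belongs to Spark 1.x, while the missing class `FileFormat` only exists in Spark 2.0 and later. That combination suggests the spark-avro package on the classpath was built for a newer Spark than the one running. The sketch below is a hypothetical helper (the name `AvroPackageHint` is made up) that turns a Spark version string into a hint. The specific release number 2.0.1 for Spark 1.x is an assumption taken from memory of the spark-avro compatibility table and should be checked against the project README.

```scala
object AvroPackageHint {
  // Major version from strings such as "1.6.0", "1.6.0-cdh5.15.0" or "2.3.1".
  def major(sparkVersion: String): Int =
    sparkVersion.takeWhile(_.isDigit).toInt

  // org.apache.spark.sql.execution.datasources.FileFormat exists from Spark 2.0 on.
  def hasFileFormat(sparkVersion: String): Boolean =
    major(sparkVersion) >= 2

  def hint(sparkVersion: String): String =
    if (hasFileFormat(sparkVersion))
      "Spark 2.x or later: a spark-avro build that relies on FileFormat can load"
    else
      // Assumption: 2.0.1 is the spark-avro line for Spark 1.4+; verify before use.
      "Spark 1.x: pick a spark-avro release built for Spark 1.x (assumed 2.0.1)"
}
```

In spark-shell, `AvroPackageHint.hint(sc.version)` prints the hint for the running session.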

 

 

Explorer
Posts: 13
Registered: ‎10-21-2018

Re: Conversion of a file(with pipe(|), comma(,) and inverted commas(")) to avro format

Hi Harsh,

 

Can you please tell me where /tmp is located? Are you referring to the tmp folder under root, or a different one?

 

I have given the path in the same way, but I am getting the error below.

 

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:

 

 

Posts: 1,754
Kudos: 371
Solutions: 279
Registered: ‎07-31-2013

Re: Conversion of a file(with pipe(|), comma(,) and inverted commas(")) to avro format

The /tmp/file.txt is an HDFS path, not a local path. It could be any HDFS
path if your Spark is configured to use HDFS as the default FS - I used /tmp
just for illustration.

The same should work on local FS modes too, but I've not tried it.
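
A minimal sketch of getting a local file into the default filesystem from inside spark-shell, using the Hadoop `FileSystem` API. The object name `HdfsInput` and the example paths are hypothetical. It assumes the session's Hadoop configuration points at HDFS and that the user may write to the target directory.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsInput {
  // Copy a file from the local disk into the default filesystem,
  // then report whether it is visible at the target path.
  def stage(conf: Configuration, localFile: String, target: String): Boolean = {
    val fs = FileSystem.get(conf)                 // filesystem named by fs.defaultFS
    println(s"Default filesystem: ${fs.getUri}")  // e.g. an hdfs:// URI on a cluster
    fs.copyFromLocalFile(new Path(localFile), new Path(target))
    fs.exists(new Path(target))
  }
}
```

In spark-shell this could be called as `HdfsInput.stage(sc.hadoopConfiguration, "/home/user/file.txt", "/tmp/file.txt")`, after which `/tmp/file.txt` in the load call refers to the copied file. The `hdfs dfs -put` command does the same copy from a terminal. To read straight from the local disk instead, an explicit `file:///` prefix on the path is the usual approach, although that is untested here.
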
Explorer
Posts: 13
Registered: ‎10-21-2018

Re: Conversion of a file(with pipe(|), comma(,) and inverted commas(")) to avro format

Yes, Spark is configured to use HDFS as the default FS.

 

Could you please let me know where exactly I should keep the input file?

 

I tried many ways but did not succeed. I am new to Hadoop.

Explorer
Posts: 13
Registered: ‎10-21-2018

Re: Conversion of a file(with pipe(|), comma(,) and inverted commas(")) to avro format

@Harsh, thanks for providing the solution.

 

I am able to generate Avro files, but for a 10 KB flat file I am getting two Avro files (part1 and part2).

 

All my flat files are larger than 50 MB, so I will get a large number of .avro files, which is difficult to maintain. Is there a way to generate one Avro file even if the flat file is large?
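
One common approach, sketched here under the assumption that `df` is the DataFrame built in the earlier reply and that the spark-avro package is on the classpath: reduce the DataFrame to a single partition before writing, since Spark writes one part file per partition. The object name `SingleFileAvro` is hypothetical.

```scala
import org.apache.spark.sql.DataFrame

object SingleFileAvro {
  // Write `df` as Avro through a single partition, so the output
  // directory holds a single part file.
  def write(df: DataFrame, outDir: String): Unit =
    df.coalesce(1)                          // merge all partitions into one
      .write
      .format("com.databricks.spark.avro")
      .save(outDir)
}
```

Called as `SingleFileAvro.write(df, "/tmp/avroout")`, this still produces a directory, now with one part file inside it. All rows pass through one task, which is reasonable for inputs in the tens of megabytes but would become a bottleneck for much larger data.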

Explorer
Posts: 13
Registered: ‎10-21-2018

Re: Conversion of a file(with pipe(|), comma(,) and inverted commas(")) to avro format

@Harsh J, is there a way to avoid giving the column names manually?

 

I have 150 columns per table and more than 200 tables, which is a huge number.
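
One possible way to avoid typing the names, sketched in plain Scala: read the quoted header line and derive the column names from it. The object name `HeaderSchema` is hypothetical. The sketch assumes the header is the first line of the file, that column names contain no commas, and that replacing characters Avro does not allow in names (such as the '-' in EMP-CO) with '_' is acceptable.

```scala
object HeaderSchema {
  // Turn a header line such as "EMP-CO","EMP-ID" into Avro-safe column names.
  // Avro names must start with a letter or '_' and contain only letters, digits and '_'.
  def columnNames(header: String): Vector[String] =
    header.split(",").toVector.map { raw =>
      val name = raw.trim.stripPrefix("\"").stripSuffix("\"")   // drop the inverted commas
      val safe = name.replaceAll("[^A-Za-z0-9_]", "_")          // e.g. EMP-CO becomes EMP_CO
      if (safe.nonEmpty && safe.head.isDigit) "_" + safe else safe
    }
}
```

In spark-shell the names could then feed the schema, for example `StructType(HeaderSchema.columnNames(sc.textFile("/tmp/file.txt").first()).map(StructField(_, StringType, true)))`, which treats every column as a string. Column types would still need to be supplied or cast separately.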
