Created on 10-21-2018 07:48 PM - edited 09-16-2022 06:49 AM
I have a flat file in which the column names are separated by commas (,) and the column values are wrapped in pipes (|) and separated by commas (,).
Can someone help me convert this file to Avro format?
For example:
"EMP-CO","EMP-ID" - column names
|ABC|,|123456| - values
Thanks in advance.
Created 10-22-2018 12:15 AM
Should be doable in Spark using the CSV and Avro reader/writer.
Your header is unusual in that its column names are wrapped in quote characters, so it cannot be used directly ('"' is an illegal character in an Avro field name). We can have the Spark CSV reader skip this line as a comment, since no other line should start with a '"' character.
Your data is expressed as quoted values, with '|' as the quote character.
Something like the below can achieve a conversion, for CDH5:
~> spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,com.databricks:spark-avro_2.10:4.0.0
> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
> // Manual schema declaration of the 'co' and 'id' column names and types
> val customSchema = StructType(Array(StructField("co", StringType, true), StructField("id", IntegerType, true)))
> val df = sqlContext.read.format("com.databricks.spark.csv").option("comment", "\"").option("quote", "|").schema(customSchema).load("/tmp/file.txt")
> df.write.format("com.databricks.spark.avro").save("/tmp/avroout")
> // Note: /tmp/file.txt is input file/dir, and /tmp/avroout is the output dir
Created on 10-22-2018 07:48 PM - edited 10-22-2018 08:10 PM
Thanks for the reply.
Can you please provide the steps in detail?
Created 10-22-2018 08:24 PM
I am getting the error below when I try this:
scala> df.write.format("com.databricks.spark.avro").save("C:/Users/madan/Downloads/Avro/out/")
java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
at scala.util.Try.orElse(Try.scala:82)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:219)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC.<init>(<console>:46)
at $iwC.<init>(<console>:48)
at <init>(<console>:50)
at .<init>(<console>:54)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 68 more
scala>
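A note on this trace: the frames mention `ResolvedDataSource`, which belongs to Spark 1.x, while the missing class `org.apache.spark.sql.execution.datasources.FileFormat` only exists from Spark 2.0 onward. That suggests the spark-avro 4.0.0 package is too new for this Spark installation. The sketch below is a hypothetical helper (the object and method names are made up for illustration) that maps a Spark version to a spark-avro release. The version table is an assumption recalled from the spark-avro README, so please verify it against the README before relying on it.

```scala
// Hypothetical helper, not part of Spark or spark-avro.
// ASSUMPTION: the version table below is recalled from the spark-avro README
// (Spark 1.4-1.6 -> 2.0.1, Spark 2.0-2.1 -> 3.2.0, Spark 2.2-2.3 -> 4.0.0).
object SparkAvroVersion {
  def forSpark(sparkVersion: String): String = {
    // "1.6.0-cdh5.14.0" splits into "1", "6", "0-cdh5", ...; only major/minor are used
    val parts = sparkVersion.split("\\.")
    val major = parts(0).toInt
    val minor = parts(1).toInt
    (major, minor) match {
      case (1, m) if m >= 4           => "2.0.1"
      case (2, m) if m <= 1           => "3.2.0"
      case (2, m) if m == 2 || m == 3 => "4.0.0"
      case _ =>
        sys.error("no spark-avro release known to this sketch for Spark " + sparkVersion)
    }
  }
}
```

In spark-shell, `SparkAvroVersion.forSpark(sc.version)` would give the release to put into the `--packages` coordinate, for example `com.databricks:spark-avro_2.10:2.0.1` on a Spark 1.6 build, if the assumed table holds.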
Created 10-24-2018 07:42 PM
Hi Harsha,
Can you please tell me which /tmp location you mean (are you referring to the tmp folder under root, or a different one)?
I have given the path in the same way, but I am getting the error below.
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
Created 10-24-2018 08:10 PM
Created 10-24-2018 08:26 PM
Yes, Spark is configured to use HDFS as the default FS.
Could you please let me know exactly where I should keep the input file?
I tried many ways but did not succeed. I am new to Hadoop.
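When HDFS is the default file system, a path such as /tmp/file.txt is looked up in HDFS, not in the local /tmp folder, which would explain the "Input path does not exist" error. The following is a minimal spark-shell sketch, assuming a Spark 1.x shell where `sc` is predefined. It checks which file system unqualified paths resolve against and uploads the file if it is missing. The local path /home/user/file.txt is a made-up placeholder for wherever the flat file really lives.

```scala
// spark-shell fragment: `sc` is provided by the shell.
import org.apache.hadoop.fs.{FileSystem, Path}

// Default file system taken from the Hadoop configuration Spark is using
val fs = FileSystem.get(sc.hadoopConfiguration)
println(fs.getUri) // e.g. an hdfs:// URI when HDFS is the default FS

val input = new Path("/tmp/file.txt")
if (!fs.exists(input)) {
  // ASSUMPTION: hypothetical local location of the flat file; adjust as needed.
  // The source path is read from the local file system, the target from `fs`.
  fs.copyFromLocalFile(new Path("/home/user/file.txt"), input)
}
println(fs.exists(input)) // should now print true
```

The same upload can be done from a terminal with `hdfs dfs -put`. Alternatively, a fully qualified `file:///...` path can be passed to `load`, but on a cluster the file would then have to be readable at that path on every node.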
Created 11-04-2018 07:54 PM
@Harsh thanks for providing the solution.
I am able to generate Avro files, but for a 10 KB flat file I get two Avro files (part1 and part2).
All my flat files are larger than 50 MB, so I will get many .avro files, which are difficult to maintain. Is there a way to generate a single Avro file even if the flat file is large?
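Spark writes one part file per partition of the DataFrame, so one common approach is to reduce the DataFrame to a single partition before writing. The sketch below repeats the read from the earlier reply and adds `coalesce(1)`. It assumes the same spark-shell session with the spark-csv and spark-avro packages loaded, and the output directory /tmp/avroout_single is a made-up name.

```scala
// spark-shell fragment: `sqlContext` is provided by the shell (Spark 1.x).
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val customSchema = StructType(Array(
  StructField("co", StringType, true),
  StructField("id", IntegerType, true)))

// Trailing dots keep each statement open across lines in the REPL
val df = sqlContext.read.
  format("com.databricks.spark.csv").
  option("comment", "\"").
  option("quote", "|").
  schema(customSchema).
  load("/tmp/file.txt")

// One partition in, one part file out.
// ASSUMPTION: /tmp/avroout_single is a hypothetical output directory.
df.coalesce(1).
  write.format("com.databricks.spark.avro").
  save("/tmp/avroout_single")
```

With a single partition the whole write runs in one task, which should be acceptable for files in the tens of megabytes but removes parallelism for much larger inputs. The output is still a directory containing one part file.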
Created 11-05-2018 05:41 PM
@Harsh J is there a way to avoid giving the column names manually?
I have 150 columns per table and more than 200 tables, which is a huge number.
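One possible way to avoid typing the names is to derive them from the header line of each file. The sketch below is a hypothetical helper (`HeaderNames` is not part of Spark) that strips the surrounding double quotes and replaces characters that are not allowed in Avro field names, such as the hyphen in "EMP-CO", with underscores. It assumes that no column name contains a comma and that treating every column as a string is acceptable.

```scala
// Hypothetical helper for turning the quoted header line into Avro-safe names.
object HeaderNames {
  def fromHeader(headerLine: String): List[String] =
    headerLine.split(",").toList.map { raw =>
      // Drop the surrounding double quotes, if present
      val unquoted = raw.trim.stripPrefix("\"").stripSuffix("\"")
      // Avro names may only contain letters, digits and underscores
      val safe = unquoted.replaceAll("[^A-Za-z0-9_]", "_")
      // ...and must not start with a digit
      if (safe.nonEmpty && safe.head.isDigit) "_" + safe else safe
    }
}

// In spark-shell (Spark 1.x), the names could then feed the schema.
// ASSUMPTION: every column is read as a nullable string.
//   import org.apache.spark.sql.types.{StructType, StructField, StringType}
//   val header = sc.textFile("/tmp/file.txt").first()
//   val schema = StructType(HeaderNames.fromHeader(header).map(StructField(_, StringType, true)))
```

For the sample header, `HeaderNames.fromHeader("\"EMP-CO\",\"EMP-ID\"")` yields `List("EMP_CO", "EMP_ID")`. The resulting `schema` can be passed to `.schema(...)` in place of the hand-written `customSchema`, and the same few lines can be looped over all tables.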