- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
Conversion of a file(with pipe(|), comma(,) and inverted commas(")) to avro format
- Labels:
-
Apache Hive
-
HDFS
Created on ‎10-21-2018 07:48 PM - edited ‎09-16-2022 06:49 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I have a flat file with column names separated by comma(,) and column values spearated by pipe(,) and comma(,).
can someone help how can I convert this file to avro file/format.
for example:
"EMP-CO","EMP-ID" - column names
|ABC|,|123456| - Values.
thanks in advance.
Created ‎10-22-2018 12:15 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Should be doable in Spark using the CSV and Avro reader/writer.
Your header is quite odd with quoting characters surrounding its column names, so it cannot be understood directly ('"' is an illegal character for an avro field name). We could have the Spark CSV reader ignore this line as a comment since no other line should start with a '"' character.
Your data is expressed as quoted values with the quoted character being '|'.
Something like the below can achieve a conversion, for CDH5:
~> spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,com.databricks:spark-avro_2.10:4.0.0 > import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType} > // Manual schema declaration of the 'co' and 'id' column names and types > val customSchema = StructType(Array( StructField("co", StringType, true), StructField("id", IntegerType, true))) > val df = sqlContext.read.format("com.databricks.spark.csv").option("comment", "\"").option("quote", "|").schema(customSchema).load("/tmp/file.txt") > df.write.format("com.databricks.spark.avro").save("/tmp/avroout") > // Note: /tmp/file.txt is input file/dir, and /tmp/avroout is the output dir
Created ‎10-22-2018 12:15 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Should be doable in Spark using the CSV and Avro reader/writer.
Your header is quite odd with quoting characters surrounding its column names, so it cannot be understood directly ('"' is an illegal character for an avro field name). We could have the Spark CSV reader ignore this line as a comment since no other line should start with a '"' character.
Your data is expressed as quoted values with the quoted character being '|'.
Something like the below can achieve a conversion, for CDH5:
~> spark-shell --packages com.databricks:spark-csv_2.10:1.5.0,com.databricks:spark-avro_2.10:4.0.0 > import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType} > // Manual schema declaration of the 'co' and 'id' column names and types > val customSchema = StructType(Array( StructField("co", StringType, true), StructField("id", IntegerType, true))) > val df = sqlContext.read.format("com.databricks.spark.csv").option("comment", "\"").option("quote", "|").schema(customSchema).load("/tmp/file.txt") > df.write.format("com.databricks.spark.avro").save("/tmp/avroout") > // Note: /tmp/file.txt is input file/dir, and /tmp/avroout is the output dir
Created on ‎10-22-2018 07:48 PM - edited ‎10-22-2018 08:10 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
thanks for the reply.
can you please provide the steps in detailed.
Created ‎10-22-2018 08:24 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
I am getting below error after trying...
scala> df.write.format("com.databricks.spark.avro").save("C:/Users/madan/Downloads/Avro/out/")
java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:62)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:62)
at scala.util.Try.orElse(Try.scala:82)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:62)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:219)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC.<init>(<console>:46)
at $iwC.<init>(<console>:48)
at <init>(<console>:50)
at .<init>(<console>:54)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 68 more
scala>
Created ‎10-24-2018 07:42 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi Harsha,
can you please tell me the /tmp location (are you refering the tmp folder under root or different one)
because I have given in same way but I am getting below error.
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
Created ‎10-24-2018 08:10 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
path if your Spark is configured to use HDFS as a default FS - I used /tmp
just for illustration.
The same should work on local FS modes too, but I've not tried it.
Created ‎10-24-2018 08:26 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
yes spark is configured to use HDFS as a default FS.
could you please let me know where exaactly I should keep the input file.
I tried many ways but did not suceed. I am new to Hadoop
Created ‎11-04-2018 07:54 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
@Harsh thanks for providing solution.
I am able to generate avro files, but for 10kb flat file I am getting two avro files(part1 and part2).
but all my flat files are of more than 50mb, in this case I will get many number of .avro files which difficult to maintain. so is there a away to generate one avro file even if the flat file is large.
Created ‎11-05-2018 05:41 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
@Harsh J is there a way to avoid giving the column names manually...
beacuse I have 150 columns per table and more than 200 tables which is a huge number.
