
Comparing SQL data with a text file using Spark

Expert Contributor

Using Spark, can I compare two different datasets (one from a SQL DB and another from a text file)?

I have two sets of data. One is a text file and the other is a SQL table.

I would like to look up the data in the SQL table against the text file and, where they match, delete some fields from the text file.

Text File :
ckt_id|location|usage|port|machine
AXZCSD21DF|USA|2GB|101|MAC1
ABZCSD21DF|OTH|4GB|101|MAC2
AXZCSD21DF|USA|6GB|101|MAC4
BXZCSD21DF|USA|7GB|101|MAC6

SQL table:
+-----------+-------+
|    CCKT_NO|SEV_LVL|
+-----------+-------+
| AXZCSD21DF|      1|
| BXZCSD21DF|      1|
| ABZCSD21DF|      3|
| CXZCSD21DF|      2|
| AXZCSD22DF|      2|
| XZDCSD21DF|      3|
|ADZZCSD21DF|      1|
+-----------+-------+

Can Someone please guide me on this ?

1 ACCEPTED SOLUTION

Reference this article on how to join a text file to a SQL database table. The full working code is provided:

https://community.hortonworks.com/articles/82346/spark-pyspark-for-etl-to-join-text-files-with-data....


8 REPLIES

Expert Contributor

You can use DataFrames. Convert the text file to a DataFrame with code like the one below, then join it against the SQL table to start comparing.

sc.setLogLevel("WARN")

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Case class describing one record of the text file
case class d1(
  ckt_id: String,
  location: String,
  usage: String,
  port: String,
  machine: String
)

val f2 = sc.textFile("textfile location")

// Drop the header line, split on the pipe delimiter, and map each
// field to its own column (x(0) through x(4))
val f1_df = f2.filter(!_.startsWith("ckt_id"))
              .map(_.split("\\|"))
              .map(x => d1(x(0), x(1), x(2), x(3), x(4)))
              .toDF

// this will give you this table

+----------+--------+-----+----+-------+
|    ckt_id|location|usage|port|machine|
+----------+--------+-----+----+-------+
|AXZCSD21DF|     USA|  2GB| 101|   MAC1|
|ABZCSD21DF|     OTH|  4GB| 101|   MAC2|
|AXZCSD21DF|     USA|  6GB| 101|   MAC4|
|BXZCSD21DF|     USA|  7GB| 101|   MAC6|
+----------+--------+-----+----+-------+
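
For the comparison itself, here is a minimal sketch of the join step. It assumes the SQL table has already been loaded into a DataFrame named gsam (the name used later in this thread), for example via the JDBC data source:

// Inner join on the circuit id keeps only the text-file rows whose
// ckt_id also appears in the SQL table's CCKT_NO column
val matched = f1_df.join(gsam, f1_df("ckt_id") === gsam("CCKT_NO"))
matched.show()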

Expert Contributor

Thank you so much, Sir. You are awesome.

If you want to use PySpark, the following works with Spark 1.6. This is from work I did parsing a text file to extract orders data.

1. Read the text file and convert it to a DataFrame so that your data is organized into named columns:

## read the text file and parse out the fields needed
from pyspark.sql import Row

path = "hdfs://my_server:8020/my_path/*"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("|"))
orders = parts.map(lambda o: Row(platform=o[101], date=int(o[1]), hour=int(o[2]), order_id=o[29], parent_order_uuid=o[90]))

## convert to a DataFrame
schemaOrders = sqlContext.createDataFrame(orders)

## register as a table
schemaOrders.registerTempTable("schemaOrders")

2. Now read your data from the SQL database and register it as a table in Spark. Spark can connect to SQL databases. Here is an article showing how to connect Spark to SQL Server: https://community.hortonworks.com/content/kbentry/59205/spark-pyspark-to-extract-from-sql-server.htm...

3. Join the two datasets: the data from the file with the data from the SQL database. A Scala sketch of the JDBC read in step 2 follows below.
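
For step 2, here is a minimal Scala sketch of reading a SQL Server table over JDBC (the linked article shows the PySpark version; every connection value below is a placeholder, not something from this thread):

// Sketch only: load a SQL Server table over JDBC into a DataFrame.
// Requires the SQL Server JDBC driver jar on the classpath; the url,
// dbtable, user, and password values are placeholders.
val gsam = sqlContext.read
  .format("jdbc")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("url", "jdbc:sqlserver://my_server:1433;databaseName=my_db")
  .option("dbtable", "MY_SCHEMA.MY_TABLE")
  .option("user", "my_user")
  .option("password", "my_password")
  .load()

// Register it so it can be queried alongside the text-file table in step 3
gsam.registerTempTable("gsam")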


Expert Contributor

Thank you so much. You're a genius 🙂

Expert Contributor

@Binu Mathew

While doing a SQL operation, I am getting an error that one of the tables is not found.

scala> gsam.show()
+-----------+-------+
|    CCKT_NO|SEV_LVL|
+-----------+-------+
| AXZCSD21DF|      1|
| BXZCSD21DF|      1|
| ABZCSD21DF|      3|
| CXZCSD21DF|      2|
| AXZCSD22DF|      2|
| XZDCSD21DF|      3|
|ADZZCSD21DF|      1|
+-----------+-------+

scala> input_file.show()
+-----------+--------+-----+----+-------+
|     ckt_id|location|usage|port|machine|
+-----------+--------+-----+----+-------+
|     ckt_id|location|usage|port|machine|
| AXZCSD21DF|     USA|  2GB| 101|   MAC1|
| ABZCSD21DF|     OTH|  4GB| 101|   MAC2|
| AXZCSD21DF|     USA|  6GB| 101|   MAC4|
| BXZCSD21DF|     USA|  7GB| 101|   MAC6|
| CXZCSD21DF|     IND|  2GB| 101|   MAC9|
| AXZCSD21DF|     USA|  1GB| 101|   MAC0|
| AXZCSD22DF|     IND|  9GB| 101|   MAC3|
|ADZZCSD21DF|     USA|  1GB| 101|   MAC4|
| AXZCSD21DF|     USA|  2GB| 101|   MAC5|
| XZDCSD21DF|     OTH|  2GB| 101|   MAC1|
+-----------+--------+-----+----+-------+

scala> input_file.registerTempTable("input_file_temp")
scala> gsam.registerTempTable("gsam_temp")
scala> val tmp = sqlContext.sql("select a.ckt_id,a.location,a.usage,a.port,a.machine,b.CCKT_NO,b.SEV_LVL FROM input_file_temp  a, gsam_temp b where a.ckt_id=b.CCKT_NO AND b.sev_lvl='3'")
org.apache.spark.sql.AnalysisException: Table not found: input_file_temp;
        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:305)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:314)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:309)

@Dinesh Das - The code in that article was written in PySpark against Spark 2.1. It's working code.

What version of Spark are you using? I see that you're using Scala. If you are using Spark version 2 or above, did you create a SparkSession? If you're on an earlier version of Spark, did you create a SQLContext?
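
One common cause of this error in Spark 1.6: a temp table is registered in the catalog of the SQLContext that created its DataFrame, so if the query runs on a different SQLContext instance it will not find the table. A minimal sketch of the safe pattern, assuming input_file and gsam are the DataFrames shown above:

// Run the query on the same SQLContext that built the DataFrames,
// rather than on a separately created instance
val sqlCtx = input_file.sqlContext  // assumes gsam came from this context too

input_file.registerTempTable("input_file_temp")
gsam.registerTempTable("gsam_temp")

val tmp = sqlCtx.sql("""
  SELECT a.ckt_id, a.location, a.usage, a.port, a.machine, b.CCKT_NO, b.SEV_LVL
  FROM input_file_temp a JOIN gsam_temp b
  ON a.ckt_id = b.CCKT_NO
  WHERE b.SEV_LVL = '3'
""")
tmp.show()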

Expert Contributor

@Binu Mathew

Thanks for the Python code. I am trying to do it in both Scala and Python for learning purposes.

I am using Spark 1.6.2. Yes, I have created a SQLContext.