
Comparing SQL data with a text file using Spark

Expert Contributor

Using Spark, can I compare two different datasets (one from a SQL DB and another from a text file)?

I have two sets of data: one is a text file and the other is a SQL table.

I would like to look up the text file's records against the data in the SQL table, and where they match, delete some fields from the text file.

Text file:
ckt_id|location|usage|port|machine
AXZCSD21DF|USA|2GB|101|MAC1
ABZCSD21DF|OTH|4GB|101|MAC2
AXZCSD21DF|USA|6GB|101|MAC4
BXZCSD21DF|USA|7GB|101|MAC6

SQL table:
+-----------+-------+
|    CCKT_NO|SEV_LVL|
+-----------+-------+
| AXZCSD21DF|      1|
| BXZCSD21DF|      1|
| ABZCSD21DF|      3|
| CXZCSD21DF|      2|
| AXZCSD22DF|      2|
| XZDCSD21DF|      3|
|ADZZCSD21DF|      1|
+-----------+-------+

Can someone please guide me on this?

1 ACCEPTED SOLUTION


Reference this article on how to join a text file to a SQL database table. The full working code is provided:

https://community.hortonworks.com/articles/82346/spark-pyspark-for-etl-to-join-text-files-with-data....


8 REPLIES

Expert Contributor

You can use DataFrames. Convert the text file to a DataFrame as in the code below, then join it with the SQL table to start comparing.

sc.setLogLevel("WARN")

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// One case class field per pipe-delimited column of the text file.
case class d1(
  ckt_id: String,
  location: String,
  usage: String,
  port: String,
  machine: String
)

val f2 = sc.textFile("textfile location")

// Skip the header row, split each line on the pipe delimiter, and map
// each field to its own column.
val f1_df = f2.filter(!_.startsWith("ckt_id"))
              .map(_.split("\\|"))
              .map(x => d1(x(0), x(1), x(2), x(3), x(4)))
              .toDF()

// this will give you this table:

+----------+--------+-----+----+-------+
|    ckt_id|location|usage|port|machine|
+----------+--------+-----+----+-------+
|AXZCSD21DF|     USA|  2GB| 101|   MAC1|
|ABZCSD21DF|     OTH|  4GB| 101|   MAC2|
|AXZCSD21DF|     USA|  6GB| 101|   MAC4|
|BXZCSD21DF|     USA|  7GB| 101|   MAC6|
+----------+--------+-----+----+-------+
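
To start comparing, join this DataFrame to the SQL table. A minimal sketch, assuming the SQL table has already been loaded into a DataFrame named gsam with columns CCKT_NO and SEV_LVL (as it appears later in this thread):

// Inner join on the circuit id: keeps only the rows whose ckt_id has a
// matching CCKT_NO in the SQL table.
val joined = f1_df.join(gsam, f1_df("ckt_id") === gsam("CCKT_NO"))
joined.show()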

Expert Contributor

Thank you so much, sir. You are awesome.


If you want to use PySpark, the following works with Spark 1.6. This is from work I did parsing a text file to extract orders data.

1. Read the text file data and convert it to a DataFrame so that your data is organized into named columns:

## read the text file and parse out the fields needed
from pyspark.sql import Row

path = "hdfs://my_server:8020/my_path/*"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("|"))
orders = parts.map(lambda o: Row(platform=o[101], date=int(o[1]), hour=int(o[2]), order_id=o[29], parent_order_uuid=o[90]))

## create a DataFrame and register it as a table
schemaOrders = sqlContext.createDataFrame(orders)
schemaOrders.registerTempTable("schemaOrders")

2. Now read your data from the SQL database and register it as a table in Spark. Spark can connect to SQL databases over JDBC. Here is an article showing how to connect Spark to SQL Server: https://community.hortonworks.com/content/kbentry/59205/spark-pyspark-to-extract-from-sql-server.htm...

3. Join the two datasets: the data from the file with the data from the SQL database. A sketch of steps 2 and 3 follows.
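
A minimal sketch of steps 2 and 3, written in Scala since that is what the asker is using; the JDBC URL, table name, and credentials are placeholders, and the join key is illustrative:

// Step 2 (sketch): read the SQL table over JDBC. The URL, table name,
// and credentials are placeholders for your own database.
val props = new java.util.Properties()
props.setProperty("user", "my_user")
props.setProperty("password", "my_password")
val gsam = sqlContext.read.jdbc(
  "jdbc:sqlserver://my_server:1433;databaseName=my_db", "GSAM", props)
gsam.registerTempTable("gsam_temp")

// Step 3 (sketch): join the file data (registered as a temp table in
// step 1) to the SQL table; the join key depends on your data.
val joined = sqlContext.sql(
  "SELECT o.*, g.SEV_LVL FROM schemaOrders o JOIN gsam_temp g ON o.order_id = g.CCKT_NO")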


Expert Contributor

Thank you so much. You're a genius 🙂

Expert Contributor

@Binu Mathew

While doing a SQL operation, I am getting an error that one of the tables is not found.

scala> gsam.show()
+-----------+-------+
|    CCKT_NO|SEV_LVL|
+-----------+-------+
| AXZCSD21DF|      1|
| BXZCSD21DF|      1|
| ABZCSD21DF|      3|
| CXZCSD21DF|      2|
| AXZCSD22DF|      2|
| XZDCSD21DF|      3|
|ADZZCSD21DF|      1|
+-----------+-------+

scala> input_file.show()
+-----------+--------+-----+----+-------+
|     ckt_id|location|usage|port|machine|
+-----------+--------+-----+----+-------+
|     ckt_id|location|usage|port|machine|
| AXZCSD21DF|     USA|  2GB| 101|   MAC1|
| ABZCSD21DF|     OTH|  4GB| 101|   MAC2|
| AXZCSD21DF|     USA|  6GB| 101|   MAC4|
| BXZCSD21DF|     USA|  7GB| 101|   MAC6|
| CXZCSD21DF|     IND|  2GB| 101|   MAC9|
| AXZCSD21DF|     USA|  1GB| 101|   MAC0|
| AXZCSD22DF|     IND|  9GB| 101|   MAC3|
|ADZZCSD21DF|     USA|  1GB| 101|   MAC4|
| AXZCSD21DF|     USA|  2GB| 101|   MAC5|
| XZDCSD21DF|     OTH|  2GB| 101|   MAC1|
+-----------+--------+-----+----+-------+

scala> input_file.registerTempTable("input_file_temp")
scala> gsam.registerTempTable("gsam_temp")
scala> val tmp = sqlContext.sql("select a.ckt_id,a.location,a.usage,a.port,a.machine,b.CCKT_NO,b.SEV_LVL FROM input_file_temp  a, gsam_temp b where a.ckt_id=b.CCKT_NO AND b.sev_lvl='3'")
org.apache.spark.sql.AnalysisException: Table not found: input_file_temp;
        at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:305)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:314)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:309)


@Dinesh Das - The code in that article is written in PySpark on Spark 2.1. It's working code.

What version of Spark are you using? I see that you're using Scala. If you are using Spark version 2 or above, did you create a SparkSession? If you are on an earlier version of Spark, did you create a SQLContext?
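
For reference, a minimal sketch of both entry points:

// Spark 1.x (e.g., 1.6): create a SQLContext from the SparkContext.
// In spark-shell one is already provided as `sqlContext`.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Spark 2.x and above: use a SparkSession instead; in spark-shell one
// is already provided as `spark`.
// val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()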

Expert Contributor

@Binu Mathew

Thanks for the Python code. I am trying to do it in both Scala and Python for learning purposes.

I am using Spark 1.6.2. Yes, I have created a SQLContext.
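
One thing worth checking in that setup: in Spark 1.6, a temporary table is scoped to the SQLContext instance that registered it, so a "Table not found" error usually means the query ran through a different context (for example, a new SQLContext created after the table was registered). A minimal sketch, reusing the DataFrame names from the session above:

// Register both DataFrames on the SAME SQLContext instance, and run
// the query through that same instance; temp tables registered on one
// context are not visible from another.
input_file.registerTempTable("input_file_temp")
gsam.registerTempTable("gsam_temp")

val tmp = sqlContext.sql(
  """SELECT a.ckt_id, a.location, a.usage, a.port, a.machine,
    |       b.CCKT_NO, b.SEV_LVL
    |FROM input_file_temp a
    |JOIN gsam_temp b ON a.ckt_id = b.CCKT_NO
    |WHERE b.SEV_LVL = '3'""".stripMargin)
tmp.show()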