Created 02-08-2017 04:22 PM
Using Spark, can I compare two different datasets (one from a SQL DB and another from a text file)?
I have two sets of data: one is a text file and the other is a SQL table.
I would like to look up the data in the text file against the SQL table, and where they match, delete some fields from the text file.
Text file:
ckt_id|location|usage|port|machine
AXZCSD21DF|USA|2GB|101|MAC1
ABZCSD21DF|OTH|4GB|101|MAC2
AXZCSD21DF|USA|6GB|101|MAC4
BXZCSD21DF|USA|7GB|101|MAC6
SQL table:
+-----------+-------+
|    CCKT_NO|SEV_LVL|
+-----------+-------+
| AXZCSD21DF|      1|
| BXZCSD21DF|      1|
| ABZCSD21DF|      3|
| CXZCSD21DF|      2|
| AXZCSD22DF|      2|
| XZDCSD21DF|      3|
|ADZZCSD21DF|      1|
+-----------+-------+
Can someone please guide me on this?
Created 02-08-2017 05:15 PM
You can use DataFrames. Convert the text file to a DataFrame as in the code below, then do a join to start comparing.
sc.setLogLevel("WARN")
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
// one row of the text file
case class d1(
  ckt_id: String,
  location: String,
  usage: String,
  port: String,
  machine: String
)
val f2 = sc.textFile("textfile location")
// split on the pipe delimiter, skip the header line,
// and map each field to its own column
val f1_df = f2.map(_.split("\\|"))
  .filter(x => x(0) != "ckt_id")
  .map(x => d1(x(0), x(1), x(2), x(3), x(4)))
  .toDF
// this produces the following table:
+----------+--------+-----+----+-------+
|    ckt_id|location|usage|port|machine|
+----------+--------+-----+----+-------+
|AXZCSD21DF|     USA|  2GB| 101|   MAC1|
|ABZCSD21DF|     OTH|  4GB| 101|   MAC2|
|AXZCSD21DF|     USA|  6GB| 101|   MAC4|
|BXZCSD21DF|     USA|  7GB| 101|   MAC6|
+----------+--------+-----+----+-------+
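To do the actual comparison, you could then load the SQL table into a second DataFrame and join it with the text-file DataFrame. A minimal sketch, assuming the table is read over JDBC; the connection URL, table name, and credentials below are placeholders you would replace with your own (and the SQL Server JDBC driver must be on the classpath):
// Hypothetical JDBC connection details; replace with your own.
val gsam_df = sqlContext.read.format("jdbc")
  .option("url", "jdbc:sqlserver://your_server:1433;databaseName=your_db")
  .option("dbtable", "gsam")
  .option("user", "your_user")
  .option("password", "your_password")
  .load()
// Inner join keeps only the rows whose ckt_id appears in the SQL table.
val matched = f1_df.join(gsam_df, f1_df("ckt_id") === gsam_df("CCKT_NO"))
matched.show()
From the joined result you can filter on SEV_LVL or drop the columns you no longer want.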
Created 02-09-2017 08:53 AM
Thank you so much, sir. You are awesome.
Created 02-08-2017 06:44 PM
If you want to use PySpark, the following works with Spark 1.6. This is from work I did parsing a text file to extract order data.
1. Read the text file and convert it to a DataFrame so that your data is organized into named columns:
## read the text file and parse out the fields needed
from pyspark.sql import Row
path = "hdfs://my_server:8020/my_path/*"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("|"))
orders = parts.map(lambda o: Row(platform=o[101], date=int(o[1]), hour=int(o[2]), order_id=o[29], parent_order_uuid=o[90]))
schemaOrders = sqlContext.createDataFrame(orders)
## register as a table
schemaOrders.registerTempTable("schemaOrders")
2. Now read your data from the SQL database and register it as a table in Spark. Spark can connect to SQL databases over JDBC. Here is an article showing how to connect Spark to SQL Server: https://community.hortonworks.com/content/kbentry/59205/spark-pyspark-to-extract-from-sql-server.htm...
3. Join the two datasets: the data from the file with the SQL database table, as sketched below.
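Since the asker is working in Scala, here is a minimal Scala sketch of step 3, assuming both sides have been registered as temp tables on the same context (the table and column names follow the question's data; the SEV_LVL filter is just illustrative):
// Join the text-file data with the SQL table data on the circuit id.
val joined = sqlContext.sql(
  """SELECT a.ckt_id, a.location, a.usage, a.port, a.machine, b.SEV_LVL
    |FROM input_file_temp a
    |JOIN gsam_temp b ON a.ckt_id = b.CCKT_NO
    |WHERE b.SEV_LVL = '3'""".stripMargin)
joined.show()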
Created 02-09-2017 03:49 AM
Reference this article on how to join a text file to a SQL database table. The full working code is provided:
Created 02-09-2017 08:30 AM
Thank you so much. You're a genius 🙂
Created 02-09-2017 11:46 AM
While doing a SQL operation, I am getting an error that one of the tables is not found.
scala> gsam.show()
+-----------+-------+
| CCKT_NO|SEV_LVL|
+-----------+-------+
| AXZCSD21DF| 1|
| BXZCSD21DF| 1|
| ABZCSD21DF| 3|
| CXZCSD21DF| 2|
| AXZCSD22DF| 2|
| XZDCSD21DF| 3|
|ADZZCSD21DF| 1|
+-----------+-------+
scala> input_file.show()
+-----------+--------+-----+----+-------+
| ckt_id|location|usage|port|machine|
+-----------+--------+-----+----+-------+
| ckt_id|location|usage|port|machine|
| AXZCSD21DF| USA| 2GB| 101| MAC1|
| ABZCSD21DF| OTH| 4GB| 101| MAC2|
| AXZCSD21DF| USA| 6GB| 101| MAC4|
| BXZCSD21DF| USA| 7GB| 101| MAC6|
| CXZCSD21DF| IND| 2GB| 101| MAC9|
| AXZCSD21DF| USA| 1GB| 101| MAC0|
| AXZCSD22DF| IND| 9GB| 101| MAC3|
|ADZZCSD21DF| USA| 1GB| 101| MAC4|
| AXZCSD21DF| USA| 2GB| 101| MAC5|
| XZDCSD21DF| OTH| 2GB| 101| MAC1|
+-----------+--------+-----+----+-------+
scala> input_file.registerTempTable("input_file_temp")
scala> gsam.registerTempTable("gsam_temp")
scala> val tmp = sqlContext.sql("select a.ckt_id,a.location,a.usage,a.port,a.machine,b.CCKT_NO,b.SEV_LVL FROM input_file_temp a, gsam_temp b where a.ckt_id=b.CCKT_NO AND b.sev_lvl='3'")
org.apache.spark.sql.AnalysisException: Table not found: input_file_temp;
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:305)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:314)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:309)
Created 02-10-2017 02:41 AM
@Dinesh Das - The code in that article is written in PySpark on Spark 2.1. It's working code.
What version of Spark are you using? I see that you're using Scala. If you are using Spark version 2 or above, did you create a SparkSession? If an earlier version of Spark, did you create a SQLContext?
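One common cause of "Table not found" on Spark 1.6 (an assumption here, since the full session isn't shown): temp tables are scoped to the SQLContext instance that registered them, so if a second SQLContext is created in the shell, tables registered through one context are invisible to the other. A minimal sketch of the safe pattern:
// Reuse the shell's existing context instead of constructing a second one.
val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(sc)
// Register both tables and query through that same context.
input_file.registerTempTable("input_file_temp")
gsam.registerTempTable("gsam_temp")
sqlContext.sql("SELECT COUNT(*) FROM input_file_temp").show()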
Created 02-10-2017 12:09 PM
Thanks for the Python code. I am trying to do it in both Scala and Python for learning purposes.
I am using Spark 1.6.2. Yes, I have created a SQLContext.