Created 02-08-2017 04:22 PM
Using Spark, can I compare 2 different datasets (one from a SQL DB and another from a text file)?
I have two sets of data: one is a text file and the other is a SQL table.
I would like to look up the data in the text file against the data in the SQL table, and where they match, delete some fields from the text file.
Text file:

ckt_id|location|usage|port|machine
AXZCSD21DF|USA|2GB|101|MAC1
ABZCSD21DF|OTH|4GB|101|MAC2
AXZCSD21DF|USA|6GB|101|MAC4
BXZCSD21DF|USA|7GB|101|MAC6

SQL table:

+-----------+-------+
|    CCKT_NO|SEV_LVL|
+-----------+-------+
| AXZCSD21DF|      1|
| BXZCSD21DF|      1|
| ABZCSD21DF|      3|
| CXZCSD21DF|      2|
| AXZCSD22DF|      2|
| XZDCSD21DF|      3|
|ADZZCSD21DF|      1|
+-----------+-------+
Can someone please guide me on this?
Created 02-08-2017 05:15 PM
You can use DataFrames. Convert the text file to a DataFrame as in the code below, then do a join to start comparing.
sc.setLogLevel("WARN")

import org.apache.spark.sql._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// one case class field per pipe-delimited column in the file
case class d1(
  ckt_id: String,
  location: String,
  usage: String,
  port: String,
  machine: String
)

val f2 = sc.textFile("textfile location")
val f1_df = f2.map(_.split("\\|"))
  .map(x => d1(x(0), x(1), x(2), x(3), x(4)))
  .toDF()

// this will give you this table:
+----------+--------+-----+----+-------+
|    ckt_id|location|usage|port|machine|
+----------+--------+-----+----+-------+
|AXZCSD21DF|     USA|  2GB| 101|   MAC1|
|ABZCSD21DF|     OTH|  4GB| 101|   MAC2|
|AXZCSD21DF|     USA|  6GB| 101|   MAC4|
|BXZCSD21DF|     USA|  7GB| 101|   MAC6|
+----------+--------+-----+----+-------+
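To do the comparison itself, here is a minimal sketch of the join (assuming the SQL table has already been loaded into a DataFrame named gsam, e.g. through Spark's JDBC data source; gsam is the name used later in this thread):

// keep only the file rows whose ckt_id appears in the SQL table
val matched = f1_df.join(gsam, f1_df("ckt_id") === gsam("CCKT_NO"))
matched.show()

// then drop whatever fields you no longer need, e.g.
val trimmed = matched.drop("port").drop("machine")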
Created 02-09-2017 08:53 AM
Thank you so much, Sir. You are awesome.
Created 02-08-2017 06:44 PM
If you want to use PySpark, the following works with Spark 1.6. This is from work I did parsing a text file to extract orders data.
1. Read the text file data and convert it to a DataFrame so that your data is organized into named columns:
## read text file and parse out the fields needed
from pyspark.sql import Row
path = "hdfs://my_server:8020/my_path/*"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("|"))
orders = parts.map(lambda o: Row(platform=o[101], date=int(o[1]), hour=int(o[2]), order_id=o[29], parent_order_uuid=o[90]))
schemaOrders = sqlContext.createDataFrame(orders)
## register as a table
schemaOrders.registerTempTable("schemaOrders")
2. Now read your data from the SQL database and register it as a table in Spark. Spark can connect to SQL databases. Here is an article showing how to connect Spark to SQL Server: https://community.hortonworks.com/content/kbentry/59205/spark-pyspark-to-extract-from-sql-server.htm...
3. Join the 2 datasets: the data from the file with the data from the SQL database. A sketch of steps 2 and 3 follows below.
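As an illustration of steps 2 and 3, here is a minimal sketch (written in Scala to match the other answer in this thread; the JDBC options are placeholders, and the temp-table names input_file_temp and gsam_temp are assumed to have been registered from the question's two datasets):

// step 2: read the SQL table over JDBC (url, dbtable, user, and
// password are placeholders for your environment)
val gsam = sqlContext.read.format("jdbc")
  .option("url", "jdbc:sqlserver://<host>:<port>;databaseName=<db>")
  .option("dbtable", "<schema>.<table>")
  .option("user", "<user>")
  .option("password", "<password>")
  .load()
gsam.registerTempTable("gsam_temp")

// step 3: join the file data with the database data on the circuit id
val joined = sqlContext.sql(
  "SELECT a.ckt_id, a.location, a.usage, a.port, a.machine, b.SEV_LVL " +
  "FROM input_file_temp a JOIN gsam_temp b ON a.ckt_id = b.CCKT_NO")
joined.show()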
Created 02-09-2017 03:49 AM
Reference this article on how to join a text file to a SQL database table. The full working code is provided:
Created 02-09-2017 08:30 AM
Thank you so much. You're a genius 🙂
Created 02-09-2017 11:46 AM
While doing the SQL operation, I am getting an error that one of the tables is not found.
scala> gsam.show()
+-----------+-------+
|    CCKT_NO|SEV_LVL|
+-----------+-------+
| AXZCSD21DF|      1|
| BXZCSD21DF|      1|
| ABZCSD21DF|      3|
| CXZCSD21DF|      2|
| AXZCSD22DF|      2|
| XZDCSD21DF|      3|
|ADZZCSD21DF|      1|
+-----------+-------+

scala> input_file.show()
+-----------+--------+-----+----+-------+
|     ckt_id|location|usage|port|machine|
+-----------+--------+-----+----+-------+
|     ckt_id|location|usage|port|machine|
| AXZCSD21DF|     USA|  2GB| 101|   MAC1|
| ABZCSD21DF|     OTH|  4GB| 101|   MAC2|
| AXZCSD21DF|     USA|  6GB| 101|   MAC4|
| BXZCSD21DF|     USA|  7GB| 101|   MAC6|
| CXZCSD21DF|     IND|  2GB| 101|   MAC9|
| AXZCSD21DF|     USA|  1GB| 101|   MAC0|
| AXZCSD22DF|     IND|  9GB| 101|   MAC3|
|ADZZCSD21DF|     USA|  1GB| 101|   MAC4|
| AXZCSD21DF|     USA|  2GB| 101|   MAC5|
| XZDCSD21DF|     OTH|  2GB| 101|   MAC1|
+-----------+--------+-----+----+-------+

scala> input_file.registerTempTable("input_file_temp")

scala> gsam.registerTempTable("gsam_temp")

scala> val tmp = sqlContext.sql("select a.ckt_id,a.location,a.usage,a.port,a.machine,b.CCKT_NO,b.SEV_LVL FROM input_file_temp a, gsam_temp b where a.ckt_id=b.CCKT_NO AND b.sev_lvl='3'")
org.apache.spark.sql.AnalysisException: Table not found: input_file_temp;
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:305)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:314)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:309)
Created 02-10-2017 02:41 AM
@Dinesh Das - The code in that article is written in PySpark using Spark 2.1. It's working code.
What version of Spark are you using? I see that you're using Scala. If you are using Spark version 2 or above, did you create a SparkSession? If you are on an earlier version of Spark, did you create a SQLContext?
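One thing to check (an assumption on my part, since the full session isn't shown): in Spark 1.x a temp table is registered in the catalog of the SQLContext that created the DataFrame, so the sql() call has to go through that same instance. In spark-shell that means using the built-in sqlContext throughout rather than creating a second one, e.g.:

// register and query through one and the same SQLContext; a temp table
// registered under one SQLContext is not visible from another instance
input_file.registerTempTable("input_file_temp")
gsam.registerTempTable("gsam_temp")

val tmp = sqlContext.sql(
  "SELECT a.ckt_id, a.location, a.usage, a.port, a.machine, b.CCKT_NO, b.SEV_LVL " +
  "FROM input_file_temp a JOIN gsam_temp b " +
  "ON a.ckt_id = b.CCKT_NO WHERE b.SEV_LVL = '3'")
tmp.show()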
Created 02-10-2017 12:09 PM
Thanks for the Python code. I am trying to do it in both Scala and Python for knowledge purposes.
I am using Spark 1.6.2. Yes, I have created a SQLContext.