Member since: 02-26-2016
Posts: 100
Kudos Received: 111
Solutions: 12
My Accepted Solutions
Title | Views | Posted
--- | --- | ---
 | 2210 | 05-04-2017 12:38 PM
 | 4448 | 03-21-2017 06:18 PM
 | 14237 | 03-21-2017 01:14 AM
 | 4670 | 02-14-2017 06:21 PM
 | 7490 | 02-09-2017 03:49 AM
02-09-2017
03:49 AM
Reference this article on how to join a text file to a SQL database table. The full working code is provided: https://community.hortonworks.com/articles/82346/spark-pyspark-for-etl-to-join-text-files-with-data.html
02-09-2017
03:30 AM
2 Kudos
BACKGROUND
The modern Data Warehouse contains a heterogeneous mix of data: delimited text files, data in Hadoop (HDFS/Hive), relational databases, NoSQL databases, Parquet, Avro, JSON, geospatial data, and more. ETL (Extract-Transform-Load) is a process used to integrate these disparate data types and create a unified view of the data. Apache Spark is a fast, general-purpose distributed computation engine for fault-tolerant parallel data processing. Spark is an excellent choice for ETL:
- Works with a myriad of data sources: files, RDBMSs, NoSQL, Parquet, Avro, JSON, XML, and many more.
- Write your ETL code using Java, Scala, or Python.
- In-memory computing for fast data processing.
- APIs to easily create schemas for your data and perform SQL computations.
- Distributed computing and fault tolerance are built into the framework and abstracted from the end user.
This article will demonstrate how easy it is to use Spark with the Python API (PySpark) for ETL processing to join data from text files with data in a MySQL table.
SOURCE DATA
Pipe (|) delimited text files stored in HDFS containing information about customer orders:
Customer table in MySQL containing information about customers:
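As a purely hypothetical illustration of what these two layouts might look like (the values below are made up; the column names are taken from the schema and query used in the code further down):
order_number|customer_number|order_quantity|unit_price|total
1001|CUST-100|2|10.00|20.00
1002|CUST-200|1|25.00|25.00
customer_number | first_name | last_name
CUST-100 | Jane | Doe
CUST-200 | John | Smith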
We need to join the data from the orders files in HDFS with the customer data in MySQL so that we produce a new file in HDFS mapping customer names to orders. The new file will be stored as an ORC formatted file. ORC is a columnar file format that provides high data compression and fast access for data analysis. Conceptually, the joined output pairs each customer's name with their orders, but this is not how the data is physically stored; ORC is columnar storage, not row storage.
CODE
The code below was tested using Spark 2.1. Beginning in Spark 2.0 the SQLContext and HiveContext have been replaced with SparkSession; however, both SQLContext and HiveContext are preserved for backwards compatibility. Notice in my code below that I'm using SparkSession.
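For reference, a minimal sketch of the old entry points next to the new one (the app name here is arbitrary):
## Spark 1.x entry points
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext(appName="example")
sqlContext = SQLContext(sc)
## Spark 2.x: SparkSession provides the same functionality through a single entry point
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()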
To run the code below you will need the CSV parsing library from Databricks: https://github.com/databricks/spark-csv
If you're working with large data volumes, then I recommend that you use the CSV parsing library from Databricks, as it will be much faster than using a typical lambda function to parse out fields from a text file. For a performance evaluation, please refer to: https://community.hortonworks.com/content/kbentry/52866/hive-on-tez-vs-pyspark-for-weblogs-parsing.html
Run the code using:
spark-submit --master yarn --deploy-mode cluster --packages com.databricks:spark-csv_2.11:1.4.0,mysql:mysql-connector-java:5.1.38 my_script.py
from pyspark.sql import SparkSession
from pyspark.sql.types import *
## create a SparkSession
spark = SparkSession \
    .builder \
    .appName("join_file_with_mysql") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.cores", "5") \
    .getOrCreate()
## create a schema for the orders file
customSchema = StructType([
    StructField("order_number", StringType(), True),
    StructField("customer_number", StringType(), True),
    StructField("order_quantity", StringType(), True),
    StructField("unit_price", StringType(), True),
    StructField("total", StringType(), True)])
## create a DataFrame for the orders files stored in HDFS
df_orders = spark.read \
    .format('com.databricks.spark.csv') \
    .options(header='true', delimiter='|') \
    .load('hdfs:///orders_file/', schema=customSchema)
## register the orders data as a temporary table
df_orders.registerTempTable("orders")
## create a DataFrame for the MySQL customer table
df_customers = spark.read.format("jdbc").options(
url ="jdbc:mysql://bmathew-test.czrx1al336uo.ap-northeast-1.rds.amazonaws.com:3306/test",
driver="com.mysql.jdbc.Driver",
dbtable="customer",
user="root",
password="password"
).load()
## register the customers data as a temporary table
df_customers.registerTempTable("customers")
## join the DataSets
sql='''
SELECT a.first_name, a.last_name, b.order_number, b.total
FROM customers a, orders b
WHERE a.customer_number = b.customer_number
'''
output = spark.sql(sql)
## save the data into an ORC file
output.write.format("orc").save("/tmp/customer_orders")
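To spot-check the result, you could read the ORC output back with the same SparkSession; a minimal sketch:
## read the ORC output back and inspect a few rows
df_check = spark.read.format("orc").load("/tmp/customer_orders")
df_check.show(10)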
02-08-2017
06:44 PM
If you want to use PySpark, the following works with Spark 1.6. This is from work I did parsing a text file to extract orders data.
1. Read the text file data and convert it to a DataFrame so that your data is organized into named columns:
from pyspark.sql import Row
## read the text file and parse out the fields needed
path = "hdfs://my_server:8020/my_path/*"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("|"))
orders = parts.map(lambda o: Row(platform=o[101], date=int(o[1]), hour=int(o[2]), order_id=o[29], parent_order_uuid=o[90]))
schemaOrders = sqlContext.createDataFrame(orders)
## register as a table
schemaOrders.registerTempTable("schemaOrders")
2. Now read your data from the SQL database and register it as a table in Spark. Spark can connect to SQL databases. Here is an article showing how to connect Spark to SQL Server: https://community.hortonworks.com/content/kbentry/59205/spark-pyspark-to-extract-from-sql-server.html
3. Join the two datasets: the data from the file with the data from the SQL database.
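A minimal sketch of what step 3 could look like, assuming the SQL database data was registered as a temporary table named orders_db with a matching order_id column (the table and column names on the database side are hypothetical):
## join the parsed file data with the database table and inspect the result
joined = sqlContext.sql("""
    SELECT a.order_id, a.platform, b.customer_name
    FROM schemaOrders a
    JOIN orders_db b ON a.order_id = b.order_id
""")
joined.show()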
02-08-2017
07:40 AM
1 Kudo
Can you enclose your data fields in double quotes, "", when exporting to HDFS? For example:
"Field1|Test"|"Field2|Test"|"Field3|Test"
Notice how I have pipe (|) as the delimiter and it is also part of the column data, but my columns are enclosed in double quotes. Then use the OpenCSV SerDe when creating the DDL for your Hive table. For example, if I wanted to create a Hive table for the sample record above, and the file is already in a directory named 'test' in HDFS, then my DDL statement would be:
create external table my_table (
  field1 string,
  field2 string,
  field3 string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  "separatorChar" = "|",
  "quoteChar" = "\"" )
LOCATION '/test/';
Now you can query the table and Hive will correctly display the data.
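If the export is produced by a script, a minimal sketch of writing such quoted, pipe-delimited records with Python's standard csv module (the file name and field values here are just placeholders):
import csv
## write pipe-delimited records with every field enclosed in double quotes
with open("export.txt", "w", newline="") as f:
    writer = csv.writer(f, delimiter="|", quotechar='"', quoting=csv.QUOTE_ALL)
    writer.writerow(["Field1|Test", "Field2|Test", "Field3|Test"])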
02-01-2017
07:24 PM
If you are running the Hive queries using Tez as the execution engine, then Ambari has a 'Tez View' that lets you see all the queries submitted, both finished and currently running. From here you can see query duration times and who ran each query.
02-01-2017
04:07 PM
You can also securely do this via a REST API over HTTP from any node:
1. WebHDFS: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_hdfs_admin_tools/content/ch12.html
2. HttpFS, if you plan on using WebHDFS in a High Availability cluster (Active and Passive NameNodes)
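For example, a directory listing over WebHDFS from Python might look like the sketch below (the NameNode host, port, path, and user name are placeholders):
import requests
## list an HDFS directory via the WebHDFS REST API
url = "http://namenode-host:50070/webhdfs/v1/tmp?op=LISTSTATUS&user.name=hdfs"
response = requests.get(url)
print(response.json())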
You can also implement Knox for a single and secure REST access point (with different port numbers) for:
- Ambari
- WebHDFS
- HCatalog
- HBase
- Oozie
- Hive
- YARN
- Resource Manager
- Storm
http://hortonworks.com/apache/knox-gateway/
http://hortonworks.com/hadoop-tutorial/securing-hadoop-infrastructure-apache-knox/
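As a rough sketch, the same WebHDFS call routed through Knox typically goes to the gateway's SSL port with the topology name in the URL path (the gateway host, topology name "default", and credentials below are placeholders):
import requests
## the same directory listing, but through the Knox gateway over HTTPS with basic auth
url = "https://knox-host:8443/gateway/default/webhdfs/v1/tmp?op=LISTSTATUS"
response = requests.get(url, auth=("myuser", "mypassword"), verify=False)  ## verify=False only if the gateway uses a self-signed certificate
print(response.json())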
01-31-2017
09:01 PM
The connector is a contribution from Syncsort. Syncsort has decades of experience building tools for Mainframe data ingestion. I have used Sqoop extensively; however, never for Mainframe data. Syncsort states: "Each data set will be stored as a separate HDFS file and EBCDIC encoded fixed length data will be stored as ASCII encoded variable length text on HDFS"
http://blog.syncsort.com/2014/06/big-data/big-iron-big-data-mainframe-hadoop-apache-sqoop/
There is also a Spark connector to import Mainframe data: https://github.com/Syncsort/spark-mainframe-connector
01-31-2017
06:28 PM
2 Kudos
Sqoop has a connector to import Mainframe data into HDFS: https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_literal_sqoop_import_mainframe_literal
01-27-2017
07:33 PM
Configure the 'StoreInKiteDataset' processor to set the URI for your Hive table. Your Avro-formatted data will be converted to a Parquet-formatted Hive table. In the processor, set the property 'Target dataset URI' to your Hive table. For example, I'm writing to a Hive table named weblogs_parquet:
dataset:hive://ip-172-31-2-102.us-west-2.compute.internal:9083/weblogs_parquet
Learn more about Kite Datasets at http://kitesdk.org/docs/current/
01-27-2017
07:18 PM
Do you need to convert to JSON? From Avro you can use the Kite DataSet processor and store in Hive as Parquet: https://community.hortonworks.com/articles/70257/hdfnifi-to-convert-row-formatted-text-files-to-col.html