Member since: 02-26-2016
Posts: 100
Kudos Received: 111
Solutions: 12
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2089 | 05-04-2017 12:38 PM |
| | 4061 | 03-21-2017 06:18 PM |
| | 13617 | 03-21-2017 01:14 AM |
| | 4424 | 02-14-2017 06:21 PM |
| | 7144 | 02-09-2017 03:49 AM |
05-30-2016
02:36 PM
6 Kudos
SYNOPSIS
This article will demonstrate using Spark (PySpark) with the S3A filesystem client to access data in S3. The code can also be found on GitHub @ https://github.com/mathewbk/spark_s3a
The source data in the S3 bucket is Omniture clickstream data (weblogs). The goal is to write PySpark code against the S3 data to RANK geographic locations by page view traffic, i.e. which areas generate the most traffic by page view counts.
The S3A filesystem client (s3a://) is a replacement for the S3 Native client (s3n://):
- It uses Amazon's libraries to interact with S3
- Supports larger files
- Higher performance
- Supports IAM role-based authentication
- Production stable since Hadoop 2.7 (per the Apache website)
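As an aside, and not part of the original article, the same S3A settings can also be applied at runtime on the SparkContext's Hadoop configuration instead of a properties file. Below is a minimal sketch; the bucket, object, and key values are placeholders, it relies on PySpark's internal _jsc handle, and the hadoop-aws and aws-java-sdk jars still need to be on the driver/executor classpath:
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf = SparkConf().setAppName("s3a_runtime_config"))
## set the S3A properties directly on the underlying Hadoop configuration
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", "<your access key>")
hadoop_conf.set("fs.s3a.secret.key", "<your secret key>")
## quick smoke test: count the lines in one S3 object
print sc.textFile("s3a://<your-bucket>/<some-object>").count()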
The code below is a working example tested using HDP 2.4.0.0-169 (Hadoop version 2.7.1.2.4.0.0-169).
Note that I am using the HiveContext and not the SQLContext. I am NOT querying or accessing any Hive tables, but I am using the RANK() function to perform a ranking. If you want to use the RANK() function on a Data Frame while writing the standard SQL you are already familiar with, then you have to use the HiveContext. If you use the SQLContext, the code and syntax will look much different; please refer to the PySpark documentation. In short, it's easier to use the HiveContext, although the same result can be achieved with the SQLContext (a DataFrame API sketch is included after STEP 3 below).
STEP 1: Create a Spark properties file
Store your AWS credentials in a configuration file and specify the location of the AWS jars needed to interact with S3A. Two jars are required: hadoop-aws and aws-java-sdk. The file is tab delimited.
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath /usr/hdp/2.4.0.0-169/hadoop/hadoop-aws-2.7.1.2.4.0.0-169.jar:/usr/hdp/2.4.0.0-169/hadoop/lib/aws-java-sdk-1.7.4.jar
spark.hadoop.fs.s3a.access.key <your access key>
spark.hadoop.fs.s3a.secret.key <your secret key>
STEP 2: PySpark code
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import *
## set Spark properties
conf = (SparkConf()
.setAppName("s3a_test")
.set("spark.executor.instances", "8")
.set("spark.executor.cores", 2)
.set("spark.shuffle.compress", "true")
.set("spark.io.compression.codec", "snappy")
.set("spark.executor.memory", "2g"))
sc = SparkContext(conf = conf)
## create HiveContext (needed to use the RANK() window function in SQL)
sqlContext = HiveContext(sc)
## path to S3 bucket containing my files
path = "s3a://bkm-clickstream/Omniture/*"
## get those fields we need to create the schema. file is tab delimited
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
weblogs_hit = parts.map(lambda p: Row(url=p[12], city=p[49], country = p[50], state = p[52]))
## create DataFrame
schema_weblogs_hit = sqlContext.createDataFrame(weblogs_hit)
## register DataFrame as a temporary table
schema_weblogs_hit.registerTempTable("weblogs_hit")
## RANK pageview count by geographic location - which areas generate the most traffic in terms of page views
rows = sqlContext.sql("SELECT m.location, m.page_view_count, RANK() OVER (ORDER BY m.page_view_count DESC) AS ranking FROM (SELECT CONCAT(UPPER(city),',',UPPER(country),',',UPPER(state)) AS location, count(1) AS page_view_count FROM weblogs_hit GROUP BY city, country, state ORDER BY page_view_count) m LIMIT 10")
## run SQL command and display output
output = rows.collect()
for row in output:
    row = str(row)
    print "%s" % (row)
STEP 3: Run PySpark code via YARN
/your_path_to_spark-submit --master yarn-client --properties-file <your_properties_file.conf> <your_pyspark_script.py>
OUTPUT:
05-26-2016
08:58 PM
3 Kudos
SYNOPSIS
Democratization of Big Data processing is here with Spark and Data Frames. Those days of writing complex parsers to parse and store weblogs data are over.
You can easily use Spark's in-memory processing capabilities to quickly ingest and parse weblogs data.
For this example, I downloaded some Omniture weblogs data from Adobe for a fictional company:
- The data consists of 421,266 records across 5 files, which I put into HDFS
- Each record could contain up to 178 columns
- These are tab delimited text files
- Use Spark with Data Frames via PySpark to parse out the fields we need and output them into a new Parquet file
- Build an external Hive table over this Parquet file so analysts can easily query the data (a sketch of that DDL appears after the PySpark code below)
The code is at the end of this article. Before showing the code, here are some screenshots from Zeppelin that I took while doing this exercise, to showcase how Zeppelin's UI lets you quickly develop, analyze, and visualize your work.
The code is a basic PySpark script to get you started with parsing text files and using Spark with Data Frames.
Zeppelin screenshot showing the record count across all the raw and unparsed files:
Zeppelin screenshot showing a sample record from the raw and unparsed files:
Use Zeppelin query visualization features to show top 5 most visited web pages from parsed data:
Use Zeppelin query visualization features to show the distribution of page view counts by hour from parsed data:
PySpark code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.types import *
## write snappy compressed output
conf = (SparkConf()
.setAppName("parse_weblogs")
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.executor.cores", 4)
.set("spark.executor.instances", 20)
.set("spark.sql.parquet.compression.codec", "snappy")
.set("spark.shuffle.compress", "true")
.set("spark.io.compression.codec", "snappy"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
## read text file and parse out fields needed
## file is tab delimited
path = "hdfs://ip-000-00-0-00.xxxx.xxxx.internal:8020/landing/weblogs/*"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
weblogs_hit = parts.map(lambda p: Row(hit_timestamp=p[1], swid=p[13], ip_address=p[7], url=p[12], user_agent=p[43], city=p[49], country = p[50], state = p[52]))
## create a Data Frame from the fields we parsed
schema_weblogs_hit = sqlContext.createDataFrame(weblogs_hit)
## register Data Frame as a temporary table
schema_weblogs_hit.registerTempTable("weblogs_hit")
## do some basic formatting and convert some values to uppercase
rows = sqlContext.sql("SELECT hit_timestamp, swid, ip_address, url, user_agent, UPPER(city) AS city, UPPER(country) AS country, UPPER(state) AS state from weblogs_hit")
## write to 1 parquet file
rows.coalesce(1).write.mode('overwrite').format("parquet").save("/data/weblogs_parsed_parquet")
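The external Hive table mentioned at the top of this article is not shown in the code above. A possible version of that DDL, assuming a HiveContext on the same SparkContext and the /data/weblogs_parsed_parquet output path used above, might look like the sketch below; the table name weblogs_parsed is just an example and the string column types are assumptions.
from pyspark.sql import HiveContext

hiveContext = HiveContext(sc)
## column names mirror the Row fields parsed above; all typed as string for simplicity
sql = """
CREATE EXTERNAL TABLE IF NOT EXISTS weblogs_parsed
(
 hit_timestamp string
,swid string
,ip_address string
,url string
,user_agent string
,city string
,country string
,state string
)
STORED AS PARQUET
LOCATION '/data/weblogs_parsed_parquet'
"""
hiveContext.sql(sql)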
05-26-2016
04:07 PM
2 Kudos
@Sunile Manjee - Below are some sections from working PySpark code.
Notice how I set SparkConf with specific settings and then later in my code I execute Hive statements.
In those Hive statements you could do:
sql = "set mapred.input.dir.recursive=true" sqlContext.sql(sql)
Here is my SparkConf:
conf = (SparkConf()
.setAppName("ucs_data_profiling")
.set("spark.executor.instances", "50")
.set("spark.executor.cores", 4)
.set("spark.driver.memory", "2g")
.set("spark.executor.memory", "6g")
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.shuffle.service.enabled", "true")
.set("spark.io.compression.codec", "snappy")
.set("spark.shuffle.compress", "true"))
sc = SparkContext(conf = conf)
sqlContext = HiveContext(sc)
## the rest of the code parses files and converts to a SchemaRDD
## lines of code etc........
## lines of code etc........
## here I set some Hive properties before I load my data into a Hive table
## I have more HiveQL statements; I just show one here to demonstrate that this will work
sql = """ set hive.exec.dynamic.partition.mode=nonstrict """
sqlContext.sql(sql)
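Putting the pieces together, here is a minimal hedged sketch of setting the recursive-input properties and then querying a table whose data sits in nested subdirectories; the table name is a placeholder and the example is an illustration, not the original code:
## hypothetical end-to-end illustration; my_nested_table is a placeholder
sqlContext.sql("set mapred.input.dir.recursive=true")
sqlContext.sql("set hive.mapred.supports.subdirectories=true")
rows = sqlContext.sql("SELECT COUNT(1) AS cnt FROM my_nested_table")
print rows.collect()[0].cnt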
05-26-2016
06:43 AM
5 Kudos
What is CDC
Change Data Capture (CDC) in Data Warehousing typically refers to the process of capturing changes over time in Dimension tables. Dimension tables qualify Fact tables (measures) by containing information to answer questions around Who (e.g. Customer), What (e.g. Product), Where (e.g. Location), and When (e.g. Time).
This article will demonstrate, step by step, how to implement a very basic and rudimentary CDC solution in Hadoop using MySQL, Sqoop, Spark, and Hive. It includes basic PySpark code to get you started with using Spark Data Frames. In a real-world example you would include audit tables to store information for each run.
How to do CDC in Hadoop
In the past, I have worked at a company where CDC on Hadoop was a big challenge for us. Initially, we were using Hive to merge source changes (Sqoop extracts) with the existing Dimension tables and then building new tables. This worked well until our data volumes started getting into the hundreds of millions of records. At those larger data volumes, not only did the CDC take longer, but memory and CPU consumption also increased, causing resource contention on the Hadoop cluster. The solution was to use HBase and the HBase Bulk Loader to load the changes from the source.
The Hive update functionality was not a solution for our use case.
If you have small data volumes in the millions of records to even tens of millions of records, then Spark could suffice as a solution. The test I did below was performed on a Dimension table containing 5 million existing customer records and merging 1 million changed customer records + 4 new customer records. It took about 1 minute for the Spark code to complete. The cluster was an 8 node AWS cluster (M4 large instances) running HDP 2.4.
The solution below shows how to perform a Type 1 Slowly Changing Dimension (SCD). Type 1 SCD is a Data Warehouse term used to describe a type of CDC in which we overwrite the current value in the Dimension table with the new value from our source.
Again, this is a very basic way to perform CDC and was put together for demo purposes to show how easy it is to use Spark with Parquet files and join with existing Hive tables.
Problem:
The CUSTOMER Hive Dimension table needs to capture changes from the source MySQL database. It's currently at 5 million records. 1 million customer records in the source have changed, and 4 new customer records were added to the source.
Step 1:
Run Sqoop with the incremental option to get new changes from the source MySQL database and import this into HDFS as a Parquet file
The source MySQL database has the column modified_date. For each run we capture the maximum modified_date so that on the next run we get all records greater (>) than this date. Sqoop will do this automatically with the incremental option. You just specify the column name and give a value.
sqoop import --connect jdbc:mysql://ip-172-31-2-69.us-west-2.compute.internal:3306/mysql --username root --password password --table customer -m 4 --target-dir /landing/staging_customer_update_spark --incremental lastmodified --check-column modified_date --last-value "2016-05-23 00:00:00" --as-parquetfile
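The --last-value for the next run would normally be the maximum modified_date already loaded. One hedged way to look it up, assuming a HiveContext and the CUSTOMER Hive table used later in this article:
## hypothetical helper: find the high-water mark to pass as --last-value on the next Sqoop run
row = sqlContext.sql("SELECT MAX(modified_date) AS max_modified_date FROM customer").collect()[0]
print "next --last-value: %s" % row.max_modified_date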
Step 2:
Merge the data from the Sqoop extract with the existing Hive CUSTOMER Dimension table. Read the Parquet file extract into a Spark DataFrame and look it up against the Hive table to create a new table. Go to the end of this article to view the PySpark code, which has enough comments to explain what it is doing. This is basic code to demonstrate how easily Spark integrates with Parquet to infer a schema and then perform SQL operations on the data.
Zeppelin screenshots:
The count of records in the CUSTOMER Dimension table before capturing new updates:
Hive query to compare counts of old table with new table:
Sample 5 records from the new table and compare with old table. Notice the changes for first_name, last_name, and modified_date:
New data:
What these records looked like before the changes:
PySpark code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.types import *
## write to snappy compressed output
conf = (SparkConf()
.setAppName("spark_cdc")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.shuffle.service.enabled", "true")
.set("spark.rdd.compress", "true"))
sc = SparkContext(conf = conf)
sqlContext = HiveContext(sc)
## read parquet file generated by the Sqoop incremental extract from MySQL and imported into HDFS
path = "hdfs://ip-172-31-2-63.us-west-2.compute.internal:8020/landing/staging_customer_update_spark/*parquet*"
parquetFile = sqlContext.read.parquet(path)
parquetFile.registerTempTable("customer_extract");
sql = "DROP TABLE IF EXISTS customer_update_spark"
sqlContext.sql(sql)
sql = """
CREATE TABLE customer_update_spark
(
cust_no int
,birth_date date
,first_name string
,last_name string
,gender string
,join_date date
,created_date timestamp
,modified_date timestamp
)
STORED AS PARQUET
"""
sqlContext.sql(sql)
## get those records that did not change
## these are those records from the existing Dimension table that are not in the Sqoop extract
sql = """
INSERT INTO TABLE customer_update_spark
SELECT
a.cust_no,
a.birth_date,
a.first_name,
a.last_name,
a.gender,
a.join_date,
a.created_date,
a.modified_date
FROM customer a LEFT OUTER JOIN customer_extract b ON a.cust_no = b.cust_no
WHERE b.cust_no IS NULL
"""
sqlContext.sql(sql)
## get the changed records from the Parquet extract generated from Sqoop
## the dates in the Parquet file will be in epoch time with milliseconds
## this will be a 13 digit number
## we don't need milliseconds so only get first 10 digits and not all 13
## for birth_date and join date convert to date in format YYYY-MM-DD
## for created_date and modified date convert to format YYYY-MM-DD HH:MI:SS
sql = """
INSERT INTO customer_update_spark
SELECT
a.cust_no,
TO_DATE(FROM_UNIXTIME(CAST(SUBSTR(a.birth_date, 1,10) AS INT))) AS birth_date,
a.first_name,
a.last_name,
a.gender,
TO_DATE(FROM_UNIXTIME(CAST(SUBSTR(a.join_date, 1,10) AS INT))) AS join_date,
FROM_UNIXTIME(CAST(SUBSTR(a.created_date, 1,10) AS INT)) AS created_date,
FROM_UNIXTIME(CAST(SUBSTR(a.modified_date, 1,10) AS INT)) AS modified_date
FROM customer_extract a
"""
sqlContext.sql(sql)
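The article stops after building customer_update_spark. If you wanted the rebuilt table to take the place of the existing CUSTOMER dimension, one possible (hypothetical) follow-up, once the merge has been validated, is a simple rename swap:
## hypothetical follow-up, not part of the original flow:
## swap the rebuilt table in for the old CUSTOMER dimension
sqlContext.sql("DROP TABLE IF EXISTS customer_old")
sqlContext.sql("ALTER TABLE customer RENAME TO customer_old")
sqlContext.sql("ALTER TABLE customer_update_spark RENAME TO customer")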
05-25-2016
02:33 AM
1 Kudo
I could say this is a major issue. Users cannot use the shell interpreter to run a Hive file containing DDL and DML statements. For example:
%sh hive -f my_hive_file.hql
Is there a way to resolve this?
05-25-2016
12:19 AM
I have a file with multiple HiveQL statements. I can run the file from the bash command line:
hive -f my_file_name
When I do this using the bash interpreter in Zeppelin, I receive the message:
Listening for transport dt_socket at address: 8000
It's not really an error; the job just runs and hangs.
05-24-2016
11:26 PM
2 Kudos
Listening for transport dt_socket at address: 8000
If I run a HiveQL file from Zeppelin, I receive the above error. Any workarounds?
Labels:
- Apache Zeppelin
05-23-2016
11:15 PM
Here is some sample PySpark working code using the HiveContext:
Run the code below via YARN using the command:
$SPARK_HOME/bin/spark-submit --queue <if you have queues set up> --master yarn-client <name_of_my_python_script>
Code:
import os
from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import HiveContext

conf = (SparkConf()
.setAppName("bmathew_ucs_data_profiling")
.set("spark.executor.instances", "40")
.set("spark.executor.cores", 4)
.set("spark.executor.memory", "5g"))
sc = SparkContext(conf = conf)
sqlContext = HiveContext(sc)
## recreate the working HDFS directory for the external table (os is used for the hadoop fs shell calls)
cmd = "hadoop fs -rm -r /tmp/spatton_cdc_test1"
os.system(cmd)
cmd = "hadoop fs -mkdir /tmp/spatton_cdc_test1"
os.system(cmd)
cmd = "hadoop fs -chmod 777 /tmp/spatton_cdc_test1"
os.system(cmd)
sqlContext.sql("DROP TABLE IF EXISTS spatton_cdc_test1")
sql = """
CREATE EXTERNAL TABLE spatton_cdc_test1
(
 widget_key bigint
,widget_content_key bigint
,field_name string
,field_value string
)
LOCATION '/tmp/spatton_cdc_test1'
"""
sqlContext.sql(sql)
sql = """
INSERT INTO TABLE spatton_cdc_test1
SELECT w.widget_key,
       wc.widget_content_key,
       new.field_name,
       new.field_value
FROM bloodhound_uniq_widget_content new
JOIN bloodhound_widget w
  ON w.name = new.widget_name
JOIN bloodhound_widget_content wc
  ON wc.widget_key = w.widget_key
 AND wc.name = new.name
 AND wc.type = new.type
 AND wc.position = new.position
"""
sqlContext.sql(sql)
05-12-2016
07:24 PM
Does Informatica, when used with HDP, encrypt data in motion? For example, through an Informatica mapping we are reading from source DB2 tables and then writing to Hive tables. Does Informatica authenticate and authorize with HDP through Knox and Ranger? I know that through Ranger we can set permissions on the Hive table for access, including insert and read, and we can also set up encryption on that Hive table for data at rest. But what about data in motion: as the Informatica mapping reads from DB2 and then writes to Hive, is that data being encrypted?
05-08-2016
10:51 AM
1 Kudo
I can set up a policy in Ranger for a certain Hive table so that only certain users can SELECT from the table. When I use the Beeline command-line interface and log in as a user that DOES NOT have SELECT access to that table, I get an error indicating permission denied, as expected. It's my understanding that Beeline communicates with HiveServer2 using HiveServer2's Thrift APIs. However, if I am logged into a terminal as that same user who DOES NOT have SELECT access and I launch the Hive CLI, then I am able to read from the table. I'm guessing this has to do with the Hive CLI not interacting with HiveServer2? Is this the reason why I am able to SELECT from the table via the Hive CLI, whereas through Beeline I am not? My Ranger policy is set so that only certain users should be able to SELECT from the table; Beeline enforces this policy, whereas the Hive CLI does not.
Labels:
- Apache Hive
- Apache Ranger