Created on 05-30-2016 02:36 PM - edited 08-17-2019 12:13 PM
SYNOPSIS
This article demonstrates how to use Spark (PySpark) with the S3A filesystem client to access data in S3.
The code can also be found on GitHub at https://github.com/mathewbk/spark_s3a
The source data in the S3 bucket is Omniture clickstream data (weblogs). The goal is to write PySpark code against the S3 data to RANK geographic locations by page view traffic, i.e. which areas generate the most traffic in terms of page view counts.
The S3A filesystem client (s3a://) is a replacement for the S3 Native client (s3n://):
- It uses Amazon’s libraries to interact with S3
- Supports larger files
- Higher performance
- Supports IAM role-based authentication (see the sketch just after this list)
- Production stable since Hadoop 2.7 (per Apache website)
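On the IAM point: when the cluster nodes run on EC2 with an IAM role that grants access to the bucket, the Hadoop 2.7 S3A client can fall back to the EC2 instance-profile credentials, so no access or secret key has to be stored at all. A minimal sketch of what the STEP 1 properties file below could be reduced to under that assumption (the extraClassPath line from STEP 1 is still required):
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
# no fs.s3a.access.key / fs.s3a.secret.key entries - credentials are taken from the instance profile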
The code below is a working example, tested on HDP 2.4.0.0-169 (Hadoop version 2.7.1.2.4.0.0-169).
Note that I am using the HiveContext and not the SQLContext. I am NOT querying or accessing any Hive tables, but I am using the RANK() function to perform a ranking. If you want to use the RANK() function on a DataFrame by writing the standard SQL you are already familiar with, then you have to use the HiveContext. If you use the SQLContext, the code and syntax will look quite different. Please refer to the PySpark documentation.
In short, it’s easier to use the HiveContext; however, this can be done using the SQLContext.
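For comparison, here is a rough sketch of one way to produce the same top 10 with a plain SQLContext and the DataFrame API instead of the SQL RANK() function. It assumes the sc and weblogs_hit objects built in STEP 2 below, and it simply numbers the sorted rows in the driver, so ties are not handled the way RANK() handles them:
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

## plain SQLContext - no Hive dependency
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(weblogs_hit)

## count page views per location and keep the 10 busiest
top10 = (df.groupBy("city", "country", "state")
           .count()
           .withColumnRenamed("count", "page_view_count")
           .orderBy(F.desc("page_view_count"))
           .limit(10)
           .collect())

## number the rows in the driver (approximate ranking, no tie handling)
for i, row in enumerate(top10, start=1):
    print "%d: %s,%s,%s -> %d" % (i, row.city.upper(), row.country.upper(), row.state.upper(), row.page_view_count)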
STEP 1: Create a Spark properties file
- Store your AWS credentials in a configuration file.
- Specify the location of the AWS jars needed to interact with S3A. Two are required: hadoop-aws and aws-java-sdk.
- The file is tab-delimited: each property name is separated from its value by a tab.
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath /usr/hdp/2.4.0.0-169/hadoop/hadoop-aws-2.7.1.2.4.0.0-169.jar:/usr/hdp/2.4.0.0-169/hadoop/lib/aws-java-sdk-1.7.4.jar
spark.hadoop.fs.s3a.access.key <your access key>
spark.hadoop.fs.s3a.secret.key <your secret key>
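Before running the Spark job, it can be worth sanity-checking S3A connectivity from the command line. A quick test, assuming the hadoop-aws and aws-java-sdk jars listed above are already on the Hadoop classpath on that node (if not, add them via the HADOOP_CLASSPATH environment variable); the credentials are passed as -D options here purely for the test:
hadoop fs -D fs.s3a.access.key=<your access key> -D fs.s3a.secret.key=<your secret key> -ls s3a://bkm-clickstream/Omniture/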
STEP 2: PySpark code
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, Row

## set Spark properties
conf = (SparkConf()
        .setAppName("s3a_test")
        .set("spark.executor.instances", "8")
        .set("spark.executor.cores", 2)
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy")
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf = conf)

## create HiveContext (required for the RANK() window function)
sqlContext = HiveContext(sc)

## path to S3 bucket containing my files
path = "s3a://bkm-clickstream/Omniture/*"

## get those fields we need to create the schema. file is tab delimited
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
weblogs_hit = parts.map(lambda p: Row(url=p[12], city=p[49], country=p[50], state=p[52]))

## create DataFrame
schema_weblogs_hit = sqlContext.createDataFrame(weblogs_hit)

## register DataFrame as a temporary table
schema_weblogs_hit.registerTempTable("weblogs_hit")

## RANK page view count by geographic location - which areas generate the most traffic in terms of page views
rows = sqlContext.sql("SELECT m.location, m.page_view_count, RANK() OVER (ORDER BY m.page_view_count DESC) AS ranking FROM (SELECT CONCAT(UPPER(city),',',UPPER(country),',',UPPER(state)) AS location, count(1) AS page_view_count FROM weblogs_hit GROUP BY city, country, state ORDER BY page_view_count) m LIMIT 10")

## run the SQL command and display the output
output = rows.collect()
for row in output:
    row = str(row)
    print "%s" % (row)
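Because the same S3A configuration covers writes as well as reads, the ranked output could also be written back to S3 instead of only printed on the driver. A small sketch, using a hypothetical output bucket/prefix of your own:
## hypothetical output location - replace with your own bucket/prefix
out_path = "s3a://your-output-bucket/omniture_rankings"

## persist the ranked rows as JSON files through the same S3A client
rows.write.json(out_path)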
STEP 3: Run PySpark code via YARN
/your_path_to_spark-submit --master yarn-client --properties-file <your_properties_file.conf> <your_pyspark_script.py>
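As a concrete illustration (the .conf and .py file names are placeholders, and the spark-submit path should be adjusted to your installation):
/usr/hdp/2.4.0.0-169/spark/bin/spark-submit --master yarn-client --properties-file spark_s3a.conf s3a_rank.py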
OUTPUT:
Created on 09-22-2016 11:20 PM
Thanks Binu for the guide. This is helpful. Could you please point me to a similar guide for HBase? Also, what services are supported with the S3A filesystem and HDP?