SYNOPSIS

This article will demonstrate using Spark (PySpark) with the S3A filesystem client to access data in S3.

The code is also available on GitHub at https://github.com/mathewbk/spark_s3a

The source data in the S3 bucket is Omniture clickstream data (weblogs). The goal is to write PySpark code against the S3 data that uses RANK() to rank geographic locations by page view traffic, that is, to find which areas generate the most traffic by page view count.

The S3A filesystem client (s3a://) is a replacement for the S3 Native client (s3n://). It:

  • Uses Amazon’s libraries to interact with S3
  • Supports larger files
  • Delivers higher performance
  • Supports IAM role-based authentication
  • Has been production stable since Hadoop 2.7 (per the Apache website)

The code below is a working example, tested on HDP 2.4.0.0-169 (Hadoop version 2.7.1.2.4.0.0-169).

Note that I am using the HiveContext and not the SQLContext. I am NOT querying or accessing any Hive tables, but I am using the RANK() function to perform a ranking. If you want to use the RANK() function on a DataFrame by writing the standard SQL you are already familiar with, then you have to use the HiveContext. If you use the SQLContext, the code and syntax will look quite different. Please refer to the PySpark documentation.

In short, it’s easier to use the HiveContext; however, this can be done using the SQLContext.
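For readers less familiar with RANK(), the sketch below mimics its semantics in plain Python: rows with equal counts share a rank, and the next rank skips ahead by the number of tied rows. The function name `rank_desc` and the sample counts are made up for illustration, and this is not how Spark implements the window function.

```python
def rank_desc(counts):
    """Return (key, count, rank) tuples ordered by count descending.

    Mimics SQL RANK() OVER (ORDER BY count DESC): ties share a rank and
    the following rank skips ahead by the number of tied rows.
    """
    # Sort by count descending; break ties by key so the order is stable
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    ranked = []
    prev_count = None
    rank = 0
    for position, (key, count) in enumerate(ordered, start=1):
        if count != prev_count:
            # A new count value: the rank jumps to the row position
            rank = position
            prev_count = count
        ranked.append((key, count, rank))
    return ranked


# Hypothetical page view counts per location (not real data)
sample = {
    "SEATTLE,USA,WA": 120,
    "CHICAGO,USA,IL": 95,
    "HOUSTON,USA,TX": 95,
    "MIAMI,USA,FL": 40,
}
for row in rank_desc(sample):
    print(row)
```

In this made-up sample, the two locations with 95 page views would both receive rank 2, and the next location would receive rank 4.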

STEP 1: Create a Spark properties file

  • Store your AWS credentials in a configuration file.
  • Specify the location of the AWS jars needed to interact with S3A. Two are required: hadoop-aws and aws-java-sdk.
  • The file is tab delimited.
spark.hadoop.fs.s3a.impl        org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath     /usr/hdp/2.4.0.0-169/hadoop/hadoop-aws-2.7.1.2.4.0.0-169.jar:/usr/hdp/2.4.0.0-169/hadoop/lib/aws-java-sdk-1.7.4.jar
spark.hadoop.fs.s3a.access.key  <your access key>
spark.hadoop.fs.s3a.secret.key  <your secret key>
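
If you would rather not type the keys into the file by hand, one option is to generate its contents from environment variables. The sketch below is only an illustration under stated assumptions: the helper name `render_spark_properties` and the use of the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables are choices made here, not part of the original setup. The jar paths are the HDP 2.4.0.0-169 paths shown above.

```python
import os


def render_spark_properties(access_key, secret_key, jars):
    """Return the text of a tab-delimited Spark properties file for S3A."""
    props = [
        ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
        # Classpath entries are joined with ':' as in the file above
        ("spark.driver.extraClassPath", ":".join(jars)),
        ("spark.hadoop.fs.s3a.access.key", access_key),
        ("spark.hadoop.fs.s3a.secret.key", secret_key),
    ]
    return "".join("%s\t%s\n" % (key, value) for key, value in props)


# Assumption: credentials are exported in the environment; placeholders otherwise
text = render_spark_properties(
    os.environ.get("AWS_ACCESS_KEY_ID", "<your access key>"),
    os.environ.get("AWS_SECRET_ACCESS_KEY", "<your secret key>"),
    [
        "/usr/hdp/2.4.0.0-169/hadoop/hadoop-aws-2.7.1.2.4.0.0-169.jar",
        "/usr/hdp/2.4.0.0-169/hadoop/lib/aws-java-sdk-1.7.4.jar",
    ],
)

# Print only the property names so the secret key is not echoed to the terminal
for line in text.splitlines():
    print(line.split("\t")[0])
```

The returned text can be written to a file readable only by your user and passed to spark-submit with --properties-file.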

STEP 2: PySpark code

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, Row

## set Spark properties
conf = (SparkConf()
         .setAppName("s3a_test")
         .set("spark.executor.instances", "8")
         .set("spark.executor.cores", "2")
         .set("spark.shuffle.compress", "true")
         .set("spark.io.compression.codec", "snappy")
         .set("spark.executor.memory", "2g"))
sc = SparkContext(conf = conf)

## create the HiveContext (used here only so that RANK() is available in SQL)
sqlContext = HiveContext(sc)

## path to S3 bucket containing my files
path = "s3a://bkm-clickstream/Omniture/*"

## extract the fields needed for the schema; the file is tab delimited
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
weblogs_hit = parts.map(lambda p: Row(url=p[12], city=p[49], country=p[50], state=p[52]))

## create DataFrame
schema_weblogs_hit = sqlContext.createDataFrame(weblogs_hit)

## register DataFrame as a temporary table
schema_weblogs_hit.registerTempTable("weblogs_hit")

## RANK page view count by geographic location: which areas generate the most traffic in terms of page views
## the outer ORDER BY ensures LIMIT 10 returns the top-ranked rows rather than an arbitrary ten
rows = sqlContext.sql("""
    SELECT m.location, m.page_view_count,
           RANK() OVER (ORDER BY m.page_view_count DESC) AS ranking
    FROM (SELECT CONCAT(UPPER(city), ',', UPPER(country), ',', UPPER(state)) AS location,
                 COUNT(1) AS page_view_count
          FROM weblogs_hit
          GROUP BY city, country, state) m
    ORDER BY m.page_view_count DESC
    LIMIT 10""")

## run SQL command and display output
output = rows.collect()
for row in output:
    print(str(row))
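
The code above indexes fields 12, 49, 50, and 52 directly, so a line with fewer than 53 tab-separated fields would raise an IndexError on the executors. One possible guard, sketched here in plain Python so it can be tried without a cluster, is to parse each line with a helper that returns None for short lines and then filter those out. The helper names `parse_hit` and `make_line` and the sample lines are hypothetical; in the Spark job the same idea could be applied with something like `lines.map(parse_hit).filter(lambda r: r is not None)`, returning a Row instead of a dict.

```python
# Field positions taken from the article's code
URL, CITY, COUNTRY, STATE = 12, 49, 50, 52
MIN_FIELDS = STATE + 1


def parse_hit(line):
    """Split a tab-delimited weblog line; return None if it is too short."""
    fields = line.split("\t")
    if len(fields) < MIN_FIELDS:
        return None
    return {
        "url": fields[URL],
        "city": fields[CITY],
        "country": fields[COUNTRY],
        "state": fields[STATE],
    }


def make_line(url, city, country, state):
    """Build a fake 53-field line, for demonstration purposes only."""
    fields = [""] * MIN_FIELDS
    fields[URL] = url
    fields[CITY] = city
    fields[COUNTRY] = country
    fields[STATE] = state
    return "\t".join(fields)


# Made-up input: one well-formed line and one truncated line
sample_lines = [
    make_line("http://example.com/a", "seattle", "usa", "wa"),
    "truncated\tline",
]

# Keep only the lines that parsed successfully
hits = [hit for hit in map(parse_hit, sample_lines) if hit is not None]
print(hits)
```

Whether to drop malformed lines silently or count them first depends on how clean the source logs are; this sketch simply drops them.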

STEP 3: Run PySpark code via YARN

/your_path_to_spark-submit --master yarn-client --properties-file <your_properties_file.conf> <your_pyspark_script.py>

OUTPUT:

[Screenshot of the job output: 4649-screen-shot-2016-05-29-at-111322-am.png]

Comments

Thanks, Binu, for the guide. This is helpful. Could you please point me to a similar guide for HBase? Also, what services are supported with the S3A filesystem and HDP?