Created on 05-30-2016 02:36 PM - edited 08-17-2019 12:13 PM
This article will demonstrate using Spark (PySpark) with the S3A filesystem client to access data in S3.
The code can also be found on GitHub @ https://github.com/mathewbk/spark_s3a
The source data in the S3 bucket is Omniture clickstream data (weblogs). The goal is to write PySpark code against the S3 data to RANK geographic locations by page view traffic, i.e. which areas generate the most page views.
The S3A filesystem client (s3a://) is a replacement for the S3 Native client (s3n://).
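If you were previously reading the same data through the S3 Native client, switching is essentially a change of URI scheme (plus the fs.s3a.* credential properties shown further down). A quick sketch, assuming a SparkContext named sc and a hypothetical bucket and path:

## hypothetical bucket and path - only the URI scheme changes
rdd_old = sc.textFile("s3n://my-bucket/path/to/data")  # S3 Native client
rdd_new = sc.textFile("s3a://my-bucket/path/to/data")  # S3A client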
The code below is a working example, tested on HDP 2.4.0.0-169 (Hadoop version 2.7.1.2.4.0.0-169).
Note that I am using the HiveContext and not the SQLContext. I am NOT querying or accessing any Hive tables, but I am using the RANK() function to perform a ranking. If you want to use the RANK() function on a DataFrame while writing the standard SQL you are already familiar with, you have to use the HiveContext. With the SQLContext the code and syntax look quite different; please refer to the PySpark documentation.
In short, the HiveContext is the easier route, though the same result can be achieved with the DataFrame API; a rough sketch of that alternative syntax follows.
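For reference, here is what the same ranking can look like with the DataFrame API and window functions instead of a SQL string. This is illustrative only; it reuses the schema_weblogs_hit DataFrame built in the script below, and on Spark 1.6 window functions still require the HiveContext:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

## build the location string, count page views per location, then rank by count
hits = schema_weblogs_hit.select(
    F.concat_ws(",", F.upper("city"), F.upper("country"), F.upper("state")).alias("location"))
counts = hits.groupBy("location").count().withColumnRenamed("count", "page_view_count")
ranked = counts.withColumn("ranking",
    F.rank().over(Window.orderBy(F.desc("page_view_count"))))
ranked.show(10)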
Spark properties file (passed to spark-submit with --properties-file):

spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.driver.extraClassPath /usr/hdp/2.4.0.0-169/hadoop/hadoop-aws-2.7.1.2.4.0.0-169.jar:/usr/hdp/2.4.0.0-169/hadoop/lib/aws-java-sdk-1.7.4.jar
spark.hadoop.fs.s3a.access.key <your access key>
spark.hadoop.fs.s3a.secret.key <your secret key>
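If you would rather not keep the AWS keys in a properties file, the same fs.s3a.* settings can also be applied from inside the script on the Hadoop configuration. A minimal sketch (note that _jsc is PySpark's internal handle to the Java SparkContext):

## alternative: set the S3A properties programmatically after creating the SparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", "<your access key>")
hadoop_conf.set("fs.s3a.secret.key", "<your secret key>")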
The PySpark script:

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, Row

## set Spark properties
conf = (SparkConf()
        .setAppName("s3a_test")
        .set("spark.executor.instances", "8")
        .set("spark.executor.cores", 2)
        .set("spark.shuffle.compress", "true")
        .set("spark.io.compression.codec", "snappy")
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)

## create HiveContext (needed for the RANK() window function)
sqlContext = HiveContext(sc)

## path to the S3 bucket containing my files
path = "s3a://bkm-clickstream/Omniture/*"

## pull out the fields we need for the schema; the files are tab delimited
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
weblogs_hit = parts.map(lambda p: Row(url=p[12], city=p[49], country=p[50], state=p[52]))

## create DataFrame
schema_weblogs_hit = sqlContext.createDataFrame(weblogs_hit)

## register DataFrame as a temporary table
schema_weblogs_hit.registerTempTable("weblogs_hit")

## RANK page view count by geographic location - which areas generate the most traffic in terms of page views
rows = sqlContext.sql("SELECT m.location, m.page_view_count, RANK() OVER (ORDER BY m.page_view_count DESC) AS ranking FROM (SELECT CONCAT(UPPER(city),',',UPPER(country),',',UPPER(state)) AS location, count(1) AS page_view_count FROM weblogs_hit GROUP BY city, country, state ORDER BY page_view_count) m LIMIT 10")

## run the SQL command and display the output
output = rows.collect()
for row in output:
    print "%s" % str(row)
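If you also want to persist the ranked result back to S3 instead of only printing it, the DataFrame returned by sqlContext.sql() can be written out through the same S3A client. A sketch with a hypothetical output bucket:

## hypothetical output location - write the ranked rows back to S3 via S3A
rows.write.json("s3a://my-output-bucket/omniture/pageview_rank")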
Run the job with spark-submit, pointing it at the properties file:

/your_path_to_spark-submit --master yarn-client --properties-file <your_properties_file.conf> <your_pyspark_script.py>
Created on 09-22-2016 11:20 PM
Thanks Binu for the guide. This is helpful. Could you please point me to a similar guide for HBase? Also, which services are supported with the S3A filesystem and HDP?