Member since: 02-26-2016
Posts: 100
Kudos Received: 111
Solutions: 12

My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2084 | 05-04-2017 12:38 PM
 | 4051 | 03-21-2017 06:18 PM
 | 13602 | 03-21-2017 01:14 AM
 | 4412 | 02-14-2017 06:21 PM
 | 7133 | 02-09-2017 03:49 AM
08-08-2016
08:45 PM
1 Kudo
When using RDDs in your Java or Scala Spark code, Spark distributes the data to nodes within the cluster by using the default Java serialization. For Java and Scala objects, Spark has to send both the data and the structure between nodes. Java serialization does not produce small byte arrays, whereas Kryo serialization does. Thus, you can store more using the same amount of memory when using Kryo. Furthermore, you can also add compression such as snappy. With RDDs and Java serialization there is also the additional overhead of garbage collection. If you're working with RDDs, use Kryo serialization. With DataFrames, a schema is used to describe the data and Spark only passes data between nodes, not the structure. Thus, for certain types of computation on specific file formats you can expect faster performance. It's not 100% true that DataFrames always outperform RDDs. Please see my post here: https://community.hortonworks.com/content/kbentry/42027/rdd-vs-dataframe-vs-sparksql.html
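For reference, here is a minimal PySpark sketch of switching the serializer to Kryo and adding snappy compression; the app name is just a placeholder:

## minimal sketch: enable Kryo serialization and snappy compression
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
    .setAppName("kryo_example")  # placeholder app name
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.io.compression.codec", "snappy"))
sc = SparkContext(conf = conf)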
07-05-2016
06:41 PM
I'm saying that I did what you recommended. I went into Ambari and changed the 'Enable Authorization' property to both 'false' and 'none'. I tried both; it still does not fix the issue.
07-04-2016
09:19 PM
That still didn't solve the issue. I set 'Enable Authorization' to false and I still get the same error message: FAILED: RuntimeException java.io.FileNotFoundException: /etc/hive/2.5.0.0-817/0/xasecure-audit.xml (No such file or directory)
07-03-2016
04:35 PM
I installed HDP 2.5 TP and keep getting this error for Hive: FAILED: SemanticException java.lang.RuntimeException: java.io.FileNotFoundException: /etc/hive/2.5.0.0-817/0/xasecure-audit.xml (No such file or directory)
Labels:
- Apache Hadoop
- Apache Hive
06-28-2016
08:02 PM
4 Kudos
Synopsis

This tutorial will demonstrate using Spark for data processing operations on a large set of data consisting of pipe-delimited text files. Delimited text files are a common format seen in Data Warehousing:

- Random lookup for a single record
- Grouping data with aggregation and sorting the output

Three different techniques will be used to solve the above two problems, and their performance will then be compared:

- Using RDDs
- Using DataFrames
- Using SparkSQL

The purpose of this tutorial is to provide you with code snippets for the above three techniques and to demonstrate how RDDs outperform DataFrames and SparkSQL for certain types of data processing. See the end of this article for all of the code. Also, these tests demonstrate the native functionality within Spark for RDDs, DataFrames, and SparkSQL without calling additional modules/readers for file format conversions or other optimizations. For joining datasets, DataFrames and SparkSQL are much more intuitive to use, especially SparkSQL, and may perhaps yield better performance results than RDDs.

Source Data:

- 9 million unique order records across 3 files in HDFS
- Total size of the HDFS data files is 1.4 GB
- Each order record could be for 1 of 8 different products
- Pipe-delimited text files with each record containing 11 fields
- Data is fictitious and was auto-generated programmatically

Environment:

- HDP 2.4 (Sandbox)
- Hadoop version 2.7
- Spark 1.6
RDDs, DataFrames, and SparkSQL:

At its core, Spark operates on the concept of Resilient Distributed Datasets, or RDDs:

- Resilient - if data in memory is lost, it can be recreated
- Distributed - immutable distributed collection of objects in memory, partitioned across many data nodes in a cluster
- Dataset - initial data can come from files, be created programmatically, from data in memory, or from another RDD

The DataFrames API is a data abstraction framework that organizes your data into named columns:

- Create a schema for the data
- Conceptually equivalent to a table in a relational database
- Can be constructed from many sources, including structured data files, tables in Hive, external databases, or existing RDDs
- Provides a relational view of the data for easy SQL-like data manipulations and aggregations
- Under the hood, it is an RDD of Row objects

SparkSQL is a Spark module for structured data processing. You can interact with SparkSQL through:

- SQL
- DataFrames API
- Datasets API
Test results:

- RDDs outperformed DataFrames and SparkSQL for certain types of data processing
- DataFrames and SparkSQL performed about the same, although with analysis involving aggregation and sorting SparkSQL had a slight advantage
- Syntactically speaking, DataFrames and SparkSQL are much more intuitive than using RDDs
- The best of 3 runs was taken for each test
- Times were consistent, with not much variation between tests
- Jobs were run individually with no other jobs running

The two tests were:

- Random lookup against 1 order ID from 9 million unique order IDs
- GROUP all the different products with their total COUNTS and SORT DESCENDING by product name

Code:

RDD Random Lookup

#!/usr/bin/env python
from time import time
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setAppName("rdd_random_lookup")
.set("spark.executor.instances", "10")
.set("spark.executor.cores", 2)
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.shuffle.service.enabled", "false")
.set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
t0 = time()
path = "/data/customer_orders*"
lines = sc.textFile(path)
## filter where the order_id, the second field, is equal to 96922894
print lines.map(lambda line: line.split('|')).filter(lambda line: int(line[1]) == 96922894).collect()
tt = str(time() - t0)
print "RDD lookup performed in " + tt + " seconds" DataFrame Random Lookup #!/usr/bin/env python
from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setAppName("data_frame_random_lookup")
.set("spark.executor.instances", "10")
.set("spark.executor.cores", 2)
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.shuffle.service.enabled", "false")
.set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
t0 = time()
path = "/data/customer_orders*"
lines = sc.textFile(path)
## create data frame
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
         .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2],
                            ssn_hash=p[3], product_id=int(p[4]), product_desc=p[5],
                            country=p[6], state=p[7], shipping_carrier=p[8],
                            shipping_type=p[9], shipping_class=p[10])))
## filter where the order_id, the second field, is equal to 96922894
orders_df.where(orders_df['order_id'] == 96922894).show()
tt = str(time() - t0)
print "DataFrame performed in " + tt + " seconds" SparkSQL Random Lookup #!/usr/bin/env python
from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setAppName("spark_sql_random_lookup")
.set("spark.executor.instances", "10")
.set("spark.executor.cores", 2)
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.shuffle.service.enabled", "false")
.set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
t0 = time()
path = "/data/customer_orders*"
lines = sc.textFile(path)
## create data frame
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
         .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2],
                            ssn_hash=p[3], product_id=int(p[4]), product_desc=p[5],
                            country=p[6], state=p[7], shipping_carrier=p[8],
                            shipping_type=p[9], shipping_class=p[10])))
## register data frame as a temporary table
orders_df.registerTempTable("orders")
## filter where the order_id, the second field, is equal to 96922894
print sqlContext.sql("SELECT * FROM orders where order_id = 96922894").collect()
tt = str(time() - t0)
print "SparkSQL performed in " + tt + " seconds" RDD with GroupBy, Count, and Sort Descending #!/usr/bin/env python
from time import time
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setAppName("rdd_aggregation_and_sort")
.set("spark.executor.instances", "10")
.set("spark.executor.cores", 2)
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.shuffle.service.enabled", "false")
.set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
t0 = time()
path = "/data/customer_orders*"
lines = sc.textFile(path)
counts = (lines.map(lambda line: line.split('|'))
               .map(lambda x: (x[5], 1))
               .reduceByKey(lambda a, b: a + b)
               .map(lambda x: (x[1], x[0]))
               .sortByKey(ascending=False))
for x in counts.collect():
print x[1] + '\t' + str(x[0])
tt = str(time() - t0)
print "RDD GroupBy performed in " + tt + " seconds"
DataFrame with GroupBy, Count, and Sort Descending

#!/usr/bin/env python
from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setAppName("data_frame_aggregation_and_sort")
.set("spark.executor.instances", "10")
.set("spark.executor.cores", 2)
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.shuffle.service.enabled", "false")
.set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
t0 = time()
path = "/data/customer_orders*"
lines = sc.textFile(path)
## create data frame
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
         .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]), email_hash=p[2],
                            ssn_hash=p[3], product_id=int(p[4]), product_desc=p[5],
                            country=p[6], state=p[7], shipping_carrier=p[8],
                            shipping_type=p[9], shipping_class=p[10])))
results = orders_df.groupBy(orders_df['product_desc']).count().sort("count",ascending=False)
for x in results.collect():
print x
tt = str(time() - t0)
print "DataFrame performed in " + tt + " seconds" SparkSQL with GroupBy, Count, and Sort Descending #!/usr/bin/env python
from time import time
from pyspark.sql import *
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setAppName("spark_sql_aggregation_and_sort")
.set("spark.executor.instances", "10")
.set("spark.executor.cores", 2)
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.shuffle.service.enabled", "false")
.set("spark.executor.memory", "500MB"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
t0 = time()
path = "/data/customer_orders*"
lines = sc.textFile(path)
## create data frame
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
         .map(lambda r: Row(product=r[5])))
## register data frame as a temporary table
orders_df.registerTempTable("orders")
results = sqlContext.sql("SELECT product, count(*) AS total_count FROM orders GROUP BY product ORDER BY total_count DESC")
for x in results.collect():
print x
tt = str(time() - t0)
print "SparkSQL performed in " + tt + " seconds" Verification:
Source data files in HDFS:
Total size of source data files in HDFS: Order ID is second field in pipe delimited file. There are 9 Million unique order ID records: Output produced by GroupBy, Count, and Sort Descending (format will not be same for all, however, numbers will be same):
06-17-2016
02:45 PM
Increase the heap space to at least 471 MB. Try setting the VM options to -Xmx512m.
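As a hedged illustration only, assuming the program is launched directly with the java command (the jar name below is a placeholder), the option would be passed like this:

# give the JVM a 512 MB maximum heap (jar name is a placeholder)
java -Xmx512m -jar your-app.jar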
06-13-2016
07:00 PM
If you're looking for a way to just delete the S3 path from your code and you're using PySpark, then the following will work:

import os
cmd = "hdfs dfs -rm -r -skipTrash s3a://my-bucket/test_delete_me"
os.system(cmd)
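A small variant of the same idea, sketched here with subprocess instead of os.system so the exit code can be checked (the bucket path is the same example path as above):

## sketch: run the same hdfs dfs command via subprocess and check the exit code
import subprocess
rc = subprocess.call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", "s3a://my-bucket/test_delete_me"])
if rc != 0:
    print "delete failed with exit code " + str(rc)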
06-10-2016
10:37 PM
2 Kudos
Platform: HDP 2.4 (Sandbox)
Hadoop version: 2.7
OS: CentOS 6.8

Python 2.7 and all dependent libraries that your code uses (e.g. Pandas, Matplotlib, SciPy) must be installed on every data node that Spark will run on. Below are the steps I performed to get iPython up and running. My notebook jobs are executed via YARN. I did this install as the 'root' user.
STEP 1

Install/configure the iPython dependencies by running the following 5 commands:

yum install nano centos-release-SCL zlib-devel
yum install bzip2-devel openssl-devel ncurses-devel
yum install sqlite-devel readline-devel tk-devel
yum install gdbm-devel db4-devel libpcap-devel xz-devel
yum install libpng-devel libjpg-devel atlas-devel
STEP 2

iPython requires Python 2.7 or higher. Check which version of Python you are using (for example with python -V). In my case, I had an older version of Python, so I had to install a new version.

Install the "Development tools" dependency for Python 2.7:

yum groupinstall "Development tools"

Install Python 2.7:

wget http://python.org/ftp/python/2.7.6/Python-2.7.6.tar.xz
tar xf Python-2.7.6.tar.xz
cd Python-2.7.6
./configure --prefix=/usr/local --enable-unicode=ucs4 --enable-shared LDFLAGS="-Wl,-rpath /usr/local/lib"
make && make altinstall
source /opt/rh/python27/enable

Verify that 2.7 is there (for example with python2.7 -V).
STEP 3

Download easy_install to configure pip (the Python package installer):

wget https://bitbucket.org/pypa/setuptools/raw/0.7.4/ez_setup.py
python ez_setup.py
These are a good set of libraries to start with. Install the following data science packages:

pip install numpy scipy pandas
pip install scikit-learn tornado pyzmq
pip install pygments matplotlib jsonschema
pip install jinja2 --upgrade
STEP 4

Install the iPython notebook on a node with the Spark client:

pip install "ipython[notebook]"
STEP 5

Create an IPython profile for pyspark:

ipython profile create pyspark
STEP 6

Create a Jupyter config file:

jupyter notebook --generate-config

This will create the file /root/.jupyter/jupyter_notebook_config.py
STEP 7

Create the shell script start_ipython_notebook.sh and add the following to it:

#!/bin/bash
source /opt/rh/python27/enable
IPYTHON_OPTS="notebook --port 8889 --notebook-dir='/usr/hdp/2.3.2.0-2950/spark/' --ip='*' --no-browser" pyspark

Give the shell script execute permissions:

chmod 755 start_ipython_notebook.sh
STEP 8

Add a port forwarding rule for port 8889.
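One way to do this, sketched here under the assumption that you reach the Sandbox over SSH (the host name is a placeholder), is a local SSH port forward:

# forward local port 8889 to port 8889 on the sandbox (host name is a placeholder)
ssh -L 8889:localhost:8889 root@sandbox.hortonworks.com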
STEP 9

Edit bash_profile and set PYSPARK_PYTHON with the path to Python 2.7, or you could set this in your notebook (my PySpark code example shows how to do this). If this is not set, then you will see the following error in your notebook after submitting code:

Cannot run program "python2.7": error=2, No such file or directory

You might also need to set the following, however, I did not: HADOOP_HOME, JAVA_HOME, PYTHONPATH
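For reference, a minimal sketch of the bash_profile entry, assuming Python 2.7 was installed to /usr/local/bin as in the notebook code below:

# ~/.bash_profile - point PySpark workers at the Python 2.7 interpreter
export PYSPARK_PYTHON=/usr/local/bin/python2.7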
STEP 10

Run iPython:

./start_ipython_notebook.sh
STEP 11

Start a Notebook. Below is my PySpark code (from the notebook), a screenshot of my job running in Resource Manager showing the YARN resource allocation, and the output shown in the notebook.

CODE:
## stop existing SparkContext
## I did this because I create a new SparkContext with my specific properties
sc.stop()
import os
from pyspark import SparkContext, SparkConf
## set path to Python 2.7
os.environ["PYSPARK_PYTHON"] = "/usr/local/bin/python2.7"
sc = SparkContext(
conf = SparkConf()
.setMaster("yarn-client")
.setAppName("ipython_yarn_test")
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.executor.instances", "4")
.set("spark.executor.cores", 1)
.set("spark.executor.memory", "1G"))
## get a word count from a file in HDFS and list them in order by counts
## only showing top 10
text_file = sc.textFile("/tmp/test_data.dat")
word_counts = (text_file.flatMap(lambda line: line.split())
                        .map(lambda word: (word, 1))
                        .reduceByKey(lambda a, b: a + b)
                        .map(lambda x: (x[1], x[0]))
                        .sortByKey(ascending=False))
word_counts.take(10)
STEP 12

Finally, run the code in the Notebook. Below is the output from the Notebook and Resource Manager showing the job.
06-02-2016
04:02 PM
3 Kudos
I have faced this issue numerous times as well: "WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources"

The problem was with Dynamic Resource Allocation over-allocating resources. After turning off Dynamic Resource Allocation and then specifying the number of executors, executor memory, and cores, my jobs were running.

Turn off Dynamic Resource Allocation:

conf = (SparkConf()
    .setAppName("my_job_name")
    .set("spark.shuffle.service.enabled", "false")
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.io.compression.codec", "snappy")
    .set("spark.rdd.compress", "true"))
sc = SparkContext(conf = conf)

Give values with spark-submit (you could also set these in SparkConf; note that the options must come before the application script):

/usr/hdp/2.3.4.0-3485/spark/bin/spark-submit --master yarn --deploy-mode client --driver-memory 3g --executor-memory 3g --num-executors 4 --executor-cores 2 /home/ec2-user/scripts/validate_employees.py
05-30-2016
04:57 PM
@Kirk Haslbeck - I was working on something similar: writing PySpark code that uses SparkSQL to analyze data in S3 via the S3A filesystem client. I documented my work with instructions here: https://community.hortonworks.com/articles/36339/spark-s3a-filesystem-client-from-hdp-to-access-s3.html