Member since 02-26-2016
Posts: 100
Kudos received: 111
Solutions: 12
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2150 | 05-04-2017 12:38 PM |
| | 4212 | 03-21-2017 06:18 PM |
| | 13876 | 03-21-2017 01:14 AM |
| | 4518 | 02-14-2017 06:21 PM |
| | 7331 | 02-09-2017 03:49 AM |
11-30-2016
06:21 AM
I notice that when a cluster is idle (no jobs running), the Resource Manager heap usage fluctuates between 6% and 32%. Is it normal for heap usage to get this high with no jobs running? I start the cluster and all services via Ambari; heap starts at 6%, goes up to 32%, drops, and then the cycle repeats.
Labels: Cloudera Manager
11-29-2016
10:07 PM
The maximum number of files in HDFS depends on the amount of memory available to the NameNode. Each file object and each block object takes about 150 bytes of NameNode memory. For example, if you have 10 million files and each file has one block, you would need about 3 GB of memory for the NameNode:
(10 million file objects + 10 million block objects) × 150 bytes = 20 million × 150 bytes = 3,000,000,000 bytes ≈ 3 GB of memory.
Keep in mind that the NameNode also needs memory for other purposes, so to support 10 million files your NameNode will need considerably more than 3 GB of memory.
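The same back-of-the-envelope arithmetic can be written as a small Python helper. This is a sketch that assumes the ~150 bytes per object rule of thumb and counts only file and block objects, so directories and all other NameNode heap usage are ignored and the result should be read as a lower bound.

```python
# Rule of thumb: each file object and each block object uses ~150 bytes
# of NameNode heap. Directories and other heap usage are not counted.
BYTES_PER_OBJECT = 150


def namenode_memory_bytes(num_files: int, blocks_per_file: int = 1) -> int:
    """Lower-bound estimate of NameNode heap for file and block objects."""
    file_objects = num_files
    block_objects = num_files * blocks_per_file
    return (file_objects + block_objects) * BYTES_PER_OBJECT


if __name__ == "__main__":
    needed = namenode_memory_bytes(10_000_000)
    print(f"{needed:,} bytes (~{needed / 1e9:.1f} GB)")
```

With two blocks per file, the same 10 million files come to (10 + 20) million objects, or about 4.5 GB, which shows that block count matters as much as file count.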
11-28-2016
06:15 AM
11 Kudos
SYNOPSIS
The Optimized Row Columnar (ORC) file format is a columnar storage format for Hive. Specific Hive configuration settings for ORC-formatted tables can improve query performance, resulting in faster execution and reduced usage of computing resources. Some of these settings may already be turned on by default, whereas others require some educated guesswork.
The table below compares Tez job statistics for the same Hive query that was submitted without and with certain configuration settings. Notice the performance gains with optimization. This article will explain how the performance improvements were achieved.
QUERY EXECUTION
Source Data:
102,602,110 Clickstream page view records across 5 days of data for multiple countries
Table is partitioned by date in the format YYYY-MM-DD.
There are no indexes, and the table is not bucketed.
The HiveQL query ranks each page per user by how many times the user viewed that page on a specific date within the United States. Breakdown of the query:
Scan all the page views for each user.
Filter for page views on 1 date partition and only include traffic in the United States.
For each user, rank each page in terms of how many times it was viewed by that user.
For example, if I view Page A three times and Page B once, Page A ranks 1 and Page B ranks 2.
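The HiveQL itself is not reproduced in the article. As an illustration of the logic only, here is a plain-Python sketch of the same steps: filter one date partition and US traffic, count views per (user, page), and rank each user's pages. The field names (`user_id`, `page`, `view_date`, `country`) and the helper `pv` are hypothetical, and tie handling assumes SQL `RANK()` semantics.

```python
from collections import Counter, defaultdict


def pv(user, page, view_date="2016-11-20", country="US"):
    """Build one hypothetical page-view record (illustrative field names)."""
    return {"user_id": user, "page": page, "view_date": view_date, "country": country}


def rank_pages(views, view_date, country="US"):
    """Rank each user's pages by view count for one date and country.

    Ties share a rank and the following rank is skipped, as with SQL RANK().
    Returns {user: [(page, views, rank), ...]} ordered by rank.
    """
    counts = Counter(
        (v["user_id"], v["page"])
        for v in views
        if v["view_date"] == view_date and v["country"] == country
    )
    per_user = defaultdict(list)
    for (user, page), n in counts.items():
        per_user[user].append((page, n))

    ranked = {}
    for user, pages in per_user.items():
        pages.sort(key=lambda item: (-item[1], item[0]))  # most viewed first
        rows = []
        for i, (page, n) in enumerate(pages):
            rank = rows[-1][2] if i > 0 and n == pages[i - 1][1] else i + 1
            rows.append((page, n, rank))
        ranked[user] = rows
    return ranked


if __name__ == "__main__":
    views = [pv("u1", "A")] * 3 + [
        pv("u1", "B"),
        pv("u1", "B", view_date="2016-11-21"),  # other date partition: filtered out
        pv("u1", "B", country="MX"),            # outside the US: filtered out
    ]
    print(rank_pages(views, "2016-11-20"))
```

In Hive the filtering on the date partition also lets the engine skip the other four days entirely, which this in-memory sketch cannot show.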
[Figures: Tez job statistics without optimization and with optimization. Notice the change in the number of reducers.]
The final output size of all the reducers is 920 MB.
For the first run, 73 reducers completed, resulting in 73 output files. This is excessive: 920 MB spread across 73 reducers is only about 12.5 MB per reducer output. That is unnecessary overhead and results in too many small files. More parallelism does not always equate to better performance.
The second run launched 10 reducers, resulting in 10 output files. 920 MB across 10 reducers is about 92 MB per reducer output, which means much less overhead and no small files problem. This matters because the maximum number of files in HDFS depends on the amount of memory available to the NameNode: each block, file, and directory in HDFS is represented as an object in the NameNode’s memory, and each object occupies about 150 bytes.
OPTIMIZATION
Always collect statistics on those tables for which data changes frequently. Schedule an automated ETL job to run at certain times:
ANALYZE TABLE page_views_orc COMPUTE STATISTICS FOR COLUMNS;
Run the Hive query with the following settings:
SET hive.optimize.ppd=true;
SET hive.optimize.ppd.storage=true;
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;
SET hive.tez.auto.reducer.parallelism=true;
SET hive.tez.max.partition.factor=20;
SET hive.exec.reducers.bytes.per.reducer=128000000;
Partition your tables by date if you are storing a high volume of data per day. Table management becomes easier. You can easily drop partitions that are no longer needed or for which data has to be reprocessed.
SUMMARY
Let’s look at each of the Hive settings.
Enable predicate pushdown (PPD) to filter at the storage layer:
SET hive.optimize.ppd=true;
SET hive.optimize.ppd.storage=true;
Vectorized query execution processes data in batches of 1024 rows instead of one by one:
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;
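To make the batching idea concrete, here is a conceptual Python sketch that evaluates a filter over one column in batches of 1024 values and collects the positions of matching rows, similar in spirit to a "selected rows" list. It only models the control flow: Hive's vectorized engine is Java code operating on primitive column vectors, which is where the speedup comes from, and pure Python will not reproduce that speedup.

```python
from typing import Callable, List, Sequence

BATCH_SIZE = 1024  # rows per batch, matching the batch size described above


def batch_sizes(num_rows: int, batch_size: int = BATCH_SIZE) -> List[int]:
    """Sizes of the batches a column of num_rows values is split into."""
    return [min(batch_size, num_rows - start) for start in range(0, num_rows, batch_size)]


def vectorized_filter(column: Sequence, predicate: Callable[[object], bool],
                      batch_size: int = BATCH_SIZE) -> List[int]:
    """Return the row positions that pass `predicate`, one batch at a time.

    Each batch is a slice of a single column; the result plays the role of a
    'selected rows' list that later operators would read.
    """
    selected: List[int] = []
    for start in range(0, len(column), batch_size):
        batch = column[start:start + batch_size]
        selected.extend(start + i for i, value in enumerate(batch) if predicate(value))
    return selected


if __name__ == "__main__":
    countries = ["US", "MX"] * 1500  # 3,000 rows
    print(batch_sizes(len(countries)))  # [1024, 1024, 952]
    print(len(vectorized_filter(countries, lambda c: c == "US")))  # 1500
```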
Enable the Cost-Based Optimizer (CBO) for efficient query execution based on cost, and fetch table statistics:
SET hive.cbo.enable=true;
SET hive.compute.query.using.stats=true;
SET hive.stats.fetch.column.stats=true;
SET hive.stats.fetch.partition.stats=true;
Partition and column statistics are fetched from the metastore. Use this with caution: if you have too many partitions and/or columns, this could degrade performance.
Control reducer output:
SET hive.tez.auto.reducer.parallelism=true;
SET hive.tez.max.partition.factor=20;
SET hive.exec.reducers.bytes.per.reducer=128000000;
This last set is important. The first run produced 73 output files, each around 12.5 MB in size, which is inefficient as explained earlier. With the above settings, we are telling Hive to size the reduce stage from the data volume, targeting roughly 128 MB per reducer, and letting Tez adjust the number of reducers at runtime. Let's examine this:
The parameter hive.tez.max.partition.factor controls how much Hive may over-partition the reduce stage when auto reducer parallelism is enabled; Tez then scales the number of reducers down at runtime based on the actual data. The value of 20 was an educated guess on my part, and Hive will not necessarily launch that many reducers. My job completed with only 10 reducers, producing 10 output files.
Since I set a value of 128 MB for hive.exec.reducers.bytes.per.reducer, Hive tries to size each reducer's share of the data at close to 128 MB rather than simply running 20 reducers.
If I did not set hive.exec.reducers.bytes.per.reducer, Hive would have launched 20 reducers, because my query output allowed for this. I tested this, and 20 reducers ran.
The 128 MB value is only an approximation of each reducer's output. In this example the total size of the output files is 920 MB. Hive launched 10 reducers, which is about 92 MB per reducer file. When I set this to 64 MB, Hive launched 20 reducers, with each file around 46 MB.
If hive.exec.reducers.bytes.per.reducer is set to a very high value, you will have fewer reducers than if it is set to a lower value. Too few reducers can also degrade performance, so you need just the right level of parallelism.
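The file-size arithmetic above, plus a simplified model of deriving a reducer count from hive.exec.reducers.bytes.per.reducer, can be sketched in Python. The estimate function is an assumption-laden model: Hive bases its estimate on the estimated input to the reduce stage rather than on final output size, caps it with hive.exec.reducers.max (1009 is assumed here as the default), and with auto reducer parallelism Tez can still change the count at runtime. It will therefore not reproduce the exact reducer counts observed above.

```python
import math

TOTAL_OUTPUT_MB = 920  # total reducer output reported for this query


def avg_output_mb(total_mb: float, reducers: int) -> float:
    """Average size of one reducer's output file."""
    return total_mb / reducers


def estimate_reducers(input_bytes: int, bytes_per_reducer: int,
                      max_reducers: int = 1009) -> int:
    """Simplified reducer estimate: ceil(input / bytes_per_reducer),
    clamped to [1, max_reducers]. 1009 is an assumed hive.exec.reducers.max
    default; runtime adjustment by Tez is not modeled."""
    return max(1, min(max_reducers, math.ceil(input_bytes / bytes_per_reducer)))


if __name__ == "__main__":
    for n in (73, 10, 20):
        print(f"{n:>2} reducers -> ~{avg_output_mb(TOTAL_OUTPUT_MB, n):.1f} MB per file")
```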
11-22-2016
01:26 AM
1 Kudo
If I use Cloudbreak to launch a cluster on AWS, can I specify a database (MySQL, PostgreSQL) in Amazon RDS to store the metadata for components such as Hive, Oozie, and Ambari? I want to use RDS instead of having Cloudbreak set up the databases locally on the cluster. Or is the option to have Cloudbreak create the databases locally on my cluster, and then change the config files to point to a new database in Amazon RDS (after copying the metadata from the existing local repository to RDS)?
Labels: Hortonworks Cloudbreak
11-18-2016
09:57 AM
11 Kudos
Synopsis
You are a marketing analyst tasked with developing a BI dashboard for management that reports on website activity in real time throughout the day on Black Friday, the day after Thanksgiving.
Unfortunately, all of the data engineers are busy with other projects and cannot immediately help you.
Not to worry. With Hortonworks DataFlow, powered by Apache NiFi, you can easily create a data flow in minutes to process the data needed for the dashboard.
Source Data:
Fictitious online retailer Initech Corporation
JSON Formatted Clickstream data
Your company stores the raw Clickstream log files in HDFS. The JSON formatted log files are written to HDFS directories partitioned by day and hour. For example:
/data/clickstream/2016-11-20/hour=00
/data/clickstream/2016-11-20/hour=01
/data/clickstream/2016-11-20/hour=...
/data/clickstream/2016-11-20/hour=23
There could also be subdirectories under the hour
This directory structure or a similar variant is commonly found in organizations when storing Clickstream logs
The JSON data contains nested hierarchies. Sample JSON record:
Solution considerations:
New files are written throughout the day for each hour and late arriving data is added to existing hours.
Since files are constantly arriving, you need the data flow to always be on and running
HBase is a fast and scalable option for the backend datastore:
Data is semi-structured JSON
HBase does not require a schema
HBase was designed for high throughput writes, whereas Hive is better suited for batch loads
Use Apache Phoenix to create a view atop the HBase table:
Phoenix creates the schema for reporting
Phoenix lets you write ANSI SQL queries against the HBase data
Phoenix JDBC driver allows any BI tool to connect
HDF DataFlow Overview:
* You may need to increase the heap size used by NiFi. You will know you need to if you start getting the error java.lang.OutOfMemoryError.
Throughput:
HDF loaded 215,699 records into HBase in under 33 seconds
I'm running HDF on a single node with 64 GB RAM and 16 cores
Other Hadoop and Hadoop-related services are also running on this node, including NodeManager, DataNode, Livy Server, RegionServer, Phoenix Query Server, NFS Gateway, and Metrics Monitor
Step 1:
Create the HBase table. You can easily do this via Apache Zeppelin.
Create 1 column family 'pageview'
Multiple versions are not needed as there are no updates to this data. Each website hit is unique.
Step 2:
Create the Phoenix view. You can easily do this via Apache Zeppelin.
Step 3:
Configure the ListHDFS processor
The ListHDFS processor will only list new files as they are written to directories
The DataFlow will be constantly running, and we do not want to process the same data twice; ListHDFS prevents this by listing each file only once
Our top-level directory is the Clickstream data for 2016-11-20
ListHDFS will list all new files beneath the subdirectories under the top-level directory of 2016-11-20
Notice that the value for 'Directory' is the top-level directory '/data/clickstream/2016-11-20'
The value for 'Recursive Subdirectories' is set to 'true' so that all new files will be picked up as they are added
The Hadoop Configuration Resources are the full path locations to core-site.xml and hdfs-site.xml
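ListHDFS keeps its listing state inside NiFi, so nothing needs to be coded here. To illustrate the idea of "list only what is new since last time", here is a simplified Python sketch against a local directory tree: it walks all subdirectories (like 'Recursive Subdirectories' = true) and keeps a single modification-time high-water mark. That single timestamp is an assumption of this sketch, not NiFi's implementation; it can miss a file that arrives later with a modification time equal to the mark.

```python
import os
import tempfile
from typing import List, Tuple


def list_new_files(top_dir: str, last_mark: float) -> Tuple[List[str], float]:
    """Recursively list files modified after `last_mark`.

    Returns the new paths (sorted) and the updated high-water mark, which
    the caller stores and passes back in on the next run.
    """
    new_files: List[str] = []
    newest = last_mark
    for root, _dirs, files in os.walk(top_dir):  # descend into hour=00, hour=01, ...
        for name in files:
            path = os.path.join(root, name)
            mtime = os.path.getmtime(path)
            if mtime > last_mark:
                new_files.append(path)
                newest = max(newest, mtime)
    return sorted(new_files), newest


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as top:
        hour_dir = os.path.join(top, "hour=00")
        os.makedirs(hour_dir)
        first = os.path.join(hour_dir, "part-0001.json")
        open(first, "w").close()
        os.utime(first, (100, 100))  # fixed mtime so the demo is deterministic
        files, mark = list_new_files(top, 0)
        print(files, mark)
        files, mark = list_new_files(top, mark)  # nothing new on the second run
        print(files, mark)
```

On each run the caller passes back the mark returned by the previous run, so a file that has been listed once is not listed again.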
Step 4:
Configure the FetchHDFS processor
The ListHDFS processor will list files
FetchHDFS will retrieve the contents from the files listed
Step 5:
Use the SplitText processor to split the HDFS files into individual JSON records
I used the processor as is without any changes, other than renaming it to 'Split_Lines_In_File'
Step 6:
Configure the JoltTransformJSON processor to extract only those fields needed for the dashboard.
We only need the following fields from the JSON data:
event ID - unique identifier for each website hit
event time - time when the website hit occurred
url - url of the page viewed
country - country in which the page was served
session id - session ID for the user
user cookie
os - the user's device OS
browser - the browser the user used
Some of the above fields are child elements for a top-level parent. The property 'Jolt Specification' needs to be set so that we properly extract those fields:
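The following is not the Jolt specification; it is a plain-Python sketch of what the specification needs to accomplish, similar in spirit to a Jolt "shift": copy selected nested values into a flat record and drop everything else. The nested source paths are hypothetical placeholders, because the sample record and the actual spec are not reproduced here.

```python
from typing import Any, Dict, Tuple

# Hypothetical mapping from nested source paths to flat output names.
# The real clickstream field names may differ.
FIELD_PATHS: Dict[str, Tuple[str, ...]] = {
    "event_id": ("event", "id"),
    "event_time": ("event", "time"),
    "url": ("page", "url"),
    "country": ("geo", "country"),
    "session_id": ("session", "id"),
    "user_cookie": ("user", "cookie"),
    "os": ("device", "os"),
    "browser": ("device", "browser"),
}


def extract_fields(record: Dict[str, Any],
                   paths: Dict[str, Tuple[str, ...]] = FIELD_PATHS) -> Dict[str, Any]:
    """Flatten the selected nested values; paths missing from the record are skipped."""
    flat: Dict[str, Any] = {}
    for out_name, path in paths.items():
        node: Any = record
        for key in path:
            if not isinstance(node, dict) or key not in node:
                break  # path not present in this record
            node = node[key]
        else:  # loop completed: the full path exists
            flat[out_name] = node
    return flat


if __name__ == "__main__":
    sample = {"event": {"id": "e1", "time": "2016-11-20T10:00:00Z"},
              "page": {"url": "/home", "title": "Home"},  # title is dropped
              "device": {"os": "iOS", "browser": "Safari"}}
    print(extract_fields(sample))
```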
Step 7:
Configure the PutHBaseJSON processor to store the record.
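PutHBaseJSON is configured through properties such as Table Name, Row Identifier Field Name, and Column Family rather than code. As a rough illustration of the resulting layout, here is a Python sketch that turns one flat JSON record into a row key plus `family:qualifier` cells in the 'pageview' column family. Using event_id as the row key is an assumption (it is the unique identifier per website hit), and details such as leaving the row-key field out of the columns and storing every value as a string are simplifications of this sketch, not the processor's documented behavior.

```python
import json
from typing import Any, Dict, Tuple


def to_hbase_cells(record: Dict[str, Any], row_id_field: str = "event_id",
                   family: str = "pageview") -> Tuple[str, Dict[str, str]]:
    """Map a flat JSON record to (row key, {"family:qualifier": value})."""
    row_key = str(record[row_id_field])
    cells = {
        f"{family}:{name}": str(value)
        for name, value in record.items()
        if name != row_id_field  # the row key is not repeated as a column here
    }
    return row_key, cells


if __name__ == "__main__":
    flow_file = '{"event_id": "e1", "url": "/home", "country": "US"}'
    print(to_hbase_cells(json.loads(flow_file)))
```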
Apache Zeppelin Notebook:
09-30-2016
01:56 PM
3 Kudos
Use Apache Spark to connect to SQL Server, extract a table from SQL Server, and load the extracted rows into a Hive table:
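You will need to download the JDBC driver for SQL Server from: https://www.microsoft.com/en-us/download/details.aspx?id=11774
Run the code with (the driver JAR is passed to both the driver and the executors, since the executors perform the JDBC reads):
/usr/bin/spark-submit --driver-class-path /home/centos/sqljdbc_4.0/enu/sqljdbc4.jar --jars /home/centos/sqljdbc_4.0/enu/sqljdbc4.jar --master yarn-client my_script.py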
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
conf = (SparkConf()
.setAppName("data_import")
.set("spark.dynamicAllocation.enabled","true")
.set("spark.shuffle.service.enabled","true"))
sc = SparkContext(conf = conf)
sqlctx = HiveContext(sc)
## read the orders table over JDBC using the DataFrameReader API (Spark 1.4+)
df = (sqlctx.read
.format("jdbc")
.options(
url="jdbc:sqlserver://ec2-54-244-44-6.us-west-2.compute.amazonaws.com:1433;database=sales;user=my_username;password=my_password",
driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
dbtable="orders")
.load())
## this is how to write to an ORC file
df.write.format("orc").save("/tmp/orc_query_output")
## this is how to write to a hive table
df.write.mode('overwrite').format('orc').saveAsTable("test_table")
08-26-2016
10:58 PM
1 Kudo
I was able to successfully test the CSV reader. Please refer to an article I wrote on this: https://community.hortonworks.com/articles/52866/hive-on-tez-vs-pyspark-for-weblogs-parsing.html
08-25-2016
02:08 PM
7 Kudos
SYNOPSIS
Which is faster when analyzing data using Spark 1.6.1: HDP with HDFS for storage, or EMR?
Testing shows that HDP using HDFS has performance gains over EMR. HDP/HDFS outperforms EMR by 46% when tested against one full day (37 GB) of clickstream (web) data:
| | HDP/HDFS | EMR |
|---|---|---|
| Time elapsed | 3 min, 29 sec | 5 min, 5 sec |
* See below at end of article for validation and screen prints showing Resource Manager logs
HDP
Hortonworks Data Platform (HDP) is the industry's only truly secure, enterprise-ready open source Apache Hadoop distribution. The Hadoop Distributed File System (HDFS) is a Java-based distributed block storage file system that is used to store all data in HDP.
EMR
Amazon Elastic MapReduce (Amazon EMR) is a managed Hadoop framework to distribute and process vast amounts of data across dynamically scalable Amazon EC2 instances.
Amazon S3, where the EMR test data was stored, is an inexpensive object store that can theoretically scale out infinitely without the limitations inherent to a hierarchical block storage file system.
Objects are not stored in file systems; instead, users create objects and associate keys with them.
Object storage also has the option of tagging metadata with your data.
TEST
The test uses Spark (PySpark) DataFrames to count page views by operating system (desktop and mobile OS types) against a full day (24 hours) of clickstream data and to list the top 20 most used operating systems. The same Spark code was run against an AWS HDP cluster on EC2 instances with data stored in HDFS, and against an AWS EMR cluster.
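The Spark code itself is not reproduced in the article. The aggregation it describes is a group-by-count followed by a top 20; the plain-Python sketch below shows that logic on a small in-memory list, with the `os` field name as an assumption. In PySpark DataFrames the same shape would be roughly `df.groupBy("os").count().orderBy("count", ascending=False).limit(20)`, again assuming an `os` column.

```python
from collections import Counter
from typing import Iterable, List, Mapping, Tuple


def top_operating_systems(page_views: Iterable[Mapping[str, str]], n: int = 20,
                          os_field: str = "os") -> List[Tuple[str, int]]:
    """Count page views per operating system and return the n most common."""
    counts = Counter(view.get(os_field, "unknown") for view in page_views)
    return counts.most_common(n)


if __name__ == "__main__":
    sample = [{"os": "Android"}] * 3 + [{"os": "iOS"}] * 2 + [{"os": "Windows"}]
    print(top_operating_systems(sample, n=2))  # [('Android', 3), ('iOS', 2)]
```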
Test Data
COMPANY X is a global online marketplace connecting consumers with merchants. The test data is one full day of page views (24 hours of clickstream logs): 22.3 million page view records from 13 countries in North America, Latin America, and Asia. The data is uncompressed JSON in 143 files averaging 256 MB each, for a total of 37 GB. All 143 source JSON files were placed into HDFS on HDP and into S3 on EMR.
Platform Versions
HDP 2.3.0 - Hadoop version 2.7.1
EMR 4.5.0 - Hadoop version 2.7.2
AWS HDP and EMR Clusters were sized/configured similarly
m4.2xlarge instances: 1 master and 4 worker nodes
TEST RESULTS
Spark 1.6.1 on HDP/HDFS outperformed Spark 1.6.1 on EMR by 46% (the EMR run took about 46% longer).
Total elapsed time for HDP/HDFS: 3 minutes 29 seconds
Total elapsed time for EMR: 5 minutes 5 seconds
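The 46% figure follows from the elapsed times when it is read as "EMR took 46% longer than HDP/HDFS"; expressed the other way round, HDP/HDFS needed about 31% less time than EMR. A quick check of both ratios:

```python
def to_seconds(minutes: int, seconds: int) -> int:
    """Convert an elapsed time of minutes and seconds to seconds."""
    return minutes * 60 + seconds


hdp_seconds = to_seconds(3, 29)  # HDP/HDFS
emr_seconds = to_seconds(5, 5)   # EMR

extra_time_on_emr = emr_seconds / hdp_seconds - 1  # EMR relative to HDP
time_saved_on_hdp = 1 - hdp_seconds / emr_seconds  # HDP relative to EMR

print(f"EMR took {extra_time_on_emr:.1%} longer; "
      f"HDP/HDFS needed {time_saved_on_hdp:.1%} less time")
```

This prints "EMR took 45.9% longer; HDP/HDFS needed 31.5% less time". Stating which baseline a percentage uses avoids the two readings being confused.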
TESTING VALIDATION
Sample JSON record
Total disk usage in HDFS consumed by all files is 37 GB
Source data consists of 143 JSON files. Each file averages 256 MB for a total data volume of 37 GB
Output produced. Operating system and total page view count:
HDP Resource Manager log
EMR Resource Manager log
08-22-2016
08:27 PM
7 Kudos
Synopsis
Both Pig and Spark (PySpark) excel at iterative data processing against weblogs data in text delimited format. Is one faster than the other?
Pig on Tez outperforms PySpark.
PySpark using the Databricks CSV parser outperforms Pig on Tez.
Pig
Apache Pig allows users to write MapReduce transformations using a simple scripting language called Pig Latin. Pig translates the Pig Latin script into MapReduce jobs (or, when running on Tez, into a Tez DAG) so that it can be executed within YARN against a dataset stored in the Hadoop Distributed File System (HDFS).
Apache Tez is a distributed data processing framework for building batch and interactive applications. Tez improves on the MapReduce paradigm by dramatically improving its speed while maintaining MapReduce’s ability to scale to petabytes of data. Pig on Tez excels at iterative data processing. It is fast, easy, and the optimal solution to implement when it comes to parsing weblogs.
Spark
General purpose in-memory distributed data processing engine
API’s to write code in Java, Scala, or Python
Libraries for SQL, Streaming, Machine Learning, and Graph Processing
Excellent for iterative data processing and a good choice for parsing weblogs
Test
I am working with a customer who parses tab-delimited text weblog data consisting of 11 fields. Sample record: 2015-02-12 15:00:26 198.102.62.250 GET /origin-services.failover.arcgisonline.com.akadns.net/ArcGIS/rest/services/World_Street_Map/MapServer/tile/6.386012949206202/50/25 200 2649 0 "http://www.DOMAIN.com" "Mozilla/5.0 (Linux; U; Android 4.1.2; es-us; GT-N8010 Build/JZO54K) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Safari/534.30" -
Their Pig script uses regular expressions to parse out the individual fields
I wrote a PySpark script to parse out the weblog data
I wrote another PySpark script using the Databricks CSV parsing library to parse out the weblog data
Test Results
| Data Volume | Pig on Tez | PySpark | PySpark using Databricks CSV parsing library |
|---|---|---|---|
| 75 million records across 15 files in HDFS totaling 21.5 GB | 1 min, 51 sec | 2 min, 13 sec | 1 min, 21 sec |
| 150 million records across 30 files in HDFS totaling 43.0 GB | 2 min, 53 sec | 3 min, 43 sec | 2 min, 2 sec |
Conclusion
Pig on Tez wins over PySpark using native, out-of-the-box functionality. PySpark using the Databricks CSV parsing library beats Pig on Tez; however, this is not native functionality and requires the overhead of maintaining an external JAR file developed by a third party.
Code
The Pig, PySpark, and PySpark using Databricks CSV parsing library scripts are all below.
Pig
TEXT_LOGS = load '/data/weblogs/*.dat' using TextLoader AS (line:chararray);
set job.name 'parse_weblogs_pig'
VALID_LOGS = FILTER TEXT_LOGS by line matches '^\\s*\\w.*';
LOGS = FOREACH VALID_LOGS GENERATE
FLATTEN (
REGEX_EXTRACT_ALL(line, '^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+"([^"]*)"\\s+([-]|"[^"]*")\\s+"?([^"]*)"?')
)
AS (
event_date: chararray,
event_time: chararray,
cs_ip: chararray,
cs_method: chararray,
cs_uri: chararray,
sc_status: chararray,
sc_bytes: chararray,
time_taken: chararray,
cs_referer: chararray,
cs_useragent: chararray,
cs_cookie: chararray
);
TLOGS = FOREACH LOGS GENERATE
event_date,
event_time,
cs_ip,
cs_method,
cs_uri,
(int)sc_status,
(int)sc_bytes,
time_taken,
cs_referer,
cs_useragent,
cs_cookie
;
STORE TLOGS into 'weblogs_data_parsed_pig' USING org.apache.hive.hcatalog.pig.HCatStorer();
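One way to sanity-check the REGEX_EXTRACT_ALL pattern outside Pig is to run the same expression in Python against the sample record shown earlier. The Pig string's doubled backslashes become single backslashes in a Python raw string, and `re.fullmatch` is used on the assumption that REGEX_EXTRACT_ALL matches the whole line by default. The check exposes a quirk of the pattern: the referer is captured without its quotes, but the user agent keeps them.

```python
import re

# Same pattern as the Pig script, written as a Python raw string.
WEBLOG_RE = re.compile(
    r'^(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+'
    r'"([^"]*)"\s+([-]|"[^"]*")\s+"?([^"]*)"?'
)

FIELDS = ["event_date", "event_time", "cs_ip", "cs_method", "cs_uri",
          "sc_status", "sc_bytes", "time_taken", "cs_referer",
          "cs_useragent", "cs_cookie"]

SAMPLE = (
    '2015-02-12 15:00:26 198.102.62.250 GET '
    '/origin-services.failover.arcgisonline.com.akadns.net/ArcGIS/rest/services/'
    'World_Street_Map/MapServer/tile/6.386012949206202/50/25 200 2649 0 '
    '"http://www.DOMAIN.com" '
    '"Mozilla/5.0 (Linux; U; Android 4.1.2; es-us; GT-N8010 Build/JZO54K) '
    'AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Safari/534.30" -'
)


def parse_line(line):
    """Return the 11 fields as a dict, or None if the whole line does not match."""
    m = WEBLOG_RE.fullmatch(line)
    if m is None:
        return None
    return dict(zip(FIELDS, m.groups()))


if __name__ == "__main__":
    print(parse_line(SAMPLE))
```

Because `\s+` matches tabs as well as spaces, the same check applies to the tab-delimited files.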
PySpark
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import *
## compress shuffle and I/O data with snappy; the Parquet output itself is written uncompressed
conf = (SparkConf()
.setAppName("parse_weblogs_spark")
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.sql.parquet.compression.codec", "uncompressed")
.set("spark.shuffle.compress", "true")
.set("spark.io.compression.codec", "snappy")
.set("spark.executor.instances", "60")
.set("spark.executor.cores", 10)
.set("spark.executor.memory", "20g"))
sc = SparkContext(conf = conf)
sqlContext = HiveContext(sc)
## read text file
## and parse out fields needed
## file is tab delimited
path = "/data/weblogs/*.dat"
lines = sc.textFile(path)
parts = lines.map(lambda l: l.split("\t"))
weblog_hits = parts.map(lambda o: Row(event_date=o[0], event_time=o[1], cs_ip=o[2], cs_method=o[3], cs_uri=o[4],sc_status=o[5],sc_bytes=o[6],time_taken=o[7],cs_referer=o[8],cs_useragent=o[9],cs_cookie=o[10]))
schemaHits = sqlContext.createDataFrame(weblog_hits)
schemaHits.write.format("parquet").save("/tmp/parquet_query_output")
PySpark with Databricks CSV parsing library
You will need the JAR file containing the CSV parsing library: https://github.com/databricks/spark-csv
When you submit the Spark job, pass in the package:
$SPARK_HOME/bin/spark-submit --master yarn-client --packages com.databricks:spark-csv_2.11:1.4.0 my_python_script.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
from pyspark.sql import *
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import *
conf = (SparkConf()
.setAppName("parse_weblogs_spark")
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.sql.parquet.compression.codec", "uncompressed")
.set("spark.shuffle.compress", "true")
.set("spark.io.compression.codec", "snappy")
.set("spark.executor.instances", "60")
.set("spark.executor.cores", 10)
.set("spark.executor.memory", "20g"))
sc = SparkContext(conf = conf)
sqlContext = HiveContext(sc)
customSchema = StructType([ \
StructField("event_date", StringType(), True), \
StructField("event_time", StringType(), True), \
StructField("cs_ip", StringType(), True), \
StructField("cs_method", StringType(), True), \
StructField("cs_uri", StringType(), True), \
StructField("sc_status", IntegerType(), True), \
StructField("sc_bytes", IntegerType(), True), \
StructField("time_taken", StringType(), True), \
StructField("cs_referer", StringType(), True), \
StructField("cs_useragent", StringType(), True), \
StructField("cs_cookie", StringType(), True)])
df = sqlContext.read \
.format('com.databricks.spark.csv') \
.options(header='false', delimiter='\t') \
.load('/data/weblogs/*.dat', schema = customSchema)
df.write.saveAsTable("weblogs_parsed_spark")
08-08-2016
09:15 PM
Use Kryo when working with RDDs. It probably won't help with DataFrames; I have never used Kryo with DataFrames. Maybe you can test it and post your results.