Member since
02-26-2016
100
Posts
111
Kudos Received
12
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2084 | 05-04-2017 12:38 PM | |
4050 | 03-21-2017 06:18 PM | |
13597 | 03-21-2017 01:14 AM | |
4412 | 02-14-2017 06:21 PM | |
7129 | 02-09-2017 03:49 AM |
05-04-2017
12:38 PM
Backup: 1. The Hive Metastore - This is in an RDBMS. MySQL by default. The RDBMS will have tools for backup. - Metastore contains metadata, table partition info, DDL, info about tables 2. The data - This is in HDFS - You can use DistCp to copy the data to another cluster
... View more
03-22-2017
01:30 AM
If your using HDFS just to land tables (rows and columns) extracted from an RDBMS via Sqoop, then just store it as raw text if your looking for speed. Compress it if your concerned about space in HDFS. Use Avro if you want a schema for the data. I would use Parquet for the final Hive table if the query access patterns are to select only a few columns and do aggregations. If the query access patterns are to select all the columns, then a columnar format such as Parquet would not be needed. What type of analysis would you do on the files using Spark? Spark has a lot of optimizations for Parquet. Not only can Spark quickly parse and process data in Parquet files, Spark can also push filtering down to the disk layer via Predicate Pushdown Optimization. Spark can also process text files very quickly via the CSV parser from Databricks.
... View more
03-21-2017
06:47 PM
I tested the above code with Spark 1.6.2. I have not tested with Spark 2.0. I see your using Spark 2.0. Spark 2.0 uses the SparkSession which replaces both HiveContext and SQLContext. You will have to use SparkSession. See below. After creating the SparkSession, like how I do below, I don't know off hand if the following will work: 'spark.load(
source="jdbc",
url="jdbc:sqlserver://ec2-54-244-44-6.us-west-2.compute.amazonaws.com:1433;database=sales;user=my_username;password=my_password",
dbtable="orders")' from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("my_application_name") \ .config("spark.shuffle.service.enabled","true") \ .config("spark.dynamicAllocation.enabled","true") \ .getOrCreate()
... View more
03-21-2017
06:18 PM
1 Kudo
If your schema is changing from hour to hour, then maybe try this: 1. Use Spark with the CSV reader from DataBricks to process the data. The CSV reader can automatically infer the schema. 2. Write the DataFrame to HBase. With HBase you don't need a schema defined and each row can have varying number of columns. When you are ready to analyze the data in HBase you can use Apache Phoenix to create a schema atop the HBase table. 3. You could even check the number of columns in the DataFrame and then route to a Hive table based on the number of columns. For example, if I count 7 fields, then route to table A, if 10 fields, then table B. Hive has a fixed number of columns, whereas HBase does not.
... View more
03-21-2017
04:30 AM
Performance will be optimized when you colocate storage and compute - i.e. store in HDFS on a cluster using direct attached storage and access via Spark or Hive. In this type of architecture Spark or Hive on HDFS will outperform Spark or Hive on S3. The data is not traveling far when reading from disk and loading into memory. On the other hand, there are economies of scale with S3 (object store) that outweigh the performance benefits of a block storage file system like HDFS. Doing analysis directly on the data in HDFS will give better performance. In fact, I have even tested this. Please see here: https://community.hortonworks.com/content/kbentry/53271/performance-of-spark-on-hdphdfs-vs-spark-on-emrs3.html You could use Sqoop or Spark to extract from a relational database and store in S3. Both will work. Sqoop by default will launch 4 parallel tasks (mappers) against the source table. With Spark you could do something like local mode and specify the number of threads to use, local[n], or use as many threads as the number of processors available to the JVM, local[*]. Apache NiFi is something that could also work. There are processors to extract from a relational database either single threaded or multi-threaded and write to S3. With NiFi you create a DataFlow via a GUI for rapid application development.
... View more
03-21-2017
01:14 AM
If your data is already in S3, then I would suggest you use Spark to analyze it. You can use the S3A filesystem client to process the data from S3 and use SparkSQL for SQL analysis on the data, similar to how you would with Hive. Using S3 over HDFS is for very specific use cases: 1. You want very cheap storage 2. You don't have a long running Hadoop cluster with HDFS 3. You want to access the data anytime and from anywhere The S3A filesystem client provides very fast access to data in S3 buckets. The Hortonworks Data Cloud in the AWS marketplace has an enhanced S3A filesystem client for even faster performance. There are different types of clusters you can spin up in the Hortonworks Data Cloud based on your workload: ETL, Data Warehouse, or Data Science. These clusters are pre-configured and tuned to run Apache Spark and Hive so you don't have to setup. Just click on which cluster type you want, your EC2 instance types, and the cluster is launched for you. You use S3 storage with Hortonworks Data Cloud to separate storage from compute since clusters launched in the Hortonworks Data Cloud are meant to be on-demand/ephemeral clusters and not long running.
... View more
03-17-2017
03:44 PM
3 Kudos
This tutorial will demonstrate how to execute PySpark jobs on an HDP cluster and pass in parameter values using the Livy REST interface. I will also demonstrate how to interact with Livy via Apache Zeppelin and use forms in Zeppelin to pass in parameter values. Livy is an open source REST interface for interacting with Spark.
Livy requires at least Spark 1.6.
By default, Livy is built against Apache Spark 1.6.2 with HDP 2.5.3
If on HDP 2.4.2, then Livy 0.2.0 might be part of HDP 2.4.2 repos. If not, then it can be cloned from: https://github.com/cloudera/livy
Livy is built against Apache Spark 1.6.2 with HDP 2.5.3.
The following was tested with HDP 2.5.3 and Spark 1.6.2.
Business Use Case:
A PySpark script that resides on the Livy server aggregates order statistics from a Hive table for a date range passed in as parameters to the Python script.
The Start and End dates are passed in as parameter values to the Python script.
The job runs on the cluster via YARN and produces the following output:
Start Date
End Date
City
Min Order Amt
Max Order Amt
Avg Order Amt
2017-02-06
2017-02-15
Chicago
54.23
788.23
354.23
2017-02-06
2017-02-15
San Francisco
98.23
1032.12
634.12
Users can execute the PySpark script via Livy using curl from a terminal window, Apache Zeppelin, or other applications.
Execute PySpark script on HDP cluster via Livy using curl
from pyspark import SparkContext, HiveContext
import sys
sc = SparkContext('yarn-client')
spark = HiveContext(sc)
start_date=sys.argv[1]
end_date=sys.argv[2]
sql = """
select '%s' start_date, '%s' as end_date, city, min(order_amt) as min_order_amount, max(order_amt) as max_order_amount, avg(order_amt) as average_order_amount from orders where
to_date(order_date) between '%s' and '%s' group by city order by city
""" %(start_date,end_date,start_date,end_date)
spark.sql(sql).show()
How to call PySpark Script:
The PySpark script is a regular file on the Livy server. Notice I’m setting the location of the file with ‘file:’
Livy allows for impersonation to create a shell running as a different user. This is the setting ‘proxyUser’. Notice that I’m running the script as ‘test_livy_impersonation’
All the properties supported by Spark such as the number of executors, number of cores, executor memory, etc., can be changed at session creation. Their format is the same as when executing via spark-submit. Notice that I’m setting the number of executor cores to 4.
I’m passing parameters to the script by setting values for ‘args’
Command to run script:
curl -X POST -H "Content-Type: application/json"ec2-xxxxxx.compute.amazonaws.com:8998/batches --data '{"file": "file:/tmp/test.py", "name": "Livy PySpark Example", "executorCores":4, "proxyUser": "test_livy_impersonation","args":["2017-01-15","2017-01-19"]}' | python -m json.tool
*Note: The default Livy port will be 8998
After executing the above command, you will get the following output which gives you the session ID number:
Get output:
Depending on the complexity of processing and availability of resources on the cluster, the job completion could take some time.
Take the session ID from the output above (33) and run the following command to get the results from the log file:
curl ec2-xxxxxx.us-east-2.compute.amazonaws.com:8998/batches/35/log | python -m json.tool
*Note: Replace 35 above with your session ID
Results from the log file:
Zeppelin with Livy and pass in parameter values via Zeppelin forms
At a minimum, configure Zeppelin’s Livy server interpreter and set ‘zeppelin.livy.url’ to your Livy server:
Also, set livy.spark.master to YARN if you want jobs to run on YARN:
If your cluster is Kerberized, then you will need to set values for ‘zeppelin.livy.keytab’ and ‘zeppelin.livy.principal’:
Sample output from my Hive table using Spark Sql interpreter (%sql):
You can access the data in this Hive table using PySpark via the Livy interpreter (%livy.pyspark) with input forms that pass parameter values to your PySpark script:
In your PySpark code enclose parameters with ‘${parameter_name}’. In the example below I am passing in start date and end date as {$start_date} and ${end_date}.
Input boxes will appear to enter values
This also works with the Spark SQL (%sql) interpreter:
... View more
Labels:
03-17-2017
02:54 PM
Have you used the dfsadmin command before? This will tell you dfs remaining and used: hdfs dfsadmin -report
... View more
02-14-2017
06:21 PM
2 Kudos
@Bala Vignesh N V A 'SELECT *' behaves differently on Text vs ORC for certain conditions: 1. On both ORC and Text, no MR or Tez job will be launched if you SELECT all columns: "SELECT * FROM my_table". MR or Tez will just read from the file without launching any mappers. 2. On Text, a MR or Tez job will be launched if you add a WHERE condition: "SELECT * FROM my_table WHERE 'some_condition'". Mappers will be launched for Text formatted data. 3. On ORC, no MR or Tez job will be launched if you add a WHERE condition: "SELECT * FROM my_table WHERE 'some_condition'". No Mappers will be launched for ORC formatted data. Thus, ORC performs better than Text in those cases in which you submit a WHERE condition. 4. On both ORC and Text, no MR or Tez job will be launched if you SELECT a few columns: "SELECT column1, column2 FROM my_table". MR or Tez will just read from the file without launching any mappers. 5. For any aggregations on a few columns, MR or Tez job will be launched and ORC results in less I/O than Text because only those columns are being read
... View more
02-10-2017
02:41 AM
@Dinesh Das - The code in that article is done using PySpark and using Spark 2.1. It's working code. What version of Spark are you using? I see that your using Scala. If you are using Spark version 2 or above, did you create a SparkSession? If an earlier version of Spark, did you create a SQLContext?
... View more