Member since
02-26-2016
100
Posts
111
Kudos Received
12
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 2084 | 05-04-2017 12:38 PM |
 | 4050 | 03-21-2017 06:18 PM |
 | 13597 | 03-21-2017 01:14 AM |
 | 4412 | 02-14-2017 06:21 PM |
 | 7130 | 02-09-2017 03:49 AM |
07-27-2023
07:42 AM
Is this solution a better fit for streaming than PutHive3QL for about 10 GB per day?
... View more
03-10-2022
11:22 PM
I have a Hive table into which 16 billion rows are inserted daily as Parquet with Snappy compression. Can you help with optimizing the data ingestion part?
... View more
06-12-2020
08:41 AM
Hello, does Cobrix support Python? I see only Scala APIs at https://github.com/AbsaOSS/cobrix. Please advise. Thanks, Sreedhar Y
... View more
09-27-2017
10:12 AM
The Hadoop Connector Guide provides a brief introduction to cloud connectors and their features, detailed instructions for setting up the connector and running Data Synchronization tasks, and an overview of the supported features and task operations that can be performed with the Hadoop Connector. Docs for the Hadoop connector for Informatica: https://kb.informatica.com/proddocs/Product%20Documentation/6/IC_Spring2017_HadoopConnectorGuide_en.pdf
... View more
05-26-2017
02:07 PM
@Saransh Sharma In addition to @Binu Mathew's answer: if you don't want to take separate backups and only a limited number of tables are involved, you can also use the Hive Import/Export option. https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ImportExport
... View more
03-23-2017
10:27 AM
Hi @Binu Mathew, thanks for your answer. I'll dive into this approach and post further if/when required. Thanks! Christophe
... View more
03-17-2017
03:44 PM
3 Kudos
This tutorial will demonstrate how to execute PySpark jobs on an HDP cluster and pass in parameter values using the Livy REST interface. I will also demonstrate how to interact with Livy via Apache Zeppelin and use forms in Zeppelin to pass in parameter values. Livy is an open source REST interface for interacting with Spark.
Livy requires at least Spark 1.6. With HDP 2.5.3, Livy is built against Apache Spark 1.6.2 by default. If you are on HDP 2.4.2, Livy 0.2.0 might be part of the HDP 2.4.2 repos; if not, it can be cloned from https://github.com/cloudera/livy
The following was tested with HDP 2.5.3 and Spark 1.6.2.
Business Use Case:
A PySpark script that resides on the Livy server aggregates order statistics from a Hive table for a date range. The start and end dates are passed in as parameter values to the Python script.
The job runs on the cluster via YARN and produces the following output:
Start Date | End Date | City | Min Order Amt | Max Order Amt | Avg Order Amt |
---|---|---|---|---|---|
2017-02-06 | 2017-02-15 | Chicago | 54.23 | 788.23 | 354.23 |
2017-02-06 | 2017-02-15 | San Francisco | 98.23 | 1032.12 | 634.12 |
Users can execute the PySpark script via Livy using curl from a terminal window, Apache Zeppelin, or other applications.
Execute PySpark script on HDP cluster via Livy using curl
```python
import sys

from pyspark import SparkContext
from pyspark.sql import HiveContext

# Create a SparkContext running on YARN and a HiveContext for SQL access
sc = SparkContext('yarn-client')
spark = HiveContext(sc)

# The start and end dates are passed in as command-line arguments
start_date = sys.argv[1]
end_date = sys.argv[2]

# Aggregate order statistics per city for the given date range
sql = """
select '%s' as start_date, '%s' as end_date, city, min(order_amt) as min_order_amount,
       max(order_amt) as max_order_amount, avg(order_amt) as average_order_amount
from orders
where to_date(order_date) between '%s' and '%s'
group by city order by city
""" % (start_date, end_date, start_date, end_date)
spark.sql(sql).show()
```
How to call the PySpark script:
The PySpark script is a regular file on the Livy server. Notice that I'm specifying the location of the file with the 'file:' prefix.
Livy allows impersonation, creating a shell that runs as a different user; this is the 'proxyUser' setting. Notice that I'm running the script as 'test_livy_impersonation'.
All the properties supported by Spark, such as the number of executors, number of cores, executor memory, etc., can be changed at session creation, and their format is the same as when executing via spark-submit. Notice that I'm setting the number of executor cores to 4.
I'm passing parameters to the script by setting values for 'args'.
Command to run script:
curl -X POST -H "Content-Type: application/json" ec2-xxxxxx.compute.amazonaws.com:8998/batches --data '{"file": "file:/tmp/test.py", "name": "Livy PySpark Example", "executorCores":4, "proxyUser": "test_livy_impersonation", "args":["2017-01-15","2017-01-19"]}' | python -m json.tool
*Note: The default Livy port is 8998.
After executing the above command, you will get output that includes the session ID number.
Get output:
Depending on the complexity of processing and availability of resources on the cluster, the job completion could take some time.
Take the session ID from the output above (in this example, 35) and run the following command to get the results from the log file:
curl ec2-xxxxxx.us-east-2.compute.amazonaws.com:8998/batches/35/log | python -m json.tool
*Note: Replace 35 above with your session ID
Results from the log file:
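The same submit-and-poll flow can also be scripted. Below is a minimal sketch using the Python requests library; the Livy host, script path, proxy user, and arguments are the placeholders from the curl example above, and the polling interval is arbitrary:
```python
import time
import requests

livy_url = "http://ec2-xxxxxx.compute.amazonaws.com:8998"  # placeholder host

payload = {
    "file": "file:/tmp/test.py",
    "name": "Livy PySpark Example",
    "executorCores": 4,
    "proxyUser": "test_livy_impersonation",
    "args": ["2017-01-15", "2017-01-19"],
}

# Submit the batch job; the JSON response contains the batch (session) id
resp = requests.post(livy_url + "/batches", json=payload,
                     headers={"Content-Type": "application/json"})
batch_id = resp.json()["id"]

# Poll the batch state until the job reaches a terminal state
while True:
    state = requests.get(livy_url + "/batches/%d/state" % batch_id).json()["state"]
    if state in ("success", "dead", "killed"):
        break
    time.sleep(10)

# Fetch the driver log lines, which include the query results printed by show()
log = requests.get(livy_url + "/batches/%d/log" % batch_id).json()
print("\n".join(log.get("log", [])))
```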
Using Zeppelin with Livy and passing in parameter values via Zeppelin forms
At a minimum, configure Zeppelin’s Livy server interpreter and set ‘zeppelin.livy.url’ to your Livy server:
Also, set livy.spark.master to YARN if you want jobs to run on YARN:
If your cluster is Kerberized, then you will need to set values for ‘zeppelin.livy.keytab’ and ‘zeppelin.livy.principal’:
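For reference, the interpreter settings described above might look like the following; the host, master value, keytab path, and principal are placeholders for your environment:
```
zeppelin.livy.url         http://<livy-host>:8998
livy.spark.master         yarn-cluster
# Kerberized clusters only:
zeppelin.livy.keytab      /etc/security/keytabs/zeppelin.server.kerberos.keytab
zeppelin.livy.principal   zeppelin@EXAMPLE.COM
```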
Sample output from my Hive table using the Spark SQL interpreter (%sql):
You can access the data in this Hive table using PySpark via the Livy interpreter (%livy.pyspark) with input forms that pass parameter values to your PySpark script:
In your PySpark code, enclose parameters with '${parameter_name}'. In the example below, I am passing in the start date and end date as ${start_date} and ${end_date}.
Input boxes will appear so that you can enter values.
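For example, a %livy.pyspark paragraph following this convention might look like the sketch below; it assumes the orders Hive table from the script above, and that the Livy interpreter pre-creates sqlContext:
```python
%livy.pyspark
# Zeppelin replaces ${start_date} and ${end_date} with the values entered
# in the input forms before the paragraph is executed.
sql = """
select '${start_date}' as start_date, '${end_date}' as end_date, city,
       min(order_amt) as min_order_amount, max(order_amt) as max_order_amount,
       avg(order_amt) as average_order_amount
from orders
where to_date(order_date) between '${start_date}' and '${end_date}'
group by city order by city
"""
sqlContext.sql(sql).show()
```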
This also works with the Spark SQL (%sql) interpreter:
... View more
02-09-2017
03:30 AM
2 Kudos
BACKGROUND
The modern Data Warehouse contains a heterogeneous mix of data: delimited text files, data in Hadoop (HDFS/Hive), relational databases, NoSQL databases, Parquet, Avro, JSON, geospatial data, and more. ETL (Extract-Transform-Load) is a process used to integrate these disparate data types and create a unified view of the data. Apache Spark is a fast, general purpose, distributed computation engine for fault-tolerant parallel data processing. Spark is an excellent choice for ETL:
- Works with a myriad of data sources: files, RDBMSs, NoSQL, Parquet, Avro, JSON, XML, and many more.
- Write your ETL code using Java, Scala, or Python.
- In-memory computing for fast data processing.
- APIs to easily create schemas for your data and perform SQL computations.
- Distributed computing and fault tolerance are built into the framework and abstracted from the end user.
This article will demonstrate how easy it is to use Spark with the Python API (PySpark) for ETL processing to join data from text files with data in a MySQL table.
SOURCE DATA
Pipe (|) delimited text files stored in HDFS containing information about customer orders:
Customer table in MySQL containing information about customers:
We need to join the data from the orders files in HDFS with the customer data in MySQL so that we produce a new file in HDFS mapping customer names to orders. The new file will be stored as an ORC formatted file. ORC is a columnar file format that provides high data compression and fast access for data analysis. Conceptually, each output record pairs a customer's name with an order, but this is only a way to visualize the joined data set; ORC uses columnar storage, not row storage.
CODE
The code below was tested using Spark 2.1. Beginning in Spark 2.0 the SQLContext and HiveContext have been replaced with SparkSession; however, both SQLContext and HiveContext are preserved for backwards compatibility. Notice in my code below that I'm using SparkSession.
To run the code below you will need the CSV parsing library from DataBricks: https://github.com/databricks/spark-csv
If you're working with large data volumes, then I recommend using the CSV parsing library from Databricks, as it will be much faster than using a typical lambda function to parse fields out of a text file. For a performance evaluation, please refer to: https://community.hortonworks.com/content/kbentry/52866/hive-on-tez-vs-pyspark-for-weblogs-parsing.html
Run the code using:
spark-submit --master yarn --deploy-mode cluster --packages com.databricks:spark-csv_2.11:1.4.0,mysql:mysql-connector-java:5.1.38 my_script.py
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

## create a SparkSession
spark = SparkSession \
    .builder \
    .appName("join_file_with_mysql") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.cores", "5") \
    .getOrCreate()

## create a schema for the orders file
customSchema = StructType([
    StructField("order_number", StringType(), True),
    StructField("customer_number", StringType(), True),
    StructField("order_quantity", StringType(), True),
    StructField("unit_price", StringType(), True),
    StructField("total", StringType(), True)])

## create a DataFrame for the orders files stored in HDFS
df_orders = spark.read \
    .format('com.databricks.spark.csv') \
    .options(header='true', delimiter='|') \
    .load('hdfs:///orders_file/', schema=customSchema)

## register the orders data as a temporary table
df_orders.registerTempTable("orders")

## create a DataFrame for the MySQL customer table
df_customers = spark.read.format("jdbc").options(
    url="jdbc:mysql://bmathew-test.czrx1al336uo.ap-northeast-1.rds.amazonaws.com:3306/test",
    driver="com.mysql.jdbc.Driver",
    dbtable="customer",
    user="root",
    password="password"
).load()

## register the customers data as a temporary table
df_customers.registerTempTable("customers")

## join the data sets
sql = '''
SELECT a.first_name, a.last_name, b.order_number, b.total
FROM customers a, orders b
WHERE a.customer_number = b.customer_number
'''
output = spark.sql(sql)

## save the data into an ORC file
output.write.format("orc").save("/tmp/customer_orders")
```
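To verify the result, the ORC output can be read back into a DataFrame; this quick check is not part of the original job and reuses the SparkSession created above:
```python
## read the ORC output back and inspect a few joined rows
orc_df = spark.read.orc("/tmp/customer_orders")
orc_df.show(10)
```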
... View more
03-20-2017
07:14 PM
Hi Reddy, choose a delimiter that is unlikely to appear in the data. Using the Unicode character '\u0001' (Ctrl-A) as the delimiter should solve your issue, since it rarely appears in real data (row format delimited fields terminated by '\u0001'). In your case, export the data with '\u0001' as the delimiter and then insert it into a Hive table whose delimiter is '|'.
... View more