Created on 11-16-2017 08:54 PM - edited 09-16-2022 05:32 AM
Hi, I have data in my Hive table and I want to read that data in Scala/Spark, do some transformations on a few columns, and save the processed data into one file. How can this be done, please?
Created on 11-17-2017 03:07 AM - edited 11-17-2017 03:21 AM
1. Ensure that the host from which you are running spark-shell or spark2-shell has the corresponding Spark gateway role assigned.
- Login to the CM WebUI
- Go to Spark2/Spark service
- click on the instances tab
- ensure that the Gateway role is present for the host. If not, use Add Roles to add it.
2. Ensure that you have selected Hive service in the Spark configuration.
- Login to CM WebUI
- Go to Spark2/Spark service
- click on the configuration tab
- in the search box type in hive
- enable the Hive service, then redeploy the client configuration and restart services with stale configuration.
3. Once done, open the Spark shell; the Hive context should already be available as the sqlContext variable.
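If you are writing a standalone application rather than using the shell, the same context can be created manually. A minimal sketch for Spark 1.6 (the application name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// In spark-shell these are already created for you as sc and sqlContext.
val conf = new SparkConf().setAppName("HiveReadExample")
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)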
The example below shows a very basic SQL query on a Hive table 'sample_07', which contains sample employee data with 4 columns. A transformation is applied using filter, and the resulting data is saved as a text file in HDFS.
$ spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc (master = yarn-client, app id = application_1510839440070_0006).
SQL context available as sqlContext.
scala> sqlContext.sql("show databases").show()
+-------+
| result|
+-------+
|default|
+-------+
scala> sqlContext.sql("show tables").show()
+----------+-----------+
| tableName|isTemporary|
+----------+-----------+
|hive_table| false|
| sample_07| false|
| sample_08| false|
| web_logs| false|
+----------+-----------+
scala> val df_07 = sqlContext.sql("SELECT * from sample_07")
scala> df_07.filter(df_07("salary") > 150000).show()
+-------+--------------------+---------+------+
| code| description|total_emp|salary|
+-------+--------------------+---------+------+
|11-1011| Chief executives| 299160|151370|
|29-1022|Oral and maxillof...| 5040|178440|
|29-1023| Orthodontists| 5350|185340|
|29-1024| Prosthodontists| 380|169360|
|29-1061| Anesthesiologists| 31030|192780|
|29-1062|Family and genera...| 113250|153640|
|29-1063| Internists, general| 46260|167270|
|29-1064|Obstetricians and...| 21340|183600|
|29-1067| Surgeons| 50260|191410|
|29-1069|Physicians and su...| 237400|155150|
+-------+--------------------+---------+------+
scala> df_07.filter(df_07("salary") > 150000).rdd.coalesce(1).saveAsTextFile("/tmp/c")
Note: This might not be the most elegant way to store the transformed DataFrame, but it works for testing. There are other ways to save the transformation, and since we are dealing with columns and DataFrames, you might want to consider saving it as CSV using the spark-csv library, or even better, in Parquet format.
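For example, here is a rough, untested sketch of the Parquet and spark-csv alternatives (the output paths are illustrative, and for CSV the shell would need to be started with --packages com.databricks:spark-csv_2.10:1.5.0 on Spark 1.6):

val result = df_07.filter(df_07("salary") > 150000)

// Parquet keeps the schema and compresses well, usually the better choice for further processing.
result.write.parquet("/tmp/sample_07_parquet")

// CSV via the Databricks spark-csv package.
result.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/tmp/sample_07_csv")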
Once saved, you can query the resultant file from HDFS and transfer it locally (if needed).
[root@nightly ~]# hdfs dfs -ls /tmp/c
Found 2 items
-rw-r--r-- 2 systest supergroup 0 2017-11-17 02:41 /tmp/c/_SUCCESS
-rw-r--r-- 2 systest supergroup 455 2017-11-17 02:41 /tmp/c/part-00000
[root@nightly511-unsecure-1 ~]# hdfs dfs -cat /tmp/c/part-00000
[11-1011,Chief executives,299160,151370]
[29-1022,Oral and maxillofacial surgeons,5040,178440]
[29-1023,Orthodontists,5350,185340]
[29-1024,Prosthodontists,380,169360]
[29-1061,Anesthesiologists,31030,192780]
[29-1062,Family and general practitioners,113250,153640]
[29-1063,Internists, general,46260,167270]
[29-1064,Obstetricians and gynecologists,21340,183600]
[29-1067,Surgeons,50260,191410]
[29-1069,Physicians and surgeons, all other,237400,155150]
[root@nightly ~]# hdfs dfs -get /tmp/c/part-00000 result.txt
[root@nightly ~]# ls
result.txt
Let us know if you have any other questions.
Good Luck!
Created 11-19-2017 08:32 PM
Thank you, but what I am looking for is:
1: Read the data from the underlying Hive table.
2: Store the result of that read into one DataFrame.
3: Now apply some basic checks on a few columns, e.g. append "KM" at the end of column X, and also write the sum of one column's data into the trailer record of the file.
4: And then create a file.
Basically, some function which reads the rows one by one from the Hive table, applies the checks, and then saves them to a file.
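Something along these lines is what I have in mind (a rough, untested sketch; the table name, column X and the numeric column are just placeholders):

import org.apache.spark.sql.functions.{concat, lit, sum}

// 1-2: read the Hive table into a DataFrame
val df = sqlContext.sql("SELECT * FROM my_hive_table")

// 3: append "KM" to column X and compute the sum of a numeric column
val transformed = df.withColumn("X", concat(df("X"), lit("KM")))
val total = transformed.agg(sum("some_numeric_col")).first().get(0)

// 4: write the detail records, then a trailer record holding the sum
transformed.rdd.map(_.mkString(",")).coalesce(1).saveAsTextFile("/tmp/out_detail")
sc.parallelize(Seq(s"TRAILER,$total"), 1).saveAsTextFile("/tmp/out_trailer")

The two output directories would then have to be merged (for example with hdfs dfs -getmerge) so the trailer ends up as the last line of a single file.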
Created 03-12-2018 06:02 AM
Is it possible to write into a location directly instead of an HDFS path?
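Assuming "a location" means a path on the local filesystem rather than HDFS, one untested option is to pass a file:// URI (the path below is illustrative). Note that on a YARN cluster each task writes to the local disk of whichever node it runs on, so this is mainly useful with coalesce(1) or in local mode:

// Untested sketch: write to a local path instead of HDFS via a file:// URI
df_07.filter(df_07("salary") > 150000)
  .rdd
  .coalesce(1)
  .saveAsTextFile("file:///tmp/local_result")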