Member since: 06-02-2020
Posts: 331
Kudos Received: 64
Solutions: 49
08-30-2021
11:24 PM
Hi @Seaport Yes, it is required if you want to run Python UDFs or do anything outside Spark SQL operations in your application. If you are only using the Spark SQL API, there is no runtime requirement for Python. If you are going to install Spark 3, please check the supported versions below:
Spark 2.4 supports Python 2.7 and 3.4-3.7.
Spark 3.0 supports Python 2.7 and 3.4 and higher, although support for Python 2 and 3.4-3.5 is deprecated.
Spark 3.1 supports Python 3.6 and higher.
CDS Powered by Apache Spark requires one of the following Python versions:
Python 2.7 or higher, when using Python 2.
Python 3.4 or higher, when using Python 3 (CDS 2.0 only supports Python 3.4 and 3.5; CDS 2.1 and higher include support for Python 3.6 and higher).
Python 3.4 or higher, when using Python 3 (CDS 3).
Note: Spark 2.4 is not compatible with Python 3.8; the recommended version there is Python 3.4+ (https://spark.apache.org/docs/2.4.0/#downloading). The Apache Jira SPARK-29536 related to Python 3.8 support is fixed in Spark 3.
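If you need PySpark to use a specific Python interpreter, you can point Spark at it when launching the application. A minimal sketch (the interpreter path and application file below are placeholders for your environment):
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3
spark-submit --conf spark.pyspark.python=/usr/bin/python3 your_app.py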
08-30-2021
07:52 PM
Hi @Sbofa Yes, you are right. Based on the kind value, it decides which kind of Spark shell (session) needs to be started.
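Assuming this refers to the Livy sessions API, the kind is set when the session is created; valid kinds include spark, pyspark, and sparkr. A minimal sketch (the host name is a placeholder; 8998 is Livy's default port):
curl -X POST -H "Content-Type: application/json" -d '{"kind": "pyspark"}' http://livy-host:8998/sessions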
08-25-2021
08:51 PM
1 Kudo
In this tutorial, we will learn how to create Apache Ozone volumes, buckets, and keys. After that, we will see how we can access Apache Ozone data in Apache Spark.
Ozone
Create the volume with the name vol1 in Apache Ozone.
# ozone sh volume create /vol1
21/08/25 06:23:27 INFO rpc.RpcClient: Creating Volume: vol1, with root as owner.
Create the bucket with the name bucket1 under vol1.
# ozone sh bucket create /vol1/bucket1
21/08/25 06:24:09 INFO rpc.RpcClient: Creating Bucket: vol1/bucket1, with Versioning false and Storage Type set to DISK and Encryption set to false
Create the employee.csv file to upload to Ozone.
# vi /tmp/employee.csv
id,name,age
1,Ranga,33
2,Nishanth,4
3,Raja,60
Upload the employee.csv file to Ozone.
# ozone sh key put /vol1/bucket1/employee.csv /tmp/employee.csv
Add the fs.o3fs.impl property to core-site.xml
Go to Cloudera Manager > HDFS > Configuration > search for core-site.xml > Cluster-wide Advanced Configuration Snippet (Safety Valve) for core-site.xml
<property>
<name>fs.o3fs.impl</name>
<value>org.apache.hadoop.fs.ozone.OzoneFileSystem</value>
</property>
Display the files created earlier using the 'hdfs' command.
Note: Before running the following command, update the om-host.example.com value.
hdfs dfs -ls o3fs://bucket1.vol1.om-host.example.com/
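Alternatively, you can list the keys directly with the Ozone shell (a quick check against the bucket created above):
# ozone sh key list /vol1/bucket1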
Spark
Launch spark-shell.
spark-shell
Run the following command to print the employee.csv file content.
Note: Update the omHost value.
scala> val omHost="om.host.example.com"
scala> val df=spark.read.option("header", "true").option("inferSchema", "true").csv(s"o3fs://bucket1.vol1.${omHost}/employee.csv")
scala> df.show()
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1| Ranga| 33|
| 2|Nishanth| 4|
| 3| Raja| 60|
+---+--------+---+
Kerberized environment
Prerequisites:
Create a user and grant the proper Ranger permissions to create Ozone volumes, buckets, etc.
kinit with the user
Steps:
Create the Ozone volumes, buckets, and keys as mentioned in the Ozone section.
Launch spark-shell
Replace the KEY_TAB, PRINCIPAL, and om.host.example.com in the spark-shell command below.
spark-shell \
--keytab ${KEY_TAB} \
--principal ${PRINCIPAL} \
--conf spark.yarn.access.hadoopFileSystems=o3fs://bucket1.vol1.om.host.example.com:9862
Note: In a Kerberized environment, we must specify the spark.yarn.access.hadoopFileSystems configuration; otherwise, it will display the following error:
java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
Run the following command to print the employee.csv file content.
Note: Update the omHost value.
scala> val omHost="om.host.example.com"
scala> val df=spark.read.option("header", "true").option("inferSchema", "true").csv(s"o3fs://bucket1.vol1.${omHost}/employee.csv")
scala> df.show()
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1| Ranga| 33|
| 2|Nishanth| 4|
| 3| Raja| 60|
+---+--------+---+
scala> val age30DF = df.filter(df("age") > 30)
scala> val outputPath = s"o3fs://bucket1.vol1.${omHost}/employee_age30.csv"
scala> age30DF.write.option("header", "true").mode("overwrite").csv(outputPath)
scala> val df2=spark.read.option("header", "true").option("inferSchema", "true").csv(outputPath)
scala> df2.show()
+---+-----+---+
| id| name|age|
+---+-----+---+
| 1|Ranga| 33|
| 3| Raja| 60|
+---+-----+---+
Note: If you get java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.ozone.OzoneFileSystem not found, add the /opt/cloudera/parcels/CDH/jars/hadoop-ozone-filesystem-hadoop3-*.jar to the Spark classpath using the --jars option.
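For example, the spark-shell launch would then look like this (jar path taken from the note above; adjust it to your parcel location):
spark-shell --jars /opt/cloudera/parcels/CDH/jars/hadoop-ozone-filesystem-hadoop3-*.jar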
Thanks for reading this article. If you liked it, please give kudos.
08-24-2021
02:22 AM
In this article, we will learn how to register Hive UDFs using the Spark HiveWarehouseSession.
Download and build the Spark Hive UDF example.
git clone https://github.com/rangareddy/spark-hive-udf
cd spark-hive-udf
mvn clean package -DskipTests
Copy the target/spark-hive-udf-1.0.0-SNAPSHOT.jar to the edge node.
Log in to the edge node and upload the spark-hive-udf-1.0.0-SNAPSHOT.jar to an HDFS location, for example, /tmp.
hdfs dfs -put ./spark-hive-udf-1.0.0-SNAPSHOT.jar /tmp
Launch the spark-shell with the HWC parameters.
spark-shell \
--jars /opt/cloudera/parcels/CDH/jars/hive-warehouse-connector-assembly-*.jar \
--conf spark.sql.hive.hiveserver2.jdbc.url='jdbc:hive2://hiveserver2_host1:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2' \
--conf spark.sql.hive.hwc.execution.mode=spark \
--conf spark.datasource.hive.warehouse.metastoreUri='thrift://metastore_host:9083' \
--conf spark.datasource.hive.warehouse.load.staging.dir='/tmp' \
--conf spark.datasource.hive.warehouse.user.name=hive \
--conf spark.datasource.hive.warehouse.password=hive \
--conf spark.datasource.hive.warehouse.smartExecution=false \
--conf spark.datasource.hive.warehouse.read.via.llap=false \
--conf spark.datasource.hive.warehouse.read.jdbc.mode=cluster \
--conf spark.datasource.hive.warehouse.read.mode=DIRECT_READER_V2 \
--conf spark.security.credentials.hiveserver2.enabled=false \
--conf spark.sql.extensions=com.hortonworks.spark.sql.rule.Extensions
Create the HiveWarehouseSession.
import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
val hive = HiveWarehouseSession.session(spark).build()
Execute the following statement to register a Hive UDF.
hive.executeUpdate("CREATE FUNCTION uppercase AS 'com.ranga.spark.hive.udf.UpperCaseUDF' USING JAR 'hdfs:///tmp/spark-hive-udf-1.0.0-SNAPSHOT.jar'")
Test the registered function, for example, uppercase.
scala> val data1 = hive.executeQuery("select id, uppercase(name), age, salary from employee")
scala> data1.show()
+---+-----------------------+---+---------+
| id|default.uppercase(name)|age| salary|
+---+-----------------------+---+---------+
| 1| RANGA| 32| 245000.3|
| 2| NISHANTH| 2| 345000.1|
| 3| RAJA| 32|245000.86|
| 4| MANI| 14| 45000.0|
+---+-----------------------+---+---------+
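For reference, a minimal sketch of what a Hive UDF like the UpperCaseUDF registered above can look like (the actual implementation lives in the linked spark-hive-udf repository; treat this as illustrative only):
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text

// Hive calls evaluate() once per row; return null for null input
class UpperCaseUDF extends UDF {
  def evaluate(input: Text): Text =
    if (input == null) null else new Text(input.toString.toUpperCase)
}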
Thanks for reading this article.
08-11-2021
08:26 PM
Hi @RonyA If you are a Cloudera customer, please create a support case and we will work on this issue.
08-08-2021
11:43 PM
1 Kudo
In this article, we will learn how to pass the atlas-application.properties configuration file from a different location in the spark-submit command.
When the Atlas service is enabled in CDP and we run a Spark application, by default the atlas-application.properties file is picked up from the /etc/spark/conf.cloudera.spark_on_yarn/ directory.
Let's test with the SparkPi example:
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client /opt/cloudera/parcels/CDH/jars/spark-examples*.jar 10
We can see the following output in the application log.
21/08/23 06:12:03 INFO atlas.ApplicationProperties: Looking for atlas-application.properties in classpath
21/08/23 06:12:03 INFO atlas.ApplicationProperties: Loading atlas-application.properties from file:/etc/spark/conf.cloudera.spark_on_yarn/atlas-application.properties
If we want to pass the atlas-application.properties configuration file from a different location, for example, the /tmp directory, copy atlas-application.properties from /etc/spark/conf.cloudera.spark_on_yarn to the /tmp directory and pass it using the -Datlas.conf=/tmp/ system property in spark-submit.
Let's test with the same SparkPi example by adding the --driver-java-options="-Datlas.conf=/tmp/" option to spark-submit.
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client --driver-java-options="-Datlas.conf=/tmp/" /opt/cloudera/parcels/CDH/jars/spark-examples*.jar 10
We can see the following output in the application log.
21/08/05 14:36:24 INFO atlas.ApplicationProperties: Looking for atlas-application.properties in classpath
21/08/05 14:36:24 INFO atlas.ApplicationProperties: Loading atlas-application.properties from file:/tmp/atlas-application.properties
In order to run the same SparkPi example in cluster mode, we need to place the atlas-application.properties file in the /tmp directory on all nodes and run the Spark application as follows:
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster \
--files /tmp/atlas-application.properties#atlas-application.properties --driver-java-options="-Datlas.conf=/tmp/" \
/opt/cloudera/parcels/CDH/jars/spark-examples*.jar 10
or,
sudo -u spark spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster \
--files /tmp/atlas-application.properties --conf spark.driver.extraJavaOptions="-Datlas.conf=./" \
/opt/cloudera/parcels/CDH/jars/spark-examples*.jar 10
We can see the following output:
21/08/23 06:12:07 INFO atlas.ApplicationProperties: Loading atlas-application.properties from file:/data1/tmp/usercache/spark/appcache/application_1629693759177_0016/container_e74_1629693759177_0016_01_000001/./atlas-application.properties
08-05-2021
09:17 PM
Hi @vnandigam Good news: Spark Atlas integration is now supported on CDP clusters.
References:
1. https://docs.cloudera.com/cdp-private-cloud-base/7.1.6/atlas-reference/topics/atlas-spark-metadata-collection.html
2. https://docs.cloudera.com/cdp-private-cloud-upgrade/latest/upgrade-hdp/topics/amb-enable-spark-cm.html
07-30-2021
08:57 AM
Hi @RonyA You haven't shared your dataset size. Apart from the data, you need to tune a few Spark parameters.
spark = (SparkSession
.builder.master("yarn")
.config("spark.executor.cores", "5") # you have mentioned 12
.config("spark.num.executors", "10")
.config("spark.executor.memory", "10G")
.config("spark.executor.memoryOverhead", "2G") # executor memory * 0.1 or 0.2 %
.config("spark.driver.memory", "10G")
.config("spark.driver.memoryOverhead", "2G") # driver memory * 0.1 or 0.2 %
.config("spark.sql.hive.convertMetastoreOrc", "true")
.config("spark.executor.heartbeatInterval", "60s") # default 10s
.config("spark.network.timeout", "600s") # default 120s
.config("spark.driver.maxResultSize", "2g")
.config("spark.driver.cores","4")
.config("spark.executor.extraClassPath", "-Dhdp.version=current")
.config("spark.debug.maxToStringFields", 200)
.config("spark.sql.catalogImplementation", "hive")
.config("spark.memory.fraction", "0.8")
.config("spark.memory.storageFraction", "0.2")
.config("spark.sql.hive.filesourcePartitionFileCacheSize", "0")
.config("spark.yarn.maxAppAttempts", "10")
.appName(app_name)
.enableHiveSupport().getOrCreate())
Apart from the above, if you are doing any kind of wide operation, a shuffle is involved. To set the shuffle partition value, we use the following calculation:
spark.sql.shuffle.partitions = shuffle input size / HDFS block size
For example, if the shuffle input size is 10 GB and the HDFS block size is 128 MB, then the number of shuffle partitions is 10 GB / 128 MB = 80 partitions.
Also check whether you have enabled dynamic allocation: open the Spark UI --> select the application --> go to the Environment page --> look for the spark.dynamicAllocation.enabled property.
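For example, with the 80-partition estimate above, the setting can be applied on the session (the value is only the example estimate; derive it from your own input size):
spark.conf.set("spark.sql.shuffle.partitions", "80")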
07-30-2021
08:47 AM
Hi @BabaHer From CDP onwards, to integrate Spark and HBase, Cloudera recommends using the hbase-spark connector jar: https://mvnrepository.com/artifact/org.apache.hbase.connectors.spark/hbase-spark?repo=cloudera-repos
The latest hbase-spark jar version is 1.0.0.7.2.10.0-148.
To integrate Spark3 with HBase, you can find a sample example here: https://kontext.tech/column/spark/628/spark-connect-to-hbase
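As a rough illustration, writing a DataFrame with the hbase-spark connector from spark-shell can look like the following (a minimal sketch; the table name, column family, and column mapping are placeholders, and it assumes hbase-site.xml plus the connector jar are on the classpath):
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
import spark.implicits._

// Register an HBaseContext so the connector can reach the cluster (reads hbase-site.xml from the classpath)
new HBaseContext(spark.sparkContext, HBaseConfiguration.create())

// Assumes an HBase table 'employee' with column family 'per' already exists
val df = Seq((1, "Ranga"), (2, "Nishanth")).toDF("id", "name")
df.write
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", "id INT :key, name STRING per:name")
  .option("hbase.table", "employee")
  .save()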
07-22-2021
06:01 AM
Hi @SudEl Please try to modify the required parameters (for example, spark.driver.memory, spark.executor.memory, and other tuning parameters) in the Spark interpreter settings.