YARN exposes a cluster Applications REST API for retrieving application statistics. With the Applications API, you can obtain a collection of resources, each of which represents an application. Running a GET operation on this resource returns a collection of Application objects. In this example, we will analyze application stats using Spark SQL.

Env: HDP 2.5

Spark Version: Spark-2

Data preparation: fetch apps.json from the ResourceManager REST API with the following curl command:

curl -v -X GET -H "Content-Type: application/json" http://`hostname`:8088/ws/v1/cluster/apps > /tmp/apps.json
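
The Applications API also accepts query parameters to narrow the result set. For example, the following variant (a sketch; states and limit are documented parameters of the ResourceManager Applications API) fetches at most ten FINISHED applications:

curl -v -X GET -H "Content-Type: application/json" "http://`hostname`:8088/ws/v1/cluster/apps?states=FINISHED&limit=10" > /tmp/apps.json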

The next step is to start the Spark shell and run the following sequence of steps:

[spark@rkk1 ~]$ spark-shell 
SPARK_MAJOR_VERSION is set to 2, using Spark2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/12/17 16:01:52 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.26.81.127:4040
Spark context available as 'sc' (master = local[*], app id = local-1481990510817).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0.2.5.0.0-1133
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.


scala> val apps = spark.sqlContext.read.json("file:///tmp/apps.json")
16/12/17 16:02:02 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
apps: org.apache.spark.sql.DataFrame = [apps: struct<app: array<struct<allocatedMB:bigint,allocatedVCores:bigint,amContainerLogs:string,amHostHttpAddress:string,amNodeLabelExpression:string,applicationTags:string,applicationType:string,clusterId:bigint,clusterUsagePercentage:double,diagnostics:string,elapsedTime:bigint,finalStatus:string,finishedTime:bigint,id:string,logAggregationStatus:string,memorySeconds:bigint,name:string,numAMContainerPreempted:bigint,numNonAMContainerPreempted:bigint,preemptedResourceMB:bigint,preemptedResourceVCores:bigint,priority:bigint,progress:double,queue:string,... 9 more fields>>>]

scala> apps.printSchema
root
 |-- apps: struct (nullable = true)
 |    |-- app: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- allocatedMB: long (nullable = true)
 |    |    |    |-- allocatedVCores: long (nullable = true)
 |    |    |    |-- amContainerLogs: string (nullable = true)
 |    |    |    |-- amHostHttpAddress: string (nullable = true)
 |    |    |    |-- amNodeLabelExpression: string (nullable = true)
 |    |    |    |-- applicationTags: string (nullable = true)
 |    |    |    |-- applicationType: string (nullable = true)
 |    |    |    |-- clusterId: long (nullable = true)
 |    |    |    |-- clusterUsagePercentage: double (nullable = true)
 |    |    |    |-- diagnostics: string (nullable = true)
 |    |    |    |-- elapsedTime: long (nullable = true)
 |    |    |    |-- finalStatus: string (nullable = true)
 |    |    |    |-- finishedTime: long (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- logAggregationStatus: string (nullable = true)
 |    |    |    |-- memorySeconds: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- numAMContainerPreempted: long (nullable = true)
 |    |    |    |-- numNonAMContainerPreempted: long (nullable = true)
 |    |    |    |-- preemptedResourceMB: long (nullable = true)
 |    |    |    |-- preemptedResourceVCores: long (nullable = true)
 |    |    |    |-- priority: long (nullable = true)
 |    |    |    |-- progress: double (nullable = true)
 |    |    |    |-- queue: string (nullable = true)
 |    |    |    |-- queueUsagePercentage: double (nullable = true)
 |    |    |    |-- runningContainers: long (nullable = true)
 |    |    |    |-- startedTime: long (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- trackingUI: string (nullable = true)
 |    |    |    |-- trackingUrl: string (nullable = true)
 |    |    |    |-- unmanagedApplication: boolean (nullable = true)
 |    |    |    |-- user: string (nullable = true)
 |    |    |    |-- vcoreSeconds: long (nullable = true)
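
Because every application is an element of the nested apps.app array, it is often convenient to flatten the array into one row per application before querying. A minimal sketch (appDF is a name introduced here; in spark-shell, org.apache.spark.sql.functions._ and spark.implicits._ are already imported, so the imports are shown only for completeness):

// Flatten the nested apps.app array into one row per application
import org.apache.spark.sql.functions.explode
import spark.implicits._

val appDF = apps.select(explode($"apps.app").as("app")).select("app.*")
appDF.select("user", "id", "state").show(5)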

scala> apps.createOrReplaceTempView("application_stats")

scala> val sqlDf = spark.sql("select t.apps.app[0].user,t.apps.app[0].name,t.apps.app[0].id,t.apps.app[0].allocatedMB,t.apps.app[0].allocatedVCores,t.apps.app[0].finalStatus from  application_stats t")
sqlDf: org.apache.spark.sql.DataFrame = [apps.app AS `app`[0].user: string, apps.app AS `app`[0].name: string ... 4 more fields]

scala> sqlDf.show
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
|apps.app AS `app`[0].user|apps.app AS `app`[0].name|apps.app AS `app`[0].id|apps.app AS `app`[0].allocatedMB|apps.app AS `app`[0].allocatedVCores|apps.app AS `app`[0].finalStatus|
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
|                     hive|     HIVE-0b45b111-226...|   application_14805...|                              -1|                                  -1|                       SUCCEEDED|
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
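
Note that the query above reads only the first element of the array (app[0]). To query every application directly in SQL, the LATERAL VIEW explode syntax can be used instead; a sketch (perAppDf is a name introduced here):

val perAppDf = spark.sql("""
  SELECT a.user, a.name, a.id, a.allocatedMB, a.allocatedVCores, a.finalStatus
  FROM application_stats t
  LATERAL VIEW explode(t.apps.app) tmp AS a""")
perAppDf.show(5)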

scala> val userNameAndId = spark.sql("select t.apps.app.user,t.apps.app.id from application_stats t")


scala> userNameAndId.collect.foreach(println)
[WrappedArray(hive, hive, spark, spark, spark, spark, spark, spark, hive, hive, hive, hive, hive, hive, spark, hive, hive, hive, hive, hive, hive, hive, hive, spark, spark, spark, hive, hive, hive, hive, zeppelin, hive, ambari-qa, ambari-qa, ambari-qa, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive),WrappedArray(application_1480592262242_0021, application_1480592262242_0022, application_1480592262242_0023, application_1480592262242_0024, application_1480592262242_0025, application_1480592262242_0026, application_1480592262242_0027, application_1480592262242_0028, application_1480592262242_0013, application_1480592262242_0014, application_1480592262242_0015, application_1480592262242_0016, application_1480592262242_0017, application_1480592262242_0018, application_1480592262242_0020, application_1480592262242_0037, application_1480592262242_0038, application_1480592262242_0039, application_1480592262242_0040, application_1480592262242_0041, application_1480592262242_0042, application_1480592262242_0043, application_1480592262242_0044, application_1480592262242_0029, application_1480592262242_0030, application_1480592262242_0031, application_1480592262242_0032, application_1480592262242_0033, application_1480592262242_0034, application_1480592262242_0035, application_1480592262242_0036, application_1480587591844_0001, application_1480180301135_0001, application_1480180301135_0002, application_1480180301135_0003, application_1480182107893_0002, application_1480182107893_0003, application_1480182107893_0004, application_1480182107893_0005, application_1480182107893_0001, application_1480592262242_0005, application_1480592262242_0006, application_1480592262242_0007, application_1480592262242_0008, application_1480592262242_0009, application_1480592262242_0010, application_1480592262242_0011, application_1480592262242_0012, application_1480592262242_0001, application_1480592262242_0002, application_1480592262242_0003, application_1480592262242_0004)]
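
Note that this query returns a single row containing two parallel arrays (all users and all ids) rather than one row per application. If matched (user, id) pairs are needed, one option, sketched below, is to zip the two arrays through the underlying RDD (userIdPairs is a name introduced here; toDF relies on spark.implicits._, auto-imported in spark-shell):

// Zip the parallel user/id arrays into per-application (user, id) rows
val userIdPairs = userNameAndId.rdd.flatMap { row =>
  row.getSeq[String](0).zip(row.getSeq[String](1))
}.toDF("user", "id")
userIdPairs.show(5)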


scala> val userNameAndIdDF =  userNameAndId.toDF
userNameAndIdDF: org.apache.spark.sql.DataFrame = [user: array<string>, id: array<string>]


scala> userNameAndIdDF.printSchema
root
 |-- user: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: array (nullable = true)
 |    |-- element: string (containsNull = true)


scala> userNameAndIdDF.show
+--------------------+--------------------+
|                user|                  id|
+--------------------+--------------------+
|[hive, hive, spar...|[application_1480...|
+--------------------+--------------------+


scala> val dfId =  userNameAndIdDF.select(explode( userNameAndIdDF("id"))).toDF("id")
dfId: org.apache.spark.sql.DataFrame = [id: string]


scala> dfId.show
+--------------------+
|                  id|
+--------------------+
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
+--------------------+
only showing top 20 rows
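
The same explode pattern also supports aggregation. For example, a sketch that counts applications per user (perUserCounts is a name introduced here; desc comes from org.apache.spark.sql.functions, auto-imported in spark-shell):

val perUserCounts = userNameAndIdDF
  .select(explode(userNameAndIdDF("user")).as("user"))
  .groupBy("user")
  .count()
  .orderBy(desc("count"))
perUserCounts.show()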

Using DataFrames, we can impose different structures on the same data and run different queries to examine different aspects of it.
