Community Articles
Find and share helpful community-sourced technical articles
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.
Labels (1)

Yarn exposes cluster application REST API to get the application stats.With the Applications API, you can obtain a collection of resources, each of which represents an application. When you run a GET operation on this resource, you obtain a collection of Application Objects. In this example, we will analyze application stats using Spark-SQL.

Env: HDP 2.5

Spark Version: Spark-2

Data Preparation: get the apps.json while invoking REST API using curl command

curl -v -X GET -H "Content-Type: application/json" http://`hostname`:8088/ws/v1/cluster/apps > /tmp/apps.json

next step is to invoke spark shell and execute the sequence of steps as following

[spark@rkk1 ~]$ spark-shell 
SPARK_MAJOR_VERSION is set to 2, using Spark2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/12/17 16:01:52 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.26.81.127:4040
Spark context available as 'sc' (master = local[*], app id = local-1481990510817).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0.2.5.0.0-1133
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.


scala> val apps = spark.sqlContext.read.json("file:///tmp/apps.json")
16/12/17 16:02:02 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
apps: org.apache.spark.sql.DataFrame = [apps: struct<app: array<struct<allocatedMB:bigint,allocatedVCores:bigint,amContainerLogs:string,amHostHttpAddress:string,amNodeLabelExpression:string,applicationTags:string,applicationType:string,clusterId:bigint,clusterUsagePercentage:double,diagnostics:string,elapsedTime:bigint,finalStatus:string,finishedTime:bigint,id:string,logAggregationStatus:string,memorySeconds:bigint,name:string,numAMContainerPreempted:bigint,numNonAMContainerPreempted:bigint,preemptedResourceMB:bigint,preemptedResourceVCores:bigint,priority:bigint,progress:double,queue:string,... 9 more fields>>>]

scala> apps.printSchema
root
 |-- apps: struct (nullable = true)
 |    |-- app: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- allocatedMB: long (nullable = true)
 |    |    |    |-- allocatedVCores: long (nullable = true)
 |    |    |    |-- amContainerLogs: string (nullable = true)
 |    |    |    |-- amHostHttpAddress: string (nullable = true)
 |    |    |    |-- amNodeLabelExpression: string (nullable = true)
 |    |    |    |-- applicationTags: string (nullable = true)
 |    |    |    |-- applicationType: string (nullable = true)
 |    |    |    |-- clusterId: long (nullable = true)
 |    |    |    |-- clusterUsagePercentage: double (nullable = true)
 |    |    |    |-- diagnostics: string (nullable = true)
 |    |    |    |-- elapsedTime: long (nullable = true)
 |    |    |    |-- finalStatus: string (nullable = true)
 |    |    |    |-- finishedTime: long (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- logAggregationStatus: string (nullable = true)
 |    |    |    |-- memorySeconds: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- numAMContainerPreempted: long (nullable = true)
 |    |    |    |-- numNonAMContainerPreempted: long (nullable = true)
 |    |    |    |-- preemptedResourceMB: long (nullable = true)
 |    |    |    |-- preemptedResourceVCores: long (nullable = true)
 |    |    |    |-- priority: long (nullable = true)
 |    |    |    |-- progress: double (nullable = true)
 |    |    |    |-- queue: string (nullable = true)
 |    |    |    |-- queueUsagePercentage: double (nullable = true)
 |    |    |    |-- runningContainers: long (nullable = true)
 |    |    |    |-- startedTime: long (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- trackingUI: string (nullable = true)
 |    |    |    |-- trackingUrl: string (nullable = true)
 |    |    |    |-- unmanagedApplication: boolean (nullable = true)
 |    |    |    |-- user: string (nullable = true)
 |    |    |    |-- vcoreSeconds: long (nullable = true)

scala> apps.createOrReplaceTempView("application_stats")

scala> val sqlDf = spark.sql("select t.apps.app[0].user,t.apps.app[0].name,t.apps.app[0].id,t.apps.app[0].allocatedMB,t.apps.app[0].allocatedVCores,t.apps.app[0].finalStatus from  application_stats t")
sqlDf: org.apache.spark.sql.DataFrame = [apps.app AS `app`[0].user: string, apps.app AS `app`[0].name: string ... 4 more fields]

scala> sqlDf.show
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
|apps.app AS `app`[0].user|apps.app AS `app`[0].name|apps.app AS `app`[0].id|apps.app AS `app`[0].allocatedMB|apps.app AS `app`[0].allocatedVCores|apps.app AS `app`[0].finalStatus|
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
|                     hive|     HIVE-0b45b111-226...|   application_14805...|                              -1|                                  -1|                       SUCCEEDED|
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+

val userNameAndId = spark.sql("select t.apps.app.user,t.apps.app.id from application_stats t")


scala> userNameAndId.collect.foreach(println)
[WrappedArray(hive, hive, spark, spark, spark, spark, spark, spark, hive, hive, hive, hive, hive, hive, spark, hive, hive, hive, hive, hive, hive, hive, hive, spark, spark, spark, hive, hive, hive, hive, zeppelin, hive, ambari-qa, ambari-qa, ambari-qa, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive),WrappedArray(application_1480592262242_0021, application_1480592262242_0022, application_1480592262242_0023, application_1480592262242_0024, application_1480592262242_0025, application_1480592262242_0026, application_1480592262242_0027, application_1480592262242_0028, application_1480592262242_0013, application_1480592262242_0014, application_1480592262242_0015, application_1480592262242_0016, application_1480592262242_0017, application_1480592262242_0018, application_1480592262242_0020, application_1480592262242_0037, application_1480592262242_0038, application_1480592262242_0039, application_1480592262242_0040, application_1480592262242_0041, application_1480592262242_0042, application_1480592262242_0043, application_1480592262242_0044, application_1480592262242_0029, application_1480592262242_0030, application_1480592262242_0031, application_1480592262242_0032, application_1480592262242_0033, application_1480592262242_0034, application_1480592262242_0035, application_1480592262242_0036, application_1480587591844_0001, application_1480180301135_0001, application_1480180301135_0002, application_1480180301135_0003, application_1480182107893_0002, application_1480182107893_0003, application_1480182107893_0004, application_1480182107893_0005, application_1480182107893_0001, application_1480592262242_0005, application_1480592262242_0006, application_1480592262242_0007, application_1480592262242_0008, application_1480592262242_0009, application_1480592262242_0010, application_1480592262242_0011, application_1480592262242_0012, application_1480592262242_0001, application_1480592262242_0002, application_1480592262242_0003, application_1480592262242_0004)]


scala> val userNameAndIdDF =  userNameAndId.toDF
userNameAndIdDF: org.apache.spark.sql.DataFrame = [user: array<string>, id: array<string>]


scala> userNameAndIdDF.printSchema
root
 |-- user: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: array (nullable = true)
 |    |-- element: string (containsNull = true)


 scala> userNameAndIdDF.show
+--------------------+--------------------+
|                user|                  id|
+--------------------+--------------------+
|[hive, hive, spar...|[application_1480...|
+--------------------+--------------------+


scala> val dfId =  userNameAndIdDF.select(explode( userNameAndIdDF("id"))).toDF("id")
dfId: org.apache.spark.sql.DataFrame = [id: string]


scala> dfId.show
+--------------------+
|                  id|
+--------------------+
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
+--------------------+
only showing top 20 rows

using dataframe we can create different struct on the same data and run different query to know the differnet aspect of the data.

1,558 Views
Don't have an account?
Coming from Hortonworks? Activate your account here
Version history
Revision #:
1 of 1
Last update:
‎12-17-2016 04:47 PM
Updated by:
 
Contributors
Top Kudoed Authors