Created on 12-17-2016 04:47 PM
YARN exposes the Cluster Applications REST API to retrieve application stats. With the Applications API, you can obtain a collection of resources, each of which represents an application: running a GET operation on this resource returns a collection of application objects. In this example, we will analyze application stats using Spark SQL.
Env: HDP 2.5
Spark Version: Spark-2
Data Preparation: fetch apps.json by invoking the REST API with the curl command below
curl -v -X GET -H "Content-Type: application/json" http://`hostname`:8088/ws/v1/cluster/apps > /tmp/apps.json
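The same endpoint also accepts query parameters such as states and limit, so the result set can be narrowed before it ever reaches Spark. A variant of the command above (the parameter values here are chosen purely for illustration):

curl -v -X GET -H "Content-Type: application/json" "http://`hostname`:8088/ws/v1/cluster/apps?states=FINISHED&limit=50" > /tmp/apps.json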
The next step is to invoke the Spark shell and execute the following sequence of steps.
[spark@rkk1 ~]$ spark-shell
SPARK_MAJOR_VERSION is set to 2, using Spark2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/12/17 16:01:52 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.26.81.127:4040
Spark context available as 'sc' (master = local[*], app id = local-1481990510817).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0.2.5.0.0-1133
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val apps = spark.sqlContext.read.json("file:///tmp/apps.json")
16/12/17 16:02:02 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
apps: org.apache.spark.sql.DataFrame = [apps: struct<app: array<struct<allocatedMB:bigint,allocatedVCores:bigint,amContainerLogs:string,amHostHttpAddress:string,amNodeLabelExpression:string,applicationTags:string,applicationType:string,clusterId:bigint,clusterUsagePercentage:double,diagnostics:string,elapsedTime:bigint,finalStatus:string,finishedTime:bigint,id:string,logAggregationStatus:string,memorySeconds:bigint,name:string,numAMContainerPreempted:bigint,numNonAMContainerPreempted:bigint,preemptedResourceMB:bigint,preemptedResourceVCores:bigint,priority:bigint,progress:double,queue:string,... 9 more fields>>>]

scala> apps.printSchema
root
 |-- apps: struct (nullable = true)
 |    |-- app: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- allocatedMB: long (nullable = true)
 |    |    |    |-- allocatedVCores: long (nullable = true)
 |    |    |    |-- amContainerLogs: string (nullable = true)
 |    |    |    |-- amHostHttpAddress: string (nullable = true)
 |    |    |    |-- amNodeLabelExpression: string (nullable = true)
 |    |    |    |-- applicationTags: string (nullable = true)
 |    |    |    |-- applicationType: string (nullable = true)
 |    |    |    |-- clusterId: long (nullable = true)
 |    |    |    |-- clusterUsagePercentage: double (nullable = true)
 |    |    |    |-- diagnostics: string (nullable = true)
 |    |    |    |-- elapsedTime: long (nullable = true)
 |    |    |    |-- finalStatus: string (nullable = true)
 |    |    |    |-- finishedTime: long (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- logAggregationStatus: string (nullable = true)
 |    |    |    |-- memorySeconds: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- numAMContainerPreempted: long (nullable = true)
 |    |    |    |-- numNonAMContainerPreempted: long (nullable = true)
 |    |    |    |-- preemptedResourceMB: long (nullable = true)
 |    |    |    |-- preemptedResourceVCores: long (nullable = true)
 |    |    |    |-- priority: long (nullable = true)
 |    |    |    |-- progress: double (nullable = true)
 |    |    |    |-- queue: string (nullable = true)
 |    |    |    |-- queueUsagePercentage: double (nullable = true)
 |    |    |    |-- runningContainers: long (nullable = true)
 |    |    |    |-- startedTime: long (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- trackingUI: string (nullable = true)
 |    |    |    |-- trackingUrl: string (nullable = true)
 |    |    |    |-- unmanagedApplication: boolean (nullable = true)
 |    |    |    |-- user: string (nullable = true)
 |    |    |    |-- vcoreSeconds: long (nullable = true)

scala> apps.createOrReplaceTempView("application_stats")
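The same preparation steps can also be packaged outside the shell as a small standalone job. A minimal sketch, assuming Spark 2.x on the classpath (the object name and the final query are illustrative, not part of the original session):

import org.apache.spark.sql.SparkSession

object AppStats {
  def main(args: Array[String]): Unit = {
    // in spark-shell this session already exists as 'spark'
    val spark = SparkSession.builder().appName("yarn-app-stats").getOrCreate()

    // Spark infers the nested schema directly from the JSON dump
    val apps = spark.read.json("file:///tmp/apps.json")

    // register a temp view so the data can be queried with SQL
    apps.createOrReplaceTempView("application_stats")

    spark.sql("select t.apps.app[0].id from application_stats t").show()
    spark.stop()
  }
}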
scala> val sqlDf = spark.sql("select t.apps.app[0].user,t.apps.app[0].name,t.apps.app[0].id,t.apps.app[0].allocatedMB,t.apps.app[0].allocatedVCores,t.apps.app[0].finalStatus from application_stats t")
sqlDf: org.apache.spark.sql.DataFrame = [apps.app AS `app`[0].user: string, apps.app AS `app`[0].name: string ... 4 more fields]

scala> sqlDf.show
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
|apps.app AS `app`[0].user|apps.app AS `app`[0].name|apps.app AS `app`[0].id|apps.app AS `app`[0].allocatedMB|apps.app AS `app`[0].allocatedVCores|apps.app AS `app`[0].finalStatus|
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
|                     hive|     HIVE-0b45b111-226...|   application_14805...|                              -1|                                  -1|                       SUCCEEDED|
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+

scala> val userNameAndId = spark.sql("select t.apps.app.user,t.apps.app.id from application_stats t")

scala> userNameAndId.collect.foreach(println)
[WrappedArray(hive, hive, spark, spark, spark, spark, spark, spark, hive, hive, hive, hive, hive, hive, spark, hive, hive, hive, hive, hive, hive, hive, hive, spark, spark, spark, hive, hive, hive, hive, zeppelin, hive, ambari-qa, ambari-qa, ambari-qa, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive),WrappedArray(application_1480592262242_0021, application_1480592262242_0022, application_1480592262242_0023, application_1480592262242_0024, application_1480592262242_0025, application_1480592262242_0026, application_1480592262242_0027, application_1480592262242_0028, application_1480592262242_0013, application_1480592262242_0014, application_1480592262242_0015, application_1480592262242_0016, application_1480592262242_0017, application_1480592262242_0018, application_1480592262242_0020, application_1480592262242_0037, application_1480592262242_0038, application_1480592262242_0039, application_1480592262242_0040, application_1480592262242_0041, application_1480592262242_0042, application_1480592262242_0043, application_1480592262242_0044, application_1480592262242_0029, application_1480592262242_0030, application_1480592262242_0031, application_1480592262242_0032, application_1480592262242_0033, application_1480592262242_0034, application_1480592262242_0035, application_1480592262242_0036, application_1480587591844_0001, application_1480180301135_0001, application_1480180301135_0002, application_1480180301135_0003, application_1480182107893_0002, application_1480182107893_0003, application_1480182107893_0004, application_1480182107893_0005, application_1480182107893_0001, application_1480592262242_0005, application_1480592262242_0006, application_1480592262242_0007, application_1480592262242_0008, application_1480592262242_0009, application_1480592262242_0010, application_1480592262242_0011, application_1480592262242_0012, application_1480592262242_0001, application_1480592262242_0002, application_1480592262242_0003, application_1480592262242_0004)]

scala> val userNameAndIdDF = userNameAndId.toDF
userNameAndIdDF: org.apache.spark.sql.DataFrame = [user: array<string>, id: array<string>]

scala> userNameAndIdDF.printSchema
root
 |-- user: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: array (nullable = true)
 |    |-- element: string (containsNull = true)
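Note that selecting t.apps.app.user projects the entire array into a single row, which is why collect prints WrappedArrays. An alternative worth sketching (not part of the original session) is to explode the array in the SQL statement itself, so that each application becomes its own row:

// LATERAL VIEW explode turns the array<struct> into one row per element
spark.sql("""
  select app.user, app.id, app.finalStatus
  from application_stats
  lateral view explode(apps.app) t as app
""").show(5)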
scala> userNameAndIdDF.show
+--------------------+--------------------+
|                user|                  id|
+--------------------+--------------------+
|[hive, hive, spar...|[application_1480...|
+--------------------+--------------------+

scala> val dfId = userNameAndIdDF.select(explode(userNameAndIdDF("id"))).toDF("id")
dfId: org.apache.spark.sql.DataFrame = [id: string]

scala> dfId.show
+--------------------+
|                  id|
+--------------------+
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
+--------------------+
only showing top 20 rows
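Exploding one column at a time works, but the entire app array of structs can also be flattened in a single pass, yielding an ordinary tabular DataFrame. A minimal sketch (the selected columns are just examples):

import org.apache.spark.sql.functions.explode

// one row per application, with every struct field promoted to a top-level column
val flatApps = apps.select(explode(apps("apps.app")).as("app")).select("app.*")
flatApps.select("id", "user", "queue", "state", "finalStatus").show(5)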
Using DataFrames, we can impose different structures on the same data and run different queries to understand different aspects of it.
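For instance, building on the flattened flatApps DataFrame sketched above, per-user resource usage reduces to one statement (a sketch; the results will depend on your cluster):

import org.apache.spark.sql.functions.{count, sum, desc}

// applications and aggregate memory-seconds per user, busiest users first
flatApps.groupBy("user")
  .agg(count("id").as("apps"), sum("memorySeconds").as("memSeconds"))
  .orderBy(desc("apps"))
  .show()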