Member since 04-25-2016
579 Posts
609 Kudos Received
111 Solutions
        My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 2926 | 02-12-2020 03:17 PM |
|  | 2136 | 08-10-2017 09:42 AM |
|  | 12474 | 07-28-2017 03:57 AM |
|  | 3411 | 07-19-2017 02:43 AM |
|  | 2522 | 07-13-2017 11:42 AM |
12-18-2016 12:29 PM · 3 Kudos
While debugging Kafka producer slowness, I observed the following steps a Kafka producer takes while producing a single record to a Kafka broker.

ENV: HDP 2.5, a 2-node Kafka cluster, and a single producer producing a record to 'testtopic', which has 2 partitions with 2 replicas.

1. The Kafka producer starts with the configured settings and begins registering its metrics sensors.

2. It updates the cluster metadata version, which includes cluster information such as broker nodes and partitions, and assigns a version id to this cluster metadata:

Updated cluster metadata version 1 to Cluster(nodes = [Node(-2, rkk2, 6667), Node(-1, rkk1, 6667)], partitions = [])

3. It sets up and starts the Kafka producer I/O thread, aka the Sender thread.

4. It requests a metadata update for topic testtopic.

5. The producer's NetworkClient sends a metadata request to one of the brokers; the request consists of api_key, api_version, correlation_id and client_id:

Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-1}, body={topics=[testtopic]}), isInitiatedByNetworkClient, createdTimeMs=1482047450018, sendTimeMs=0) to node -1

6. From the response the producer gets the cluster metadata and updates its own copy; the response includes broker information along with topic partitions, their leaders and ISRs:

Updated cluster metadata version 2 to Cluster(nodes = [Node(1002, rkk2.hdp.local, 6667), Node(1001, rkk1.hdp.local, 6667)], partitions = [Partition(topic = testtopic, partition = 1, leader = 1002, replicas = [1002,1001,], isr = [1002,1001,], Partition(topic = testtopic, partition = 0, leader = 1001, replicas = [1002,1001,], isr = [1001,1002,]])

7. The producer serializes the key and value and sends them as a produce record to the leader of the target partition; the partition is decided by the default partitioner if none is configured. The default partitioning strategy works as follows:
   - If a partition is specified in the record, use it.
   - If no partition is specified but a key is present, choose a partition based on a hash of the key.
   - If no partition or key is present, choose a partition in a round-robin fashion.

8. The producer allocates a memory buffer for the topic, sized by batch.size, and wakes up the Sender thread once the buffer is full, linger.ms is reached, or it is a new batch.

9. The Sender thread creates a produce request, carrying a correlation_id, to the leader of the partition, like this:

Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@11b2c43e, request=RequestSend(header={api_key=0,api_version=1,correlation_id=1,client_id=producer-1}, body={acks=1,timeout=30000,topic_data=[{topic=testtopic,data=[{partition=1,record_set=java.nio.HeapByteBuffer[pos=0 lim=76 cap=100000]}]}]}), createdTimeMs=1482047460410, sendTimeMs=0)]

10. Once the record is written successfully to the brokers according to the acks setting, the Sender thread gets the response back for that correlation_id and the callback is invoked:

Received produce response from node 1002 with correlation id 1
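For readers who want to reproduce the flow above, here is a minimal producer sketch in Scala against the Kafka Java client; the broker list, serializer classes, and callback are my own illustrative choices based on the environment described, not the exact code used in this debugging session.

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}

val props = new Properties()
// broker list is an assumption matching the 2-node cluster above (port 6667 is the HDP default)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "rkk1.hdp.local:6667,rkk2.hdp.local:6667")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.ACKS_CONFIG, "1")            // acks=1, as seen in the produce request above
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384")  // batch.size drives the per-partition buffer
props.put(ProducerConfig.LINGER_MS_CONFIG, "0")       // linger.ms controls how long the Sender waits for a fuller batch

val producer = new KafkaProducer[String, String](props)
// send() only hands the record to the record accumulator; the Sender (I/O) thread does the network call
producer.send(new ProducerRecord[String, String]("testtopic", "key1", "value1"), new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception == null)
      println(s"written to partition ${metadata.partition} at offset ${metadata.offset}")
    else
      exception.printStackTrace()
})
producer.close()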
08-23-2018 12:56 PM
Nice and very useful article, @Rajkumar Singh.
12-17-2016 04:47 PM · 2 Kudos
YARN exposes a cluster applications REST API to get application stats. With the Applications API, you can obtain a collection of resources, each of which represents an application. When you run a GET operation on this resource, you obtain a collection of Application objects. In this example, we will analyze application stats using Spark SQL.

Env: HDP 2.5

Spark Version: Spark 2

Data preparation: get apps.json by invoking the REST API with curl:

curl -v -X GET -H "Content-Type: application/json" http://`hostname`:8088/ws/v1/cluster/apps > /tmp/apps.json

The next step is to invoke the Spark shell and execute the following sequence of steps:

[spark@rkk1 ~]$ spark-shell 
SPARK_MAJOR_VERSION is set to 2, using Spark2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/12/17 16:01:52 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.26.81.127:4040
Spark context available as 'sc' (master = local[*], app id = local-1481990510817).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0.2.5.0.0-1133
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val apps = spark.sqlContext.read.json("file:///tmp/apps.json")
16/12/17 16:02:02 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
apps: org.apache.spark.sql.DataFrame = [apps: struct<app: array<struct<allocatedMB:bigint,allocatedVCores:bigint,amContainerLogs:string,amHostHttpAddress:string,amNodeLabelExpression:string,applicationTags:string,applicationType:string,clusterId:bigint,clusterUsagePercentage:double,diagnostics:string,elapsedTime:bigint,finalStatus:string,finishedTime:bigint,id:string,logAggregationStatus:string,memorySeconds:bigint,name:string,numAMContainerPreempted:bigint,numNonAMContainerPreempted:bigint,preemptedResourceMB:bigint,preemptedResourceVCores:bigint,priority:bigint,progress:double,queue:string,... 9 more fields>>>]
scala> apps.printSchema
root
 |-- apps: struct (nullable = true)
 |    |-- app: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- allocatedMB: long (nullable = true)
 |    |    |    |-- allocatedVCores: long (nullable = true)
 |    |    |    |-- amContainerLogs: string (nullable = true)
 |    |    |    |-- amHostHttpAddress: string (nullable = true)
 |    |    |    |-- amNodeLabelExpression: string (nullable = true)
 |    |    |    |-- applicationTags: string (nullable = true)
 |    |    |    |-- applicationType: string (nullable = true)
 |    |    |    |-- clusterId: long (nullable = true)
 |    |    |    |-- clusterUsagePercentage: double (nullable = true)
 |    |    |    |-- diagnostics: string (nullable = true)
 |    |    |    |-- elapsedTime: long (nullable = true)
 |    |    |    |-- finalStatus: string (nullable = true)
 |    |    |    |-- finishedTime: long (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- logAggregationStatus: string (nullable = true)
 |    |    |    |-- memorySeconds: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- numAMContainerPreempted: long (nullable = true)
 |    |    |    |-- numNonAMContainerPreempted: long (nullable = true)
 |    |    |    |-- preemptedResourceMB: long (nullable = true)
 |    |    |    |-- preemptedResourceVCores: long (nullable = true)
 |    |    |    |-- priority: long (nullable = true)
 |    |    |    |-- progress: double (nullable = true)
 |    |    |    |-- queue: string (nullable = true)
 |    |    |    |-- queueUsagePercentage: double (nullable = true)
 |    |    |    |-- runningContainers: long (nullable = true)
 |    |    |    |-- startedTime: long (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- trackingUI: string (nullable = true)
 |    |    |    |-- trackingUrl: string (nullable = true)
 |    |    |    |-- unmanagedApplication: boolean (nullable = true)
 |    |    |    |-- user: string (nullable = true)
 |    |    |    |-- vcoreSeconds: long (nullable = true)
scala> apps.createOrReplaceTempView("application_stats")
scala> val sqlDf = spark.sql("select t.apps.app[0].user,t.apps.app[0].name,t.apps.app[0].id,t.apps.app[0].allocatedMB,t.apps.app[0].allocatedVCores,t.apps.app[0].finalStatus from  application_stats t")
sqlDf: org.apache.spark.sql.DataFrame = [apps.app AS `app`[0].user: string, apps.app AS `app`[0].name: string ... 4 more fields]
scala> sqlDf.show
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
|apps.app AS `app`[0].user|apps.app AS `app`[0].name|apps.app AS `app`[0].id|apps.app AS `app`[0].allocatedMB|apps.app AS `app`[0].allocatedVCores|apps.app AS `app`[0].finalStatus|
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
|                     hive|     HIVE-0b45b111-226...|   application_14805...|                              -1|                                  -1|                       SUCCEEDED|
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
val userNameAndId = spark.sql("select t.apps.app.user,t.apps.app.id from application_stats t")
scala> userNameAndId.collect.foreach(println)
[WrappedArray(hive, hive, spark, spark, spark, spark, spark, spark, hive, hive, hive, hive, hive, hive, spark, hive, hive, hive, hive, hive, hive, hive, hive, spark, spark, spark, hive, hive, hive, hive, zeppelin, hive, ambari-qa, ambari-qa, ambari-qa, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive),WrappedArray(application_1480592262242_0021, application_1480592262242_0022, application_1480592262242_0023, application_1480592262242_0024, application_1480592262242_0025, application_1480592262242_0026, application_1480592262242_0027, application_1480592262242_0028, application_1480592262242_0013, application_1480592262242_0014, application_1480592262242_0015, application_1480592262242_0016, application_1480592262242_0017, application_1480592262242_0018, application_1480592262242_0020, application_1480592262242_0037, application_1480592262242_0038, application_1480592262242_0039, application_1480592262242_0040, application_1480592262242_0041, application_1480592262242_0042, application_1480592262242_0043, application_1480592262242_0044, application_1480592262242_0029, application_1480592262242_0030, application_1480592262242_0031, application_1480592262242_0032, application_1480592262242_0033, application_1480592262242_0034, application_1480592262242_0035, application_1480592262242_0036, application_1480587591844_0001, application_1480180301135_0001, application_1480180301135_0002, application_1480180301135_0003, application_1480182107893_0002, application_1480182107893_0003, application_1480182107893_0004, application_1480182107893_0005, application_1480182107893_0001, application_1480592262242_0005, application_1480592262242_0006, application_1480592262242_0007, application_1480592262242_0008, application_1480592262242_0009, application_1480592262242_0010, application_1480592262242_0011, application_1480592262242_0012, application_1480592262242_0001, application_1480592262242_0002, application_1480592262242_0003, application_1480592262242_0004)]
scala> val userNameAndIdDF =  userNameAndId.toDF
userNameAndIdDF: org.apache.spark.sql.DataFrame = [user: array<string>, id: array<string>]
scala> userNameAndIdDF.printSchema
root
 |-- user: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 scala> userNameAndIdDF.show
+--------------------+--------------------+
|                user|                  id|
+--------------------+--------------------+
|[hive, hive, spar...|[application_1480...|
+--------------------+--------------------+
scala> val dfId =  userNameAndIdDF.select(explode( userNameAndIdDF("id"))).toDF("id")
dfId: org.apache.spark.sql.DataFrame = [id: string]
scala> dfId.show
+--------------------+
|                  id|
+--------------------+
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
+--------------------+
only showing top 20 rows
Using DataFrames, we can build different structures over the same data and run different queries to understand different aspects of the data.
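As a small follow-up, it is often easier to query one row per application than the nested arrays shown above; a minimal sketch (assuming the same apps DataFrame and the column names from the schema printed earlier) could look like this:

import org.apache.spark.sql.functions.explode

// explode apps.app so that each application becomes its own row instead of an array element
val appDf = apps.select(explode(apps("apps.app")).as("app"))
appDf.createOrReplaceTempView("apps_flat")
spark.sql("select app.user, app.id, app.queue, app.finalStatus from apps_flat").show(5)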
11-13-2018 06:33 PM
I think you need to delete those files as well, then it works...

[root@centos10 krb5kdc]# ll
total 28
-rw------- 1 root root   29 Nov 13 09:36 kadm5.acl
-rw------- 1 root root   29 Nov 13 09:24 kadm5.acl.rpmsave
-rw------- 1 root root   29 Nov 13 09:36 kadm5.acly
-rw------- 1 root root  448 Nov 13 09:35 kdc.conf
-rw------- 1 root root  448 Nov 13 09:24 kdc.conf.rpmsave
-rw------- 1 root root 8192 Nov 13 09:27 principal      <<<<<<<<<<<<<<<<<
-rw------- 1 root root    0 Nov 13 09:37 principal.ok   <<<<<<<<<<<<<<<<<

Then it works:

[root@centos10 ~]# /usr/sbin/kdb5_util create -r BEER.LOC -s
Loading random data
Initializing database '/var/kerberos/krb5kdc/principal' for realm 'BEER.LOC',
master key name 'K/M@BEER.LOC'
You will be prompted for the database Master Password.
It is important that you NOT FORGET this password.
Enter KDC database master key:
Re-enter KDC database master key to verify:
[root@centos10 ~]#
06-18-2018 11:39 PM
@Kit Menke isn't wrong. Take a look at the API docs. You'll notice there are several options for creating DataFrames from an RDD. In your case, it looks as though you have an RDD of class type Row, so you'll also need to provide a schema to the createDataFrame() method.

Scala API docs: https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.SQLContext

import org.apache.spark.sql._
import org.apache.spark.sql.types._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)
val people =
  sc.textFile("examples/src/main/resources/people.txt").map(
    _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val dataFrame = sqlContext.createDataFrame(people, schema)
dataFrame.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)
dataFrame.createOrReplaceTempView("people")
sqlContext.sql("select name from people").collect.foreach(println) 
11-29-2016 07:46 PM
Thank you very much, it's working fine now.
11-15-2016 02:09 PM
@Iyappan Gopalakrishnan Adding new users to a secured NiFi 0.x / HDF 1.x version of NiFi can be done directly via the NiFi UI. I am assuming you have already secured your NiFi and manually added the initial admin to the authorized-users.xml file.

Adding new users is pretty easy. Simply have the new user attempt to access the https web address for your NiFi 0.7 install. After authenticating successfully (done via client certs, LDAP, etc.), they will see a screen telling them they are not authorized for access and giving them the opportunity to request access.

Once they request access, the "Admin" user who does have access to the UI will need to go into the users UI by clicking on the users icon found in the upper right corner of the NiFi UI. From within that UI you will see all existing users and any pending authorization requests. Simply click on the pencil icon to the right of any pending user to authorize them for one or more of the available user roles (Administrator, Dataflow Manager, Read Only, or Provenance). The administration guide included with your install can give you more details on each of the roles and the privileges each grants to users.

Thanks,

Matt
10-18-2016 11:08 PM · 1 Kudo
HiveServer2 does not expose JMX or equivalent metrics, but we often need to monitor the HiveServer2 process to know its CPU usage, heap usage, and the state of its various threads, along with other parameters such as the number of classes loaded by the HS2 JVM. It is also very useful if you want to run CPU/memory sampling to profile HiveServer2.

Step 1: Log in to Ambari and open the Hive advanced configuration.

Step 2: Expand the Advanced hive-env settings and add the following configuration to the hive-env template:

if [ "$SERVICE" = "hiveserver2" ]; then
  export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS  -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=8008"
fi
Step 3: Restart HiveServer2.

Step 4: On a client machine, open jvisualvm ($JAVA_HOME/bin). Once it is open, go to File -> Add JMX Connection, enter the hostname and port number (8008 in this case), and you are good to go.
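If you prefer to pull a metric programmatically instead of (or in addition to) jvisualvm, here is a minimal sketch using the standard JMX remote API; hs2-host is a placeholder for your HiveServer2 host, and the port matches the 8008 configured above.

import java.lang.management.MemoryMXBean
import javax.management.{JMX, ObjectName}
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

// hs2-host is a placeholder; 8008 matches the jmxremote.port set in hive-env above
val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://hs2-host:8008/jmxrmi")
val connector = JMXConnectorFactory.connect(url)
val mbsc = connector.getMBeanServerConnection

// read heap usage from the standard java.lang:type=Memory MBean
val memoryBean = JMX.newMXBeanProxy(mbsc, new ObjectName("java.lang:type=Memory"), classOf[MemoryMXBean])
val heap = memoryBean.getHeapMemoryUsage
println(s"heap used = ${heap.getUsed} / max = ${heap.getMax}")

connector.close()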
10-09-2016 10:16 PM · 1 Kudo
							 You are welcome. Glad it worked. 
10-11-2016 01:31 AM · 6 Kudos
@Rajkumar Singh All broker configurations can be found in the Kafka conf folder. Broker configurations are stored in files with names like server.properties; there will be one server.properties per broker, usually named server1.properties, server2.properties, etc.

+++++++++

If any of the responses was helpful, please don't forget to vote and accept the best answer to your question.
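For orientation, a broker properties file typically contains entries like the following; the values shown here are illustrative only and should be taken from your own cluster:

broker.id=1
listeners=PLAINTEXT://rkk1.hdp.local:6667
log.dirs=/kafka-logs
zookeeper.connect=rkk1.hdp.local:2181
num.partitions=1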