Member since: 04-25-2016
Posts: 579
Kudos Received: 609
Solutions: 111

My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2357 | 02-12-2020 03:17 PM
 | 1638 | 08-10-2017 09:42 AM
 | 11199 | 07-28-2017 03:57 AM
 | 2677 | 07-19-2017 02:43 AM
 | 1980 | 07-13-2017 11:42 AM
12-18-2016
12:29 PM
3 Kudos
While debugging Kafka producer slowness, I observed the following steps a Kafka producer takes while producing a single record to a Kafka broker.

Env: HDP 2.5, 2-node Kafka cluster, single producer producing a record to 'testtopic', which has 2 partitions with 2 replicas.

1. The Kafka producer starts with the configured settings and registers its metrics sensors.
2. It updates the cluster metadata version, which includes cluster information such as broker nodes and partitions, and assigns a version id to this cluster metadata:
Updated cluster metadata version 1 to Cluster(nodes = [Node(-2, rkk2, 6667), Node(-1, rkk1, 6667)], partitions = [])
3. It sets up and starts the Kafka producer I/O thread, aka the Sender thread.
4. It requests a metadata update for topic testtopic.
5. The producer's NetworkClient sends a metadata request to one of the brokers; the request consists of api_key, api_version, correlation_id and client_id:
Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-1}, body={topics=[testtopic]}), isInitiatedByNetworkClient, createdTimeMs=1482047450018, sendTimeMs=0) to node -1
6. In the response the producer gets the metadata from the cluster and updates its own copy; the response includes broker information along with the topic's partitions, their leaders and ISRs:
Updated cluster metadata version 2 to Cluster(nodes = [Node(1002, rkk2.hdp.local, 6667), Node(1001, rkk1.hdp.local, 6667)], partitions = [Partition(topic = testtopic, partition = 1, leader = 1002, replicas = [1002,1001,], isr = [1002,1001,], Partition(topic = testtopic, partition = 0, leader = 1001, replicas = [1002,1001,], isr = [1001,1002,]])
7. The producer serializes the key and value and sends the produce record to the leader of the target partition. The partition is chosen by the default partitioner if no custom partitioner is configured. The default partitioning strategy is:
- If a partition is specified in the record, use it.
- If no partition is specified but a key is present, choose a partition based on a hash of the key.
- If no partition or key is present, choose a partition in a round-robin fashion.
8. The producer allocates a memory buffer for the topic, sized by batch.size.
9. The producer wakes up the Sender thread once the buffer is full, linger.ms has elapsed, or it is a new batch.
10. The Sender thread creates a produce request to the leader of the partition for the produce record, with a correlation_id:
Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@11b2c43e, request=RequestSend(header={api_key=0,api_version=1,correlation_id=1,client_id=producer-1}, body={acks=1,timeout=30000,topic_data=[{topic=testtopic,data=[{partition=1,record_set=java.nio.HeapByteBuffer[pos=0 lim=76 cap=100000]}]}]}), createdTimeMs=1482047460410, sendTimeMs=0)]
11. Once the record is written successfully to the brokers according to the acks setting, the Sender thread gets the response back for that correlation_id and the callback is invoked:
Received produce response from node 1002 with correlation id 1
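For reference, here is a minimal producer sketch in Scala that exercises the settings discussed above (acks, batch.size, linger.ms). The broker list, record value and serializer choices are illustrative assumptions, not taken from the environment above.

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object ProducerSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "rkk1.hdp.local:6667,rkk2.hdp.local:6667") // assumed broker list
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks", "1")           // leader acknowledgement, as seen in the produce request above
  props.put("batch.size", "16384") // per-partition buffer; the Sender wakes when it fills
  props.put("linger.ms", "5")      // ... or when this wait has elapsed

  val producer = new KafkaProducer[String, String](props)
  // no partition and no key, so the default partitioner assigns partitions round-robin
  val record = new ProducerRecord[String, String]("testtopic", "hello from producer sketch")
  producer.send(record, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception == null)
        println(s"written to partition ${metadata.partition()} at offset ${metadata.offset()}")
      else
        exception.printStackTrace()
  })
  producer.close()
}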
08-23-2018
12:56 PM
Nice and very useful article, @Rajkumar Singh.
12-17-2016
04:47 PM
2 Kudos
YARN exposes the cluster applications REST API to get application stats. With the Applications API, you can obtain a collection of resources, each of which represents an application. When you run a GET operation on this resource, you obtain a collection of Application objects. In this example, we will analyze application stats using Spark SQL.

Env: HDP 2.5
Spark version: Spark 2

Data preparation: get apps.json by invoking the REST API with curl:

curl -v -X GET -H "Content-Type: application/json" http://`hostname`:8088/ws/v1/cluster/apps > /tmp/apps.json

The next step is to start the Spark shell and execute the following sequence of steps:

[spark@rkk1 ~]$ spark-shell
SPARK_MAJOR_VERSION is set to 2, using Spark2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/12/17 16:01:52 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.26.81.127:4040
Spark context available as 'sc' (master = local[*], app id = local-1481990510817).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.0.0.2.5.0.0-1133
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val apps = spark.sqlContext.read.json("file:///tmp/apps.json")
16/12/17 16:02:02 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
apps: org.apache.spark.sql.DataFrame = [apps: struct<app: array<struct<allocatedMB:bigint,allocatedVCores:bigint,amContainerLogs:string,amHostHttpAddress:string,amNodeLabelExpression:string,applicationTags:string,applicationType:string,clusterId:bigint,clusterUsagePercentage:double,diagnostics:string,elapsedTime:bigint,finalStatus:string,finishedTime:bigint,id:string,logAggregationStatus:string,memorySeconds:bigint,name:string,numAMContainerPreempted:bigint,numNonAMContainerPreempted:bigint,preemptedResourceMB:bigint,preemptedResourceVCores:bigint,priority:bigint,progress:double,queue:string,... 9 more fields>>>]
scala> apps.printSchema
root
|-- apps: struct (nullable = true)
| |-- app: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- allocatedMB: long (nullable = true)
| | | |-- allocatedVCores: long (nullable = true)
| | | |-- amContainerLogs: string (nullable = true)
| | | |-- amHostHttpAddress: string (nullable = true)
| | | |-- amNodeLabelExpression: string (nullable = true)
| | | |-- applicationTags: string (nullable = true)
| | | |-- applicationType: string (nullable = true)
| | | |-- clusterId: long (nullable = true)
| | | |-- clusterUsagePercentage: double (nullable = true)
| | | |-- diagnostics: string (nullable = true)
| | | |-- elapsedTime: long (nullable = true)
| | | |-- finalStatus: string (nullable = true)
| | | |-- finishedTime: long (nullable = true)
| | | |-- id: string (nullable = true)
| | | |-- logAggregationStatus: string (nullable = true)
| | | |-- memorySeconds: long (nullable = true)
| | | |-- name: string (nullable = true)
| | | |-- numAMContainerPreempted: long (nullable = true)
| | | |-- numNonAMContainerPreempted: long (nullable = true)
| | | |-- preemptedResourceMB: long (nullable = true)
| | | |-- preemptedResourceVCores: long (nullable = true)
| | | |-- priority: long (nullable = true)
| | | |-- progress: double (nullable = true)
| | | |-- queue: string (nullable = true)
| | | |-- queueUsagePercentage: double (nullable = true)
| | | |-- runningContainers: long (nullable = true)
| | | |-- startedTime: long (nullable = true)
| | | |-- state: string (nullable = true)
| | | |-- trackingUI: string (nullable = true)
| | | |-- trackingUrl: string (nullable = true)
| | | |-- unmanagedApplication: boolean (nullable = true)
| | | |-- user: string (nullable = true)
| | | |-- vcoreSeconds: long (nullable = true)
scala> apps.createOrReplaceTempView("application_stats")
scala> val sqlDf = spark.sql("select t.apps.app[0].user,t.apps.app[0].name,t.apps.app[0].id,t.apps.app[0].allocatedMB,t.apps.app[0].allocatedVCores,t.apps.app[0].finalStatus from application_stats t")
sqlDf: org.apache.spark.sql.DataFrame = [apps.app AS `app`[0].user: string, apps.app AS `app`[0].name: string ... 4 more fields]
scala> sqlDf.show
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
|apps.app AS `app`[0].user|apps.app AS `app`[0].name|apps.app AS `app`[0].id|apps.app AS `app`[0].allocatedMB|apps.app AS `app`[0].allocatedVCores|apps.app AS `app`[0].finalStatus|
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
| hive| HIVE-0b45b111-226...| application_14805...| -1| -1| SUCCEEDED|
+-------------------------+-------------------------+-----------------------+--------------------------------+------------------------------------+--------------------------------+
val userNameAndId = spark.sql("select t.apps.app.user,t.apps.app.id from application_stats t")
scala> userNameAndId.collect.foreach(println)
[WrappedArray(hive, hive, spark, spark, spark, spark, spark, spark, hive, hive, hive, hive, hive, hive, spark, hive, hive, hive, hive, hive, hive, hive, hive, spark, spark, spark, hive, hive, hive, hive, zeppelin, hive, ambari-qa, ambari-qa, ambari-qa, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive, hive),WrappedArray(application_1480592262242_0021, application_1480592262242_0022, application_1480592262242_0023, application_1480592262242_0024, application_1480592262242_0025, application_1480592262242_0026, application_1480592262242_0027, application_1480592262242_0028, application_1480592262242_0013, application_1480592262242_0014, application_1480592262242_0015, application_1480592262242_0016, application_1480592262242_0017, application_1480592262242_0018, application_1480592262242_0020, application_1480592262242_0037, application_1480592262242_0038, application_1480592262242_0039, application_1480592262242_0040, application_1480592262242_0041, application_1480592262242_0042, application_1480592262242_0043, application_1480592262242_0044, application_1480592262242_0029, application_1480592262242_0030, application_1480592262242_0031, application_1480592262242_0032, application_1480592262242_0033, application_1480592262242_0034, application_1480592262242_0035, application_1480592262242_0036, application_1480587591844_0001, application_1480180301135_0001, application_1480180301135_0002, application_1480180301135_0003, application_1480182107893_0002, application_1480182107893_0003, application_1480182107893_0004, application_1480182107893_0005, application_1480182107893_0001, application_1480592262242_0005, application_1480592262242_0006, application_1480592262242_0007, application_1480592262242_0008, application_1480592262242_0009, application_1480592262242_0010, application_1480592262242_0011, application_1480592262242_0012, application_1480592262242_0001, application_1480592262242_0002, application_1480592262242_0003, application_1480592262242_0004)]
scala> val userNameAndIdDF = userNameAndId.toDF
userNameAndIdDF: org.apache.spark.sql.DataFrame = [user: array<string>, id: array<string>]
scala> userNameAndIdDF.printSchema
root
|-- user: array (nullable = true)
| |-- element: string (containsNull = true)
|-- id: array (nullable = true)
| |-- element: string (containsNull = true)
scala> userNameAndIdDF.show
+--------------------+--------------------+
| user| id|
+--------------------+--------------------+
|[hive, hive, spar...|[application_1480...|
+--------------------+--------------------+
scala> val dfId = userNameAndIdDF.select(explode( userNameAndIdDF("id"))).toDF("id")
dfId: org.apache.spark.sql.DataFrame = [id: string]
scala> dfId.show
+--------------------+
| id|
+--------------------+
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
|application_14805...|
+--------------------+
only showing top 20 rows
Using DataFrames we can build different structures over the same data and run different queries to explore different aspects of it.
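As an illustration of that idea, here is a short sketch (assuming the same spark-shell session and the apps DataFrame loaded above; the column names come from the schema printed earlier) that flattens the whole app array with explode and aggregates per user and final status:

scala> val appDetails = apps.select(explode($"apps.app").as("app"))
scala> val flat = appDetails.select($"app.user", $"app.queue", $"app.finalStatus", $"app.memorySeconds", $"app.vcoreSeconds")
scala> flat.createOrReplaceTempView("app_details")
scala> // one row per application, so aggregations no longer operate on arrays
scala> spark.sql("select user, finalStatus, count(*) as apps, sum(memorySeconds) as mem_seconds from app_details group by user, finalStatus order by apps desc").show(false)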
11-13-2018
06:33 PM
I think you need to delete those files as well, then it works...

[root@centos10 krb5kdc]# ll
total 28
-rw------- 1 root root   29 Nov 13 09:36 kadm5.acl
-rw------- 1 root root   29 Nov 13 09:24 kadm5.acl.rpmsave
-rw------- 1 root root   29 Nov 13 09:36 kadm5.acly
-rw------- 1 root root  448 Nov 13 09:35 kdc.conf
-rw------- 1 root root  448 Nov 13 09:24 kdc.conf.rpmsave
-rw------- 1 root root 8192 Nov 13 09:27 principal      <<<<<<<<<<<<<<<<<
-rw------- 1 root root    0 Nov 13 09:37 principal.ok   <<<<<<<<<<<<<<<<<

Then it works:

[root@centos10 ~]# /usr/sbin/kdb5_util create -r BEER.LOC -s
Loading random data
Initializing database '/var/kerberos/krb5kdc/principal' for realm 'BEER.LOC', master key name 'K/M@BEER.LOC'
You will be prompted for the database Master Password.
It is important that you NOT FORGET this password.
Enter KDC database master key:
Re-enter KDC database master key to verify:
[root@centos10 ~]#
06-18-2018
11:39 PM
@Kit Menke isn't wrong. Take a look at the API docs; you'll notice there are several options for creating data frames from an RDD. In your case, it looks as though you have an RDD of type Row, so you'll need to also provide a schema to the createDataFrame() method. Scala API docs: https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.SQLContext

import org.apache.spark.sql._
import org.apache.spark.sql.types._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val schema =
StructType(
StructField("name", StringType, false) ::
StructField("age", IntegerType, true) :: Nil)
val people =
sc.textFile("examples/src/main/resources/people.txt").map(
_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val dataFrame = sqlContext.createDataFrame(people, schema)
dataFrame.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)
dataFrame.createOrReplaceTempView("people")
sqlContext.sql("select name from people").collect.foreach(println)
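If the column types are known up front, another option (a sketch, not part of the original answer) is to map to a case class and let Spark infer the schema:

case class Person(name: String, age: Int)
import sqlContext.implicits._
val peopleDF = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF() // schema is inferred from the case class fields
peopleDF.printSchema
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)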
11-29-2016
07:46 PM
Thank you very much, it's working fine now.
11-15-2016
02:09 PM
@Iyappan Gopalakrishnan Adding new users to a secured NiFi 0.x / HDF 1.x version of NiFi can be done directly via the NiFi UI. I am assuming you have already secured your NiFi and manually added the initial admin to the authorized-users.xml file. Adding new users is pretty easy. Simply have the new user attempt to access the https web address for your NiFi 0.7 install. After authenticating successfully (done via client certs, LDAP, etc.), they will see a screen telling them they are not authorized for access and giving them the opportunity to request access. Once they request access, the "Admin" user who does have access to the UI will need to go into the Users UI by clicking on the users icon in the upper right corner of the NiFi UI. From within that UI you will see all existing users and any pending authorization requests. Simply click on the pencil icon to the right of any pending user to authorize them for one or more of the available user roles (Administrator, Dataflow Manager, Read Only, or Provenance). The administration guide included with your install can give you more details on each of the roles and what privileges each grants to users. Thanks, Matt
10-18-2016
11:08 PM
1 Kudo
HiveServer2 does not expose JMX or equivalent metrics, but we often need to monitor the HiveServer2 process to see CPU usage, heap usage and the state of the various threads, along with other parameters such as the number of classes loaded by the HS2 JVM. It is also very useful if you want to run CPU/memory sampling to profile HiveServer2.

Step 1: Log in to Ambari and open the Hive advanced configuration.

Step 2: Expand the Advanced hive-env settings and add the following configuration to the hive-env template:

if [ "$SERVICE" = "hiveserver2" ]; then
export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=8008"
fi
Step 3: Restart HiveServer2.

Step 4: On the client machine, open jvisualvm ($JAVA_HOME/bin). Once it is open, go to File -> Add JMX Connection, enter the hostname and port number (8008 in this case), and you are good to go.
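If you prefer to pull a few of these metrics programmatically rather than through jvisualvm, a minimal Scala sketch along these lines should work against the same JMX endpoint (the hostname below is an assumption; only standard JVM MBeans are queried):

import javax.management.{JMX, ObjectName}
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object Hs2JmxProbe extends App {
  // hostname is an assumption; 8008 matches the port configured in hive-env above
  val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://hs2-host.example.com:8008/jmxrmi")
  val connector = JMXConnectorFactory.connect(url)
  val mbsc = connector.getMBeanServerConnection

  // standard JVM MBeans exposed by any JVM with remote JMX enabled
  val memory = JMX.newMXBeanProxy(mbsc, new ObjectName("java.lang:type=Memory"),
    classOf[java.lang.management.MemoryMXBean])
  val threads = JMX.newMXBeanProxy(mbsc, new ObjectName("java.lang:type=Threading"),
    classOf[java.lang.management.ThreadMXBean])
  val classes = JMX.newMXBeanProxy(mbsc, new ObjectName("java.lang:type=ClassLoading"),
    classOf[java.lang.management.ClassLoadingMXBean])

  println(s"heap used (MB) : ${memory.getHeapMemoryUsage.getUsed / 1024 / 1024}")
  println(s"live threads   : ${threads.getThreadCount}")
  println(s"loaded classes : ${classes.getLoadedClassCount}")

  connector.close()
}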
10-09-2016
10:16 PM
1 Kudo
You are welcome. Glad it worked.
10-11-2016
01:31 AM
6 Kudos
@Rajkumar Singh All broker configurations can be found in the Kafka installation's conf folder. Broker configurations are stored in files with names like server.properties; there will be one server.properties per broker, usually named server1.properties, server2.properties, etc.

+++++++++

If any of the responses was helpful, please don't forget to vote and accept the best answer to your question.