Member since 03-07-2019
158 Posts
53 Kudos Received
33 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 8076 | 03-08-2019 08:46 AM |
04-08-2019
12:53 PM
@Rabah Houaoui This sounds like it may be an issue with the api_version being used. Are you trying this out on HDP 3.1, or on a different HDP or HDF version? Could you share the full error stack?
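As a hedged illustration (the broker address below is a placeholder, not a value from your cluster), this is roughly how I would pin the API version as a tuple in kafka-python to rule out the version auto-detection as the culprit:
from kafka import KafkaProducer

# Placeholder bootstrap server; passing api_version as a tuple skips
# kafka-python's broker version probing, which is a frequent source of errors.
producer = KafkaProducer(bootstrap_servers='broker.example.com:6667',
                         api_version=(0, 10),
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='GSSAPI',
                         sasl_kerberos_service_name='kafka')
If the error persists with an explicit tuple, the full stack trace would help narrow it down further.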
03-25-2019
12:54 PM
4 Kudos
Short Description: This article covers how to run a simple producer and consumer in kafka-python (1.4.5) on Kafka 2.0.0, within a kerberized HDP 3.1.0 cluster.
Article: Running a producer in a kerberized HDP 3.1 Kafka 2.0.0 environment using kafka-python, covering:
Pre-requisites
Validating the Kerberos setup of Kafka on HDP 3.1
Running a producer and consumer in kafka-python 1.4.5 on a kerberized Kafka cluster
Pre-Requisites
You are running a kerberized HDP 3.1.0 cluster with Kafka 2.0.0 installed.
You are running one of the following Python versions: 2.7, 3.4, 3.5, 3.6, 3.7.
Note that in the examples in this article I am using SASL_PLAINTEXT. If you are using SASL_SSL instead, you will need to modify some parameters, and additional SSL producer/consumer parameters will need to be specified in your kafka-python code (see the sketch at the end of this article). Please also take note of the current kafka-python compatibility notes: https://kafka-python.readthedocs.io/en/master/compatibility.html
Steps
We will first need to install the gssapi libraries for Python in order to be able to use kafka-python with Kerberos, and install kafka-python itself if you have not already done so. I will be using the latest kafka-python version currently available, which is 1.4.5 at the time of writing this article.
pip install gssapi
pip install kafka-python
Make sure that Kerberos is correctly enabled for Kafka on your cluster. Verify that the Kafka broker properties are set to listen on SASL_PLAINTEXT through Ambari -> Kafka -> Configs. Also verify that GSSAPI is listed in the sasl.enabled.mechanisms property.
Next, we obtain a Kerberos ticket before running our kafka-python producer.
[kafka@c2175-node4 ~]$ kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/c2175-node4.hwx.com@HWX.COM
[kafka@c2175-node4 ~]$ klist
Ticket cache: FILE:/tmp/krb5cc_1006
Default principal: kafka/c2175-node4.hwx.com@HWX.COM
Valid starting Expires Service principal
03/25/2019 11:59:26 03/26/2019 11:59:26 krbtgt/HWX.COM@HWX.COM
We will be running the following code from python:
from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.DEBUG)
producer = KafkaProducer(bootstrap_servers='c2175-node4.hwx.com:6667', api_version='0.10', security_protocol='SASL_PLAINTEXT', sasl_mechanism='GSSAPI', sasl_kerberos_service_name='kafka', connections_max_idle_ms=1200000, request_timeout_ms=1000000, api_version_auto_timeout_ms=1000000)
for item in range(10):
    producer.send('new_topic', b'Happy '+ str(item)+' Days!')
(Note: this byte/string concatenation works on Python 2, which is what the session below uses; on Python 3 you would build the message as bytes instead, for example ('Happy %d Days!' % item).encode('utf-8').)
Let me explain some of the parameters in the above Python block. Also note that in the code example given I've enabled debug logging. This is of course not required in order to run the producer, but it will help you in case you run into any errors.
bootstrap_servers: List of Kafka brokers (FQDN) and port (e.g. 6667).
api_version: Kafka 2.0+ is backwards compatible. In kafka-python 1.4.5 we can't set the api_version to (2, 0, 0), as this is not yet available in kafka-python, but we can use other API versions that kafka-python 1.4.5 supports and that are compatible with Kafka 2.0.0. I used API version 0.10 for the examples given in this article.
security_protocol: SASL_PLAINTEXT. This would be SASL_SSL in case you enable wire encryption on Kafka at some stage.
sasl_mechanism: GSSAPI. For this to work, you need to install the GSSAPI libraries for Python manually first. Using a plaintext mechanism such as PLAIN for sasl_mechanism (not to be confused with the SASL_PLAINTEXT security protocol!) WILL NOT WORK.
sasl_kerberos_service_name: This defaults to 'kafka'. If your Kafka service is running under a different user, you would have to change this parameter accordingly.
Here is what running kafka-python on a kerberized HDP 3.1 cluster looks like:
[kafka@c2175-node4 ~]$ klist
Ticket cache: FILE:/tmp/krb5cc_1006
Default principal: kafka/c2175-node4.hwx.com@HWX.COM
Valid starting Expires Service principal
03/25/2019 12:24:18 03/26/2019 12:24:18 krbtgt/HWX.COM@HWX.COM
[kafka@c2175-node4 ~]$ python
Python 2.7.5 (default, Apr 11 2018, 07:36:10)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka import KafkaProducer
>>> import logging
>>> logging.basicConfig(level=logging.DEBUG)
>>> producer = KafkaProducer(bootstrap_servers='c2175-node4.hwx.com:6667', api_version='0.10', security_protocol='SASL_PLAINTEXT', sasl_mechanism='GSSAPI', sasl_kerberos_service_name='kafka', connections_max_idle_ms=1200000, request_timeout_ms=1000000)
DEBUG:kafka.producer.kafka:Starting the Kafka producer
WARNING:kafka.producer.kafka:use api_version=(0, 10) [tuple] -- "0.10" as str is deprecated
DEBUG:kafka.metrics.metrics:Added sensor with name connections-closed
DEBUG:kafka.metrics.metrics:Added sensor with name connections-created
DEBUG:kafka.metrics.metrics:Added sensor with name select-time
DEBUG:kafka.metrics.metrics:Added sensor with name io-time
DEBUG:kafka.metrics.metrics:Added sensor with name bufferpool-wait-time
DEBUG:kafka.metrics.metrics:Added sensor with name batch-size
DEBUG:kafka.metrics.metrics:Added sensor with name compression-rate
DEBUG:kafka.metrics.metrics:Added sensor with name queue-time
DEBUG:kafka.metrics.metrics:Added sensor with name produce-throttle-time
DEBUG:kafka.metrics.metrics:Added sensor with name records-per-request
DEBUG:kafka.metrics.metrics:Added sensor with name bytes
DEBUG:kafka.metrics.metrics:Added sensor with name record-retries
DEBUG:kafka.metrics.metrics:Added sensor with name errors
DEBUG:kafka.metrics.metrics:Added sensor with name record-size-max
DEBUG:kafka.producer.kafka:Kafka producer started
DEBUG:kafka.producer.sender:Starting Kafka producer I/O thread.
>>> DEBUG:kafka.client:Initializing connection to node bootstrap for metadata request
DEBUG:kafka.client:Initiating connection to node bootstrap at 172.25.38.138:6667
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent-received
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name request-latency
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap.bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap.bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap.latency
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <disconnected> [IPv4 None]>: creating new socket
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <disconnected> [IPv4 ('172.25.38.138', 6667)]>: setting socket option (6, 1, 1)
INFO:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <connecting> [IPv4 ('172.25.38.138', 6667)]>: connecting to 172.25.38.138:6667 [('172.25.38.138', 6667) IPv4]
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <connecting> [IPv4 ('172.25.38.138', 6667)]>: established TCP connection
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <connecting> [IPv4 ('172.25.38.138', 6667)]>: initiating SASL authentication
DEBUG:kafka.protocol.parser:Sending request SaslHandShakeRequest_v0(mechanism='GSSAPI')
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <authenticating> [IPv4 ('172.25.38.138', 6667)]> Request 1: SaslHandShakeRequest_v0(mechanism='GSSAPI')
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.protocol.parser:Received correlation id: 1
DEBUG:kafka.protocol.parser:Processing response SaslHandShakeResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <authenticating> [IPv4 ('172.25.38.138', 6667)]> Response 1 (1.20496749878 ms): SaslHandShakeResponse_v0(error_code=0, enabled_mechanisms=[u'GSSAPI'])
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <authenticating> [IPv4 ('172.25.38.138', 6667)]>: GSSAPI name: kafka/c2175-node4.hwx.com@HWX.COM
INFO:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <authenticating> [IPv4 ('172.25.38.138', 6667)]>: Authenticated as kafka/c2175-node4.hwx.com@HWX.COM via GSSAPI
INFO:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <authenticating> [IPv4 ('172.25.38.138', 6667)]>: Connection complete.
DEBUG:kafka.client:Node bootstrap connected
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Sending metadata request MetadataRequest_v1(topics=NULL) to node bootstrap
DEBUG:kafka.protocol.parser:Sending request MetadataRequest_v1(topics=NULL)
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <connected> [IPv4 ('172.25.38.138', 6667)]> Request 2: MetadataRequest_v1(topics=NULL)
DEBUG:kafka.protocol.parser:Received correlation id: 2
DEBUG:kafka.protocol.parser:Processing response MetadataResponse_v1
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=172.25.38.138:6667 <connected> [IPv4 ('172.25.38.138', 6667)]> Response 2 (4.12797927856 ms): MetadataResponse_v1(brokers=[(node_id=1001, host=u'c2175-node4.hwx.com', port=6667, rack=None)], controller_id=1001, topics=[(error_code=0, topic=u'test3', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic=u'__consumer_offsets', is_internal=True, partitions=[(error_code=0, partition=23, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=41, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=32, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=8, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=17, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=44, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=35, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=26, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=11, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=29, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=38, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=47, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=20, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=5, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=14, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=46, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=49, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=40, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=13, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=22, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=31, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=16, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=7, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=43, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=25, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=34, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=10, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=37, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=19, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=28, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=45, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=36, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=27, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=9, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=18, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=21, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=48, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=12, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=30, leader=1001, 
replicas=[1001], isr=[1001]), (error_code=0, partition=39, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=15, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=42, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=24, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=33, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=6, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic=u'ambari_kafka_service_check', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic=u'foobar', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic=u'new_topic', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])])])
DEBUG:kafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 5, groups: 0)
>>> for item in range(10):
... producer.send('new_topic', b'Happy '+ str(item)+' Days!')
...
DEBUG:kafka.producer.kafka:Sending (key=None value='Happy 0 Days!' headers=[]) to TopicPartition(topic='new_topic', partition=0)
DEBUG:kafka.producer.record_accumulator:Allocating a new 16384 byte message buffer for TopicPartition(topic='new_topic', partition=0)
DEBUG:kafka.producer.kafka:Waking up the sender since TopicPartition(topic='new_topic', partition=0) is either full or getting a new batch
<kafka.producer.future.FutureRecordMetadata object at 0x7f5fc2c99410>
DEBUG:kafka.producer.kafka:Sending (key=None value='Happy 1 Days!' headers=[]) to TopicPartition(topic='new_topic', partition=0)
Similarly, here is what a simple consumer in kafka-python 1.4.5 on a kerberized HDP 3.1 cluster looks like:
from kafka import KafkaConsumer
import logging
logging.basicConfig(level=logging.DEBUG)
consumer = KafkaConsumer('new_topic', bootstrap_servers='c2175-node4.hwx.com:6667', api_version='0.10', security_protocol='SASL_PLAINTEXT', sasl_mechanism='GSSAPI', sasl_kerberos_service_name='kafka', connections_max_idle_ms=1200000, request_timeout_ms=1000000)
for msg in consumer:
    print(msg)
===============================================
If you are running CentOS 7.x and encounter the following error during the install of gssapi:
Exception: Could not find main GSSAPI shared library. Please try setting GSSAPI_MAIN_LIB yourself or setting ENABLE_SUPPORT_DETECTION to 'false'
This error is thrown because /bin/krb5-config is missing. To satisfy this requirement:
yum install -y krb5-devel
===============================================
Special thanks to Akshay Mankumbare
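As a closing note on the SASL_SSL case mentioned in the pre-requisites: the snippet below is only a rough sketch of the extra parameters involved. The broker host, listener port and certificate path are placeholders and are not taken from the cluster used in this article.
from kafka import KafkaProducer

# Rough SASL_SSL sketch -- broker host, listener port and CA path are placeholders.
producer = KafkaProducer(bootstrap_servers='broker.example.com:6668',
                         api_version=(0, 10),
                         security_protocol='SASL_SSL',
                         sasl_mechanism='GSSAPI',
                         sasl_kerberos_service_name='kafka',
                         ssl_cafile='/path/to/ca-cert.pem',
                         ssl_check_hostname=True)
producer.send('new_topic', b'Hello over TLS')
producer.flush()  # block until the broker has acknowledged the buffered message
Calling flush() is also a good habit in short scripts regardless of the security protocol, so that buffered messages are actually delivered before the interpreter exits.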
03-22-2019
03:44 PM
Excellent article @Pedro Andrade !
03-08-2019
08:46 AM
1 Kudo
Hi @Michael Bronson You are specifying /folder/*.jar. If you want the .jar files from one level deeper, you would specify /folder/*/*.jar. Or, here is an alternative example:
[hdfs@c2175-node4 stuff]$ hdfs dfs -find /tmp -name *.jar
/tmp/somefolder/y.jar
/tmp/x.jar
[hdfs@c2175-node4 stuff]$ for result in `hdfs dfs -find /tmp -name *.jar` ; do hdfs dfs -copyToLocal $result; done
[hdfs@c2175-node4 stuff]$ ls -al
-rw-r--r-- 1 hdfs hadoop 0 Mar 8 08:43 x.jar
-rw-r--r-- 1 hdfs hadoop 0 Mar 8 08:43 y.jar
09-12-2018
10:00 AM
@Sandeep Nemuri Thank you sir!
09-12-2018
09:56 AM
Thank you @Jay Kumar SenSharma ! 🙂
09-12-2018
09:52 AM
6 Kudos
Short Description:
This article covers the steps for adding a Spark 2 dashboard to Grafana in Ambari-Metrics, in order to monitor Spark applications for detailed resource usage statistics.
Article: Monitoring Spark 2 Performance via Grafana in Ambari-Metrics, covering:
Configuration of Advanced spark2-metrics-properties
Adding a new data source to Grafana
Building a Spark 2 dashboard in Grafana
Pre-Requisites
You are running HDP 2.6.4 or higher, with Spark2 and Ambari-Metrics installed. A standalone Grafana installation may be used instead of Ambari-Metrics.
A time-series database with support for the graphite protocol. In this article, I will be using Graphite. Alternatively, Prometheus with the graphite-exporter plugin can be used, depending on personal preference.
Steps
In order for Spark components to forward metrics to our time-series database, we need to add a few items to our configuration in Ambari -> Spark2 -> Configs -> Advanced spark2-metrics-properties. A restart of the Spark2 service is required for the new metrics properties to take effect. We don't need to include any references to these properties when submitting Spark jobs, as the metrics properties are included by default for any new jobs. I am using the configuration below, with a sink interval of 30 seconds. You can adjust *.sink.graphite.period to your needs, with a minimum value of 1; for long-running applications I found 30 seconds to be sufficient.
Here are the metrics.properties that I am using for this setup:
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.protocol=tcp
*.sink.graphite.host=172.0.0.1
*.sink.graphite.port=2003
*.source.jvm.class=org.apache.spark.metrics.source.JvmSource
*.sink.graphite.period=30
*.sink.graphite.unit=seconds
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
In the above sample, please adjust the graphite.host and graphite.port for your Graphite host
and Carbon-Cache plaintext port (default port 2003).
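If you would like to sanity-check that the Carbon-Cache plaintext port is reachable before restarting Spark2, the short sketch below pushes a single test metric over the plaintext protocol; the host, port and metric name are only illustrative placeholders.
import socket
import time

GRAPHITE_HOST = '172.0.0.1'   # placeholder -- use your own Graphite/Carbon host
GRAPHITE_PORT = 2003          # Carbon-Cache plaintext port

# Carbon's plaintext protocol is simply "<metric.path> <value> <unix timestamp>\n"
line = 'test.spark_metrics.heartbeat 1 %d\n' % int(time.time())
sock = socket.create_connection((GRAPHITE_HOST, GRAPHITE_PORT), timeout=10)
sock.sendall(line.encode('utf-8'))
sock.close()
If a test.spark_metrics.heartbeat entry shows up in the Graphite tree shortly afterwards, the host and port in the metrics properties are good.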
Before we add our new source to Grafana, let's make sure we have some data available in Graphite. Kick off your usual Spark jobs, or alternatively generate some data using the example jars. I will be using spark-terasort jobs for the next step, as I want my Spark jobs to generate some HDFS reads and writes.
If you have a terasort jar built, you may follow the example that I'm using below. Please note that this sample will generate 20GB of data in /home/hdfs; you can adjust the size or the directory. I have a small cluster with several worker nodes, so I'm using the cluster deploy-mode.
/usr/bin/spark-submit --class com.github.ehiggs.spark.terasort.TeraGen --master yarn --deploy-mode cluster \
--num-executors 3 --executor-memory 2G /mvn/spark-terasort/target/spark-terasort-1.1-SNAPSHOT.jar 20g /home/hdfs
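If you do not have a terasort jar built, any job that writes a decent amount of data to HDFS will produce comparable metrics. Purely as an illustrative sketch (the output path and row count are arbitrary placeholders), a minimal PySpark job could look like this, submitted with the same spark-submit options as above:
from pyspark.sql import SparkSession

# Arbitrary placeholder job: write ~100M rows to HDFS so that the executor
# filesystem and JVM metrics have something to report.
spark = SparkSession.builder.appName("graphite-metrics-demo").getOrCreate()
spark.range(0, 100 * 1000 * 1000).write.mode("overwrite").parquet("/tmp/graphite_metrics_demo")
spark.stop()
For the rest of the article I will stick with the terasort job shown above.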
Within a few
moments of submitting this job, we can see my application showing up in the tree
section of the Graphite web UI. When I select the write_bytes of my executor, you can see that it has already finished writing the 20GB of data:
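Outside of the web UI, the same data can also be confirmed via Graphite's render API. Here is a small sketch; the Graphite host, application id and metric path below are made-up placeholders, so substitute the path you see in your own Graphite tree.
import json
try:
    from urllib.request import urlopen   # Python 3
except ImportError:
    from urllib2 import urlopen          # Python 2

# Placeholder graphite-web host, application id and metric path -- adjust to your environment.
url = ('http://172.0.0.1/render'
       '?target=application_1553500000000_0001.*.executor.filesystem.hdfs.write_bytes'
       '&from=-15min&format=json')
payload = urlopen(url).read().decode('utf-8')
for series in json.loads(payload):
    print(series['target'], series['datapoints'][-3:])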
Let's get this data showing on our Ambari-Metrics Grafana instance, so we have everything in one place. We will have to add this Graphite instance as a new data source to our Grafana instance. Let's open Grafana via Ambari -> Ambari Metrics, selecting Grafana from the Quick Links dropdown.
Once in the Grafana UI, hit "sign in" on the left-hand side and log in as the Grafana Admin user. If you have lost the Grafana Admin credentials, they can be changed in the General Config section of Ambari-Metrics through Ambari.
Now that we are logged in to Grafana, we will add our new Graphite data source before building a dashboard for our new Spark metrics. Go to Data Sources on the left-hand side and select "Add new" at the top of the screen. Here I am adding my "Graphite-Spark" source with Type Graphite. I did not add any authentication to Graphite, so my Http Auth section is left empty. Access is set to direct. After you click Save, the option "Test Connection" will be available.
Now we are ready to add our Spark dashboard. On the left menu in Grafana, select "Dashboards"; in the "Home" dropdown at the top we will see the "+New" option, as shown here:
Go ahead and hit the "+New" button to start a new dashboard. The first thing we will do on our new dashboard is add a templating variable. With templating, we can select the application through a dropdown menu and filter the graphs down to a single application at a time; otherwise it becomes difficult to keep an overview when multiple Spark applications are running at the same time. Below is the templating configuration that I'm using, with the simple Query "application_*". In the preview at the bottom we can verify that this extracts our application IDs, which will populate the dropdown.
I'm also changing the name of my dashboard from "New dashboard" to "Spark-Graphite-Dashboard" under Settings.
You'll also notice we now have a drop-down menu, resulting from our templating setup, where we can select the application we are investigating. We can now move on to adding the first graph. Let's make it something useful: say I want to monitor the JVM statistics of the application driver. I can make a graph using $application (from our templating) - driver - jvm - heap - * with aliasByMetric(). I have also set the left Y axis to "bytes" under Axes & Grid, to make the sizes easier to read. You should also make sure that you have selected the correct data source (Graphite-Spark in my case). The resulting graph and Query setup look like this:
Since our Spark application is also generating files in HDFS, I want to monitor the amount of data written by my Spark application. I can do that with the example below; note that we have to ensure we select the right data source for the graph, at the bottom right:
This graph shows me how much data has been written by whichever application is selected in the "application" drop-down menu of the dashboard. You may also use write_ops for an IOPS figure, or substitute write with read for the read statistics. We are now set up with our new data source and example graphs to get started!