Member since: 03-16-2016
Posts: 707
Kudos Received: 1753
Solutions: 203
10-06-2017
06:21 PM
@uday kv See this new article: https://community.hortonworks.com/articles/141035/jmeter-kerberos-setup-for-hive-load-testing.html
08-12-2016
11:03 PM
3 Kudos
Introduction

This article is not meant to show how to install NiFi or how to create a "Hello World" NiFi data flow. Instead, it shows how to solve a data-filtering problem with NiFi using two approaches: a filter list kept as a file on disk, which could be static or dynamic, and the same list stored in a distributed cache populated from that file. The amount of data used here is minimal and simplistic, so no performance difference can be perceived; at scale, however, where memory is available, the caching implementation should perform better.

This article assumes some familiarity with NiFi: knowing what a processor or a queue is, how to set the basic configuration of a processor or a queue, how to visualize the data at various steps throughout the flow, and how to start and stop processors. Since you are somewhat familiar with NiFi, you probably know how to install and start it, but I will provide a quick refresher below.
Pre-requisites

For this demo I used the latest version of NiFi available at the time, 0.6.1. This version was not part of HDP 2.4.2, which was available at the time of this demo and shipped NiFi 0.5.1. HDP 2.5 was just launched last month at the Hadoop Summit in Santa Clara.

On OS X, brew install nifi will only install NiFi 0.5.1, which does not have some of the features needed for the demo, e.g. PutDistributedMapCache or FetchDistributedMapCache. Instead, use the following steps. A reference about downloading and installing on Linux and Mac is here. You can download NiFi 0.6.1 from any of the mirrors listed there, for example: https://www.apache.org/dyn/closer.lua?path=/nifi/0.6.1/nifi-0.6.1-bin.tar.gz

I prefer wget and installing my apps to /opt:

cd /opt
wget http://supergsego.com/apache/nifi/0.6.1/nifi-0.6.1-bin.tar.gz

That will download a 421.19 MB file.

tar -xvf nifi-0.6.1-bin.tar.gz
ls -l

and here is your /opt/nifi-0.6.1; that is your NIFI_HOME.

cd /opt/nifi-0.6.1/bin
./nifi.sh start

Open a browser and type: http://localhost:8080/nifi (a quick check that the UI is up is sketched below).
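If you prefer to verify from the shell before opening the browser, this one-liner prints the HTTP status code of the NiFi UI; it assumes the default unsecured port 8080 used above (my addition, not part of the original steps):

# prints 200 once the NiFi UI is reachable
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8080/nifi/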
You will need to import the NiFi .xml template posted in my GitHub repo, mentioned earlier. Clone it to your local folder of preference, assuming that you have a git client installed:

git clone https://github.com/cstanca1/nifi-filter.git

After importing the template, instantiate it. (The original article shows a screenshot of the instantiated flow at this point.)
Required Changes

In order for the template to work with your specific folder structure, you will need to make a few changes. First, tell the GetFile processor where to pick up the data: right-click the GetFile processor header, choose View Configuration, open the Properties tab and set Input Directory (an example is sketched below). Keep in mind that once started, the GetFile processor will read the file and then delete it. If you want to re-feed it for a test, just drop the file into the same folder again and it will be re-ingested. You can also place multiple files with the same structure in that folder and they will all be ingested, line by line. In real life, GetFile can be replaced with a different processor capable of reading from an actual log; for this demo, I used a static file as input.
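For reference, these are the GetFile properties involved; the directory below is a hypothetical example path, so point it at whatever folder you use:

Input Directory: /opt/nifi-demo/input    (example only; use your own folder)
Keep Source File: false                  (default, which is why the file is deleted after ingest)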
Also, enable and start the DistributedMapCacheServer Controller Service. This is required for the put and fetch distributed cache processors. The DistributedMapCacheServer can be started just like any other Controller Service: configure it so that it is valid and hit the "start" button. The particular thing about the DistributedMapCacheServer is that processors work with the cache through a DistributedMapCacheClientService. So you will create both a Server and a Client Service, configure the processors to use the Client Service, then start both the server and the client service, and finally start the processors. Typical values are sketched below.
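The property names below are those of NiFi's standard cache services; the values are the defaults I would expect to work for a single-node demo (assumptions, adjust if your setup differs):

DistributedMapCacheServer (Controller Service)
    Port: 4557
DistributedMapCacheClientService (Controller Service)
    Server Hostname: localhost
    Server Port: 4557
    Communications Timeout: 30 secs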
Test Data

For your test, you can use the two files checked in to the git repo that you just cloned locally: macaddresses-blacklist.txt and macaddresses-log.txt. macaddresses-blacklist.txt is a list of blacklisted MAC addresses which will be used to filter the incoming stream fed from macaddresses-log.txt, ingested line by line via GetFile. To understand what happens step by step, I suggest starting each processor in turn and inspecting the queues and the data lineage.

Populating the DistributedMapCache is performed in the flow presented on the right side of the model that you imported in the previous step. The filtering flow itself runs via either ScanAttribute or FetchDistributedMapCache.

The use of the GetFile, SplitText and ExtractText processors is well documented and a basic Google search will return several good examples; good examples of how to use FetchDistributedMapCache and PutDistributedMapCache are, however, much harder to find. That was the main reason for writing this article: I could not find another good reference, I am sure others felt the same way, and hopefully this helps.
ScanAttribute Approach

Before starting this processor, right-click on its header, choose Configuration, go to the Properties tab and change the Dictionary File to point to your macaddresses-blacklist.txt. This is a clone of the same file you use in the GetFile processor, but I suggest putting it in a separate folder so that GetFile will not ingest it and delete it after use; it needs to stay in place, like a lookup file on disk.

SplitText is used to split the file line by line. You can check this by right-clicking the header of the processor and opening the Properties tab: Line Split Count is set to 1.

The ExtractText processor uses a custom regex to extract the MAC address from macaddresses-log.txt; you can find it in Properties, as the last property in the list. A sample pattern is sketched below.
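The exact expression lives in the template, so check it there; purely as an illustration (my own example, not copied from the repo), a colon-separated MAC address can be pulled out of a log line like this:

# the grep -oE pattern is a generic MAC-address regex; the log line is made up
echo "2016-08-12 10:01:02 device=ab:cd:ef:01:23:45 status=up" | \
  grep -oE '([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}'
# prints: ab:cd:ef:01:23:45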
The ScanAttribute processor sets the Dictionary File to the folder/file of your choice. In this demo, I used the macaddresses-blacklist.txt file included in the repo that you cloned in one of the previous steps.
DistributedCache Approach

The right branch of the flow on the left uses the DistributedCache populated by the flow on the right of the model. Inspect each processor by checking its properties. They are similar to the first half of the flow on the left, except for the use of the PutDistributedMapCache processor, which sets the Cache Entry Identifier to the mac.address value. I will refer only to the consumption of the mac.address property value set by PutDistributedMapCache: set the Cache Entry Identifier to the same mac.address. Please note that the DistributedMapCacheClientService must be enabled; you can do that by clicking the NiFi Flow Settings icon (the fourth icon in the top-right corner) and opening "Controller Services". Both property settings are sketched below.
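Expressed with NiFi Expression Language, the put and fetch sides reference the same attribute; the attribute name mac.address is the one described above, though the exact spelling in the template may differ slightly:

PutDistributedMapCache
    Cache Entry Identifier: ${mac.address}
    Distributed Cache Service: DistributedMapCacheClientService
FetchDistributedMapCache
    Cache Entry Identifier: ${mac.address}
    Distributed Cache Service: DistributedMapCacheClientService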
Learning Technique

Don't forget to start all processors and inspect the queues. My approach is to start the processors one at a time, in the order of the data flow and processing, and to check all the stats and lineage on the connection queues. This is what I love about NiFi: it is so easy to test and learn.

Credit

Thanks to Simon Ball for taking a few minutes of his time to review the model for the DistributedCache approach.

Conclusion

Dynamic filtering has broad applicability in any type of simple event processing. I am sure there are many other ways to skin the same cat with NiFi. Enjoy!
07-14-2016
02:21 AM
Multi-Polygon example:

SELECT st_astext(ST_MultiPolygon('multipolygon (((0 0, 0 1, 1 0, 0 0)), ((2 2, 2 3, 3 2, 2 2)))'))
FROM YourTable LIMIT 1;
04-18-2017
03:45 PM
@rbiswas What about adding a new host and assigning it to a specific rack at the same time? I'd like to avoid adding the host (data node) first and then having to set the rack and restart again. Is there a way to do this via the Ambari UI?
07-06-2016
09:32 PM
@Alex McLintock You are correct. This is a local reference to a remote repo. Here is a reference for an actual local repo with no Internet access, but it gets complicated with Vagrant: https://docs.hortonworks.com/HDPDocuments/Ambari-2.2.2.0/bk_Installing_HDP_AMB/content/_setting_up_a_local_repository_with_no_internet_access.html
06-20-2017
04:59 AM
Hi, are there any steps to send our own Kafka server metrics to Elasticsearch? We have Grafana, which has all the dashboards, but one of our project requirements is to keep a few Kafka metrics in a Kibana visualization, so we want to index the Kafka metrics logs into Elasticsearch. Can we consume Kafka metrics into Elasticsearch?
05-24-2016
03:53 PM
@Constantin Stanca What do you mean by "Additionally, you need to start the JVM with something like this in order to be able to truly access the JVM remotely"? JVMs start as they normally do when using this tool.
05-27-2016
09:34 PM
@Vladimir Zlatkin Updated the "NUMA optimization" section to include a link to OS CPU optimizations for RHEL: "Spark application performance can be improved not only by configuring various Spark parameters and JVM options, but also by using operating-system-side optimizations, e.g. CPU affinity, NUMA policy, hardware performance policy, etc., to take advantage of the most recent NUMA-capable hardware." The referenced section is vast. We could narrow some of the settings down to best choices. This could be a follow-up article if part 1 generates real interest.
05-17-2016
10:59 PM
16 Kudos
This article is a continuation of Monitoring Kafka with Burrow - Part 1. Before diving into the evaluation rules, the HTTP endpoint API and the notifiers, I would like to point out a few other tools that utilize Burrow:

- Burrower (http://github.com/splee/burrower), a tool for gathering consumer lag information from Burrow and sending it into InfluxDB.
- ansible-burrow (https://github.com/slb350/ansible-burrow), an Ansible role for installing Burrow.

Consumer Lag Evaluation Status

The status of a consumer group in Burrow is determined by several rules evaluated against the offsets for each partition the group consumes. Thus, there is no need to set a discrete threshold for the number of messages a consumer is allowed to fall behind before alerts go off. Because every partition the group consumes is evaluated, the health of the entire consumer group is assessed, not just the topics that happen to be monitored. This is very important for wildcard consumers, such as Kafka MirrorMaker.
Window

The lagcheck configuration determines the length of the sliding window by specifying the number of offsets to store for each partition that a consumer group consumes. This window moves forward with each offset the consumer commits (the oldest offset is removed when the new offset is added). For each consumer offset, the following are stored: the offset itself, the timestamp at which the consumer committed it, and the lag at the point Burrow received it. The lag is calculated as the difference between the broker's head offset and the consumer's offset. Because broker offsets are fetched on a fixed interval, the result could be a negative number; by convention, the stored lag value is then zero. A small sketch of this convention follows.
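For illustration only (this is not Burrow code, just the arithmetic described above written in shell):

# the consumer can appear "ahead" because broker offsets are refreshed on a fixed
# interval; a negative difference is stored as zero lag by convention
broker_head_offset=1000
consumer_offset=1012
lag=$(( broker_head_offset - consumer_offset ))
(( lag < 0 )) && lag=0
echo "stored lag: $lag"    # prints: stored lag: 0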
Rules

The following rules are used to evaluate a group's status for a given partition:

- If any lag within the window is zero, the status is considered to be OK.
- If the consumer offset does not change over the window, and the lag is either fixed or increasing, the consumer is in an ERROR state, and the partition is marked as STALLED.
- If the consumer offsets are increasing over the window, but the lag either stays the same or increases between every pair of offsets, the consumer is in a WARNING state. This means that the consumer is slow, and is falling behind.
- If the difference between the time now and the time of the most recent offset is greater than the difference between the most recent offset and the oldest offset in the window, the consumer is in an ERROR state and the partition is marked as STOPPED. However, if the consumer offset and the current broker offset for the partition are equal, the partition is not considered to be in error.
- If the lag is -1, this is a special value that means we do not have a broker offset yet for that partition. This only happens when Burrow is starting up, and the status is considered to be OK.
HTTP Endpoint API

The HTTP server in Burrow provides a convenient way to interact with both Burrow and the Kafka and Zookeeper clusters. Requests are simple HTTP calls and all responses are formatted as JSON. For bad requests, Burrow will return an appropriate HTTP status code in the 400 or 500 range; the response body will contain a JSON object with more detail on the particular error encountered. Examples of requests (request, URL path, description):

- Healthcheck: GET /burrow/admin. Healthcheck of Burrow, whether for monitoring or for load balancing within a VIP.
- List Clusters: GET /v2/kafka or GET /v2/zookeeper. List of the Kafka clusters that Burrow is configured with.
- Kafka Cluster Detail: GET /v2/kafka/(cluster). Detailed information about a single cluster, specified in the URL. This includes a list of the brokers and zookeepers that Burrow is aware of.
- List Consumers: GET /v2/kafka/(cluster)/consumer. List of the consumer groups that Burrow is aware of from offset commits in the specified Kafka cluster.
- Remove Consumer Group: DELETE /v2/kafka/(cluster)/consumer/(group). Removes the offsets for a single consumer group from a cluster. This is useful when the topic list for a consumer has changed and Burrow believes the consumer is consuming topics that it no longer is. The consumer group will be removed, but it will automatically be repopulated if the consumer continues to commit offsets.
- List Consumer Topics: GET /v2/kafka/(cluster)/consumer/(group)/topic. List of the topics that Burrow is aware of from offset commits by the specified consumer group in the specified Kafka cluster.
- Consumer Topic Detail: GET /v2/kafka/(cluster)/consumer/(group)/topic/(topic). Most recent offsets for each partition in the specified topic, as committed by the specified consumer group.
- Consumer Group Status: GET /v2/kafka/(cluster)/consumer/(group)/status or GET /v2/kafka/(cluster)/consumer/(group)/lag. Current status of the consumer group, based on evaluation of all partitions it consumes. The evaluation is performed on request, and the result is calculated based on the consumer lag evaluation rules. There are two versions of this request: the "/status" endpoint returns an object that only includes the partitions that are in a bad state, while the "/lag" endpoint returns an object that includes all partitions for the consumer, regardless of their evaluated state. The second version can be used for full reporting of consumer message lag on all partitions.
- List Cluster Topics: GET /v2/kafka/(cluster)/topic. List of the topics in the specified Kafka cluster.
- Cluster Topic Detail: GET /v2/kafka/(cluster)/topic/(topic). Head offsets for each partition in the specified topic, as retrieved from the brokers. Note that these offsets may be up to the number of seconds old specified by the broker-offsets configuration parameter.

A couple of example calls are shown below.
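Assuming Burrow's HTTP server listens on port 8000 and the Kafka cluster is named "local", as in the notifier examples further down (both assumptions; substitute your own values), the calls look like this:

# healthcheck
curl -s http://localhost:8000/burrow/admin
# consumer group status, evaluated on request against the rules above
curl -s http://localhost:8000/v2/kafka/local/consumer/my-consumer-group/status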
Notifiers

Two notifier modules can be configured to check and report consumer group status: email and HTTP.

Email Notifier

The email notifier is used to send emails to a specified address whenever a consumer group is in a bad state. Multiple groups can be configured for a single email address, and the interval on which to check the status (and send out emails) is configurable per email address. Before configuring any email notifiers, the [smtp] section needs to be configured in the Burrow configuration file. Example configuration:

[smtp]
server=mailserver.example.com
port=25
auth-type=plain
username=emailuser
password=s3cur3!
from=burrow-noreply@example.com
template=config/default-email.tmpl

Multiple email
notifiers can be configured in the Burrow configuration file. Each notifier
configuration resides in its own section. Example configuration:

[email "bofh@example.com"]
group=local,critical-consumer-group
group=local,other-consumer-group
interval=60

The email that is sent is formatted according to the template
specified in the [smtp] configuration section. A default template is provided
as part of the Burrow distribution in the config/default-email.tmpl file. The template format is the
standard Golang
text template. There are several good posts available on how to
compose Golang templates:
http://andlabs.lostsig.com/blog/2014/05/26/8/the-go-templates-post
http://jan.newmarch.name/go/template/chapter-template.html
http://golangtutorials.blogspot.com/2011/06/go-templates.html

A timer is set up inside Burrow to fire every "interval" seconds and check the listed consumer
groups. The current status is requested for each group, and if any group in the
list is not in an OK state, an email is sent out with the status of all groups.
This means that the email can contain listings for both good and bad groups,
but no email will be sent out if everything is OK.

HTTP Notifier

The HTTP notifier reports error states for all consumer groups
to an external HTTP endpoint via POST requests. DELETE
requests can also be sent to the same endpoint when a consumer group returns to normal. The HTTP notifier is used to
send POST requests to an external endpoint, such as for a monitoring or
notification system, on a specified interval whenever a consumer group is in a
bad state. This notifier operates on all consumer groups in all clusters
(excluding groups matched by the blacklist). Incidents of a consumer group
going bad have a unique ID generated that is maintained until that group
transitions back to a good state. This allows notification systems to handle incidents,
rather than individual reports of consumer group status, if needed. The configuration for the HTTP notifier is specified under a
heading [httpnotifier]. This is where the URL to connect to is configured, as
well as the templates to use for POST and DELETE request bodies. Extra fields can be provided; they are passed through to the template. An example HTTP
notifier configuration looks like this:

[httpnotifier]
url=http://notification.server.example.com:9000/v1/alert
interval=60
extra=field1=custom information
extra=field2=special info to pass to template
template-post=config/default-http-post.tmpl
template-delete=config/default-http-delete.tmpl
timeout=5
keepalive=30

The request body sent with each HTTP request is
formatted according to the templates specified. A default template is provided
as part of the Burrow distribution in the config/default-http-post.tmpl and config/default-http-delete.tmpl files. The template format is the standard Golang
text template. There are several good posts available on how to
compose Golang templates:
http://andlabs.lostsig.com/blog/2014/05/26/8/the-go-templates-post
http://jan.newmarch.name/go/template/chapter-template.html
http://golangtutorials.blogspot.com/2011/06/go-templates.html

A timer is set up inside Burrow to fire every "interval" seconds. When the timer fires, all
consumer groups in all Kafka clusters are enumerated and the current status is
requested for each group. For each group that is not in an OK state, a unique
ID is generated (if it does not already exist) and a POST request is generated
for that group. For each group that is in an OK state, a check is performed as
to whether or not an ID exists for that group currently. If it does, the ID is
removed (as the group has transitioned to OK). If the DELETE template is
specified, a DELETE request is generated for that group.

Conclusion

The most important metric to watch is whether or not the consumer is keeping up with the messages that are being produced. Before Burrow, the common approach was to monitor consumer lag directly and alert on that number. Burrow monitors the consumer lag and keeps track of the health of the consuming application, automatically monitoring all consumers, for every partition that they consume. It does this by consuming the special internal Kafka topic to which consumer offsets are written. Burrow provides consumer information as a centralized service, separate from any single consumer, based on the offsets that the consumers are committing and the brokers' state.