How you can easily create a Kibana dashboard for monitoring your Metron deployment

Suppose you have all your Metron topologies up and running, error free. Your work is not finished yet. Metron is meant to be a (near) real-time alerting system, so how do you guarantee that events flow through the Metron chain of Kafka topics and Storm topologies in a timely manner? What if events belonging to a threat are held up in one of the Kafka topics? How do you keep track of topology performance when the throughput of Metron increases over time?

The way to check this is to constantly monitor the consumer lags of the Kafka topics, like this:

/usr/hdp/2.5.3.0-37/kafka/bin/kafka-consumer-offset-checker.sh --zookeeper $ZOOKEEPER_HOST:2181 --security-protocol SASL_PLAINTEXT --topic indexing --group indexing


[2017-10-09 21:10:20,278] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
indexing        indexing                       0   511907295       511907378       83              none
indexing        indexing                       1   511907258       511907387       129             none
indexing        indexing                       2   511907294       511907381       87              none
indexing        indexing                       3   511907138       511907384       246             none
indexing        indexing                       4   511907226       511907378       152             none
indexing        indexing                       5   511907271       511907376       105             none
indexing        indexing                       6   511907250       511907381       131             none
indexing        indexing                       7   511907249       511907379       130             none
indexing        indexing                       8   511907248       511907383       135             none
indexing        indexing                       9   511907283       511907381       98              none
indexing        indexing                       10  511907260       511907383       123             none
indexing        indexing                       11  511907127       511907379       252             none

Note: we'll assume the cluster is Kerberised.

Or alternatively:

echo "security.protocol=SASL_PLAINTEXT" > /tmp/consgroupcmd.prop
/usr/hdp/2.5.3.0-37/kafka/bin/kafka-consumer-groups.sh --bootstrap-server $BROKER_LIST --new-consumer --describe --command-config /tmp/consgroupcmd.prop --group enrichments

GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
enrichments                    enrichments                    8          260926936       260927064       128
enrichments                    enrichments                    3          260926857       260927060       203
enrichments                    enrichments                    0          260927011       260927062       51
enrichments                    enrichments                    1          260926916       260927068       152
enrichments                    enrichments                    6          260926971       260927059       88 
enrichments                    enrichments                    10         260926914       260927066       152
enrichments                    enrichments                    9          260926970       260927059       89 
enrichments                    enrichments                    11         260926978       260927066       88 
enrichments                    enrichments                    4          260926985       260927066       81
enrichments                    enrichments                    5          260926945       260927067       122
enrichments                    enrichments                    7          260926979       260927066       87 
enrichments                    enrichments                    2          260926984       260927065       81              
[2017-10-09 21:32:28,060] WARN TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)

The downside of this last command, which uses Kafka's ConsumerGroupCommand, is that it only returns output when the consumer group is active.

In this case the indexing and the enrichments consumer groups both look healthy, as the lag is only around 100-200 per partition. Low lags are good, while high consumer lags may signal some kind of problem:

-The topology might not have enough resources (anymore) to cope with the high input (Kafka production) into its source topic

-The topology might be deactivated, crashed or have a problem with the KafkaSpout configuration
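A check like this can be automated. The following is a minimal sketch, assuming the column layout of the kafka-consumer-offset-checker output shown earlier (Lag in the sixth column). The function names total_lag and check_lag and the LAG_THRESHOLD value are hypothetical and only serve as illustration.

```shell
#!/bin/bash
# Sum the Lag column (6th field) of kafka-consumer-offset-checker.sh output.
# Header and log lines are skipped by only accepting rows whose Pid (3rd
# field) and Lag (6th field) are numeric.
total_lag () {
    awk '$3 ~ /^[0-9]+$/ && $6 ~ /^[0-9]+$/ { sum += $6 } END { printf "%.0f\n", sum }'
}

# Hypothetical threshold; pick a value that suits your own event volumes.
LAG_THRESHOLD=1000

# Read offset-checker output on stdin and report whether the total lag is acceptable.
check_lag () {
    lag=$(total_lag)
    if [ "$lag" -gt "$LAG_THRESHOLD" ]; then
        echo "WARN total lag $lag exceeds $LAG_THRESHOLD"
    else
        echo "OK total lag $lag"
    fi
}

# Example with the header and three partitions taken from the output above.
printf '%s\n' \
  'Group           Topic                          Pid Offset          logSize         Lag             Owner' \
  'indexing        indexing                       0   511907295       511907378       83              none' \
  'indexing        indexing                       1   511907258       511907387       129             none' \
  'indexing        indexing                       2   511907294       511907381       87              none' \
  | check_lag
```

In practice the printf would be replaced by the actual offset checker command, and the threshold would be tuned per topic.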

When your consumer lag keeps increasing, you might run into a state where the topic offset the consumer was reading from is no longer on disk, because the topic's retention settings aged it out (either 'retention.ms' or 'retention.bytes' was exceeded). You should always configure the topic to prevent this from happening. If it happens nonetheless, manual intervention is required to set things in motion again.

But the most compelling argument to start monitoring Metron consumer lags is that you don't want the events signaling a threat being queued in some Kafka topic after millions of other events. By the time the alert eventually reaches the alerts UI it could already be too late to contain the threat.

It makes sense to schedule the kafka-consumer-offset-checker command and push the results into Elasticsearch (ES) to populate a dashboard. In this how-to I use a bash script to collect the metrics, Logstash (also part of the ES stack) to send the results to ES, and Kibana to visualize them.

The script will be like this:

#!/bin/bash

BROKER_LIST=broker-1:6667,broker-2:6667,broker-3:6667
ZOOKEEPER_HOSTS=zk-host1:2181,zk-host2:2181,zk-host3:2181
KAFKA_HOME=/usr/hdp/2.5.3.0-37/kafka

kinit -kt /etc/security/keytabs/metron.headless.keytab metron@<YOUR-REALM>

get_consumer_lag () {

    TOPIC=`echo $1 | cut -d':' -f1`
    GROUP=`echo $1 | cut -d':' -f2`

    # Earliest available offset per partition (--time -2), stored as topic<TAB>partition<TAB>offset
    $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --security-protocol SASL_PLAINTEXT --topic $TOPIC --broker-list $BROKER_LIST --time -2 | sed -n "/$TOPIC/p" | sed -e "s/:/\t/g" > /tmp/kafka_min_offset

    TOPIC_MIN_OFFSET=`cat /tmp/kafka_min_offset | cut -f3 | paste -sd+ | bc`

    # Keep Group, Pid, Offset, logSize and Lag as tab-separated columns and drop the header row
    $KAFKA_HOME/bin/kafka-consumer-offset-checker.sh --zookeeper $ZOOKEEPER_HOSTS --security-protocol SASL_PLAINTEXT --topic $TOPIC --group $GROUP | tr -s " " | cut -d' ' --output-delimiter=$'\t' -f 1,3,4,5,6 | sed -e '1d' > /tmp/kafka_lag_summary

    TOPIC_MAX_OFFSET=`cat /tmp/kafka_lag_summary | cut -f4 | paste -sd+ | bc`
    CONSUMER_OFFSET=`cat /tmp/kafka_lag_summary | cut -f3 | paste -sd+ | bc`
    CONSUMER_LAG=`cat /tmp/kafka_lag_summary | cut -f5 | paste -sd+ | bc`
    TOPIC_SIZE=$(( $TOPIC_MAX_OFFSET-$TOPIC_MIN_OFFSET ))
    CONSUMER_REL_MIN=`echo "scale=2; ( $CONSUMER_OFFSET-$TOPIC_MIN_OFFSET ) / $TOPIC_SIZE" | bc -l`

    #CONSUMER_GROUP  TOPIC  TOPIC_MIN_OFFSET  TOPIC_MAX_OFFSET  TOPIC_SIZE   CONSUMER_OFFSET CONSUMER_REL_MIN    CONSUMER_LAG
    echo "$GROUP  $TOPIC  $TOPIC_MIN_OFFSET $TOPIC_MAX_OFFSET   $TOPIC_SIZE   $CONSUMER_OFFSET  $CONSUMER_REL_MIN    $CONSUMER_LAG"
}  

for CONSUMER_TOPIC_AND_GROUP in "suricata:suricata_parser" "checkpoint_lea:checkpoint_lea_parser" "enrichments:enrichments" "indexing:profiler" "indexing:indexing" "indexing:metron"
do
    get_consumer_lag $CONSUMER_TOPIC_AND_GROUP
done

1. Copy the script and customize the BROKER_LIST and ZOOKEEPER_HOSTS variables and the list of <topic>:<consumer_group> combinations to loop over. Save it somewhere; I will save it at /home/ubuntu/kafka_summary.sh

2. Don't forget to make the new script executable (chmod +x /home/ubuntu/kafka_summary.sh)

Some explanation:

The script fires 2 Kafka CLI commands per <topic>:<consumer_group>. It then strips the output, makes the data columnar, aggregates over topic partitions (if any) and performs some calculations to arrive at one summary line.
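The aggregation step is easiest to see in isolation. Below is a minimal sketch, assuming tab-separated rows of the kind the script prepares. The helper name sum_column is hypothetical, and it evaluates the sum with shell arithmetic where the script pipes the expression into bc.

```shell
#!/bin/bash
# Sum one tab-separated column read from stdin.
# cut picks the column, paste joins the values with '+', and the shell
# evaluates the resulting expression, e.g. 83+129+87.
sum_column () {
    expr_str=$(cut -f"$1" | paste -sd+ -)
    echo $(( ${expr_str:-0} ))
}

# Three partitions with the columns: group, pid, offset, logSize, lag
rows='indexing\t0\t511907295\t511907378\t83\nindexing\t1\t511907258\t511907387\t129\nindexing\t2\t511907294\t511907381\t87\n'

printf "$rows" | sum_column 5   # lag summed over the partitions
printf "$rows" | sum_column 4   # logSize summed over the partitions
```

Summing per-partition offsets gives numbers that are only meaningful relative to each other, which is all the summary line needs.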

3. Run the script once and check the output. For each <topic>:<cons_group> combination, it should look like this:

[2017-10-09 22:47:22,432] WARN TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)
[2017-10-09 22:47:23,328] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
indexing  indexing  5129160011 6143890404   1014730393   6143708348  .99    182056
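For further processing only the summary line is of interest. If the warnings go to stderr, running the script with 2>/dev/null is enough. If they end up in the same stream (an assumption, this depends on the logging setup), a small filter can drop them. The function name only_summary_lines is hypothetical.

```shell
#!/bin/bash
# Keep only summary lines: drop lines that start with a bracketed timestamp
# such as "[2017-10-09 22:47:22,432]" and drop blank lines.
only_summary_lines () {
    grep -v -e '^\[[0-9-]* [0-9:,]*]' -e '^[[:space:]]*$'
}

# Example with the output shown above.
printf '%s\n' \
  '[2017-10-09 22:47:22,432] WARN TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)' \
  'indexing  indexing  5129160011 6143890404   1014730393   6143708348  .99    182056' \
  | only_summary_lines
```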

Let's focus on the line with the numbers. The columns should be read as:

CONSUMER_GROUP TOPIC TOPIC_MIN_OFFSET TOPIC_MAX_OFFSET TOPIC_SIZE CONSUMER_OFFSET CONSUMER_REL_MIN CONSUMER_LAG

The columns that need some explanation:

TOPIC_MIN_OFFSET: The lowest available topic offset in Kafka. The data comes from the GetOffsetShell tool. This offset increases as Kafka ages out log segments according to the retention settings. The start offsets of all partitions are aggregated (summed).
TOPIC_MAX_OFFSET: The ever-increasing offset at the most recent end of the topic, as new events are produced. The offsets of all partitions are aggregated.
TOPIC_SIZE: TOPIC_MAX_OFFSET - TOPIC_MIN_OFFSET
CONSUMER_OFFSET: The offset where the consumer group is currently at, aggregated over all partitions.
CONSUMER_REL_MIN: How far the consumer has progressed relative to TOPIC_MIN_OFFSET, as a fraction of TOPIC_SIZE between 0 and 1. The formula is ( $CONSUMER_OFFSET-$TOPIC_MIN_OFFSET ) / $TOPIC_SIZE. This is a measure of how far the consumer is from 'the edge' where data is deleted from the 'old' end of the topic. When the consumer reaches that edge, event data is lost. High values (0.99) signal that the consumer is as far from the edge as it can be, while low values (< 0.10) call for immediate action.
CONSUMER_LAG: How far the consumer group is behind TOPIC_MAX_OFFSET, aggregated over all partitions.
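The derived columns can be checked by hand against the sample line above. This is a minimal sketch using integer shell arithmetic, so CONSUMER_REL_MIN is expressed as a whole percentage instead of a fraction. The function names derive_metrics and edge_status are hypothetical, and the status labels are made up; only the 0.10 threshold comes from the description above.

```shell
#!/bin/bash
# Derive TOPIC_SIZE, CONSUMER_LAG and CONSUMER_REL_MIN (as a truncated whole
# percentage) from the aggregated min offset, max offset and consumer offset.
derive_metrics () {
    min=$1; max=$2; consumer=$3
    size=$(( max - min ))
    lag=$(( max - consumer ))
    if [ "$size" -gt 0 ]; then
        rel_pct=$(( (consumer - min) * 100 / size ))
    else
        rel_pct=100   # assumption: with an empty topic there is nothing to lose
    fi
    echo "$size $lag $rel_pct"
}

# Below 10 percent (0.10) the consumer is close to the edge where data is deleted.
edge_status () {
    if [ "$1" -lt 10 ]; then echo "CRITICAL"; else echo "OK"; fi
}

# Offsets taken from the sample summary line: prints TOPIC_SIZE, CONSUMER_LAG, percentage
derive_metrics 5129160011 6143890404 6143708348
```

The printed size and lag should match the TOPIC_SIZE and CONSUMER_LAG columns of the sample line, and the percentage corresponds to the .99 reported by bc.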

All these metrics could also be evaluated at the partition level, but I chose not to keep this level of detail as it complicates matters dramatically later on in Kibana. As a first defense against consumer problems, aggregating over partitions is good enough. Just beware that sometimes the lag can be high for one partition while the consumer only has a small lag for another. Usually this is a precursor of greater problems shortly after.

The line chart below shows just the time series for the CONSUMER_LAG metric over 36 hours in Kibana. The kafka_summary.sh script was run every 5 minutes.

39730-lag.png

Now we can get some interesting insights about our Metron deployment. The y-axis (consumer lag) has a 'square root' scale type, so values in the upper part of the graph are actually worse than they appear.

The consumer groups 'suricata_parser' and 'checkpoint_lea_parser' belong to the parser topologies. Both have negligible consumer lags over time, so they seem more than capable of coping with the incoming volume of sensor events. The same is true for the indexing topology.

In this instance the enrichment topology was restarted and configured to reprocess the entire enrichments topic from the earliest available offset in Kafka (start all over). Although the lag was very high in the beginning at 800 million events, the topology was able to catch up after 19 hours of processing, at roughly 6:30 in the morning. This tells us something interesting about the maximum capacity of the enrichment topology, something you would want to know anyway before taking Metron to production. After catching up it was able to keep up, albeit with a few non-serious spikes between 10:00 and 14:00.
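The catch-up period gives a rough capacity figure. The sketch below divides the lag that was worked off by the elapsed time. It assumes the lag was close to zero at the end, and the function name net_drain_rate is hypothetical. The result is a net rate: events were still being produced into the topic during those 19 hours, so the actual throughput of the topology was this rate plus the average inflow rate.

```shell
#!/bin/bash
# Net catch-up rate in events per second:
# (lag at start - lag at end) / elapsed seconds, using integer division.
net_drain_rate () {
    lag_start=$1; lag_end=$2; seconds=$3
    echo $(( (lag_start - lag_end) / seconds ))
}

# 800 million events worked off in 19 hours, end lag assumed to be 0.
net_drain_rate 800000000 0 $(( 19 * 3600 ))
```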

The blue consumer named 'metron' is a prime example of how things can go wrong if you don't monitor. This group consumes from the indexing topic and is actually not a Metron-related consumer but a NiFi consumer that takes the data elsewhere. We can see that the lag for 'metron' increases at roughly the same rate as the lag for 'enrichments' decreases, because the indexing topic is the sink of the enrichment topology and the source of the 'metron' consumer. Worryingly, 'metron' cannot keep pace by a wide margin, at least not until 14:00 on the second day. So what happened at around 16:00 on the first day, when the lag was at a whopping 1 billion (!!) events? It seems as though the NiFi process suddenly came alive and consumed and committed a huge volume within 15 minutes. But further investigation showed that this was not what happened. In fact, 'old' topic log segments were deleted by the Kafka retention policies at the point where the slow NiFi process was consuming from. The consumer's offset no longer existed, so the consumer got stuck, and the auto.offset.reset property in the NiFi Kafka consumer made it jump to the latest available offset in the indexing topic. This stresses the importance of monitoring consumer lags closely; data and possible threats could have been lost in this sudden leap through the topic. This should be prevented at all costs.

After the big leap the consumer still was not fast enough to keep up with the indexing topic and again had a lag of nearly 500 million 17 hours later. Then the NiFi process got tuned, which increased its capacity. Moreover, the influx into the indexing topic diminished sharply after the enrichment topology caught up at around 07:00, so the NiFi process was under far less stress. It should still be kept under scrutiny and may need further tuning.

So we have seen that monitoring the consumer lags can be very telling about the Metron topologies and other processes that are related to the Metron Kafka topics.

In the next part of this series I will show how to embed the script kafka_summary.sh in a Logstash pipeline to push the metrics into ES and from there into a Kibana dashboard.

Last update: 08-17-2019 10:47 AM