Support Questions
Find answers, ask questions, and share your expertise

JSON parser not sending messages to kafka topic

JSON parser not sending messages to kafka topic



I am trying to filter out messages from the indexing topic which have a threat triage score > 50. For this, I have created a new JSONMap parser named 'notify_admin' and a Kafka topic named 'notify_admin'. I have set up the parser to get the messages from the indexing topic by specifying the 'sensorTopic' field. To send the message to the 'notify_admin' Kafka topic I have set the 'outputTopic'. However, when messages are sent to the indexing topic from other parsers I have configured for example squid they do get enriched and indexed but the JSON parser does not output any data to the Kafka topic. The JSON parser topology shows messages being emitted and acknowledged but no data is output to the 'notify_admin' Kafka topic What am I doing wrong?

For now, I am just trying to build up the flow pipeline so haven't set up any filter using Stellar. Just trying to pass the messages from one topic to another. However, it doesn't work.

Parser Config (notify_admin.json)

 "parserClassName": "org.apache.metron.parsers.json.JSONMapParser",
 "filterClassName": null,
 "sensorTopic": "indexing",
 "outputTopic": "notify_admin",
 "errorTopic": null,
 "writerClassName": null,
 "errorWriterClassName": null,
 "readMetadata": false,
 "mergeMetadata": false,
 "numWorkers": null,
 "numAckers": null,
 "spoutParallelism": 1,
 "spoutNumTasks": 1,
 "parserParallelism": 1,
 "parserNumTasks": 1,
 "errorWriterParallelism": 1,
 "errorWriterNumTasks": 1,
 "spoutConfig": {},
 "securityProtocol": null,
 "stormConfig": {},
 "parserConfig": {},
 "fieldTransformations": [],
 "cacheConfig": {},
 "rawMessageStrategy": "DEFAULT",
 "rawMessageStrategyConfig": {}

Enrichment Config (notify_admin.json)

 "enrichment": {
  "fieldMap": {},
  "fieldToTypeMap": {},
  "config": {}
 "threatIntel": {
  "fieldMap": {},
  "fieldToTypeMap": {},
  "config": {},
  "triageConfig": {
   "riskLevelRules": [],
   "aggregator": "MAX",
   "aggregationConfig": {}
 "configuration": {}

Indexing Config (notify_admin.json)

 "hdfs": {
  "batchSize": 1,
  "enabled": true,
  "index": "notify_admin"
 "elasticsearch": {
  "batchSize": 1,
  "enabled": true,
  "index": "notify_admin"
 "solr": {
  "batchSize": 1,
  "enabled": true,
  "index": "notify_admin"

A simple message in the indexing Kafka Topic

    "full_hostname": "",
    "parallelenricher.splitter.end.ts": "1544721368377",
    "code": 503,
    "method": "GET",
    "": "Rule1",
    "threat.triage.rules.0.comment": "",
    "threat.triage.score": 50.0,
    "is_alert": "true",
    "parallelenricher.enrich.begin.ts": "1544721368377",
    "url": "http:\/\/\/",
    "source.type": "squid",
    "elapsed": 73,
    "parallelenricher.splitter.begin.ts": "1544721368377",
    "threat.triage.rules.0.score": 50,
    "original_string": "1544699853.257     73 ::1 TCP_MISS\/503 4147 GET http:\/\/\/ - HIER_NONE\/- text\/html",
    "bytes": 4147,
    "threatintels.hbaseThreatIntel.domain_without_subdomains.zeusList": "alert",
    "domain_without_subdomains": "",
    "threat.triage.rules.0.reason": null,
    "action": "TCP_MISS",
    "guid": "6e1f11be-2738-436a-85ed-b3e22bea8ab6",
    "ip_src_addr": "::1",
    "timestamp": 1544699853257

Retrieving data from notify_admin topic (No records Processed)

${HDP_HOME}/kafka-broker/bin/ --zookeeper $ZOOKEEPER --topic notify_admin --from-beginning

Alerts UI (Shows alerts indexed with threat triage score)

Storm showing messages being emitted and parsed (notify_admin)