Created 02-05-2018 04:43 PM
Hi,
I am trying to run Metron on my Ambari-managed HCP cluster with HDP 2.5 and Metron 0.4.0.
I configured a Storm topology using the Metron UI. When I populate my input Kafka topic with squid client logs using NiFi, nothing arrives in the indexing/enrichments Kafka topics. I am following this link.
The Grok statement is:
SQUID_DELIMITED %{NUMBER:timestamp}.*%{INT:elapsed} %{IP:ip_src_address} %{WORD:action}/%{NUMBER:code} %{NUMBER:bytes} %{WORD:method} %{NOTSPACE:url}.*%{IP:ip_dst_addr}
Sensor parser configuration is as below:
{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "filterClassName": null,
  "sensorTopic": "newcheck",
  "writerClassName": null,
  "errorWriterClassName": null,
  "invalidWriterClassName": null,
  "readMetadata": false,
  "mergeMetadata": false,
  "numWorkers": null,
  "numAckers": null,
  "spoutParallelism": 1,
  "spoutNumTasks": 1,
  "parserParallelism": 1,
  "parserNumTasks": 1,
  "errorWriterParallelism": 1,
  "errorWriterNumTasks": 1,
  "spoutConfig": {},
  "securityProtocol": null,
  "stormConfig": {},
  "parserConfig": {
    "grokPath": "/apps/metron/patterns/newcheck",
    "patternLabel": "NEWCHECK"
  },
  "fieldTransformations": [
    {
      "input": [],
      "output": [ "ip_dst_addr_copy" ],
      "transformation": "STELLAR",
      "config": {
        "ip_dst_addr_copy": "DOMAIN_TO_TLD(DOMAIN_REMOVE_SUBDOMAINS(ip_dst_addr))"
      }
    }
  ]
}
Enrichment configuration:
{
  "enrichment": {
    "fieldMap": {},
    "fieldToTypeMap": {},
    "config": {}
  },
  "threatIntel": {
    "fieldMap": {},
    "fieldToTypeMap": {},
    "config": {},
    "triageConfig": {
      "riskLevelRules": [],
      "aggregator": "MAX",
      "aggregationConfig": {}
    }
  },
  "configuration": {}
}
Indexing configuration:
{
  "hdfs": { "batchSize": 1, "enabled": true, "index": "newcheck" },
  "elasticsearch": { "batchSize": 1, "enabled": true, "index": "newcheck" },
  "solr": { "batchSize": 1, "enabled": true, "index": "newcheck" }
}
When I check the topology worker logs (using the worker ID from the Storm UI) with debug-level logging enabled, I see the messages below.
2018-02-05 15:11:43.991 o.a.s.k.s.KafkaSpoutRetryExponentialBackoff Thread-15-kafkaSpout-executor[4 4] [DEBUG] Topic partitions with entries ready to be retried [[]]
2018-02-05 15:11:43.993 o.a.m.p.GrokParser Thread-13-parserBolt-executor[5 5] [DEBUG] Grok parser parsing message: 1517843503.788 1008 127.0.0.1 TCP_MISS/200 48493 GET https://www.woodlandworldwide.com/ - HIER_DIRECT/182.71.43.17 text/html
2018-02-05 15:11:43.994 o.a.m.p.GrokParser Thread-13-parserBolt-executor[5 5] [DEBUG] Grok parser parsed message: {elapsed=8, code=200, ip_dst_addr=182.71.43.17, original_string=1517843503.788 1008 127.0.0.1 TCP_MISS/200 48493 GET https://www.woodlandworldwide.com/ - HIER_DIRECT/182.71.43.17 text/html , ip_src_address=127.0.0.1, method=GET, bytes=48493, action=TCP_MISS, url=https://www.woodlandworldwide.com/, timestamp=1517843503.788}
2018-02-05 15:11:43.994 o.a.m.p.GrokParser Thread-13-parserBolt-executor[5 5] [DEBUG] Grok parser validating message: {code=200, ip_src_address=127.0.0.1, method=GET, url=https://www.woodlandworldwide.com/, source.type=newcheck, elapsed=8, ip_dst_addr=182.71.43.17, original_string=1517843503.788 1008 127.0.0.1 TCP_MISS/200 48493 GET https://www.woodlandworldwide.com/ - HIER_DIRECT/182.71.43.17 text/html , ip_dst_addr_copy=null, bytes=48493, action=TCP_MISS, guid=f02b21fe-9a5b-4114-921a-82bf64ebb79b, timestamp=1517843503.788}
2018-02-05 15:11:43.994 o.a.m.p.GrokParser Thread-13-parserBolt-executor[5 5] [DEBUG] Grok parser did not validate message: {code=200, ip_src_address=127.0.0.1, method=GET, url=https://www.woodlandworldwide.com/, source.type=newcheck, elapsed=8, ip_dst_addr=182.71.43.17, original_string=1517843503.788 1008 127.0.0.1 TCP_MISS/200 48493 GET https://www.woodlandworldwide.com/ - HIER_DIRECT/182.71.43.17 text/html , ip_dst_addr_copy=null, bytes=48493, action=TCP_MISS, guid=f02b21fe-9a5b-4114-921a-82bf64ebb79b, timestamp=1517843503.788}
Is the 'Grok parser did not validate message' entry in the log above causing this issue? Is there anywhere else I should check?
Created 02-05-2018 08:30 PM
Yes, failing validation will fail the message. The Grok parser will not validate a message unless it finds a "timestamp" field holding a Long value > 0.
@Override
public boolean validate(JSONObject message) {
  LOG.debug("Grok parser validating message: {}", message);
  Object timestampObject = message.get(Constants.Fields.TIMESTAMP.getName());
  if (timestampObject instanceof Long) {
    Long timestamp = (Long) timestampObject;
    if (timestamp > 0) {
      LOG.debug("Grok parser validated message: {}", message);
      return true;
    }
  }
  LOG.debug("Grok parser did not validate message: {}", message);
  return false;
}
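For illustration, here is a minimal standalone sketch (not Metron code) of the failure mode: a Grok capture like %{NUMBER:timestamp} arrives as a String such as "1517843503.788" (as your debug output suggests), never a Long, so the instanceof test above is false until something converts it.

import org.json.simple.JSONObject;

public class TimestampValidationDemo {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        JSONObject message = new JSONObject();

        // What the parser sees for the squid line above: a String capture.
        message.put("timestamp", "1517843503.788");
        System.out.println(message.get("timestamp") instanceof Long); // false -> fails validation

        // What validate() expects: an epoch time stored as a Long.
        message.put("timestamp", 1517843503788L);
        System.out.println(message.get("timestamp") instanceof Long); // true -> passes validation
    }
}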
Created 02-06-2018 10:54 AM
Thanks for the reply. Per the code snippet, it will reject the message if the timestamp is not an instance of Long or is not > 0, right? My squid log message is
1517911793.839 2186 127.0.0.1 TCP_MISS/200 48493 GET https://www.woodlandworldwide.com/ - HIER_DIRECT/182.71.43.17 text/html
And my grok parser expression is
SQUID_DELIMITED %{NUMBER:timestamp}.*%{INT:elapsed} %{IP:ip_src_address} %{WORD:action}/%{NUMBER:code} %{NUMBER:bytes} %{WORD:method} %{NOTSPACE:url}.*%{IP:ip_dst_addr}
It captures the timestamp as 1517911793.839. So the failing check here would be the Long check, right? Why is a check like that present? Squid's timestamp format is exactly like this, right? What should I do in such a case? I even tried directly pushing a message without the decimal part in the timestamp, but that also failed validation. Another interesting thing: if I add storm config directly (not via the UI), validation works fine.
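To double-check what the expression actually captures outside Storm, here is a rough standalone sketch using the java-grok library (assuming io.krakens:java-grok 0.1.9 on the classpath; this is an illustration, not necessarily the exact Grok build Metron bundles):

import io.krakens.grok.api.Grok;
import io.krakens.grok.api.GrokCompiler;
import io.krakens.grok.api.Match;
import java.util.Map;

public class SquidGrokCheck {
    public static void main(String[] args) {
        GrokCompiler compiler = GrokCompiler.newInstance();
        compiler.registerDefaultPatterns(); // provides NUMBER, INT, IP, WORD, NOTSPACE

        // The SQUID_DELIMITED pattern from this thread.
        compiler.register("SQUID_DELIMITED",
            "%{NUMBER:timestamp}.*%{INT:elapsed} %{IP:ip_src_address} %{WORD:action}/%{NUMBER:code} "
          + "%{NUMBER:bytes} %{WORD:method} %{NOTSPACE:url}.*%{IP:ip_dst_addr}");

        Grok grok = compiler.compile("%{SQUID_DELIMITED}");
        Match match = grok.match("1517911793.839 2186 127.0.0.1 TCP_MISS/200 48493 GET "
            + "https://www.woodlandworldwide.com/ - HIER_DIRECT/182.71.43.17 text/html");

        Map<String, Object> captures = match.capture();
        Object ts = captures.get("timestamp");
        // Prints the raw capture and its Java type -- a String, not a Long.
        System.out.println(ts + " (" + ts.getClass().getSimpleName() + ")");
    }
}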
Created 02-06-2018 01:04 PM
I don't understand what you mean by "add storm config directly". Can you describe the steps you take? Are you saying that if you take those steps, everything works correctly?
Created 02-06-2018 01:41 PM
After seeing this issue, I added the sensor configuration directly via the command line, without using the Metron UI. I am talking about step 4 in this link. The parser bolt worked correctly: it pushed the parsed message to the configured Kafka topic 'indexing'. But the indexing bolt did not work; it gave me the error below in the 'indexing' topic.
"error_type":"indexing_error","message":"\/apps\/metron\/indexing\/indexed\/error\/enrichment-hdfsIndexingBolt-3-0-1517911841271.json (No such file or directory)<br>
Created 02-06-2018 01:08 PM
In the Apache Metron codebase, the squid grok rules are as follows, and produce a timestamp without the .NN suffix.
SQUID_DELIMITED %{NUMBER:timestamp}[^0-9]*%{INT:elapsed} %{IP:ip_src_addr} %{WORD:action}/%{NUMBER:code} %{NUMBER:bytes} %{WORD:method} %{NOTSPACE:url}[^0-9]*(%{IP:ip_dst_addr})?
Can you try that?
Created 02-06-2018 02:17 PM
I tried, but I get the same validation failure message.
The log appearing in worker.log:
2018-02-06 13:19:50.949 o.a.m.p.GrokParser Thread-13-parserBolt-executor[5 5] [DEBUG] Grok parser did not validate message: {elapsed=2288, code=200, ip_dst_addr=182.71.43.17, original_string=1517923190.328 2288 127.0.0.1 TCP_MISS/200 48615 GET https://www.woodlandworldwide.com/ - HIER_DIRECT/182.71.43.17 text/html , method=GET, bytes=48615, action=TCP_MISS, guid=a279b46f-ad0b-4d65-b6be-6e8a3e94e79f, ip_src_addr=127.0.0.1, url=https://www.woodlandworldwide.com/, timestamp=1517923190.328, source.type=test}
This is the configuration given in the Metron UI:
SQUID_DELIMITED %{NUMBER:timestamp}[^0-9]*%{INT:elapsed} %{IP:ip_src_addr} %{WORD:action}/%{NUMBER:code} %{NUMBER:bytes} %{WORD:method} %{NOTSPACE:url}[^0-9]*(%{IP:ip_dst_addr})?
My squid log message is
1517923190.328 2288 127.0.0.1 TCP_MISS/200 48615 GET https://www.woodlandworldwide.com/ - HIER_DIRECT/182.71.43.17 text/html
Created 02-06-2018 07:43 PM
I ran your message through the integration test code and it worked. After changing the Grok statement, you have to restart the parser topology in Storm.
Created 02-07-2018 04:34 AM
How can I restart the parser topology in Storm? I could not see an option in the Storm UI. I restarted both the Storm server and the Metron components using Ambari after changing the sensor configuration in the Metron UI, but the same error message appears. Do I need to restart any other component, or am I missing a step?
Created 02-07-2018 05:04 AM
Thanks a lot for your support. I finally resolved the issue. The root cause was that in the sensor configuration JSON we have to add "timestampField": "timestamp" explicitly under the 'parserConfig' key. Adding that and then stopping and starting the sensor using the Metron UI solved the issue. Only then, as you said, does the parser produce a proper Long timestamp.
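For anyone hitting the same problem, the parserConfig section of my sensor configuration from earlier in this thread becomes:

"parserConfig": {
  "grokPath": "/apps/metron/patterns/newcheck",
  "patternLabel": "NEWCHECK",
  "timestampField": "timestamp"
}

With "timestampField" set, the parser converts the named field to a Long epoch value, so the validate() check shown above passes.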
Now I get the error below in the indexing bolt when checking via the Storm UI. Any idea what could be causing this?
java.io.FileNotFoundException: /apps/metron/indexing/indexed/test/enrichment-hdfsIndexingBolt-3-0-1517978596427.json (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:101)
    at org.apache.metron.writer.hdfs.SourceHandler.createOutputFile(SourceHandler.java:156)
    at