Created on 01-26-2020 07:24 AM
Hello everyone,
I'm a newbie to the world of Apache Metron, and I'm facing an issue with an Apache Storm topology. To start my journey with Metron, I wanted to test a simple syslog RFC 5424 source. First I created a Kafka topic named syslog5424, then created a syslog5424.json parser file in ZooKeeper, and another one for indexing. Afterwards I reloaded the ZooKeeper configuration with this command:
./zk_load_configs.sh -i ../config/zookeeper -m PUSH -z mdr-1:2181
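For completeness, this is roughly how I created and verified the topic beforehand; the kafka-topics.sh path and the single partition/replica are just what my HDP node uses, so adjust them for your install:
# Create the topic the parser will read from, then confirm it exists
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper mdr-1:2181 --topic syslog5424 --partitions 1 --replication-factor 1
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper mdr-1:2181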
Then I started the parser topology through the Management UI and followed the instructions for defining an index in Elasticsearch with a template. This is the reference documentation I'm following, changing squid to syslog: https://cwiki.apache.org/confluence/display/METRON/2016/04/25/Metron+Tutorial+-+Fundamentals+Part+1%...
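For reference, the template step looked roughly like this; the host, port, mapping type name, and fields are placeholders from my own environment rather than anything official, and the full mapping follows the tutorial:
curl -XPUT 'http://mdr-1:9200/_template/syslog5424_index' -H 'Content-Type: application/json' -d '
{
  "template": "syslog5424_index*",
  "mappings": {
    "syslog5424_doc": {
      "properties": {
        "timestamp": { "type": "date", "format": "epoch_millis" },
        "source:type": { "type": "keyword" }
      }
    }
  }
}'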
And in the Storm UI I keep getting this error message, while no data shows up in Kibana:
2020-01-26 17:11:05.949 o.a.k.c.p.KafkaProducer Thread-13-parserBolt-executor[5 5] [INFO] Closing the Kafka producer with timeoutMillis = 0 ms.
2020-01-26 17:11:05.950 o.a.s.util Thread-13-parserBolt-executor[5 5] [ERROR] Async loop died!
java.lang.IllegalStateException: Unable to initialize message writer
    at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:80) ~[stormjar.jar:?]
    at org.apache.metron.parsers.bolt.ParserBolt.prepare(ParserBolt.java:233) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$fn__10195$fn__10208.invoke(executor.clj:800) ~[storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:482) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335) ~[stormjar.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163) ~[stormjar.jar:?]
    at org.apache.metron.writer.kafka.KafkaWriter.init(KafkaWriter.java:211) ~[stormjar.jar:?]
    at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:78) ~[stormjar.jar:?]
    ... 5 more
Caused by: java.lang.NullPointerException
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:40) ~[stormjar.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:275) ~[stormjar.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163) ~[stormjar.jar:?]
    at org.apache.metron.writer.kafka.KafkaWriter.init(KafkaWriter.java:211) ~[stormjar.jar:?]
    at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:78) ~[stormjar.jar:?]
    ... 5 more
2020-01-26 17:11:05.958 o.a.s.d.executor Thread-13-parserBolt-executor[5 5] [ERROR]
java.lang.IllegalStateException: Unable to initialize message writer
    at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:80) ~[stormjar.jar:?]
    at org.apache.metron.parsers.bolt.ParserBolt.prepare(ParserBolt.java:233) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$fn__10195$fn__10208.invoke(executor.clj:800) ~[storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:482) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335) ~[stormjar.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163) ~[stormjar.jar:?]
    at org.apache.metron.writer.kafka.KafkaWriter.init(KafkaWriter.java:211) ~[stormjar.jar:?]
    at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:78) ~[stormjar.jar:?]
    ... 5 more
Caused by: java.lang.NullPointerException
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:40) ~[stormjar.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:275) ~[stormjar.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163) ~[stormjar.jar:?]
    at org.apache.metron.writer.kafka.KafkaWriter.init(KafkaWriter.java:211) ~[stormjar.jar:?]
    at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:78) ~[stormjar.jar:?]
    ... 5 more
2020-01-26 17:11:05.982 o.a.s.util Thread-13-parserBolt-executor[5 5] [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
    at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
2020-01-26 17:11:05.985 o.a.s.d.worker Thread-18 [INFO] Shutting down worker syslog5424-24-1580051309 9fc257f5-6775-4d90-afdc-344c5ba108aa 6701
2020-01-26 17:11:05.985 o.a.s.d.worker Thread-18 [INFO] Terminating messaging context
2020-01-26 17:11:11.914 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric
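If I read the trace correctly, the NullPointerException at ClientUtils.parseAndValidateAddresses(ClientUtils.java:40) means the bootstrap.servers list handed to the KafkaWriter's producer is null, i.e. the broker address never reaches the parser topology. This is how I checked what the worker actually received; the log location is where my HDP install keeps Storm worker logs, so adjust the paths for your cluster:
# The Kafka client prints its "ProducerConfig values" before failing; look for bootstrap.servers there
grep -R "bootstrap.servers" /var/log/storm/workers-artifacts/syslog5424*/ 2>/dev/null
# Dump everything Metron has pushed to ZooKeeper (run from $METRON_HOME/bin, same as the PUSH above)
./zk_load_configs.sh -m DUMP -z mdr-1:2181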
This is the parser configuration:
SENSOR PARSER CONFIG
{
  "parserClassName": "org.apache.metron.parsers.syslog.Syslog5424Parser",
  "filterClassName": null,
  "sensorTopic": "syslog5424",
  "outputTopic": "syslog5424",
  "errorTopic": null,
  "writerClassName": "org.apache.metron.writer.kafka.KafkaWriter",
  "errorWriterClassName": null,
  "readMetadata": false,
  "mergeMetadata": false,
  "numWorkers": 1,
  "numAckers": 1,
  "spoutParallelism": 1,
  "spoutNumTasks": 1,
  "parserParallelism": 1,
  "parserNumTasks": 1,
  "errorWriterParallelism": 1,
  "errorWriterNumTasks": 1,
  "spoutConfig": {},
  "securityProtocol": null,
  "stormConfig": {},
  "parserConfig": {
    "nilPolicy": "DASH"
  },
  "fieldTransformations": [],
  "cacheConfig": {
    "stellar.cache.maxSize": 20000,
    "stellar.cache.maxTimeRetain": 20
  },
  "rawMessageStrategy": "DEFAULT",
  "rawMessageStrategyConfig": {}
}
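In case it matters, this is how I push the file and double-check what ZooKeeper is actually serving back; the -c/-n filters on DUMP are what my copy of zk_load_configs.sh accepts, so they may differ in other Metron versions:
./zk_load_configs.sh -i ../config/zookeeper -m PUSH -z mdr-1:2181
./zk_load_configs.sh -m DUMP -z mdr-1:2181 -c PARSER -n syslog5424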
And this is the indexing configuration:
{
  "hdfs": {
    "batchSize": 1,
    "enabled": true,
    "index": "syslog5424"
  },
  "elasticsearch": {
    "batchSize": 1,
    "enabled": true,
    "index": "syslog5424_index"
  },
  "solr": {
    "batchSize": 1,
    "enabled": true,
    "index": "syslog5424"
  }
}
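And this is how I've been checking whether anything actually lands in Elasticsearch (nothing shows up so far); the host and port are assumptions from my setup:
# List any syslog5424 indices and their document counts
curl -s 'http://mdr-1:9200/_cat/indices/syslog5424_index*?v'
# Peek at a few documents if an index exists
curl -s 'http://mdr-1:9200/syslog5424_index*/_search?size=3&pretty'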
I truly appreciate your help 🙂 🙂
Created 01-28-2020 10:43 PM
After digging into the issue, I found that the bootstrap.servers value provided to the Kafka producer is null, which is exactly what the NullPointerException is thrown on. I did set it in my spout configuration and in both producer.properties and consumer.properties, but it still didn't work!
How can I fix this?
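One thing I still plan to try is starting the topology from the command line with the broker passed explicitly, instead of relying on the Management UI; the -s/-z/-k options and the 6667 broker port are what my HDP/Metron install uses, so please correct me if your version differs:
$METRON_HOME/bin/start_parser_topology.sh -s syslog5424 -z mdr-1:2181 -k mdr-1:6667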
Created 02-02-2020 12:00 AM
I solved this issue by replacing the parser with a Grok pattern instead of the typical Java parser class, and it worked! 🙂
The Grok pattern I used:
%{GREEDYDATA:unwanted} %{HOSTNAME:host}: %{SYSLOGTIMESTAMP:timestamp}:%{GREEDYDATA:info_type}: %{GREEDYDATA:msg}
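For anyone hitting the same thing, this is roughly how I wired the Grok parser in, following the squid tutorial's conventions; the pattern label, the HDFS location, and the grokPath/patternLabel/timestampField/dateFormat keys are taken from that tutorial and my own setup, so verify them against your Metron version's parser docs:
# Stage the pattern in HDFS where the GrokParser can read it
cat > /tmp/syslog5424_pattern <<'EOF'
SYSLOG5424LINE %{GREEDYDATA:unwanted} %{HOSTNAME:host}: %{SYSLOGTIMESTAMP:timestamp}:%{GREEDYDATA:info_type}: %{GREEDYDATA:msg}
EOF
hdfs dfs -mkdir -p /apps/metron/patterns
hdfs dfs -put -f /tmp/syslog5424_pattern /apps/metron/patterns/syslog5424

# Point the sensor config at the pattern and push it (run from $METRON_HOME/bin)
cat > ../config/zookeeper/parsers/syslog5424.json <<'EOF'
{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "sensorTopic": "syslog5424",
  "parserConfig": {
    "grokPath": "/apps/metron/patterns/syslog5424",
    "patternLabel": "SYSLOG5424LINE",
    "timestampField": "timestamp",
    "dateFormat": "MMM dd HH:mm:ss"
  }
}
EOF
./zk_load_configs.sh -i ../config/zookeeper -m PUSH -z mdr-1:2181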