Apache Metron - Storm parserbolt exception


Hello everyone, 

 

I'm new to Apache Metron, and I'm facing an issue with an Apache Storm topology. To start my journey with Metron, I wanted to test a simple syslog RFC 5424 feed. First, I created a Kafka topic, syslog5424, then created a syslog5424.json parser file in ZooKeeper and another one for indexing. Afterwards I reloaded the ZooKeeper configuration with this command:

./zk_load_configs.sh -i ../config/zookeeper -m PUSH -z mdr-1:2181

Then I started the parser topology through the Management UI and followed the instructions for defining an index in Elasticsearch with a template. This is the reference documentation I'm following, substituting syslog for squid: https://cwiki.apache.org/confluence/display/METRON/2016/04/25/Metron+Tutorial+-+Fundamentals+Part+1%...

 

In the Storm UI I keep getting this error message, and no data shows up in Kibana:

 

2020-01-26 17:11:05.949 o.a.k.c.p.KafkaProducer Thread-13-parserBolt-executor[5 5] [INFO] Closing the Kafka producer with timeoutMillis = 0 ms.
2020-01-26 17:11:05.950 o.a.s.util Thread-13-parserBolt-executor[5 5] [ERROR] Async loop died!
java.lang.IllegalStateException: Unable to initialize message writer
	at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:80) ~[stormjar.jar:?]
	at org.apache.metron.parsers.bolt.ParserBolt.prepare(ParserBolt.java:233) ~[stormjar.jar:?]
	at org.apache.storm.daemon.executor$fn__10195$fn__10208.invoke(executor.clj:800) ~[storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
	at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:482) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163) ~[stormjar.jar:?]
	at org.apache.metron.writer.kafka.KafkaWriter.init(KafkaWriter.java:211) ~[stormjar.jar:?]
	at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:78) ~[stormjar.jar:?]
	... 5 more
Caused by: java.lang.NullPointerException
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:40) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:275) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163) ~[stormjar.jar:?]
	at org.apache.metron.writer.kafka.KafkaWriter.init(KafkaWriter.java:211) ~[stormjar.jar:?]
	at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:78) ~[stormjar.jar:?]
	... 5 more
2020-01-26 17:11:05.958 o.a.s.d.executor Thread-13-parserBolt-executor[5 5] [ERROR] 
java.lang.IllegalStateException: Unable to initialize message writer
	at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:80) ~[stormjar.jar:?]
	at org.apache.metron.parsers.bolt.ParserBolt.prepare(ParserBolt.java:233) ~[stormjar.jar:?]
	at org.apache.storm.daemon.executor$fn__10195$fn__10208.invoke(executor.clj:800) ~[storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
	at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:482) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163) ~[stormjar.jar:?]
	at org.apache.metron.writer.kafka.KafkaWriter.init(KafkaWriter.java:211) ~[stormjar.jar:?]
	at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:78) ~[stormjar.jar:?]
	... 5 more
Caused by: java.lang.NullPointerException
	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:40) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:275) ~[stormjar.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163) ~[stormjar.jar:?]
	at org.apache.metron.writer.kafka.KafkaWriter.init(KafkaWriter.java:211) ~[stormjar.jar:?]
	at org.apache.metron.parsers.bolt.WriterHandler.init(WriterHandler.java:78) ~[stormjar.jar:?]
	... 5 more
2020-01-26 17:11:05.982 o.a.s.util Thread-13-parserBolt-executor[5 5] [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
	at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
	at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
	at org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
	at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) [storm-core-1.1.0.2.6.5.1175-1.jar:1.1.0.2.6.5.1175-1]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
2020-01-26 17:11:05.985 o.a.s.d.worker Thread-18 [INFO] Shutting down worker syslog5424-24-1580051309 9fc257f5-6775-4d90-afdc-344c5ba108aa 6701
2020-01-26 17:11:05.985 o.a.s.d.worker Thread-18 [INFO] Terminating messaging context
2020-01-26 17:11:11.914 STDERR Thread-2 [INFO] JMXetricAgent instrumented JVM, see https://github.com/ganglia/jmxetric

 

 

This is my parser configuration (SENSOR PARSER CONFIG):

{
  "parserClassName": "org.apache.metron.parsers.syslog.Syslog5424Parser",
  "filterClassName": null,
  "sensorTopic": "syslog5424",
  "outputTopic": "syslog5424",
  "errorTopic": null,
  "writerClassName": "org.apache.metron.writer.kafka.KafkaWriter",
  "errorWriterClassName": null,
  "readMetadata": false,
  "mergeMetadata": false,
  "numWorkers": 1,
  "numAckers": 1,
  "spoutParallelism": 1,
  "spoutNumTasks": 1,
  "parserParallelism": 1,
  "parserNumTasks": 1,
  "errorWriterParallelism": 1,
  "errorWriterNumTasks": 1,
  "spoutConfig": {},
  "securityProtocol": null,
  "stormConfig": {},
  "parserConfig": {
    "nilPolicy": "DASH"
  },
  "fieldTransformations": [],
  "cacheConfig": {
    "stellar.cache.maxSize": 20000,
    "stellar.cache.maxTimeRetain": 20
  },
  "rawMessageStrategy": "DEFAULT",
  "rawMessageStrategyConfig": {}
}

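A quick sanity check of the parser configuration above might be worth running before restarting the topology. The sketch below is only an illustration in Python: the helper name `review_parser_config` is hypothetical, and only a subset of the posted fields is copied in. It flags two things visible in the post: `sensorTopic` (the topic the parser reads from) and `outputTopic` (the topic parsed messages are written to) carry the same value, and several fields are set to an explicit `null`. Whether either observation is related to the exception is not established in this thread.

```python
import json

# Subset of the configuration posted above, copied verbatim.
PARSER_CONFIG = json.loads("""
{
  "parserClassName": "org.apache.metron.parsers.syslog.Syslog5424Parser",
  "sensorTopic": "syslog5424",
  "outputTopic": "syslog5424",
  "errorTopic": null,
  "writerClassName": "org.apache.metron.writer.kafka.KafkaWriter",
  "securityProtocol": null
}
""")


def review_parser_config(config):
    """Return human-readable notes about settings worth double-checking.

    Hypothetical helper for illustration; it is not part of Metron.
    """
    notes = []
    sensor_topic = config.get("sensorTopic")
    if sensor_topic and sensor_topic == config.get("outputTopic"):
        notes.append(
            "sensorTopic and outputTopic are both '%s'; parsed output "
            "would land on the topic the parser reads from" % sensor_topic
        )
    explicit_nulls = sorted(k for k, v in config.items() if v is None)
    if explicit_nulls:
        notes.append("explicit null values: " + ", ".join(explicit_nulls))
    return notes


if __name__ == "__main__":
    for note in review_parser_config(PARSER_CONFIG):
        print("-", note)
```

The function only reports; it does not decide what the correct values are, since that depends on the rest of the deployment.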
 

And this is the indexing configuration:

 

{
  "hdfs": {
    "batchSize": 1,
    "enabled": true,
    "index": "syslog5424"
  },
  "elasticsearch": {
    "batchSize": 1,
    "enabled": true,
    "index": "syslog5424_index"
  },
  "solr": {
    "batchSize": 1,
    "enabled": true,
    "index": "syslog5424"
  }
}

 

I truly appreciate your help 🙂 🙂 

1 ACCEPTED SOLUTION


I solved this issue by replacing the Java parser class with a Grok pattern, and it worked! 🙂

The Grok pattern I used:

%{GREEDYDATA:unwanted} %{HOSTNAME:host}: %{SYSLOGTIMESTAMP:timestamp}:%{GREEDYDATA:info_type}: %{GREEDYDATA:msg}
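
To see which part of a line each named field in the pattern above captures, a rough Python analogue can help. This is a sketch under assumptions: the regular expression only approximates the Grok building blocks (GREEDYDATA as `.*`, simplified stand-ins for HOSTNAME and SYSLOGTIMESTAMP), the function name `parse_line` is hypothetical, and the sample log line is made up for illustration, not taken from the poster's data.

```python
import re
from typing import Dict, Optional

# Approximation of the Grok pattern; not the exact Grok definitions.
GROK_APPROX = re.compile(
    r"(?P<unwanted>.*) "                                         # GREEDYDATA
    r"(?P<host>[0-9A-Za-z][0-9A-Za-z.-]*): "                     # ~HOSTNAME
    r"(?P<timestamp>[A-Z][a-z]{2} +\d{1,2} \d{2}:\d{2}:\d{2}):"  # ~SYSLOGTIMESTAMP
    r"(?P<info_type>.*): "                                       # GREEDYDATA
    r"(?P<msg>.*)"                                               # GREEDYDATA
)


def parse_line(line: str) -> Optional[Dict[str, str]]:
    """Return the named captures for a line, or None if it does not match."""
    match = GROK_APPROX.match(line)
    return match.groupdict() if match else None


if __name__ == "__main__":
    # Invented sample line, shaped to fit the pattern.
    sample = "<189>45 router1: Jan 26 17:11:05: %SYS-5-CONFIG_I: Configured from console"
    print(parse_line(sample))
```

Because GREEDYDATA is greedy, `info_type` keeps any leading space that follows the timestamp's colon, so downstream code may want to strip it.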


2 REPLIES


After digging into the issue, I found that the bootstrap.servers value provided to the Kafka producer is null, which is what keeps triggering the NullPointerException. However, I did set it in my spout configuration and in both producer.properties and consumer.properties, and it still didn't work!

How can I fix this?
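
The failure chain described above (a null broker list reaching the address-parsing step, as in the `parseAndValidateAddresses` frame of the stack trace) can be illustrated with a small Python analogue. This is a sketch, not Kafka's or Metron's code: the function name `parse_bootstrap_servers` is hypothetical, and the port `6667` in the usage line is an assumed value for illustration. It shows the kind of up-front check that turns a bare null pointer failure into a readable message.

```python
from typing import List, Optional, Tuple


def parse_bootstrap_servers(value: Optional[str]) -> List[Tuple[str, int]]:
    """Split a comma-separated host:port list, failing loudly on bad input."""
    if value is None or not value.strip():
        # The situation reported in the post: no broker list reached the producer.
        raise ValueError("bootstrap.servers is missing or empty")
    addresses = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError("invalid host:port entry: %r" % entry)
        addresses.append((host, int(port)))
    if not addresses:
        raise ValueError("bootstrap.servers contains no usable entries")
    return addresses


if __name__ == "__main__":
    # Host name taken from the post; the port is an assumption.
    print(parse_bootstrap_servers("mdr-1:6667"))
    try:
        parse_bootstrap_servers(None)
    except ValueError as err:
        print("rejected:", err)
```

A check like this only reports that the value is absent where the producer is built; it does not say which configuration source was expected to supply it.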

