Druid kafka ingestion from Hive - HDP 3.0

Contributor

Hi,

I'm trying to ingest event data from Kafka into Druid using the new Hive/Druid/Kafka integration in Hive 3
(see https://cwiki.apache.org/confluence/display/Hive/Druid+Integration, section "Druid Kafka Ingestion from Hive").

I've got events in JSON format in a Kafka topic with the following structure:

{
  "timestamp": "2018-11-04T22:43:10Z",
  "machine1": "RXI901",
  "machine2": "RXI902",
  "priority": "74",
  "level": "[e.warning]",
  "machine3": "RXI900",
  "Protocol": "TCP",
  "SrcIP": "109.26.211.73",
  "OriginalClientIP": "::",
  "DstIP": "192.168.104.96",
  "SrcPort": "36711",
  "DstPort": "54",
  "TCPFlags": "0x0",
  "IngressInterface": "s3p4",
  "EgressInterface": "s3p3",
  "IngressZone": "INUTILISE",
  "EgressZone": "INUTILISE",
  "DE": "Primary Detection Engine (f77608a0-0e20-11e6-91d7-88d7e001637c)",
  "Policy": "Default Access Control",
  "ConnectType": "Start",
  "AccessControlRuleName": "Unknown",
  "AccessControlRuleAction": "Allow",
  "PrefilterPolicy": "Unknown",
  "UserName": "No Authentication Required",
  "InitiatorPackets": 1,
  "ResponderPackets": 0,
  "InitiatorBytes": 80,
  "ResponderBytes": 0,
  "NAPPolicy": "Network Analysis",
  "DNSResponseType": "No Error",
  "Sinkhole": "Unknown",
  "URLCategory": "Unknown",
  "URLReputation": "Risk unknown"
}

To ingest them from Kafka, I've created the following external table in Hive, matching the JSON structure of the messages:

CREATE EXTERNAL TABLE ssh_druid_kafka (
 `__time` timestamp,
 `machine1` string,
 `machine2` string,
 `priority` string,
 `level` string,
 `machine3` string,
 `Protocol` string,
 `SrcIP` string,
 `OriginalClientIP` string,
 `DstIP` string,
 `SrcPort` string,
 `DstPort` string,
 `TCPFlags` string,
 `IngressInterface` string,
 `EgressInterface` string,
 `IngressZone` string,
 `EgressZone` string,
 `DE` string,
 `Policy` string,
 `ConnectType` string,
 `AccessControlRuleName` string,
 `AccessControlRuleAction` string,
 `PrefilterPolicy` string,
 `UserName` string,
 `InitiatorPackets` int,
 `ResponderPackets` int,
 `InitiatorBytes` int,
 `ResponderBytes` int,
 `NAPPolicy` string,
 `DNSResponseType` string,
 `Sinkhole` string,
 `URLCategory` string,
 `URLReputation` string
)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
 TBLPROPERTIES (
 "kafka.bootstrap.servers" = "[kakfa host]:6667",
 "kafka.topic" = "log_schema_raw",
 "druid.kafka.ingestion.useEarliestOffset" = "true",
 "druid.kafka.ingestion.maxRowsInMemory" = "20",
 "druid.kafka.ingestion.startDelay" = "PT5S",
 "druid.kafka.ingestion.period" = "PT30S",
 "druid.kafka.ingestion.consumer.retries" = "2"
);

ALTER TABLE ssh_druid_kafka SET TBLPROPERTIES("druid.kafka.ingestion" = 'START');
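
Side note, in case it helps anyone reproducing this: if I'm reading the wiki page above correctly, the same property can also be set to 'STOP' to pause the indexing while changing the table definition, roughly:

-- pause the Druid Kafka indexing for this table (sketch, based on my reading of the wiki page linked above)
ALTER TABLE ssh_druid_kafka SET TBLPROPERTIES("druid.kafka.ingestion" = 'STOP');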

I'm getting an indexing task in the Druid supervisor...

[screenshot: the indexing task shown in the Druid supervisor]

=> but no data source in the Druid Broker 😞

Upon closer look at the task logs in the Druid supervisor, I see parsing errors:

2018-11-04T23:06:06,305 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [60] Unparseable events! Turn on debug logging to see exception stack trace.
2018-11-04T23:09:06,306 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [60] Unparseable events! Turn on debug logging to see exception stack trace.
...


Questions:

1. How do I enable debug logging on the indexing tasks?
=> I've tried setting the log4j level to DEBUG in the Ambari Druid tab. That does affect the log levels of the Druid components, but it doesn't seem to affect the indexing tasks.

2. What format does Druid expect when using the Kafka Indexing Service?

Am I missing something?

Thanks for your help.

3 REPLIES

Expert Contributor

I looked at the code, and it seems that in the current state of things the timestamp column is hard-coded to be `__time`; that is why you are getting the exceptions, since your timestamp field is called `timestamp`.

https://github.com/apache/hive/blob/a51e6aeaf816bdeea5e91ba3a0fab8a31b3a496d/druid-handler/src/java/...

If this is the case, it is a serious limitation and needs to be fixed. @Nishant Bangarwa, what do you think?

Contributor

Hi @Slim

Thanks, yep that was it.

Another little quirk, identified with the help of Charles Bernard:

- All names in the JSON object must be in lower case for them to be parsed

- A corollary of this is that all column names must also be in lower case (see the sketch below)
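
For anyone landing here later, here is roughly what the corrected setup looks like (illustrative sketch, most columns omitted; the Kafka JSON messages use the same lowercase keys, with the timestamp under the key `__time` instead of `timestamp`):

-- corrected table: all column names lowercase, timestamp column named `__time`
CREATE EXTERNAL TABLE ssh_druid_kafka (
 `__time` timestamp,
 `machine1` string,
 `protocol` string,
 `srcip` string,
 `dstip` string,
 -- ... remaining columns, all lowercase, following the same pattern
 `initiatorpackets` int,
 `responderbytes` int
)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES (
 "kafka.bootstrap.servers" = "[kafka host]:6667",
 "kafka.topic" = "log_schema_raw",
 "druid.kafka.ingestion.useEarliestOffset" = "true"
);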

Expert Contributor

Correct, @Matthieu Lamairesse. Druid is case sensitive while Hive is not, so to make it work you need to make sure that all the column names are in lowercase.
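
Once that's done, a quick sanity check from Hive (using the lowercased column names from the sketch above) could look something like:

-- illustrative query against the Druid-backed table to confirm rows are landing
SELECT `__time`, `protocol`, `srcip`, `dstip`
FROM ssh_druid_kafka
LIMIT 10;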