Created on 11-04-2018 11:18 PM - edited 09-16-2022 06:51 AM
Hi,
I'm trying to ingest event data from Kafka into Druid using the new Hive/Druid/Kafka integration in Hive 3
(see https://cwiki.apache.org/confluence/display/Hive/Druid+Integration, section "Druid Kafka Ingestion from Hive").
I've got events in JSON format in a Kafka topic using the following structure :
{
  "timestamp": "2018-11-04T22:43:10Z",
  "machine1": "RXI901",
  "machine2": "RXI902",
  "priority": "74",
  "level": "[e.warning]",
  "machine3": "RXI900",
  "Protocol": "TCP",
  "SrcIP": "109.26.211.73",
  "OriginalClientIP": "::",
  "DstIP": "192.168.104.96",
  "SrcPort": "36711",
  "DstPort": "54",
  "TCPFlags": "0x0",
  "IngressInterface": "s3p4",
  "EgressInterface": "s3p3",
  "IngressZone": "INUTILISE",
  "EgressZone": "INUTILISE",
  "DE": "Primary Detection Engine (f77608a0-0e20-11e6-91d7-88d7e001637c)",
  "Policy": "Default Access Control",
  "ConnectType": "Start",
  "AccessControlRuleName": "Unknown",
  "AccessControlRuleAction": "Allow",
  "PrefilterPolicy": "Unknown",
  "UserName": "No Authentication Required",
  "InitiatorPackets": 1,
  "ResponderPackets": 0,
  "InitiatorBytes": 80,
  "ResponderBytes": 0,
  "NAPPolicy": "Network Analysis",
  "DNSResponseType": "No Error",
  "Sinkhole": "Unknown",
  "URLCategory": "Unknown",
  "URLReputation": "Risk unknown"
}
To ingest them from Kafka, I've created the following external table in Hive, matching the JSON structure of the messages:
CREATE EXTERNAL TABLE ssh_druid_kafka (
 `__time` timestamp,
 `machine1` string,
 `machine2` string,
 `priority` string,
 `level` string,
 `machine3` string,
 `Protocol` string,
 `SrcIP` string,
 `OriginalClientIP` string,
 `DstIP` string,
 `SrcPort` string,
 `DstPort` string,
 `TCPFlags` string,
 `IngressInterface` string,
 `EgressInterface` string,
 `IngressZone` string,
 `EgressZone` string,
 `DE` string,
 `Policy` string,
 `ConnectType` string,
 `AccessControlRuleName` string,
 `AccessControlRuleAction` string,
 `PrefilterPolicy` string,
 `UserName` string,
 `InitiatorPackets` int,
 `ResponderPackets` int,
 `InitiatorBytes` int,
 `ResponderBytes` int,
 `NAPPolicy` string,
 `DNSResponseType` string,
 `Sinkhole` string,
 `URLCategory` string,
 `URLReputation` string
)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
 TBLPROPERTIES (
 "kafka.bootstrap.servers" = "[kafka host]:6667",
 "kafka.topic" = "log_schema_raw",
 "druid.kafka.ingestion.useEarliestOffset" = "true",
 "druid.kafka.ingestion.maxRowsInMemory" = "20",
 "druid.kafka.ingestion.startDelay" = "PT5S",
 "druid.kafka.ingestion.period" = "PT30S",
 "druid.kafka.ingestion.consumer.retries" = "2"
);
ALTER TABLE ssh_druid_kafka SET TBLPROPERTIES("druid.kafka.ingestion" = 'START');
I'm getting an indexing task in Druid supervisor...
=> but no data source in the Druid Broker 😞
Looking more closely at the task logs in the Druid supervisor, I see parsing errors:
2018-11-04T23:06:06,305 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [60] Unparseable events! Turn on debug logging to see exception stack trace.
2018-11-04T23:09:06,306 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [60] Unparseable events! Turn on debug logging to see exception stack trace.
...
Questions : 
1. How do I enable Debug Logging on tasks ? 
=> I've tried setting the log4j level to DEBUG in the Ambari Druid tab. That does affect the log levels of the components, but it doesn't seem to affect the indexing tasks.
2. What is the format expected by Druid for using the Kafka Indexing Service ?
Am I missing something ?
Thanks for your help
Created 11-06-2018 08:19 PM
I looked at the code, and it seems that the timestamp column is currently hard-coded to be __time. That is why you are getting the exceptions: your column is called `timestamp`.
If this is the case, it is a serious limitation and needs to be fixed. @Nishant Bangarwa what do you think?
Created 11-07-2018 07:05 PM
Hi @Slim
Thanks, yep that was it.
Another little quirk identified with the help of Charles Bernard :
- All names in the JSON object must be in lower case for them to be parsed
- A corollary of this is that all column names must also be in lower case
Created 11-07-2018 07:07 PM
Correct @Matthieu Lamairesse. Druid is case sensitive while Hive is not, so to make it work you need to make sure that all the column names are in lowercase.
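For anyone landing here later, here is a minimal Python sketch of how the events could be reshaped before they are written to the Kafka topic, based on the two findings in this thread: the time field has to be called `__time`, and every other JSON name has to be lowercase. The function name `normalize_event` and its `time_field` parameter are hypothetical, made up for this illustration; they are not part of Hive, Druid, or Kafka. It also assumes you control the producer side and can rewrite each message.

```python
import json


def normalize_event(raw: str, time_field: str = "timestamp") -> str:
    """Rename the time field to __time and lowercase every other key.

    Hypothetical helper for illustration only; not a Hive or Druid API.
    """
    event = json.loads(raw)
    normalized = {}
    for key, value in event.items():
        # The thread reports that the timestamp column is hard-coded to __time.
        new_key = "__time" if key == time_field else key.lower()
        if new_key in normalized:
            # Two keys differing only by case would silently overwrite each other.
            raise ValueError(f"duplicate key after normalization: {new_key}")
        normalized[new_key] = value
    return json.dumps(normalized)


# Shortened version of the event from the original question.
sample = '{"timestamp": "2018-11-04T22:43:10Z", "SrcIP": "109.26.211.73", "InitiatorBytes": 80}'
result = json.loads(normalize_event(sample))
print(result)
```

The column names in the Hive table would need the same treatment (for example `srcip` instead of `SrcIP`) so that they match the lowercase JSON names.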