<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Druid kafka ingestion from Hive - HDP 3.0 - Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Druid-kafka-ingestion-from-Hive-HDP-3-0/m-p/226446#M84706</link>
    <description>&lt;P&gt;Question: ingesting JSON events from a Kafka topic into Druid through a Hive 3 external table (DruidStorageHandler) on HDP 3.0. The Druid indexing task starts but every event is unparseable; the resolution is that the timestamp field must be named __time and all JSON field and column names must be lowercase.&lt;/P&gt;</description>
    <pubDate>Fri, 16 Sep 2022 13:51:53 GMT</pubDate>
    <dc:creator>mlamairesse</dc:creator>
    <dc:date>2022-09-16T13:51:53Z</dc:date>
    <item>
      <title>Druid kafka ingestion from Hive - HDP 3.0</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Druid-kafka-ingestion-from-Hive-HDP-3-0/m-p/226446#M84706</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;I'm trying to ingest event data from Kafka to Druid using the new Hive/Druid/Kafka integration in Hive 3&lt;BR /&gt;(see &lt;A href="https://cwiki.apache.org/confluence/display/Hive/Druid+Integration" rel="nofollow noopener noreferrer" target="_blank"&gt;https://cwiki.apache.org/confluence/display/Hive/Druid+Integration&lt;/A&gt;, section "Druid Kafka Ingestion from Hive").&lt;/P&gt;&lt;P&gt;I've got events in JSON format in a Kafka topic with the following structure:&lt;/P&gt;&lt;PRE&gt;{
  "timestamp": "2018-11-04T22:43:10Z",
  "machine1": "RXI901",
  "machine2": "RXI902",
  "priority": "74",
  "level": "[e.warning]",
  "machine3": "RXI900",
  "Protocol": "TCP",
  "SrcIP": "109.26.211.73",
  "OriginalClientIP": "::",
  "DstIP": "192.168.104.96",
  "SrcPort": "36711",
  "DstPort": "54",
  "TCPFlags": "0x0",
  "IngressInterface": "s3p4",
  "EgressInterface": "s3p3",
  "IngressZone": "INUTILISE",
  "EgressZone": "INUTILISE",
  "DE": "Primary Detection Engine (f77608a0-0e20-11e6-91d7-88d7e001637c)",
  "Policy": "Default Access Control",
  "ConnectType": "Start",
  "AccessControlRuleName": "Unknown",
  "AccessControlRuleAction": "Allow",
  "PrefilterPolicy": "Unknown",
  "UserName": "No Authentication Required",
  "InitiatorPackets": 1,
  "ResponderPackets": 0,
  "InitiatorBytes": 80,
  "ResponderBytes": 0,
  "NAPPolicy": "Network Analysis",
  "DNSResponseType": "No Error",
  "Sinkhole": "Unknown",
  "URLCategory": "Unknown",
  "URLReputation": "Risk unknown"
}&lt;/PRE&gt;&lt;P&gt;To ingest them from Kafka, I've created the following external table in Hive, matching the JSON structure of the messages:&lt;/P&gt;&lt;PRE&gt;CREATE EXTERNAL TABLE ssh_druid_kafka (
 `__time` timestamp,
 `machine1` string,
 `machine2` string,
 `priority` string,
 `level` string,
 `machine3` string,
 `Protocol` string,
 `SrcIP` string,
 `OriginalClientIP` string,
 `DstIP` string,
 `SrcPort` string,
 `DstPort` string,
 `TCPFlags` string,
 `IngressInterface` string,
 `EgressInterface` string,
 `IngressZone` string,
 `EgressZone` string,
 `DE` string,
 `Policy` string,
 `ConnectType` string,
 `AccessControlRuleName` string,
 `AccessControlRuleAction` string,
 `PrefilterPolicy` string,
 `UserName` string,
 `InitiatorPackets` int,
 `ResponderPackets` int,
 `InitiatorBytes` int,
 `ResponderBytes` int,
 `NAPPolicy` string,
 `DNSResponseType` string,
 `Sinkhole` string,
 `URLCategory` string,
 `URLReputation` string
)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
 TBLPROPERTIES (
 "kafka.bootstrap.servers" = "[kafka host]:6667",
 "kafka.topic" = "log_schema_raw",
 "druid.kafka.ingestion.useEarliestOffset" = "true",
 "druid.kafka.ingestion.maxRowsInMemory" = "20",
 "druid.kafka.ingestion.startDelay" = "PT5S",
 "druid.kafka.ingestion.period" = "PT30S",
 "druid.kafka.ingestion.consumer.retries" = "2"
);

ALTER TABLE ssh_druid_kafka SET TBLPROPERTIES("druid.kafka.ingestion" = 'START');
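
-- Note: per the Hive wiki page linked above, "druid.kafka.ingestion" is
-- believed to also accept 'STOP' and 'RESET', to pause the Druid Kafka
-- supervisor or reset its consumer offsets, e.g.:
--   ALTER TABLE ssh_druid_kafka SET TBLPROPERTIES("druid.kafka.ingestion" = 'STOP');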
&lt;/PRE&gt;&lt;P&gt;I'm getting an indexing task in the Druid supervisor...&lt;/P&gt;&lt;P&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/15359iC730E2B341858527/image-size/medium?v=v2&amp;amp;px=400" alt="93517-screen-shot-2018-11-05-at-000525.png" /&gt;&lt;/P&gt;&lt;P&gt;=&amp;gt; but no data source in the Druid Broker 😞&lt;/P&gt;&lt;P&gt;On closer look at the task logs in the Druid Supervisor, I see parsing errors:&lt;/P&gt;&lt;PRE&gt;2018-11-04T23:06:06,305 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [60] Unparseable events! Turn on debug logging to see exception stack trace.
2018-11-04T23:09:06,306 ERROR [MonitorScheduler-0] io.druid.segment.realtime.RealtimeMetricsMonitor - [60] Unparseable events! Turn on debug logging to see exception stack trace.
...&lt;/PRE&gt;&lt;P&gt;&lt;BR /&gt;Questions:&lt;/P&gt;&lt;P&gt;1. How do I enable debug logging on tasks?&lt;BR /&gt;=&amp;gt; I've tried setting the log4j level to DEBUG in the Ambari Druid tab. That does affect the log levels of the components, but it doesn't seem to affect the indexing tasks.&lt;/P&gt;&lt;P&gt;2. What format does Druid expect when using the Kafka Indexing Service?&lt;/P&gt;&lt;P&gt;Am I missing something?&lt;/P&gt;&lt;P&gt;Thanks for your help.&lt;/P&gt;</description>
      <pubDate>Fri, 16 Sep 2022 13:51:53 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Druid-kafka-ingestion-from-Hive-HDP-3-0/m-p/226446#M84706</guid>
      <dc:creator>mlamairesse</dc:creator>
      <dc:date>2022-09-16T13:51:53Z</dc:date>
    </item>
    <item>
      <title>Re: Druid kafka ingestion from Hive - HDP 3.0</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Druid-kafka-ingestion-from-Hive-HDP-3-0/m-p/226447#M84707</link>
      <description>&lt;P&gt;Looked at the code, and in its current state the timestamp column is hard-coded to be __time; that is why you are getting the exceptions, since your field is called timestamp.&lt;/P&gt;&lt;P&gt;&lt;A href="https://github.com/apache/hive/blob/a51e6aeaf816bdeea5e91ba3a0fab8a31b3a496d/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java#L301" target="_blank"&gt;https://github.com/apache/hive/blob/a51e6aeaf816bdeea5e91ba3a0fab8a31b3a496d/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java#L301&lt;/A&gt;&lt;/P&gt;&lt;P&gt;If this is the case, it is a serious limitation and needs to be fixed. &lt;A rel="user" href="https://community.cloudera.com/users/10777/nbangarwa.html" nodeid="10777"&gt;@Nishant Bangarwa&lt;/A&gt;, what do you think?&lt;/P&gt;
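&lt;P&gt;Concretely (a minimal sketch, assuming the handler really does key the timestamp off a field named __time), the Kafka messages would need to carry __time rather than timestamp:&lt;/P&gt;&lt;PRE&gt;{
  "__time": "2018-11-04T22:43:10Z",
  "machine1": "RXI901",
  ...
}&lt;/PRE&gt;</description>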
      <pubDate>Wed, 07 Nov 2018 04:19:55 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Druid-kafka-ingestion-from-Hive-HDP-3-0/m-p/226447#M84707</guid>
      <dc:creator>sbouguerra</dc:creator>
      <dc:date>2018-11-07T04:19:55Z</dc:date>
    </item>
    <item>
      <title>Re: Druid kafka ingestion from Hive - HDP 3.0</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Druid-kafka-ingestion-from-Hive-HDP-3-0/m-p/226448#M84708</link>
      <description>&lt;P&gt;Hi &lt;A rel="user" href="https://community.cloudera.com/users/12341/sbouguerra.html" nodeid="12341"&gt;@Slim&lt;/A&gt;&lt;/P&gt;&lt;P&gt;Thanks, yep, that was it.&lt;/P&gt;&lt;P&gt;Another little quirk, identified with the help of Charles Bernard:&lt;/P&gt;&lt;P&gt;- All names in the JSON object must be in &lt;STRONG&gt;lower case&lt;/STRONG&gt; for them to be parsed&lt;/P&gt;&lt;P&gt;- A corollary is that all column names must also be in lower case (see the sketch below)&lt;/P&gt;
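&lt;P&gt;Putting both fixes together, a corrected version of the table from the question would look roughly like this (a sketch, abbreviated to a few columns; the full list follows the original DDL, just lowercased):&lt;/P&gt;&lt;PRE&gt;CREATE EXTERNAL TABLE ssh_druid_kafka (
 `__time` timestamp,
 `machine1` string,
 `protocol` string,
 `srcip` string,
 `initiatorpackets` int,
 -- ... remaining columns from the original DDL, all lowercase ...
 `urlreputation` string
)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES (
 "kafka.bootstrap.servers" = "[kafka host]:6667",
 "kafka.topic" = "log_schema_raw",
 "druid.kafka.ingestion.useEarliestOffset" = "true"
);&lt;/PRE&gt;</description>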
      <pubDate>Thu, 08 Nov 2018 03:05:21 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Druid-kafka-ingestion-from-Hive-HDP-3-0/m-p/226448#M84708</guid>
      <dc:creator>mlamairesse</dc:creator>
      <dc:date>2018-11-08T03:05:21Z</dc:date>
    </item>
    <item>
      <title>Re: Druid kafka ingestion from Hive - HDP 3.0</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/Druid-kafka-ingestion-from-Hive-HDP-3-0/m-p/226449#M84709</link>
      <description>&lt;P&gt;Correct, &lt;A rel="user" href="https://community.cloudera.com/users/13183/mlamairesse.html" nodeid="13183"&gt;@Matthieu Lamairesse&lt;/A&gt;: Druid is case-sensitive while Hive is not, so to make it work you need to make sure that all the columns are lowercase.&lt;/P&gt;</description>
      <pubDate>Thu, 08 Nov 2018 03:07:13 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/Druid-kafka-ingestion-from-Hive-HDP-3-0/m-p/226449#M84709</guid>
      <dc:creator>sbouguerra</dc:creator>
      <dc:date>2018-11-08T03:07:13Z</dc:date>
    </item>
  </channel>
</rss>

