
Unable to create a Hive table over data written by the Kafka Connect HDFS sink

1. Start a Kafka Connect worker in distributed mode

docker run -d \
  --name=connect-distributed \
  --net=host \
  -e CONNECT_BOOTSTRAP_SERVERS=<kafka cluster> \
  -e CONNECT_REST_PORT=8082 \
  -e CONNECT_GROUP_ID="ingest-hdfs-from-kafka" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="kafka-hdfs-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="kafka-hdfs-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="kafka-hdfs-status" \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE="false" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE="false" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME=<host> \
  -v /gkn/sw/kafka-connect/etc/hadoop:/etc/hadoop:ro \
  -v /gkn/sw/kafka-connect/etc/keytab:/etc/keytab:ro \
  -v /gkn/sw/kafka-connect/etc/krb5.conf:/etc/krb5.conf:ro \
  confluentinc/cp-kafka-connect:3.3.0
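
As a quick sanity check that the worker is up, I query the Connect REST API (using the REST port configured above; <host> is a placeholder as before):

curl http://<host>:8082/                   # returns the worker version
curl http://<host>:8082/connector-plugins  # lists the installed connector plugins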

2. Create the HDFS sink connector (output format defaults to Avro)

curl -X POST -H "Content-Type: application/json" \
  --data '{
    "name": "hdfs-sink",
    "config": {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "tasks.max": "3",
      "topics": "SENSORS",
      "hadoop.conf.dir": "/etc/hadoop",
      "hdfs.url": "hdfs://<hadoop>",
      "flush.size": "10000000",
      "partition.duration.ms": "300000",
      "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
      "rotate.schedule.interval.ms": "300000",
      "topics.dir": "/myfolder/ingest/kafka/",
      "logs.dir": "/myfolder/ingest/logs/",
      "hdfs.authentication.kerberos": "true",
      "connect.hdfs.principal": "<principal>",
      "connect.hdfs.keytab": "<keytab>",
      "hdfs.namenode.principal": "",
      "locale": "en-us",
      "timezone": "GMT"
    }
  }' \
  http://<kafka-connect>:28082/connectors
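
After posting the configuration, the connector state can be verified with the standard Connect status endpoint (same host and port as the POST above); the connector and all tasks should report RUNNING:

curl http://<kafka-connect>:28082/connectors/hdfs-sink/status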

3. Extract schema from Avro

java -jar avro-tools-1.8.2.jar getschema sensors+0+0060650523+0060651522.avro > environment.avsc

The result is this: [ "null", "bytes" ]
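
Dumping a few records with tojson (a standard avro-tools subcommand, run against the same file) confirms it: the values are opaque byte payloads rather than named fields:

java -jar avro-tools-1.8.2.jar tojson sensors+0+0060650523+0060651522.avro | head -n 3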

4. Create Hive table

CREATE EXTERNAL TABLE SENSORDATA
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/folder/SENSORDATA'
TBLPROPERTIES 
  ('avro.schema.literal'=
     '{"name": "parent", "type": "record", "fields": [
        {"name": "children", "type": {"type": "array",  
                "items": {"name": "child", "type": "record", "fields": [
                            {"name": "id", "type": "string"},
                            {"name": "topics", "type": "string"},
                            {"name": "position", "type": "string"},
                            {"name": "category", "type": "string"},
                            {"name": "value", "type": "float"},
                            {"name": "unit", "type": "string"},
                            {"name": "timestamp", "type": "string"}
                        ]
                }}}
    ]}'
);

or

CREATE EXTERNAL TABLE SENSORDATA
(a string, i struct<id:STRING, topics:STRING, position:STRING, category:STRING, value:FLOAT, unit:STRING, ts:STRING>)
STORED AS AVRO
LOCATION '/folder/SENSORDATA';
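
For comparison, the table definition Hive actually registered can be inspected from the shell (the JDBC URL is a placeholder for my environment; 10000 is the default HiveServer2 port):

beeline -u 'jdbc:hive2://<hiveserver2>:10000' -e 'DESCRIBE FORMATTED SENSORDATA;'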

Then I query the table:

select * from SENSORDATA limit 10;

This fails with the following error:

org.apache.hive.service.cli.HiveSQLException: java.io.IOException: org.apache.avro.AvroTypeException: Found string, expecting parent

Do you have any suggestions? Or is there a better way to stream JSON data? Note: defining the schema in the Kafka producer is not feasible. Any guidance is appreciated.