Support Questions
Find answers, ask questions, and share your expertise

Druid Hortonworks Schema Registry Kafka integration error


Hi,

I'm trying to read Avro messages from a Kafka topic that NiFi writes to via the Hortonworks Schema Registry, with the NiFi schema write strategy set to "hwx content-encoded". I have tried the following schema registry URLs (http://localhost:9090/api/v1/confluent/schemas/ids/ and http://127.0.0.1:9090/api/v1), but both throw the same error, since Druid uses the Confluent schema registry client libraries. This is my JSON spec for the ingestion task:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "location",
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_registry",
        "url": "http://localhost:9090/api/v1/confluent/schemas/ids/"
      },
      "parseSpec": {
        "format": "avro",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "view"
          ]
        }
      }
    },
    "metricsSpec": [
      {"type": "count", "name": "countagg"}
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "HOUR",
      "rollup": false,
      "intervals": null
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": false,
    "offsetFetchPeriod": "PT120S",
    "logParseExceptions": true
  },
  "ioConfig": {
    "useEarliestOffset": true,
    "topic": "location",
    "replicas": 1,
    "taskDuration": "PT120M",
    "completionTimeout": "PT240M",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    }
  }
}

And below is the error I'm getting from Druid:

2020-05-06T20:53:30,597 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Row at partition[0] offset[10802] was unparseable.
org.apache.druid.java.util.common.parsers.ParseException: Fail to decode avro message!
at org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder.parse(SchemaRegistryBasedAvroBytesDecoder.java:71) ~[?:?]
at org.apache.druid.data.input.AvroStreamInputRowParser.parseBatch(AvroStreamInputRowParser.java:58) ~[?:?]
at org.apache.druid.data.input.AvroStreamInputRowParser.parseBatch(AvroStreamInputRowParser.java:36) ~[?:?]
at org.apache.druid.segment.transform.TransformingInputRowParser.parseBatch(TransformingInputRowParser.java:45) ~[druid-processing-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.parseWithParser(SeekableStreamIndexTaskRunner.java:380) ~[druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.parseBytes(SeekableStreamIndexTaskRunner.java:370) [druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:660) [druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:278) [druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:164) [druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:419) [druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:391) [druid-indexing-service-0.17.0.jar:0.17.0]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unrecognized field "responseMessage" (class io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage), not marked as ignorable (2 known properties: "error_code", "message"])
at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 106] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage["responseMessage"]); error code: 50005
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:164) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:181) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:317) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:310) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:62) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndID(CachedSchemaRegistryClient.java:117) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getByID(CachedSchemaRegistryClient.java:99) ~[?:?]
at org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder.parse(SchemaRegistryBasedAvroBytesDecoder.java:66) ~[?:?]
... 14 more
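As I read the trace, the Confluent client's strict JSON deserializer only knows two fields on an error response ("error_code" and "message"), but the registry replied with a field named "responseMessage", which it rejects. A minimal sketch of that mismatch (the example payloads are my assumption, inferred only from the field names in the stack trace above, not captured from the wire):

```python
import json

# Error body shape the Confluent client can deserialize
# (its ErrorMessage entity knows exactly these two properties):
confluent_err = json.loads('{"error_code": 40403, "message": "Schema not found"}')

# Shape the Hortonworks registry appears to return instead (assumption,
# based on the "responseMessage" field named in the RestClientException):
hwx_err = json.loads('{"responseMessage": "Entity with id not found"}')

known_fields = {"error_code", "message"}
unknown_fields = set(hwx_err) - known_fields

# The strict deserializer fails on any field outside known_fields:
print(unknown_fields)
```

So even when the Confluent-compatible endpoint path is reachable, any error response in the Hortonworks-native shape makes the client blow up while parsing the body rather than reporting the underlying registry error.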

 

Thanks.

Collins
