Created 05-06-2020 02:44 PM
Hi,
I'm trying to read Avro messages from a Kafka topic that NiFi writes to, using the Hortonworks Schema Registry with NiFi's schema write strategy set to "HWX Content-Encoded Schema Reference". I have tried the following schema registry URLs (http://localhost:9090/api/v1/confluent/schemas/ids/ and http://127.0.0.1:9090/api/v1), but both throw the same error, since Druid uses the Confluent schema registry client libraries. The following is my JSON spec for the ingestion task:
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "location",
    "parser": {
      "type": "avro_stream",
      "avroBytesDecoder": {
        "type": "schema_registry",
        "url": "http://localhost:9090/api/v1/confluent/schemas/ids/"
      },
      "parseSpec": {
        "format": "avro",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "view"
          ]
        }
      }
    },
    "metricsSpec": [
      { "type": "count", "name": "countagg" }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "HOUR",
      "rollup": false,
      "intervals": null
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": false,
    "offsetFetchPeriod": "PT120S",
    "logParseExceptions": true
  },
  "ioConfig": {
    "useEarliestOffset": true,
    "topic": "location",
    "replicas": 1,
    "taskDuration": "PT120M",
    "completionTimeout": "PT240M",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    }
  }
}
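One thing I noticed while experimenting with the "url" value: as far as I can tell, the Confluent REST client appends "/schemas/ids/{id}" to whatever base URL it is configured with, so a base URL that already ends in "/schemas/ids/" would produce a doubled path segment. A quick sketch of that assumption (the function name is mine, just to illustrate):

```python
# Sketch of how (I believe) the Confluent REST client builds the request
# path: it appends "/schemas/ids/{id}" to the configured base URL, so a
# base URL that already ends in "/schemas/ids/" gets a doubled segment.
def schema_by_id_path(base_url: str, schema_id: int) -> str:
    # Strip any trailing slash, then append the fixed lookup path.
    return base_url.rstrip("/") + f"/schemas/ids/{schema_id}"

print(schema_by_id_path("http://localhost:9090/api/v1/confluent", 1))
# http://localhost:9090/api/v1/confluent/schemas/ids/1
print(schema_by_id_path("http://localhost:9090/api/v1/confluent/schemas/ids/", 1))
# http://localhost:9090/api/v1/confluent/schemas/ids/schemas/ids/1
```

If that is right, the base URL in the spec should probably stop at the "/confluent" segment, but I'm not certain that alone explains the error.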
Below is the error I'm getting from Druid:
2020-05-06T20:53:30,597 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Row at partition[0] offset[10802] was unparseable.
org.apache.druid.java.util.common.parsers.ParseException: Fail to decode avro message!
at org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder.parse(SchemaRegistryBasedAvroBytesDecoder.java:71) ~[?:?]
at org.apache.druid.data.input.AvroStreamInputRowParser.parseBatch(AvroStreamInputRowParser.java:58) ~[?:?]
at org.apache.druid.data.input.AvroStreamInputRowParser.parseBatch(AvroStreamInputRowParser.java:36) ~[?:?]
at org.apache.druid.segment.transform.TransformingInputRowParser.parseBatch(TransformingInputRowParser.java:45) ~[druid-processing-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.parseWithParser(SeekableStreamIndexTaskRunner.java:380) ~[druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.parseBytes(SeekableStreamIndexTaskRunner.java:370) [druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:660) [druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:278) [druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:164) [druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:419) [druid-indexing-service-0.17.0.jar:0.17.0]
at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:391) [druid-indexing-service-0.17.0.jar:0.17.0]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unrecognized field "responseMessage" (class io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage), not marked as ignorable (2 known properties: "error_code", "message"])
at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 106] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage["responseMessage"]); error code: 50005
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:164) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:181) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:317) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:310) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:62) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndID(CachedSchemaRegistryClient.java:117) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getByID(CachedSchemaRegistryClient.java:99) ~[?:?]
at org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder.parse(SchemaRegistryBasedAvroBytesDecoder.java:66) ~[?:?]
... 14 more
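In case it helps, here is my understanding of the two message framings (this is an assumption on my part, not taken from either codebase): the Confluent wire format is a 0x00 magic byte followed by a 4-byte big-endian schema id, while NiFi's HWX content-encoded schema reference is a 1-byte protocol version, an 8-byte schema id, and a 4-byte schema version. A sketch of parsing both:

```python
import struct

def parse_confluent_header(msg: bytes):
    """Confluent wire format (as I understand it): 0x00 magic byte,
    then a 4-byte big-endian schema id, then the Avro payload."""
    magic, schema_id = struct.unpack_from(">bi", msg, 0)
    if magic != 0:
        raise ValueError("not Confluent-framed")
    return schema_id, msg[5:]

def parse_hwx_header(msg: bytes):
    """HWX content-encoded reference (as I understand it): 1-byte protocol
    version, 8-byte schema id, 4-byte schema version, then the payload."""
    protocol, schema_id, version = struct.unpack_from(">bqi", msg, 0)
    return protocol, schema_id, version, msg[13:]

# The same payload framed the HWX way is opaque to a Confluent decoder:
hwx_msg = struct.pack(">bqi", 1, 42, 3) + b"avro-bytes"
print(parse_hwx_header(hwx_msg))
# (1, 42, 3, b'avro-bytes')
```

If those layouts are right, a Confluent decoder reading an HWX-framed message would either reject the first byte or assemble a schema id from the wrong offsets, which might be why the registry lookup above fails.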
Thanks.
Collins