
NiFi: Applying an Avro Schema in ConvertCSVToAvro

Explorer

EDIT: I noticed that my real record schema occasionally used integers as field names, which violates Avro's field-name requirements. The schema worked once I added a character before the integers. (A corrected schema sketch follows the sample data below.)

With NiFi's ConvertCSVToAvro, I have not found much guidance or example regarding the Record Schema property. The documentation says to use an Avro schema, but a canonical Avro schema does not seem to work. How do I set it up?

My attempts resulted in the caution symbol showing the message below, and I was unable to actually use the processor.

'Record schema' validated against '<text of record schema>'

Here are the properties as I set them...

Record schema:

{
  "type": "record",
  "name": "sensor_data",
  "fields":
  [
    { "name": "TimeStamp", "type": "string"},
    { "name": "Sensor", "type": "string"},
    { "name": "Measure", "type": "string"},
    { "name": "MeasureValue", "type": ["int","null"], "default":"null"},
    { "name": "AdditionalInfo", "type": ["string","null"], "default":"null"}
  ]
}

CSV charset: utf8

CSV delimiter: |

CSV quote character: "

CSV escape character: \

Use CSV header line: false

Lines to skip: 0

Here's some sample data:

2016-02-01T00:03:09Z|Temp_1|TempF|212|"WARNING|Overheating"
2016-02-01T00:05:03Z|Power|N/A||"STATE_CHANGE|Active"
2016-02-02T12:30:00Z|EmployeeCount|0|
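
For reference, a schema sketch along these lines should satisfy the Avro spec (note the null-first unions, since a union field's default value must match the union's first branch, and the unquoted JSON null defaults):

{
  "type": "record",
  "name": "sensor_data",
  "fields":
  [
    { "name": "TimeStamp", "type": "string"},
    { "name": "Sensor", "type": "string"},
    { "name": "Measure", "type": "string"},
    { "name": "MeasureValue", "type": ["null","int"], "default": null},
    { "name": "AdditionalInfo", "type": ["null","string"], "default": null}
  ]
}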
9 REPLIES

Master Mentor
@Steven Cardella

Try using the avro-tools utility to generate the schema file. It has a lot of helpful utilities. I remember fiddling with the schema quite a bit before it was accepted, but the avro-tools jar is essential.
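
For example (a sketch, assuming you already have a valid Avro file to pull a schema from, and that avro-tools-1.8.0.jar sits in the current directory):

java -jar avro-tools-1.8.0.jar getschema sample.avro > sensor_data.avsc
java -jar avro-tools-1.8.0.jar fromjson --schema-file sensor_data.avsc records.json > records.avro

The first command dumps the schema of an existing Avro file; the second round-trips JSON records through that schema, which is a quick way to check a schema is well-formed before pasting it into the processor.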


If you are using Maven to build your project, there is a plugin that you can use to auto-generate your schema.

https://avro.apache.org/docs/1.8.0/gettingstartedjava.html

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>idl-protocol</goal>
        <goal>schema</goal>
        <goal>protocol</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/avro/idl</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
        <stringType>String</stringType>
      </configuration>
    </execution>
  </executions>
</plugin>
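
Since the configuration above points the plugin at an IDL directory, a minimal .avdl file to drop in there might look like this (a sketch; the namespace, protocol, and record names are placeholders):

@namespace("com.example.sensors")
protocol SensorProtocol {
  record SensorData {
    string TimeStamp;
    string Sensor;
    string Measure;
    union { null, int } MeasureValue = null;
    union { null, string } AdditionalInfo = null;
  }
}

Running mvn generate-sources then emits the corresponding Java classes, and the .avsc schema text can be recovered from the generated class's SCHEMA$ field.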


Master Guru


I ran the same flow myself and examined the Avro file in HDFS using the Avro CLI.

Even though I didn't specify Snappy compression, it was there in the file.

[root@sandbox opt]# java -jar avro-tools-1.8.0.jar getmeta 23568764174290.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
avro.codec snappy
avro.schema {"type":"record","name":"people","doc":"Schema generated by Kite","fields":[{"name":"id","type":"long","doc":"Type inferred from '2'"},{"name":"first_name","type":"string","doc":"Type inferred from 'Gregory'"},{"name":"last_name","type":"string","doc":"Type inferred from 'Vasquez'"},{"name":"email","type":"string","doc":"Type inferred from 'gvasquez1@pcworld.com'"},{"name":"gender","type":"string","doc":"Type inferred from 'Male'"},{"name":"ip_address","type":"string","doc":"Type inferred from '32.8.254.252'"},{"name":"company_name","type":"string","doc":"Type inferred from 'Janyx'"},{"name":"domain_name","type":"string","doc":"Type inferred from 'free.fr'"},{"name":"file_name","type":"string","doc":"Type inferred from 'NonMauris.xls'"},{"name":"mac_address","type":"string","doc":"Type inferred from '03-FB-66-0F-20-A3'"},{"name":"user_agent","type":"string","doc":"Type inferred from '\"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_7;'"},{"name":"lat","type":"string","doc":"Type inferred from ' like Gecko) Version/5.0.4 Safari/533.20.27\"'"},{"name":"long","type":"double","doc":"Type inferred from '26.98829'"}]}

It's hard-coded in NiFi.

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/sr...

It always adds Snappy compression to every Avro file. No options.

writer.setCodec(CodecFactory.snappyCodec());
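
For context, here is the Avro API in play; a minimal, self-contained sketch (the one-field schema and the out.avro filename are placeholder assumptions, not NiFi code) showing how the codec is chosen when writing an Avro data file:

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class SnappyCodecSketch {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"people\",\"fields\":"
            + "[{\"name\":\"id\",\"type\":\"long\"}]}");

        DataFileWriter<GenericRecord> writer =
            new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
        // The hard-coded call in the processor; must come before create().
        writer.setCodec(CodecFactory.snappyCodec());
        writer.create(schema, new File("out.avro"));

        GenericRecord rec = new GenericData.Record(schema);
        rec.put("id", 1L);
        writer.append(rec);
        writer.close();
    }
}

Swapping in CodecFactory.nullCodec() (or deflateCodec(level)) at that one call site is all it would take to make the compression configurable.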

Expert Contributor

I used InferAvroSchema + ConvertJSONToAvro + PutHiveStreaming:

Input JSON:

{ "name": "张三", "num": "2", "score": "3.4", "newtime": "2016-03-01 10:10:10" }

inferred.avro.schema

{ "type" : "record", "name" : "test", "fields" : [ { "name" : "name", "type" : "string", "doc" : "Type inferred from '\"张三\"'" }, { "name" : "num", "type" : "string", "doc" : "Type inferred from '\"2\"'" }, { "name" : "score", "type" : "string", "doc" : "Type inferred from '\"3.4\"'" }, { "name" : "newtime", "type" : "string", "doc" : "Type inferred from '\"2016-03-01 10:10:10\"'" } ] }

Then I set Record schema to ${inferred.avro.schema} in ConvertJSONToAvro.

But in PutHiveStreaming, I got this error:

2016-10-09 09:58:48,360 WARN [put-hive-streaming-0] org.apache.hive.hcatalog.data.JsonSerDe Error [org.codehaus.jackson.JsonParseException: Current token (VALUE_STRING) not numeric, can not use numeric value accessors
 at [Source: java.io.ByteArrayInputStream@7fbad804; line: 1, column: 28]] parsing json text [{"name": "张三", "num": "2", "score": "3.4", "newtime": "2016-03-01 10:10:10"}].
2016-10-09 09:58:48,360 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=d50d1499-3137-1226-89c0-86dfeac7bf2c] Error writing record to Hive Streaming transaction
2016-10-09 09:58:48,363 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming
org.apache.hive.hcatalog.streaming.SerializationError: {metaStoreUri='thrift://hive1.wdp:9083', database='newsinfo', table='test1', partitionVals=[] } SerializationError
    at org.apache.nifi.util.hive.HiveWriter.write(HiveWriter.java:119) ~[nifi-hive-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processors.hive.PutHiveStreaming.lambda$onTrigger$4(PutHiveStreaming.java:480) ~[nifi-hive-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1880) ~[na:na]
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[na:na]
    at org.apache.nifi.processors.hive.PutHiveStreaming.onTrigger(PutHiveStreaming.java:394) ~[nifi-hive-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) ~[na:na]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) ~[na:na]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) ~[na:na]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) ~[na:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_101]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_101]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_101]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_101]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_101]
Caused by: org.apache.hive.hcatalog.streaming.SerializationError: Unable to convert byte[] record into Object
    at org.apache.hive.hcatalog.streaming.StrictJsonWriter.encode(StrictJsonWriter.java:117) ~[hive-hcatalog-streaming-1.2.1.jar:1.2.1]
    at org.apache.hive.hcatalog.streaming.StrictJsonWriter.write(StrictJsonWriter.java:78) ~[hive-hcatalog-streaming-1.2.1.jar:1.2.1]
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:632) ~[hive-hcatalog-streaming-1.2.1.jar:1.2.1]
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113) ~[nifi-hive-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110) ~[nifi-hive-processors-1.0.0.jar:1.0.0]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_101]
    ... 3 common frames omitted
Caused by: org.apache.hadoop.hive.serde2.SerDeException: org.codehaus.jackson.JsonParseException: Current token (VALUE_STRING) not numeric, can not use numeric value accessors
 at [Source: java.io.ByteArrayInputStream@7fbad804; line: 1, column: 28]
    at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:179) ~[hive-hcatalog-core-1.2.1.jar:1.2.1]
    at org.apache.hive.hcatalog.streaming.StrictJsonWriter.encode(StrictJsonWriter.java:115) ~[hive-hcatalog-streaming-1.2.1.jar:1.2.1]
    ... 8 common frames omitted
Caused by: org.codehaus.jackson.JsonParseException: Current token (VALUE_STRING) not numeric, can not use numeric value accessors
 at [Source: java.io.ByteArrayInputStream@7fbad804; line: 1, column: 28]
    at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1433) ~[jackson-core-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:521) ~[jackson-core-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.impl.JsonParserBase._parseNumericValue(JsonParserBase.java:766) ~[jackson-core-asl-1.9.13.jar:1.9.13]
    at org.codehaus.jackson.impl.JsonParserBase.getIntValue(JsonParserBase.java:622) ~[jackson-core-asl-1.9.13.jar:1.9.13]
    at org.apache.hive.hcatalog.data.JsonSerDe.extractCurrentField(JsonSerDe.java:279) ~[hive-hcatalog-core-1.2.1.jar:1.2.1]
    at org.apache.hive.hcatalog.data.JsonSerDe.populateRecord(JsonSerDe.java:218) ~[hive-hcatalog-core-1.2.1.jar:1.2.1]
    at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:174) ~[hive-hcatalog-core-1.2.1.jar:1.2.1]
    ... 9 common frames omitted
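
The root cause is visible in the trace: InferAvroSchema typed every field as string (the source JSON quotes its numbers), so the Hive JsonSerDe hits a VALUE_STRING token where the table expects a numeric column. A likely fix (a sketch, assuming the Hive table declares num as int and score as double, and that the upstream JSON can emit those fields unquoted, e.g. "num": 2) is a hand-written schema with matching numeric types; alternatively, declare the Hive columns as string:

{
  "type": "record",
  "name": "test",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "num", "type": "int" },
    { "name": "score", "type": "double" },
    { "name": "newtime", "type": "string" }
  ]
}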

Rising Star

@Steven Cardella NiFi, Avro, and Kafka are proving to be "entertaining". There is a fix for the Snappy compression in the pipeline, as mentioned here: Previous post

Master Guru

Excellent. Let me know when that drops or is in alpha, and I will test it.

Rising Star

Follow the link in my thread that refers to the ticket that was raised. It looks like there is already a fix. Unfortunately, I don't have an environment to build and test it.

Master Guru

Avro 1.8.2 is now available.