Created 02-18-2016 05:01 PM
EDIT: I noticed that my real record schema was occasionally using integers as field names, which violated Avro's field-name requirements. The schema worked once I added a character before the integers.
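For illustration (these field names are invented, not from my actual schema): Avro names must match [A-Za-z_][A-Za-z0-9_]*, so a name that starts with a digit is rejected, while the same name with a character prefixed is fine:
    rejected: { "name": "2016", "type": "string" }
    accepted: { "name": "c2016", "type": "string" }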
With NiFi's ConvertCSVToAvro, I have not found much guidance or many examples regarding the Record schema property. The documentation says to use an Avro schema, but it seems like a canonical Avro schema does not work. How do I set it up?
My attempts resulted in the caution symbol showing the message below, and I was unable to actually use the processor.
'Record schema' validated against '<text of record schema>'
Here are the properties as I set them...
Record schema:
{ "type": "record", "name": "sensor_data", "fields": [ { "name": "TimeStamp", "type": "string"}, { "name": "Sensor", "type": "string"}, { "name": "Measure", "type": "string"}, { "name": "MeasureValue", "type": ["int","null"], "default":"null"}, { "name": "AdditionalInfo", "type": ["string","null"], "default":"null"} ] }
CSV charset: utf8
CSV delimiter: |
CSV quote character: "
CSV escape character: \
Use CSV header line: false
Lines to skip: 0
Here's some sample data:
2016-02-01T00:03:09Z|Temp_1|TempF|212|"WARNING|Overheating"
2016-02-01T00:05:03Z|Power|N/A||"STATE_CHANGE|Active"
2016-02-02T12:30:00Z|EmployeeCount|0|
Created 02-19-2016 11:35 AM
Try using the avro-tools utility to generate the schema file. It has a lot of helpful utilities. I remember fiddling a lot with the schema before it was accepted, but the avro-tools jar is essential.
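For example, one quick sanity check (just a sketch, assuming the schema is saved as sensor_data.avsc and using the avro-tools jar mentioned below) is to compile the schema; malformed JSON or an illegal field name fails immediately:
    java -jar avro-tools-1.8.0.jar compile schema sensor_data.avsc /tmp/avro-gen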
Created 04-27-2016 12:07 PM
To get the current Avro tools:
wget http://apache.claz.org/avro/avro-1.8.0/java/avro-tools-1.8.0.jar
There's some good documentation here: https://avro.apache.org/docs/1.8.0/gettingstartedjava.html#Compiling+the+schema
This article helped me:
http://www.michael-noll.com/blog/2013/03/17/reading-and-writing-avro-files-from-the-command-line/
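The common cases from that article look roughly like this (a sketch, with data.avro as a placeholder file name):
    # print the schema embedded in an Avro data file
    java -jar avro-tools-1.8.0.jar getschema data.avro
    # dump the records as JSON
    java -jar avro-tools-1.8.0.jar tojson data.avro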
Created 05-20-2016 03:58 PM
If you are using Maven to build your project, there is a plugin that you can use to auto-generate your schema.
https://avro.apache.org/docs/1.8.0/gettingstartedjava.html
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>${avro.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>idl-protocol</goal>
        <goal>schema</goal>
        <goal>protocol</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/avro/idl</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
        <stringType>String</stringType>
      </configuration>
    </execution>
  </executions>
</plugin>
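With that in the pom, generation is bound to the build lifecycle, so a normal build (or just that phase) produces the output:
    mvn generate-sources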
Created on 05-04-2016 02:49 PM - edited 08-18-2019 06:12 AM
I ran the same flow myself and examined the Avro file in HDFS using the Avro CLI.
Even though I didn't specify Snappy compression, it was there in the file.
[root@sandbox opt]# java -jar avro-tools-1.8.0.jar getmeta 23568764174290.avro
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
avro.codec    snappy
avro.schema   {"type":"record","name":"people","doc":"Schema generated by Kite","fields":[{"name":"id","type":"long","doc":"Type inferred from '2'"},{"name":"first_name","type":"string","doc":"Type inferred from 'Gregory'"},{"name":"last_name","type":"string","doc":"Type inferred from 'Vasquez'"},{"name":"email","type":"string","doc":"Type inferred from 'gvasquez1@pcworld.com'"},{"name":"gender","type":"string","doc":"Type inferred from 'Male'"},{"name":"ip_address","type":"string","doc":"Type inferred from '32.8.254.252'"},{"name":"company_name","type":"string","doc":"Type inferred from 'Janyx'"},{"name":"domain_name","type":"string","doc":"Type inferred from 'free.fr'"},{"name":"file_name","type":"string","doc":"Type inferred from 'NonMauris.xls'"},{"name":"mac_address","type":"string","doc":"Type inferred from '03-FB-66-0F-20-A3'"},{"name":"user_agent","type":"string","doc":"Type inferred from '\"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_7;'"},{"name":"lat","type":"string","doc":"Type inferred from ' like Gecko) Version/5.0.4 Safari/533.20.27\"'"},{"name":"long","type":"double","doc":"Type inferred from '26.98829'"}]}
It's hard-coded in NiFi. It always adds Snappy compression to every Avro file, with no option to change it:
    writer.setCodec(CodecFactory.snappyCodec());    // line 224
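A possible workaround (untested here, and the option name is from memory, so check the tool's usage output): the avro-tools jar can rewrite a data file with a different codec, e.g. to strip the Snappy compression:
    java -jar avro-tools-1.8.0.jar recodec --codec null compressed.avro uncompressed.avro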
Created 10-09-2016 02:20 PM
I used InferAvroSchema + ConvertJSONToAvro + PutHiveStreaming:
JSON:
{ "name": "张三", "num": "2", "score": "3.4", "newtime": "2016-03-01 10:10:10" }
inferred.avro.schema
{ "type" : "record", "name" : "test", "fields" : [ { "name" : "name", "type" : "string", "doc" : "Type inferred from '\"张三\"'" }, { "name" : "num", "type" : "string", "doc" : "Type inferred from '\"2\"'" }, { "name" : "score", "type" : "string", "doc" : "Type inferred from '\"3.4\"'" }, { "name" : "newtime", "type" : "string", "doc" : "Type inferred from '\"2016-03-01 10:10:10\"'" } ] }
Then, in ConvertJSONToAvro, I set the Record schema property to ${inferred.avro.schema}.
But in PutHiveStreaming, I got this error:
2016-10-09 09:58:48,360 WARN [put-hive-streaming-0] org.apache.hive.hcatalog.data.JsonSerDe Error [org.codehaus.jackson.JsonParseException: Current token (VALUE_STRING) not numeric, can not use numeric value accessors
at [Source: java.io.ByteArrayInputStream@7fbad804; line: 1, column: 28]] parsing json text [{"name": "张三", "num": "2", "score": "3.4", "newtime": "2016-03-01 10:10:10"}].
2016-10-09 09:58:48,360 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=d50d1499-3137-1226-89c0-86dfeac7bf2c] Error writing record to Hive Streaming transaction
2016-10-09 09:58:48,363 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming
org.apache.hive.hcatalog.streaming.SerializationError: {metaStoreUri='thrift://hive1.wdp:9083', database='newsinfo', table='test1', partitionVals=[] } SerializationError
at org.apache.nifi.util.hive.HiveWriter.write(HiveWriter.java:119) ~[nifi-hive-processors-1.0.0.jar:1.0.0]
at org.apache.nifi.processors.hive.PutHiveStreaming.lambda$onTrigger$4(PutHiveStreaming.java:480) ~[nifi-hive-processors-1.0.0.jar:1.0.0]
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1880) ~[na:na]
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[na:na]
at org.apache.nifi.processors.hive.PutHiveStreaming.onTrigger(PutHiveStreaming.java:394) ~[nifi-hive-processors-1.0.0.jar:1.0.0]
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0.jar:1.0.0]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) ~[na:na]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) ~[na:na]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) ~[na:na]
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) ~[na:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_101]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_101]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_101]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_101]
Caused by: org.apache.hive.hcatalog.streaming.SerializationError: Unable to convert byte[] record into Object
at org.apache.hive.hcatalog.streaming.StrictJsonWriter.encode(StrictJsonWriter.java:117) ~[hive-hcatalog-streaming-1.2.1.jar:1.2.1]
at org.apache.hive.hcatalog.streaming.StrictJsonWriter.write(StrictJsonWriter.java:78) ~[hive-hcatalog-streaming-1.2.1.jar:1.2.1]
at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:632) ~[hive-hcatalog-streaming-1.2.1.jar:1.2.1]
at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113) ~[nifi-hive-processors-1.0.0.jar:1.0.0]
at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110) ~[nifi-hive-processors-1.0.0.jar:1.0.0]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_101]
... 3 common frames omitted
Caused by: org.apache.hadoop.hive.serde2.SerDeException: org.codehaus.jackson.JsonParseException: Current token (VALUE_STRING) not numeric, can not use numeric value accessors
at [Source: java.io.ByteArrayInputStream@7fbad804; line: 1, column: 28]
at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:179) ~[hive-hcatalog-core-1.2.1.jar:1.2.1]
at org.apache.hive.hcatalog.streaming.StrictJsonWriter.encode(StrictJsonWriter.java:115) ~[hive-hcatalog-streaming-1.2.1.jar:1.2.1]
... 8 common frames omitted
Caused by: org.codehaus.jackson.JsonParseException: Current token (VALUE_STRING) not numeric, can not use numeric value accessors
at [Source: java.io.ByteArrayInputStream@7fbad804; line: 1, column: 28]
at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1433) ~[jackson-core-asl-1.9.13.jar:1.9.13]
at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:521) ~[jackson-core-asl-1.9.13.jar:1.9.13]
at org.codehaus.jackson.impl.JsonParserBase._parseNumericValue(JsonParserBase.java:766) ~[jackson-core-asl-1.9.13.jar:1.9.13]
at org.codehaus.jackson.impl.JsonParserBase.getIntValue(JsonParserBase.java:622) ~[jackson-core-asl-1.9.13.jar:1.9.13]
at org.apache.hive.hcatalog.data.JsonSerDe.extractCurrentField(JsonSerDe.java:279) ~[hive-hcatalog-core-1.2.1.jar:1.2.1]
at org.apache.hive.hcatalog.data.JsonSerDe.populateRecord(JsonSerDe.java:218) ~[hive-hcatalog-core-1.2.1.jar:1.2.1]
at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:174) ~[hive-hcatalog-core-1.2.1.jar:1.2.1]
... 9 common frames omitted
Created 05-04-2016 05:04 PM
@Steven Cardella NiFi, Avro and Kafka are proving to be "entertaining". There is a fix for the Snappy compression in the pipeline, as mentioned here: Previous post
Created 05-04-2016 05:06 PM
Excellent. Let me know when that drops or is in alpha, and I will test it.
Created 05-04-2016 05:14 PM
Follow the link in my thread that refers to the ticket that has been raised. It looks like there is already a fix. Unfortunately, I don't have an environment to build and test it.
Created 06-01-2017 11:55 AM
Avro 1.8.2 is now available