
NiFi 2.7.x seems to break ParquetRecordReader for timestamps


Hi everyone,

I'm currently trying to write records containing timestamps to Parquet.

The writing part seems to work fine: the resulting Parquet file contains the Avro schema '{"name":"ts_string","type":{"type":"long","logicalType":"timestamp-millis"}}' and the Parquet column type "timestamp[ms, tz=UTC]".
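For context, an Avro timestamp-millis field is a plain long counting milliseconds since the Unix epoch (1970-01-01T00:00:00Z). The small plain-Java sketch below (hypothetical class and method names, java.time only, not NiFi code) shows the round trip between that stored long and java.time.Instant, which is the type named in the exception.

```java
import java.time.Instant;

// Hypothetical illustration: a timestamp-millis value is stored as a long
// holding milliseconds since the Unix epoch.
public class TimestampMillisDemo {

    // Encode an Instant the way a timestamp-millis field stores it.
    static long toTimestampMillis(Instant instant) {
        return instant.toEpochMilli();
    }

    // Decode the stored long back into an Instant.
    static Instant fromTimestampMillis(long millis) {
        return Instant.ofEpochMilli(millis);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-01-01T00:00:00Z");
        long stored = toTimestampMillis(ts);
        System.out.println(stored);                      // the raw long value
        System.out.println(fromTimestampMillis(stored)); // back to an Instant
    }
}
```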

However, when reading the same Parquet file, I get the following error:

ERROR [Timer-Driven Process Thread-6] o.a.n.processors.standard.ConvertRecord ConvertRecord[id=c2746584-b622-3127-f4fd-c46a9243553e] Failed to process StandardFlowFileRecord[uuid=cef04323-a7c0-4965-84d6-c2b8d16c3fac,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1768470437932-1, container=default, section=1], offset=17423, length=6057],offset=0,name=73ca82fa-02af-44d1-9c23-27e0b2b6588f,size=6057]; will route to failure
java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Long (java.time.Instant and java.lang.Long are in module java.base of loader 'bootstrap')
at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1175)
at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:979)
at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:943)
at org.apache.nifi.parquet.record.ParquetRecordReader.nextRecord(ParquetRecordReader.java:111)
at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:251)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:237)
at jdk.proxy41/jdk.proxy41.$Proxy214.nextRecord(Unknown Source)
at org.apache.nifi.processors.standard.AbstractRecordProcessor.lambda$onTrigger$0(AbstractRecordProcessor.java:132)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3410)
at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:125)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1274)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:229)
at org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent.lambda$doScheduleOnce$0(AbstractTimeBasedSchedulingAgent.java:59)
at org.apache.nifi.engine.FlowEngine.lambda$wrap$1(FlowEngine.java:105)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
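The trace shows AvroTypeUtil.normalizeValue casting a timestamp value to java.lang.Long while it apparently receives a java.time.Instant from the Parquet reader. What exactly changed between 2.6.0 and 2.7.x is not confirmed here, so treat that as an assumption. To make the failure mode concrete, here is a minimal plain-Java sketch (hypothetical class and method names, not NiFi's actual code) contrasting a blind cast with a normalization that accepts either representation:

```java
import java.time.Instant;

// Hypothetical sketch, not NiFi's AvroTypeUtil: reproduces the cast failure
// and shows one defensive way to normalize a timestamp-millis value.
public class TimestampNormalizer {

    // Mirrors the failing pattern: assumes the raw value is always a Long.
    static long naiveNormalize(Object raw) {
        return (Long) raw; // ClassCastException if raw is an Instant
    }

    // Accepts either representation and returns epoch milliseconds.
    static long normalizeToEpochMillis(Object raw) {
        if (raw instanceof Long) {
            return (Long) raw;
        }
        if (raw instanceof Instant) {
            return ((Instant) raw).toEpochMilli();
        }
        throw new IllegalArgumentException("Unsupported timestamp value: " + raw);
    }

    public static void main(String[] args) {
        Object fromReader = Instant.ofEpochMilli(1_704_067_200_000L);
        System.out.println(normalizeToEpochMillis(fromReader));
        try {
            naiveNormalize(fromReader);
        } catch (ClassCastException e) {
            System.out.println("naive cast failed: " + e.getMessage());
        }
    }
}
```

A fix on the NiFi side would presumably need to handle Instant alongside Long in a similar way, but that is a guess, not the actual patch.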

The same flow works as expected in NiFi 2.6.0.

Here is a basic flow to recreate the problem:

  • GenerateRecord
    • Record Writer: ParquetRecordSetWriter
    • test (dynamic property): Address - Full Address
  • QueryRecord
    • Record Reader: ParquetReader
    • Record Writer: ParquetRecordSetWriter
    • test (dynamic property): "SELECT *, cast(${now():toNumber()} as timestamp) AS ts FROM FLOWFILE"
  • ConvertRecord
    • Record Reader: ParquetReader
    • Record Writer: AvroRecordSetWriter

The error is thrown in the ConvertRecord processor. Interestingly, when I try to display the FlowFile in the new Parquet content viewer, it also seems to crash or shows nothing.

On a side note: are there any better recommendations for converting record fields to their appropriate data types? I've tried UpdateRecord with the "toDate" function, but with the Schema Access Strategy set to "Inherit Record Schema", the resulting Avro type is '["string","null"]'.

Kind regards

Nick


@nicktk Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @MattWho and @mburgess, who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Senior Community Moderator

