NiFi 2.7.x seems to break ParquetRecordReader for timestamps

New Contributor

Hi everyone,

I'm currently trying to write records containing timestamps to Parquet.

The writing part seems to work fine, as the resulting Parquet file contains the Avro schema '{"name":"ts_string","type":{"type":"long","logicalType":"timestamp-millis"}}' and the Parquet column type "timestamp[ms, tz=UTC]".

However, when reading the same Parquet file, I get the error message:

ERROR [Timer-Driven Process Thread-6] o.a.n.processors.standard.ConvertRecord ConvertRecord[id=c2746584-b622-3127-f4fd-c46a9243553e] Failed to process StandardFlowFileRecord[uuid=cef04323-a7c0-4965-84d6-c2b8d16c3fac,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1768470437932-1, container=default, section=1], offset=17423, length=6057],offset=0,name=73ca82fa-02af-44d1-9c23-27e0b2b6588f,size=6057]; will route to failure
java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Long (java.time.Instant and java.lang.Long are in module java.base of loader 'bootstrap')
at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1175)
at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:979)
at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:943)
at org.apache.nifi.parquet.record.ParquetRecordReader.nextRecord(ParquetRecordReader.java:111)
at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:251)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:237)
at jdk.proxy41/jdk.proxy41.$Proxy214.nextRecord(Unknown Source)
at org.apache.nifi.processors.standard.AbstractRecordProcessor.lambda$onTrigger$0(AbstractRecordProcessor.java:132)
at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3410)
at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:125)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1274)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:229)
at org.apache.nifi.controller.scheduling.AbstractTimeBasedSchedulingAgent.lambda$doScheduleOnce$0(AbstractTimeBasedSchedulingAgent.java:59)
at org.apache.nifi.engine.FlowEngine.lambda$wrap$1(FlowEngine.java:105)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)

Trying the same thing in NiFi 2.6.0 works as expected.
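
The exception above boils down to an unchecked cast. The following is a minimal sketch in plain Java, not NiFi's actual code: the class and method names (TimestampCastDemo, strictMillis, tolerantMillis) are hypothetical and only illustrate how a value that arrives as java.time.Instant breaks code expecting a Long, and how accepting both representations would avoid it.

```java
import java.time.Instant;

final class TimestampCastDemo {

    // Hypothetical stand-in for the failing pattern: assumes a
    // timestamp-millis value is always delivered as a Long.
    static long strictMillis(Object value) {
        return (Long) value; // ClassCastException if value is an Instant
    }

    // Tolerant variant: accepts both the old (Long) and the new (Instant)
    // representation of a timestamp-millis value.
    static long tolerantMillis(Object value) {
        if (value instanceof Instant) {
            return ((Instant) value).toEpochMilli();
        }
        if (value instanceof Long) {
            return (Long) value;
        }
        throw new IllegalArgumentException("Unsupported timestamp value: " + value);
    }

    public static void main(String[] args) {
        // Assumption: the reader hands back an Instant for timestamp-millis.
        Object fromReader = Instant.ofEpochMilli(1_700_000_000_000L);

        System.out.println("tolerant: " + tolerantMillis(fromReader));
        try {
            strictMillis(fromReader);
        } catch (ClassCastException e) {
            System.out.println("strict cast failed: " + e.getMessage());
        }
    }
}
```

The strict variant fails with the same kind of message as in the stack trace, while the tolerant one returns the epoch milliseconds for either input type.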

Here is a basic flow to recreate the problem:

  • GenerateRecord
    • Record Writer: ParquetRecordSetWriter
    • test (dynamic property): Address - Full Address
  • QueryRecord
    • Record Reader: ParquetReader
    • Record Writer: ParquetRecordSetWriter
    • test (dynamic property): "SELECT *, cast(${now():toNumber()} as timestamp) AS ts FROM FLOWFILE"
  • ConvertRecord
    • Record Reader: ParquetReader
    • Record Writer: AvroRecordSetWriter

The error is thrown in the ConvertRecord processor. Interestingly, when trying to display the flowfile in the new Parquet content viewer, the viewer also seems to crash or shows nothing.

On a side note: Are there any (better) recommendations on converting fields in records to their appropriate datatypes? I've tried using UpdateRecord with the "toDate" function. However, when using Schema Access Strategy "Inherit Record Schema", the resulting Avro type is '["string","null"]'.

 

Kind regards

Nick

1 ACCEPTED SOLUTION

New Contributor

Thanks to the issue created by @svenvk, this was at least partially addressed in NiFi 2.8.0. The problem was that newer Avro/Parquet libraries represent timestamps with Java 8 date/time types (the stack trace above shows java.time.Instant) instead of Long/Integer.

I say partially, because the fix only implements the Avro types:

  • time-millis
  • time-micros
  • timestamp-millis
  • timestamp-micros
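
To make the four covered types concrete, here is a rough sketch of what converting the java.time values back to the primitive Avro representation could look like. This is not the code from the NiFi fix: the class LogicalTypeNormalizer and its normalize method are hypothetical, and the sketch assumes the library delivers LocalTime for the time-* types and Instant for the timestamp-* types.

```java
import java.time.Instant;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;

final class LogicalTypeNormalizer {

    // Maps a java.time value back to the primitive that the Avro logical
    // type is defined on. Values that are already primitives, and unknown
    // logical types, are passed through unchanged.
    static Object normalize(String logicalType, Object value) {
        switch (logicalType) {
            case "time-millis": // int: milliseconds after midnight
                if (value instanceof LocalTime) {
                    return (int) (((LocalTime) value).toNanoOfDay() / 1_000_000L);
                }
                return value;
            case "time-micros": // long: microseconds after midnight
                if (value instanceof LocalTime) {
                    return ((LocalTime) value).toNanoOfDay() / 1_000L;
                }
                return value;
            case "timestamp-millis": // long: milliseconds since the epoch
                if (value instanceof Instant) {
                    return ((Instant) value).toEpochMilli();
                }
                return value;
            case "timestamp-micros": // long: microseconds since the epoch
                if (value instanceof Instant) {
                    return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value);
                }
                return value;
            default:
                return value;
        }
    }
}
```

For example, normalize("timestamp-millis", Instant.ofEpochMilli(42L)) yields the Long 42, and a Long passed in for the same logical type comes back as-is.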

Still missing are the nanosecond-precision type "timestamp-nanos" and the three "local-timestamp-{millis,micros,nanos}" types.

See https://avro.apache.org/docs/1.12.0/specification/#timestamps 
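
For the types that are still missing, a conversion along the following lines might work. This is only a sketch under assumptions: the class ExtraTimestampTypes and its methods are hypothetical, it assumes timestamp-nanos arrives as Instant and the local-timestamp-* types arrive as LocalDateTime, and it counts local timestamps from 1970-01-01T00:00 without applying any time zone (done here by pinning the value to UTC).

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

final class ExtraTimestampTypes {

    // timestamp-nanos: nanoseconds since the epoch as a long.
    // Throws ArithmeticException if the result does not fit in a long.
    static long timestampNanos(Instant value) {
        return ChronoUnit.NANOS.between(Instant.EPOCH, value);
    }

    // local-timestamp-{millis,micros,nanos}: units elapsed since
    // 1970-01-01T00:00 in "local" time. Pinning the value to UTC keeps
    // the wall-clock fields untouched while allowing epoch arithmetic.
    static long localTimestamp(LocalDateTime value, ChronoUnit unit) {
        Instant pinnedToUtc = value.toInstant(ZoneOffset.UTC);
        return unit.between(Instant.EPOCH, pinnedToUtc);
    }

    public static void main(String[] args) {
        LocalDateTime oneSecondIn = LocalDateTime.of(1970, 1, 1, 0, 0, 1);
        System.out.println(localTimestamp(oneSecondIn, ChronoUnit.MILLIS));
        System.out.println(localTimestamp(oneSecondIn, ChronoUnit.MICROS));
        System.out.println(timestampNanos(Instant.ofEpochSecond(1L, 5L)));
    }
}
```

Passing ChronoUnit.MILLIS, MICROS or NANOS selects which of the three local-timestamp variants is produced.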

Additionally, the new Parquet content viewer seems to suffer from the same problem when displaying a flow file.

I will create new issues in the NiFi Jira 🙂

 


3 REPLIES

Community Manager

@nicktk Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @MattWho and @mburgess, who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Senior Community Moderator



New Member

I noticed the same. Whenever I try to read a Parquet file with timestamps in NiFi 2.7, it fails with that error.

I've created a ticket on the NiFi project: https://issues.apache.org/jira/browse/NIFI-15548

 
