NiFi: How to ingest dynamic CSV files into PostgreSQL

New Contributor

I am using NiFi to ingest CSV files into PostgreSQL. I have many different CSV files, so I cannot define one specific Avro schema, and I use the InferAvroSchema processor instead. However, there are some problems:

1. I have to ingest CSV files that have a timestamp-type column, but when I run the InferAvroSchema processor, it defines the type of that column as string.

2. If the first row of a column is null, the processor defines the type of the column as string, even though its original type is integer or double.

So what I came up with was to retrieve the Avro schema from the table in the database using the filename, but I do not know if there is a processor that does this. Can anyone help me with this?
1 ACCEPTED SOLUTION

Master Guru

@Jeong Jong-Duk

Using external Database:

NiFi provides the LookupRecord, LookupAttribute, SelectHiveQL, and FetchHbase processors. Once you set up a lookup service, you can use it to retrieve the Avro schema from the database.
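As a rough illustration of what "retrieving the Avro schema from the database" could produce, here is a minimal Python sketch that turns table column metadata into an Avro schema string. It assumes the (column name, data type) pairs have already been fetched, for example from PostgreSQL's `information_schema.columns`. The type mapping, the function name `avro_schema_from_columns`, and the sample table are assumptions for illustration, not part of NiFi.

```python
import json

# Assumed mapping from PostgreSQL data_type names (as reported by
# information_schema.columns) to Avro types. Extend it for your tables.
PG_TO_AVRO = {
    "integer": "int",
    "bigint": "long",
    "real": "float",
    "double precision": "double",
    "boolean": "boolean",
    "text": "string",
    "character varying": "string",
    "date": {"type": "int", "logicalType": "date"},
    "timestamp without time zone": {"type": "long", "logicalType": "timestamp-millis"},
}


def avro_schema_from_columns(table_name, columns):
    """Build an Avro record schema (as a JSON string) from column metadata.

    `columns` is a list of (column_name, data_type) pairs. Every field is
    made nullable so that an empty first row does not change the type.
    Unknown database types fall back to string.
    """
    fields = []
    for column_name, data_type in columns:
        avro_type = PG_TO_AVRO.get(data_type, "string")
        fields.append({"name": column_name, "type": ["null", avro_type], "default": None})
    return json.dumps({"type": "record", "name": table_name, "fields": fields})


# Hypothetical table layout, used only to show the call.
example_columns = [
    ("id", "integer"),
    ("price", "double precision"),
    ("created_at", "timestamp without time zone"),
]
example_schema = avro_schema_from_columns("orders", example_columns)
```

The resulting string could then be stored wherever the lookup service reads from, keyed by filename or table name.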

(or)

Using NiFi processors:

You can add the Avro schema as a flowfile attribute with the Advanced usage of the UpdateAttribute processor: check the filename, add the appropriate Avro schema as a flowfile attribute, and have your Record Reader/Writer controller services use that schema.

This is basically a case statement in the UpdateAttribute processor: based on the filename, we decide which schema to add to the flowfile. You need to define the schema with an Avro logical type (timestamp, for example timestamp-millis) and configure the Record Reader/Writer controller services with the timestamp/time/date formats so that the processor can read/write the flowfile content to PostgreSQL.
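To make the case-statement idea concrete, here is a small Python sketch of the decision logic. It is not NiFi configuration; it only mimics what the UpdateAttribute rules would do. The filename patterns, field names, and the attribute name `avro.schema` are assumptions chosen for illustration.

```python
import json
import re

# Hypothetical schema for files such as "orders_2018.csv". The timestamp
# column uses the Avro timestamp-millis logical type, and every field is
# nullable so that an empty first row does not matter.
ORDERS_SCHEMA = json.dumps({
    "type": "record",
    "name": "orders",
    "fields": [
        {"name": "id", "type": ["null", "int"], "default": None},
        {"name": "amount", "type": ["null", "double"], "default": None},
        {"name": "created_at",
         "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
         "default": None},
    ],
})

# Hypothetical schema for files such as "users_2018.csv".
USERS_SCHEMA = json.dumps({
    "type": "record",
    "name": "users",
    "fields": [
        {"name": "id", "type": ["null", "int"], "default": None},
        {"name": "name", "type": ["null", "string"], "default": None},
    ],
})

# Each rule pairs a filename pattern with the schema to attach,
# in the same spirit as one rule per file type in UpdateAttribute.
RULES = [
    (re.compile(r"^orders_.*\.csv$"), ORDERS_SCHEMA),
    (re.compile(r"^users_.*\.csv$"), USERS_SCHEMA),
]


def schema_attribute_for(filename):
    """Return the attributes to add to a flowfile with this filename.

    The first matching rule wins. An empty dict means no rule matched.
    """
    for pattern, schema in RULES:
        if pattern.match(filename):
            return {"avro.schema": schema}
    return {}
```

For example, `schema_attribute_for("orders_2018.csv")` returns a dict whose `avro.schema` value is the orders schema, while a filename that matches no rule gets no attribute and would need separate handling in the flow.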

Refer to this link for how to configure and use the UpdateAttribute Advanced usage.

New Contributor

@Shu

Great! It works well. Thank you.