
GenerateTableFetch Error when trying to read data from Oracle and ingest to Elasticsearch


Hi guys,

 

I am using the below NiFi flow:

Oracle → GenerateTableFetch  → ExecuteSQL → Elasticsearch

My database is Oracle and I am trying an incremental load by setting the maximum-value column to my UPDT_TIMESTAMP column.

 

Let's say the total number of records in my table is 400,000. The issue I am facing is that when I use the GenerateTableFetch processor, it ends up processing and ingesting only 50% of the records.

When I checked the logs, I couldn't see anything unusual. I can see that it creates partitions based on the row number of the table. The page size (Partition Size) is the default 10,000.
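 
For reference, here is roughly how I have the relevant GenerateTableFetch properties set (property names as per the NiFi documentation; anything not listed here is left at its default):

Table Name: TABLE1
Maximum-value Columns: UPDT_TIMESTAMP
Partition Size: 10000 (default)
Column for Value Partitioning: (not set)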

 

I can see the below type of query running in the logs to retrieve the next 10,000 records. When I run these queries independently, they work fine. I have not set anything in the partition property, and I am not sure whether I should put the update timestamp field in the partition property.

 

2020-01-01 22:52:42,791 DEBUG [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.ExecuteSQL ExecuteSQL[id=016f1083-e639-1a12-4bf5-ae2a628bdf74] Executing query SELECT COLUMNS FROM (SELECT a.*, ROWNUM rnum FROM (SELECT COLUMNS FROM TABLE1 WHERE UPDT_TIMESTAMP <= timestamp '2020-01-01 13:25:23.357' ORDER BY UPDT_TIMESTAMP) a WHERE ROWNUM <= 2150000) WHERE rnum > 2140000
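 
As a sanity check (table and column names taken from the query above; the timestamp literal is just the bound I see GenerateTableFetch using), I could compare the total row count against the rows that actually fall within that bound, along the lines of:

-- total rows in the table
SELECT COUNT(*) FROM TABLE1;

-- rows covered by the generated paging queries, i.e. within the recorded maximum value
SELECT COUNT(*) FROM TABLE1
WHERE UPDT_TIMESTAMP <= timestamp '2020-01-01 13:25:23.357';

If the second count came back much smaller than the first, that would at least tell me the missing rows fall outside the recorded bound rather than being dropped while paging.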

 

Instead of GenerateTableFetch, if I just use an ExecuteSQL processor and link it to my Elasticsearch processor, with the property Max Rows Per Flow File = 500, my whole table gets loaded properly.
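 
For completeness, the ExecuteSQL variant is nothing special; the query is roughly the following (simplifying the column list to * here), with Max Rows Per Flow File = 500 set on the processor:

SELECT * FROM TABLE1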

I believe the correct way to do an incremental load (and a one-time load) is to use GenerateTableFetch, but can you please advise me as to why the number of records I ingest is less than the total number of records in the table?

 

Also, I have a very naive doubt about the number of concurrent tasks. If I take ExecuteSQL as a processor and set the number of concurrent tasks to 2, does it mean that it would run the same query/action twice? In a downstream processor like PutElasticsearchHttpRecord, if I set the number of concurrent tasks to 2, does it mean that these 2 threads split the flowfiles in the queue amongst themselves? That would be nice to have, as the ingestion to Elasticsearch would then happen more quickly. Could there be a case where both threads pick up the same flowfile, and if so, is there any way of circumventing that?

 

@Shu_ashu @mburgess 

Guys, can you please help with these? I am struggling with them and no resolution is in sight 🙂