GenerateTableFetch Error when trying to read data from Oracle and ingest to Elasticsearch

New Contributor

Hi guys,

 

I am using the NiFi flow below:

Oracle → GenerateTableFetch  → ExecuteSQL → Elasticsearch

My database is Oracle and I am trying out an incremental load by setting the maximum-value column to my UPDT_TIMESTAMP column.

 

Let's say the total number of records in my table is 400,000. The issue I am facing is that when I use the GenerateTableFetch processor, it ends up processing and ingesting only about 50% of the records.

When I checked the logs, I couldn't see anything unusual. I can see that it creates partitions based on the row number in the table. The page size is the default 10,000.
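As I understand it (this is just my reading of the processor, so the exact SQL it builds may differ), GenerateTableFetch first runs a count/max query roughly like this to decide how many page queries to generate:

SELECT COUNT(*), MAX(UPDT_TIMESTAMP)      -- row count and new maximum value
FROM TABLE1
WHERE UPDT_TIMESTAMP > {previous maximum} -- only on incremental runs; placeholder, the first run has no WHERE clause

With my ~400,000 rows and the default partition size of 10,000, that should come out to about 40 page queries.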

 

I can see the type of query below running in the logs to retrieve the next 10,000 records. When I run these queries independently, they work fine. I have not set anything in the partition property, and I'm not sure whether I should put the update timestamp field there.

 

2020-01-01 22:52:42,791 DEBUG [Timer-Driven Process Thread-6] o.a.nifi.processors.standard.ExecuteSQL ExecuteSQL[id=016f1083-e639-1a12-4bf5-ae2a628bdf74] Executing query SELECT COLUMNS FROM (SELECT a.*, ROWNUM rnum FROM (SELECT COLUMNS FROM TABLE1 WHERE UPDT_TIMESTAMP <= timestamp '2020-01-01 13:25:23.357' ORDER BY UPDT_TIMESTAMP) a WHERE ROWNUM <= 2150000) WHERE rnum > 2140000
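The same generated query laid out for readability (COLUMNS stands for the column list, abbreviated exactly as in the log):

SELECT COLUMNS FROM (
    SELECT a.*, ROWNUM rnum FROM (
        SELECT COLUMNS FROM TABLE1
        WHERE UPDT_TIMESTAMP <= timestamp '2020-01-01 13:25:23.357' -- captured maximum value
        ORDER BY UPDT_TIMESTAMP                                     -- ordered by the max-value column
    ) a
    WHERE ROWNUM <= 2150000                                         -- upper bound of this page
)
WHERE rnum > 2140000                                                -- lower bound, so this page covers 10,000 rows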

 

Instead of GenerateTableFetch, if I just use an ExecuteSQL processor and link it to my Elasticsearch processor, with the property Max Rows Per Flow File = 500, my whole database gets loaded properly.
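For reference, that ExecuteSQL-only variant is essentially just a full-table select like this (a rough sketch of my configuration, query abbreviated):

SELECT * FROM TABLE1
-- ExecuteSQL property: Max Rows Per Flow File = 500,
-- so each outgoing flow file carries at most 500 records before being sent to Elasticsearch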

I believe the correct way to do both an incremental load and a one-time load is with GenerateTableFetch, but can you please advise me on why the number of records I ingest is less than the total number of records in the table?

 

I also have a very naive doubt about the number of concurrent tasks. If I set the number of concurrent tasks on an ExecuteSQL processor to 2, does that mean it would run the same query/action twice? In a downstream processor like PutElasticsearchHttpRecord, if I set the number of concurrent tasks to 2, does it mean the two threads split the flow files in the queue between themselves? That would be nice to have, since the ingestion into Elasticsearch would finish more quickly. Would there be a case where both threads pick up the same flow file, and if so, is there any way of circumventing that?

 

@Shu_ashu @mburgess 

Guys, can you please help with these? I'm struggling with them and there's no resolution in sight.