Created 03-23-2018 03:53 PM
I have added more rows to the source table, but even though QueryDatabaseTable is showing a green arrow (meaning it is running), it is not picking up the newly added rows. The old rows in the table were successfully moved to Hive.
I am attaching the schedule and the flow diagram (qdb-schedule.jpg, flow-diag.jpg).
Created on 03-23-2018 03:58 PM - edited 08-18-2019 12:10 AM
Ah, wait a second... it loaded the Hive table with the new rows, but the NiFi console is showing everything as zero? Why?
Created on 03-23-2018 04:29 PM - edited 08-18-2019 12:09 AM
All the changes made to your table were pulled more than 5 minutes ago, so the console shows all zeros; the processor-level usage/stats on NiFi processors show a sliding 5-minute window.
Since you have scheduled the QueryDatabaseTable processor to run every 10 seconds, it runs every 10 seconds and checks whether any new records were added after the stored state.
By now the processor has already pulled all the newly added records, and no records have been added after the last state stored in the processor.
Even though the processor runs every 10 seconds and the NiFi console updates the QueryDatabaseTable Read/Write and Tasks/Time usage/stats, the Out stat is only updated when new changes have been made to the table.
Only when you see the Out stat showing some size will the ConvertAvroToORC and PutHDFS processor usage/stats be updated.
Example:-
As you can see in the above screenshot, I am running QDB every 10 seconds and there are stats under Read/Write and Tasks/Time, but nothing under Out. That means the processor is running every 10 seconds but still hasn't found any updated records, which is why the NiFi console shows everything as zero for the ConvertAvroToORC and PutHDFS processors.
Clear the state in QDB and run again:-
I have cleared the state in the QDB processor and run it again; now we can see Out showing 468 bytes, i.e. the size of the flowfile that was sent from the QDB processor, and ConvertAvroToORC has updated stats.
So only when the QDB processor sends an output flowfile will all the downstream processor stats be updated, and they last for 5 minutes.
When no changes have been made to the table, the QDB processor still checks for new records. If it finds new records, it updates the state and sends an output flowfile; if not, it sends no flowfile and does not update the state.
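To illustrate the incremental behaviour (a rough sketch only, using a hypothetical table named source_table and a hypothetical Maximum-value Column named id), the processor effectively issues a query like the one below on each run and only emits a flowfile when it returns rows:

-- hypothetical example: 'source_table' and 'id' stand in for your table and max-value column
-- 1000 represents the largest id value recorded in the processor state on the previous run
SELECT *
FROM source_table
WHERE id > 1000;
-- if rows come back, a flowfile is emitted and the state advances to the new MAX(id);
-- if nothing comes back, no flowfile is sent and the state stays the same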
In addition, you are running the QDB processor on all nodes, but this processor is intended to run only on the primary node; if it runs on all nodes it will duplicate the data. Please change the execution to primary node only.
Created on 03-23-2018 06:06 PM - edited 08-18-2019 12:09 AM
hi Shu
Is it possible to find how long the last step took in NiFi? I want to see how long it took to load the newly added batch of rows to the table.
Also, I loaded 55 million rows into the table, and I know NiFi is trying to load them since I see a "1" in the processor window, but other than that every other counter is zero and not showing any data transfer. Why?
Created on 03-23-2018 06:45 PM - edited 08-18-2019 12:09 AM
Created 03-23-2018 06:59 PM
Please change the below property to true:
Normalize Table/Column Names = true
Look into the below properties:
Fetch Size (default 0): The number of result rows to be fetched from the result set at a time. This is a hint to the driver and may not be honored and/or exact. If the value specified is zero, then the hint is ignored. Supports Expression Language.
Max Rows Per Flow File (default 0): The maximum number of result rows that will be included in a single flowfile. This allows you to break up very large result sets into multiple flowfiles. If the value specified is zero, then all rows are returned in a single flowfile. Supports Expression Language.
Maximum Number of Fragments (default 0): The maximum number of fragments. If the value specified is zero, then all fragments are returned. This prevents OutOfMemoryError when this processor ingests a huge table. Supports Expression Language.
Change these properties according to your requirements. If they are left at their default values, you will get all 55 million records in one flowfile; until the QDB processor has pulled all the records, you can see the thread indicator in the top-right corner of the QDB processor.
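For example (purely to illustrate the arithmetic, not a recommended value): with Max Rows Per Flow File set to 1,000,000 and Maximum Number of Fragments left at 0, the 55-million-row result set would be emitted as roughly 55 flowfiles rather than one very large flowfile.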
Created 03-23-2018 07:12 PM
If you want to do this pull in parallel, then GenerateTableFetch + Remote Process Group + ExecuteSQL processors will do that.
For more details about the GenerateTableFetch processor, see its documentation.
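As a rough sketch of how that pattern works (hypothetical table/column names; the exact paging syntax depends on the Database Type you configure), GenerateTableFetch emits flowfiles whose content is a series of page queries like the ones below, and each page is then executed by ExecuteSQL, so the pages can be fetched in parallel across the cluster:

-- hypothetical pages for a table 'source_table' with max-value column 'id'
-- and a Partition Size of 10000; the generated syntax varies by Database Type
SELECT * FROM source_table WHERE id <= 55000000 ORDER BY id LIMIT 10000 OFFSET 0;
SELECT * FROM source_table WHERE id <= 55000000 ORDER BY id LIMIT 10000 OFFSET 10000;
SELECT * FROM source_table WHERE id <= 55000000 ORDER BY id LIMIT 10000 OFFSET 20000;
-- ... one flowfile per page, continuing until the whole table is covered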
Created on 03-23-2018 09:36 PM - edited 08-18-2019 12:09 AM
I have changed the properties as shown, restarted NiFi, and restarted the flow, but I see only 4 new ORC files produced; in the flow window I keep seeing the thread count go from 0 to 4 and the tasks go from 0 to 44.
The Hive table is only showing 5.5 million rows.
Is there a log file where NiFi logs how many rows it has processed and what it is currently doing?
This information is not in /var/log/nifi/nifi-user.log or /var/log/nifi/nifi-app.log.
Created 03-23-2018 09:05 PM
I recommend against using Maximum Number of Fragments, since the result set is not ordered, you can miss rows on the next execution if the largest current value for the max-value column has been encountered in the first set of "fragments". Instead, in NiFi 1.6.0 / HDF 3.2 (via NIFI-4836), you will be able to send out flow files while the result set is still being processed (via the Output Batch Size parameter). Note that when you use that parameter, the flow files won't have the total result set count information (because some flow files have already been transferred before the total count was ascertained), and it's not transactional (meaning an error halfway through the result set processing may end up with duplicate data being transmitted downstream on the next run).
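To make the risk concrete (a hypothetical sketch: table source_table, max-value column id with a current maximum of 55000000, and Maximum Number of Fragments set low enough that only part of the result set is emitted): because the result set is not ordered, one of the emitted fragments may happen to contain the row with the current MAX(id), so the stored state jumps to 55000000 even though most rows were never sent. The next run then builds its incremental query from that state and silently skips them:

-- next run after the partial pull described above
SELECT *
FROM source_table
WHERE id > 55000000;
-- returns nothing, so the rows that were never emitted in the earlier fragments are lost downstream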
Created 03-23-2018 09:40 PM
Right now the threads are not even showing and the tasks show 44; after a while it pops up with 4 threads.
But no more ORC files are being produced.
These are the files that were produced, and they amount to 5.5 million rows; my source table has 55 million rows.
[hdfs@hadoop1 ~]$ hdfs dfs -ls /user/sami
Found 8 items
-rw-r--r--   3 nifi hdfs     53015 2018-03-23 10:24 /user/sami/73040114026486.orc
-rw-r--r--   3 nifi hdfs   4836592 2018-03-23 11:37 /user/sami/77434128924536.orc
-rw-r--r--   3 nifi hdfs   5059986 2018-03-23 12:02 /user/sami/78946299398134.orc
-rw-r--r--   3 nifi hdfs 188138117 2018-03-23 12:13 /user/sami/79332578073548.orc
-rw-r--r--   3 nifi hdfs  12860834 2018-03-23 16:40 /user/sami/89388425185109.orc
-rw-r--r--   3 nifi hdfs  12860834 2018-03-23 16:40 /user/sami/89388494417871.orc
-rw-r--r--   3 nifi hdfs  12860834 2018-03-23 16:40 /user/sami/89388714549104.orc
-rw-r--r--   3 nifi hdfs  12860834 2018-03-23 16:40 /user/sami/89388732452940.orc
[hdfs@hadoop1 ~]$
Created 03-24-2018 09:00 PM
You can run nifi.sh to analyse what the threads are working on:
./nifi.sh dump thread-dump.txt
Could you please clear the state in the QueryDatabaseTable processor and then run the processor again? We are encountering the issue that Matt Burgess described in his comment above: with Maximum Number of Fragments set, the result set is not ordered, so rows can be missed on the next execution if the largest current value for the max-value column was encountered in the first set of fragments.
Please keep Maximum Number of Fragments at zero and re-run the processor; then we will not miss any records.
Created 03-26-2018 02:03 PM
"FileSystemRepository Workers Thread-4" Id=59 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2d0 0fd51 at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) "Finalizer" Id=3 WAITING on java.lang.ref.ReferenceQueue$Lock@59551962 at java.lang.Object.wait(Native Method) at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164) at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
Created on 03-26-2018 02:59 PM - edited 08-18-2019 12:09 AM
After clearing everything I started the flow again, but it is still doing nothing; the dump file shows the same waiting messages.
I had changed the concurrent tasks to 4 in the QDB scheduling; maybe this is causing the issue, but I can't set it back to the default value?
By the way, my Maximum Number of Fragments is set to zero.
Created on 03-27-2018 01:11 AM - edited 08-18-2019 12:09 AM
Could you please try running the QDT processor with the following configs after clearing the state in the processor.
The first pull will take a long time; wait until it completes pulling all 55 million records. After that the processor only needs to check for incremental records every 10 seconds.
If the processor doesn't seem to be working and the threads are just hanging forever, then run
bin/nifi.sh dump thread-dump.txt
and share the nifi-bootstrap.log.
Created 04-08-2019 03:11 PM
@Matt Burgess, regarding your comment on the Maximum Number of Fragments feature of the QueryDatabaseTable processor: does the same apply to Max Rows Per Flow File / Fetch Size too? I have set this in my flow and am facing data loss issues; a few records are missed by the processor. I am working on a 3-node cluster.