Support Questions

Find answers, ask questions, and share your expertise

How to determine if a ExecuteSQL has ingested all the data from the desired table

avatar
New Contributor

Hello Community,

I am working on a data ingestion pipeline using Apache NiFi, and I am using the `ExecuteSQL` processor to read data from a database table and ingest it into my data flow. I want to ensure that all records from the table have been successfully ingested, but I am unsure how to verify this within NiFi. Here are the details of my current setup and the specific challenges I am facing:

Current Setup:
1. Database Source: I am querying a relational database.
2. NiFi Processor: I am using the `ExecuteSQL` processor to execute a SQL query and fetch data from the table.
3. Downstream Processors: The flow files generated by `ExecuteSQL` are processed by subsequent processors such as `ConvertRecord`, `PutHDFS`, etc.

Specific Challenges:
1. Ensuring Completeness: How can I verify that all rows from the table have been ingested? Is there a mechanism within NiFi to confirm that no records have been missed?
2. Record Counting: Is there a way to count the number of records fetched by `ExecuteSQL` and compare it to the total number of records in the database table to ensure completeness?
3. Error Handling: What are the best practices for handling errors or retries in case the ingestion process is interrupted or fails partway through?
4. Monitoring and Logging: How can I effectively monitor and log the progress of data ingestion to track the number of records ingested vs. the total number of records in the table?

Desired Outcome:
- A method to compare the record count ingested by NiFi with the actual record count in the source table.
- Best practices for setting up error handling and retries in the event of ingestion failures.
- Recommendations for monitoring and logging to ensure the completeness and integrity of the data ingestion process.

Example Scenario:
For example, if my source table contains 100,000 records, I want to be confident that all 100,000 records have been processed and ingested by NiFi. Additionally, if the process fails after ingesting 50,000 records, I would like to know how to handle such situations to ensure data consistency.

Any advice, examples, or best practices from the community would be greatly appreciated. Thank you!

Best regards,
Mohammed NAJB

3 REPLIES 3

avatar
Master Mentor

@mohammed_najb 

Is the ExecuteSQL the first processor in your dataflow or is it being fed by an inbound connection from some upstream processor such as the GenerateTableFetch processor?
I only ask since ExecuteSQL processor does not retain and state so it alone would not be the best choice for ingesting from an active table that may be having additional rows added to the DB regularly.  

As far as the ExecuteSQL, it writes out attributes on the FlowFiles it produces.  The "executesql.row.count" will record the number of rows returned by the query OR the number of rows in the specific produced NiFi FlowFile's content when "Max rows per FlowFile" property is configured with a non zero value.

When multiple FlowFiles are being produced, you could use an UpdateCounter processor to create a counter and use the NiFi Expression Language "${executesql.row.count}" as the delta.

As far as your query about "process fails " is concerned.  The ExecuteSQL will execute the SQL query and based on configuration create 1 or more FlowFiles.  Also based on configuration it will incrementally release FlowFiles to the downstream connection or release them all at once (default) via OutputBatch Size configuration.  Assuming using default, no FlowFiles are output until until query is complete and all FlowFiles are ready fro transfer to the outbound connection.  If failure happens prior to the is transfer (system crash, etc.), no FlowFiles are output.  On next execution of the ExecuteSQL the query is executed again if no inbound connection.
If ExecuteSQL is utilizing and inbound FlowFile from an inbound connection to trigger the execution, processing failure would result in FlowFile routing to failure relationship which you could setup to retry.
If system crash,  FlowFile remains in inbound connection an simply starts over execution on system restore.

Hopefully tis gives you some insight to experiment with.  As is the case with many use cases, NiFi often has more then 1 way to build them and multiple processor options.  The more detailed you are with yoru use case, the better feedback you may get in the community.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt



avatar
New Contributor

Thank you so much for your clear response. As I am new to Nifi, my flow looks like the following:

screen.png

My primary concern is to ensure that the entire NiFi flow runs successfully without any errors and that all data extracted from the database table is accurately written to HDFS. The reason behind this concern is that I want to trigger a Spark job using a scheduler such as Apache Airflow immediately after the data ingestion process completes.

- Flow Overview
- GenerateTableFetch -> ExecuteSQL -> PartitionRecord -> PutHDFS -> PutEmail

Specific Concerns:
1. Flow Completion:
- Error-Free Execution: How can I ensure that the entire flow runs without any errors from start to finish and if any error occurred I want to be able to see in the logs enventually in the airflow logs?
- Verification of Data Written: How can I confirm that all the data extracted from the table is successfully written to HDFS?

2. Integration with Scheduler:
- Triggering Spark Job: What are the best practices to trigger a Spark job in Apache Airflow right after confirming that the data ingestion process has completed successfully?

Thank you for your assistance!

avatar
Master Mentor

@mohammed_najb 

  1. It is impossible to guarantee a flow will always run error free. You need to plan and design for handling failure.  How are you handling the "failure" relationships on your ExecuteSQL and putHDFS processors?
  2. The PutHDFS will either be successful or route FlowFile to failure relationship r rollback the session.  NiFi does not auto remove FlowFiles.  It is responsibility of dataflow designr to handle failures to avoid dataloss.  For example, do not auto-terminate any component relationships where FlowFile may get routed.
  3. I don't know what would be the "best practice" as that comes with testing. Since you are using GenerateTableFetch processor, it creates attributes on the output FlowFiles.  One of which is "fragment.count".  You could potentially use this to track that all records are written to HDFS successfully.   Look at UpdateAttributes stateful usage options.  This would allow you to setup RouteOnAttribute to route last FlowFile once stateful count equals "fragement.count" to a processor that triggers your Spark job.

Just a suggestion, but others in the community may have other flow design options.

Please help our community thrive. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt