I am using the ScrollElasticsearchHttp NiFi processor to read all messages from a particular set of Elasticsearch indexes, which I then write into a data warehouse after some processing. I know there are about 25 million records in total across the Elasticsearch indexes. After reading about 100,000 messages, for example, the processor fails with:
ScrollElasticsearchHttp[id=0b7cc9a1-0171-1000-ffff-ffffd34fb1b4] Elasticsearch returned code 404 with message Not Found
I know this 404 error is temporary/transient, because when I query Elasticsearch directly through a browser it returns content just fine.
How do I set the ScrollElasticsearchHttp processor to continue from the scrollid where the failure occurred, rather than clearing the state and starting reading from scratch again?
When I view the state of the ScrollElasticsearchHttp processor, it shows me that it has stopped at a scrollid with a value of:
In a recent NiFi + ELK use case, we routed all failures from the ELK processors to a Replay process group. On any failure we send a notification and collect the failed flowfiles in a replay queue that is disabled. Once the notification is seen, and/or some corrective action is taken to confirm there are no remaining issues, a user logs in and enables the queue, which routes the flowfiles back to the original processor to be executed again.
I hope a similar setup can help resolve your issue.
The thing is, the ScrollElasticsearchHttp processor produced no failed flowfiles going to a failure relationship that I could handle in the way you describe.
The flow just showed as running, with the below errors appearing consistently on the ScrollElasticsearchHttp processor:
2020-04-07 13:35:14,294 WARN [Timer-Driven Process Thread-5] o.a.n.p.e.ScrollElasticsearchHttp ScrollElasticsearchHttp[id=0b7cc9a1-0171-1000-ffff-ffffd34fb1b4] Elasticsearch returned code 404 with message Not Found
So, is there a way to stop the flow and restart it to continue from a particular scrollid without having to reset the state and start from scratch?
No, I found no solution to this. I abandoned reading from Elasticsearch and instead read from the Kafka topics that Logstash used to populate Elasticsearch. I'm amazed that the whole Cloudera Community provided no response to this. Please let me know if you find a solution.
I think I've understood the problem, and the solution below is currently working pretty well for me.
The culprit here is the Scroll Duration property of the ScrollElasticsearchHttp processor. It defaults to 1m (one minute) and, by definition, controls how long each search context is kept in memory (in ES itself).
In my configuration, Page Size is 100, meaning 100 records per flowfile.
1. For some reason, fetching a page (the current cycle) took more than 1 minute.
2. Because Scroll Duration is set to 1 minute, once that minute passed, ES removed the scroll ID (visible in View State) from memory and no longer recognises it.
3. The NiFi processor uses that scroll ID to fetch the next page after completing the current cycle.
4. The request gets a 404 because that scroll ID no longer exists in ES memory.
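The failure sequence above can be sketched with a small, hypothetical in-memory model of ES scroll contexts (the class, names, and timings below are illustrative, not actual ES or NiFi internals):

```python
# Minimal model of Elasticsearch scroll-context expiry (illustrative only).
# A scroll context lives for `scroll_duration` seconds after its last use;
# fetching a page with an expired scroll ID yields a 404, mirroring the
# "Elasticsearch returned code 404 with message Not Found" warning.

class ScrollStore:
    def __init__(self, scroll_duration: float):
        self.scroll_duration = scroll_duration
        self.contexts = {}  # scroll_id -> expiry time

    def open_scroll(self, scroll_id: str, now: float) -> None:
        self.contexts[scroll_id] = now + self.scroll_duration

    def fetch_page(self, scroll_id: str, now: float) -> int:
        expiry = self.contexts.get(scroll_id)
        if expiry is None or now > expiry:
            self.contexts.pop(scroll_id, None)  # context gone from ES memory
            return 404
        # Each successful fetch renews the keep-alive window.
        self.contexts[scroll_id] = now + self.scroll_duration
        return 200

# Scroll Duration = 1 minute, but processing one page takes 90 seconds:
es = ScrollStore(scroll_duration=60)
es.open_scroll("abc123", now=0)
print(es.fetch_page("abc123", now=30))       # within the minute -> 200
print(es.fetch_page("abc123", now=30 + 90))  # next cycle took 90s -> 404
```

With `scroll_duration=300` (the 5m suggested below) the same 90-second cycle would succeed, which is exactly the point of the fix.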
Solution(working for me):
Increase the value of Scroll Duration to 5m and hope each cycle completes within 5 minutes (which IMO is a pretty good time).
NOTE: Don't increase the value too much, or it might put heavy pressure on ES and stall it (haven't tried; just a theory).
Hope this solves your problem too. You can try and let me know. Best of luck.
For all others, please understand this is NOT a solution to the original question. This is about how to avoid the scenario mentioned in it.
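For context, this maps onto Elasticsearch's scroll API: the initial search sets the keep-alive via the `scroll` parameter, and every follow-up `_search/scroll` request must resend it to renew the context. Here is a rough sketch of the two request shapes involved (plain Python building the requests; the index name is a placeholder, and this is not the processor's actual code):

```python
# Sketch of the two request shapes in Elasticsearch's scroll API.
# Increasing Scroll Duration in NiFi effectively changes the "5m" below.

def initial_search(index: str, page_size: int, keep_alive: str = "5m") -> dict:
    """First request: opens the scroll context with the given keep-alive."""
    return {
        "method": "POST",
        "path": f"/{index}/_search?scroll={keep_alive}",
        "body": {"size": page_size, "query": {"match_all": {}}},
    }

def next_page(scroll_id: str, keep_alive: str = "5m") -> dict:
    """Follow-up request: must resend `scroll` to renew the keep-alive.
    A 404 here means the context already expired server-side."""
    return {
        "method": "POST",
        "path": "/_search/scroll",
        "body": {"scroll": keep_alive, "scroll_id": scroll_id},
    }

print(initial_search("logs-2020.04", page_size=100)["path"])
# /logs-2020.04/_search?scroll=5m
```

Note that the keep-alive window restarts on each successful scroll request, so it only needs to cover the time spent processing a single page, not the whole 25-million-record read.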
The 404 warning unfortunately does not fully explain what fails. I assume the data is still there in Elasticsearch and not removed while the processor is running? It could be helpful to look around in the NiFi logs on one of the nodes (they go to /var/log/nifi-app.log by default). I also wonder whether anything is written in the Elasticsearch logs at the same time the warnings appear in NiFi.
I'm not aware of an out-of-the-box setting to continue from the same scrollid. It might help to set a run schedule for the processor other than 0, such as 10-20 seconds. (edit: reference)
Should you have Cloudera Support subscription, please open a support case with us to assist you in a more timely manner.