
How does the FetchElasticsearch Processor Work?

New Contributor

I would have thought the FetchElasticsearch processor would be able to connect to Elasticsearch and extract all the documents for a particular index and type, since it asks for that information in the processor configuration. However, it needs an upstream task. What upstream processor does it need? The task I am working on is to copy one index into another that has a different Elasticsearch mapping.

1 ACCEPTED SOLUTION


Hi Will,

From a logical point of view, you are right: I think there is no absolute need to have an incoming connection. As it is coded right now, the processor requires an incoming connection to allow users to leverage the Expression Language in its properties. However, I think the incoming connection could be optional instead of required.

The simplest way to get things working is to add a GenerateFlowFile processor before the FetchElasticsearch one. You can configure the GenerateFlowFile processor with a 0B file size so it emits empty flow files, and set its scheduling strategy to CRON-driven to control how frequently requests are sent to Elasticsearch.
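
For illustration, a minimal configuration could look like the following (the five-minute cron expression is only an example, not something required):

GenerateFlowFile
  File Size: 0B
  Scheduling Strategy: CRON driven
  Run Schedule: 0 0/5 * * * ?     (example: one empty flow file, i.e. one request, every 5 minutes)

GenerateFlowFile -> (success) -> FetchElasticsearch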

Hope that helps.


5 REPLIES


Master Guru

As @Pierre Villard mentioned, FetchElasticsearch should not require an incoming connection; this has been captured in NIFI-1576. However, to extract all documents from a particular index and (optionally) type, you need the Search API, whereas FetchElasticsearch uses the Get API. To use the Search API, you can use the InvokeHttp processor with your own search query. Please see this related HCC post:

https://community.hortonworks.com/questions/41951/how-to-get-all-values-with-expression-language-in....
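
For example, the InvokeHttp processor can be pointed straight at the Search API endpoint; the index name and parameters below are only placeholders:

InvokeHttp
  HTTP Method: GET
  Remote URL: http://localhost:9200/my_index/_search?q=*:*&size=100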

Explorer

@Matt Burgess

Is there an example of how to use the InvokeHTTP processor to do this? I am attempting it but having issues with the Attributes to Send property. The way I currently have it set up, an InvokeHTTP processor is connected to a PutFile processor on the original and response relationships, with the rest (failure, no retry, and retry) auto-terminated. My InvokeHTTP properties are all the default values except:

HTTP Method: GET
Remote URL: http://localhost:9200

Now I am just trying to get all the documents from a particular index called "tweet_library", and the query to do that using a curl command is:

http://localhost:9200/[your_index_name]/_search
{
  "size": [your value],            // default 10
  "from": [your start index],      // default 0
  "query": {
    "match_all": {}
  }
}

So I thought the right idea would be to just place

tweet_library/_search{"size": 10000 "query":{"match_all": {}}}

(I left off "from" because I want the default value) inside Attributes to Send, but when I try to do this I get an error saying that "Attributes to Send validated against tweet_library/_search{ "size": 10000 "query": { "match_all": {} }} is invalid because Not a valid Java Regular Expression."

Could you please point me in a direction of a solution or possibly provide some help? Thank you.

But I also know that this curl command gives me what I want:

curl -XGET http://localhost:9200/tweet_library/_search?size=10000 -d '{"query" : { "match_all" : {} }}'

so I tried putting tweet_library/_search?size=10000 -d '{"query" : { "match_all" : {}}}' in my Attributes to Send and got the same warning/error.

Master Guru

Your curl command uses the -d parameter, which means it's sending that JSON in the body of the request. To do that with InvokeHttp, you could have a GenerateFlowFile -> ReplaceText processor before the InvokeHttp, where ReplaceText would set the body to the query you have above.
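
As a rough sketch (the query body is just the match_all example from above, and the property values are illustrative):

GenerateFlowFile -> ReplaceText -> InvokeHttp -> (Response) PutFile

GenerateFlowFile
  File Size: 0B

ReplaceText
  Replacement Strategy: Always Replace
  Evaluation Mode: Entire text
  Replacement Value: { "size": 10000, "query": { "match_all": {} } }

InvokeHttp
  HTTP Method: POST     (InvokeHttp sends the flow file content as the request body for POST/PUT/PATCH, and Elasticsearch's _search accepts POST)
  Remote URL: http://localhost:9200/tweet_library/_search
  Content-Type: application/json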

Alternatively, you could use the URL Search API for Elasticsearch. In your example you are matching all documents (which is the default, I believe), so http://localhost:9200/tweet_library/_search?size=10000 should be all you need for that case. To explicitly match all documents, you can use the q parameter:

http://localhost:9200/tweet_library/_search?size=10000&q=*:*
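
If you want to sanity-check that URL outside NiFi first, the same request can be issued with curl (the quotes keep the shell from splitting on the & character):

curl "http://localhost:9200/tweet_library/_search?size=10000&q=*:*"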

Explorer

Thanks for all your help so far, Matt. I had to use the first option, as I am going to have more complicated queries than getting all documents, so I'm hoping I can just replace the match_all query in my ReplaceText processor with whatever I need to query. The match_all query does seem to be working, but I am getting multiple copies of documents; is this because of the GenerateFlowFile processor? I looked up its documentation and I'm not quite sure what it is doing. I do not want it to run more than once.

Edit: It looks like I just got 84,000 files that are all the same documents, and there are only 10,000 total in the index.

Do I need an UpdateAttribute processor?