Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Is it possible to perform bulk insert to ES with Nifi processor (putElasticsearch)?

avatar
Contributor

Hello All,

I want to write to ES using nifi. So I choose to use putElasticsearch processor to do this.

I want to know is it possible to do bulk insert to ES (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html) using putElasticsearch processor?

My understanding: putElasticsearch writes to ES by reading the content of the input flowfile i.e, just one record in json format as shown below. ex: {"priority": "DEBUG", "classname": "ServiceImpl", "message": "Getting node by name XXX", "creationTime": "2016-04-20T15:38:43.000000Z"}

However, I want to perform a bulk insert. thus my flowfile contents will be in the format: (as specified by ES bulk api)

action_and_meta_data\n optional_source\n

action_and_meta_data\n optional_source\n ....

For example:

{"index": {"_type": "xxxxxlogs", "_id": "2016-04-20-15:13:57-945", "_index": "xxxxlogs-2016-04-20"}} \n

{"priority": "DEBUG", "classname": "ServiceImpl", "message": "Getting node by name XXXX", "creationTime": "2016-04-20T15:13:57.000000Z"} \n

{"index": {"_type": "xxxxlogs-", "_id": "2016-04-20-15:13:57-941", "_index": "xxxxlogs-2016-04-20"}} \n

{"priority": "DEBUG", "classname": "ServiceImpl", "message": "Got node idx XXX", "creationTime": "2016-04-20T15:13:57.000000Z"} \n

.......

Thousands of such entries: (action_and_meta_data\n optional_source\n)

Please let me know if this can be achieved with putElasticsearch processor? or point me to any specific format for my flowfile conten?

Otherwise please let me know how to achieve bulk insert to es using NIFI ?

Many thanks.

Regards,

Amarnath

1 ACCEPTED SOLUTION

avatar

Hi @Amar ch So the putElasicsearch processor as you have identified, is designed to write individual flowfiles, or indeed batches of flowfiles.

Those batches are controlled via the "Batch Size" property. I guess it really depends what you mean by bulk insert, I don't see any limiations on the "Batch Size" so it should be possible to increase that until you get the size insert you require.

For more information on the properties, please take a look at:

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.elasticsearch.PutElasti...

From reviewing the JIRA associated with the processor, it does look as if the putElasticsearch projcessor does make use of the bulk load api.

https://issues.apache.org/jira/browse/NIFI-1275

View solution in original post

12 REPLIES 12

avatar

Here is the flow where I have removed sensitive info (sorry for bad quality!!)

4119-flow.png

Here are the processors

AddHeader

4115-replacetext-addheader.png

{"index":{"_index":"${name:toLower()}","_type":"${observableProperty}"}}

{"observableProperty"

MERGE

4116-merge.png

ADJUST FORMAT IN BULK

4117-replacetext-adjustformatinbulk.png

POST HTTP TO ES4118-posthttp.png

avatar
Contributor

Thank @Massimiliano Nigrelli for the information, it is helpful.

avatar
Master Mentor

@Amar ch please accept one of the answers to close the thread.