Support Questions


Is it possible to perform a bulk insert to ES with the NiFi PutElasticsearch processor?

avatar
Contributor

Hello All,

I want to write to ES using NiFi, so I chose the PutElasticsearch processor to do this.

I want to know: is it possible to do a bulk insert to ES (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html) using the PutElasticsearch processor?

My understanding: PutElasticsearch writes to ES by reading the content of the input flowfile, i.e., just one record in JSON format, as shown below. For example: {"priority": "DEBUG", "classname": "ServiceImpl", "message": "Getting node by name XXX", "creationTime": "2016-04-20T15:38:43.000000Z"}

However, I want to perform a bulk insert, so my flowfile contents will be in the following format (as specified by the ES bulk API):

action_and_meta_data\n optional_source\n

action_and_meta_data\n optional_source\n ....

For example:

{"index": {"_type": "xxxxxlogs", "_id": "2016-04-20-15:13:57-945", "_index": "xxxxlogs-2016-04-20"}} \n

{"priority": "DEBUG", "classname": "ServiceImpl", "message": "Getting node by name XXXX", "creationTime": "2016-04-20T15:13:57.000000Z"} \n

{"index": {"_type": "xxxxlogs-", "_id": "2016-04-20-15:13:57-941", "_index": "xxxxlogs-2016-04-20"}} \n

{"priority": "DEBUG", "classname": "ServiceImpl", "message": "Got node idx XXX", "creationTime": "2016-04-20T15:13:57.000000Z"} \n

.......

Thousands of such entries: (action_and_meta_data\n optional_source\n)
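
For reference, a payload in exactly this format can also be posted straight to the _bulk endpoint outside of NiFi. A minimal sketch in Python (the host, index, and type names below are only illustrative placeholders, not values from this thread):

import json
import requests  # assumes the requests library is available

# Two illustrative log records; a real flow would build thousands of these.
records = [
    {"priority": "DEBUG", "classname": "ServiceImpl", "message": "Getting node by name XXX", "creationTime": "2016-04-20T15:13:57.000000Z"},
    {"priority": "DEBUG", "classname": "ServiceImpl", "message": "Got node idx XXX", "creationTime": "2016-04-20T15:13:57.000000Z"},
]

# Build the newline-delimited body: action_and_meta_data\n optional_source\n ...
lines = []
for i, rec in enumerate(records):
    lines.append(json.dumps({"index": {"_index": "xxxxlogs-2016-04-20", "_type": "xxxxlogs", "_id": str(i)}}))
    lines.append(json.dumps(rec))
body = "\n".join(lines) + "\n"  # the bulk API requires a trailing newline

# Placeholder host; newer ES versions also expect Content-Type: application/x-ndjson.
resp = requests.post("http://localhost:9200/_bulk", data=body, headers={"Content-Type": "application/x-ndjson"})
print(resp.json())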

Please let me know if this can be achieved with the PutElasticsearch processor, or point me to any specific format required for my flowfile content.

Otherwise, please let me know how to achieve a bulk insert to ES using NiFi.

Many thanks.

Regards,

Amarnath

1 ACCEPTED SOLUTION

avatar

Hi @Amar ch. The PutElasticsearch processor, as you have identified, is designed to write individual flowfiles, or indeed batches of flowfiles.

Those batches are controlled via the "Batch Size" property. I guess it really depends what you mean by bulk insert; I don't see any limitations on the "Batch Size", so it should be possible to increase it until you get the size of insert you require.
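
As an illustration only (the values below are hypothetical, not taken from this thread), a PutElasticsearch configuration for batched writes might look roughly like:

Cluster Name: elasticsearch
ElasticSearch Hosts: localhost:9300
Index: xxxxlogs-2016-04-20
Type: xxxxlogs
Batch Size: 100
Index Operation: index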

For more information on the properties, please take a look at:

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.elasticsearch.PutElasti...

From reviewing the JIRA associated with the processor, it does look as if the PutElasticsearch processor does make use of the bulk load API.

https://issues.apache.org/jira/browse/NIFI-1275


12 REPLIES


avatar
Contributor

Hello @drussell

I know that PutElasticsearch writes the contents of multiple flowfiles at a time to ES, driven by the Batch Size attribute, where each flowfile contains just one record, i.e., a single JSON record.

But let me refine my words: I am looking at multiple flowfiles where each flowfile contains multiple records, unlike the case above, in this format: (action_and_meta_data\n optional_source\n), as shown in my question.

Is it clear what I am looking for?

Regards,

Amarnath

avatar

Ah, now that's clearer. Unfortunately, I don't believe that NiFi is able to do that currently.

avatar
Contributor

Thanks @drussell for your response. I wish other community members could confirm this.

avatar
Master Guru

I can confirm this: the PutElasticsearch processor will take each flow file (assumed to contain a single record) and create a bulk insert from them. As @drussell mentioned, you can set the batch size to the desired number of flow files per batch, and depending on the throughput / latency of the flow, you may want to schedule PutElasticsearch differently. For example, if PutElasticsearch seems to be handling only one flow file at a time (but your batch size is set to, say, 100), then the flow files are not coming in fast enough (i.e. there is only one flow file available when PutElasticsearch runs). If you want to batch them, you can have the PutElasticsearch processor run less often, so that flow files queue up in the incoming connection; then, when it runs, it will put more flow files into the bulk insert.

If you already have a flow file with multiple documents, and if they are single-line JSON objects, you may be able to use SplitText to split them up into individual documents, then use the technique(s) above to get the bulk insert behavior you are looking for. Also feel free to add an Improvement Jira to let the user specify (or possibly detect) whether the flow file contains multiple documents.
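
As a rough sketch of that approach (the values are only examples, not a tested configuration): split the multi-document flow file into one document per flow file, then let the singles queue up in front of PutElasticsearch:

SplitText
Line Split Count: 1
Remove Trailing Newlines: true

PutElasticsearch
Batch Size: 100
Run Schedule: 5 sec (on the Scheduling tab, so flow files accumulate between runs)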

avatar
Guru

The NiFi processor does in fact use the Elasticsearch BulkRequest Java API, so even if you set the batch size to 1, you will be using batch loading from the ES perspective.

If you want to send in a file with multiple JSON records, you have two choices. One is to use InvokeHttp to post directly to the REST API.

For a more NiFi-centric solution, use SplitText to divide up the JSON records, and then process these with a decent Batch Size using the PutElasticsearch processor. This will give you good control, especially if you need to use back-pressure to even out the flow and protect the ES cluster.
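
For the first (REST) option, a sketch of an InvokeHttp configuration posting a ready-made bulk payload (host and values are illustrative):

InvokeHttp
HTTP Method: POST
Remote URL: http://localhost:9200/_bulk
Content-Type: application/json

The incoming flow file content would already need to be in the action_and_meta_data\n optional_source\n format shown in the question.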

avatar
Contributor

Thanks @mburgess and @Simon Elliston Ball for confirming. On the suggested way (SplitText) to send multiple documents to ES, I'm afraid that too many flow files / events get created, which is too much for a Raspberry Pi. My original files are each 1.1 MB, and at most 10 such files can exist. I tried an HTTP post (documents formatted as per the ES bulk API) and it works fine; however, processing and converting to the required format takes a lot of time on the Pi (using ExecuteScript in Python). So I am going with a script and the ES client in Python (no NiFi) to process and write to ES.
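
For anyone taking the same route, here is a minimal sketch using the official Python client (elasticsearch-py); the host, index, and type names are placeholders:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["http://localhost:9200"])

def actions(log_records):
    # Each action describes one document for the bulk request.
    for rec in log_records:
        yield {"_index": "xxxxlogs-2016-04-20", "_type": "xxxxlogs", "_source": rec}

log_records = [
    {"priority": "DEBUG", "classname": "ServiceImpl", "message": "Getting node by name XXX", "creationTime": "2016-04-20T15:13:57.000000Z"},
]

# helpers.bulk batches the documents and sends them through the _bulk API.
ok, errors = helpers.bulk(es, actions(log_records))
print(ok, errors)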

avatar

This is what I did. I had a series of observations coming from a sensor, something like:

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":t1}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":t2}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":t3}

.......................................................................................................

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":tn}

I have used a ReplaceText processor to transform the series into something that fits the ES bulk POST API.

First, I have replaced

{"attr1":

with:

{"index":{"_index":"{your_index_value}","_type":"attr1"}} {"attr1":

thus getting the newly transformed series:

{"index":{"_index":"{your_index_value}","_type":"attr1"}}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":t1}

{"index":{"_index":"{your_index_value}","_type":"attr1"}}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":t2}

..........................................................................................................

{"index":{"_index":"{your_index_value}","_type":"attr1"}}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":tn}

I have then merged the series with a MergeContent processor and got something like:

{"index":{"_index":"{your_index_value}","_type":"attr1"}}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":t1}{"index":{"_index":"{your_index_value}","_type":"attr1"}}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":t2}{"index":{"_index":"{your_index_value}","_type":"attr1"}}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":t3}......................................................................{"index":{"_index":"{your_index_value}","_type":"attr1"}}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":tn}

At this stage, a ReplaceText processor has helped me break the merged block into lines to be sent to ES.

I have replaced

}{"index"

with

}

{"index"

and got

{"index":{"_index":"{your_index_value}","_type":"attr1"}}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":t1}

{"index":{"_index":"{your_index_value}","_type":"attr1"}}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":t2}

{"index":{"_index":"{your_index_value}","_type":"attr1"}}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":t3}

......................................................................

{"index":{"_index":"{your_index_value}","_type":"attr1"}}

{"attr1":"{value}","attr2":"{value}","attr3":"{value}","attr4":"{value}","timestamp":tn}

Finally I have posted the series to ES via a PostHTTP processor with the following configuration:

URL: http://{my-ip}:9200/_bulk

sendAsFlowFile: false

I can tell you it is working. I hope it helps.

Massimiliano

avatar
Contributor

Thanks @Massimiliano Nigrelli, would you mind sharing your flow as a template (.xml)? I am looking for the configurations of the two ReplaceText processors and the MergeContent processor; even pictures would do. Let me know if you need my email.

Regards,

Amarnath