
Apache NiFi: problem using QueryDatabaseTable and PutElasticsearch

Explorer

Hello,

I have some questions about the QueryDatabaseTable and Elasticsearch processors.

I am trying to pull data from PostgreSQL and index it into Elasticsearch.

[screenshot: 10488-deepinscrot-4253.png]

After ConvertAvroToJSON I get a JSON array like this:

[ {
  "gid" : 95167,
  "lat" : -0.5930874275427621,
  "lon" : 48.9348884422244
}, {
  "gid" : 95235,
  "lat" : -0.9967898987193282,
  "lon" : 48.935096381156605
}, {
  "gid" : 95307,
  "lat" : -0.7477345135642162,
  "lon" : 48.92913514241155
},
......
]

In order to use PutElasticsearchHttp with the geometry, I split the array and rebuilt the JSON documents. SplitJson and JoltTransformJSON take a lot of time, so I want to remove the SplitJson processor to speed up the whole flow, but I can't figure out how (see the spec I sketched below). Do you have any ideas?

After JoltTransformJSON, the format looks like this:

{
  "gid" : 799092,
  "coordinates" : [ 6.933880894889725, 48.0070784663541 ]
}
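
Ideally I would keep the array whole and run a single JoltTransformJSON over it instead of SplitJson plus per-record Jolt. Something like this shift spec is what I have in mind (a sketch using the lat/lon field names from my sample above; I have not verified it):

[ {
  "operation" : "shift",
  "spec" : {
    "*" : {
      "gid" : "[&1].gid",
      "lat" : "[&1].coordinates[0]",
      "lon" : "[&1].coordinates[1]"
    }
  }
} ]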

I need some help improving the execution time. I tried to index an array of documents into Elasticsearch, but I got an indexing exception. That is, I took the output of the ConvertAvroToJSON processor and fed it directly into the Elasticsearch processor.

After a few tries the Jetty server went down. I increased the JVM memory to 4096 MB and restarted NiFi, but some data is now stuck between two processors... how can I recover it? The web UI doesn't work.

I have read https://community.hortonworks.com/questions/31099/is-it-possible-to-to-perform-bulk-insert-to-es-wit...,

I don't want to use SplitJson, because it takes a lot of time.

Thank you.

7 REPLIES

Rising Star
@qfdk

In your flow, note that the "PutElasticSearch" processor actually uses the ES bulk API: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk.html. So if you specify a reasonable batch size, you should essentially be "bulk inserting".

What exception in index did you receive from ElasticSearch? Were you pushing documents/data to the wrong index with an incorrect mapping? Posting the error from ES will help. The bulk index API in ES can sometimes be a bit clunky, so instead I use the ES HTTP processor and adjust the batch size accordingly. This makes sending the raw JSON to ES a little easier and more straightforward.
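
For reference, the body that the _bulk endpoint expects is newline-delimited: one action line, then one document line, repeated, with a trailing newline (the index/type names below are only examples):

{"index":{"_index":"test","_type":"default"}}
{"gid":95167,"coordinates":[-0.5930874275427621,48.9348884422244]}
{"index":{"_index":"test","_type":"default"}}
{"gid":95235,"coordinates":[-0.9967898987193282,48.935096381156605]}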

For the data being stuck, can you run a nifi.sh dump and then post the contents of the bootstrap.log?

Explorer

Thanks for your response.

What exception in index did you receive from ElasticSearch?

→ I have tried the PutElasticsearchHttp processor; in the ConvertAvroToJSON processor I tried both "none" and "array" for the JSON container option. If I don't do that, I get an exception like the ones below.

[screenshot: 10590-elastic.png]

[screenshot: 10601-exeption.png]

Were you pushing documents/data to the wrong index with an incorrect mapping?

→ No, because I use the PutElasticsearchHttp processor to index the split data using the same mapping.

Posting the error from ES will help.

ERROR [Timer-Driven Process Thread-8] o.a.n.p.e.PutElasticsearchHttp PutElasticsearchHttp[id=0159101b-95cc-1764-e634-d90d6b5877ad] Failed to insert StandardFlowFileRecord[uuid=b1f52deb-bd58-4a8c-82dc-7378f0cd3660,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1482315336809-525, container=default, section=525], offset=883475, length=51862],offset=0,name=18050202000067,size=51862] into Elasticsearch due to , transferring to failure
MapperParsingException[failed to parse]; nested: IllegalArgumentException[Malformed content, found extra data after parsing: START_OBJECT];
    at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:156)
    at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:309)
    at org.elasticsearch.index.shard.IndexShard.prepareCreate(IndexShard.java:529)
    at org.elasticsearch.index.shard.IndexShard.prepareCreateOnPrimary(IndexShard.java:506)
    at org.elasticsearch.action.index.TransportIndexAction.prepareIndexOperationOnPrimary(TransportIndexAction.java:214)
    at org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary(TransportIndexAction.java:223)
    at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:327)
    at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:120)
    at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:68)
    at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.doRun(TransportReplicationAction.java:657)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
Caused by: java.lang.IllegalArgumentException: Malformed content, found extra data after parsing: START_OBJECT
    at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:141)
    ... 18 more
[2016-12-21 16:25:56,723][DEBUG][action.bulk              ] [D'Spayre] [test][3] failed to execute bulk item (index) index {[test][default][AVkh_SQL6SmWoIKNH3EF], source[{"gid": 266328, "coordinates": "43.7092713589835427,1.55720029588910425",

I created 3 process groups. One of them doesn't work: when I start it, nothing happens and the group can't be stopped.

[screenshot: 10602-bug.png]

There are 2 processors in red; I can't use them, and I waited 1 h for them...

I have put up a video for you.

[screenshot: 10603-bug2.png]

Failed to load resource: the server responded with a status of 409 (Conflict)

2016-12-21 14:13:41,349 INFO [Flow Service Tasks Thread-1] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@4428a68b // Another save pending = false
2016-12-21 14:13:51,059 ERROR [Timer-Driven Process Thread-3] o.a.n.p.standard.QueryDatabaseTable QueryDatabaseTable[id=01591001-75cd-17d5-dca1-8bf6e638af98] Unable to execute SQL select query SELECT gid,concat_ws(',',ST_Y(ST_SetSRID(geom_wgs84::geometry,4326)),ST_X(ST_SetSRID(geom_wgs84::geometry,4326))) AS coordinates,id,id_tr,numero,rep,nom_voie,nom_long_voie,code_insee,code_post,type_loc,cote FROM bdadresse.point_adresse due to org.postgresql.util.PSQLException: ERREUR: annulation de la requête à la demande de l'utilisateur: org.postgresql.util.PSQLException: ERREUR: annulation de la requête à la demande de l'utilisateur
2016-12-21 14:13:51,061 ERROR [Timer-Driven Process Thread-3] o.a.n.p.standard.QueryDatabaseTable 
org.postgresql.util.PSQLException: ERREUR: annulation de la requête à la demande de l'utilisateur (English: ERROR: canceling statement due to user request)
 at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2455) ~[postgresql-9.4.1212.jar:9.4.1212]
 at org.apache.nifi.processors.standard.QueryDatabaseTable.onTrigger(QueryDatabaseTable.java:241) ~[nifi-standard-processors-1.1.0.2.1.0.0-165.jar:1.1.0.2.1.0.0-165]
➜  ~ nifi.sh dump 
Java home: /usr/lib/jvm/java-8-oracle
NiFi home: /home/qfdk/bin/nifi-1.1.0.2.1.0.0-165
Bootstrap Config File: /home/qfdk/bin/nifi-1.1.0.2.1.0.0-165/conf/bootstrap.conf
Exception in thread "main" java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at org.apache.nifi.bootstrap.RunNiFi.dump(RunNiFi.java:700)
at org.apache.nifi.bootstrap.RunNiFi.main(RunNiFi.java:226)

I will share my logs here: desktop.zip

Explorer
2016-12-22 08:49:17,342 WARN [Timer-Driven Process Thread-4] o.a.n.p.standard.QueryDatabaseTable QueryDatabaseTable[id=01591001-75cd-17d5-dca1-8bf6e638af98] Processor Administratively Yielded for 1 sec due to processing failure
2016-12-22 08:49:17,342 WARN [Timer-Driven Process Thread-4] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding QueryDatabaseTable[id=01591001-75cd-17d5-dca1-8bf6e638af98] due to uncaught Exception: java.lang.OutOfMemoryError: Java heap space
2016-12-22 08:49:17,342 WARN [Timer-Driven Process Thread-4] o.a.n.c.t.ContinuallyRunProcessorTask 
java.lang.OutOfMemoryError: Java heap space

I have set arg5=5120M for the JVM.

Rising Star

@qfdk

Would you mind sending a copy of the data that comes out of the "ReplaceText" processor? I would like to try posting it to ES on my local system. Do you also have the mapping for that index in ES? Is the data as simple as:

{  "gid" : 799092,  "coordinates" : [ 6.933880894889725, 48.0070784663541 ]},
{  "gid" : 799092,  "coordinates" : [ 6.933880894889725, 48.0070784663541 ]},{  "gid" : 799092,  "coordinates" : [ 6.933880894889725, 48.0070784663541 ]}

It looks to me like you should be using just the PutElasticSearch processor, which uses the bulk API. I see in the "PostHTTP" processor in your video that you are pushing to the "_bulk" endpoint. Looking at your error log, it seems the payload is not formatted correctly when being indexed in ES: "Caused by: java.lang.IllegalArgumentException: Malformed content, found extra data after parsing: START_OBJECT"
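
That message is what ES returns when the plain index endpoint receives more than one JSON object in a single request body, e.g. (hypothetical documents):

POST /test/default
{"gid": 650, "coordinates": "46.30,5.94"}
{"gid": 651, "coordinates": "46.29,5.93"}   <-- "extra data after parsing"

Only the _bulk endpoint accepts the newline-delimited multi-document form; the regular index endpoint wants exactly one document per request.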

Explorer

Thank you, and sorry for the late reply. I found out why it doesn't work... the query result is too huge to execute. The QueryDatabaseTable processor has a problem: it reads all the data into memory, so I get an OutOfMemory exception...

Then I tried to modify GenerateTableFetch to add a WHERE condition, so that the query uses the database index instead of an OFFSET 🙂 OFFSET is bad because it still has to skip over all the previous rows.

So it generates SQL like this:

SELECT xxx
FROM xxxxx
WHERE id <= 200000
ORDER BY id
LIMIT 200000
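
For the following pages the WHERE condition just moves forward, so each query starts from the last id of the previous page instead of using OFFSET (a sketch with the same placeholders; id is the indexed key):

SELECT xxx
FROM xxxxx
WHERE id > 200000   -- last id returned by the previous page
ORDER BY id
LIMIT 200000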

Input files:

{"gid": 49900, "coordinates": "44.5638145014092331,4.4830168699343016"}
{"gid": 49901, "coordinates": "44.5110194507808501,4.26131697875612936"}

After ReplaceText:

{"index":{"_index":"point_adresse","_type":"default","_id": 650}}
{"gid":650, "coordinates": "46.3066856051698537,5.94381332350432334"}
{"index":{"_index":"point_adresse","_type":"default","_id": 651}}
{"gid":651, "coordinates": "46.2950964034383858,5.93722413905152013"}
There is a \n after each line (the bulk API requires newline-delimited JSON, including a trailing newline).
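
To check the payload outside NiFi, the same content can be posted straight to the bulk endpoint, e.g. (host and file name are just examples; --data-binary keeps the newlines intact, unlike -d):

curl -XPOST 'http://localhost:9200/_bulk' --data-binary @bulk.ndjson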

Mapping:

    "mappings": {
      "default": {
         "_all": {
        "enabled": true
      },
      "dynamic_templates": [
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "analyzed",
              "omit_norms": true,
              "fields": {
                "raw": {
                  "type": "string",
                  "index": "not_analyzed",
                  "ignore_above": 256
                }
              }
            }
          }
        }
      ],
        "properties": {
          "coordinates": {
            "type": "geo_point"
          },
          "gid": {
            "type": "string"
          }
        }
      }
    }
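
Note that geo_point accepts the string form I am sending ("lat,lon") as well as an array form, but the array order is reversed, which is easy to get wrong:

"coordinates": "46.3066856051698537,5.94381332350432334"    (string: "lat,lon")
"coordinates": [5.94381332350432334, 46.3066856051698537]   (array: [lon, lat])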

I think it's a problem with both the PutElasticsearchHttp processor and the SQL processor...

Master Guru

If the query returns too many rows in QueryDatabaseTable, can you set Max Rows Per Flow File and Fetch Size to something like 10000? That should prevent the entire result set from being loaded into memory (if the JDBC driver supports it). Also if you want to do something like 200000 >= id, you can set the Initial Max Value for that field using a dynamic property initial.maxvalue.id to 200000.
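
For example, on QueryDatabaseTable:

Max Rows Per Flow File = 10000
Fetch Size             = 10000
initial.maxvalue.id    = 200000   (dynamic property; the incremental fetch then starts after id 200000)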

Explorer

@Matt Burgess

If the query returns too many rows in QueryDatabaseTable, can you set Max Rows Per Flow File and Fetch Size to something like 10000?

→ Yes, I have done that, but it doesn't work... I have 30,000,000+ rows in PostgreSQL, so an OutOfMemory exception is normal.

I'm looking forward to your answer about PutElasticsearchHttp.