Created on 12-19-2016 08:56 AM - edited 08-18-2019 04:49 AM
Hello,
I have some questions about QueryDatabaseTable and the Elasticsearch processors.
I am trying to get some data from PostgreSQL and index it with Elasticsearch.
After ConvertAvroToJSON I get a JSON array like this:
[ { "gid" : 95167, "lat" : -0.5930874275427621, "lon" : 48.9348884422244 }, { "gid" : 95235, "lat" : -0.9967898987193282, "lon" : 48.935096381156605 }, { "gid" : 95307, "lat" : -0.7477345135642162, "lon" : 48.92913514241155 }, ...... ]
In order to use PutElasticsearch(HTTP) with geometry, I split the array and rebuild each JSON document. The SplitJson and JoltTransformJSON steps take a lot of time, so I would like to remove the SplitJson processor to speed up the whole flow, but I can't. Do you have any ideas?
After JoltTransformJSON, the format looks like this:
{ "gid" : 799092, "coordinates" : [ 6.933880894889725, 48.0070784663541 ] }
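For illustration, the reshaping that SplitJson plus JoltTransformJSON performs can be expressed as a single pass over the whole array. This is only a minimal sketch in plain Python, not a NiFi processor configuration. The function name to_geo_docs is hypothetical, and the [lat, lon] ordering of the coordinates array is an assumption taken from the sample values above.

```python
import json

def to_geo_docs(records):
    """Reshape [{"gid", "lat", "lon"}, ...] into [{"gid", "coordinates": [..]}, ...].

    Assumption: the coordinates array holds the "lat" field first and the
    "lon" field second, as the sample values in the post suggest.
    """
    return [
        {"gid": r["gid"], "coordinates": [r["lat"], r["lon"]]}
        for r in records
    ]

# Two records copied from the sample array in the post.
raw = """[
  {"gid": 95167, "lat": -0.5930874275427621, "lon": 48.9348884422244},
  {"gid": 95235, "lat": -0.9967898987193282, "lon": 48.935096381156605}
]"""

docs = to_geo_docs(json.loads(raw))
for doc in docs:
    print(json.dumps(doc))
```

The point of the sketch is that the per-record mapping does not need the array to be split first; the same mapping is applied to every element in one pass.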
I need some help to reduce the execution time. I tried to index an array of data into Elasticsearch, but I got an indexing exception. That is, I took the output of the ConvertAvroToJSON processor and sent it directly to the Elasticsearch processor.
After a few tries the Jetty server went down. I changed the JVM memory to 4096 MB and restarted NiFi, but some data is stuck between two processors. How can I get it back? The web UI does not work.
I have read https://community.hortonworks.com/questions/31099/is-it-possible-to-to-perform-bulk-insert-to-es-wit...,
I don't want to use a split, because it takes a lot of time.
Thank you.
Created 12-20-2016 06:28 PM
In your flow, note that the "PutElasticSearch" processor actually uses the ES bulk api: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk.html. So if you are specifying a reasonable amount in the batch size you should essentially be "bulk inserting".
What indexing exception did you receive from ElasticSearch? Were you pushing documents/data to the wrong index with an incorrect mapping? Posting the error from ES will help. The bulk index API in ES can sometimes be a bit clunky, so instead I use the ES HTTP processor and adjust the batch size accordingly. This makes sending the raw JSON to ES a little easier and more straightforward.
For the data being stuck, can you run a nifi.sh dump and then post the contents of the bootstrap.log?
Created on 12-21-2016 10:56 AM - edited 08-18-2019 04:49 AM
Thanks for your response.
What exception in index did you receive from ElasticSearch?
→ I have tried the PutElasticsearchHttp processor, and in the ConvertAvroToJSON processor I tried both none and array for the JSON container option. If I don't do that, I get an exception like the one below.
Were you pushing documents/data to the wrong index with an incorrect mapping?
→ No, because I use the PutElasticsearchHttp processor to index the split data with the same mapping.
Posting the error from ES will help.
ERROR [Timer-Driven Process Thread-8] o.a.n.p.e.PutElasticsearchHttp PutElasticsearchHttp[id=0159101b-95cc-1764-e634-d90d6b5877ad] Failed to insert StandardFlowFileRecord[uuid=b1f52deb-bd58-4a8c-82dc-7378f0cd3660,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1482315336809-525, container=default, section=525], offset=883475, length=51862],offset=0,name=18050202000067,size=51862] into Elasticsearch due to , transferring to failure
MapperParsingException[failed to parse]; nested: IllegalArgumentException[Malformed content, found extra data after parsing: START_OBJECT];
    at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:156)
    at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:309)
    at org.elasticsearch.index.shard.IndexShard.prepareCreate(IndexShard.java:529)
    at org.elasticsearch.index.shard.IndexShard.prepareCreateOnPrimary(IndexShard.java:506)
    at org.elasticsearch.action.index.TransportIndexAction.prepareIndexOperationOnPrimary(TransportIndexAction.java:214)
    at org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary(TransportIndexAction.java:223)
    at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:327)
    at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:120)
    at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:68)
    at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.doRun(TransportReplicationAction.java:657)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
Caused by: java.lang.IllegalArgumentException: Malformed content, found extra data after parsing: START_OBJECT
    at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:141)
    ... 18 more
[2016-12-21 16:25:56,723][DEBUG][action.bulk ] [D'Spayre] [test][3] failed to execute bulk item (index) index {[test][default][AVkh_SQL6SmWoIKNH3EF], source[{"gid": 266328, "coordinates": "43.7092713589835427,1.55720029588910425",
I created 3 process groups. One of them doesn't work: when I start it nothing happens, and the group can't be stopped.
There is a 2 shown in red, and I can't use this processor; I waited 1 hour for it.
I have attached a video for you.
Failed to load resource: the server responded with a status of 409 (Conflict)
2016-12-21 14:13:41,349 INFO [Flow Service Tasks Thread-1] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@4428a68b // Another save pending = false
2016-12-21 14:13:51,059 ERROR [Timer-Driven Process Thread-3] o.a.n.p.standard.QueryDatabaseTable QueryDatabaseTable[id=01591001-75cd-17d5-dca1-8bf6e638af98] Unable to execute SQL select query SELECT gid,concat_ws(',',ST_Y(ST_SetSRID(geom_wgs84::geometry,4326)),ST_X(ST_SetSRID(geom_wgs84::geometry,4326))) AS coordinates,id,id_tr,numero,rep,nom_voie,nom_long_voie,code_insee,code_post,type_loc,cote FROM bdadresse.point_adresse due to org.postgresql.util.PSQLException: ERREUR: annulation de la requête à la demande de l'utilisateur: org.postgresql.util.PSQLException: ERREUR: annulation de la requête à la demande de l'utilisateur
2016-12-21 14:13:51,061 ERROR [Timer-Driven Process Thread-3] o.a.n.p.standard.QueryDatabaseTable org.postgresql.util.PSQLException: ERREUR: annulation de la requête à la demande de l'utilisateur
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2455) ~[postgresql-9.4.1212.jar:9.4.1212]
    at org.apache.nifi.processors.standard.QueryDatabaseTable.onTrigger(QueryDatabaseTable.java:241) ~[nifi-standard-processors-1.1.0.2.1.0.0-165.jar:1.1.0.2.1.0.0-165]
(The French PostgreSQL message means "ERROR: canceling statement due to user request".)
➜ ~ nifi.sh dump
Java home: /usr/lib/jvm/java-8-oracle
NiFi home: /home/qfdk/bin/nifi-1.1.0.2.1.0.0-165
Bootstrap Config File: /home/qfdk/bin/nifi-1.1.0.2.1.0.0-165/conf/bootstrap.conf
Exception in thread "main" java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at org.apache.nifi.bootstrap.RunNiFi.dump(RunNiFi.java:700)
    at org.apache.nifi.bootstrap.RunNiFi.main(RunNiFi.java:226)
I am sharing my logs here: desktop.zip
Created 12-22-2016 08:09 AM
2016-12-22 08:49:17,342 WARN [Timer-Driven Process Thread-4] o.a.n.p.standard.QueryDatabaseTable QueryDatabaseTable[id=01591001-75cd-17d5-dca1-8bf6e638af98] Processor Administratively Yielded for 1 sec due to processing failure
2016-12-22 08:49:17,342 WARN [Timer-Driven Process Thread-4] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding QueryDatabaseTable[id=01591001-75cd-17d5-dca1-8bf6e638af98] due to uncaught Exception: java.lang.OutOfMemoryError: Java heap space
2016-12-22 08:49:17,342 WARN [Timer-Driven Process Thread-4] o.a.n.c.t.ContinuallyRunProcessorTask java.lang.OutOfMemoryError: Java heap space
I have set arg5=5120M for the JVM.
Created 12-24-2016 04:58 PM
Would you mind sending a copy of the data that comes out of the "replaceText" processor? I would like to try on my local system to post to ES. Do you have the mapping for that index as well in ES? Is the data as simple as:
{ "gid" : 799092, "coordinates" : [ 6.933880894889725, 48.0070784663541 ]}, { "gid" : 799092, "coordinates" : [ 6.933880894889725, 48.0070784663541 ]},{ "gid" : 799092, "coordinates" : [ 6.933880894889725, 48.0070784663541 ]}
It looks to me like you should be using just the PutElasticSearch processor which uses the bulk api. I see in your "PostHTTP" processor in your video that you are pushing out to the "_bulk" endpoint. Looking at your error log, it seems it is not formatted correctly when being indexed in ES: "Caused by: java.lang.IllegalArgumentException: Malformed content, found extra data after parsing: START_OBJECT"
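As a rough illustration of what the _bulk endpoint expects, the sketch below builds a newline-delimited body in which every document gets its own action line followed by its own source line. The "extra data after parsing: START_OBJECT" message is consistent with one source entry holding more than one JSON object, though that is a reading of the log rather than something confirmed here. The function name build_bulk_body is hypothetical, the index and type names are placeholders, and the second document is made-up sample data.

```python
import json

def build_bulk_body(docs, index, doc_type, id_field):
    """Return an Elasticsearch _bulk request body for a list of dict documents."""
    lines = []
    for doc in docs:
        # Action line: says where this single document should be indexed.
        action = {"index": {"_index": index, "_type": doc_type, "_id": doc[id_field]}}
        lines.append(json.dumps(action))
        # Source line: exactly one JSON object, with no embedded newlines.
        lines.append(json.dumps(doc))
    # The bulk API expects the body to end with a newline.
    return "\n".join(lines) + "\n"

docs = [
    {"gid": 799092, "coordinates": [6.933880894889725, 48.0070784663541]},
    {"gid": 799093, "coordinates": [6.9, 48.0]},  # made-up sample document
]
body = build_bulk_body(docs, "test", "default", "gid")
print(body)
```

Each pair of lines stands alone, so a batch of any size can be sent in one request without first wrapping the documents in a JSON array.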
Created 12-28-2016 08:43 AM
Thank you, and sorry for the late reply. I found out why it doesn't work: the query is too large to execute. The QueryDatabaseTable processor has a problem: it reads all the data at once, so I got an OutOfMemory exception.
So I tried modifying GenerateTableFetch to add a WHERE condition, so that it uses the database index instead of an OFFSET 🙂 OFFSET is bad because it has to skip all the previous rows.
With that, it can generate SQL like this:
SELECT xxx FROM xxxxx WHERE 200000 >= id ORDER BY id LIMIT 200000
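To make the idea of paging on an indexed id instead of OFFSET concrete, here is a small sketch. It is a variant of the query above: it remembers the last id seen and asks for the rows after it. It uses an in-memory SQLite database in place of PostgreSQL so that it runs on its own; the table name points, its columns, and the function name fetch_pages are all hypothetical.

```python
import sqlite3

def fetch_pages(conn, page_size):
    """Yield pages of (id, name) rows, paging on id instead of OFFSET."""
    last_id = 0  # assumes ids are positive integers
    while True:
        rows = conn.execute(
            "SELECT id, name FROM points WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, page_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        # The next page starts right after the largest id of this page.
        last_id = rows[-1][0]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE points (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO points VALUES (?, ?)",
    [(i, f"p{i}") for i in range(1, 11)],
)

pages = list(fetch_pages(conn, 4))
for page in pages:
    print([row[0] for row in page])
```

Because every query starts from an id value, the database can seek through the index rather than scanning and discarding all the earlier rows.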
input files
{"gid": 49900, "coordinates": "44.5638145014092331,4.4830168699343016"} {"gid": 49901, "coordinates": "44.5110194507808501,4.26131697875612936"}
after ReplaceText (each line ends with a \n)
{"index":{"_index":"point_adresse","_type":"default","_id": 650}}
{"gid":650, "coordinates": "46.3066856051698537,5.94381332350432334"}
{"index":{"_index":"point_adresse","_type":"default","_id": 651}}
{"gid":651, "coordinates": "46.2950964034383858,5.93722413905152013"}
mapping
"mappings": { "default": { "_all": { "enabled": true }, "dynamic_templates": [ { "string_fields": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "string", "index": "analyzed", "omit_norms": true, "fields": { "raw": { "type": "string", "index": "not_analyzed", "ignore_above": 256 } } } } } ], "properties": { "coordinates": { "type": "geo_point" }, "gid": { "type": "string" } } } }
I think the problem is in the Elasticsearch HTTP processor and the SQL processor ...
Created 12-28-2016 11:51 PM
If the query returns too many rows in QueryDatabaseTable, can you set Max Rows Per Flow File and Fetch Size to something like 10000? That should prevent the entire result set from being loaded into memory (if the JDBC driver supports it). Also if you want to do something like 200000 >= id, you can set the Initial Max Value for that field using a dynamic property initial.maxvalue.id to 200000.
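The intent behind those two settings can be sketched as follows: rows are pulled from the cursor a fixed number at a time and grouped into bounded batches, so the full result set is never held in memory. This is only an analogy in Python with an in-memory SQLite table, not NiFi's implementation; the function name stream_batches and the table points are hypothetical.

```python
import sqlite3

def stream_batches(cursor, fetch_size, max_rows_per_file):
    """Yield lists of at most max_rows_per_file rows, reading fetch_size rows at a time."""
    batch = []
    while True:
        rows = cursor.fetchmany(fetch_size)
        if not rows:
            break
        for row in rows:
            batch.append(row)
            if len(batch) == max_rows_per_file:
                yield batch
                batch = []
    if batch:
        # Emit the final, partially filled batch.
        yield batch

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE points (id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO points VALUES (?)", [(i,) for i in range(1, 26)])

cursor = conn.execute("SELECT id FROM points ORDER BY id")
sizes = [len(b) for b in stream_batches(cursor, fetch_size=4, max_rows_per_file=10)]
print(sizes)
```

Whether memory actually stays bounded depends on the driver streaming rows rather than buffering the whole result, which is the "if the JDBC driver supports it" caveat above.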
Created 12-29-2016 08:41 AM
If the query returns too many rows in QueryDatabaseTable, can you set Max Rows Per Flow File and Fetch Size to something like 10000?
-> Yes, I have done that, but it doesn't work... I have more than 30,000,000 rows in PostgreSQL, so an OutOfMemory exception is to be expected.
I'm looking forward to your answer about PutElasticsearchHttp.