12-29-2016
08:41 AM
@Matt Burgess If the query returns too many rows in QueryDatabaseTable, can you set Max Rows Per Flow File and Fetch Size to something like 10000? → Yes, I have done that, but it doesn't work. I have 30,000,000+ rows in PostgreSQL, so an OutOfMemoryError is not surprising. I'm looking forward to your answer about PutElasticsearchHttp.
12-28-2016
08:43 AM
Thank you, and sorry for the late reply. I found out why it doesn't work: the query result is too big to execute that way. The QueryDatabaseTable processor reads all the data at once, so I got an OutOfMemoryError.
Then I tried to modify GenerateTableFetch to add a WHERE condition, so the database uses an index instead of an OFFSET 🙂 OFFSET is bad for skipping previous rows, because the database still scans everything it skips.
So it can generate SQL like this:
SELECT xxx
FROM xxxxx
WHERE id > 200000
ORDER BY id
LIMIT 200000
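For reference, a minimal sketch of that keyset-pagination pattern against the table from the logs, assuming id is an indexed integer key; the literal 200000 stands in for the last id returned by the previous page:

-- Assumes an index on id, e.g.:
-- CREATE INDEX point_adresse_id_idx ON bdadresse.point_adresse (id);

-- Page 1: start from the beginning of the key range.
SELECT gid, id
FROM bdadresse.point_adresse
ORDER BY id
LIMIT 200000;

-- Page 2: resume after the last id of page 1. The index seeks straight
-- to the first matching row instead of scanning and discarding
-- 200000 rows the way OFFSET 200000 would.
SELECT gid, id
FROM bdadresse.point_adresse
WHERE id > 200000
ORDER BY id
LIMIT 200000;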
Input files:
{"gid": 49900, "coordinates": "44.5638145014092331,4.4830168699343016"}
{"gid": 49901, "coordinates": "44.5110194507808501,4.26131697875612936"}
After ReplaceText:
{"index":{"_index":"point_adresse","_type":"default","_id": 650}}
{"gid":650, "coordinates": "46.3066856051698537,5.94381332350432334"}
{"index":{"_index":"point_adresse","_type":"default","_id": 651}}
{"gid":651, "coordinates": "46.2950964034383858,5.93722413905152013"}
There is a \n (newline) after each line.
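For reference, one hypothetical ReplaceText configuration that would produce those action/source pairs; the regex and back-references are an assumption about this flow, not copied from it:

Replacement Strategy: Regex Replace
Evaluation Mode: Line-by-Line
Search Value: ^\{"gid": (\d+),(.*)$
Replacement Value: {"index":{"_index":"point_adresse","_type":"default","_id": $1}}\n{"gid":$1,$2

Run line by line, every source document gets its own action line, which is exactly the newline-delimited layout the Elasticsearch _bulk endpoint requires (including a newline after the last line).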
Mapping:
"mappings": {
"default": {
"_all": {
"enabled": true
},
"dynamic_templates": [
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"type": "string",
"index": "analyzed",
"omit_norms": true,
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed",
"ignore_above": 256
}
}
}
}
}
],
"properties": {
"coordinates": {
"type": "geo_point"
},
"gid": {
"type": "string"
}
}
}
}
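For reference, a sketch of installing that mapping when the index is created; the host/port and the curl invocation are assumptions, and the body is abridged to the two explicit properties:

curl -XPUT "http://localhost:9200/point_adresse" -d '{
  "mappings": {
    "default": {
      "properties": {
        "coordinates": { "type": "geo_point" },
        "gid": { "type": "string" }
      }
    }
  }
}'

The mapping has to exist before the bulk insert runs; otherwise dynamic mapping would type coordinates as a plain string instead of geo_point.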
I think it's a problem with both the Elasticsearch HTTP processor and the SQL processor ...
12-22-2016
08:09 AM
2016-12-22 08:49:17,342 WARN [Timer-Driven Process Thread-4] o.a.n.p.standard.QueryDatabaseTable QueryDatabaseTable[id=01591001-75cd-17d5-dca1-8bf6e638af98] Processor Administratively Yielded for 1 sec due to processing failure
2016-12-22 08:49:17,342 WARN [Timer-Driven Process Thread-4] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding QueryDatabaseTable[id=01591001-75cd-17d5-dca1-8bf6e638af98] due to uncaught Exception: java.lang.OutOfMemoryError: Java heap space
2016-12-22 08:49:17,342 WARN [Timer-Driven Process Thread-4] o.a.n.c.t.ContinuallyRunProcessorTask
java.lang.OutOfMemoryError: Java heap space
I have already used arg5=5120M for the JVM.
12-21-2016
10:56 AM
Thanks for your response.
What exception in index did you receive from Elasticsearch?
→ I have tried the PutElasticsearchHttp processor; in the ConvertAvroToJSON processor I tried both none/array for the JSON container options. If I don't do that, I get an exception like the one below.
Were you pushing documents/data to the wrong index with an incorrect mapping?
→ No, because I use the PutElasticsearchHttp processor to index the split data using the same mapping.
Posting the error from ES will help:
ERROR [Timer-Driven Process Thread-8] o.a.n.p.e.PutElasticsearchHttp PutElasticsearchHttp[id=0159101b-95cc-1764-e634-d90d6b5877ad] Failed to insert StandardFlowFileRecord[uuid=b1f52deb-bd58-4a8c-82dc-7378f0cd3660,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1482315336809-525, container=default, section=525], offset=883475, length=51862],offset=0,name=18050202000067,size=51862] into Elasticsearch due to , transferring to failure
MapperParsingException[failed to parse]; nested: IllegalArgumentException[Malformed content, found extra data after parsing: START_OBJECT];
at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:156)
at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:309)
at org.elasticsearch.index.shard.IndexShard.prepareCreate(IndexShard.java:529)
at org.elasticsearch.index.shard.IndexShard.prepareCreateOnPrimary(IndexShard.java:506)
at org.elasticsearch.action.index.TransportIndexAction.prepareIndexOperationOnPrimary(TransportIndexAction.java:214)
at org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary(TransportIndexAction.java:223)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:327)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:120)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:68)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.doRun(TransportReplicationAction.java:657)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
Caused by: java.lang.IllegalArgumentException: Malformed content, found extra data after parsing: START_OBJECT
at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:141)
... 18 more
[2016-12-21 16:25:56,723][DEBUG][action.bulk ] [D'Spayre] [test][3] failed to execute bulk item (index) index {[test][default][AVkh_SQL6SmWoIKNH3EF], source[{"gid": 266328, "coordinates": "43.7092713589835427,1.55720029588910425",
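For context: this MapperParsingException is what Elasticsearch raises when the body of one index operation contains more than one top-level JSON value. PutElasticsearchHttp treats each flow file as a single document, so a flow file holding exactly one object indexes fine, while un-split ConvertAvroToJSON output (an array, or several concatenated objects) trips the "extra data after parsing" check. A minimal illustration with shortened values, not taken from these logs:

Accepted, one object per flow file:
{"gid": 266328, "coordinates": "43.7092,1.5572"}

Rejected as malformed, several objects in one flow file:
{"gid": 266328, "coordinates": "43.7092,1.5572"}
{"gid": 266329, "coordinates": "43.7101,1.5581"}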
I created 3 process groups, and one of them doesn't work: when I start it, nothing happens and the group can't be stopped. Two processors are shown in red and I can't use them; I waited an hour for it. I put a video for you.
Failed to load resource: the server responded with a status of 409 (Conflict)
2016-12-21 14:13:41,349 INFO [Flow Service Tasks Thread-1] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@4428a68b // Another save pending = false
2016-12-21 14:13:51,059 ERROR [Timer-Driven Process Thread-3] o.a.n.p.standard.QueryDatabaseTable QueryDatabaseTable[id=01591001-75cd-17d5-dca1-8bf6e638af98] Unable to execute SQL select query SELECT gid,concat_ws(',',ST_Y(ST_SetSRID(geom_wgs84::geometry,4326)),ST_X(ST_SetSRID(geom_wgs84::geometry,4326))) AS coordinates,id,id_tr,numero,rep,nom_voie,nom_long_voie,code_insee,code_post,type_loc,cote FROM bdadresse.point_adresse due to org.postgresql.util.PSQLException: ERREUR: annulation de la requête à la demande de l'utilisateur: org.postgresql.util.PSQLException: ERREUR: annulation de la requête à la demande de l'utilisateur
2016-12-21 14:13:51,061 ERROR [Timer-Driven Process Thread-3] o.a.n.p.standard.QueryDatabaseTable
org.postgresql.util.PSQLException: ERREUR: annulation de la requête à la demande de l'utilisateur (English: ERROR: canceling statement due to user request)
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2455) ~[postgresql-9.4.1212.jar:9.4.1212]
at org.apache.nifi.processors.standard.QueryDatabaseTable.onTrigger(QueryDatabaseTable.java:241) ~[nifi-standard-processors-1.1.0.2.1.0.0-165.jar:1.1.0.2.1.0.0-165]
➜ ~ nifi.sh dump
Java home: /usr/lib/jvm/java-8-oracle
NiFi home: /home/qfdk/bin/nifi-1.1.0.2.1.0.0-165
Bootstrap Config File: /home/qfdk/bin/nifi-1.1.0.2.1.0.0-165/conf/bootstrap.conf
Exception in thread "main" java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at org.apache.nifi.bootstrap.RunNiFi.dump(RunNiFi.java:700)
at org.apache.nifi.bootstrap.RunNiFi.main(RunNiFi.java:226)
I will share my logs here: desktop.zip
12-19-2016
08:56 AM
Hello,
I have some questions about the QueryDatabaseTable and Elasticsearch processors.
I tried to get some data from PostgreSQL and index it into Elasticsearch.
After ConvertAvroToJSON I got a JSON array like this:
[ {
"gid" : 95167,
"lat" : -0.5930874275427621,
"lon" : 48.9348884422244
}, {
"gid" : 95235,
"lat" : -0.9967898987193282,
"lon" : 48.935096381156605
}, {
"gid" : 95307,
"lat" : -0.7477345135642162,
"lon" : 48.92913514241155
},
......
]
In order to use PutElasticsearch(HTTP) with the geometry, I split the array and rebuilt a JSON file. SplitJson and JoltTransformJSON take a lot of time, so I want to remove the SplitJson processor to speed the whole flow up, but I can't (see the Jolt sketch below). Do you have any ideas? After JoltTransformJSON, the format looks like this:
{
  "gid" : 799092,
  "coordinates" : [ 6.933880894889725, 48.0070784663541 ]
}
I need some help to improve the execution time. I tried to index an array of data into Elasticsearch, meaning I fed the output of ConvertAvroToJSON straight into the Elasticsearch processor, but I got an indexing exception. After some tries the Jetty server went down; I changed the JVM memory to 4096 MB and restarted NiFi, but some data is stuck between two processors... how can I get it back? The Web UI doesn't work. I have read https://community.hortonworks.com/questions/31099/is-it-possible-to-to-perform-bulk-insert-to-es-wit.html, but I don't want to use split because it takes a lot of time. Thanks.
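For anyone hitting the same bottleneck: a hypothetical JoltTransformJSON shift spec that rewrites every element of the ConvertAvroToJSON array in a single pass, so no SplitJson is needed for the transform step. The field names come from the samples in this thread, but the spec itself is an assumption, not the poster's actual configuration; also note that the lat/lon values in the sample above look swapped for France, so verify which field really holds the longitude before relying on the [lon, lat] order:

[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "gid": "[&1].gid",
        "lon": "[&1].coordinates[0]",
        "lat": "[&1].coordinates[1]"
      }
    }
  }
]

Applied to the whole array, this yields one transformed object per input element; splitting (or hand-building the _bulk payload, as in the 12-28 post above) is then only needed to give PutElasticsearchHttp one document per flow file.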
Labels:
- Apache NiFi