Member since: 12-11-2017
Posts: 21
Kudos Received: 3
Solutions: 2

My Accepted Solutions

Title | Views | Posted
---|---|---
 | 168 | 12-21-2020 02:44 AM
 | 336 | 07-10-2020 08:42 AM
12-21-2020
02:44 AM
Hello @Lokeswar The QueryRecord processor does exactly what you want, but you need to run your job in a record-oriented way, using a JSON reader and a JSON writer. You then don't work with flowfile attributes, but directly with your flowfile content. So, starting from your CSV: - use a ConvertRecord processor to turn it into a record flowfile, with CSVReader as the reader and JsonRecordSetWriter as the writer, since you want JSON - add a QueryRecord processor, with a query that looks like this: SELECT * FROM FLOWFILE WHERE resourceState='INSTALLED' OR resourceState='RETIRED' Warning: don't use double quotes around values, just single quotes. After this, QueryRecord exposes an output relationship for the query that you can plug into your next processor.
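The SQL predicate above can be sketched in plain Python to show which records QueryRecord keeps; the sample records and the "name" field are invented for illustration:

```python
import json

# Hypothetical records, standing in for the JSON produced by ConvertRecord.
records = [
    {"name": "app1", "resourceState": "INSTALLED"},
    {"name": "app2", "resourceState": "DEPLOYED"},
    {"name": "app3", "resourceState": "RETIRED"},
]

# Equivalent of:
#   SELECT * FROM FLOWFILE
#   WHERE resourceState='INSTALLED' OR resourceState='RETIRED'
matched = [r for r in records if r["resourceState"] in ("INSTALLED", "RETIRED")]

print(json.dumps(matched))
```

Each record for which the predicate holds ends up in the flowfile routed to that query's relationship.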
12-17-2020
11:06 PM
1 Kudo
Hello @spa I've been looking for this too, but it doesn't exist. Instead, you can use a script (Python, Groovy, ...). If you hit performance issues with a scripted processor, you can improve the situation using the trick described here: InvokeScriptedProcessor template (a faster ExecuteScript)
12-17-2020
01:24 AM
Hi @justenji Same for me, I've tried to use the Avro schema generator, including NiFi's schema inference, but no luck.
12-16-2020
11:36 PM
Hello @Anurag007 Your description is a little bit "dry". Anyway, you can probably do what you want with the following processors: - GetFile (or better, ListFile + FetchFile) to get the content of your files - RouteOnContent, which allows you to define routing rules based on file content using regular expressions You will easily find many examples of how to use these processors, for instance via the search feature of this site.
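The regex-based routing that RouteOnContent performs can be illustrated with a small Python sketch; the rule names and patterns below are hypothetical:

```python
import re

# Hypothetical routing rules: relationship name -> regex tested against content.
rules = {
    "errors": re.compile(r"ERROR|FATAL"),
    "warnings": re.compile(r"WARN"),
}

def route(content):
    """Return the names of all rules whose regex matches the content."""
    return [name for name, rx in rules.items() if rx.search(content)]

print(route("2020-12-16 ERROR something broke"))  # -> ['errors']
print(route("all good here"))                     # -> []
```

In RouteOnContent, each dynamic property you add plays the role of such a rule, and matching flowfiles are routed to a relationship of the same name.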
12-16-2020
03:45 AM
Hello @justenji Thanks a lot for the time you've spent on my issue, I really appreciate it. Yes, as I mentioned at the beginning of my post, it works with a basic JoltTransformJSON on a single JSON entry, and this is what I'm doing now: splitting my records and then using this processor. But I want to keep the record-oriented approach, which is really more efficient performance-wise. I wanted to test some different things regarding the schema, as suggested by @TimothySpann . I guess we need to tell Jolt that the output will be an array of records. I've made various attempts with an Avro schema but no luck. Actually, I've even tried to use schema inference to create a schema, but the AvroSchemaRegistry doesn't want to take it, and the error message I get is "Not a named type". Here is the basic Avro schema:

{
  "type": "array",
  "namespace": "nothing",
  "items": {
    "type": "record",
    "name": "steps",
    "fields": [
      {
        "name": "index",
        "type": "string",
        "doc": "Type inferred from index"
      }
    ]
  }
}

Do we have an Avro guru around the corner? Thanks Stéphane
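One possible reading of the "Not a named type" error: the schema registry may only accept a named type (a record) at the top level, so a top-level "array" schema is rejected. An untested workaround sketch is to wrap the array inside a named record; note this changes the data shape, since the array then lives under a field (all names here are made up):

```json
{
  "type": "record",
  "name": "stepsWrapper",
  "namespace": "nothing",
  "fields": [
    {
      "name": "steps",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "step",
          "fields": [
            { "name": "index", "type": "string" }
          ]
        }
      }
    }
  ]
}
```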
12-15-2020
07:57 AM
Hello @TimothySpann Thanks for your reply. I use a basic JsonTreeReader with no schema, just infer schema.
12-14-2020
11:58 PM
Hello, I'm facing a weird issue with Jolt. I have a flowfile which is record-oriented, one JSON object per line, with the following structure: {"aleas": [{object1}, {object2}, {object3}]} and what I basically want to do is get rid of this "aleas" root key and have something like this: [{object1}, {object2}, {object3}] I've tested this spec on the Jolt demo site:

[
  {
    "operation": "shift",
    "spec": {
      "aleas": {
        "*": []
      }
    }
  }
]

But when I run it on NiFi (latest release) using a JoltTransformRecord processor, I get the following error message:

2020-12-15 07:50:17,415 ERROR [Timer-Driven Process Thread-8] o.a.n.p.jolt.record.JoltTransformRecord JoltTransformRecord[id=654dabc3-0176-1000-0c3a-067d307c6f07] Unable to transform StandardFlowFileRecord[uuid=b818aa99-b538-48bb-942e-c39d70854c53,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1608018617233-9, container=default, section=9], offset=570949, length=1329453],offset=0,name=60dcc444-f06a-4c65-b667-8309583eb782_Feuil1.csv,size=1329453] due to org.apache.nifi.processor.exception.ProcessException: Error transforming the first record: org.apache.nifi.processor.exception.ProcessException: Error transforming the first record
org.apache.nifi.processor.exception.ProcessException: Error transforming the first record
    at org.apache.nifi.processors.jolt.record.JoltTransformRecord.onTrigger(JoltTransformRecord.java:335)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1174)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

I use a basic JsonTreeReader as record reader, with all the options set to default. The funny part is that if I add a SplitRecord processor and process each JSON flowfile using JoltTransformJSON, it works nicely. Nevertheless, I'd like to avoid that solution, which is really bad for performance and breaks my whole "record-oriented" flow. Any idea? Thanks for your support Stéphane
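Per record, the intended shift boils down to lifting the array out from under the root key, which can be sketched in Python (the sample objects are placeholders):

```python
import json

# One JSON object per line, as described above.
line = '{"aleas": [{"id": 1}, {"id": 2}, {"id": 3}]}'

record = json.loads(line)
# Equivalent of the shift spec  "aleas": { "*": [] } : keep the array,
# drop the "aleas" root key.
result = record["aleas"]

print(json.dumps(result))  # -> [{"id": 1}, {"id": 2}, {"id": 3}]
```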
12-14-2020
07:23 AM
Hello @justenji Thanks for the detailed explanation. This is also what I have tested on my side, and in my opinion it does the job while staying "record oriented" 🙂 Have a nice day
12-09-2020
02:12 AM
I need to test this, but it actually seems that QueryRecord is exactly what I need. It is possible to build complex conditions using a SQL-like language on the record content and then perform routing based on that.
12-09-2020
01:25 AM
Hi @TimothySpann Thanks so much for pointing me to this processor, it looks so great!!
12-08-2020
08:24 AM
Hello @justenji Thanks anyway for taking the time to look into my issue. For now, the only solution I have found is a Groovy script. Stéphane
12-07-2020
08:37 AM
Hello @justenji Thanks for your reply. The problem here is that you are working with attributes and expression language, which would mean having one flowfile per JSON object. I'm trying to keep a record-oriented flowfile, which means that everything happens with RecordPath and/or Jolt transformations. My flowfiles have multiple JSON records, and as such, attributes would take multiple values. And I don't want to split my records; that's really resource-consuming. Best regards, Stéphane
11-30-2020
09:09 AM
hello, Have you tried with a syslog listener on Nifi side?
11-30-2020
09:02 AM
Hello, NiFi can easily get data from a database, do you really need a Python script for that? If you really want to do what you describe here, I think you should use UpdateAttribute to store your database information in flowfile attributes and then, from your script, use the getAttribute function to read it.
11-30-2020
08:51 AM
Hello, Can you show the configuration of your CSV record reader? Your CSV looks fine, there is no need to replace anything here. To simplify debugging, you can also select "Infer Schema" instead of using an Avro schema. It's of course better to work with an Avro schema when you go to production.
11-30-2020
08:37 AM
Hello all, I'd like to use the UpdateRecord processor to create some fields in my JSON, but I'd like to apply some conditions when doing so. In the RecordPath documentation, I can see that there is a "filter" capability, which works like this: /field[filter]/path, but as far as I can see it's not possible to have multiple conditions in this filter. I'd like to do something like:

if (field1 == A or field1 == B) {
  field2 = C
} else if (field1 == D and field1 != E) {
  field2 = G
}

The example is silly but it gives you the idea. Apart from using a script, I don't know how to do that, except maybe extracting the fields, putting them in attributes and playing with expression language, but that would mean working with single-record flowfiles and I don't want to do that. Any idea? Thanks for your suggestions Stéphane
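The pseudocode above can be written out in plain Python over a single record to pin down the intent; the field names and values are the placeholders from the post (assuming D and E are distinct, the "field1 != E" clause is already implied by "field1 == D", so it is omitted):

```python
# Hypothetical per-record logic; A, B, C, D, G are placeholder values.
def derive_field2(record):
    f1 = record.get("field1")
    if f1 in ("A", "B"):
        record["field2"] = "C"
    elif f1 == "D":
        record["field2"] = "G"
    return record

print(derive_field2({"field1": "A"}))  # field2 becomes "C"
print(derive_field2({"field1": "D"}))  # field2 becomes "G"
```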
07-10-2020
08:42 AM
2 Kudos
Answer to myself: I solved the problem by converting this boolean to a string using Jolt:

{
  "operation": "modify-overwrite-beta",
  "spec": {
    "status": {
      "*": {
        "isCurrentStatus": "=toString"
      }
    }
  }
}
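What the "=toString" modify does per record can be sketched in Python; the sample statuses are hypothetical but match the structure the spec targets, and booleans are rendered as the lowercase strings "true"/"false":

```python
import json

# Sample input matching the structure the spec targets: a "status" array
# whose entries carry a boolean "isCurrentStatus".
doc = {
    "status": [
        {"code": "InProgress", "isCurrentStatus": False},
        {"code": "Pending", "isCurrentStatus": True},
    ]
}

# Equivalent of applying "=toString" to each isCurrentStatus value:
# booleans become the lowercase strings "true" / "false".
for entry in doc["status"]:
    entry["isCurrentStatus"] = str(entry["isCurrentStatus"]).lower()

print(json.dumps(doc))
```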
07-02-2020
07:37 AM
Hello, I have a piece of JSON on which I want to apply an UpdateRecord processor using some filter capabilities. My JSON looks like this:

"status": [
  {
    "code": "InProgress",
    "isCurrentStatus": false,
    "startDateTime": "2020-03-05T20:45:00Z"
  },
  {
    "code": "Pending",
    "isCurrentStatus": true,
    "startDateTime": "2020-03-05T21:20:00Z"
  }
],

And my RecordPath expression is the following: /status[0..-1][./isCurrentStatus = true]/code This expression doesn't work, and I get an error in my log file about incorrect syntax: Unexpected token ']' at line 1 If I use the following syntax: /status[0..-1][./isCurrentStatus = "true"]/code it no longer gives an error, but of course the filter doesn't work. So, I have the feeling that RecordPath filters don't support the use of boolean values. My NiFi is NiFi 1.9.0, coming with HDF 3.4. Any idea? Thanks a lot, Stéphane
11-21-2019
11:48 PM
Hello @JoeWitt , Thanks for your feedback. Actually, my flowfile is created by a syslog processor. I see no errors in the NiFi log file regarding processing, and as far as I can tell, I collect all my data correctly. Stéphane
11-07-2019
01:10 AM
Hello, By any chance, have you found anything about this problem? Nothing on my side, unfortunately 😞
10-08-2019
01:14 AM
Hello, I have exactly the same problem: I need to restart NiFi on a regular basis to get the content_repository cleaned. When I go into data provenance, I can see that all the content files are in DROP state. My flow is really basic: syslog -> UpdateAttribute -> HDFS. Please note that at the syslog level I work with batches of 1000 files. What does your flow look like in detail? PS: Yes, I've read this: https://community.cloudera.com/t5/Community-Articles/Understanding-how-NiFi-s-Content-Repository-Archiving-works/ta-p/249418 and also this: https://community.cloudera.com/t5/Community-Articles/How-to-determine-which-FlowFiles-are-associated-to-the-same/ta-p/249185