Created on 12-14-2020 11:58 PM - edited 12-14-2020 11:59 PM
Hello,
I'm facing a weird issue with jolt. I have a flowfile which is record-oriented, one JSON object per line with the following structure:
{"aleas": [{object1}, {object2}, {object3}]}
and what I basically want to do is get rid of this "aleas" root key and have something like this:
[{object1}, {object2}, {object3}]
I've tested this spec on the Jolt demo site:
[
  {
    "operation": "shift",
    "spec": {
      "aleas": {
        "*": []
      }
    }
  }
]
But when I run it on NiFi (latest release) using a JoltTransformRecord processor, I get the following error message:
2020-12-15 07:50:17,415 ERROR [Timer-Driven Process Thread-8] o.a.n.p.jolt.record.JoltTransformRecord JoltTransformRecord[id=654dabc3-0176-1000-0c3a-067d307c6f07] Unable to transform StandardFlowFileRecord[uuid=b818aa99-b538-48bb-942e-c39d70854c53,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1608018617233-9, container=default, section=9], offset=570949, length=1329453],offset=0,name=60dcc444-f06a-4c65-b667-8309583eb782_Feuil1.csv,size=1329453] due to org.apache.nifi.processor.exception.ProcessException: Error transforming the first record: org.apache.nifi.processor.exception.ProcessException: Error transforming the first record
org.apache.nifi.processor.exception.ProcessException: Error transforming the first record
at org.apache.nifi.processors.jolt.record.JoltTransformRecord.onTrigger(JoltTransformRecord.java:335)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1174)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
I use a basic JsonTreeReader as the record reader, with all options set to their defaults.
The funny part is that if I put in a SplitRecord processor and process each JSON flowfile with JoltTransformJSON, it works nicely. Nevertheless, I'd like to avoid this solution, which is really bad for performance and breaks my whole "record-oriented" flow.
Any idea?
Thanks for your support
Stéphane
Created 12-15-2020 07:19 AM
It may be an adjustment of the JoltTransformRecord.
What settings do you have?
What reader are you using?
Do you have a schema?
Created 12-15-2020 07:57 AM
Hello @TimothySpann
Thanks for your reply. I use a basic JsonTreeReader with no schema, just Infer Schema.
Created 12-15-2020 08:15 AM
It may be guessing the schema wrong, that is my thought.
It may not be seeing those separate lines as separate JSON documents.
Try a schema with just those 3 fields you want.
Created 12-15-2020 10:42 PM
I assume that this question follows on from your previous one.
So I tried to do the same as you with my test JSON:
{
  "myJSON": [
    {
      "myfield": "JustForHavingJson",
      "myfield1": "A",
      "myfield2": "C"
    },
    {
      "myfield": "JustForHavingJson",
      "myfield1": "B",
      "myfield2": "C"
    },
    {
      "myfield": "JustForHavingJson",
      "myfield1": "C",
      "myfield2": ""
    },
    {
      "myfield": "JustForHavingJson",
      "myfield1": "E",
      "myfield2": ""
    },
    {
      "myfield": "JustForHavingJson",
      "myfield1": "X",
      "myfield2": ""
    },
    {
      "myfield": "JustForHavingJson",
      "myfield1": "",
      "myfield2": ""
    },
    {
      "myfield": "JustForHavingJson",
      "myfield1": "D",
      "myfield2": "G"
    }
  ]
}
But JoltTransformRecord fails both with and without a schema; I get the same error as you (NiFi 1.11.1).
One possible quick workaround I found is to split the records and transform each one with JoltTransformJSON, like you did.
If you get JoltTransformRecord to work, I would like to know how. Thanks.
Created 12-15-2020 11:31 PM
Now I've come one step further. I defined this schema for the JsonTreeReader in JoltTransformRecord:
{
  "name": "HCC_JOLTTRANSFORMRECORD_IN",
  "type": "record",
  "namespace": "HCC_JOLTTRANSFORMRECORD_IN",
  "fields": [
    {
      "name": "myJSON",
      "type": {
        "type": "array",
        "items": {
          "name": "myJSON_record",
          "type": "record",
          "fields": [
            {
              "name": "myfield",
              "type": "string"
            },
            {
              "name": "myfield1",
              "type": "string"
            },
            {
              "name": "myfield2",
              "type": "string"
            }
          ]
        }
      }
    }
  ]
}
So the error
Error transforming the first record:
is gone!
Now I get another error, concerning the writer schema:
JoltTransformRecord[id=65b2b5fd-0176-1000-ffff-ffffd0f23bd9] Unable to transform StandardFlowFileRecord[uuid=7e4fe006-1eb0-44cd-9e16-f4c8a8c533df,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1608074221236-4, container=default, section=4], offset=584493, length=780],offset=0,name=d92eab41-fa79-441f-a5d9-c6e7f6be10c0,size=780] due to org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [[Ljava.lang.Object;@1b321d9e] of type class [Ljava.lang.Object; to Record for field r: Cannot convert value [[Ljava.lang.Object;@1b321d9e] of type class [Ljava.lang.Object; to Record for field r
Working on it...
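In case it helps: this is the writer schema I would try next. It is only a sketch, assuming the record writer treats each element of the Jolt output array as one record (field names taken from my test JSON above); I have not confirmed that it resolves the conversion error.

```json
{
  "name": "HCC_JOLTTRANSFORMRECORD_OUT",
  "type": "record",
  "namespace": "HCC_JOLTTRANSFORMRECORD_OUT",
  "fields": [
    { "name": "myfield", "type": "string" },
    { "name": "myfield1", "type": "string" },
    { "name": "myfield2", "type": "string" }
  ]
}
```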
Created 12-16-2020 12:31 AM
When I use your above Jolt spec within JoltTransformJSON, it works fine.
No need for readers, writers, or schemas.
Output:
[ {
"myfield" : "JustForHavingJson",
"myfield1" : "A",
"myfield2" : "C"
}, {
"myfield" : "JustForHavingJson",
"myfield1" : "B",
"myfield2" : "C"
}, {
"myfield" : "JustForHavingJson",
"myfield1" : "C",
"myfield2" : ""
}, {
"myfield" : "JustForHavingJson",
"myfield1" : "E",
"myfield2" : ""
}, {
"myfield" : "JustForHavingJson",
"myfield1" : "X",
"myfield2" : ""
}, {
"myfield" : "JustForHavingJson",
"myfield1" : "",
"myfield2" : ""
}, {
"myfield" : "JustForHavingJson",
"myfield1" : "D",
"myfield2" : "G"
} ]
Created 12-16-2020 03:45 AM
Hello @justenji
Thanks a lot for the time you spent on my issue, I really appreciate it.
Yes, as I mentioned at the beginning of my post, it works with a basic JoltTransformJSON on a single JSON entry, and this is what I'm doing now: splitting my records and then using this processor. But I want to keep the record-oriented approach, which is much more efficient performance-wise.
I wanted to test something different regarding the schema, as suggested by @TimothySpann . I guess we need to tell Jolt that the output will be an array of records. I've made various attempts with an Avro schema but no luck. I've even tried to use schema inference to create a schema, but the AvroSchemaRegistry doesn't want to take it, and the error message I get is "Not a named Type".
Here is the basic avro schema:
{
  "type": "array",
  "namespace": "nothing",
  "items": {
    "type": "record",
    "name": "steps",
    "fields": [
      {
        "name": "index",
        "type": "string",
        "doc": "Type inferred from index"
      }
    ]
  }
}
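If I understand the Avro specification correctly, only records, enums and fixed types carry names, so a bare top-level array can never be a "named type" and that may be exactly what the registry is complaining about. A sketch that would at least satisfy the registry wraps the array in a named record (the wrapper name below is just an illustration); the downside is that it re-introduces a root field, so it may not produce the bare-array output I'm after:

```json
{
  "type": "record",
  "name": "steps_wrapper",
  "namespace": "nothing",
  "fields": [
    {
      "name": "steps",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "steps",
          "fields": [
            { "name": "index", "type": "string" }
          ]
        }
      }
    }
  ]
}
```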
Do we have an Avro guru around the corner?
Thanks
Stéphane
Created 12-16-2020 11:44 PM
I have only found the correct Avro schema for the JsonTreeReader (see above); unfortunately, I failed at the output array definition. When trying to use an online-generated output schema, errors occurred continuously.
Hope that an avro-guru can come forward and help.
I am looking forward to the solution, which I will find after my vacation in January.
I'm outta here now, all the best!
Created 12-17-2020 01:24 AM
Hi @justenji
Same for me, I've tried Avro schema generators, including NiFi's schema inference, but no luck.