Member since
07-29-2020
540
Posts
289
Kudos Received
163
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
98 | 10-29-2024 03:05 AM | |
356 | 10-14-2024 05:21 PM | |
285 | 09-07-2024 07:58 AM | |
415 | 09-07-2024 12:59 AM | |
517 | 09-03-2024 12:36 AM |
10-31-2024
04:55 PM
2 Kudos
Hi @Syed0000 , Sorry for the delay. Your JSON is quite complex and deeply nested, which makes a Jolt spec very hard to write. You probably know your data very well, so my recommendation before writing Jolt for such JSON is to simplify it first by stripping unneeded blocks and/or flattening the structure where the transformation is not going to be on the same nested level. I honestly tried, but once the Jolt spec got that nested it became harder to reference the upper fields and upper array indexes in order to maintain the same array position and the required grouping of fields. Maybe that is the drawback of using a Jolt spec in these scenarios (without prior simplification, of course :). This leads me to the second option, which I think I mentioned to you in a prior post: using a JSLT transformation instead. JSLT is a better option in these cases because you can traverse the structure much more easily, and since you have some conditions on how to set fields and values, this is also easier to achieve with an expression language that supports if-else. For example, if we take the transformation that creates the catalogCondition structure, which seems to be the most complex one, here is how the JSLT looks:
let paths = {
"paths":[for(.tiers)
{
for(.conditions)
"catalogCondition":
{
"category":
[for(.conditionsMerch)
{
"include": if(.excludeInd==0) "yes" else "no",
"categoryList":[.webCategoryId]
}
if(.merchandiseLevel!=5 and not(.keyItemNum))]
}
+
{
"products" : [for(.conditionsMerch)
{
"include": if(.excludeInd==0) "yes" else "no",
"productList":[.keyItemNum]
}
if(not(.webCategoryId))]
}
}
]
}
{
"pricePaths": $paths
}
I know this might look a little intimidating at the beginning, but it's not nearly as bad as trying to do it with Jolt. I understand that JSLT has a bit of a learning curve, but I believe it will save you tons of time in the long term, especially when dealing with long, complex and nested JSON transformations. I know this is not the answer you were looking for, but hopefully it can help you when facing other JSON transformation challenges in the future. Good luck.
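To make the filtering rules in the JSLT easier to follow, here is a rough Python sketch of the same per-condition logic. It is only an illustration under assumptions: it treats `not(.field)` as "the field is missing or null", it handles a single entry of `conditions`, and the sample values (`C100`, `P200`) are made up.

```python
def include_flag(item):
    """Mirror the JSLT expression: if(.excludeInd==0) "yes" else "no"."""
    return "yes" if item.get("excludeInd") == 0 else "no"


def catalog_condition(condition):
    """Build the category and products lists for one condition."""
    merch = condition.get("conditionsMerch", [])
    # Category entries: merchandiseLevel is not 5 and there is no keyItemNum.
    category = [
        {"include": include_flag(m), "categoryList": [m.get("webCategoryId")]}
        for m in merch
        if m.get("merchandiseLevel") != 5 and m.get("keyItemNum") is None
    ]
    # Product entries: there is no webCategoryId.
    products = [
        {"include": include_flag(m), "productList": [m.get("keyItemNum")]}
        for m in merch
        if m.get("webCategoryId") is None
    ]
    return {"category": category, "products": products}


# Hypothetical input, only to exercise both branches.
sample_condition = {
    "conditionsMerch": [
        {"excludeInd": 0, "merchandiseLevel": 3, "webCategoryId": "C100"},
        {"excludeInd": 1, "merchandiseLevel": 5, "keyItemNum": "P200"},
    ]
}
result = catalog_condition(sample_condition)
```

With this sample, the first entry ends up under `category` and the second under `products`, which is the split the two filtered `for` expressions in the JSLT are meant to produce.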
10-30-2024
04:41 AM
1 Kudo
Hi @fisblack , Welcome to the community. Your transformation is somewhat unusual. A transformation is usually driven by existing values or structure, even when generating new fields with defaults. I have never seen a case where newly generated values drive the transformation of existing ones. I assume you are getting the list of courseSubCategory from somewhere and that the spec is meant to be dynamic, using expression language and attributes. If you provide some context on how the list comes about and what you are trying to achieve, maybe we can give you a more accurate solution. There are multiple ways of solving your problem as is. Assuming you always want to replicate the existing field(s) against a newly generated 3-element array with the default values 1, 2 and 3, here is one of them:
[
{
"operation": "shift",
"spec": {
"*": "existing.&",
"#1|#2|#3": "courseSubCategory"
}
},
{
"operation": "shift",
"spec": {
"courseSubCategory": {
"*": {
"@(2,existing)": "[&1]",
"@": "[&1].courseSubCategory"
}
}
}
}
]
If you want to make this more dynamic in terms of the number of "courseSubCategory" values, you can construct the array assignment string "#1|#2|#3" in an upstream processor and store it as a flowfile attribute (let's say it has the same name). The assignment of courseSubCategory in the first shift then uses that attribute and looks like this: "${courseSubCategory}": "courseSubCategory" Hope that helps. If it does, please accept the solution. Thanks
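As a minimal sketch of the upstream step, the assignment string can be generated for any number of elements, and the result I would expect from the two shifts can be mirrored in plain Python. The function names and the `courseName` field are hypothetical; note that values created with `#` in Jolt are strings.

```python
def build_assignment_key(count):
    """Return the Jolt key that creates `count` literal values, e.g. "#1|#2|#3"."""
    return "|".join(f"#{i}" for i in range(1, count + 1))


def replicate(record, values):
    """Copy the existing fields once per generated courseSubCategory value."""
    return [{**record, "courseSubCategory": value} for value in values]


key = build_assignment_key(3)
rows = replicate({"courseName": "Math"}, ["1", "2", "3"])
```

The `key` value is what you would store in the flowfile attribute, and `rows` shows the shape of the output array: one copy of the existing fields per generated value.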
10-29-2024
07:51 PM
1 Kudo
Hi @Syed0000 , Do you mind posting what the expected output looks like? Also, please describe what you are trying to achieve with the transformation.
10-29-2024
07:01 AM
1 Kudo
Hi @AnikaBaykus , I don't think this is going to work through InvokeHTTP. I recommend using custom Groovy code through ExecuteScript, or curl through ExecuteStreamCommand. Here are some links that can help you get started:
Curl via ExecuteStreamCommand: https://community.cloudera.com/t5/Support-Questions/NiFi-Execute-Stream-Command-Curl-Call/m-p/368127
API call via Groovy script: https://stackoverflow.com/questions/57242452/in-nifi-how-to-call-a-rest-service-using-groovy-script
ExecuteScript tutorial:
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-3/ta-p/249148
If that helps, please accept the solution. Thanks
10-29-2024
03:05 AM
1 Kudo
Hi @rturn , It's an interesting case indeed. I'm not sure if this is intended by design or if it's an actual bug. The fact that this happens only after the flowfiles exit the process group leads me to believe that whatever you are doing there is indirectly causing this error.
Explanation:
- Typically, when you get such an error, it means that you are trying to pass a number of value arguments to a query that doesn't have a matching number of column placeholders. Your query doesn't list any column placeholders (?), hence the value 0. The question is where the 1 argument is coming from.
- If you read the PutSQL processor description, it allows value argument placeholders using the question mark character (?), provided that the argument value/type tuples are supplied as flowfile attributes in the form sql.args.N.type and sql.args.N.value, where N stands for the position of each argument placeholder in the SQL command.
- Given your data flow, it seems like you are running some insert SQL commands in the upstream process group that use argument value placeholders.
- When doing the merge (regardless of how you set the Attribute Strategy), the merged output seems to keep those SQL argument type and value attributes that are common (name and value) across the merged flowfiles.
- PutSQL (this is where it's a potential bug) seems to always pass any provided SQL argument, regardless of whether the actual query has placeholders or not. When the JDBC driver tries to execute the query, it sees that you are providing arguments but there are no placeholders, and therefore it throws this exception.
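The same kind of mismatch is easy to reproduce outside NiFi. The sketch below uses Python's built-in sqlite3 module rather than PutSQL or JDBC, so it is only an analogy: a statement without placeholders is executed once with no arguments and once with one argument. The table name `t` and the helper `try_execute` are made up for the example.

```python
import sqlite3


def try_execute(sql, params=()):
    """Run one statement against a throwaway in-memory table and report the result."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE t (id INTEGER)")
        conn.execute(sql, params)
        return "ok"
    except sqlite3.ProgrammingError as exc:
        # Raised when the number of supplied values does not match the placeholders.
        return f"error: {exc}"
    finally:
        conn.close()


no_args = try_execute("INSERT INTO t VALUES (1)")
stray_arg = try_execute("INSERT INTO t VALUES (1)", (1,))
matching = try_execute("INSERT INTO t VALUES (?)", (1,))
```

Here `no_args` and `matching` succeed, while `stray_arg` fails because one value was supplied to a statement with zero placeholders, which is the situation the leftover sql.args attributes appear to create.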
Mitigation:
1- One way to mitigate this issue is to add an UpdateAttribute processor before the final PutSQL that deletes all SQL placeholder arguments, using the following regex in the "Delete Attributes Expression" property: sql\.args\.[0-9]+\.(type|value)
2- Another option is to avoid using Process Group Concurrency and merge altogether, since that can pass along attributes that indirectly cause unintended consequences in downstream processors. As an alternative, you can consider something like the Wait-Notify pattern; however, this will require a redesign of the dataflow.
@MattWho @stevenmatison , Do you think this qualifies to be reported as a bug?
Steps to Reproduce:
1- Create a starting processor like GetFile or GenerateFlowFile.
2- Add an UpdateAttribute processor to create the following attributes:
3- Add a PutSQL processor that executes any arbitrary CRUD statement that doesn't have any placeholders.
Resolution: Check the SQL command for any placeholders and, if there are none, don't pass arguments.
If that helps, please accept the solution. Thanks
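To check which attributes the regex from mitigation 1 would remove, here is a small Python sketch. It assumes the expression has to match the whole attribute name, and the attribute names and values in the sample are invented for illustration.

```python
import re

# Same pattern as in the "Delete Attributes Expression" property above.
SQL_ARG_PATTERN = re.compile(r"sql\.args\.[0-9]+\.(type|value)")


def strip_sql_args(attributes):
    """Return a copy of the attributes without sql.args.N.type / sql.args.N.value."""
    return {
        name: value
        for name, value in attributes.items()
        if not SQL_ARG_PATTERN.fullmatch(name)
    }


# Hypothetical flowfile attributes after the merge.
merged = {
    "filename": "batch-1",
    "sql.args.1.type": "4",
    "sql.args.1.value": "42",
    "sql.args.note": "kept, does not match the pattern",
}
cleaned = strip_sql_args(merged)
```

Only the numbered type/value pairs are dropped; unrelated attributes, including ones that merely start with `sql.args.`, are left alone.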
10-25-2024
07:54 PM
2 Kudos
@rturn, Do you mind posting a screenshot of your data flow and the configuration of the main processors? It doesn't need to be the exact flow; a simplified version that still reproduces the error would be fine. This will help us troubleshoot the issue and, if needed, replicate it in our environment to see what is going on.
10-19-2024
07:49 PM
2 Kudos
Hi @AndreyDE , The reason you are getting that many flowfiles is that the upstream processor that gets the CSV input is running continuously on a 0 sec schedule. You seem to be new to NiFi and this is a typical beginner mistake. We have all been there :). In earlier versions the default scheduling on every processor is 0 sec, but in later releases the default has changed to 1 min to help avoid this issue. To fix this while you are testing, I would stop the processor that generates/gets the CSV input, and whenever you want to run a test you can right-click it and select "Run Once". If you are planning to run the flow as a batch process where you expect to get a different file every time, then go to the processor configuration and, under the Scheduling tab, adjust the schedule accordingly by selecting either the "Timer driven" or the "CRON driven" scheduling strategy. For more info on scheduling, please refer to the following:
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-tab
https://www.youtube.com/watch?v=pZq0EbfDBy4
Hope that helps.
10-14-2024
05:21 PM
2 Kudos
Hi @drewski7 , I tried the code you posted and it worked for me! Here are the steps I followed:
1- Created a main folder called TransformOpenskyStates-nar.
2- Created TransformOpenskyStates.py with the code you posted under the main folder above.
3- Created the folder structure as follows:
Under META-INF I created the MANIFEST.MF file and added the following text:
Manifest-Version: 1.0
Build-Timestamp: 2024-10-07T16:22:20Z
Nar-Id: TransformOpenskyStates-nar
Nar-Group: nifi.py.processors
Nar-Version: 0.0.2
4- Under NAR-INF I created an empty folder called "bundled-dependencies", since you don't seem to have any external dependencies.
5- I downloaded and installed 7-Zip, then went inside the main directory created in step 1, selected everything (2 folders and 1 py file), right-clicked, chose the 7-Zip menu item and then selected Add to Archive.
6- In the 7-Zip Add to Archive window, type the name of your package and save it as .zip. There is no need to change any configuration. If you create the zip file at the main folder level, the main folder itself is added to the package, and that might cause a problem because NiFi expects the py file to be at the root level. That could be your problem.
7- Rename the .zip to .nar. I would first try placing it under the lib folder, and if it works you can move it to the others.
8- Restart NiFi. In my case the log file had the following entries regarding this processor:
2024-10-14 20:10:00,495 INFO [main] org.apache.nifi.nar.NarClassLoaders Loaded NAR file: \nifi-2.0.0-M4-dev\.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked as class loader org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked]
...
2024-10-14 20:10:02,036 INFO [main] o.a.n.n.StandardExtensionDiscoveringManager Loaded extensions for nifi.py.processors:TransformOpenskyStates-nar:0.0.2 in 3 millis
...
2024-10-14 20:10:14,316 INFO [main] o.a.n.n.StandardExtensionDiscoveringManager Discovered Python Processor TransformOpenskyStates
9- Once the UI is up and running, I'm able to see and select the processor:
When I ran it, however, I got an error, but that is a different story 🙂
Error Message:
20:14:13 EDT
ERROR
8d85ee2d-0192-1000-7594-fe7475f52c1d
PythonProcessor[type=TransformOpenskyStates, id=8d85ee2d-0192-1000-7594-fe7475f52c1d] Failed to transform FlowFile[filename=e8370eae-8bdd-4867-91dc-1416fd5d4727]: py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "F:\nifi-2.0.0-M4-dev\python\framework\py4j\java_gateway.py", line 2466, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\nifi-2.0.0-M4-dev\python\api\nifiapi\flowfiletransform.py", line 33, in transformFlowFile
return self.transform(self.process_context, flowfile)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\nifi-2.0.0-M4-dev\.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked\TransformOpenskyStates.py", line 49, in transform
contents = json.loads(flow_file.getContentsAsBytes())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Program Files\Python\311\Lib\json\__init__.py", line 346, in loads
return _default_decoder.decode(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Program Files\Python\311\Lib\json\decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Program Files\Python\311\Lib\json\decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
I'm emailing you a copy of the zip file. Try changing its extension to .nar and see if that works. I suspect that the way you package (zip) the folder structure is causing the issue. Hope that helps.
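If you prefer to script the packaging instead of using 7-Zip, the sketch below does the equivalent of steps 5 to 7 with Python's zipfile module: it archives the contents of the main folder, not the folder itself, so the py file ends up at the root. `build_nar` is a hypothetical helper name, and the output path should be outside the source folder so the archive does not include itself.

```python
import os
import zipfile


def build_nar(source_dir, nar_path):
    """Zip the contents of source_dir (not the folder itself) into nar_path."""
    with zipfile.ZipFile(nar_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, dirs, files in os.walk(source_dir):
            for name in sorted(dirs + files):
                full_path = os.path.join(root, name)
                # Paths are stored relative to source_dir, so the py file and
                # META-INF/NAR-INF sit at the root of the archive. Directory
                # entries are written too, which keeps empty folders such as
                # bundled-dependencies.
                archive.write(full_path, os.path.relpath(full_path, source_dir))
    return nar_path
```

A call such as `build_nar("TransformOpenskyStates-nar", "TransformOpenskyStates-nar.nar")` would then produce a file that can be copied to the lib folder as in step 7.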
10-13-2024
04:31 PM
1 Kudo
Hi @drewski7 , I have never tried this before, but I decided to give it a shot. What I did was take a Python extension sample like ChunkDocument and add it as a regular Python extension, just to get all the dependencies downloaded. After that I created a folder structure similar to what you have:
my-nar.nar
+-- META-INF/
    +-- MANIFEST.MF
+-- NAR-INF/
    +-- bundled-dependencies/
        +-- dependency1
        +-- dependency2
        +-- etc.
+-- MyProcessor.py
Then I followed these steps:
- I copied all the downloaded dependencies to NAR-INF/bundled-dependencies.
- I created a MANIFEST.MF under META-INF/ and copied in the same info you provided.
- I copied ChunkDocument.py to the root level, similar to where you have your custom TransformOpenskyStates.py, and renamed it to MFChunkDocument.py (I explain the rename below).
- I zipped the files and called the archive "nifi-ChunkDocument-nar.zip".
- I renamed it from .zip to .nar and copied it to NIFI-HOME/lib. I think you copied yours under NIFI-HOME/extensions, which should work as well.
- I deleted the Python extension for the same processor under work/python/extension that I had created earlier to download all the dependencies.
When I launched NiFi I was able to find the processor and set it up accordingly. The reason I renamed the file from ChunkDocument to MFChunkDocument is that there seems to be a bug. Please refer to my comment here. I don't see anything wrong with what you did. However, you mentioned that you did not include anything under bundled-dependencies. Is this because you don't have any? If you do, then you should copy all of those dependencies. If that is not the issue, I would look into the processor itself and, just for the sake of testing, add it as a Python extension (not as a NAR) to see if it works. If it doesn't, then there is something wrong with the processor itself. If you want, you can email me a copy of your py file and I can try it from my end. Hope that helps. If it does, please accept the solution. Thanks
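A quick way to confirm that a packaged file matches the layout above is to list its entries. The sketch below is not an official validator; it only checks the two things described in this post, namely that the manifest is at META-INF/MANIFEST.MF and that at least one .py file sits at the root. `check_nar_layout` is a made-up helper name.

```python
import zipfile


def check_nar_layout(nar_path):
    """Return a list of layout problems found in the archive (empty if none)."""
    with zipfile.ZipFile(nar_path) as archive:
        names = archive.namelist()
    problems = []
    if "META-INF/MANIFEST.MF" not in names:
        problems.append("META-INF/MANIFEST.MF is missing or not at the expected path")
    # A processor file at the root has no "/" in its archive name.
    if not any(name.endswith(".py") and "/" not in name for name in names):
        problems.append("no .py processor file at the root of the archive")
    return problems
```

If the main folder was zipped instead of its contents, every entry is prefixed with the folder name and both checks fail, which makes that packaging mistake easy to spot.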
10-09-2024
08:37 AM
1 Kudo
Can you break the data into smaller chunks? That way you can speed up the process by taking advantage of parallelism and multithreading.
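As a generic illustration of the idea (plain Python, not a NiFi flow), the sketch below splits a list into fixed-size chunks and hands each chunk to a thread pool. The helper names, chunk size and worker count are arbitrary choices for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def process_in_parallel(items, size, worker, max_workers=4):
    """Apply `worker` to each chunk on a thread pool, keeping chunk order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(worker, chunked(items, size)))


# Example: sum each chunk of two numbers.
totals = process_in_parallel(list(range(5)), 2, sum)
```

Whether this actually helps depends on the workload; threads pay off mainly when each chunk spends its time waiting on I/O rather than on CPU.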