Member since: 07-29-2020 · 558 Posts · 307 Kudos Received · 167 Solutions

My Accepted Solutions
Views | Posted
---|---
112 | 11-28-2024 06:07 AM
78 | 11-25-2024 09:21 AM
213 | 11-22-2024 03:12 AM
116 | 11-20-2024 09:03 AM
314 | 10-29-2024 03:05 AM
10-29-2024
07:51 PM
1 Kudo
Hi @Syed0000 , do you mind posting what the expected output looks like? Also, please provide a description of what you are trying to achieve with the transformation.
10-29-2024
07:01 AM
1 Kudo
Hi @AnikaBaykus , I don't think this is going to work through InvokeHttp. I recommend you use custom Groovy code through ExecuteScript, or use curl through ExecuteStreamCommand. Here are some links that can help you get started:
Curl via ExecuteStreamCommand: https://community.cloudera.com/t5/Support-Questions/NiFi-Execute-Stream-Command-Curl-Call/m-p/368127
API vs Groovy script: https://stackoverflow.com/questions/57242452/in-nifi-how-to-call-a-rest-service-using-groovy-script
ExecuteScript tutorials:
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-2/ta-p/249018
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-3/ta-p/249148
If that helps please accept the solution. Thanks
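To illustrate the scripted-HTTP idea, here is a minimal Python sketch of building a JSON POST request with only the standard library; the endpoint URL and payload are hypothetical, and the same shape translates to Groovy inside ExecuteScript:

```python
import json
import urllib.request

# Hypothetical endpoint and payload, just to show the shape of the call.
def build_post_request(url, payload):
    """Build a JSON POST request; urllib.request.urlopen(req) would send it."""
    data = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_post_request("https://example.com/api", {"key": "value"})
```

Inside ExecuteScript you would read the flowfile content as the payload and write the response back to the flowfile instead of returning it.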
10-29-2024
03:05 AM
1 Kudo
Hi @rturn , it's an interesting case indeed. I'm not sure if this is intended by design or if it's an actual bug. The fact that this happens only after the flowfiles exit the process group leads me to believe that whatever you are doing there is indirectly causing this error.
Explanation:
- Typically, when you get such an error, it means you are trying to pass a number of value arguments to a query that doesn't have a matching number of placeholders. Your query doesn't list any column placeholders (?), hence the value 0. The question is: where is the 1 argument coming from?
- If you read the PutSQL processor description, it allows for value argument placeholders using the question mark character (?), provided an argument value/type tuple in the form of sql.args.N.type & sql.args.N.value flowfile attributes, where N stands for the position of each argument placeholder in the SQL command.
- Given your data flow, it seems like you are executing some INSERT SQL commands in the upstream process group while utilizing argument value placeholders.
- When doing the merge (regardless of how you set the Attribute Strategy), the merged output seems to keep those SQL argument type & value attributes that are common (name & value) across the merged flowfiles.
- The PutSQL behavior (this is where it's a potential bug) seems to always pass any provided SQL argument, regardless of whether the actual query has placeholders or not. When the JDBC driver tries to execute the query, it sees that you are providing arguments but there are no placeholders, and therefore it throws this exception.
Mitigation:
1- One way to mitigate this issue is to add an UpdateAttribute processor before the final PutSQL to delete all SQL placeholder arguments, using the following regex in the "Delete Attribute Expression" property: sql\.args\.[0-9]+\.(type|value)
2- Another option, if you want to avoid passing attributes that might indirectly cause unintended consequences to downstream processors via Process Group concurrency and merge, is to consider something like the Wait-Notify pattern; however, this will require a redesign of the dataflow.
@MattWho @stevenmatison , do you think this qualifies to be reported as a bug?
Steps to Reproduce:
1- Create a starting processor like GetFile or GenerateFlowFile.
2- Add an UpdateAttribute processor to create sql.args.N.type and sql.args.N.value attributes.
3- Add a PutSQL processor that executes any arbitrary CRUD statement that doesn't have any placeholders.
Suggested Resolution: check the SQL command for any placeholders, and if there are none, don't pass arguments. If that helps please accept the solution. Thanks
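A quick sketch of the mitigation logic (the attribute values are illustrative): the regex from step 1 matches exactly the sql.args.N.type/value attribute names, and a naive placeholder count shows why a query with no (?) reports 0:

```python
import re

# The "Delete Attribute Expression" regex suggested in the mitigation above.
SQL_ARG_PATTERN = re.compile(r"sql\.args\.[0-9]+\.(type|value)")

# Illustrative flowfile attributes: two sql.args entries plus an unrelated one.
attributes = {
    "sql.args.1.type": "12",
    "sql.args.1.value": "hello",
    "filename": "data.json",
}
to_delete = sorted(k for k in attributes if SQL_ARG_PATTERN.fullmatch(k))

def count_placeholders(sql):
    # Naive count of '?' placeholders; ignores '?' inside string literals.
    return sql.count("?")
```

Passing one sql.args pair to a statement where `count_placeholders` returns 0 is exactly the 1-vs-0 mismatch in the exception.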
10-25-2024
07:54 PM
2 Kudos
@rturn, do you mind posting a screenshot of your data flow and the configuration of the main processors? It doesn't need to be the exact thing; if you can simplify it and still replicate the error, that would be fine. This will help us troubleshoot the issue better and replicate it in our environment if needed to see what is going on.
10-19-2024
07:49 PM
2 Kudos
Hi @AndreyDE , the reason you are getting that many flowfiles is that you are continuously running the upstream processor that gets the CSV input on a 0 sec schedule. You seem to be new to NiFi, and it's a typical beginner mistake; we have all been there :). By default, the scheduling on every processor was set to 0 secs in earlier versions, but in later releases the default was changed to 1 min to help avoid this issue. To fix this: if you are testing, I would stop the processor that generates/gets the CSV input, and whenever you want to run a test you can right-click it and select "Run Once". If you are planning to run the flow as a batch process where you expect to get a different file each time, then go to the processor configuration; under the Scheduling tab you can adjust the schedule accordingly by selecting either "Timer Schedule" or "Cron Schedule". For more info on scheduling please refer to the following:
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-tab
https://www.youtube.com/watch?v=pZq0EbfDBy4
Hope that helps.
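For reference, NiFi's Cron Schedule uses Quartz-style cron expressions, which put seconds first. As an illustrative example (adjust to your own batch window), this runs the processor once a day at 1:00 AM:

```
0 0 1 * * ?
```

The fields are: seconds, minutes, hours, day of month, month, day of week; the `?` means "no specific value" for day of week.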
10-14-2024
05:21 PM
2 Kudos
Hi @drewski7 , I tried the code you posted and it worked for me! Here are the steps I followed:
1- Created a main folder called TransformOpenskyStates-nar
2- Created TransformOpenskyStates.py with the code you posted under the main folder above.
3- Created the META-INF and NAR-INF folder structure under the main folder. Under META-INF I created the MANIFEST.MF file and added the following text:
Manifest-Version: 1.0
Build-Timestamp: 2024-10-07T16:22:20Z
Nar-Id: TransformOpenskyStates-nar
Nar-Group: nifi.py.processors
Nar-Version: 0.0.2
4- Under NAR-INF I created an empty "bundled-dependencies" folder, since you don't seem to have any external dependencies.
5- Downloaded and installed 7-Zip, went inside the main directory created in step 1, selected everything (2 folders and 1 .py file), right-clicked, chose the 7-Zip menu item, and then "Add to archive".
6- In the 7-Zip Add to Archive window, typed the name of the package and saved it as .zip. No need to change any configuration. If you create the zip at the main-folder level, it will add the main folder itself to the package, and that might cause a problem: NiFi expects the .py file to be at the root level, and that could be your problem.
7- Renamed the .zip to .nar. I would first try placing it under the lib folder, and if it works you can move it to the others.
8- Restarted NiFi; in my case the log file had the following entries regarding this processor:
2024-10-14 20:10:00,495 INFO [main] org.apache.nifi.nar.NarClassLoaders Loaded NAR file: \nifi-2.0.0-M4-dev\.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked as class loader org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked]
...
2024-10-14 20:10:02,036 INFO [main] o.a.n.n.StandardExtensionDiscoveringManager Loaded extensions for nifi.py.processors:TransformOpenskyStates-nar:0.0.2 in 3 millis
...
2024-10-14 20:10:14,316 INFO [main] o.a.n.n.StandardExtensionDiscoveringManager Discovered Python Processor TransformOpenskyStates
9- Once the UI was up and running, I was able to see and select the processor. When I ran it, however, I got an error, but that is a different story 🙂 Error Message:
20:14:13 EDT
ERROR
8d85ee2d-0192-1000-7594-fe7475f52c1d
PythonProcessor[type=TransformOpenskyStates, id=8d85ee2d-0192-1000-7594-fe7475f52c1d] Failed to transform FlowFile[filename=e8370eae-8bdd-4867-91dc-1416fd5d4727]: py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "F:\nifi-2.0.0-M4-dev\python\framework\py4j\java_gateway.py", line 2466, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\nifi-2.0.0-M4-dev\python\api\nifiapi\flowfiletransform.py", line 33, in transformFlowFile
return self.transform(self.process_context, flowfile)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\nifi-2.0.0-M4-dev\.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked\TransformOpenskyStates.py", line 49, in transform
contents = json.loads(flow_file.getContentsAsBytes())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Program Files\Python\311\Lib\json\__init__.py", line 346, in loads
return _default_decoder.decode(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Program Files\Python\311\Lib\json\decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Program Files\Python\311\Lib\json\decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
I'm emailing you a copy of the zip file. Try changing it to .nar and see if that works. I'm suspecting that the way you package (zip) the folder structure is causing the issue. Hope that helps.
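As for the JSONDecodeError itself: "Expecting value: line 1 column 1 (char 0)" is what json.loads raises whenever the very first byte of the content is not valid JSON, e.g. empty or non-JSON flowfile content. A small standalone sketch of that failure mode (outside NiFi):

```python
import json

def parse_or_report(raw: bytes):
    """Return parsed JSON, or a short report of why parsing failed."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError as e:
        return f"not JSON: {e.msg} (char {e.pos})"

empty_result = parse_or_report(b"")              # empty content -> same error
good_result = parse_or_report(b'{"states": []}')  # valid JSON parses fine
```

So it is worth checking what the upstream processor is actually writing into the flowfile before the transform runs.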
10-13-2024
04:31 PM
1 Kudo
Hi @drewski7 , I never tried this before, but I decided to give it a shot. What I did is take a Python extension sample like ChunkDocument and add it as a regular Python extension just to get all of its dependencies downloaded. After that I created a folder structure similar to what you have:
my-nar.nar
+-- META-INF/
|   +-- MANIFEST.MF
+-- NAR-INF/
|   +-- bundled-dependencies/
|       +-- dependency1
|       +-- dependency2
|       +-- etc.
+-- MyProcessor.py
Then I followed these steps:
1- I copied all the downloaded dependencies to NAR-INF/bundled-dependencies.
2- I created a MANIFEST.MF with the same info you provided and placed it under META-INF/.
3- I copied ChunkDocument.py and renamed it to MFChunkDocument.py (I will explain later why I renamed it), placing it at the root level, similar to where you have your custom TransformOpenskyStates.py.
4- I zipped the files and called the archive "nifi-ChunkDocument-nar.zip".
5- I renamed it from .zip to .nar and copied it to NIFI-HOME/lib. I think you copied yours under NIFI-HOME/extensions, which should work as well.
6- I deleted the Python extension for the same processor (the one added earlier under work/python/extension to download the dependencies).
When I launched NiFi I was able to find the processor and set it up accordingly. The reason I renamed the file from ChunkDocument to MFChunkDocument is that there seems to be a bug; please refer to my comment here. I don't see anything wrong with what you did. However, you mentioned you did not include anything under bundled-dependencies. Is this because you don't have any? If you do, then you should copy all those dependencies. If that is not the issue, I would look into the processor itself and, just for the sake of testing, add it as a Python extension (not as a NAR) to see if it works; if it doesn't, then there is something wrong with the processor itself. If you want, you can email me a copy of your .py file and I can try it from my end. Hope that helps. If it does, please accept the solution. Thanks
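For anyone who prefers scripting the packaging instead of using 7-Zip, here is a sketch using Python's zipfile module; the file names are illustrative, and the key point is that entries are written at the archive root, with no wrapping top-level folder:

```python
import os
import tempfile
import zipfile

# Illustrative packaging helper; names mirror the folder layout above.
def build_nar(nar_path, processor_name, processor_code, manifest_text):
    with zipfile.ZipFile(nar_path, "w", zipfile.ZIP_DEFLATED) as nar:
        # The .py file must sit at the archive root -- no wrapping folder.
        nar.writestr(processor_name, processor_code)
        nar.writestr("META-INF/MANIFEST.MF", manifest_text)
        # Empty directory entry when there are no external dependencies.
        nar.writestr("NAR-INF/bundled-dependencies/", "")
    return nar_path

out = build_nar(
    os.path.join(tempfile.mkdtemp(), "my-nar.nar"),
    "MyProcessor.py",
    "# processor code here",
    "Manifest-Version: 1.0\n",
)
with zipfile.ZipFile(out) as nar:
    names = nar.namelist()
```

Because the script writes entry names directly, there is no chance of accidentally including the parent folder the way zipping at the main-folder level does.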
10-09-2024
08:37 AM
1 Kudo
Can you break the data into smaller chunks? That way you speed up the process by taking advantage of parallelism and multithreading.
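The chunk-and-parallelize idea can be sketched as follows (the per-chunk work here is a stand-in; substitute your real processing):

```python
from concurrent.futures import ThreadPoolExecutor

def chunks(items, size):
    """Split a list into fixed-size chunks."""
    return [items[i:i + size] for i in range(0, len(items), size)]

def process(chunk):
    # Stand-in for the real per-chunk work (parsing, transform, etc.).
    return sum(chunk)

data = list(range(100))
with ThreadPoolExecutor(max_workers=4) as pool:
    # Each chunk is processed on its own worker thread.
    results = list(pool.map(process, chunks(data, 25)))
total = sum(results)
```

In NiFi terms, the same effect comes from splitting the input (e.g. SplitRecord) and raising the concurrent tasks on the downstream processor.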
09-18-2024
04:39 PM
2 Kudos
Hi, well, let's start with the last thing you said, because I'm curious how it's happening that one record overrides the other. I assume you are using ExcelReader, correct? If that is the case: I tried creating an Excel file with two sheets that share the same schema, where both sheets have the same records except for the address (Sheet1: Address1, Sheet2: Address2). I read the file using the FetchFile processor, then passed the content to a ConvertRecord processor where the reader is ExcelReader and the writer is JsonRecordSetWriter, configured as follows:
ExcelReader: I'm passing an Avro schema to assign proper field names and types, as follows:
{
"namespace": "nifi",
"name": "user",
"type": "record",
"fields": [
{ "name": "ID", "type": "int" },
{ "name": "NAME", "type": "string" },
{ "name": "ADDRESS", "type": "string" }
]
}
For JsonRecordSetWriter I used the default settings, no changes. Here is how my output looked, which accounts for all the records from both sheets, even the duplicates (1, sam, TX):
[ {
"ID" : 1,
"NAME" : "sam",
"ADDRESS" : "TX"
}, {
"ID" : 2,
"NAME" : "Ali",
"ADDRESS" : "WA"
}, {
"ID" : 1,
"NAME" : "sam",
"ADDRESS" : "TX"
}, {
"ID" : 2,
"NAME" : "Ali",
"ADDRESS" : "FL"
} ]
So I'm curious what is happening in your case such that the record gets overwritten. Maybe if we figure out this problem we can solve it, as you said, by using JOLT. I think Fork/Join Enrichment will still work, especially since you are reading the info that you are trying to merge from the same file, but the flow is going to be different, and you might need two ExcelReaders, one for each sheet, which means that you need to read the Excel file twice. You can avoid having two ExcelReaders by passing the sheet info as a flowfile attribute. However, you still need to read the file twice, once for each sheet, and then join the info based on the joining strategy that works best for you. Hope that helps.
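One plausible way records get "overwritten" (an assumption, since we haven't seen your flow): keying records by ID somewhere during the merge. A small sketch with the same sample records as above:

```python
# The two sheets from the example above, as lists of records.
sheet1 = [{"ID": 1, "NAME": "sam", "ADDRESS": "TX"},
          {"ID": 2, "NAME": "Ali", "ADDRESS": "WA"}]
sheet2 = [{"ID": 1, "NAME": "sam", "ADDRESS": "TX"},
          {"ID": 2, "NAME": "Ali", "ADDRESS": "FL"}]

# Concatenating the record lists keeps all four records, duplicates included
# (this matches the ConvertRecord output shown above):
concatenated = sheet1 + sheet2

# Keying records by ID instead silently overwrites earlier records --
# only the last record per ID survives:
by_id = {r["ID"]: r for r in sheet1 + sheet2}
```

If some step in your flow builds a map keyed on ID like the second form, that would explain one record overriding the other.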
09-18-2024
05:31 AM
1 Kudo
Hi @Crags , it depends. Are you getting this data from the different sources around the same time (maybe using a common trigger or cron schedule), or are you getting the data independently of each other? If it's the first case, then you might be able to take advantage of ForkEnrichment/JoinEnrichment. JoinEnrichment provides different merge strategies which you can select from based on the structure and order of your data. If it's the second case, and let's assume you end up saving this information to a database, then you can apply the merging in the database by passing the product data to a SQL stored procedure, for example; the stored procedure will check if data exists for the given ID, and if it does, apply an update statement, otherwise an insert statement. Hope that helps.
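A minimal sketch of the second approach's upsert logic, using SQLite in place of a stored procedure (the table and column names are hypothetical):

```python
import sqlite3

# Hypothetical product table standing in for your real schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE product (id INTEGER PRIMARY KEY, price REAL)")

def upsert_product(conn, pid, price):
    # Insert the row, or update it when the id already exists -- the same
    # check-then-update-or-insert a stored procedure would perform.
    conn.execute(
        "INSERT INTO product (id, price) VALUES (?, ?) "
        "ON CONFLICT(id) DO UPDATE SET price = excluded.price",
        (pid, price),
    )

upsert_product(conn, 1, 9.99)
upsert_product(conn, 1, 12.50)  # same id: updates rather than duplicating
rows = conn.execute("SELECT id, price FROM product").fetchall()
```

Most databases have an equivalent (MERGE, INSERT ... ON DUPLICATE KEY UPDATE, etc.), so the merging logic lives in one place regardless of when each source delivers its data.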