Member since
07-29-2020
574
Posts
320
Kudos Received
175
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
245 | 12-20-2024 05:49 AM | |
280 | 12-19-2024 08:33 PM | |
291 | 12-19-2024 06:48 AM | |
242 | 12-17-2024 12:56 PM | |
233 | 12-16-2024 04:38 AM |
10-19-2024
07:49 PM
2 Kudos
Hi @AndreyDE , The reason you are getting that many flowfiles is that the upstream processor that gets the CSV input is running continuously on a 0 sec run schedule. You seem to be new to NiFi, and this is a typical beginner mistake. We have all been there :). In earlier versions the default run schedule on every processor was 0 secs, but in later releases the default was changed to 1 min to help avoid this issue.
To fix this while you are testing, I would stop the processor that generates\gets the CSV input, and whenever you want to run a test you can right-click it and select "Run Once". If you are planning to run the flow as a batch process where you expect to get a different file every time, then go to the processor configuration and, under the Scheduling tab, adjust the schedule by selecting either the "Timer driven" or the "CRON driven" scheduling strategy.
For more info on scheduling please refer to the following:
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-tab
https://www.youtube.com/watch?v=pZq0EbfDBy4
Hope that helps.
10-14-2024
05:21 PM
2 Kudos
Hi @drewski7 , I tried the code you posted and it worked for me! Here are the steps I followed:
1- Created a main folder called TransformOpenskyStates-nar.
2- Created TransformOpenskyStates.py with the code you posted under the main folder above.
3- Created the folder structure (the META-INF and NAR-INF folders) under the main folder. Under META-INF I created MANIFEST.MF and added the following text:
Manifest-Version: 1.0
Build-Timestamp: 2024-10-07T16:22:20Z
Nar-Id: TransformOpenskyStates-nar
Nar-Group: nifi.py.processors
Nar-Version: 0.0.2
4- Under NAR-INF I created an empty folder called "bundled-dependencies", since you don't seem to have any external dependencies.
5- I downloaded and installed 7-Zip, then went inside the main directory created in step 1, selected everything (2 folders and 1 py file), right-clicked, opened the 7-Zip menu and selected Add to Archive.
6- In the 7-Zip Add to Archive window, type the name of your package and save it as .zip. There is no need to change any configuration. If you create the zip file at the main folder level, the main folder itself is added to the package, and that might cause a problem because NiFi expects the py file to be at the root level. That could be your problem.
7- Rename the .zip to .nar. I would first try placing it under the lib folder, and if that works you can move it elsewhere.
8- Restart NiFi. In my case the log file had the following entries regarding this processor:
2024-10-14 20:10:00,495 INFO [main] org.apache.nifi.nar.NarClassLoaders Loaded NAR file: \nifi-2.0.0-M4-dev\.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked as class loader org.apache.nifi.nar.NarClassLoader[.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked]
...
2024-10-14 20:10:02,036 INFO [main] o.a.n.n.StandardExtensionDiscoveringManager Loaded extensions for nifi.py.processors:TransformOpenskyStates-nar:0.0.2 in 3 millis
...
2024-10-14 20:10:14,316 INFO [main] o.a.n.n.StandardExtensionDiscoveringManager Discovered Python Processor TransformOpenskyStates
9- Once the UI was up and running, I was able to see and select the processor. When I ran it, however, I got an error, but that is a different story 🙂
Error Message:
20:14:13 EDT
ERROR
8d85ee2d-0192-1000-7594-fe7475f52c1d
PythonProcessor[type=TransformOpenskyStates, id=8d85ee2d-0192-1000-7594-fe7475f52c1d] Failed to transform FlowFile[filename=e8370eae-8bdd-4867-91dc-1416fd5d4727]: py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "F:\nifi-2.0.0-M4-dev\python\framework\py4j\java_gateway.py", line 2466, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\nifi-2.0.0-M4-dev\python\api\nifiapi\flowfiletransform.py", line 33, in transformFlowFile
return self.transform(self.process_context, flowfile)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "F:\nifi-2.0.0-M4-dev\.\work\nar\extensions\TransformOpenskyStates-nar.nar-unpacked\TransformOpenskyStates.py", line 49, in transform
contents = json.loads(flow_file.getContentsAsBytes())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Program Files\Python\311\Lib\json\__init__.py", line 346, in loads
return _default_decoder.decode(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Program Files\Python\311\Lib\json\decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Program Files\Python\311\Lib\json\decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
I'm emailing you a copy of the zip file. Try renaming it to .nar and see if that works. I suspect that the way you package (zip) the folder structure is causing the issue. Hope that helps.
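If you want to take 7-Zip out of the picture, the zip-and-rename steps can also be scripted. Below is a minimal sketch using only Python's standard zipfile module; it is not taken from the NiFi docs, and the function name build_nar and the example paths are made up for illustration. The only point it demonstrates is that entries are stored relative to the main folder, so the py file lands at the archive root. It assumes the output file is written outside the folder being packaged.

```python
import zipfile
from pathlib import Path


def build_nar(source_dir: str, nar_path: str) -> list[str]:
    """Zip the contents of source_dir (not the folder itself) into nar_path.

    Returns the entry names stored in the archive.
    """
    root = Path(source_dir)
    with zipfile.ZipFile(nar_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(root.rglob("*")):
            # Store each entry relative to the main folder so that the .py
            # file sits at the archive root. Directories are written too,
            # which keeps empty folders such as bundled-dependencies.
            archive.write(path, path.relative_to(root).as_posix())
        return archive.namelist()


# Hypothetical usage (paths are examples only):
# build_nar("TransformOpenskyStates-nar", "TransformOpenskyStates-nar.nar")
```

Printing the returned list is a quick way to confirm that no entry starts with the main folder name.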
10-13-2024
04:31 PM
1 Kudo
Hi @drewski7 , I had never tried this before, but I decided to give it a shot. What I did was take a Python extension sample, ChunkDocument, and add it as a regular Python extension just to get all its dependencies downloaded. After that I created a folder structure similar to what you have:
my-nar.nar
+-- META-INF/
    +-- MANIFEST.MF
+-- NAR-INF/
    +-- bundled-dependencies/
        +-- dependency1
        +-- dependency2
        +-- etc.
+-- MyProcessor.py
Then I followed these steps:
1- I copied all the downloaded dependencies to NAR-INF/bundled-dependencies.
2- I created a MANIFEST.MF under META-INF/ and copied in the same info you provided.
3- I copied ChunkDocument.py and renamed it to MFChunkDocument.py (I will explain later why I renamed it to MFCh...) so that it sits at the root level, similar to where you have your custom TransformOpenskyStates.py.
4- I zipped the files and called the archive "nifi-ChunkDocument-nar.zip".
5- I renamed it from .zip to .nar and copied it to NIFI-HOME/lib. I think you copied yours under NIFI-HOME/extensions, which should work as well.
6- I deleted the Python extension for the same processor that I had created earlier under work/python/extension to download the dependencies.
When I launched NiFi I was able to find the processor and set it up accordingly. I renamed the file from ChunkDocument to MFChunkDocument because there seems to be a bug. Please refer to my comment here.
I don't see anything wrong with what you did. However, you mentioned that you did not include anything under bundled-dependencies. Is this because you don't have any? If you do, then you should copy all of those dependencies there. If that is not the issue, I would look into the processor itself and, just for the sake of testing, add it as a Python extension (not as a NAR) to see if it works. If it doesn't, then there is something wrong with the processor itself. If you want, you can email me a copy of your py file and I can try it from my end. Hope that helps. If it does, please accept the solution. Thanks
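A quick way to compare your own package against the layout above is to list what is actually inside the .nar file. This is a rough sketch using Python's standard zipfile module; check_nar_layout is a made-up helper, and the three checks simply mirror the structure described in this post rather than any official NiFi validation.

```python
import zipfile


def check_nar_layout(nar_path: str) -> list[str]:
    """Return a list of layout problems found in the archive (empty if none)."""
    with zipfile.ZipFile(nar_path) as archive:
        names = archive.namelist()

    problems = []
    if "META-INF/MANIFEST.MF" not in names:
        problems.append("META-INF/MANIFEST.MF is missing")
    if not any(n.startswith("NAR-INF/bundled-dependencies/") for n in names):
        problems.append("NAR-INF/bundled-dependencies/ is missing")
    # A root-level entry has no "/" in its name.
    if not any("/" not in n and n.endswith(".py") for n in names):
        problems.append("no .py processor file at the archive root")
    return problems
```

If the whole main folder was zipped instead of its contents, every entry name starts with that folder and all three checks report a problem.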
10-09-2024
08:37 AM
1 Kudo
Can you break the data into smaller chunks? That way you can speed up the process by taking advantage of parallelism and multithreading.
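To illustrate the general idea outside of NiFi, here is a small plain-Python sketch that splits a list into chunks and hands the chunks to a thread pool. The chunk size, worker count and the process_chunk function are placeholders for whatever work is actually being done.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(records, size):
    """Yield consecutive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


def process_chunk(chunk):
    # Placeholder for the real per-chunk work; here we just sum the values.
    return sum(chunk)


def process_in_parallel(records, size=1000, workers=4):
    """Process each chunk on a worker thread and collect the results."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns results in the same order as the input chunks.
        return list(pool.map(process_chunk, chunked(records, size)))
```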
09-18-2024
04:39 PM
2 Kudos
Hi , well, let's start with the last thing you said, because I'm curious how one record ends up overriding the other. I assume you are using ExcelReader, correct? If that is the case, I tried creating an Excel file with two sheets that share the same schema and hold the same records, as follows:
Sheet1: Address1
Sheet2: Address2
I read the file using the FetchFile processor, then passed the content to a ConvertRecord processor where the reader is ExcelReader and the writer is JsonRecordSetWriter, configured as follows:
ExcelReader: I'm passing an Avro schema to assign the proper field names and types:
{
"namespace": "nifi",
"name": "user",
"type": "record",
"fields": [
{ "name": "ID", "type": "int" },
{ "name": "NAME", "type": "string" },
{ "name": "ADDRESS", "type": "string" }
]
}
For JsonRecordSetWriter I used the default settings with no changes. Here is how my output looked; it accounts for all the records from both sheets, including the duplicate (1, sam, TX):
[ {
"ID" : 1,
"NAME" : "sam",
"ADDRESS" : "TX"
}, {
"ID" : 2,
"NAME" : "Ali",
"ADDRESS" : "WA"
}, {
"ID" : 1,
"NAME" : "sam",
"ADDRESS" : "TX"
}, {
"ID" : 2,
"NAME" : "Ali",
"ADDRESS" : "FL"
} ]
So I'm curious what is happening in your case that causes the record to be overwritten. Maybe if we figure out this problem we can solve it, as you said, by using JOLT.
I think Fork\Join Enrichment will still work, especially since you are reading the info you are trying to merge from the same file, but the flow is going to be different and you might need two ExcelReaders, one for each sheet, which means you need to read the Excel file twice. At a high level the flow will look like the following:
You can avoid having two ExcelReaders by passing the sheet info as a flowfile attribute. However, you still need to read the file twice, once per sheet, and then join the info based on the joining strategy that works best for you. Hope that helps.
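To make the difference between the two behaviours concrete, here is a small plain-Python sketch built on the sample records above. It is only an illustration of "append every sheet" versus "join on ID", not how ExcelReader or JoinEnrichment are implemented; the output field names ADDRESS1 and ADDRESS2 are made up, borrowed from the sheet names.

```python
sheet1 = [
    {"ID": 1, "NAME": "sam", "ADDRESS": "TX"},
    {"ID": 2, "NAME": "Ali", "ADDRESS": "WA"},
]
sheet2 = [
    {"ID": 1, "NAME": "sam", "ADDRESS": "TX"},
    {"ID": 2, "NAME": "Ali", "ADDRESS": "FL"},
]


def read_all_sheets(*sheets):
    """Append records sheet after sheet; nothing is overwritten."""
    return [record for sheet in sheets for record in sheet]


def join_by_id(left, right):
    """Pair up records that share an ID, keeping both addresses."""
    right_by_id = {record["ID"]: record for record in right}
    return [
        {
            "ID": record["ID"],
            "NAME": record["NAME"],
            "ADDRESS1": record["ADDRESS"],
            "ADDRESS2": right_by_id[record["ID"]]["ADDRESS"],
        }
        for record in left
        if record["ID"] in right_by_id
    ]
```

The first function yields four records, matching the JSON output shown earlier, while the second yields one merged record per ID.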
09-18-2024
05:31 AM
1 Kudo
Hi @Crags , It depends. Are you getting this data from the different sources around the same time, maybe using a common trigger or cron schedule, or are you getting the data independently of each other? In the first case, you might be able to take advantage of ForkEnrichment\JoinEnrichment. JoinEnrichment provides different merging strategies that you can select from based on the structure and order of your data. In the second case, and assuming you end up saving this information to a database, you can apply the merge in the database itself, for example by passing the product data to a SQL stored procedure. The stored procedure checks whether data exists for the given Id; if it does, it applies an update statement, otherwise an insert statement. Hope that helps.
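The update-or-insert logic of such a stored procedure could look roughly like the sketch below. It uses Python's built-in sqlite3 module only so that it can be run anywhere; the product table and its columns are hypothetical, and in a real setup the same check would live inside the stored procedure of your own database.

```python
import sqlite3


def upsert_product(conn, product_id, name, price):
    """Update the row if the Id already exists, otherwise insert it."""
    exists = conn.execute(
        "SELECT 1 FROM product WHERE id = ?", (product_id,)
    ).fetchone()
    if exists:
        conn.execute(
            "UPDATE product SET name = ?, price = ? WHERE id = ?",
            (name, price, product_id),
        )
    else:
        conn.execute(
            "INSERT INTO product (id, name, price) VALUES (?, ?, ?)",
            (product_id, name, price),
        )
    conn.commit()
```

Whichever source delivers its data first creates the row, and the later ones update it.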
09-13-2024
06:34 AM
2 Kudos
Hi @moonspa , You might want to look into the ForkEnrichment/JoinEnrichment processors. Let's assume that the CSV has the latest information that needs to be merged into the database. Using a processor like GetFile\FetchFile you get the CSV file, then you use the ForkEnrichment processor to fork the flow into two: original (the CSV) and enrichment, using the enrichment relationship for the DB. Then you connect the result of the DB query (using ExecuteSQLRecord, for example) and the original to the JoinEnrichment processor, where you decide on the merging strategy. For example, you can use the SQL merge strategy so that you can link records by ID with a full join. This will give you the full list from both the CSV and the DB. However, since this is not real SQL, you can't use SQL functions like isNull or conditional statements to set the status, but from there you can use the QueryRecord processor, where you can use Calcite SQL syntax to generate the status (new, same, deleted, changed) column.
Keep in mind that, as the documentation on the Fork\JoinEnrichment processors indicates, using the SQL strategy with a large amount of data might consume a lot of JVM heap, which can lead to out-of-memory errors. To avoid this, look into the section "More Complex Joining Strategies" toward the end of that documentation for a different approach that uses the Wrapper strategy alongside the ScriptedTransformRecord processor.
Hope that helps. Let me know if you have more questions. If you found this helpful, please accept the solution. Thanks
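The logic behind the status column can be sketched in a few lines of plain Python. This only illustrates the full-join-then-classify idea and is not the QueryRecord SQL itself; it assumes the records are keyed by a field called ID and that "new" means only in the CSV and "deleted" means only in the database.

```python
def diff_status(csv_rows, db_rows, key="ID"):
    """Full outer join on `key`, tagging each record with a status."""
    csv_by_id = {row[key]: row for row in csv_rows}
    db_by_id = {row[key]: row for row in db_rows}

    result = []
    for record_id in sorted(csv_by_id.keys() | db_by_id.keys()):
        csv_row = csv_by_id.get(record_id)
        db_row = db_by_id.get(record_id)
        if db_row is None:
            status = "new"        # present only in the CSV
        elif csv_row is None:
            status = "deleted"    # present only in the database
        elif csv_row == db_row:
            status = "same"
        else:
            status = "changed"
        # Prefer the CSV values, since the CSV holds the latest information.
        result.append({**(csv_row or db_row), "status": status})
    return result
```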
09-11-2024
09:54 PM
Hi @mjmoore , I'm not a Gradle expert, but it seems there are Gradle plugins for NiFi: https://plugins.gradle.org/search?term=nifi Will that work?
09-07-2024
07:58 AM
2 Kudos
Hi @xtd , Welcome to the community. You definitely look like a newbie :), but don't worry, this is common when you start with NiFi, and with time and experience you will get better for sure. There are many tutorials out there to help you get started, and I would recommend going through them to understand how NiFi works and what the best practices are. One crucial concept that every beginner needs to be familiar with early on is covered in a series of YouTube videos on NiFi Anti-Patterns by Mark Payne, one of the creators of NiFi, which I recommend you watch.
I can see many issues with this flow. Above all, it is not doing things the NiFi way. Here are some notes on the main issues:
1- When you work with formats like CSV, JSON, XML, etc., you need to take advantage of NiFi's out-of-the-box capabilities to parse the data rather than doing it with ReplaceText, which can be inefficient.
2- It is not recommended to use ExtractText to extract the whole content of a flowfile into an attribute when you can't control how big the content can be. You will hear this a lot: attributes are stored in the JVM heap, which is limited and expensive, so large content can cause problems.
3- When passing parameters to your SQL query through flowfile attributes, the N placeholder in sql.args.N.value stands for the parameter index in the query. For example, if you have the following insert statement:
insert into Table1 (col1, col2) values (?,?)
then the processor expects two sets of SQL attributes, one for the first arg (N=1) and one for the second (N=2), as in sql.args.1.value, sql.args.2.value, etc.
The way I would design this flow really depends on what you are trying to do after the ExecuteSQL. If you pass in the list of IDs correctly, and depending on the configuration, ExecuteSQL will return a single flowfile with the list of records matching the IDs in Avro format, so what are you going to do with this result?
My guess, based on similar scenarios, is that you probably want to process each record on its own, whether for data enrichment, an API call, etc. Based on that, here is how I would design this flow:
1- Get the file from the source (GetFile, GetSFTP, etc.).
2- Use SplitRecord to get every record into its own flowfile and convert it into a format that is easy to parse, so that attributes can be extracted with methods other than regex in ExtractText. Here is an example:
I'm converting to JSON because I can extract the ID using a JSON path, as we will see in the next step. You can use the default settings for the reader and writer services.
3- Use EvaluateJsonPath to extract the ID into an attribute. This way we know that the value we are extracting is a single integer value, and we don't need to worry much about the heap storing large content.
4- Fetch the record corresponding to each ID from SQL. I know you are probably thinking that this runs a SQL transaction for each ID instead of one select for all of them, and maybe that is the pitfall of this design, but those transactions are going to be short and simple and will probably perform better, especially when you have a cluster where you can do load balancing, or you can increase the number of threads on the processor itself. As I mentioned above, ExecuteSQL gives you the result in Avro format, which you can't do much parsing with unless you convert it to another parsable format (JSON, CSV, XML, etc.) using ConvertRecord. However, you can avoid the need for ConvertRecord by using ExecuteSQLRecord instead of ExecuteSQL, where you can set the record writer to whatever format you want before the result is sent to the success relationship. Let's assume I want to convert the result back to CSV; the configuration would be:
Once you get the record in the intended format you can do whatever is needed afterward. I think you have enough to get going. Let me know if you have more questions. If you find this helpful, please accept the solution. Thanks
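For anyone who finds it easier to read code than flow descriptions, steps 2 and 3 boil down to something like the plain-Python sketch below. It only mimics what SplitRecord and EvaluateJsonPath do conceptually, and the column names ID and NAME are assumptions about the input file.

```python
import csv
import io
import json


def split_records(csv_text):
    """Turn each CSV row into its own JSON document (one per 'flowfile')."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [json.dumps(row) for row in reader]


def extract_id(json_record, field="ID"):
    """Pull one small value out of the record, like a $.ID JSON path would."""
    return json.loads(json_record)[field]
```

The point is that only the small ID value ends up in an attribute, never the whole content of the file.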
09-07-2024
12:59 AM
4 Kudos
Hi @shiva239 , I don't think there is a problem with using the DuplicateFlowFile processor in a production environment. If it were really intended only for test environments, it wouldn't be provided as an option to begin with. In fact, when I searched the community for questions about flowfile replication, I found this post that mentions this processor as part of the solution, and I don't see any comments advising against it by citing test vs prod, regardless of whether it helped in that case or not.
However, if you are not comfortable using the Duplicate processor, there are of course different ways of doing the same thing in NiFi. Let's assume you wanted to write code to address this problem instead of using NiFi. How would you do it without having to repeat the same code for each target DB? I think one of the first things that comes to mind is a loop. You can create a loop in NiFi in multiple ways; the easiest I can think of is to use the RetryFlowFile processor. Although it is intended more for error handling, you can still use it to build a loop flow. All you have to do after getting the file is set the maximum number of retries to the number of times (or steps) you want to execute, and then route the retry relationship both back to the processor itself and to the next step (assign DB & Table based on index). Once the number of retries is exceeded, you can handle that via the retries_exceeded relationship, or you can simply terminate that relationship so the flow completes. RetryFlowFile sets a counter on each retried flowfile, which you can use to assign the DB and Table accordingly.
Here is an example of a simple flow that loops 5 times against a given flowfile input from GenerateFlowFile:
This is how the RetryFlowFile is configured:
The Retry Attribute property stores the retry count in the provided attribute name, "flowfile.retries".
If you run the GenerateFlowFile once and look at the LogMessage data provenance, you will see it was executed 5 times against the same content:
If you check the attributes on each event's flowfile, you will see that flowfile.retries is populated with the nth time the flowfile was retried. Keep in mind that data provenance shows the last event at the top, which means the first event listed will have the value 5:
Hope that helps. If it does, please accept the solution. Thanks
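In code terms, the loop described above behaves roughly like the sketch below. This is a simplified model of the flow, not how RetryFlowFile is implemented; the targets list and the database and table attribute names are hypothetical stand-ins for the "assign DB & Table based on index" step.

```python
def retry_loop(flowfile, maximum_retries, targets):
    """Model a flowfile going around the retry loop `maximum_retries` times."""
    attributes = dict(flowfile)
    events = []
    retries = 0
    while retries < maximum_retries:
        retries += 1
        attributes["flowfile.retries"] = retries
        # The counter starts at 1, so subtract 1 to index the target list.
        database, table = targets[retries - 1]
        events.append({**attributes, "database": database, "table": table})
    # After the last pass the flowfile would go to retries_exceeded.
    return events
```

Each entry in the returned list corresponds to one pass through the next step, with the counter telling it which DB and table to use.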