Member since 01-27-2023
229 Posts
73 Kudos Received
45 Solutions
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 618 | 02-23-2024 01:14 AM
 | 769 | 01-26-2024 01:31 AM
 | 556 | 11-22-2023 12:28 AM
 | 1247 | 11-22-2023 12:10 AM
 | 1438 | 11-06-2023 12:44 AM
03-28-2023
02:34 AM
So guys, this time I could really use your help with something, because I cannot figure this out on my own, nor do I know where exactly to look in the source code.
My flow is as follows: ConsumeKafka -> MergeContent (as I have plenty of small files, I prefer to merge them into bigger files for further processing) -> ReplaceText (I have some empty spaces and I want them removed) -> PartitionRecord.
The problem comes in PartitionRecord. I have defined two Controller Services: a Record Reader (CSVReader, with a pre-defined working schema) and a Record Writer (ParquetRecordSetWriter, with exactly the same schema as the CSVReader). I have no strange data types, only a couple of FLOATs and around 100 STRINGs. I defined a property called time, which extracts the value from a field in our file. Unfortunately, when executing the flow, I keep getting the following error message:
"PartitionRecord[id=3be1c42e-5fa9-3144-3365-f568bb616028] Processing halted: yielding [1 sec]: java.lang.IllegalArgumentException: newLimit > capacity: (90 > 82)"
I have nothing else in the logs. The data which enters PartitionRecord looks fine to me, but something happens when we transform it from CSV (plain text) to Parquet, and I do not know what else to check. Has anybody encountered such an error and, if so, what was the cause and how did you manage to solve it? Thank you in advance 🙂
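A note on the message itself: it has the same shape as the one java.nio.Buffer.limit(int) produces on recent JDKs when the requested limit is larger than the buffer capacity. The following is only a minimal sketch, assuming that is where the exception comes from (the log above does not confirm it, and it does not show which component allocated the buffer). LimitDemo and limitRejected are hypothetical names used for illustration.

```java
import java.nio.ByteBuffer;

final class LimitDemo {
    /** Returns true if setting newLimit on a buffer of the given capacity is rejected. */
    static boolean limitRejected(int capacity, int newLimit) {
        ByteBuffer buffer = ByteBuffer.allocate(capacity);
        try {
            // A limit larger than the capacity is illegal for any java.nio buffer.
            buffer.limit(newLimit);
            return false;
        } catch (IllegalArgumentException e) {
            // Recent JDKs attach a message of the form "newLimit > capacity: (90 > 82)";
            // older JDKs may leave the message empty.
            System.out.println("rejected: " + e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) {
        // Same numbers as in the error above: capacity 82, requested limit 90.
        System.out.println(limitRejected(82, 90));
    }
}
```

If that assumption holds, something in the CSV-to-Parquet path is trying to read or write 90 bytes through a buffer that was sized for 82.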
Labels: Apache NiFi
03-27-2023
04:17 AM
As far as I know, there is no Kotlin in the software stack ... but you should wait for an answer from somebody with more experience. Nevertheless, in the meantime, you can have a look for yourself, as the code is public: https://github.com/apache/nifi
03-27-2023
04:13 AM
Now, based on your Flow Logic, where do you encounter problems and what are those problems? In addition, how do you dynamically retrieve the destination and the size at the destination? As far as I know, PutFile does not write those attributes, so you will have to define them manually in your Flow, and I do not see any UpdateAttribute or UpdateContent on your canvas. The size, the file type, the filename and the source are written by default when the file is extracted, and you can use them further with the help of the NiFi Expression Language.
03-27-2023
03:09 AM
Hi @NafeesShaikh93, what logs are you trying to push into Graylog? The logs generated by NiFi when transferring the files from Source to Target, or are you talking about other logs? What have you tried and why did it not succeed? Let's start with that, as your Flow Logic was built specifically for your use case and somebody else's logic might not suit your needs.
03-27-2023
12:51 AM
2 Kudos
Hi @ManishR,
1. How to find the processor or processor group by Integration Name/Folder/Event Name?
What do you mean by integration name, event name and folder? By folder I assume you mean a Processor Group (external or normal), but integration name and event name are unknown to me. If you are trying to identify Processors, Processor Groups, Queues or any other objects on your canvas, you mostly have two options: either use the NiFi REST API (https://nifi.apache.org/docs/nifi-docs/rest-api/index.html) or use the search bar in the top right of the screen.
2. How to find the source and destination file details in any of the processor or processor group?
What do you mean by this question? In NiFi there is a series of processors which can be used to retrieve SOURCE files (ListSFTP/FetchSFTP/GetSFTP, GenerateFlowFile, InvokeHTTP, ConsumeKafka, ExecuteSQLRecord, etc.) and a series of processors used to save your data in a TARGET location (like PutFile, PublishKafka, PutGCSObject, PutHDFS, PutDatabaseRecord, PutS3Object, etc.). If you are talking about a certain processor, the source data is in the queue leading into that processor, whereas the target data is in the queue leading from that processor to the next one.
Yet again, your question is too general and ambiguous to get a specific answer. Sorry, but I do not understand what exactly you are trying to achieve, so if you require a more technical and more elaborate answer, you will have to come with a more detailed question, based on a specific use case, maybe even your NiFi Flow.
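For the REST API option mentioned under question 1, the flow search endpoint (GET /nifi-api/flow/search-results with a q parameter) is the programmatic counterpart of the search bar. Here is a minimal sketch that only builds the request URL. The host name and the search term are made-up examples, NifiSearchUrl and searchUri are hypothetical names, and a secured instance will additionally need authentication, which is left out.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class NifiSearchUrl {
    /** Builds the URL of the flow search endpoint for the given search term. */
    static URI searchUri(String baseUrl, String term) {
        // URL-encode the term so names with spaces or special characters stay valid.
        String encoded = URLEncoder.encode(term, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/nifi-api/flow/search-results?q=" + encoded);
    }

    public static void main(String[] args) {
        // Assumed example host; replace it with the address of your own NiFi instance.
        System.out.println(searchUri("https://nifi.example.com:8443", "PartitionRecord"));
    }
}
```

Calling the resulting URL with any HTTP client returns a JSON document listing the components whose name, ID or properties match the term.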
03-26-2023
11:51 PM
@ManishR can you please describe your problem a little bit better? What did you try to do? Why did it not work? What should the expected outcome have been? What are you actually trying to achieve? What you have provided so far is quite general, like "which is the best car in the world" / "how can I find the best milk" / "who came first, the chicken or the egg?". As you described your question, the answer is: your existing processors are present on the canvas. The files are in the source, the target or the queues between the processors, whereas the integrations are the integrations which you have constructed or used so far.
03-26-2023
11:48 PM
@ManishR can you please describe your problem a little bit better? What did you try to do? Why did it not work? What should the expected outcome have been? What are you actually trying to achieve? What you have provided so far is quite general, like "which is the best car in the world" / "how can I find the best milk" / "who came first, the chicken or the egg?". As you described your question, the answer is: you can find the source files in the source and the target files in the target, where the source is what you have defined your source to be and the target is the target you have set.
03-24-2023
10:35 AM
1 Kudo
Hi @Satya1, the inactive sessions get terminated 25 mins after the extraction is complete. For example, if your extraction takes 25 minutes to load the data from your database into the flowfile (no matter the file type), you will keep on seeing those inactive sessions for another 25 minutes, meaning that they will be completely gone 50 mins after you started your flow. If you want, you can reduce the time to a lower value, as 25 is a bit high for most cases. You can change it to 5 minutes and test it for yourself. I did it: my flow took 2 minutes, the sessions got evicted after 5, and in 7 minutes all the sessions were gone.
03-24-2023
10:22 AM
Hi @Satya1, yes I did. Eventually I was able to identify what caused the issue. I am not 100% sure that this is the best fix for the problem, but in order to solve my issue I have modified the following two properties within my DBCPConnectionPool Controller Service:
- Max Total Connections: I have reduced the value because I saw that I did not require so many connections. Previously: 51. Now: 10.
- Time Between Eviction Runs: I have changed the value from a negative value to a positive value. Previously: -1 (meaning no idle connection evictor thread will be run). Now: 1440000 millis (meaning that my sessions will get evicted after approx. 25 mins).
Let me know if this helped you as well 🙂
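Since Time Between Eviction Runs is given in milliseconds here, a quick conversion helps when picking a value. This is a minimal sketch using only java.time. EvictionInterval and its methods are hypothetical helper names and not part of NiFi.

```java
import java.time.Duration;

final class EvictionInterval {
    /** Converts a millisecond setting into whole minutes. */
    static long toMinutes(long millis) {
        return Duration.ofMillis(millis).toMinutes();
    }

    /** Converts a desired interval in minutes into the millisecond value to enter. */
    static long toMillis(long minutes) {
        return Duration.ofMinutes(minutes).toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toMinutes(1_440_000L)); // prints 24
        System.out.println(toMillis(5));           // prints 300000
    }
}
```

So 1440000 millis works out to 24 minutes, and a 5 minute interval corresponds to 300000 millis.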
03-24-2023
04:34 AM
@hackerway what does your state-management.xml look like? Are you using the embedded ZooKeeper by any chance?