Member since
01-27-2023
229
Posts
73
Kudos Received
45
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 197 | 02-23-2024 01:14 AM |
|  | 263 | 01-26-2024 01:31 AM |
|  | 229 | 11-22-2023 12:28 AM |
|  | 493 | 11-22-2023 12:10 AM |
|  | 657 | 11-06-2023 12:44 AM |
05-23-2023
01:21 AM
@mks27, To be really honest, when it comes to login and certificates I am in no position to provide input, as I never got to fully understand how they work 😞 While you are waiting for a better answer from somebody with far more experience and knowledge than me, I would try the following:
- Assuming that you configured LDAP authentication, you should have the Users and Policies entries in your NiFi Global Menu (top right).
- Based on your error, you are logged in as the user mohit.kumar, which has no privileges to do anything.
- I would suggest logging in with the user that was provided as the Initial Admin Identity and granting your user (mohit.kumar) all the policies necessary for the action you are trying to perform.
See: https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#initial-admin-identity
Have a look here as well: https://pierrevillard.com/2017/01/24/integration-of-nifi-with-ldap/comment-page-1/
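For reference, the Initial Admin Identity lives in conf/authorizers.xml. A minimal sketch (the DN is a placeholder — it must match your LDAP user's identity exactly, including case, and the generated users.xml/authorizations.xml must be deleted before a change here takes effect):

```
<!-- conf/authorizers.xml (sketch; identifiers and the DN are examples) -->
<userGroupProvider>
    <identifier>file-user-group-provider</identifier>
    <class>org.apache.nifi.authorization.FileUserGroupProvider</class>
    <property name="Users File">./conf/users.xml</property>
    <property name="Initial User Identity 1">cn=admin,ou=users,dc=example,dc=com</property>
</userGroupProvider>
<accessPolicyProvider>
    <identifier>file-access-policy-provider</identifier>
    <class>org.apache.nifi.authorization.FileAccessPolicyProvider</class>
    <property name="User Group Provider">file-user-group-provider</property>
    <property name="Authorizations File">./conf/authorizations.xml</property>
    <property name="Initial Admin Identity">cn=admin,ou=users,dc=example,dc=com</property>
</accessPolicyProvider>
```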
05-22-2023
06:55 AM
In this case, you will need some extra steps. You could try something like this:
- Start with an ExecuteSQLRecord or ExecuteSQL, configured with the Database Connection Pooling Service and the Record Writer (if using ExecuteSQLRecord).
- Link the success queue to an ExtractText processor. In this processor, add a new property called number_of_rows with a capturing regex as its value. This will extract the value returned by count(*) and save it as an attribute on your FlowFile.
- Link the success queue to a RouteOnAttribute processor. Here, define a new property named bigger_than_zero and use NiFi's EL to check whether the attribute is greater than zero. If it is, the FlowFile is sent to the processor you linked that queue to. Otherwise, meaning the value is zero, you can route the unmatched queue to another ExecuteSQL/ExecuteSQLRecord. You could also use NiFi's EL to check whether the value is exactly zero, if that is what you are looking for, and route the file accordingly.
NiFi's EL: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
Everything mentioned above has to be done in addition to what @SAMSAL already said: you need to change your Run Schedule from 0 seconds to a greater value. 0 means you will constantly execute this SQL statement; pick a value based on your project's requirements.
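The steps above can be sketched as the following properties (assuming the Record Writer produces a text format such as JSON or CSV, so the count is readable by ExtractText; the attribute and property names are just the ones suggested above):

```
# ExtractText - dynamic property (the regex needs a capture group)
number_of_rows : (\d+)

# RouteOnAttribute - dynamic property, creating a 'bigger_than_zero' relationship
bigger_than_zero : ${number_of_rows:gt(0)}

# or, if you want to match exactly zero instead:
is_zero : ${number_of_rows:equals('0')}
```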
05-15-2023
05:33 AM
1 Kudo
@DrManu, I do not think you will find a processor in NiFi that extracts only the folder name out of your location 😞 You will either have to write your own processor or use a combination of several that are already part of NiFi. How I would honestly try the mentioned scenario:
- An ExecuteStreamCommand processor running a custom script that reads your folder structure and generates a JSON file, where each row is basically the complete path to a specific folder.
- Afterwards, a SplitJson to generate a single FlowFile for each folder and send it down your stream for further processing.
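A rough sketch of that pair of processors (the script path, its JSON output shape, and the input directory are all hypothetical):

```
# ExecuteStreamCommand (list_folders.sh is a script you would write yourself)
Command Path      : /opt/scripts/list_folders.sh
Command Arguments : /data/landing
# leave 'Output Destination Attribute' empty so the JSON
# array becomes the FlowFile content

# the script could emit, for example:
# ["/data/landing/2023-05-15", "/data/landing/archive"]

# SplitJson (one FlowFile per folder path)
JsonPath Expression : $.*
```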
05-12-2023
04:35 AM
@sarithe, What is the format of the record you are trying to convert? What is the data type of the field? Are you trying to modify the value of a record or the value of an attribute? If you are trying to update the values inside each record, you should try an UpdateRecord processor, in which you define a Record Writer and a Record Reader. Next, add another property to the processor and define it like:
Property: /your_column --> pay attention to the slash in front, as it is very important.
Value: ${field.value:multiply(1000):toNumber():toDate("yyyyMMdd", "GMT"):format("yyyyMMdd")} --> this is just an example, as I do not know what your data looks like or how you want it displayed.
You can use NiFi's EL to produce the exact format you require, but make sure you use field.value if you want to modify the value of that specific column.
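To make the property/value pair concrete, a sketch assuming a JSON record with an epoch-seconds field named event_date (both the field name and the exact EL chain are illustrative; verify the chain against your data before relying on it):

```
# UpdateRecord (Record Reader/Writer: JSON, for example)
Replacement Value Strategy : Literal Value

# dynamic property - the leading slash targets the record field:
/event_date : ${field.value:multiply(1000):format("yyyyMMdd", "GMT")}
```

The idea is that field.value holds epoch seconds, multiply(1000) turns it into milliseconds, and format() renders it as a date string.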
05-12-2023
04:28 AM
@ushasri, If you are reading a CSV file (for example one exported from Excel), you can use a ConvertRecord processor, in which you define a Record Reader (most likely a CSVReader) that tells NiFi how to read the values (value separator, etc.). Next, with the help of a Record Writer (again CSV, or maybe Avro), you define how your data should look, including the header. For both the Reader and the Writer you will have to define a schema, and that is where you can replace the blank spaces with underscores.
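A sketch of what the reader side could look like, assuming a source column "order id" to be renamed "order_id" (column names are hypothetical; Avro field names cannot contain spaces, so the schema supplies the underscored names while the file's own header line is skipped):

```
# CSVReader
Schema Access Strategy         : Use 'Schema Text' Property
Treat First Line as Header     : true
Ignore CSV Header Column Names : true
Schema Text:
{ "type": "record", "name": "row",
  "fields": [
    { "name": "order_id",    "type": "string" },
    { "name": "order_value", "type": "string" }
  ] }

# CSVRecordSetWriter
Schema Write Strategy : Do Not Write Schema
Include Header Line   : true
```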
05-12-2023
12:37 AM
@acasta, Don't get me wrong, but I highly doubt that NiFi is somehow ignoring or deleting the files without your intervention or configuration. What I would suggest are the following two actions:
- First of all, add a LogMessage/LogAttribute after you have unzipped all those files. Basically, double the success queue from the processor where you unpack your tar file and log each file that was extracted. This way, you get a list of all the files extracted from your zip file. Make sure to set the queue to a single node so you only have to check nifi-app.log on one node.
- Next, add another LogMessage/LogAttribute after the processor that saves the data into your bucket. Send the names of the unzipped files into the logs to get a list of all the files that were saved into your bucket. Again, set the queue to a single node so you only have to check nifi-app.log on one node.
Afterwards, you can compare the lists and see whether you extracted and saved all your files. If the lists match 1:1, the problem is not related to NiFi itself but to something else: another system doing something in your bucket, files with the same name getting overwritten, etc. Another option would be to set DEBUG on all your processors and use RUN ONCE until you have processed everything you have to process, and analyze in real time what is happening.
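The two logging points could look roughly like this (the Log prefix values are just examples; load-balancing the incoming queue to a single node is what keeps all entries in one nifi-app.log):

```
# LogAttribute (one after the unpack step, one after the put-to-bucket step)
Log Level         : info
Log Payload       : false
Attributes to Log : filename
Log prefix        : unzipped     # e.g. 'uploaded' on the second one

# On the incoming connection:
Load Balance Strategy : Single node
```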
05-11-2023
12:08 AM
@acasta, What do you mean when saying that some files are not being processed? Are you not extracting all the ZIP files from S3, or are the files extracted out of the zip files not present in your newly created folder? Have you checked whether the files being extracted have the same name? For example, zip 1 contains a file called ingested_data.csv and zip 2 contains the exact same file name but with different content. If this is the case, when your files get saved in your folder (no matter if we are talking about S3, GCP, PutFile, Azure or anything else) they will get overwritten by the latest file.
05-10-2023
11:37 PM
1 Kudo
@SandyClouds, I do not have a template because I no longer have access to that project, but I have provided you with all the info you need to develop your own system 🙂 And it mostly depends on your use case: you can use any API you require. In terms of processors, you need the following: InvokeHTTP to perform the API call, EvaluateJsonPath + SplitJson to extract the relevant lines out of your bulletin board, and RouteOnAttribute to identify the errors you need. Other than that, your imagination is your best friend.
05-10-2023
01:57 AM
2 Kudos
@SandyClouds, You are correct when saying that there is no failure queue on CaptureChangeMySQL, and you are not missing anything 🙂 I do not know the perfect way to solve your issue, but I have used the following two scenarios to achieve what you are looking for:
1) REST API to the Bulletin Board, extracting only the messages generated for the ID of your CaptureChangeMySQL. To achieve this, you need an InvokeHTTP processor, which calls the REST API and extracts everything from your bulletin board. Next, you can use an EvaluateJsonPath to extract what you want from the logs (for example $.bulletinBoard.bulletins), then a SplitJson to split every bulletin message into a single FlowFile. Afterwards, add a new EvaluateJsonPath to extract the error level, error message, error source name and the timestamp and save them as attributes ($.bulletin.level, $.bulletin.message, $.bulletin.sourceName, $.bulletin.timestamp). Using a RouteOnAttribute, you then filter out the error messages generated by your CaptureChangeMySQL processor and send them by email.
2) TailFile on your nifi-app.log, filtering out all the error messages generated by your CaptureChangeMySQL processor. The results are then sent via email.
Again, not the best solution, but at least this got things working for me 🙂
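The first scenario roughly maps to these settings (host, port, and processor ID are placeholders; note that the bulletin-board endpoint also accepts a sourceId query parameter, which can do the filtering for you):

```
# InvokeHTTP
HTTP Method : GET
HTTP URL    : https://nifi-host:8443/nifi-api/flow/bulletin-board?sourceId=<processor-id>&limit=100

# SplitJson
JsonPath Expression : $.bulletinBoard.bulletins

# EvaluateJsonPath (Destination: flowfile-attribute)
bulletin.level      : $.bulletin.level
bulletin.message    : $.bulletin.message
bulletin.sourceName : $.bulletin.sourceName
bulletin.timestamp  : $.bulletin.timestamp

# RouteOnAttribute
is_error : ${bulletin.level:equals('ERROR')}
```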
05-09-2023
11:40 PM
@nuxeo-nifi, What I would try to implement as a quick solution:
1. Configure your ValidateRecord (or maybe even try ValidateCsv) so it identifies when the records in your CSV are not valid.
2. From ValidateRecord, you have 3 possible queues:
- failure --> which you might want to connect to an alert system, like PutEmail for example.
- valid --> which you might want to connect to your further processing.
- invalid --> what you are actually looking for 🙂 Here, you can use an InvokeHTTP to call NiFi's REST API and stop your ValidateRecord processor. This way, if a single message is rejected, your entire flow stops. This is actually not the best way to do things, but if it is your project requirement, it is what you should do.
2a. From ValidateCsv, you have only the valid and invalid queues, which you can handle exactly as described above (stopping the ValidateCsv processor via InvokeHTTP when something lands in invalid).
3. If you are using this flow in streaming mode, so to speak (you get files every second), you should modify ValidateRecord to run every 2 or 5 seconds (or something like that) so you have time to stop your processor using InvokeHTTP. If you leave the default Run Schedule of 0 sec, you will process some additional messages before being able to stop the processor.
Documentation:
- NiFi REST API: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
- NiFi ValidateRecord: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.20.0/org.apache.nifi.processors.standard.ValidateRecord/index.html
- NiFi InvokeHTTP: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.20.0/org.apache.nifi.processors.standard.InvokeHTTP/index.html
- NiFi ValidateCsv: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.20.0/org.apache.nifi.processors.standard.ValidateCsv/index.html
- How To ValidateRecord: https://community.cloudera.com/t5/Community-Articles/Using-Apache-NiFi-to-Validate-that-Records-Adhere-to-a/ta-p/247299
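The stop-the-processor call from step 2 could be sketched as an InvokeHTTP configured like this (host and processor ID are placeholders, and the revision version must match what a prior GET on /nifi-api/processors/<processor-id> returns):

```
# InvokeHTTP
HTTP Method  : PUT
HTTP URL     : https://nifi-host:8443/nifi-api/processors/<processor-id>/run-status
Content-Type : application/json

# Request body (FlowFile content):
{ "revision": { "version": 3 }, "state": "STOPPED" }
```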