Member since: 01-27-2023
Posts: 125
Kudos Received: 31
Solutions: 23
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 43 | 05-31-2023 03:01 AM |
|  | 143 | 05-22-2023 06:55 AM |
|  | 145 | 05-15-2023 05:33 AM |
|  | 284 | 05-10-2023 01:57 AM |
|  | 97 | 05-09-2023 11:40 PM |
05-12-2023
04:28 AM
@ushasri, If you are reading an Excel file, you can use a ConvertRecord processor, in which you define a Record Reader (most likely a CSVReader) where you tell NiFi how to read the values (value separator). Next, with the help of a Record Writer (I assume CSV again, or maybe Avro) you define how your data should look, including the header. For both the Reader and the Writer you will have to define a schema, and in the writer schema you can replace the blank spaces with underscores.
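If you want to sanity-check that renaming outside of NiFi first, here is a minimal Python sketch of the transformation the writer schema performs for you; the header and the values are made up for illustration:

```python
import csv
import io

# Hypothetical input whose header contains blank spaces.
raw = "first name,last name\nJohn,Doe\n"

rows = list(csv.reader(io.StringIO(raw)))

# This is effectively what the Record Writer schema does for you in NiFi:
# blank spaces in the field names become underscores.
rows[0] = [name.replace(" ", "_") for name in rows[0]]

out = io.StringIO()
csv.writer(out).writerows(rows)
print(out.getvalue())  # header is now: first_name,last_name
```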
05-12-2023
12:37 AM
@acasta, Don't get me wrong, but I highly doubt that NiFi is somehow ignoring or deleting the files, without your intervention or configuration. What I would suggest are the following two actions, with a comparison sketch after this post:
- First of all, add a LogMessage/LogAttribute after you have unzipped all those files. Basically, duplicate the success queue of the processor where you unpack your archive and log each file which was extracted. In this way, you get a list of all the files extracted out of your zip file. Make sure to set the queue to a single node so you can check nifi-app.log on a single node.
- Next, add another LogMessage/LogAttribute after the processor with which you save the data into your bucket. Send the names of the unzipped files into the logs to get a list of all the files which have been saved into your bucket. Again, make sure to set the queue to a single node so you can check nifi-app.log on a single node.

Afterwards, you can compare the lists and see if you have extracted and saved all your files. If the lists match 1:1, it means that the problem is not related to NiFi itself, but to something else: another system doing something in your bucket, files with the same name which get overwritten, etc. Another option would be to use DEBUG on all your processors and use RUN ONCE until you have processed everything, and analyze in real time what is happening.
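Once both markers are in the logs, a small script can diff the two lists; the "EXTRACTED"/"SAVED" marker strings below are assumptions that must match whatever you configure in your LogMessage processors:

```python
import re

# Collect both file lists from nifi-app.log, assuming the LogMessage
# processors write markers like "EXTRACTED <name>" and "SAVED <name>".
extracted, saved = set(), set()

with open("nifi-app.log", encoding="utf-8") as log:
    for line in log:
        if m := re.search(r"EXTRACTED (\S+)", line):
            extracted.add(m.group(1))
        elif m := re.search(r"SAVED (\S+)", line):
            saved.add(m.group(1))

# Anything extracted but never saved is what you are hunting for.
missing = extracted - saved
print(f"extracted={len(extracted)} saved={len(saved)} missing={len(missing)}")
for name in sorted(missing):
    print("not saved:", name)
```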
05-11-2023
12:08 AM
@acasta, What do you mean when saying that some files are not being processed? Are you not extracting all the ZIP files from S3, or are the files extracted out of the zip files not present in your newly created folder? Have you checked whether the files which are getting extracted have the same name? For example, in zip 1 you have a file called ingested_data.csv and in zip 2 you have the exact same file name, but with different content. If this is the case, when your files get saved in your folder (no matter if we are talking about S3, GCP, PutFile, Azure or anything else) they will get overwritten by the latest file.
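If you want to verify the name-collision theory before touching the flow, a short script like this (archive paths passed on the command line) counts duplicate entry names across your zips:

```python
import sys
import zipfile
from collections import Counter

# Count how often each entry name appears across all archives given as
# arguments; directories are skipped. Run as:
#   python check_dupes.py first.zip second.zip
counts = Counter()
for path in sys.argv[1:]:
    with zipfile.ZipFile(path) as zf:
        counts.update(n for n in zf.namelist() if not n.endswith("/"))

# Any name occurring more than once would be overwritten at the destination.
for name, n in counts.items():
    if n > 1:
        print(f"{name} appears in {n} archives")
```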
05-10-2023
11:37 PM
1 Kudo
@SandyClouds, I do not have a template because I no longer have access to that project, but I have provided you with all the info you need to develop your own system 🙂 And it mostly depends on your use case; you can use any API you require. In terms of processors, you require the following: InvokeHTTP to perform the API call, EvaluateJSONPath + SplitJSON to extract the relevant lines out of your bulletin board, and RouteOnAttribute to identify the errors you need. Other than that, your imagination is your best friend.
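As a rough single-script equivalent of that processor chain (the NiFi URL is a placeholder and authentication is left out), something like this sketches the idea:

```python
import requests

# Fetch the bulletin board, the same call InvokeHTTP would make, then do
# in one loop what EvaluateJSONPath + SplitJSON + RouteOnAttribute do in
# the flow: split the bulletins and keep only the ERROR-level ones.
NIFI = "https://localhost:8443/nifi-api"  # placeholder

resp = requests.get(f"{NIFI}/flow/bulletin-board", verify=False)
resp.raise_for_status()

for item in resp.json()["bulletinBoard"]["bulletins"]:
    b = item.get("bulletin", {})
    if b.get("level") == "ERROR":
        print(b.get("timestamp"), b.get("sourceName"), b.get("message"))
```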
05-10-2023
01:57 AM
2 Kudos
@SandyClouds, You are correct when saying that you have no failure queue from CaptureChangeMySQL, and you are not missing anything 🙂 I do not know the perfect way to solve your issue, but I have used the following two scenarios to achieve what you are looking for:

1) REST API to the Bulletin Board, extracting only the messages generated for the ID of your CaptureChangeMySQL. To achieve this, you will need an InvokeHTTP processor, which will call your REST API and extract everything from your bulletin board. Next, you can use an EvaluateJsonPath to extract what you want from the logs (for example $.bulletinBoard.bulletins). Next, you can use a SplitJSON to split every bulletin message into single FlowFiles. Afterwards, you add a new EvaluateJsonPath where you extract the error level, error message, error source name and the timestamp and save them as attributes ($.bulletin.level, $.bulletin.message, $.bulletin.sourceName, $.bulletin.timestamp). Using a RouteOnAttribute, you then filter out the error messages generated by your CaptureChangeMySQL processor and send them by email.

2) TailFile on your nifi-app.log, filtering out all the error messages generated by your CaptureChangeMySQL processor. The results are then sent per email, as in the sketch below. Again, not the best solution, but at least this got things working for me 🙂
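For scenario 2, a bare-bones version of the log filtering could look like this; the log location and the line format differ between installations, so treat the matching as an assumption to adapt:

```python
# Scan nifi-app.log and keep only ERROR lines produced by the
# CaptureChangeMySQL processor; the substring matching below assumes the
# default log line format of your installation.
errors = []
with open("nifi-app.log", encoding="utf-8") as log:
    for line in log:
        if "ERROR" in line and "CaptureChangeMySQL" in line:
            errors.append(line.rstrip())

# Hand the collected lines to whatever sends your email notification.
print("\n".join(errors))
```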
05-09-2023
11:40 PM
@nuxeo-nifi, What I would try to implement as a quick solution is:

1. Configure your ValidateRecord (or maybe even try ValidateCsv) so it identifies when the Records from your CSV are not valid.

2. From ValidateRecord, you have 3 possible queues:
- failure --> which you might want to connect to an alert system, like PutEmail for example.
- valid --> which you might want to connect to your further processing.
- invalid --> what you are actually looking for :). Here, you can use an InvokeHTTP to call NiFi's REST API and stop your ValidateRecord processor (see the sketch after this post). In this way, if a single message was rejected, your entire flow will be stopped... this is actually not the best way to do things, but if this is your project requirement, this is what you should do.

2a. From ValidateCsv, you have 2 possible queues:
- valid --> which you might want to connect to your further processing.
- invalid --> what you are actually looking for :). Here again, InvokeHTTP can call NiFi's REST API and stop your ValidateCsv processor, with the same caveat as above.

3. If you are using this flow in a so-to-say streaming mode (you get files every second), you should modify ValidateRecord to run every 5 seconds or every 2 seconds (or something like that) so you have time to stop your processor using InvokeHTTP. If you leave it on the default Run Schedule of 0 sec, you will process some additional messages before being able to stop your processor.

Documentation:
- NiFi REST API: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
- NiFi ValidateRecord: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.20.0/org.apache.nifi.processors.standard.ValidateRecord/index.html
- NiFi InvokeHTTP: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.20.0/org.apache.nifi.processors.standard.InvokeHTTP/index.html
- NiFi ValidateCsv: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.20.0/org.apache.nifi.processors.standard.ValidateCsv/index.html
- How To ValidateRecord: https://community.cloudera.com/t5/Community-Articles/Using-Apache-NiFi-to-Validate-that-Records-Adhere-to-a/ta-p/247299
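Since step 2 leans on the REST API, here is a rough Python sketch of the stop call that InvokeHTTP would perform; the base URL and processor id are placeholders, and authentication is left out:

```python
import requests

# Stop a processor through NiFi's REST API. The run-status endpoint needs
# the current revision, so fetch the processor entity first.
NIFI = "https://localhost:8443/nifi-api"      # placeholder
PROCESSOR_ID = "0123-4567-89ab-cdef"          # hypothetical id

proc = requests.get(f"{NIFI}/processors/{PROCESSOR_ID}", verify=False).json()

body = {"revision": proc["revision"], "state": "STOPPED"}
resp = requests.put(
    f"{NIFI}/processors/{PROCESSOR_ID}/run-status",
    json=body,
    verify=False,
)
resp.raise_for_status()
print("processor stopped")
```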
05-09-2023
07:00 AM
@nuxeo-nifi, the processors you are referring to do not belong to any NiFi version (Cloudera or open-source), meaning that they were built in house, specifically for you and your project. In this case, you would need to speak to those who developed those processors and identify the application logic. Once you have that, you can use PutEmail to send email notifications and InvokeHTTP to do the other actions. I assume that your processors have a failure queue, which might be linked to a PutEmail processor, in which you define whatever you want to be sent as a notification. In case of no failures, you can link the success queue out of your Nuxeo processor into InvokeHTTP and perform the call you require. For that, make sure that all your certificates are in place and allow connection between the systems. Otherwise, you won't be able to use InvokeHTTP and you would have to find another solution, like a script.
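For reference, the failure-queue notification amounts to roughly this outside of NiFi; PutEmail handles all of it for you inside the flow, and the host, addresses and message text below are placeholders:

```python
import smtplib
from email.message import EmailMessage

# Build and send a plain notification mail, the same job PutEmail does
# when a FlowFile arrives on the failure queue.
msg = EmailMessage()
msg["Subject"] = "NiFi flow: processing failed"
msg["From"] = "nifi@example.com"       # placeholder
msg["To"] = "ops@example.com"          # placeholder
msg.set_content("A FlowFile was routed to the failure queue.")

with smtplib.SMTP("smtp.example.com", 25) as server:  # placeholder host
    server.send_message(msg)
```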
05-09-2023
12:19 AM
1 Kudo
@nuxeo-nifi, I am not quite familiar with Nuxeo, but as far as I know, you could use the REST API to batch upload documents into the Nuxeo system. To achieve this, you could easily use InvokeHTTP to perform the REST API calls to your Nuxeo endpoint. Or you can develop a custom script and execute it within an ExecuteStreamCommand processor. As for the Nuxeo database (I do not know if this is necessary or what sort of DB you have configured), you can use PutSQL (or any DB-related processor) to save your data in the DB, assuming that you have the JDBC connection details configured in your DBCPConnectionPool.
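As a sketch of the batch-upload route: the endpoint paths below follow my recollection of Nuxeo's batch upload API, so please verify them against the documentation for your Nuxeo version; the URL, credentials and file name are placeholders:

```python
import requests

# Two-step batch upload against Nuxeo's REST API (paths to be verified
# against your Nuxeo version's documentation).
NUXEO = "http://localhost:8080/nuxeo/api/v1"          # placeholder
AUTH = ("Administrator", "Administrator")             # placeholder

# 1. Create an upload batch and remember its id.
batch = requests.post(f"{NUXEO}/upload/", auth=AUTH).json()
batch_id = batch["batchId"]

# 2. Upload a file into slot 0 of the batch.
with open("document.pdf", "rb") as fh:                # placeholder file
    resp = requests.post(
        f"{NUXEO}/upload/{batch_id}/0",
        auth=AUTH,
        headers={"X-File-Name": "document.pdf",
                 "Content-Type": "application/pdf"},
        data=fh,
    )
resp.raise_for_status()
```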
05-08-2023
03:01 AM
@Manimaran, For the future, it would really help if you could mention your NiFi version and the database you are using, because each version (of NiFi and of the DB) has different ways of working. Besides that, it would also help to know what processors you are using, so that we could understand your flow and provide a personalized answer. As for your problem, without any other information, you could use an ExecuteStreamCommand in which you define a Python/Bash/Groovy script (anything you want, basically) which will call your stored procedure. You link the processor which is saving the data into your database to ExecuteStreamCommand using the success queue, and once the data is inserted into your database, the FlowFile will go into your ExecuteStreamCommand and call your script, which will execute your stored procedure (a sketch follows below). In newer versions of NiFi you could also try to call the procedure using PutSQL. Or you could further try ExecuteScript (have a look here: http://funnifi.blogspot.com/2016/04/sql-in-nifi-with-executescript.html). As for the Wait/Notify processors, as far as I know, there is nothing implemented to be used directly (out of the box) and you will have to use a combination of multiple processors to achieve this. A detailed answer to this would be: https://pierrevillard.com/2018/06/27/nifi-workflow-monitoring-wait-notify-pattern-with-split-and-merge/
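A sketch of what that ExecuteStreamCommand script could look like, assuming PostgreSQL, the psycopg2 driver and a hypothetical procedure named refresh_reports; adapt the driver and the call syntax to your actual database:

```python
import psycopg2

# Connect and call the stored procedure once the FlowFile triggers this
# script; connection details are placeholders.
conn = psycopg2.connect(
    host="db.example.com", dbname="mydb",
    user="nifi", password="secret",
)
try:
    with conn.cursor() as cur:
        # CALL works for procedures on PostgreSQL 11+; other databases
        # use a different invocation syntax.
        cur.execute("CALL refresh_reports();")
    conn.commit()
finally:
    conn.close()
```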
05-04-2023
06:30 AM
You do not install the Cloudera version on your laptop 🙂 You need Cloudera DataFlow for Public Cloud (CDF-PC), meaning that we are talking here about a license and some services. As @steven-matison already provided you with the perfect answer to your question, he might also be in the position to further assist you with everything you need to know about Cloudera DataFlow and the Public Cloud. Unfortunately, I am still learning about what Cloudera offers and how, so I am not the best one to answer your question. If you are going to use NiFi for some real data processing, I strongly recommend you have a look at Cloudera DataFlow, as this will spare you many issues and headaches 🙂