Member since
01-27-2023
229
Posts
73
Kudos Received
45
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 661 | 02-23-2024 01:14 AM |
| | 849 | 01-26-2024 01:31 AM |
| | 583 | 11-22-2023 12:28 AM |
| | 1327 | 11-22-2023 12:10 AM |
| | 1527 | 11-06-2023 12:44 AM |
02-25-2024
11:49 PM
1 Kudo
@SAMSAL I just tried what you described, using a GenerateFlowFile (where I set your JSON text) and a ValidateRecord, and I can't really reproduce your error (NiFi 1.19.1). I used a JSON Reader with default settings: Infer Schema and no Date Format set. Next, in terms of writer, as you stated, I used a JsonRecordSetWriter, where I defined the schema you mentioned, and I tried with and without the date format. Each time, my FlowFile went into the valid queue without any errors. And the file from the valid queue looks like: Maybe there is something extra in your flow which is causing this error, or maybe your NiFi version is causing this behavior? In terms of properties, the following have been configured in my ValidateRecord processor:
02-23-2024
01:14 AM
1 Kudo
@jarviszzzz, well, the remaining 111 flowfiles will stay in the queue until the minimum number of entries is reached again. As a best practice, confirmed by the documentation as well, it is recommended to also set the property Max Bin Age, so that you avoid the behavior you just raised in this post :). Max Bin Age specifies the maximum amount of time that your flow can wait until the flowfiles are merged. This basically provides a means to set a timeout on your bin, so that even if you are no longer receiving any data, the flowfiles won't remain stuck in your queue. For this property you can use any positive integer with the desired time unit (seconds, minutes, hours). In your specific case, if you set the Max Bin Age to 60 minutes, you will generate 10 bins of 1000 entries each and, after 60 minutes, if no other data comes into your queue, you will generate a new bin containing only the remaining 111 entries. More details can be found here, where the logic of this process is quite well described: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.23.2/org.apache.nifi.processors.standard.MergeContent/additionalDetails.html
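To make the numbers above concrete, here is a small Python sketch of the binning arithmetic. It is not NiFi's implementation, only a simplified model: it assumes every full bin holds exactly `entries_per_bin` flowfiles and that the leftover bin is released only once Max Bin Age has elapsed. The function name, its parameters, and the total of 10111 queued flowfiles (10 x 1000 + 111) are assumptions made for the illustration.

```python
def bins_after(queued, entries_per_bin, waited_minutes, max_bin_age_minutes):
    """Return the sizes of the bins merged so far (simplified model, not NiFi code)."""
    full_bins, leftover = divmod(queued, entries_per_bin)
    merged = [entries_per_bin] * full_bins
    # The partial bin is only merged once it is older than Max Bin Age.
    if leftover and waited_minutes >= max_bin_age_minutes:
        merged.append(leftover)
    return merged


# Assumed total: 10111 flowfiles, bins of 1000 entries, Max Bin Age = 60 minutes.
print(bins_after(10111, 1000, waited_minutes=5, max_bin_age_minutes=60))
print(bins_after(10111, 1000, waited_minutes=60, max_bin_age_minutes=60))
```

Before the timeout only the ten full bins are produced; once the 60 minutes have passed, the remaining 111 entries are merged as an eleventh, smaller bin.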
02-23-2024
12:50 AM
@rtambun, I am not quite sure how your data comes out of your Kafka cluster, but if a message contains a single row, you could add a LookupRecord before saving the data in your database. With that LookupRecord you check, with the help of a key, whether those values are already present in your database. If they are, you can send that data into a different flow; otherwise, you save it into your database. If your data does not come as a single record per Kafka message, and you are not processing lots of data, you could try to split your data into single flowfiles (each containing a single record) and process them further as stated above.
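The routing idea can be sketched in plain Python as follows. This only illustrates the decision logic, not how LookupRecord is configured; the function name, the `key_field` parameter, and the sample records are hypothetical, and the set of existing keys stands in for whatever lookup service queries your database.

```python
def route_records(records, existing_keys, key_field):
    """Split records into those already in the database and those still to insert."""
    matched, unmatched = [], []
    for record in records:
        # A record is "matched" when its key is already known to the database.
        if record.get(key_field) in existing_keys:
            matched.append(record)
        else:
            unmatched.append(record)
    return matched, unmatched


# Hypothetical sample data: key 1 already exists, key 2 does not.
already_there, to_insert = route_records(
    [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}],
    existing_keys={1},
    key_field="id",
)
print(already_there, to_insert)
```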
02-21-2024
11:50 PM
1 Kudo
@zukhriddin, when using PutDatabaseRecord, you have to set the Schema Name and the Table Name :). The Catalog Name is not really necessary, as it is not used for Oracle databases. Next, for the Database Type, make sure that you have selected the correct version for your Oracle DB, and of course the Statement Type should be INSERT (at least while you are testing). PS: make sure that the user you have configured in your Database Connection Pooling Service has all the necessary grants to see and modify your table.
02-21-2024
11:45 PM
2 Kudos
@hegdemahendra, try removing the "jks" string from the properties nifi.registry.security.keystoreType and nifi.registry.security.truststoreType and restart your NiFi Registry instance. Do the same for nifi.registry.security.needClientAuth=True, which should have no value. As far as I can see, you are using HTTP and not HTTPS, yet you have defined some properties that are only relevant to HTTPS. My suggestion is to go either with full HTTP or with full HTTPS, and not to combine the properties.
02-21-2024
11:37 PM
1 Kudo
@ShyamKumar, if you expect an answer, try explaining exactly what you are trying to achieve, what you tried that did not work, why it did not work, and so on 🙂 In addition, mention the version of NiFi you are using and provide some further details. Based on how you wrote the post, you can only expect a generic answer, such as: you can use a Remote Process Group to call a specific Process Group from your NiFi instance. Another option would be to use the combination of Input Port and Output Port. Or you can even develop a custom script and execute it in a scripted processor. Again, to get an answer tailored to your use case, try providing some more details 🙂
02-14-2024
05:32 AM
2 Kudos
@iriszhuhao, to be really honest, your problem is not 100% related to NiFi, as this is more of an architectural question. How would you achieve what you are trying to achieve in another graphical ETL tool (Informatica, Talend, Knime, etc.)? That logic should be applied in your case as well, as most of the tools do the same thing, just in a different manner. Now, going back to your question, bear in mind that this might not be the best solution, but you could implement a signaling table where you store the execution of each of your 9 flows. Basically, you create a database table in which you store something like PROCESSOR_ID, PROCESSOR_NAME, EXECUTION_TIME, SUCCESS. From each of your success flows, you go into a processor where you generate this data and afterwards insert the content into your table. Next, out of your final processor (the one saving the data in your database) you could go into a RetryFlowFile where you set a penalty of X seconds/minutes/hours. From there, you then go into an ExecuteSQLRecord and query your table. The output of the ExecuteSQLRecord is then sent into a query processor, where you check whether all 9 flows have entered a row in your table. If all 9 inserted a row in your table, you proceed to do what you want. If not all 9 flows inserted a row in your table, you go into a processor where you call NiFi's REST API to check the status of the affected processors (you can do that with an ExecuteStreamCommand, ExecuteScript, InvokeHTTP, etc.). You can call the /nifi-api/flow/processors/YOUR_MISSING_PROCESSOR/status API and check the results (statsLastRefreshed, flowFilesIn, flowFilesOut) to see whether something came, or should have come, through that flow. Next, based on your logic, you can proceed to check the other processors or call the stored procedure to end your flow. Lastly, you can truncate your table once your execution is done, if you do not want to keep any historical data there.
Again, this might not be the best solution, but this is the first thing I would try if I were in your situation.
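A minimal sketch of the signaling-table check, written in Python against an in-memory SQLite database so it can be run anywhere. The column names come from the description above; the table name FLOW_SIGNAL, the function names, and the sample values are assumptions for illustration, and in NiFi the same steps would be carried out by the processors mentioned (for example PutDatabaseRecord and ExecuteSQLRecord) against your real database.

```python
import sqlite3

EXPECTED_FLOWS = 9  # the nine flows that must finish before continuing


def create_signal_table(conn):
    # FLOW_SIGNAL is a hypothetical table name.
    conn.execute(
        """CREATE TABLE IF NOT EXISTS FLOW_SIGNAL (
               PROCESSOR_ID   TEXT,
               PROCESSOR_NAME TEXT,
               EXECUTION_TIME TEXT,
               SUCCESS        INTEGER
           )"""
    )


def record_execution(conn, processor_id, processor_name, execution_time, success=True):
    # Called once at the end of each flow's success path.
    conn.execute(
        "INSERT INTO FLOW_SIGNAL VALUES (?, ?, ?, ?)",
        (processor_id, processor_name, execution_time, int(success)),
    )


def all_flows_reported(conn, expected=EXPECTED_FLOWS):
    # Counts distinct processors that signalled success.
    (count,) = conn.execute(
        "SELECT COUNT(DISTINCT PROCESSOR_ID) FROM FLOW_SIGNAL WHERE SUCCESS = 1"
    ).fetchone()
    return count >= expected


def reset_signal_table(conn):
    # SQLite has no TRUNCATE; DELETE without a WHERE clause empties the table.
    conn.execute("DELETE FROM FLOW_SIGNAL")


conn = sqlite3.connect(":memory:")
create_signal_table(conn)
for i in range(8):
    record_execution(conn, f"proc-{i}", f"flow_{i}", "2024-02-14T05:32:00")
print(all_flows_reported(conn))  # eight of nine flows have reported so far
```

When the check returns false, that is the point where the retry loop would fall back to the REST API status call for the missing processor.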
02-12-2024
12:53 AM
2 Kudos
@PriyankaMondal, first of all, I would personally not recommend using a single disk. Each of NiFi's repositories should be stored on a different disk, especially to protect against failures. Next, the size of your HDD/SSD should be chosen based on your use case: working with lots of data requires more space than working with less data (in terms of number of files, not size of files). Secondly, the cluster configuration is not something you can read on a forum and expect to apply 100% to your case. When configuring a NiFi cluster you need to take several aspects into consideration, mostly related to what you are planning to do with your cluster. For example, if you are going to use NiFi for lots of streaming data, it is recommended to go with SSDs instead of simple HDDs. If you are going to work mostly with batches and process/modify the information you are receiving (for example adding new columns based on other columns, aggregating data, using lookups, etc.), you would need more RAM and CPU compared to what you would need for a streaming cluster. Lastly, the number of nodes again depends on the scope of the cluster. Using it for batching and working on larger chunks of data (for example extracting data from a DB and inserting it into AWS) does not require as many nodes as a streaming cluster which, for example, reads a constant amount of data from Kafka. My suggestion would be to first identify what use cases you are going to implement on your NiFi cluster, understand the data throughput, and build your cluster based on those findings. And for safety reasons, use multiple HDDs/SSDs for NiFi's repositories and don't use the default configuration, with them saved into your conf folder 🙂
01-26-2024
01:31 AM
1 Kudo
@ALWOSABY, you have set EvaluateJsonPath to write the information into the content of the flow file, and not into the attributes. In your next processor, you are using FF_CONTENT as an attribute, and it does not exist ... hence your flow is not working as expected. Do you require the content of the JSON, or do you need to generate some new attributes based on the content of the flow file? If you require the content as well, I would suggest adding a new EvaluateJsonPath after your first one, set to write the information as attributes, and generating your desired attributes there. If you only require some new attributes, modify the logic in your first EvaluateJsonPath to write the data as attributes and generate your desired attributes using JsonPath expressions. Have a look at your first EvaluateJsonPath and make sure that the data is extracted correctly, because at first glance I am not quite certain it works properly.
12-08-2023
10:38 AM
@SAMSAL thank you for your response 🙂 The thing is that I 100% require the value to be extracted and exported as long with logical type = local-timestamp-millis, as this is the only format which gets inserted automatically into a DATETIME column within BigQuery, meaning that using a string is not acceptable in any way. After exporting the data out of the database, the column looks as follows within the AVRO file. This value gets transformed into UTC automatically, due to the AVRO Reader and AVRO Writer. The AVRO Writer contained the schema long with local-timestamp-millis when extracting the value. However, if I open the AVRO file anywhere else but NiFi, I get the value 1337633975000, which is ok. Now, I am trying to transform it into the Europe/Bucharest timezone, as I require it like this. I have tried using ${field.value:isEmpty():ifElse(${field.value}, ${field.value:toDate():format('yyyy-MM-dd HH:mm:ss','Europe/Bucharest'):toNumber()})} as well, but without success, as toNumber will somehow get "" as input :(. I also tried extracting the date from the DB directly as long, but the value gets transformed directly into UTC. I have no idea what to try next or how to proceed further.
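For reference, one possible reading of the goal is "shift the epoch value by the zone's UTC offset, so that the number, interpreted as local-timestamp-millis, shows the Bucharest wall-clock time". The Python sketch below illustrates only that arithmetic, outside of NiFi; it is not a NiFi Expression Language solution. The function name is hypothetical, and the fixed +03:00 offset is an assumption that holds for Bucharest summer time (EEST); where the tz database is available, `zoneinfo.ZoneInfo("Europe/Bucharest")` can be passed instead to handle daylight saving changes.

```python
from datetime import datetime, timedelta, timezone, tzinfo

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_local_timestamp_millis(epoch_millis: int, zone: tzinfo) -> int:
    """Return millis that encode the wall-clock time of `zone` for a UTC instant."""
    instant = EPOCH + timedelta(milliseconds=epoch_millis)
    # Offset of the target zone at that instant (depends on DST for real zones).
    offset = instant.astimezone(zone).utcoffset()
    return epoch_millis + int(offset.total_seconds() * 1000)


# Assumption: EEST (UTC+3) applies, which is the case for a date in May.
eest = timezone(timedelta(hours=3))
print(to_local_timestamp_millis(1337633975000, eest))
```

If the value 1337633975000 already encodes the local wall-clock time in your pipeline, this shift must not be applied a second time.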