Member since: 01-27-2023
Posts: 215
Kudos Received: 61
Solutions: 42
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 62 | 11-22-2023 12:10 AM |
| | 167 | 11-06-2023 12:44 AM |
| | 267 | 11-02-2023 02:01 AM |
| | 350 | 10-18-2023 11:37 PM |
| | 315 | 10-09-2023 12:36 AM |
11-22-2023
12:28 AM
@CommanderLaus: First things first: in your netcat command you are connecting to port 31510, whereas your error message points to port 1025. Something is not right here and you need to check your configurations. Try netcat on port 1025 as well and see if you have any connectivity, and besides netcat, try telnet too. Next, regarding your DBCP Connection Pool: in the property Database Driver Location(s), I highly recommend writing the full path to the JAR file rather than using "." or any other shortcut.
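Alongside netcat and telnet, a quick Python sketch can confirm TCP connectivity; the host name below is a placeholder for your database host:

```python
import socket

def check_port(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Check both ports mentioned above (replace "db-host" with your host):
# print(check_port("db-host", 31510), check_port("db-host", 1025))
```

If one port answers and the other does not, you know the mismatch is in the configuration, not the network.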
11-22-2023
12:19 AM
@Fanxxx, the first question would be: does your GetMongo write a flowfile to a queue when nothing is found in the DB, or does it only log something in the Bulletin Board?

If a flowfile gets generated and sent to the failure queue, you can link that queue to your next processor and, using NiFi's Expression Language, perform any action you desire.

However, if nothing gets sent to the failure queue, you will need to build something else. You would need an InvokeHTTP in which you call NiFi's REST API and extract the Bulletin Board errors. You then filter out the messages generated by your GetMongo processor (using its unique ID) and extract what you need from them. If your error message contains all the necessary information, extract it, save it as attributes, then send the flowfile downstream and process it with NiFi's Expression Language. If the required information is not present in the error message, extract the query you tried to perform and pull the required information from there; the overall logic stays the same: extract the information as attributes and send it downstream for further processing with NiFi's EL.

NiFi's Expression Language: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
NiFi's REST API: https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
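A minimal Python sketch of the filtering step, once you have the Bulletin Board JSON. The field names (`bulletinBoard`, `bulletins`, `sourceId`, `message`) are assumptions based on the `/nifi-api/flow/bulletin-board` endpoint and should be verified against your NiFi version:

```python
def bulletins_for(response: dict, source_id: str) -> list:
    """Return bulletin messages emitted by the processor with id `source_id`."""
    entries = response.get("bulletinBoard", {}).get("bulletins", [])
    return [
        e["bulletin"]["message"]
        for e in entries
        if e.get("bulletin", {}).get("sourceId") == source_id
    ]

# Hypothetical sample of the response shape:
sample = {
    "bulletinBoard": {
        "bulletins": [
            {"bulletin": {"sourceId": "abc-123", "message": "GetMongo: no results"}},
            {"bulletin": {"sourceId": "def-456", "message": "unrelated error"}},
        ]
    }
}
print(bulletins_for(sample, "abc-123"))  # -> ['GetMongo: no results']
```

In NiFi itself you would do the equivalent with EvaluateJsonPath or a record processor after the InvokeHTTP.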
11-22-2023
12:10 AM
@Rohit1997jio, As @joseomjr already pointed out, by doing this you defeat the very purpose of using Kafka. As you already know, Kafka is a stream-processing platform that, put very simply, functions as a queue of messages. Integrating Kafka with NiFi, especially via the ConsumeKafka processor, basically places a bucket at the end of that queue: as long as messages are present in the queue (Kafka, in your case), messages will keep arriving in your bucket (your NiFi processing layer). When there are no messages in Kafka, your ConsumeKafka processor sits in what we could call an idle state, meaning it will not waste resources in vain; it only uses some resources to check whether new messages have arrived.

That being said, I see no point in trying to kill a connection which is NOT affecting the involved systems in any way, and doing so basically defeats the entire purpose of using NiFi and Kafka. However, if you still want to achieve this, you will need to put in some extra effort. First of all, you need to create a flow which checks the state of the desired processor using NiFi's REST API (achievable in many ways: InvokeHTTP, ExecuteStreamCommand, etc.). If nothing has been done in the past 5 minutes (visible in the JSON returned by the REST API), you activate an InvokeHTTP which calls the REST API again to STOP the ConsumeKafka processor.
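A sketch of the decision step: given the JSON returned by `GET /nifi-api/flow/processors/{id}/status`, decide whether the processor has been idle. The field names (`processorStatus`, `aggregateSnapshot`, `flowFilesOut`) are assumptions to verify against your NiFi version; NiFi status snapshots cover roughly the last 5 minutes, which matches the window you want:

```python
def should_stop(status: dict) -> bool:
    """Return True if the processor moved no flowfiles in the last window."""
    snap = status.get("processorStatus", {}).get("aggregateSnapshot", {})
    return snap.get("flowFilesOut", 0) == 0

idle = {"processorStatus": {"aggregateSnapshot": {"flowFilesOut": 0}}}
busy = {"processorStatus": {"aggregateSnapshot": {"flowFilesOut": 17}}}
print(should_stop(idle), should_stop(busy))  # -> True False
```

If `should_stop` returns True, the actual stop is a `PUT` to `/nifi-api/processors/{id}/run-status` with state `STOPPED` and the current revision, which you can issue from a second InvokeHTTP.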
11-06-2023
12:44 AM
2 Kudos
@Chaitanya_, First of all, why the infinite loop? What was the point of it and what were you actually trying to achieve? (just out of curiosity) Now, in terms of your problem, here are two scenarios you could try, which might eventually help you:

1) Add a new processor to your canvas (LogMessage, for example) and try moving the queue from the center funnel towards the right funnel into the new processor. Please make sure that all your processors are stopped and disabled and that your queues are empty. This should allow you to move the queue without any issues while breaking the infinite loop, which will eventually let you remove the funnels from your canvas. Also try removing the purple highlighted queue, so you are certain the loop is no longer a loop. Afterwards, you should be able to remove all the queues, from right to left.

2) This one is a little harder and requires plenty of attention (and it is not really recommended), but in times of desperation you could try manually modifying flow.xml.gz and flow.json.gz to remove the parts describing those funnels. You then upload the new version of the files to all NiFi nodes and you should no longer see those funnels on your canvas. However, before doing this, make sure you create a backup of those files, in case you mess something up. Again, this is not really recommended, so I highly advise trying the first solution before even attempting this one.

PS: make sure that all your nodes are up and running. Or stop the nodes, work on a single node, then copy flow.xml.gz and flow.json.gz to all the other nodes and start them. Hope it helps!
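For option 2, a small Python sketch of the "backup first" step: copy flow.json.gz aside with a timestamp, then load it for inspection. The top-level JSON layout of flow.json.gz varies by NiFi version, so treat any parsing beyond this as illustrative:

```python
import gzip
import json
import shutil
from datetime import datetime

def backup_and_load(path: str) -> dict:
    """Copy the flow file aside, then return its parsed JSON content."""
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    shutil.copy2(path, f"{path}.bak-{stamp}")  # never edit without a backup
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        return json.load(fh)

# flow = backup_and_load("/opt/nifi/conf/flow.json.gz")  # path is an example
```

Edit the loaded structure (or the extracted file) only with all nodes stopped, and keep the `.bak-*` copy until the cluster is confirmed healthy.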
11-06-2023
12:28 AM
@user001, I do not know what your Sandbox environment is, but I just tested with NiFi 1.15 and the solution works like a charm. So either you are doing something wrong, or you have something configured incorrectly in your sandbox environment. Or maybe NiFi 1.18 is bugged, but I highly doubt it, as there would have been far more posts reporting similar issues.
11-02-2023
02:01 AM
1 Kudo
@user001, How I would do this:
- Create an UpdateRecord processor, where I define a JsonTreeReader for reading the input file and a JsonRecordSetWriter for writing the newly formatted file.
- Within this same processor, add a new property defining the path to the column you are trying to modify. In your case, assuming this is the entire JSON file, that would be /Key/value[*]/Date.
- The value for this newly added property should be the transformation you are trying to achieve. In your case: ${field.value:toDate("MM/yy"):format("yyyy-MM")}
- Finally, within the same processor, change the Replacement Value Strategy from its default value to Literal Value.

And that's pretty much all you have to do.
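To sanity-check the expected output before wiring up UpdateRecord, the Expression Language call ${field.value:toDate("MM/yy"):format("yyyy-MM")} is equivalent to this plain-Python transformation:

```python
from datetime import datetime

def reformat_date(value: str) -> str:
    """Turn a 'MM/yy' string into 'yyyy-MM', like the EL toDate/format chain."""
    return datetime.strptime(value, "%m/%y").strftime("%Y-%m")

print(reformat_date("08/23"))  # -> 2023-08
```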
11-01-2023
02:42 AM
@Wadok88, The problem you are reporting is not related to the database and how it works, but to how you configured your NiFi instance and especially your ZooKeeper. First of all, are you using embedded ZooKeeper or external ZooKeeper? Did you configure the state-management.xml file and the nifi.properties file with the correct connection string for your ZooKeeper nodes? Secondly, when running NiFi as a cluster, ZooKeeper is used to maintain the state of some processors across the entire cluster, meaning those processors will attempt to use the state manager even if it was not configured, or, in your case, not configured correctly. So what I suggest you do is check the ZooKeeper configuration from within NiFi. Next, set your processor to DEBUG logging and check whether each node is able to retrieve the state via ZooKeeper. Maybe you have a connectivity issue from a specific node.
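For reference, the cluster state provider in state-management.xml looks roughly like the sketch below (the hostnames are placeholders; check the defaults shipped with your NiFi version), and nifi.properties must point at it via nifi.state.management.provider.cluster:

```xml
<!-- state-management.xml: example cluster-provider entry.
     zk1/zk2/zk3 are placeholder hostnames for your ZooKeeper ensemble. -->
<cluster-provider>
    <id>zk-provider</id>
    <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
    <property name="Connect String">zk1:2181,zk2:2181,zk3:2181</property>
    <property name="Root Node">/nifi</property>
    <property name="Session Timeout">10 seconds</property>
    <property name="Access Control">Open</property>
</cluster-provider>
```

An empty or wrong Connect String here is exactly the kind of misconfiguration that makes state-using processors fail on some nodes only.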
10-22-2023
11:11 PM
1 Kudo
@AhmedParvez, have a look at what I told you in the previous post. EOF means that most likely you are sending the API call incorrectly somehow, and you need to take a closer look at it. Try making that same exact API call from Postman or from Python (or anything else you know how to use) and see if you encounter the same problem. Unfortunately this is not a NiFi problem but a configuration problem, and as you are very reluctant about providing all the details, nobody will be able to help you with your request 😞 Set your processor to DEBUG logging as well and see what extra information you can get from there. Maybe it will provide the necessary info to find your error.
10-19-2023
03:50 AM
@Kiranq, What did you configure in UpdateRecord? Most likely your problem starts from there.
10-18-2023
11:53 PM
1 Kudo
@Fanxxx, How I would do the first POC:
1) GetMongoRecord: execute the count on the first table. Using the property "Query Output Attribute", you save that value directly as an attribute.
2) Connected to the success queue, another GetMongoRecord: execute the count on the second table and again save the value as an attribute via "Query Output Attribute".
3) Connected to the success queue, a RouteOnAttribute: here you define a rule: if count1 = count2, do what you want to do; otherwise call the logic for the insert, as you said. (Using NiFi Expression Language: ${attribute1:equals(${attribute2})})
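Step 3 in plain Python, for clarity: the RouteOnAttribute rule ${attribute1:equals(${attribute2})} boils down to a string comparison of the two attributes. The attribute names below are examples; use whatever you set via "Query Output Attribute":

```python
def counts_match(attributes: dict) -> bool:
    """Mirror the EL rule: route to 'matched' when the two counts are equal."""
    return attributes.get("count1") == attributes.get("count2")

# NiFi attributes are strings, so the comparison is string equality:
print(counts_match({"count1": "42", "count2": "42"}))  # -> True
print(counts_match({"count1": "42", "count2": "41"}))  # -> False
```

Note that since attributes are strings, "042" and "42" would NOT match; the counts written by GetMongoRecord are plain integers, so in practice this is fine.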