Member since
11-24-2017
6
Posts
0
Kudos Received
0
Solutions
03-01-2019
09:42 PM
The following would purge all queues indiscriminately from NiFi using the Python library nipyapi (assuming no auth):

import nipyapi

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api'

# Queues can be large - increase the timeout
nipyapi.config.short_max_wait = 3600

for queue in nipyapi.canvas.list_all_connections():
    print("clearing flow files from connection: " + queue.id, end=': ')
    print("Successful" if nipyapi.canvas.purge_connection(queue.id) else "Unsuccessful")
09-15-2018
08:53 PM
Hello, Lately I have been working on ingesting a set of rolling log files from a Unix machine using NiFi 1.7.1. I've gone through the documentation and what examples I could find, but much of it is confusing and oversimplified. My set of logs, and how they rotate, is a bit awkward to deal with (I do not control it).

Scenario: I have a folder of logs called /opt/logs. In this folder I have several similarly named log files:

/opt/logs/application_george.log
/opt/logs/application_alex.log

Every 15 minutes the logs are rolled over by renaming them with an ascending numerical value. After the first rotation the directory contents become:

/opt/logs/application_george.log (NEW FILE)
/opt/logs/application_george.1.log (OLD FILE - FIRST)
/opt/logs/application_alex.log (NEW FILE)
/opt/logs/application_alex.1.log (OLD FILE - FIRST)

After the second rotation the directory contents become:

/opt/logs/application_george.log (NEW FILE)
/opt/logs/application_george.1.log (OLD FILE - SECOND)
/opt/logs/application_george.2.log (OLD FILE - FIRST)
/opt/logs/application_alex.log (NEW FILE)
/opt/logs/application_alex.1.log (OLD FILE - SECOND)
/opt/logs/application_alex.2.log (OLD FILE - FIRST)

This continues until the maximum number of files is reached (let's say 4), after which the oldest file is deleted (so on the 5th rotation).

Attached is my TailFile processor configuration from NiFi 1.7.1. I am not sure I have done it correctly: after an unknown period of time, the state of the processor contains file.x.filename entries matching the regular expression used in the configuration (i.e. "application_[^\.]+\.log"), and there appear to be instances where one of the files in the directory has been re-read, causing duplication of events.

Sometimes instead the state shows the same file listed more than once, for example:

file.36.filename /opt/logs/application_alex.log
file.19.filename /opt/logs/application_alex.log
file.27.filename /opt/logs/application_alex.log

I am not sure what I am doing wrong, if anything. Any guidance or suggestions are greatly appreciated. Thank you ~Regards
01-12-2018
06:31 PM
Hello, Does anyone have any examples of a working setup for the NiFi processors "PublishKafka_0_11" and "ConsumeKafka_0_11" using the header feature available in Kafka 0.11? My Kafka knowledge is weak, so it's possible I am missing something in the configuration of the broker, but I wouldn't even know. Kafka is just running standalone with the default config. Thank you ~Regards
11-28-2017
03:52 PM
Hello, Is it possible to prevent the back pressure of one processor from affecting others? Such as in the case of duplicating data to a Dev/UAT environment in addition to a prod environment. For example, given the attached image, how would I prevent 'Processor A' from stopping the 'Main Processor' and 'Processor B' from operating when 'Processor A's queue becomes full? While there are options in the queue settings such as "Back Pressure Object Threshold" and "Back Pressure Data Size Threshold", they are only good for dealing with bursting, processing dips, or brief infrastructure interruptions. Another option, "FlowFile Expiration", exists and solves the majority of the concern by flushing older flow files from the queue, but under high volumes one still needs to resort to increasing the overall queue size. Is there no way to reach the max size of a queue and have NiFi drop older (or newer) events from the queue rather than put back pressure on the preceding processors? Thanks ~Regards
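To pin down what I mean by dropping older events: the behavior I am after is drop-oldest semantics at the queue boundary. As a pure-Python sketch of that semantics (not a NiFi feature, just an illustration), a bounded deque does exactly this:

```python
from collections import deque

# A queue capped at 4 items that silently discards the oldest
# entry instead of pushing back on the producer.
queue = deque(maxlen=4)
for event in [1, 2, 3, 4, 5, 6]:
    queue.append(event)  # once full, each append evicts the oldest event

print(list(queue))  # -> [3, 4, 5, 6]: the 4 most recent events survive
```

The producer (here, the for loop) never blocks; only old data is sacrificed. That is the trade-off I would like for the Dev/UAT branch of the flow.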
11-28-2017
03:16 PM
Hi @Abdelkrim Hadjidj For the most part yes, especially since I didn't know how state functioned in XYZ processors. However, in this setup the timestamp is always updated regardless of the success or failure of the InvokeHTTP processor. In a failure scenario the poll would potentially miss a period of time (and thus any events/data within it), so the timestamp should only be updated when a request succeeds. At the moment the only thing I can think of is using the distributed cache feature, but that seems like overkill ...
11-24-2017
08:04 PM
Hello, I have recently been investing a lot of time in learning and using NiFi for a multitude of cases. One in particular we have a hard time understanding how to accomplish: pulling events from a remote RESTful API endpoint with a rolling time window. We have successfully used the InvokeHTTP processor to pull the desired events using a specific query that includes a time parameter (in this case an epoch timestamp). While this is great, the timestamp is static, and the goal is to update the timestamp to the time at which the last successful InvokeHTTP request occurred. Does anyone know how this could be accomplished within NiFi? Thanks ~Regards
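To make the desired update rule concrete, here is a rough Python sketch of the rolling-window polling pattern described above. The fetch callable and the state dict are hypothetical stand-ins for the HTTP request and for wherever the timestamp would be persisted; the point is only that the window advances exclusively on success:

```python
import time

def poll_events(fetch, state):
    """Query for events since the last successful poll. The stored
    timestamp advances only when the request succeeds, so a failed
    poll leaves the window in place to be retried on the next run."""
    since = state.get("last_success", 0)
    now = int(time.time())
    try:
        events = fetch(since, now)  # hypothetical request wrapper
    except Exception:
        return []  # failure: window stays put, same range is retried
    state["last_success"] = now  # success: advance the window
    return events
```

On each scheduled run the query covers [last_success, now]; a failure returns nothing and leaves last_success untouched, so no time range is ever skipped.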