Member since: 07-30-2019
Posts: 2922
Kudos Received: 1450
Solutions: 850
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 45 | 05-06-2024 10:40 AM |
| | 57 | 05-03-2024 08:41 AM |
| | 162 | 04-26-2024 06:40 AM |
| | 214 | 04-25-2024 06:16 AM |
| | 511 | 04-23-2024 05:56 AM |
08-15-2022
07:50 AM
@VJ_0082 Since your log is being generated on a remote server, you will need to use a processor that can remotely connect to the external server to retrieve that log.

Possible designs:

1. You could incorporate a FetchSFTP processor into your existing flow. I assume your existing RouteOnAttribute processor is checking for when an error happens with your script? If so, add the FetchSFTP processor between this processor and your PutEmail processor. Configure the FetchSFTP processor (with a "Completion Strategy" of DELETE) to fetch the specific log file created. This dataflow assumes the log's filename is always the same.

2. This second flow could be built using ListSFTP (configured with a filename filter) --> FetchSFTP --> any processors you want to use to manipulate the log --> PutEmail. The ListSFTP processor would be configured to execute on the "primary" node and be configured with a "File Filter Regex". When your 5 minute flow runs and encounters an exception resulting in the creation of the log file, this ListSFTP processor will see that file and list it (0 byte FlowFile). That FlowFile will have all the FlowFile attributes needed for the FetchSFTP processor (configured with a "Completion Strategy" of DELETE) to fetch the log, which is added to the content of the existing FlowFile. If you do not need to extract from or modify that content, your next processor could just be the PutEmail processor. A rough sketch of the key property values is included below this reply.

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.

Thank you, Matt
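As a rough sketch of the second design, the key property values might look like this (the hostname, path, and filename regex are assumptions you would adjust for your environment):

ListSFTP (Execution: Primary node only)
  Hostname:          <remote-script-host>
  Remote Path:       /path/to/script/logs
  File Filter Regex: script-error.*\.log      <-- hypothetical log filename pattern

FetchSFTP
  Hostname:            <remote-script-host>
  Remote File:         ${path}/${filename}    <-- FlowFile attributes written by ListSFTP
  Completion Strategy: Delete File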
08-11-2022
11:49 AM
@VJ_0082 I apologize, but I am still not clear about your script being executed by ExecuteStreamCommand. Where is the script writing its log file when it fails/errors: locally on the NiFi node host, or on some external server where NiFi is not located?

I assume your flow is passing the "original" FlowFile with a new FlowFile attribute "execution.error" containing your exception/error? Does this attribute contain the details you want to send in your email? Then you are routing based on which attribute from the original FlowFile (execution.status)?

How is your PutEmail processor configured? It can be configured to send either a FlowFile attribute or the FlowFile content. If the content you want to send via PutEmail is not in the content of the FlowFile and is also not in a FlowFile attribute on the FlowFile, but rather written to some location on disk, you would need a separate dataflow that watches that log file on disk for new entries, ingesting them into a FlowFile that feeds a PutEmail processor. This could be accomplished using the TailFile processor (a minimal configuration sketch follows below this reply). If that content is being written out to a log file on a remote system, that becomes a different challenge.

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.

Thank you, Matt
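If it turns out the log is local to the NiFi host, a minimal TailFile sketch might look like this (the file path is a placeholder):

TailFile
  Tailing mode:           Single file
  File(s) to Tail:        /var/log/myscript/error.log   <-- hypothetical log path
  Initial Start Position: Beginning of File

The "success" relationship of TailFile would then feed your PutEmail processor, or any processors used to manipulate the log content first.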
08-11-2022
11:23 AM
1 Kudo
@Ray82 Thank you for the additional details. It appears we have a disconnect in the NiFi terminology being used. A NiFi FlowFile is the object that is passed from one component to another via connections on the NiFi UI canvas. A FlowFile consists of two parts:

1. FlowFile Content - This is the content for the FlowFile and is stored in the NiFi content repository within a content claim. A single content claim may contain the content for one to many FlowFiles.

2. FlowFile Attributes/Metadata - Written to the NiFi FlowFile repository. This is a collection of metadata about the FlowFile, such as which claim contains the content for the FlowFile, the offset within that claim where the content starts, and the length of the content. It also contains attributes associated with the specific FlowFile like filename, size, etc. Users can also add additional FlowFile attributes. These FlowFile attributes are not part of the content of the FlowFile.

- Your use case starts with an existing FlowFile with specific content.
- So I am guessing that you are extracting portions of content from your original FlowFile and assigning them to NiFi FlowFile attributes?
- Then using those FlowFile attributes in the SQL query executed by the ExecuteSQL processor? The ExecuteSQL processor is designed to write the SQL query response to the content of the FlowFile (new content; it does not append to existing content).
- What you really want to do is preserve the original FlowFile and add another FlowFile attribute to it that was retrieved using your ExecuteSQL, without overwriting the original FlowFile content?

Maybe you could use the DistributedMapCache processors to accomplish this. The processor in the upper right corner is an UpdateAttribute processor. It is used to create a cacheID that will persist on both FlowFiles output from this processor (original and clone, since the "success" relationship was drawn twice). It simply takes the UUID from the original FlowFile and adds it to each FlowFile in the FlowFile attribute "cacheID". Then you would still have your ExecuteSQL and ExtractText flow to replace content with just the "A3" model. Then I configure both my PutDistributedMapCache and FetchDistributedMapCache processors to use "${cacheID}" as the "Cache Entry Identifier". The FlowFile with the original content will move to the FetchDistributedMapCache processor, where it will loop in the "not.found" relationship connection until the other FlowFile using the same cacheID value writes your model "A3" to that unique cache entry via the PutDistributedMapCache (writes content of FlowFile to cache entry) processor. (FetchDistributedMapCache and PutDistributedMapCache configuration screenshots omitted; a property sketch follows below this reply.)

In my example I am using the DistributedMapCacheClientService (simply because it is quick and easy), but there are better options that offer High Availability (HA). The DistributedMapCacheClientService requires that NiFi also has a DistributedMapCacheServer controller service for it to talk to and store your cache entries in.

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.

Thank you, Matt
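Roughly, the key property values in that cache-based flow might look like this (the attribute names "cacheID" and "model" are simply the names used in this example):

UpdateAttribute (dynamic property)
  cacheID = ${uuid}

PutDistributedMapCache
  Cache Entry Identifier:    ${cacheID}
  Distributed Cache Service: DistributedMapCacheClientService

FetchDistributedMapCache
  Cache Entry Identifier:       ${cacheID}
  Put Cache Value In Attribute: model    <-- writes the cached value to an attribute instead of replacing the FlowFile content

Setting "Put Cache Value In Attribute" on FetchDistributedMapCache is what lets the original content survive; leaving it unset would replace the FlowFile content with the cached value.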
08-11-2022
11:09 AM
@PradNiFi1236 What do you see in the nifi-user.log on the NiFi receiving the FlowFiles from this reporting task? I would expect it to show the 401 and hopefully the identity string that was used to identify the client. Matt
08-10-2022
01:54 PM
1 Kudo
@kellerj You can use the NiFi-Registry Rest-Api to change the description on an existing version controlled flow.

https://nifi.apache.org/docs/nifi-registry-docs/rest-api/index.html#updateFlow

curl -X 'PUT' 'https://<nifi-registry-hostname>:<nifi-registry-port>/nifi-registry-api/buckets/<bucket UUID>/flows/<versioned flow UUID>' -H 'Authorization: Bearer <token>' -H 'Content-Type: application/json' --data-raw '{"bucketIdentifier":"<bucket UUID>","description":"new-description","name":"<versioned flow name>","type":"Flow"}' --compressed --insecure

My NiFi-Registry is secured and uses a login provider for user authentication. If you are doing the same, you will need to obtain a bearer token for your user who has read/write/delete on the bucket containing the flow you want to modify. The rest of what you need to make this rest-api call can be obtained via the NiFi-Registry's UI for the flow to be modified.

Using the above example:
<versioned flow name> = "test"
<bucket UUID> = "17ca6981-c4a1-418f-807f-ab0b72a997ff"
<versioned flow UUID> = "fa88a095-6867-45de-9dba-23e828224d3d"

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.

Thank you, Matt
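If you are unsure how to obtain the bearer token when using a login provider, the NiFi-Registry access endpoint should return one for basic credentials (hostname, port, and credentials below are placeholders):

curl -k -X POST -u '<username>:<password>' 'https://<nifi-registry-hostname>:<nifi-registry-port>/nifi-registry-api/access/token/login'

The response body is the token itself, which you would then pass in the 'Authorization: Bearer <token>' header of the PUT above.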
08-10-2022
11:36 AM
@SandyClouds The ExecuteSQL processors do not support SSH tunnels. The expectation by these processors is that the SQL server is listening on a port reachable on the network. SSH tunnels are typically used to access a server remotely and then execute commands locally on that server, utilizing the SQL client installed on that destination server.

The ExecuteSQL processor uses a DBCPConnectionPool to facilitate the connection to the database. The DBCPConnectionPool establishes a pool of connections used by one to many processors sharing this connection to execute their code. A Validation Query is very important to make sure a connection from this pool is still good before being passed to the requesting processor for use.

While I have not done this myself, I suppose you could set up an SSH tunnel on each NiFi cluster server (example: https://linuxize.com/post/mysql-ssh-tunnel/). Then you could still use the DBCPConnectionPool, except use the established tunnel address and port in the database connection URL (see the sketch below this reply). The downside to this is that NiFi has no control over that tunnel, so if the tunnel is closed, your dataflow will stop working until the tunnel is re-established. The Validation Query will verify the connection is still good. If it is not, the DBCPConnectionPool will drop it and try to establish a new connection.

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.

Thank you, Matt
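For illustration, a local port forward of that sort (assuming a MySQL server on the remote host; user, host, and local port are placeholders) would look something like this on each NiFi node:

ssh -f -N -L 13306:localhost:3306 <user>@<remote-sql-host>

The DBCPConnectionPool's Database Connection URL would then point at the tunnel endpoint, e.g. jdbc:mysql://localhost:13306/<database>, rather than at the remote host directly.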
08-10-2022
11:02 AM
@Ray82 I am assuming you are using the ExecuteSQL processor to execute the SQL Select statement example you shared. The response would be written to the content of the FlowFile passed to the success relationship. You could use the ExtractText processor to extract content from the FlowFile and assign it to a new FlowFile attribute you name "model" (a hedged example follows below this reply).

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.

Thank you, Matt
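As a rough example, ExtractText lets you add a dynamic property whose name becomes the attribute name (the regex below is a placeholder for whatever pattern actually matches your query output):

ExtractText (dynamic property)
  model = model\s*[:=]?\s*(\w+)    <-- hypothetical pattern; capture group 1 should land in the "model" attribute (with model.0, model.1, etc. also added)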
08-10-2022
06:03 AM
@VJ_0082 So your custom Python script executes, and when it errors, a log is written on some host other than the host where NiFi is running? The GetFile processor can only get files from the local filesystem. It cannot get files from a remote filesystem (unless that remote filesystem is locally mounted).

What is the error and stack trace you are seeing in the nifi-app.log when your PutEmail processor executes against your source FlowFile? Do you have a sample of the FlowFile content being passed to the PutEmail processor? How has your PutEmail processor been configured?

Matt
08-10-2022
05:56 AM
1 Kudo
@Nifi- You'll need to provide more detail around your use case in order to get more specific assistance. NiFi offers a number of processor components that can be used to ingest from a database:

- ExecuteSQL
- ExecuteSQLRecord
- CaptureChangeMySQL <-- probably what you are looking for

These ExecuteSQL processors will utilize a DBCPConnectionPool controller service for connecting to your specific database of choice. SQL is what needs to be passed to these processors in order to fetch database table entries. The following processors are often used to generate the SQL in the different ways needed by your use case to do this in an incremental fashion (for example: generating new SQL for new entries only, so you are not fetching the entire table over and over); a minimal sketch follows below this reply:

- GenerateTableFetch
- QueryDatabaseTable
- ListDatabaseTables

The CaptureChangeMySQL processor will output FlowFiles for each individual event. You can then construct a dataflow to write these events to your choice of location. That might be some other database.

Once you have your dataflow created for ingesting entries from your table into NiFi, you'll need to use other processors within your dataflow for any routing or manipulation of that ingested data you may want to do before sending to a processor that writes to the desired destination. Possibly using the PutDatabaseRecord processor, for example?

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.

Thank you, Matt
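For the incremental case, a minimal QueryDatabaseTable sketch might look like this (the table and column names are placeholders):

QueryDatabaseTable
  Database Connection Pooling Service: DBCPConnectionPool
  Table Name:                          my_table
  Maximum-value Columns:               id    <-- the processor stores the largest value seen in state and only fetches rows with larger values on subsequent runs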
08-08-2022
05:43 AM
1 Kudo
@nk20 I am confused by your concern about in-memory state. Can you provide more detail around what you are being told or what you have read that has led to this concern? Perhaps those concerns are about something more than component state? Perhaps I can address those specific concerns.

Not all NiFi components retain state. Those that do either persist that state to disk in a local state directory or write that state to ZooKeeper. As long as the local disk where the state directory is persisted is not lost and ZooKeeper has quorum (minimum of three nodes), your state is protected for the NiFi components that write state. Out of all the components (processors, controller services, reporting tasks, etc.), there are only about 25 that record state.

The only thing that lives solely in memory is component status (in, out, read, write, send, received). These are 5 minute stats that live in memory, and thus any restart of the NiFi service would set these stats back to 0. These have nothing to do with the FlowFiles or execution of the processor.

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.

Thank you, Matt
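For reference, where that component state lives is controlled by the state providers defined in conf/state-management.xml; the default local provider looks roughly like this (the directory path may differ in your install):

<local-provider>
  <id>local-provider</id>
  <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
  <property name="Directory">./state/local</property>
</local-provider>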