Member since: 11-16-2015
Posts: 905
Kudos Received: 666
Solutions: 249
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 442 | 09-30-2025 05:23 AM |
| | 780 | 06-26-2025 01:21 PM |
| | 679 | 06-19-2025 02:48 PM |
| | 866 | 05-30-2025 01:53 PM |
| | 11450 | 02-22-2024 12:38 PM |
04-06-2017
02:07 PM
Try the following:

1. Route the "failure" relationship to a processor you don't start (such as UpdateAttribute).
2. When a flow file fails, right-click the failure connection and choose List Queue.
3. Click the info button (an "i" in a circle) to the left of the flow file, then choose View to see its contents.

Does that look like a valid SQL statement? Can you try running it from the command line, or in a separate PutSQL instance?

Also, since you are getting the error only after some time, perhaps the connection is going idle and then failing. Is there any other error information around the error in the log file you mention above? Sometimes there is a "Caused by" line below it that refers to the lower-level error.

What version of NiFi are you using? As of NiFi 1.1.0, you can supply a Validation Query to the DBCPConnectionPool, which ensures that a connection is still valid before it is used. A Validation Query is a query that does as little work as possible while still establishing connectivity to the server; an example (for Oracle) is "select 1 from dual". If you have that property available to you, please try something like that as the Validation Query; it may alleviate this issue if it is intermittent and related to idle time.
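For reference, common lightweight validation queries by database look like the following. These are conventional choices rather than NiFi requirements; any cheap query that round-trips to the server will do:

```
Oracle:      select 1 from dual
PostgreSQL:  select 1
MySQL:       select 1
SQL Server:  select 1
```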
03-30-2017
12:59 PM
1 Kudo
Try using the full path to powershell.exe as the command, rather than just "powershell.exe". It looks like the JVM doesn't have access to the same PATH variable that includes the location of that executable.
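For example, on a default Windows install the full path is typically the following (worth verifying on your own system before setting it as the Command property):

```
C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe
```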
03-29-2017
03:26 PM
1 Kudo
An alternative to using an environment variable is the file-based Variable Registry. Create a file, perhaps called var_registry.properties, with the entry:

PGPASSWORD=<my_pg_password>

Then edit conf/nifi.properties to use that file for the registry:

nifi.variable.registry.properties=var_registry.properties

You can then use PGPASSWORD in your Groovy script the same way you'd use an environment variable: ${PGPASSWORD}
03-29-2017
02:22 PM
2 Kudos
The HTTP port defaults to 8080 but can be set in conf/nifi.properties:

nifi.web.http.port=8080

You can also set up HTTPS (please see the Security Configuration section of the NiFi Admin Guide for more details). NiFi uses an embedded Jetty server; it is not currently possible to use a different engine.
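As an illustrative sketch (not an exhaustive list; the Admin Guide covers the full set), the HTTPS-related entries in conf/nifi.properties look something like this, with the paths and passwords below being placeholders:

```
nifi.web.https.port=9443
nifi.security.keystore=./conf/keystore.jks
nifi.security.keystoreType=JKS
nifi.security.keystorePasswd=<keystore_password>
nifi.security.truststore=./conf/truststore.jks
nifi.security.truststoreType=JKS
nifi.security.truststorePasswd=<truststore_password>
```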
03-29-2017
02:00 PM
I assume you are using a PostgreSQL version before 9.5; if not, you could try INSERT ... ON CONFLICT DO UPDATE. If you're using ConvertJSONToSQL that won't be an option, but you may be able to use EvaluateJsonPath -> ReplaceText to generate your own SQL statement instead. If you do have a PostgreSQL version before 9.5, then "upsert" is a known and common issue, with PostgreSQL and with many other relational database systems. You could try the example cited by the article, but that likely involves a stored function. This is a kind of replication use case, where "change data" comes from a source system and needs to be applied at a target system. Work is underway in NiFi on Change Data Capture (CDC) processors; the first one will support only MySQL (NIFI-3413), but it should provide APIs and/or a mini-framework so that CDC processors can be written for other databases such as PostgreSQL.
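For PostgreSQL 9.5 or later, an upsert statement generated via ReplaceText could look like the following sketch; the table and column names are made up for illustration:

```sql
-- Insert a row, or update it if a row with the same key already exists.
-- "EXCLUDED" refers to the row that was proposed for insertion.
INSERT INTO my_table (id, val)
VALUES (1, 'foo')
ON CONFLICT (id)
DO UPDATE SET val = EXCLUDED.val;
```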
03-29-2017
12:52 AM
AFAIK, there is no current capability for this: GetFile/ListFile detect existing files, and GetFile/FetchFile handle deletes only when they are the ones deleting the files. Perhaps a custom processor called WatchFile (hopefully shared with the Apache NiFi community?) would be useful. It could use the java.nio.file.WatchService API and generate (perhaps empty) flow files whose attributes reflect the file and its change in state.
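A minimal, NiFi-free sketch of what such a processor's core could look like, using the standard java.nio.file.WatchService. The class and method names here are made up for illustration; a real processor would run this loop from onTrigger and emit flow files instead of strings:

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.List;

// Sketch of the core loop a hypothetical WatchFile processor might use.
// In NiFi, each "KIND:filename" string returned here would instead become
// attributes on an (empty) flow file.
public class WatchFileSketch {

    // Blocks until the directory reports at least one event, then returns
    // descriptions like "ENTRY_CREATE:foo.txt".
    public static List<String> awaitEvents(Path dir) throws IOException, InterruptedException {
        List<String> events = new ArrayList<>();
        try (WatchService watcher = dir.getFileSystem().newWatchService()) {
            dir.register(watcher,
                    StandardWatchEventKinds.ENTRY_CREATE,
                    StandardWatchEventKinds.ENTRY_MODIFY,
                    StandardWatchEventKinds.ENTRY_DELETE);
            WatchKey key = watcher.take(); // blocks until something happens
            for (WatchEvent<?> event : key.pollEvents()) {
                events.add(event.kind().name() + ":" + event.context());
            }
            key.reset(); // re-arm the key for further events
        }
        return events;
    }
}
```

Note that on some platforms (macOS, for example) the JDK falls back to a polling implementation of WatchService, so events may arrive with several seconds of latency.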
03-28-2017
06:12 PM
1 Kudo
There are examples of individual "recipes" in all sorts of languages in my three-part ExecuteScript Cookbook article.
03-27-2017
07:38 PM
The aforementioned issue has been fixed as of HDF 2.1. Rather than downgrade your HDP, can you upgrade your HDF?
03-26-2017
01:27 AM
This looks like a bug in the processor. Which processor is the "split" relationship going to? If it's a custom processor, can you share part or all of the relevant code? If it is a built-in processor, please feel free to file a Jira, as this should be handled better. If it is a custom processor, make sure you keep a reference to the latest version of the flow file (new versions are returned by various ProcessSession methods) and transfer only that latest version. A common error is for a custom processor or script to forget to save off the flow file reference returned by a call such as ProcessSession.putAttribute().
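To illustrate that contract without depending on the real NiFi API, here is a toy stand-in in plain Java; ToyFlowFile and this putAttribute are invented for the demo, but they mirror how ProcessSession.putAttribute() returns a new FlowFile version that you must keep:

```java
import java.util.HashMap;
import java.util.Map;

// Toy immutable "FlowFile": every mutation returns a NEW object, mirroring
// NiFi's ProcessSession contract. (Illustration only; the real classes live
// in nifi-api.)
public class FlowFileReferenceDemo {

    public static final class ToyFlowFile {
        public final Map<String, String> attributes;
        public ToyFlowFile(Map<String, String> attributes) { this.attributes = attributes; }
    }

    // Analogous to ProcessSession.putAttribute(flowFile, key, value)
    public static ToyFlowFile putAttribute(ToyFlowFile ff, String key, String value) {
        Map<String, String> copy = new HashMap<>(ff.attributes);
        copy.put(key, value);
        return new ToyFlowFile(copy); // a new version; the old reference is stale
    }

    public static void main(String[] args) {
        ToyFlowFile flowFile = new ToyFlowFile(new HashMap<>());

        // BUG: return value ignored -- "flowFile" is now a stale reference
        putAttribute(flowFile, "status", "done");
        System.out.println(flowFile.attributes.get("status")); // null

        // FIX: always keep the returned reference
        flowFile = putAttribute(flowFile, "status", "done");
        System.out.println(flowFile.attributes.get("status")); // done
    }
}
```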
03-24-2017
09:02 PM
It is Option #2 above: the file contents are put in the content repository, and the filename is added as an attribute. If you want the file to reside on the local file system, you can use PutFile before your ExecuteScript processor. You can get at the filename by getting the flow file from the session:

def flowFile = session.get()

and reading the "filename" attribute from it:

def filename = flowFile.getAttribute(CoreAttributes.FILENAME.key())

In your case, it doesn't seem like you really need a file per se; rather, you need to get its contents to the "psql" command. For that I'd recommend using ProcessBuilder instead of just copyCommand.execute(). You might be able to do something like this (NOTE: not a complete script, just a snippet):

import java.io.*
// Pass the command and each argument as separate strings; a single string
// would be treated as one (nonexistent) executable name
def pb = new ProcessBuilder('psql',
    '--host=<AWS RDS>', '--port=5432',
    '--dbname=<AWS DB name>', '--username=<AWS DB user>',
    '--no-password',
    '--command=copy kiran_test.poc_test from STDIN')
def p = pb.start()
def psqlStdin = p.getOutputStream()
session.read(flowFile, { inputStream ->
    // Write the flow file contents to psql's standard input (aka "pipe to psql"),
    // e.g. via a BufferedWriter/OutputStreamWriter pair or Groovy's stream operators
    psqlStdin << inputStream
    psqlStdin.close()
} as InputStreamCallback)
p.waitFor()

This should alleviate the need to write the file(s) out to the file system, although it makes for a more complex script. A complete alternative would be a dedicated PostgreSQL bulk-loader processor that can do all of this through the driver 🙂