Member since: 11-16-2015
Posts: 905
Kudos Received: 664
Solutions: 249

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 252 | 09-30-2025 05:23 AM |
|  | 665 | 06-26-2025 01:21 PM |
|  | 510 | 06-19-2025 02:48 PM |
|  | 758 | 05-30-2025 01:53 PM |
|  | 11000 | 02-22-2024 12:38 PM |
06-11-2017
09:15 PM
1 Kudo
If you want to use a PreparedStatement (which can improve performance when PutSQL operates on a number of records up to the specified Batch Size), then your SQL could be:

insert into Table(X, Y, Z) values(?,?,?)

To use this you'd have to set up your attributes as follows:

sql.args.1.value=<value of X>
sql.args.1.type=<type of X's value, see here for numerical values of datatypes>
sql.args.2.value=<value of Y>
sql.args.2.type=<type of Y's value>
sql.args.3.value=<value of Z>
sql.args.3.type=<type of Z's value>

If you know the types of the columns, you can do all this work with a single UpdateAttribute processor before the PutSQL processor. For example, if X and Y were integers and Z was a string/varchar (the type codes come from java.sql.Types: 4 = INTEGER, 12 = VARCHAR):

sql.args.1.value=${X}
sql.args.1.type=4
sql.args.2.value=${Y}
sql.args.2.type=4
sql.args.3.value=${Z}
sql.args.3.type=12

See the PutSQL documentation (especially the "Reads Attributes" section) for more information. If you don't know the types or order of your columns, then I recommend @Eyad Garelnabi's solution instead.
06-11-2017
08:54 PM
It looks like you are trying to connect via the Zookeeper port, but there are some issues with this. You'll likely have trouble because of the issue described in NIFI-2575. Even if that were not the issue, two other things would have to be done: I believe you'd have to set some variables in the URL (such as serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2), and the Zookeeper port is not exposed by the sandbox by default, so you would have to forward that port if using NAT. For these reasons, I recommend you connect via the standard HiveServer2 port (10000), which is exposed by default in the sandbox's NAT configuration.
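For example (the host name here is hypothetical; adjust it to however your sandbox is reachable), a JDBC URL along the lines of jdbc:hive2://localhost:10000/default should work once port 10000 is reachable, since jdbc:hive2:// with an explicit host and port connects straight to HiveServer2 without any Zookeeper discovery.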
06-09-2017
10:07 PM
Are you using the JDBC Driver for HAWQ? If so, you should be able to use PutSQL to execute your INSERT...SELECT statement as-is. If you have tried this, what error(s) are you getting or did you have any trouble configuring things?
06-08-2017
07:45 PM
True, if you are just inserting a single new field at the top level of a flat JSON doc, this should work fine. However, you may want an optional newline between the array and the first object in case the JSON is pretty-printed. Also, would this work if you had a nested array of objects further down in the file? I guess you could put a start-of-line anchor into the regex, but if the JSON has odd whitespacing you may run into the same problem. I recommend JOLT in general because it handles that kind of thing, but if you know what your input JSON looks like, this solution is simple and works well.
06-08-2017
06:36 PM
4 Kudos
In addition to @bhagan's answer, as of NiFi 1.2.0 (due to NIFI-3010) you can use NiFi Expression Language in JOLT specifications, so with a JoltTransformJSON processor you could have the following Chain spec:

[{
"operation": "default",
"spec": {
"*": {
"ctime": "${now()}"
}
}
}]

If you want the ctime field to contain a string with the number of milliseconds since Epoch, you can use now():toNumber() instead of just now(). If you want an actual numeric value (rather than a String containing the numeric value), you need an additional operation to change the value to a long:

[{
"operation": "default",
"spec": {
"*": {
"ctime": "${now():toNumber()}"
}
}
}, {
"operation": "modify-overwrite-beta",
"spec": {
"*": {
"ctime": "=toLong"
}
}
}]
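To make the effect concrete, here is a sketch with a made-up input and timestamp. Given

{
  "host": {
    "name": "web01"
  }
}

the two-operation spec above would produce

{
  "host": {
    "name": "web01",
    "ctime": 1496940000000
  }
}

because the default operation adds a ctime entry to each top-level object (with the Expression Language already evaluated to a string by the time JOLT sees the spec) and the modify-overwrite-beta operation then converts that string to a long.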
06-08-2017
05:59 PM
Can you provide some more details about your use case? It appears that you'd like to do the above, except that the "ext_checkmk_hosts" table is not on the same Postgres instance as checkmk_hosts, meaning you can't just run that SQL statement as-is.

If my assumption is correct, then you should be able to use QueryDatabaseTable for the "select" part of your statement above. After it is configured with a database connection pool, table name, max-value column (which in your case is ctime), etc., each time it executes it will fetch all the rows from ext_checkmk_hosts whose ctime value is greater than the largest ctime it has seen in previous runs. QueryDatabaseTable outputs its records in Avro format.

Prior to NiFi 1.2.0, you can follow that with ConvertAvroToJSON, then ConvertJSONToSQL, then PutSQL. This flow performs one INSERT for each record in the source table, so you will find it less performant than an INSERT INTO ... SELECT statement would normally be.

As of NiFi 1.2.0, you can connect your QueryDatabaseTable to a PutDatabaseRecord processor, using an AvroReader configured with a schema specifying the field names and their datatypes. One possible schema (based on a glance at your field names) might be:

{
"type": "record",
"name": "ServiceStatusRecord",
"fields" : [
{"name": "ctime", "type": "long", logicalType: "timestamp-millis"},
{"name": "host_state", "type": ["null", "string"]},
{"name": "host", "type": ["null", "string"]},
{"name": "host_icons", "type": ["null", "string"]},
{"name": "num_services_ok", "type": "int"},
{"name": "num_services_warn", "type": "int"},
{"name": "num_services_unknown", "type": "int"},
{"name": "num_services_crit", "type": "int"},
{"name": "num_services_pending", "type": "int"}
]
}

You would point PutDatabaseRecord at your target database, setting the table name to checkmk_hosts, etc.
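As a side note on the QueryDatabaseTable side of this flow, the incremental fetch it issues is conceptually something like SELECT * FROM ext_checkmk_hosts WHERE ctime > <largest ctime seen so far> (just a sketch; the exact SQL depends on the configured database type and the processor's stored state), which is why each run only picks up new rows.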
06-08-2017
03:29 PM
2 Kudos
For this kind of custom logic, it is probably possible to use some combination of standard processors, but that might result in an overcomplicated and brittle flow. An alternative is to use a scripting processor such as ExecuteScript or InvokeScriptedProcessor, although that requires knowledge of a scripting language such as Groovy, Javascript, Jython, JRuby, Clojure, or Lua, and also some familiarity with the NiFi Java API. Here is an example Groovy script for ExecuteScript that takes your original input and generates the specified flow files from it:

import java.nio.charset.StandardCharsets
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.io.OutputStreamCallback

def flowFile = session.get()
if(!flowFile) return
def flowFiles = [] as List<FlowFile>
def inputStream = session.read(flowFile)
inputStream.eachLine { line ->
    def s = line.tokenize('|')
    // The first three fields are common to every output record
    def prefix = s[0..2]
    // The fourth field is the number of per-record values that follow
    def numRecords = Integer.parseInt(s[3])
    // Everything from this index onward is appended to every output record
    def leftoverFieldsIndex = 4 + numRecords
    (4..leftoverFieldsIndex-1).each { i ->
        // Create one flow file per value, keeping the prefix and leftover fields
        def newFlowFile = session.create(flowFile)
        newFlowFile = session.write(newFlowFile, { outputStream ->
            outputStream.write( (prefix + s[i] + s[leftoverFieldsIndex..-1]).join('|').getBytes(StandardCharsets.UTF_8) )
        } as OutputStreamCallback)
        flowFiles << newFlowFile
    }
}
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)

This script extracts the number of fields to follow; then, for each of those fields, it creates a flow file containing the "prefix" fields, the current field value, and the "postfix" fields, as you described above.
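To illustrate with a made-up line (the field values are hypothetical), an input record like A|B|C|2|v1|v2|p1|p2 would produce two flow files, A|B|C|v1|p1|p2 and A|B|C|v2|p1|p2: the fourth field (2) says two per-record values follow, and everything after them (p1|p2) is appended to each output.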
06-08-2017
01:59 PM
4 Kudos
1) You can execute a Python script (assuming you have Python installed locally) with ExecuteProcess or ExecuteStreamCommand. The former does not accept incoming connections, so your script would have to locate and use the Avro files on its own. The latter does accept incoming connections, so you can use ListFile -> FetchFile to get the Avro files into NiFi, then route those to ExecuteStreamCommand, where the contents will be available via STDIN to your script.

2) If your script takes paths to filenames, you can forgo the FetchFile processor and just send the results of ListFile to ExecuteStreamCommand. Or you could update your script to read from STDIN as described above.

3) ExecuteStreamCommand writes an "execution.status" attribute containing the exit code of your script, so you can return a value greater than 0 for error condition(s) and use RouteOnAttribute to send the flow files down various paths for error handling and so on.

4) Alternatively, rather than going back out to the file system, if your script can write the contents of the file to STDOUT instead of a file, you can route ExecuteStreamCommand directly to PutHDFS. This keeps the file contents "under management" by NiFi, which is great for provenance/lineage, since you will be able to use NiFi to see what the contents look like at each step of the flow. If (as you described) you want to write to an output file, then after the script is finished writing the file, you can write the path to the file to STDOUT and choose an Output Destination Attribute of something like "absolute.path.to.file" in ExecuteStreamCommand. Then you can route to FetchFile with File to Fetch set to "${absolute.path.to.file}", then to a PutHDFS processor to land the file in Hadoop.

Just for completeness (this won't apply to you, since numpy is a CPython module): if you are using "pure" Python modules (meaning their code and dependencies are all Python and do not use natively compiled modules -- no CPython or .so/.dll files, etc.), you can use ExecuteScript or InvokeScriptedProcessor. These use a Jython (not Python) engine to execute your script. These processors give you quite a bit more flexibility (albeit with some more boilerplate code), as you can work with flow file attributes, content, controller services, and other NiFi features from inside your Python code. However, the tradeoff (as mentioned) is needing pure Python modules/scripts.
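If it helps, a minimal ExecuteStreamCommand configuration for the STDIN/STDOUT approach could look something like this (the script path is hypothetical; the property names are the processor's own):

Command Path: python
Command Arguments: /path/to/your_script.py

The incoming flow file content is piped to the script's STDIN, whatever the script writes to STDOUT becomes the outgoing content (unless an Output Destination Attribute is set), and the exit code lands in execution.status, so a RouteOnAttribute expression such as ${execution.status:equals('0')} can separate successes from failures.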
06-07-2017
01:33 PM
4 Kudos
As of NiFi 1.2.0, you can use ConvertRecord, after configuring an AvroReader with the short schema and an AvroRecordSetWriter with the super schema. Prior to NiFi 1.2.0, you may be able to use ConvertAvroSchema, using the super schema as both the Input and Output Schema property values (if you use the short schema as Input, the processor complains about the unmapped fields in the super schema). I tried this by adding a single field to my "short schema" to make the "super schema":

{"name": "extra_field", "type": "string", "default": "Hello"}

I'm not sure if this will work with arbitrary supersets, but it is worth a try 🙂
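For reference, a minimal sketch of what such a schema pair might look like (field names made up): a short schema such as

{
  "type": "record",
  "name": "MyRecord",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"}
  ]
}

becomes the super schema by appending {"name": "extra_field", "type": "string", "default": "Hello"} to the fields array; in Avro schema resolution, the default is what lets a reader fill in extra_field when the data was written without it.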
06-07-2017
01:10 PM
As of NiFi 1.2.0, you can use Expression Language in JOLT specifications (NIFI-3010). The following Chain spec adds the ctime field to your input with the current timestamp:

[{
"operation": "default",
"spec": {
"*": {
"ctime": "${now()}"
}
}
}]

If you want ctime to contain the number of milliseconds since Epoch rather than a date string, you can use the following EL expression: ${now():toNumber()}