Member since: 11-16-2015
Posts: 911
Kudos Received: 668
Solutions: 249
06-08-2017
06:36 PM
4 Kudos
In addition to @bhagan's answer, as of NiFi 1.2.0 (due to NIFI-3010), you can use NiFi Expression Language in JOLT specifications, so with a JoltTransformJSON processor you could have the following Chain spec:

[{
"operation": "default",
"spec": {
"*": {
"ctime": "${now()}"
}
}
}]

If you want the ctime field to contain a string with the number of milliseconds since the Epoch, you can use now():toNumber() instead of just now(). If you want an actual numeric value (rather than a String containing the numeric value), you need an additional operation to change the value to a long:

[{
"operation": "default",
"spec": {
"*": {
"ctime": "${now():toNumber()}"
}
}
}, {
"operation": "modify-overwrite-beta",
"spec": {
"*": {
"ctime": "=toLong"
}
}
}]
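For example (the input below is illustrative, not taken from the original question), an input of:

{
  "record1": { "name": "foo" },
  "record2": { "name": "bar" }
}

would come out of JoltTransformJSON looking something like:

{
  "record1": { "name": "foo", "ctime": "<value of ${now()} at run time>" },
  "record2": { "name": "bar", "ctime": "<value of ${now()} at run time>" }
}

The "*" in the default spec matches each top-level entry, so every record gets the ctime field added (if it doesn't already have one).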
06-08-2017
05:59 PM
Can you provide some more details about your use case? It appears that you'd like to do the above, except that the "ext_checkmk_hosts" table is not on the same Postgres instance as checkmk_hosts, meaning you can't just run that SQL statement as-is.

If my assumption above is correct, then you should be able to use QueryDatabaseTable for the SELECT part of your statement. After it is configured with a database connection pool, table name, max-value column (which in your case is ctime), etc., each time it executes it will fetch all the rows from ext_checkmk_hosts whose ctime value is greater than the largest value it saw the last time it ran. QueryDatabaseTable outputs its records in Avro format.

Prior to NiFi 1.2.0, you can follow that with ConvertAvroToJSON, then ConvertJSONToSQL, then PutSQL. This flow performs one INSERT per record from the source table, so it will be less performant than an INSERT INTO ... SELECT statement would normally be.

As of NiFi 1.2.0, you can instead connect your QueryDatabaseTable to a PutDatabaseRecord processor, using an AvroReader configured with a schema specifying the field names and their datatypes. One possible schema (based on a glance at your field names) might be:

{
  "type": "record",
  "name": "ServiceStatusRecord",
  "fields": [
    {"name": "ctime", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "host_state", "type": ["null", "string"]},
    {"name": "host", "type": ["null", "string"]},
    {"name": "host_icons", "type": ["null", "string"]},
    {"name": "num_services_ok", "type": "int"},
    {"name": "num_services_warn", "type": "int"},
    {"name": "num_services_unknown", "type": "int"},
    {"name": "num_services_crit", "type": "int"},
    {"name": "num_services_pending", "type": "int"}
  ]
}

You would point PutDatabaseRecord at your target database, setting the table name to checkmk_hosts, etc.
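For illustration, a single record conforming to that schema would look like the following when rendered as JSON (the values are made up, not taken from your data):

{
  "ctime": 1496930340000,
  "host_state": "UP",
  "host": "somehost.example.com",
  "host_icons": "menu",
  "num_services_ok": 28,
  "num_services_warn": 0,
  "num_services_unknown": 0,
  "num_services_crit": 0,
  "num_services_pending": 0
}

PutDatabaseRecord reads each such record via the AvroReader and builds the corresponding INSERT against the checkmk_hosts table.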
06-08-2017
03:29 PM
2 Kudos
For this kind of custom logic, it is probably possible to use some combination of standard processors, but that might result in an overcomplicated and brittle flow. An alternative is to use a scripting processor such as ExecuteScript or InvokeScriptedProcessor, although that requires knowledge of a scripting language such as Groovy, JavaScript, Jython, JRuby, Clojure, or Lua, as well as the NiFi Java API. Here is an example Groovy script for ExecuteScript that takes your original input and generates the specified flow files from it:

import java.nio.charset.StandardCharsets
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.io.OutputStreamCallback

def flowFile = session.get()
if (!flowFile) return
def flowFiles = [] as List<FlowFile>
def inputStream = session.read(flowFile)
inputStream.eachLine { line ->
    def s = line.tokenize('|')
    // The first three fields are the "prefix", the fourth is the number of
    // repeated fields that follow, and everything after those is the "postfix"
    def prefix = s[0..2]
    def numRecords = Integer.parseInt(s[3])
    def leftoverFieldsIndex = 4 + numRecords
    (4..leftoverFieldsIndex - 1).each { i ->
        // Create one flow file per repeated field, carrying prefix + field + postfix
        def newFlowFile = session.create(flowFile)
        newFlowFile = session.write(newFlowFile, { outputStream ->
            outputStream.write((prefix + s[i] + s[leftoverFieldsIndex..-1]).join('|').getBytes(StandardCharsets.UTF_8))
        } as OutputStreamCallback)
        flowFiles << newFlowFile
    }
}
inputStream.close()
session.transfer(flowFiles, REL_SUCCESS)
session.remove(flowFile)

This script extracts the number of fields to follow, then for each of those fields it creates a flow file containing the "prefix" fields, the current field value, and the "postfix" fields, as you described above.
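As an illustration (the field values here are invented, not taken from your actual data), an input line such as:

A|B|C|3|x|y|z|P|Q

would produce three flow files with the following contents:

A|B|C|x|P|Q
A|B|C|y|P|Q
A|B|C|z|P|Q

Here A|B|C is the prefix, 3 is the number of repeated fields (x, y, z), and P|Q is the postfix appended to each output.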
06-08-2017
01:59 PM
4 Kudos
1) You can execute a Python script (assuming you have Python installed locally) with ExecuteProcess or ExecuteStreamCommand. The former does not accept incoming connections, so your script would have to locate and use the Avro files on its own. The latter does accept incoming connections, so you can use ListFile -> FetchFile to get the Avro files into NiFi, then route those to ExecuteStreamCommand, where the contents will be available to your script via STDIN.

2) If your script takes paths to files, you can forgo the FetchFile processor and just send the results of ListFile to ExecuteStreamCommand. Or you could update your script to read from STDIN as described above.

3) ExecuteStreamCommand writes an "execution.status" attribute containing the exit code of your script, so you can return a value greater than 0 for error condition(s) and use RouteOnAttribute to send the flow files down various paths for error handling and the like.

4) Alternatively, rather than going back out to the file system, if your script can write the contents of the file to STDOUT instead of to a file, then you can route ExecuteStreamCommand directly to PutHDFS. This keeps the file contents "under management" by NiFi, which is great for provenance/lineage, since you will be able to use NiFi to see what the contents look like at each step of the flow. If (as you described) you want to write to an output file, then after the script finishes writing the file it can write the path to the file to STDOUT, and you can choose an Output Destination Attribute of something like "absolute.path.to.file" in ExecuteStreamCommand (see the example settings below). Then you can route to FetchFile with File to Fetch set to "${absolute.path.to.file}", followed by a PutHDFS processor to land the file in Hadoop.

Just for completeness (this won't apply to you, as numpy is a CPython module): if you are using "pure" Python modules (meaning their code and dependencies are all Python and do not use natively compiled modules -- no CPython, .so/.dll, etc.), you can use ExecuteScript or InvokeScriptedProcessor. These use a Jython (not Python) engine to execute your script. They give you quite a bit more flexibility (albeit with some more boilerplate code), as you can work with flow file attributes, content, controller services, and other NiFi features from inside your Python code. However, the tradeoff (as mentioned) is needing pure Python modules/scripts.
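As a sketch of the "write the output path to STDOUT" approach, ExecuteStreamCommand might be configured along these lines (the interpreter and script paths are placeholders, not values from your environment):

Command Path: /usr/bin/python
Command Arguments: /path/to/your_script.py
Output Destination Attribute: absolute.path.to.file
Ignore STDIN: false

With Output Destination Attribute set, whatever the script prints to STDOUT (the path of the file it wrote) lands in that attribute rather than in the flow file content, and FetchFile can then pick it up via ${absolute.path.to.file}.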
06-07-2017
01:33 PM
4 Kudos
As of NiFi 1.2.0, you can use ConvertRecord after configuring an AvroReader with the short schema and an AvroRecordSetWriter with the super schema. Prior to NiFi 1.2.0, you may be able to use ConvertAvroSchema, using the super schema as both the Input and Output Schema property values (if you use the short schema as Input, the processor complains about the unmapped fields in the super schema). I tried this by adding a single field to my "short schema" to make the "super schema":

{"name": "extra_field", "type": "string", "default": "Hello"}

I'm not sure if this will work with arbitrary supersets, but it is worth a try 🙂
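To make that concrete, here is a hypothetical short/super schema pair (the record and field names are invented for illustration); the super schema is simply the short schema plus the defaulted extra_field shown above:

Short schema:

{
  "type": "record",
  "name": "ExampleRecord",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "label", "type": ["null", "string"]}
  ]
}

Super schema:

{
  "type": "record",
  "name": "ExampleRecord",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "label", "type": ["null", "string"]},
    {"name": "extra_field", "type": "string", "default": "Hello"}
  ]
}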
06-07-2017
01:10 PM
As of NiFi 1.2.0, you can use Expression Language in JOLT specifications (NIFI-3010). The following Chain spec adds the ctime field to your input with the current timestamp:

[{
"operation": "default",
"spec": {
"*": {
"ctime": "${now()}"
}
}
}]

If you want ctime to hold the number of milliseconds since the Epoch rather than a date string, you can use the following EL expression instead: ${now():toNumber()}
06-07-2017
12:57 PM
What version of NiFi/HDF are you using? Also, are all those HiveConnectionPool instances valid (meaning they are used in various parts of your flow, in process groups, etc.)? If not, try deleting the invalid ones. Once you configure and save your FPCluster-HiveConnectionPool, it should be available to the Hive processors, so I'm not sure why you're not seeing it.
06-07-2017
02:00 AM
1 Kudo
NiFi doesn't come with a PostgreSQL driver; you will need to get one and point to its location via the "Database Driver Jar Url" property in the DBCPConnectionPool configuration dialog above. If you already have a driver and have placed it in NiFi's lib/ directory, note that this can cause problems and is not recommended. Best practice is to put JDBC drivers in a separate location and refer to that location in the "Database Driver Jar Url" property.
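For example, a DBCPConnectionPool for PostgreSQL might be configured roughly as follows (the host, database, credentials, and jar path are placeholders, not values from your setup):

Database Connection URL: jdbc:postgresql://your-host:5432/your_database
Database Driver Class Name: org.postgresql.Driver
Database Driver Jar Url: file:///opt/jdbc-drivers/postgresql-42.x.x.jar
Database User: your_user
Password: your_password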
06-07-2017
01:27 AM
1 Kudo
The HiveConnectionPool controller service needs to be configured to connect to a Hive instance. If you edit the controller service (using the pencil icon on the right side of the controller service entry in your Process Group Configuration window above), it will list the properties (required properties are in bold). The Database Connection URL is probably something like jdbc:hive2://sandbox.hortonworks.com:10000/default. The Hive Configuration Resources property should point at a comma-separated list of configuration files (usually core-site.xml and hive-site.xml at least). If you have username/password authentication set up for your Hive instance, you will need to supply values for those properties as well; alternatively, if your Hive instance is secured with Kerberos, you will need to supply values for the Kerberos-related properties. An example configuration is sketched below.

Once your HiveConnectionPool has been configured correctly (and saved by hitting the Apply button), you will need to enable it by clicking the lightning-bolt icon (also on the right side, near the pencil icon) and then the Enable button.

Return to the canvas and either route the "success" and/or "failure" connections from PutHiveQL to some other processor, or auto-terminate the relationship(s) by opening the Configuration dialog for PutHiveQL and selecting the checkboxes for the relationship(s) under the "Auto-terminate relationships" section. At that point your PutHiveQL processor should be valid (you will see a red square icon in the upper-left corner of the processor instead of the yellow triangle).
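As a sketch (the URL and file paths are typical examples, not values from your cluster), a minimal HiveConnectionPool configuration might look like:

Database Connection URL: jdbc:hive2://sandbox.hortonworks.com:10000/default
Hive Configuration Resources: /etc/hadoop/conf/core-site.xml,/etc/hive/conf/hive-site.xml
Database User: hive
Password: (only if username/password authentication is enabled)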
06-05-2017
01:58 PM
1 Kudo
You can use the JoltTransformJSON processor for this. Here is a Chain spec that should get you what you want:

[
  {
    "operation": "shift",
    "spec": {
      "0": {
        "*": "header"
      },
      "*": "data[]"
    }
  },
  {
    "operation": "shift",
    "spec": {
      "data": {
        "*": {
          "*": {
            "@0": "[&2].@(4,header[&])"
          }
        }
      }
    }
  }
]

The first operation splits the top array into "header" (containing the first element) and "data" (containing the rest). The second operation creates an array of objects, each object having the keys from the header associated with the values from each element in the "data" array. With your input above, I get the following output:

[ {
"host" : "DUSTSADMIN.ads.xyz.de",
"host_icons" : "menu",
"host_state" : "UP",
"num_services_crit" : "0",
"num_services_ok" : "28",
"num_services_pending" : "0",
"num_services_unknown" : "0",
"num_services_warn" : "0"
}, {
"host" : "DUSTSVMDC01.ads.xyz.de",
"host_icons" : "menu",
"host_state" : "UP",
"num_services_crit" : "0",
"num_services_ok" : "34",
"num_services_pending" : "0",
"num_services_unknown" : "0",
"num_services_warn" : "0"
}, {
"host" : "DUSTSVMDC02.ads.xyz.de",
"host_icons" : "menu",
"host_state" : "UP",
"num_services_crit" : "0",
"num_services_ok" : "34",
"num_services_pending" : "0",
"num_services_unknown" : "0",
"num_services_warn" : "0"
} ]
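For context (the actual input appeared in the original question and is not reproduced here), the input would have been an array whose first element lists the column names and whose remaining elements hold the row values, roughly like the following; the exact column order is an assumption:

[
  ["host_state", "host", "host_icons", "num_services_ok", "num_services_warn", "num_services_unknown", "num_services_crit", "num_services_pending"],
  ["UP", "DUSTSADMIN.ads.xyz.de", "menu", "28", "0", "0", "0", "0"],
  ["UP", "DUSTSVMDC01.ads.xyz.de", "menu", "34", "0", "0", "0", "0"],
  ["UP", "DUSTSVMDC02.ads.xyz.de", "menu", "34", "0", "0", "0", "0"]
]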