Member since
11-16-2015
905
Posts
665
Kudos Received
249
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 431 | 09-30-2025 05:23 AM | |
| 767 | 06-26-2025 01:21 PM | |
| 660 | 06-19-2025 02:48 PM | |
| 848 | 05-30-2025 01:53 PM | |
| 11387 | 02-22-2024 12:38 PM |
03-21-2019
12:55 PM
Instead of extracting the content into attributes (which will effectively turn the JSON objects into Strings), you should be able to add the core attributes using UpdateRecord or JoltTransformJSON/JoltTransformRecord. For the record processors, it looks like you could use the following schema for the reader: {
"namespace": "nifi",
"name": "myRecord",
"type": "record",
"fields": [
{"name": "headers","type": {"type": "map","values": "string"}},
{"name": "info","type": {"type": "map","values": "string"}},
{"name": "payload","type": {"type": "map","values": "string"}}
]
} For the subflow to add core attributes, you'd want the writer schema to include the additional fields such as kafka.partition, then in UpdateRecord for example, you could add a user-defined property /kafka.partition with value 1 (and Replacement Strategy "Literal Value"). If in the core attribute subflow you want to remove the payload field, you can just remove it from the writer schema, and it won't be included in the output. For example: {
"namespace": "nifi",
"name": "myRecord",
"type": "record",
"fields": [
{"name": "headers","type": {"type": "map","values": "string"}},
{"name": "info","type": {"type": "map","values": "string"}},
{"name": "uuid","type": "string"},
{"name": "kafka.topic","type": "string"},
{"name": "kafka.partition","type": "string"},
{"name": "kafka.offset","type": "long"}
]
} If I understand correctly, your headers+info+payload content would remain the same as the original. If instead you are trying to collapse all the key-value pairs from headers, info, etc. and possibly add core attributes, then JoltTransformJSON is probably the best choice. If that's what you meant please let me know and I'll help with the Jolt spec to do the transformation.
... View more
02-25-2019
04:57 PM
It sounds like you're using ExecuteSQL as a source processor, meaning there are no incoming connections. In that case no flow file will be created. Instead, use GenerateFlowFile upstream from ExecuteSQL. You can schedule GenerateFlowFile for 10 minutes and set the content of the flow file to the SQL statement you want to execute. Then in ExecuteSQL you can schedule for 0 sec (as fast as possible, since the upstream processor runs every 10 mins) and remove the SQL Select Query property. At that point ExecuteSQL will expect the flow file content to contain SQL (which it will) and will execute that. If an error occurs, the flow file should be routed to failure, and you can use a PutEmail processor or something downstream for notification.
... View more
02-20-2019
10:44 PM
1 Kudo
I left an answer to your SO post, basically there is a bug or two preventing BLOB transfer from source to target DBs, but I presented a possible workaround.
... View more
02-20-2019
02:55 PM
1 Kudo
Is this a standard format (I know it's not proper JSON but is it some other standard format)? If not, you could write a ScriptedReader to parse your records. If all the data looks like the above though, you could use ReplaceText to replace ( with [ and ) with ], also remove the "u" prefix from some strings (like the address field), thereby converting it to proper JSON so you can use components like JsonTreeReader for example.
... View more
02-20-2019
02:51 PM
1 Kudo
AFAIK a thread can only be terminated like that manually, perhaps someone right-clicked on the processor and chose Terminate? A Terminated thread is really an "interrupted" thread, once it has been interrupted it should close gracefully but I don't believe there is any such guarantee. In any case, the processor should continue to run successfully even with terminated threads, although there may be an underlying issue with why someone terminated the thread to begin with (infinite timeout, e.g.)
... View more
02-15-2019
06:43 PM
It's possible (but would be quite unfortunate) that there was a Thrift change between 1.2.1 and 1.2.2, there were quite a few cases that went into 1.2.2.
... View more
02-15-2019
06:35 PM
1 Kudo
Yes we currently build with JDK 8 so we get Nashorn "for free". This will be the case when building/shipping with Java 9 and 10 as well. However Nashorn is deprecated in JDK 11 (but we'll allow it as an option as long as the ScriptEngine is still available in the JRE). We could consider adding support for Google V8 (perhaps via this library, although I'm not sure about licensing or native target) or others.
... View more
02-14-2019
06:28 PM
1 Kudo
If your ID values can come in out of order, then it's not a good choice for a Maximum Value Column. Usually timestamps or always-increasing values are used as Maximum Value Columns. If you don't have a column of this type, how would you know that the row was new or not? In the worst case you could keep a duplicate copy of the table as GenerateTableFetch/QueryDatabaseTable knows about it, then do a JOIN against the current table to find the new rows, but that is very resource-intensive and does not scale well at all.
... View more
02-14-2019
03:08 PM
The Hive processors in Apache NiFi work against Apache Hive 1.2.1, not Hive 2.x. Hive 2.x has difference in the Thrift interface from Hive 1.x, so you won't be able to use the NiFi Hive 1.2.x processors against any Hive 2.x instance (Apache or vendor). There may be work someday to support Hive 2.x but currently only Hive 1 and Hive 3 are supported in NiFi.
... View more
02-13-2019
08:08 PM
1 Kudo
For #1 you can use QueryDatabaseTable or GenerateTableFetch -> ExecuteSQL, you can set the Maximum Value Column property to your timestamp property. If you schedule the processor to run once a day, it will get all records added since the maximum timestamp observed the last time the processor ran. It doesn't have fidelity based on the timestamp value itself, instead it keeps track of the maximum value it's seen so far, then adds a WHERE clause to the SQL statement to get all rows with a timestamp greater than its maximum observed value so far. For those rows, it keeps track of the new current maximum value, and so on. For #2 you'd need a way to know a row was deleted in the source. If you can intercept when the DELETE statement is issued to the target, you could at that time issue a DELETE to the target. Alternatively if your database has Change Data Capture (CDC) support, you may be able to query the delta tables or something. For MySQL we have the CaptureChangeMySQL processor, which reads the binary logs and sends each event downstream in your flow. In that case you'd get an event for the delete, which you can change to a SQL delete statement for PutSQL, or (better to) use PutDatabaseRecord using the "statement.type" attribute, which you would set to the value of the "cdc.event.type" attribute via an UpdateAttribute processor. The "cdc.event.type" attribute is set by the CaptureChangeMySQL processor.
... View more