Member since 11-16-2015
905 Posts
666 Kudos Received
249 Solutions
My Accepted Solutions
| Views | Posted |
|---|---|
| 509 | 09-30-2025 05:23 AM |
| 826 | 06-26-2025 01:21 PM |
| 751 | 06-19-2025 02:48 PM |
| 935 | 05-30-2025 01:53 PM |
| 11682 | 02-22-2024 12:38 PM |
05-22-2018
12:05 PM
1 Kudo
I added an answer to that question, but it is likely unsatisfying as it is an open issue. The Hive driver used in the Hive processors is based on Apache Hive 1.2.x which does not support a handful of JDBC API methods used by those processors.
05-21-2018
02:16 PM
For approach #1, you could use the FlattenJson processor. You'll likely want to set the Separator property to "_" rather than the default ".", since Hive adds the table name to each column in a ResultSet. For approach #2, you could have a single-column table (a column of type String), then query it with get_json_object (example here). Alternatively, if you can map all the types (including complex types like array, map, struct, etc.) to a Hive table definition, you could use a JSON SerDe to write the data (example here).
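To illustrate what flattening with a "_" separator produces for approach #1, here is a minimal Python sketch. It is not the FlattenJson implementation: the function name `flatten` and the sample record are made up for illustration, and this version only descends into nested objects, leaving arrays untouched.

```python
def flatten(obj, parent_key="", sep="_"):
    """Flatten nested dicts into one level, joining key paths with `sep`.

    Hypothetical helper for illustration; arrays are left as-is.
    """
    flat = {}
    for key, value in obj.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested objects, carrying the joined prefix along
            flat.update(flatten(value, new_key, sep))
        else:
            flat[new_key] = value
    return flat


# Sample record (invented for this sketch)
record = {"id": 1, "user": {"name": "a", "address": {"city": "x"}}}
flat_record = flatten(record)
print(flat_record)
```

The resulting keys contain no "." characters, so they can be used directly as column names.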
05-18-2018
01:31 AM
What version of NiFi are you using, and what do the CREATE TABLE statements for the source and target tables look like? Is Oracle your target DB or is it a different DB? I ran ExecuteSQL against Oracle 11 (with NiFi's master branch, so 1.6.0+ or "almost 1.7.0"), populated with your actual data (using the same PutDatabaseRecord but with a JsonTreeReader). It generated the same Avro schema you have above with the same data. I then changed PutDatabaseRecord to use an AvroReader with Use Embedded Schema, and everything ran fine, inserting the rows successfully. I'm guessing you have an older version of NiFi that might be missing some fixes and/or improvements around the handling of logical types (e.g. timestamp).
05-17-2018
01:32 PM
In NiFi 1.7.0 (via NIFI-4456) you will be able to configure your JsonRecordSetWriter to emit one JSON object per line, which gives you the output you want. In this case with JoltTransformJSON you'd need a ConvertRecord after the fact, since it will emit an array if you pass in an array. As a workaround you might be able to use ReplaceText to remove the first and last square brackets, and change },{ to },\n{ to put each value on its own line.
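The text manipulation behind the ReplaceText workaround can be sketched in Python as follows. This is only an illustration of the string replacement, not a NiFi configuration: the function name `array_to_lines` is hypothetical, and it assumes compact JSON with no whitespace between `}` and `,{` and no nested arrays of objects.

```python
import json


def array_to_lines(text):
    """Strip the outer [ ] and break the array between top-level objects.

    Hypothetical helper; naive string replacement, so it assumes compact
    JSON and that "},{" only occurs between top-level elements.
    """
    body = text.strip()
    if body.startswith("[") and body.endswith("]"):
        body = body[1:-1]
    # Same substitution as described above: },{ becomes },\n{
    return body.replace("},{", "},\n{")


compact = json.dumps([{"a": 1}, {"a": 2}, {"a": 3}], separators=(",", ":"))
lines = array_to_lines(compact)
print(lines)
```

Note that this substitution leaves a trailing comma on every line but the last. If each line has to parse as JSON on its own, replacing with `}\n{` instead would drop the comma.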
05-16-2018
11:52 AM
Looking at the parquet-avro code, I think your suggested workaround of changing decimal values to fixed is the right approach (for now). We could update the version of parquet-avro, but I didn't see anything in there that would improve your situation; it was Impala that needed to support more incoming types.
05-15-2018
04:37 PM
Since this isn't related to the original question, please ask this as its own standalone question and I'd be happy to answer it. (The short answer is you might be able to use UpdateAttribute to change the 4 to the right column number for Table B if you can figure out whether a flow file is for Table A or B)
05-15-2018
02:22 PM
It comes from knowing that the ProcessContext passed into the script is actually a StandardProcessContext which has some variables you can get at with Groovy (provided there is no SecurityManager as mentioned).
05-15-2018
02:16 PM
What version of avro-tools are you using? You'll need version 1.8.x or later to support logical types. Also, you won't see readable data for a decimal type, as decimals are encoded in Avro as two's-complement big-endian integer values, so they'll show up as something weird in the JSON output.
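To see why decimals look odd in the JSON output, here is a small Python sketch of the encoding described above: the unscaled integer value is written as two's-complement big-endian bytes. The helper names `encode_decimal` and `decode_decimal` are made up for illustration, and the sketch assumes the value has no more fractional digits than the given scale.

```python
from decimal import Decimal


def encode_decimal(value, scale):
    """Encode a Decimal as two's-complement big-endian bytes of its unscaled value.

    Hypothetical helper; assumes `value` fits the given scale exactly.
    """
    unscaled = int(value.scaleb(scale))  # e.g. 123.45 with scale 2 -> 12345
    # Enough bytes to hold the magnitude plus a sign bit
    length = (unscaled.bit_length() + 8) // 8
    return unscaled.to_bytes(length, byteorder="big", signed=True)


def decode_decimal(raw, scale):
    """Reverse of encode_decimal: bytes back to a Decimal with `scale` digits."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


encoded = encode_decimal(Decimal("123.45"), 2)
print(encoded)                      # raw bytes, not a readable number
print(decode_decimal(encoded, 2))   # back to the original decimal
```

The raw bytes carry no hint of the scale, which is why a tool that just dumps them as text shows something that does not resemble the original number.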
05-15-2018
01:43 AM
2 Kudos
Fetch Size is a hint to the driver that says "when I ask for a row, please send X rows over; if I need more rows I will iterate through them before asking you to send more rows". If Fetch Size were 1, the driver would send a network request to the DB, the DB would send a single row back, it would be processed by the client, then for the next row another network request would be sent, and so on. This is very inefficient, as the network overhead would overtake the amount of data being passed (especially for tables with a small number of rows). On the other hand, if the entire table were sent at the first request, the client could run out of memory, especially for tables with a very large number of rows. As a tuning parameter, many drivers allow the client to specify the Fetch Size in order to select the appropriate number of rows to send during a network request/response. For example, I think the Hive driver's default is 50 rows, or at least it used to be.

Maximum Number of Fragments is an advanced user property usually only used for debugging. It basically stops fetching rows after a certain number have been processed, even if the result set has more rows available. For example, if Max Rows Per Flow File were set to 100, Max Number of Fragments were set to 1, and a query returned 101 rows, then only 1 flow file (aka "fragment" in this case) would be sent, containing the first 100 rows. The 101st row would not be processed or sent in any flow file. Re-running the processor would have the exact same effect, and thus the 101st row is "unreachable". Maximum Number of Fragments was added to let the user avoid memory problems for large tables, as it puts an upper bound on the number of rows that will be processed. As mentioned, certain rows would not be available, but the processor wouldn't run out of memory either.

Hope this helps, please let me know if you'd like more clarification.
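The two settings can be modeled with a short Python sketch. This is a simplified illustration, not the processor's code: the function names are hypothetical, the round-trip count ignores everything except the fetch size, and treating a fragment limit of 0 as "no limit" is an assumption of this sketch.

```python
import math


def round_trips(total_rows, fetch_size):
    """Simplified model: network requests needed to pull all rows."""
    return math.ceil(total_rows / fetch_size)


def fragment_rows(rows, max_rows_per_flow_file, max_fragments=0):
    """Group rows into flow files, stopping once max_fragments is reached.

    Hypothetical helper; max_fragments=0 is assumed to mean "no limit".
    """
    flow_files = []
    current = []
    for row in rows:
        current.append(row)
        if len(current) == max_rows_per_flow_file:
            flow_files.append(current)
            current = []
            if max_fragments and len(flow_files) >= max_fragments:
                # Remaining rows are never read, as in the 101-row example
                return flow_files
    if current:
        flow_files.append(current)
    return flow_files


print(round_trips(1000, 1), round_trips(1000, 50))

rows = list(range(1, 102))  # 101 rows
limited = fragment_rows(rows, max_rows_per_flow_file=100, max_fragments=1)
unlimited = fragment_rows(rows, max_rows_per_flow_file=100)
print(len(limited), len(unlimited))
```

With the limit set to 1, row 101 never makes it into a flow file; without the limit it ends up alone in a second flow file.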
05-12-2018
07:26 PM
1 Kudo
The schema is a valid Avro schema, but the problem is that the CSVReader is taking only the second field (a double) and trying to convert it into an array. It does not realize you'd like to convert the remaining column values into an array. This seems like a decent improvement for the CSVReader; I'll think it over and perhaps write an Improvement Jira for it (or feel free to write one yourself). Since you want JSON as the target, you can use JoltTransformJSON to get the output you want. If it were an arbitrary output format, you could use UpdateRecord, but you'd need 35 or so user-defined fields to put the individual column values into the array. In any case, you can use this schema to parse the column names, whether or not you have a header line (just make sure you skip any header line if using this schema):
{
"type": "record",
"namespace": "testavro.schema",
"name": "test",
"fields": [
{ "name": "timestamp", "type": {"type": "long","logicalType": "timestamp-millis"} },
{ "name": "tupvalue0", "type" : "double" },
{ "name": "tupvalue1", "type" : "double" },
{ "name": "tupvalue2", "type" : "double" },
{ "name": "tupvalue3", "type" : "double" },
{ "name": "tupvalue4", "type" : "double" },
{ "name": "tupvalue5", "type" : "double" },
{ "name": "tupvalue6", "type" : "double" },
{ "name": "tupvalue7", "type" : "double" },
{ "name": "tupvalue8", "type" : "double" },
{ "name": "tupvalue9", "type" : "double" },
{ "name": "tupvalue10", "type" : "double" },
{ "name": "tupvalue11", "type" : "double" },
{ "name": "tupvalue12", "type" : "double" },
{ "name": "tupvalue13", "type" : "double" },
{ "name": "tupvalue14", "type" : "double" },
{ "name": "tupvalue15", "type" : "double" },
{ "name": "tupvalue16", "type" : "double" },
{ "name": "tupvalue17", "type" : "double" },
{ "name": "tupvalue18", "type" : "double" },
{ "name": "tupvalue19", "type" : "double" },
{ "name": "tupvalue20", "type" : "double" },
{ "name": "tupvalue21", "type" : "double" },
{ "name": "tupvalue22", "type" : "double" },
{ "name": "tupvalue23", "type" : "double" },
{ "name": "tupvalue24", "type" : "double" },
{ "name": "tupvalue25", "type" : "double" },
{ "name": "tupvalue26", "type" : "double" },
{ "name": "tupvalue27", "type" : "double" },
{ "name": "tupvalue28", "type" : "double" },
{ "name": "tupvalue29", "type" : "double" },
{ "name": "tupvalue30", "type" : "double" },
{ "name": "tupvalue31", "type" : "double" },
{ "name": "tupvalue32", "type" : "double" },
{ "name": "tupvalue33", "type" : "double" },
{ "name": "tupvalue34", "type" : "double" }
]
}
Your JsonRecordSetWriter can inherit the schema or use the same one from the registry. Now you can send the output to either UpdateRecord or (for this case) JoltTransformJSON. The following is a Chain spec you can use to put the tupvalues into an array:
[
{
"operation": "shift",
"spec": {
"*": {
"timestamp": "[&1].timestamp",
"tupvalue*": "[&1].tupvalue[&(0,1)]"
}
}
}
]
This should give you the desired output, and it works on single lines of CSV or (more efficiently) on full CSV files.
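For anyone who wants to check the expected shape of the result outside NiFi, the effect of the shift spec can be approximated in Python. This is only a sketch of the transformation, not Jolt itself: the function name `collect_tupvalues` and the sample records are invented, and it assumes the tupvalue indexes are contiguous starting at 0.

```python
import re


def collect_tupvalues(records):
    """Gather tupvalueN fields of each record into one "tupvalue" array.

    Hypothetical helper approximating the shift spec; values are ordered
    by their numeric suffix, assuming no gaps in the numbering.
    """
    pattern = re.compile(r"^tupvalue(\d+)$")
    output = []
    for record in records:
        indexed = {}
        for key, value in record.items():
            match = pattern.match(key)
            if match:
                indexed[int(match.group(1))] = value
        output.append({
            "timestamp": record.get("timestamp"),
            "tupvalue": [indexed[i] for i in sorted(indexed)],
        })
    return output


# Two sample records with three tupvalues each (invented data)
records = [
    {"timestamp": 1526153160000, "tupvalue0": 1.5, "tupvalue1": 2.5, "tupvalue2": 3.5},
    {"timestamp": 1526153161000, "tupvalue0": 4.0, "tupvalue1": 5.0, "tupvalue2": 6.0},
]
result = collect_tupvalues(records)
print(result)
```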