Member since: 11-16-2015
Posts: 911
Kudos Received: 668
Solutions: 249
My Accepted Solutions
| Views | Posted |
|---|---|
| 699 | 09-30-2025 05:23 AM |
| 1071 | 06-26-2025 01:21 PM |
| 930 | 06-19-2025 02:48 PM |
| 1100 | 05-30-2025 01:53 PM |
| 12263 | 02-22-2024 12:38 PM |
05-16-2018
11:52 AM
Looking at the parquet-avro code, I think your suggested workaround of changing decimal values to fixed is the right approach (for now). We could update the version of parquet-avro, but I didn't see anything in there that would improve your situation; it was Impala that needed to support more incoming types.
05-15-2018
04:37 PM
Since this isn't related to the original question, please ask it as its own standalone question and I'd be happy to answer it there. (The short answer is that you might be able to use UpdateAttribute to change the 4 to the correct column number for Table B, if you can figure out whether a flow file is for Table A or B.)
05-15-2018
02:22 PM
It comes from knowing that the ProcessContext passed into the script is actually a StandardProcessContext, which has some member variables you can get at with Groovy (provided there is no SecurityManager, as mentioned).
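If you want to see what's actually in there, here's a minimal ExecuteScript (Groovy) sketch, not taken from the original flow, that inspects the concrete context class via reflection; field names vary by NiFi version, so nothing specific is assumed:

```groovy
// 'context' and 'log' are the standard ExecuteScript bindings. This dumps the declared
// fields of the concrete ProcessContext implementation; setAccessible(true) will fail
// if a SecurityManager forbids it, as mentioned above.
def ctxClass = context.class
log.info("ProcessContext implementation: ${ctxClass.name}") // e.g. StandardProcessContext
ctxClass.declaredFields.each { field ->
    field.accessible = true
    log.info("  ${field.name} = ${field.get(context)}")
}
```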
05-15-2018
02:16 PM
What version of avro-tools are you using? You'll need version 1.8.x or later to support logical types. Also, you won't see human-readable data for a decimal type, as decimals are encoded in Avro as two's-complement, big-endian integer values, so they'll show up as something strange-looking in the JSON output.
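For illustration, here's a small Groovy sketch (the bytes and scale are made up, not from your file) showing how those two's-complement bytes plus the schema's scale map back to the original decimal value:

```groovy
import java.math.BigDecimal
import java.math.BigInteger

// Avro stores a decimal logical type as the two's-complement, big-endian bytes of the
// unscaled value; combined with the scale declared in the schema, you can rebuild the number.
byte[] raw = [0x30, 0x39] as byte[]   // example bytes: unscaled value 12345
int scale = 2                         // comes from the schema's "scale" attribute
def value = new BigDecimal(new BigInteger(raw), scale)
assert value == new BigDecimal('123.45')
println value                         // 123.45
```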
05-15-2018
01:43 AM
2 Kudos
Fetch Size is a hint to the driver that says "when I ask for a row, please send X rows over; if I need more rows, I will iterate through them before asking you to send more". If Fetch Size were 1, the driver would send a network request to the DB, the DB would send a single row back, the client would process it, and then another network request would be sent for the next row, and so on. This is very inefficient, as the network overhead would overtake the amount of data being passed (especially for tables with a small number of rows). On the other hand, if the entire table were sent in response to the first request, the client could run out of memory, especially for tables with a very large number of rows. As a tuning parameter, many drivers let the client specify the Fetch Size in order to select an appropriate number of rows to send per network request/response. For example, I think the Hive driver's default is 50 rows, or at least it used to be.

Maximum Number of Fragments is an advanced user property usually only used for debugging. It basically stops fetching rows after a certain number have been processed, even if the result set has more rows available. For example, if Max Rows Per Flow File were set to 100, Maximum Number of Fragments were set to 1, and a query returned 101 rows, then only 1 flow file (aka "fragment" in this case) would be sent, containing the first 100 rows. The 101st row would not be processed or sent in any flow file. Re-running the processor would have the exact same effect, and thus the 101st row is "unreachable". Maximum Number of Fragments was added to let the user avoid memory problems for large tables, as it puts an upper bound on the number of rows that will be processed. As mentioned, certain rows would not be available, but the processor wouldn't run out of memory either.

Hope this helps; please let me know if you'd like more clarification.
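To make the Fetch Size behavior concrete, here's a hedged plain-JDBC sketch in Groovy of roughly what a processor does while iterating a result set (the URL, credentials, and table name are placeholders, not anything from the question):

```groovy
import java.sql.DriverManager

// Placeholder connection details; fetchSize is only a hint to the driver (it may be
// capped or ignored), but it roughly controls how many rows come back per round trip.
def conn = DriverManager.getConnection('jdbc:hive2://example-host:10000/default', 'user', 'pass')
def stmt = conn.createStatement()
stmt.fetchSize = 1000                  // ask the driver to send ~1000 rows per network fetch
def rs = stmt.executeQuery('SELECT * FROM example_table')
int rows = 0
while (rs.next()) {
    rows++                             // the driver transparently fetches the next batch as needed
}
println "Processed ${rows} rows"
rs.close(); stmt.close(); conn.close()
```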
05-12-2018
07:26 PM
1 Kudo
The schema is a valid Avro schema, but the problem is that the CSVReader is taking only the second field (a double) and trying to convert it into an array; it does not realize you'd like to convert the remaining column values into an array. This seems like a decent improvement for the CSVReader; I'll think it over and perhaps write an Improvement Jira for it (or feel free to write one yourself). Since you want JSON as the target, you can use JoltTransformJSON to get the output you want. If it were an arbitrary output format, you could use UpdateRecord, but you'd have 35 or so user-defined fields to put the individual column values into the array. In any case, you can use the following schema to parse the column names (whether or not you have a header line; just make sure you skip any header line if using this schema):

{
"type": "record",
"namespace": "testavro.schema",
"name": "test",
"fields": [
{ "name": "timestamp", "type": {"type": "long","logicalType": "timestamp-millis"} },
{ "name": "tupvalue0", "type" : "double" },
{ "name": "tupvalue1", "type" : "double" },
{ "name": "tupvalue2", "type" : "double" },
{ "name": "tupvalue3", "type" : "double" },
{ "name": "tupvalue4", "type" : "double" },
{ "name": "tupvalue5", "type" : "double" },
{ "name": "tupvalue6", "type" : "double" },
{ "name": "tupvalue7", "type" : "double" },
{ "name": "tupvalue8", "type" : "double" },
{ "name": "tupvalue9", "type" : "double" },
{ "name": "tupvalue10", "type" : "double" },
{ "name": "tupvalue11", "type" : "double" },
{ "name": "tupvalue12", "type" : "double" },
{ "name": "tupvalue13", "type" : "double" },
{ "name": "tupvalue14", "type" : "double" },
{ "name": "tupvalue15", "type" : "double" },
{ "name": "tupvalue16", "type" : "double" },
{ "name": "tupvalue17", "type" : "double" },
{ "name": "tupvalue18", "type" : "double" },
{ "name": "tupvalue19", "type" : "double" },
{ "name": "tupvalue20", "type" : "double" },
{ "name": "tupvalue21", "type" : "double" },
{ "name": "tupvalue22", "type" : "double" },
{ "name": "tupvalue23", "type" : "double" },
{ "name": "tupvalue24", "type" : "double" },
{ "name": "tupvalue25", "type" : "double" },
{ "name": "tupvalue26", "type" : "double" },
{ "name": "tupvalue27", "type" : "double" },
{ "name": "tupvalue28", "type" : "double" },
{ "name": "tupvalue29", "type" : "double" },
{ "name": "tupvalue30", "type" : "double" },
{ "name": "tupvalue31", "type" : "double" },
{ "name": "tupvalue32", "type" : "double" },
{ "name": "tupvalue33", "type" : "double" },
{ "name": "tupvalue34", "type" : "double" }
]
}

Your JSONRecordSetWriter can inherit the schema or use the same one from the registry. Now you can send the output to either UpdateRecord or (for this case) JoltTransformJSON; the following is a Chain spec you can use to put the tupvalues into an array:

[
{
"operation": "shift",
"spec": {
"*": {
"timestamp": "[&1].timestamp",
"tupvalue*": "[&1].tupvalue[&(0,1)]"
}
}
}
]

This should give you the desired output, and it works on single lines of CSV or (more efficiently) on full CSV files.
05-08-2018
01:07 PM
4 Kudos
Do you mean MergeContent rather than UpdateAttribute? The former merges incoming flow files' content into outgoing flow file(s); the latter just adds/deletes/changes metadata about the flow files. If you mean MergeContent, try setting the Demarcator property to the newline character (\n); that should separate the incoming messages with a new line.
05-05-2018
01:42 AM
Unfortunately, at the time of this answer, that field is not being populated by the framework and thus doesn't show up in the output. I have written NIFI-5155 to cover this improvement. Please feel free to comment on the Jira case as to whether you'd like to see the IP, the hostname, or both. Thanks in advance!
05-04-2018
01:52 PM
I use the Advanced UI in the JoltTransformJSON processor or this webapp to test out specs. There are also a bunch of examples and docs in the javadoc, but they can be a bit difficult to follow. You can also search the jolt tag on StackOverflow for a number of questions, answers, and examples.
05-03-2018
09:33 PM
There's a bulletinNodeAddress field; it's probably an IP rather than a hostname (I didn't check). Would that work?