Member since
11-16-2015
890
Posts
648
Kudos Received
245
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 937 | 02-22-2024 12:38 PM |
| | 810 | 02-02-2023 07:07 AM |
| | 2159 | 12-07-2021 09:19 AM |
| | 3444 | 03-20-2020 12:34 PM |
| | 11643 | 01-27-2020 07:57 AM |
02-02-2023
07:07 AM
I'm not a Hive expert, but I did author the original PutHive3Streaming processor for NiFi. My recommendation is to set Records Per Transaction greater than the number of records in a FlowFile (unless we are talking about super-huge files) and transactions per batch to 1. This makes the transaction semantics similar to how NiFi FlowFile sessions work (e.g. rollback, failure, success). If the number of records is huge and is causing throughput problems, try dividing that number by 100 and setting transactions per batch to 100. The product of the two settings should be greater than the total number of records in the FlowFile; otherwise you incur overhead with the Hive Metastore by requesting a large number of batches/transactions.
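To make the arithmetic concrete, here is a minimal Python sketch of the sizing rule described above. The function name and parameters are hypothetical (they are not NiFi or Hive APIs); it only computes candidate values for the two processor properties so that their product exceeds the record count.

```python
import math


def suggest_settings(record_count, split=False, split_factor=100):
    """Return (records_per_transaction, transactions_per_batch).

    Hypothetical helper illustrating the rule of thumb in the post:
    the product of the two values must exceed record_count.
    """
    if not split:
        # Default advice: one transaction that covers the whole FlowFile.
        return record_count + 1, 1
    # For very large FlowFiles: divide by split_factor, round up, and add 1
    # so that records_per_txn * split_factor is strictly greater than the count.
    records_per_txn = math.ceil(record_count / split_factor) + 1
    return records_per_txn, split_factor


print(suggest_settings(5000))                   # small FlowFile
print(suggest_settings(1_000_000, split=True))  # very large FlowFile
```

The split factor of 100 is the value suggested in the post; treat it as a starting point to tune rather than a fixed constant.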
12-20-2022
05:52 AM
I wasn't able to reproduce this. I remember trying your example and the UPSERT worked for me, so I'm not sure what's going on.
11-02-2022
05:37 AM
1 Kudo
Agreed, you do not have access to the fields in either the incoming or outgoing JSON objects using Expression Language in the spec.
10-05-2022
08:31 AM
I believe the type checking for logical types is stricter as of https://issues.apache.org/jira/browse/AVRO-2493 and NiFi 1.17.0 (when we upgraded to Avro 1.11.1). Are you using "int" or "string" as the underlying Avro type? According to the spec (https://avro.apache.org/docs/1.11.1/specification/#timestamp-millisecond-precision) it must be "long".
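For illustration, here is a small Python sketch of a schema whose timestamp-millis field is backed by "long", plus a quick check for fields that are not. The record name, field name, and helper function are hypothetical; only the "long" / "timestamp-millis" pairing comes from the Avro specification. The check covers only top-level fields with an inline type object (no unions).

```python
import json

# Hypothetical record and field names, for illustration only.
schema = {
    "type": "record",
    "name": "Event",
    "fields": [
        {
            "name": "created_at",
            "type": {"type": "long", "logicalType": "timestamp-millis"},
        },
    ],
}


def bad_timestamp_fields(avro_schema):
    """Return names of fields whose timestamp-millis type is not backed by long."""
    bad = []
    for field in avro_schema["fields"]:
        ftype = field["type"]
        if (
            isinstance(ftype, dict)
            and ftype.get("logicalType") == "timestamp-millis"
            and ftype.get("type") != "long"
        ):
            bad.append(field["name"])
    return bad


print(json.dumps(schema, indent=2))
print(bad_timestamp_fields(schema))
```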
12-07-2021
09:19 AM
The operation to add an attribute to a FlowFile is on the ProcessSession object, not the FlowFile itself (so the session can keep track of changes). Both calls return the updated FlowFile reference, so keep the result. Try the following instead:
destFlowFile = session.putAttribute(destFlowFile, "logMsg", "Testing Msg")
destFlowFile = session.putAllAttributes(destFlowFile, backupAttributes)
03-24-2021
03:23 PM
1 Kudo
What are the column names in your table? Assuming "carId" and "carType", you can use JoltTransformJson or JoltTransformRecord with the following spec:
[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "$": "carId",
        "@": "carType"
      }
    }
  },
  {
    "operation": "shift",
    "spec": {
      "carId": {
        "*": {
          "@": "[&0].carId"
        }
      },
      "carType": {
        "*": {
          "@": "[&0].carType"
        }
      }
    }
  }
]
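As a rough illustration of what the spec is meant to do, here is a plain-Python sketch of the same reshaping. It assumes the input is a flat JSON object mapping each car ID to its type (the original question's input is not shown here, so the sample data and function name are hypothetical). This is not Jolt itself, just the intended result as I read the spec.

```python
def to_rows(flat):
    """Turn {"<id>": "<type>", ...} into a list of {"carId", "carType"} rows."""
    # First shift: collect the keys into carId and the values into carType.
    car_ids = list(flat.keys())
    car_types = list(flat.values())
    # Second shift: pair the two parallel lists up by index.
    return [{"carId": i, "carType": t} for i, t in zip(car_ids, car_types)]


# Hypothetical sample input
print(to_rows({"1001": "sedan", "1002": "suv"}))
```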
02-09-2021
02:12 PM
1 Kudo
If you use GrokReader you can use the same kv filter from logstash: https://community.cloudera.com/t5/Support-Questions/Grok-Patterns-Expressions-for-capturing-comma-separated-key/td-p/311126
01-29-2021
04:54 PM
Is there anything in the logs before/after the "already marked for transfer" entry? I'm trying to figure out how a flow file can get transferred and then something goes wrong (where we'd try to also send it to failure).
04-17-2020
08:07 AM
You can use Expression Language in the Max-Value Columns property to set them per-flowfile, but there currently isn't any way to fetch the primary key column(s) from the database and use those as the max-value columns. You could do that in upstream processors though, then set an attribute to those columns and pass that into GenerateTableFetch.
03-20-2020
12:34 PM
1 Kudo
You can refer to the "Fields" output field explicitly instead of needing another shift:
[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "CUST_AC_NO": "[&1].ExternalSystemIdentifier",
        "BRANCH_CODE": "[&1].Fields.FLD0001",
        "CUST_NO": "[&1].Fields.FLD0002",
        "AC_DESC": "[&1].Fields.FLD0003"
      }
    }
  },
  {
    "operation": "default",
    "spec": {
      "*": {
        "InstitutionId": "1"
      }
    }
  }
]
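To show the shape of the intended output, here is a plain-Python sketch of the same mapping. It assumes the input is an array of flat records with the four source columns; the sample values and the function name are hypothetical, and this is an approximation of the spec's effect, not a Jolt implementation.

```python
# Source column -> key under "Fields", taken from the shift spec above.
FIELD_MAP = {
    "BRANCH_CODE": "FLD0001",
    "CUST_NO": "FLD0002",
    "AC_DESC": "FLD0003",
}


def transform(records):
    """Rename and nest each record, then add the default InstitutionId."""
    out = []
    for rec in records:
        item = {}
        if "CUST_AC_NO" in rec:
            item["ExternalSystemIdentifier"] = rec["CUST_AC_NO"]
        fields = {dst: rec[src] for src, dst in FIELD_MAP.items() if src in rec}
        if fields:
            item["Fields"] = fields
        # "default" only fills the value in when the key is absent.
        item.setdefault("InstitutionId", "1")
        out.append(item)
    return out


# Hypothetical sample input
sample = [{"CUST_AC_NO": "A1", "BRANCH_CODE": "001", "CUST_NO": "C9", "AC_DESC": "Savings"}]
print(transform(sample))
```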