Member since 04-06-2022
13 Posts
1 Kudos Received
1 Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 735 | 01-12-2023 06:48 AM |
04-12-2023
05:45 AM
OK, now that makes sense. I downloaded it from the official repository, Index of /dist/nifi (apache.org), and I'm using version 1.13.2 (due to some project requirements).
01-13-2023
03:26 AM
OK, I understand now what you are doing. The problem is that you are trying to merge information from one FlowFile into another, and this can't be done the way you designed it. Two FlowFiles are processed separately; they don't communicate with one another. If you need information from both the JSON and the XML, you have to use a single FlowFile for the entire process. I suggest putting the EvaluateXPath processor after the UpdateAttribute processor and setting its Destination property to "flowfile-attribute", so that all the attributes (titlefromXML, guid, Contact Name, caseno, and docno) end up on the same FlowFile for use in the JOLT specification.
01-12-2023
03:44 AM
I'm not sure how much this helps, but you can try checking the Jolt Transform quick reference. There's also this link to test JOLT specifications. You can try starting with something like:
[{
"operation": "shift",
"spec": {
"*": "&"
}
}, {
"operation": "default",
"spec": {
"*": {
"id": "&(1,0)"
}
}
}]
01-12-2023
02:15 AM
You can try modifying the Record Reader Avro schema to something like this:
{"name": "xxx", "type": "string", "default": ""}
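If the schema has many fields, adding the default by hand gets tedious. Here is a minimal Python sketch of the same idea, run outside NiFi: it adds a "default" entry to every plain "string" field of an Avro record schema. The helper name add_string_defaults and the two-field sample schema are made up for illustration, and whether an empty-string default is the right choice depends on your data.

```python
import json


def add_string_defaults(schema, default=""):
    """Return a copy of an Avro record schema in which every plain
    "string" field without a default gets one (hypothetical helper)."""
    patched = json.loads(json.dumps(schema))  # deep copy via JSON round-trip
    for field in patched["fields"]:
        if field["type"] == "string" and "default" not in field:
            field["default"] = default
    return patched


# Illustrative schema, not the full one from the question.
schema = {
    "type": "record",
    "name": "example",
    "fields": [
        {"name": "xxx", "type": "string"},
        {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}

print(json.dumps(add_string_defaults(schema), indent=2))
```

Only fields whose type is exactly "string" are touched; the timestamp field is left as it is.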
01-12-2023
01:09 AM
Hello, this may be a silly question, but I'm at a bit of a loss and have been stuck for a couple of days now trying to find a solution. I have a flow design where I have to validate a record against an Avro schema and insert that record into a Kudu table. The Avro validation schema is:
{
"type" : "record",
"name" : "tbcertificati",
"fields" : [
{ "name" : "num_caso", "type" : "string" },
{ "name" : "num_rch", "type" : "string" },
{ "name" : "num_cert", "type" : "string" },
{ "name" : "seqno", "type" : "string" },
{ "name" : "data_inizio_kudu", "type" : {
"type":"long",
"logicalType":"timestamp-millis"}},
{ "name" : "num_vis", "type" : "double" },
{ "name" : "data_compilaz_cert", "type" : {
"type":"long",
"logicalType":"timestamp-millis"}},
{ "name" : "data_acquisiz_cert", "type" : {
"type":"long",
"logicalType":"timestamp-millis"}},
{ "name" : "cod_matr_med", "type" : "double" },
{ "name" : "medico_struttura", "type" : "string" },
{ "name" : "flag_int_ext", "type" : "string" },
{ "name" : "cod_cert", "type" : "string" },
{ "name" : "cod_prognosi", "type" : "double" },
{ "name" : "data_iniz_prognosi", "type" : {
"type":"long",
"logicalType":"timestamp-millis"}},
{ "name" : "data_fine_prognosi", "type" : {
"type":"long",
"logicalType":"timestamp-millis"}},
{ "name" : "desc_cert", "type" : "string" },
{ "name" : "data_ripresa_lav", "type" : {
"type":"long",
"logicalType":"timestamp-millis"}},
{ "name" : "flag_postumi", "type" : "string" },
{ "name" : "flag_medico_stru", "type" : "string" },
{ "name" : "note", "type" : "string" },
{ "name" : "prg", "type" : "double" },
{ "name" : "crt_time", "type" : {
"type":"long",
"logicalType":"timestamp-millis"}},
{ "name" : "id_usr_crt", "type" : "string" },
{ "name" : "mod_time", "type" : {
"type":"long",
"logicalType":"timestamp-millis"}},
{ "name" : "id_usr_mod", "type" : "string" },
{ "name" : "fl_annullato", "type" : "string" },
{ "name" : "dm_timestamp", "type" : "string" },
{ "name" : "dm_txid", "type" : "string" },
{ "name" : "dm_operation_type", "type" : "string" }
]
}
A record can arrive like this:
{
"DM_TIMESTAMP" : "1970-01-01 00:00:00.000",
"DM_TXID" : "1",
"DM_OPERATION_TYPE" : "A",
"DM_USER" : "STRING",
"NUM_CASO" : "1",
"NUM_RCH" : "1",
"NUM_CERT" : "1",
"NUM_VIS" : "",
"DATA_COMPILAZ_CERT" : "1970-01-01 00:00:00",
"DATA_ACQUISIZ_CERT" : "",
"COD_MATR_MED" : "",
"MEDICO_STRUTTURA" : "",
"FLAG_INT_EXT" : "A",
"COD_CERT" : "A",
"COD_PROGNOSI" : "1",
"DATA_INIZ_PROGNOSI" : "",
"DATA_FINE_PROGNOSI" : "",
"DESC_CERT" : "",
"DATA_RIPRESA_LAV" : "1970-01-01",
"FLAG_POSTUMI" : "",
"FLAG_MEDICO_STRU" : "A",
"NOTE" : "",
"PRG" : "1",
"CRT_TIME" : "1970-01-01 00:00:00",
"ID_USR_CRT" : "STRING",
"MOD_TIME" : "1970-01-01 00:00:00",
"ID_USR_MOD" : "STRING",
"FL_ANNULLATO" : "A",
"SEQNO" : "1",
"DATA_INIZIO_KUDU" : 1673274352029
}
The PutKudu processor is configured as follows (screenshot not available).
Record validation always fails, and if I try to insert the record directly with the PutKudu processor, I get the error:
Failed to write due to Can't set primary key column foo to null : java.lang.IllegalArgumentException: Can't set primary key column num_caso to null
I have tried capitalizing/decapitalizing the field names, with no success. The field clearly has a value associated with it, so I don't understand why it is said to be null. Thanks in advance.
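Comparing the two listings, the record differs from the schema in three visible ways: the keys are upper case while the schema names are lower case, the timestamp-millis fields mostly arrive as date strings or empty strings, and the double fields arrive as strings. The following Python sketch, run outside NiFi, normalizes a record along those lines so the differences can be inspected. The field sets are copied from the schema above; the function names, the UTC assumption for the date strings, and the choice to map empty strings to None are assumptions, and the thread does not confirm that any of these differences causes the PutKudu error.

```python
from datetime import datetime, timedelta, timezone

# Fields declared as long / timestamp-millis in the schema above.
TIMESTAMP_FIELDS = {
    "data_inizio_kudu", "data_compilaz_cert", "data_acquisiz_cert",
    "data_iniz_prognosi", "data_fine_prognosi", "data_ripresa_lav",
    "crt_time", "mod_time",
}
# Fields declared as double in the schema above.
DOUBLE_FIELDS = {"num_vis", "cod_matr_med", "cod_prognosi", "prg"}

# Date layouts seen in the sample record.
FORMATS = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d")
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(value):
    """Convert a date string to epoch milliseconds (assumes UTC)."""
    if value is None or value == "":
        return None  # assumption: an empty string means "no value"
    if isinstance(value, (int, float)):
        return int(value)  # already numeric, e.g. DATA_INIZIO_KUDU
    for fmt in FORMATS:
        try:
            parsed = datetime.strptime(value, fmt)
        except ValueError:
            continue
        return (parsed.replace(tzinfo=timezone.utc) - EPOCH) // timedelta(milliseconds=1)
    raise ValueError("unrecognized timestamp: %r" % (value,))


def normalize_record(record):
    """Lower-case the keys and coerce values to the schema's types."""
    normalized = {}
    for key, value in record.items():
        name = key.lower()
        if name in TIMESTAMP_FIELDS:
            normalized[name] = to_epoch_millis(value)
        elif name in DOUBLE_FIELDS:
            normalized[name] = None if value in ("", None) else float(value)
        else:
            normalized[name] = value
    return normalized


# A few keys taken from the sample record above.
sample = {
    "NUM_CASO": "1",
    "NUM_VIS": "",
    "DATA_RIPRESA_LAV": "1970-01-01",
    "CRT_TIME": "1970-01-01 00:00:00",
    "DATA_INIZIO_KUDU": 1673274352029,
}
print(normalize_record(sample))
```

Note that fields mapped to None would still fail validation against the schema as written, because none of its fields is declared nullable.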
Labels: Apache Kudu, Apache NiFi
07-17-2022
11:42 PM
Finally, it is working as intended. Both InvokeHTTP processors are working. This article worked for me: Chaining Multiple Http API's in Apache NiFi | Medium
07-14-2022
09:19 AM
Hello,
NiFi Version: 1.13.2
I am executing a Python script within the ExecuteScript processor, running on all nodes. Sometimes it runs all right, but sometimes it gets stuck and does not get FlowFiles from the queue, throwing a NullPointerException. As far as I understand, it creates conflicts with the FlowFiles, leaving a few files pending in the queue with no way to empty it. When right-clicking the queue (with a few hundred FlowFiles in it) and selecting "Empty Queue", I just get the message "X out of Y flowfiles removed" (where Y > X), or "No flowfiles removed" on a second attempt. Then, after a few minutes, one cluster node crashes. The node error is the following:
07/14/2022 17:41:46 CEST: Node Status changed from CONNECTING to DISCONNECTED due to Proposed configuration is not inheritable by the flow controller because of flow differences: Found difference in Flows:
Local Fingerprint: e48-a975-21705dd65dfa0e0aca15-0c1c-3c8a-9752-179b7491e6a273ebc2c5-31cb-37df-8dcf-5b955bce3d21FUNNELff90c485-cfc9-3b28-9113-f543ed94606b73ebc2c5-31cb-37df-8dcf-5b955bce3d21PROCESSORNO_VALUEDO_NOT_LOAD_
Cluster Fingerprint: e48-a975-21705dd65dfa0e0aca15-0c1c-3c8a-9752-179b7491e6a273ebc2c5-31cb-37df-8dcf-5b955bce3d21FUNNELf379500b-ae38-1ee5-0000-000021eee75273ebc2c5-31cb-37df-8dcf-5b955bce3d21PROCESSORNO_VALUEDO_NOT_LOAD_
I tried offloading the node and reconnecting it, but that didn't work and the same error persists. Is this a known issue? Does running a script across all nodes cause FlowFile conflicts? (I haven't yet tested running the same load through this processor with the "primary node" execution option; I need to wait for all nodes to be back up again.) The Python script I'm running is here on GitHub.
Thanks a lot in advance, S.
Labels: Apache NiFi
07-07-2022
03:42 PM
1 Kudo
@linssab, you are probably running into the issue described in NIFI-9241, which has been fixed in NiFi 1.15. Cheers, André