Member since
07-29-2020
574
Posts
323
Kudos Received
176
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 2143 | 12-20-2024 05:49 AM | |
| 2440 | 12-19-2024 08:33 PM | |
| 2189 | 12-19-2024 06:48 AM | |
| 1458 | 12-17-2024 12:56 PM | |
| 2084 | 12-16-2024 04:38 AM |
12-18-2023
06:06 AM
Is the problem that the other system doesnt take negative values?
... View more
12-18-2023
05:45 AM
Hi @MWM , The reason you are getting negative value is because the minimum date in epoch is "January 1st, 1970" which corresponds to "0". It turns out you can still convert the negative value back to date using the following expression: ${my_date:toDate():format("dd-MM-yyyy")} If that helps please accept solution. Thanks
... View more
12-15-2023
08:49 AM
Hi @MWM , There are couple of issues I noticed regarding the input json and the spec: 1- Regarding the "paycheck" value "3456,98" , I noticed there is coma (,) is that correct? if that is the case , Im not sure the conversion will work in this case. You have to probably replace the coma with dot (.) and then convert toDouble. It takes few steps to do this in jolt since there is no string replace function using split, join as follows: ,
{
"operation": "modify-overwrite-beta",
"spec": {
....
"paycheckArray": "=split('[,]',@(1,paycheck))",
"paycheckJoin": "=join('.',@(1,paycheckArray))",
"paycheck": "=toDouble(@(1,paycheckJoin))",
....
}
}
,
{
"operation": "remove",
"spec": {
"paycheckArray": "",
"paycheckJoin": ""
}
} 2- Regarding the boolean value "isOfAge" , since you have moved it under person object, the spec would as follows: {
"operation": "modify-overwrite-beta",
"spec": {
...
"person": {
"isOfAge": "=toBoolean(@(1,isOfAge))"
}
....
}
} 3- Regarding converting the date value "joiningDate" to number , this is tricky and I dont think you can do it via Jolt, you either have to extract it using EvaluateJsonPath as an attribute then do update attribute using expression language to do the proper formatting and conversion. Since JoltTransformation spec allows expression language , you can reference this attribute in the modify spec. here is an example: https://stackoverflow.com/questions/76678896/jolt-how-to-convert-datetimestamp-to-epoch-on-dynamic-json-keys Another option is to use UpdateRecord to do the proper formatting, conversion for all attributes that require such thing and then just use basic shift spec to re arrange values accordingly. With UpdateRecord you need to use Avro Schema to define the proper types in the JsonRecordSetWriter. UpdateRecord: JsonRecordSetWriter: AvroSchema in the Schema Text Property: {
"type": "record",
"name": "nifi-record",
"fields":
[
{ "name": "id", "type": "string"},
{ "name": "system", "type": "string"},
{ "name": "sourceSystem", "type": "string"},
{ "name": "name", "type": "string"},
{ "name": "lastName", "type": "string"},
{ "name": "mail", "type": "string"},
{ "name": "country", "type": "string"},
{ "name": "city", "type": "string"},
{ "name": "street", "type": "string"},
{ "name": "building", "type": "string"},
{ "name": "local", "type": "string"},
{ "name": "paycheck", "type": "double"},
{ "name": "joiningDate", "type": { "type":"int", "logicalType":"date" }},
{ "name": "isOfAge", "type": ["boolean","null"]}
]
} If you find this helpful please accept solution. Thanks
... View more
12-14-2023
07:03 AM
It seems that you need to process the 500 records once. In this case you dont need to start the ExecuteSQL processor nor worry about scheduling it unless you want to run it again in the future. Keep the processor stopped then right click on it and select Run Once. This should load the total number of records one time.
... View more
12-14-2023
06:44 AM
@FrankHaha, I need to understand the following: 1- Does the table have total of 500 records and if so are those records static or are they going to be modified over time and you need to keep processing them ? 2- Do you need to run the ExecuteSQL multiple times to process the same\updated 500 records or do you just need to process them once ? each run of ExecuteSQL with the provided select query will fetch the total records, so if you start the processor to start running indefinitly where the Run Schedule under the Scheduling tab is set to 0 then it will keep fetching the same total records again and again and it will fill up the queue .
... View more
12-14-2023
05:54 AM
Hi @Eva0 , You were close . I think the first astrik "*" was not needed because the input json is not contained in an array brackets. Then to include the email in each data element it has to be added at the date & type level but to reference the value you need to go to 2 levels up using the @ character as in @(2,mail) where the email is located in relative to date & type. [
{
"operation": "shift",
"spec": {
//level 2 where email originally is
"dates": {
// level 1
"*": {
// start count here at level 0
"@(2,email)": "[&1].email",
"date": "[&1].date",
"type": "[&1].type"
}
}
}
}
] If you find this helpful please accept solution. Thanks
... View more
12-13-2023
10:52 AM
@FrankHaha, Is there only 500 records in your table and that is all the records you want to processes? If so then you dont need to create schedule , you can run it once using the query in the "SQL Select Query" property: select * from public.art_title If you use the ExecuteSQLRecord then you can set the RecordWriter to Json using JsonRecordSetWriter, or XML using XMLRecordSetWriter to get the needed format, this way you save yourself having to convert from Avro to Json then Json to XML. You can get the result in XML right way. Also if you set the property "Max Rows Per FlowFile" to 1 , then it will create only one record per flowfile so there is no need to split. Hope that helps.
... View more
12-12-2023
06:15 PM
Hi @FrankHaha , The QueryDatabaseTable processor is used mainly when you are having a dynamic data table where the data is continuously being added\updated. The provided Maximum Value Column will always help tack the delta by saving the last max value retrieved from the last generated query and so on. This Max value column works better for columns where values increase over time for example numerical , datetime values. For string values it can work if that added\changed is always guaranteed to be greater than previous max value otherwise you might get unpredictable results. In your case, when you say that the "queue will have a lot of flowfile and it will mess up" can you explain that more? for example are you getting more flowfiles than whats in the table, getting duplicates or getting error? what exactly happen? If you have fixed number of records that is not going to change in the future as you are indicating there is 500 records and you want to process the data onetime , then you dont need to use QueryDatabaseTable with max value column, you can use ExecuteSQL or ExecuteSQLRecord and specify the select statement in the "SQL Select Query" property , In the ExecuteSQL you will get the result in Avro format but if you use the ExecuteSQLRecord you can decide the output format in the RecordWriter , for example if you are looking to convert data into Json or XML right away without having to go through Avro. If that helps please accept solution. Thanks
... View more
12-12-2023
06:57 AM
Can you post screenshot of the UpdateRecord processor configuration? Also you have to be careful with the provided input because there is an extra comma after last Garry value which makes the json invalid.
... View more
12-11-2023
11:14 AM
1 Kudo
@Anderosn, Another option is to use the UpdateRecod with an Avro schema in the JsonRecordSetWriter that reflects the actual json structure coming out of the ExecuteSQL The UpdateRecord will look like this: The JsonRecorSetWriter looks like this: The AvroSchema provided in the ShemaText property is the following: {
"name": "nifi",
"type": "record",
"namespace": "nifi.com",
"fields": [
{
"name": "SEARCH_RESULT",
"type": {
"name": "SEARCH_RESULT",
"type": "record",
"fields": [
{
"name": "requestId",
"type": "string"
},
{
"name": "responseData",
"type": {
"name": "responseData",
"type": "array",
"items": {
"name": "responseData",
"type": "record",
"fields": [
{
"name": "searchKey",
"type": "string"
},
{
"name": "data",
"type": {
"name": "data",
"type": "array",
"items": {
"name": "data",
"type": "record",
"fields": [
{
"name": "firstName",
"type": "string"
},
{
"name": "lastName",
"type": "string"
}
]
}
}
}
]
}
}
}
]
}
}
]
} This will produce the following output json out of the UpdateRecord: [ {
"SEARCH_RESULT" : {
"requestId" : "203680",
"responseData" : [ {
"searchKey" : "cardNumber",
"data" : [ {
"firstName" : "Martin",
"lastName" : "Garry"
}, {
"firstName" : "Martin",
"lastName" : "Garry"
}, {
"firstName" : "Martin",
"lastName" : "Garry"
} ]
} ]
}
} ] You can use EvaluateJsonPath to get the data as json array to do the needed processing. If that helps please accept solution. Thanks
... View more