Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11284 | 04-15-2020 05:01 PM |
| | 7179 | 10-15-2019 08:12 PM |
| | 3165 | 10-12-2019 08:29 PM |
| | 11624 | 09-21-2019 10:04 AM |
| | 4396 | 09-19-2019 07:11 AM |
03-30-2018
01:06 PM
@Stefan Constantin The issue is with your JSON schema: the schema key names do not match the JSON message key names. The schema has lower-case/camel-case field names (e.g. inv_idn) while the JSON message has upper-case names (e.g. INV_IDN). Change the schema in the Avro Schema Registry to match the incoming JSON names (i.e. upper case) and everything will work as expected.

Json Schema:-

```
{
"type": "record",
"name": "DnpReport",
"fields" : [
{"name": "INV_IDN", "type": ["null", "string"]},
{"name": "INV_NUMBER", "type": ["null", "string"]},
{"name": "USR_MDF", "type": ["null", "string"]},
{"name": "INVISSDAT", "type": ["null", "string"]},
{"name": "INVCLICOD", "type": ["null", "string"]},
{"name": "INVCLINAM", "type": ["null", "string"]},
{"name": "INVCLI_REGCOUNTRY", "type": ["null", "string"]},
{"name": "INVOICECLASS", "type": ["null", "string"]},
{"name": "CORCLICOD", "type": ["null", "string"]},
{"name": "OFOCHINMB", "type": ["null", "string"]},
{"name": "BSNCTGDSC", "type": ["null", "string"]},
{"name": "BSNCTGCOD", "type": ["null", "string"]},
{"name": "BSNCTGDSPCOD", "type": ["null", "string"]},
{"name": "INVTYP", "type": ["null", "string"]},
{"name": "INVSTU", "type": ["null", "string"]},
{"name": "INVORGCOD", "type": ["null", "string"]},
{"name": "CRYCOD", "type": ["null", "string"]},
{"name": "AMNWTHVAT", "type": ["null", "string"]},
{"name": "AMNWTHVATINEUR", "type": ["null", "string"]},
{"name": "MEDIAFEESINEUR", "type": ["null", "string"]},
{"name": "KCFEESINEUR", "type": ["null", "string"]}
]
}
```

Output:-

```
INV_IDN,INV_NUMBER,USR_MDF,INVISSDAT,INVCLICOD,INVCLINAM,INVCLI_REGCOUNTRY,INVOICECLASS,CORCLICOD,OFOCHINMB,BSNCTGDSC,BSNCTGCOD,BSNCTGDSPCOD,INVTYP,INVSTU,INVORGCOD,CRYCOD,AMNWTHVAT,AMNWTHVATINEUR,MEDIAFEESINEUR,KCFEESINEUR
247048764,181120060,15/03/2018 08:34:00 by LDL,15/03/2018,FUNDQ,FUNDQUEST,FR,CUS - assujetti in EU outside Lx,BNPAMFR,20173748,Fund Data Management,LIS,FDM,Credit Note,Validated,LU,EUR,"-7,543.23","-7,543.23",0,-7543.23
```

Reference template:- 181962-csv.xml
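For clarity, a single incoming JSON message matching the corrected (upper-case) schema would look like the sketch below, abridged to a few fields with values taken from the output row above; the remaining fields can simply be absent, since every field type is a union with null.

```
{"INV_IDN": "247048764", "INV_NUMBER": "181120060", "INVTYP": "Credit Note", "CRYCOD": "EUR"}
```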
03-29-2018
09:02 PM
@siva vulli
You can use the SplitJson processor; it splits a JSON array of messages into individual messages, one per flowfile. I.e., if your JSON array has 100 messages in it, the SplitJson processor's splits relation will output 100 flowfiles, each holding one message.

SplitJson configs:-

JsonPath Expression: $.*

With this configuration we split the JSON array into individual messages.

Input to SplitJson processor:-

```
[{"Id":"43","name":"ABC"},{"Id":"44","name":"Xyz"}]
```

Output from SplitJson processor:- we get 2 flowfiles (as we have 2 JSON messages in the array)

flowfile1:- {"Id":"43","name":"ABC"}
flowfile2:- {"Id":"44","name":"Xyz"}

As you mentioned in the question, your expected output is {"Id":"43","name":"ABC"}{"Id":"44","name":"Xyz"}. For this case, after the SplitJson processor use a MergeContent processor, where we merge all the splits into one flowfile by setting Merge Strategy to Defragment. Use the merged relation from the MergeContent processor, which will give the desired result.

Output from MergeContent processor:-

```
{"Id":"43","name":"ABC"}{"Id":"44","name":"Xyz"}
```

Flow:-
1. SplitJson processor //for splitting arrays into individual messages
2. MergeContent processor //to merge the split messages into one flowfile content using the Defragment strategy

(or)

Method 2: using ReplaceText processors.

First ReplaceText processor configs:-

Search Value: \[(.*?)\]
Replacement Value: $1
Maximum Buffer Size: 1 MB //increase this value if your JSON message is larger than 1 MB
Replacement Strategy: Regex Replace
Evaluation Mode: Entire text

In this processor we capture the whole JSON message without the square brackets [] and set that as the flowfile content.

Input:-

```
[{"Id":"43","name":"ABC"},{"Id":"44","name":"Xyz"}]
```

Output:- the flowfile content no longer has square brackets.

```
{"Id":"43","name":"ABC"},{"Id":"44","name":"Xyz"}
```

Then use a second ReplaceText processor to replace "}, with "} — here we literally search for "}, in the content and replace it with "}.

Configs:-

Search Value: "},
Replacement Value: "}
Maximum Buffer Size: 1 MB //increase this value if your JSON message is larger than 1 MB
Replacement Strategy: Literal Replace
Evaluation Mode: Entire text

Input flowfile:-

```
{"Id":"43","name":"ABC"},{"Id":"44","name":"Xyz"}
```

Output flowfile:-

```
{"Id":"43","name":"ABC"}{"Id":"44","name":"Xyz"}
```

Flow:-
1. ReplaceText processor //to capture all the content of the flowfile without square brackets ([])
2. ReplaceText processor //replace "}, with "}

Both methods produce the same output; decide which method best fits your case. Let us know if you are having any issues..!!
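To sanity-check the two ReplaceText steps outside NiFi, here is a minimal Groovy sketch of the same replacements (the sample array is the one from the question):

```
// Step 1 (Regex Replace): capture everything between the square brackets
def input = '[{"Id":"43","name":"ABC"},{"Id":"44","name":"Xyz"}]'
def noBrackets = input.replaceAll(/\[(.*?)\]/, '$1')
assert noBrackets == '{"Id":"43","name":"ABC"},{"Id":"44","name":"Xyz"}'

// Step 2 (Literal Replace): replace "}, with "}
def result = noBrackets.replace('"},', '"}')
assert result == '{"Id":"43","name":"ABC"}{"Id":"44","name":"Xyz"}'
```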
03-29-2018
04:25 PM
@Mark McGowan Try this template once: 181963-loop-invokehttp.xml. If you are still facing issues, I have added all the processor configs below.

Flow:-

GenerateFlowFile: leave as default; schedule it using cron (or) timer driven, running on the primary node.

UpdateAttribute configs:- add new property

dd
${dd:isNull():ifElse('20170101','${dd:toDate("yyyyMMdd"):toNumber():plus(100800000):format("yyyyMMdd")}')}

InvokeHTTP configs:-

Remote URL
http://api.wunderground.com/api/dfe2f11fb7108c21/history_${dd}/q/CA/EWR.json

Handle connection timeout issues and the other InvokeHTTP relationships (no retry, etc.).

RouteOnAttribute configs:- add new property

in range
${dd:lt('20171231')}

Connect all the relations as shown in the flow screenshot. Let us know if you are having issues..!!
03-29-2018
02:36 PM
1 Kudo
@Mark McGowan Yes, we can do that. For this case we need to fork the InvokeHTTP processor's response relation: fork 1 of the response goes on for processing, and fork 2 goes to a RouteOnAttribute processor, which checks whether the date is in range or not. If it is in range, we update the date and go back to the InvokeHTTP processor.

Loop:-

UpdateAttribute processor configs:-

dd
${dd:isNull():ifElse('20170101','${dd:toDate("yyyyMMdd"):toNumber():plus(100800000):format("yyyyMMdd")}')}

This checks the dd attribute value: if the value is null (the first run), we use 20170101. If it is not null (i.e. from the second run onwards), we add 100800000 milliseconds (28 hours, because my server is configured with EST and we need to account for daylight saving time).

InvokeHTTP processor configs:-

Remote URL
http://api.wunderground.com/api/dfe2f11fb7108c21/history_${dd}/q/CA/EWR.json

This won't change, because we are modifying the dd attribute value in the UpdateAttribute processor.

RouteOnAttribute configs:- in this processor we need to add a new property

in range
${dd:lt('20171231')}

This checks whether the dd attribute value is less than 20171231. If it is, the flowfile goes to:
1. the UpdateAttribute processor, which updates the dd value (+1 day)
2. then to the InvokeHTTP processor; fork the response relation:
2.1 the first fork goes to the RouteOnAttribute processor
2.2 the second fork goes on for processing

I have attached my flow XML; use it for reference and make changes per your use case: loop-invokehttp-181963.xml
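To see why adding 100800000 ms (28 hours) always advances the yyyyMMdd value by exactly one calendar day once it is reformatted, here is a minimal Groovy sketch of the same arithmetic as the UpdateAttribute expression (the seed date is the first-run value from above):

```
import java.text.SimpleDateFormat

// Mirrors ${dd:toDate("yyyyMMdd"):toNumber():plus(100800000):format("yyyyMMdd")}
def fmt = new SimpleDateFormat('yyyyMMdd')
def dd = '20170101'                              // first-run seed value
def millis = fmt.parse(dd).time + 100800000L     // + 28 hours
def next = fmt.format(new Date(millis))          // formatting truncates the extra hours
assert next == '20170102'
```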
03-29-2018
12:32 PM
@Mark McGowan
Use a GenerateFlowFile processor with an InvokeHTTP processor to get the date dynamically.

GenerateFlowFile configs:- in this processor add a new property

dd
${now():format("yyyyMMdd")} //gives today's date, added as a flowfile attribute named dd

InvokeHTTP processor:- change the Remote URL in the InvokeHTTP processor to

http://api.wunderground.com/api/dfe2f11fb7108c21/history_${dd}/q/CA/EWR.json

Use the response relationship from the InvokeHTTP processor. Since we use the dd attribute value in the URL request, you no longer need to change the URL daily. Schedule the GenerateFlowFile processor to run daily so that the InvokeHTTP processor runs with the current date dynamically.

Flow:-
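As a rough Groovy sketch of what the expression evaluates to at run time (the API key and URL are the ones from the question):

```
// Groovy equivalent of ${now():format("yyyyMMdd")} plus the URL it is substituted into
def dd = new Date().format('yyyyMMdd')   // e.g. 20180329
def url = "http://api.wunderground.com/api/dfe2f11fb7108c21/history_${dd}/q/CA/EWR.json"
println url
```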
03-29-2018
12:14 PM
1 Kudo
@Stefan Constantin Could you please change the below properties in the ReplaceText processor and run it again. Make sure the attribute names you use in the ReplaceText processor match those in the EvaluateJsonPath processor (they are case sensitive).

(or)

A more optimal way of doing this use case: if you are using NiFi 1.2+, use the ConvertRecord processor with an Avro Reader as the Record Reader and a CSVRecordSetWriter as the Record Writer; this processor even accepts arrays of JSON messages and converts them to CSV.

Flow:-
1. ExecuteSQL
2. ConvertRecord
3. PutFile

This link describes how to use the ConvertRecord processor. Let us know if you are having any issues..!!
03-28-2018
01:22 PM
@swathi thukkaraju
I didn't fully get the question, but if you have a file with caret (^) as the delimiter, then we need to escape the caret with two backslashes, because caret (^) is a special character in regex (the line-start anchor).

Input file:-

```
name^age^state
swathi^23^us
srivani^24^UK
ram^25^London
```

```
scala> case class schema(name: String, age: Int, brand_code: String)
scala> val rdd = sc.textFile("file://<file-path>/test1.csv")
scala> val rdd1 = rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter } // drop the header row
scala> val df1 = rdd1.map(_.split("\\^")).map(x => schema(x(0).toString, x(1).toInt, x(2).toString)).toDF()
```

(or) split on the caret as a plain character, which avoids regex interpretation altogether:

```
scala> val df1 = rdd1.map(_.split('^')).map(x => schema(x(0).toString, x(1).toInt, x(2).toString)).toDF()
```

Output:-

```
scala> df1.show()
+-------+---+----------+
|   name|age|brand_code|
+-------+---+----------+
| swathi| 23|        us|
|srivani| 24|        UK|
|    ram| 25|    London|
+-------+---+----------+
```

If you are still facing issues, please share your input data, the script you have prepared, and the expected output, so the root cause of the issue is easier to understand..!!
03-28-2018
09:07 AM
1 Kudo
@Vivek Singh It's already a randomly generated id in NiFi:

UUID
Description: Returns a randomly generated UUID.
Subject Type: No Subject
Arguments: No arguments
Return Type: String
Examples: ${UUID()} returns a value similar to de305d54-75b4-431b-adb2-eb6b9e546013

For more reference, see this link. We cannot modify the "uuid" attribute associated with the flowfile, even using ExecuteScript, because it is fixed for a FlowFile; if a key is named "uuid", it will be ignored. More details regarding ExecuteScript are in this link.

However, we can assign a manually generated UUID to a new attribute, as long as the new attribute name is something other than "uuid". Just for reference, a sample Groovy script to generate a UUID:

```
def flowFile = session.get()
if (!flowFile) return
// generate a random UUID and store it in a new attribute named uuid2
def ruuid = UUID.randomUUID().toString()
flowFile = session.putAttribute(flowFile, 'uuid2', ruuid)
session.transfer(flowFile, REL_SUCCESS)
```

In this script we are just adding a new attribute called uuid2 to the flowfile and assigning a random UUID value to it.

Output:- for each flowfile we will have the uuid assigned by NiFi and a uuid2 assigned by the script. Let us know if you are having any issues..!!
03-28-2018
08:04 AM
1 Kudo
@Tony van Buuren van Heijst
Enclose your regex in parentheses () in the ExtractText processor, because we need at least one capture group in the ExtractText processor.

ExtractText processor configs:-

plex (.*)

If the answer helped to resolve your issue, click on the Accept button below to accept the answer; that would be a great help to community users looking for a quick solution to this kind of issue.
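For reference, a minimal Groovy sketch showing what the capture group extracts (the sample content is hypothetical):

```
import java.util.regex.Pattern

// ExtractText needs at least one capture group; "plex (.*)" captures
// everything after "plex " into group 1
def m = Pattern.compile(/plex (.*)/).matcher('plex media server 1.3.2')
assert m.find() && m.group(1) == 'media server 1.3.2'
```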
03-28-2018
07:49 AM
@Vivek Singh A UUID is associated with each flowfile, so you can use expression language like ${UUID()} to get a unique id to associate with the flowfile.