Member since 06-08-2017 | 1049 Posts | 518 Kudos Received | 312 Solutions
07-24-2018
10:00 PM
1 Kudo
@Sudheer K That is expected behavior for the MergeRecord processor in NiFi < 1.7: after the merge, all records are in a single array, [{},{},{}]. In NiFi 1.7+ you can write one line per object, i.e. [{}] [{}] [{}].
If you want one line per object, use the MergeContent processor with Delimiter Strategy set to Text and the Demarcator set to a newline (Shift+Enter in the property editor). The output flowfile will then contain one object per line.
Output: [{}] [{}] [{}]
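To make the difference concrete, here is a minimal Python sketch (not NiFi code) of what a text demarcator does: it joins the content of each incoming flowfile with a newline instead of combining the records into one array. The function name and sample contents are hypothetical.

```python
def merge_with_demarcator(flowfile_contents, demarcator="\n"):
    """Join flowfile contents with a text demarcator, roughly as described above."""
    return demarcator.join(flowfile_contents)


# Three hypothetical flowfiles, each holding a one-element JSON array.
contents = ['[{"id":1}]', '[{"id":2}]', '[{"id":3}]']
merged = merge_with_demarcator(contents)
print(merged)
```

Each array stays on its own line, which matches the one-line-per-object layout described in the answer.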
07-24-2018
02:29 AM
@Mahendra Hegde
You can use the EvaluateJsonPath processor to extract the value of $.tripSummaryMetaData.orgId and keep it as a flowfile attribute, then use the RouteOnAttribute processor to check the value of that attribute:
1. If it is empty, do not feed the JSON message to the Jolt transform processor.
2. If it is not empty, feed that relationship to the Jolt transform that adds the default operation.
(or)
Use the UpdateRecord processor and configure the Record Reader/Writer controller services (in the writer, set Suppress Null Values so that nulls are suppressed). Then, using the Record Path Value strategy, check whether a value exists at the specific record path (see the Record Path Guide) and use an if/else function to add the default value.
References: UpdateRecord and the Record Path Guide.
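The first option boils down to an extract-then-route decision. A rough Python sketch of that logic, outside NiFi, might look like the following. The function name and the relationship names "empty" and "not_empty" are assumptions for illustration; only the path tripSummaryMetaData.orgId comes from the answer.

```python
import json


def route_on_org_id(message):
    """Return a hypothetical relationship name based on tripSummaryMetaData.orgId."""
    doc = json.loads(message)
    # Treat a missing or null tripSummaryMetaData object like an empty one.
    meta = doc.get("tripSummaryMetaData") or {}
    org_id = meta.get("orgId")
    if org_id is None or str(org_id).strip() == "":
        return "empty"      # do not send to the Jolt transform
    return "not_empty"      # send on to the Jolt transform


print(route_on_org_id('{"tripSummaryMetaData": {"orgId": "A1"}}'))
print(route_on_org_id('{"tripSummaryMetaData": {}}'))
```

In the flow itself, the same check would be an Expression Language condition on the attribute that EvaluateJsonPath created.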
07-24-2018
12:57 AM
@Sudheer K
I was able to reproduce the same issue in NiFi < 1.7; it is fixed in NiFi 1.7 (refer to this jira link addressing the same issue). In NiFi 1.7 the same input JSON produces an output flowfile like the one below.
[{"account":{"id":99999999999}},{"account":{"id":99999999999}},{"account":{"id":99999999999}},{"account":{"id":99999999999}},{"account":{"id":99999999999}},{"account":{"id":99999999999}}]
In NiFi 1.6 we cannot read multiple JSON arrays from one flowfile. To work around this:
Method 1: If you have one JSON array per line, use the SplitText processor to split the file into an individual flowfile per message, then feed the splits relationship to the UpdateRecord processor.
(or)
Method 2: Use the SplitContent processor with Byte Sequence Format set to Text and }] as the Byte Sequence (see this reference regarding the SplitContent processor), then feed the splits relationship to the UpdateRecord processor.
In addition, once you have split the content, you can feed the splits relationship to the MergeRecord processor to merge these JSON array flowfiles into one JSON message, and then use the UpdateRecord processor to work on the merged array.
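As an illustration of Method 2 followed by the merge step, here is a small Python sketch (not NiFi code). It assumes the }] sequence is kept at the end of each split and that no record contains a nested array ending in }], which would cut a message in the wrong place. The function names and sample input are hypothetical.

```python
import json


def split_on_sequence(content, sequence="}]"):
    """Cut content after each occurrence of sequence, keeping the sequence on the chunk."""
    chunks = []
    start = 0
    while True:
        idx = content.find(sequence, start)
        if idx == -1:
            break
        end = idx + len(sequence)
        chunk = content[start:end].strip()
        if chunk:
            chunks.append(chunk)
        start = end
    return chunks


def merge_arrays(chunks):
    """Parse each chunk as a JSON array and combine all records into one list."""
    merged = []
    for chunk in chunks:
        merged.extend(json.loads(chunk))
    return merged


raw = '[{"account":{"id":1}}]\n[{"account":{"id":2}},{"account":{"id":3}}]'
splits = split_on_sequence(raw)
merged = merge_arrays(splits)
print(json.dumps(merged, separators=(",", ":")))
```

The merged list plays the role of the single JSON array that UpdateRecord can then process in one pass.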
07-24-2018
12:00 AM
@Sudheer K Make sure your input JSON messages match the schema that you have configured. I just tried with the input JSON below:
[{"account":{"id":1}},{"account":{"id":2}}]
UpdateRecord dynamic property (name, then value):
/account/id
replaceRegex(/account/id, '(^.*$)', '99999999999')
Avro schema:
{
"type" : "record",
"name" : "sch",
"namespace" : "avro",
"fields" : [ {
"name" : "account",
"type" : {
"type" : "record",
"name" : "account",
"fields" : [ {
"name" : "id",
"type" : "long"
} ]
}
} ]
}
Output:
[{"account":{"id":99999999999}},{"account":{"id":99999999999}}]
This worked as expected and replaced the value in both records of the array.
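For readers who want to see the effect of that replaceRegex call outside NiFi, here is a rough Python equivalent. It only mirrors the regex and replacement value from the answer; the function name is hypothetical, and converting the result back to an integer is an assumption that stands in for the schema's long type.

```python
import re


def replace_account_id(records, pattern=r"(^.*$)", replacement="99999999999"):
    """Replace the whole /account/id value of every record, like the property above."""
    updated = []
    for record in records:
        new_id = re.sub(pattern, replacement, str(record["account"]["id"]))
        # The schema declares id as long, so store the result as an integer.
        updated.append({"account": {"id": int(new_id)}})
    return updated


records = [{"account": {"id": 1}}, {"account": {"id": 2}}]
print(replace_account_id(records))
```

Because the pattern is anchored at both ends, it matches the entire field value once, so every record ends up with the same constant.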
07-21-2018
10:20 AM
1 Kudo
@Sudheer K Use the UpdateRecord processor and configure the Record Reader/Writer controller services. Add the names of the sensitive fields as dynamic properties, then replace the field.value with your desired value. For more on UpdateRecord, refer to this link.
If you are using NiFi < 1.2, use the ReplaceText processor instead: add a regex that matches the sensitive values, then replace them with constant values.
07-17-2018
10:49 PM
@Andy Liang The ExecuteSQL processor produces an output flowfile with the Avro schema embedded in it. So in your PutParquet processor, configure the Record Reader as an AvroReader and set its Schema Access Strategy to use the embedded Avro schema. With the embedded schema you don't have to set up an Avro schema registry, and the processor works dynamically based on the schema carried in each flowfile.
07-12-2018
04:55 AM
@Hoa Vuong
Feed the duplicate relationship from the DetectDuplicate processor to an UpdateAttribute processor that uses the subjectless nextInt() function. Add a new property named filename:
${filename}(${nextInt()})
This expression appends the nextInt() value to the filename. For more detail, look into this link regarding nextInt() function usage.
(Or)
Store state in the UpdateAttribute processor and add a new property named theCount:
${getStateValue("theCount"):plus(1)}
Then use another UpdateAttribute processor to add the theCount attribute to the filename (refer to this regarding getStateValue function usage). Add a new property named filename:
${filename}(${theCount})
With this approach you can reset the state value to 0 once it reaches your threshold (for example, set it back to 0 when the value is 100). Refer to this link regarding resetting the value.
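The stateful variant can be pictured with a short Python sketch (not NiFi code). The class name, the default threshold, and the exact reset rule (reset after the threshold value has been used) are assumptions chosen to illustrate one reading of the approach.

```python
class FilenameCounter:
    """Keeps a counter like the theCount state value and appends it to filenames."""

    def __init__(self, threshold=100):
        self.threshold = threshold
        self.count = 0  # stored state starts at 0

    def rename(self, filename):
        self.count += 1                      # like getStateValue("theCount"):plus(1)
        new_name = f"{filename}({self.count})"
        if self.count >= self.threshold:     # threshold reached: reset the state to 0
            self.count = 0
        return new_name


counter = FilenameCounter(threshold=3)
names = [counter.rename("report.csv") for _ in range(4)]
print(names)
```

With a threshold of 3, the fourth duplicate starts again at 1, which is the wrap-around behavior described above.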
07-12-2018
04:27 AM
1 Kudo
@Bruno Gomes de Souza Make use of the record-oriented processors to split the JSON array. Try the approach below.
Once you feed the success relationship to the SplitRecord processor, define a Record Reader controller service to read the contents of the flowfile and set the Record Writer to JsonRecordSetWriter. Set the Records Per Split property to 1 and feed only the splits relationship from SplitRecord to the PublishGCPubSub processor.
If you run into OOM issues, it is better to use a series of SplitRecord processors to get down to 1 record per flowfile. Refer to this and this link regarding the use of a series of split processors, and to this link regarding configuring the Record Reader/Writer controller services.
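The idea behind a series of splits is to avoid creating every single-record flowfile in one step. A minimal Python sketch of that two-stage chunking follows. The function names and the first-stage size of 1000 are hypothetical, and this only models the batching, not NiFi's memory behavior.

```python
from itertools import islice


def split_records(records, records_per_split):
    """Yield consecutive lists of at most records_per_split records."""
    it = iter(records)
    while True:
        chunk = list(islice(it, records_per_split))
        if not chunk:
            return
        yield chunk


def two_stage_split(records, first_stage_size=1000):
    """First split into medium batches, then split each batch into single records."""
    for batch in split_records(records, first_stage_size):
        for single in split_records(batch, 1):
            yield single


print(list(two_stage_split(range(5), first_stage_size=2)))
```

Only one medium batch is held at a time before it is broken into single-record pieces.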
07-11-2018
09:15 AM
@Paul Burger The ScanAttribute processor matches attribute values exactly against the contents of the Dictionary File, depending on the Match Criteria value. Since your dictionary file contains /.*.tar.gz, a flowfile goes to the matched relationship only if its value is exactly /.*.tar.gz.
If you want to filter filenames that contain .tar.gz, use the RouteOnAttribute processor and add a new property to check the filename:
${filename:contains(".tar.gz")} //checks whether .tar.gz appears in the filename value
${filename:substringAfter("."):equals("tar.gz")} //takes the filename value after the first . and compares it with tar.gz
Either expression checks for .tar.gz in the filename value, and matching flowfiles are routed to the relationship named after the newly added property. To negate the expression, use the :not() function. For more details, refer to this link regarding NiFi Expression Language.
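The two expressions are not equivalent for every filename, which a quick Python emulation makes visible. This sketch assumes substringAfter cuts at the first occurrence of the argument and returns the whole subject when the argument is absent. The function names and sample filenames are hypothetical.

```python
def contains_tar_gz(filename):
    """Emulates ${filename:contains(".tar.gz")}."""
    return ".tar.gz" in filename


def substring_after_equals_tar_gz(filename):
    """Emulates ${filename:substringAfter("."):equals("tar.gz")}."""
    _, sep, rest = filename.partition(".")  # partition splits at the first "."
    after = rest if sep else filename       # no "." found: keep the whole value
    return after == "tar.gz"


for name in ["backup.tar.gz", "backup.2018.tar.gz", "notes.txt"]:
    print(name, contains_tar_gz(name), substring_after_equals_tar_gz(name))
```

A filename with extra dots, such as the second sample, passes the contains check but fails the substringAfter check, so the contains form is the safer of the two when filenames may include dots before the extension.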
07-11-2018
09:01 AM
@Faisal Durrani Use the UpdateRecord processor before the PutHBaseRecord processor to create a new field that concatenates the PKs. Then add the newly created field to the Avro schema used by the PutHBaseRecord Record Reader so that you can use the concatenated field as the row identifier.
row_id //newly created field name
concat(/pk1,/pk2) //the processor reads the pk1 and pk2 field values from each record, concatenates them, and stores the result as row_id
UpdateRecord works on chunks of data (many records per flowfile), which makes it a very efficient way of updating flowfile contents. For more on the UpdateRecord processor, follow this link.
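As a rough picture of what the concat(/pk1,/pk2) property does to each record, here is a small Python sketch (not NiFi code). The function name and sample records are hypothetical; the field names pk1, pk2 and row_id come from the answer.

```python
def add_row_id(records):
    """Return copies of the records with row_id set to pk1 and pk2 concatenated."""
    return [
        {**record, "row_id": f"{record['pk1']}{record['pk2']}"}
        for record in records
    ]


rows = [{"pk1": "cust", "pk2": 42, "name": "a"}, {"pk1": "cust", "pk2": 43, "name": "b"}]
print(add_row_id(rows))
```

If the two keys can run together ambiguously (for example 1 and 23 versus 12 and 3), adding a separator between them is worth considering.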