Member since
06-08-2017
1049
Posts
518
Kudos Received
312
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 11248 | 04-15-2020 05:01 PM | |
| 7157 | 10-15-2019 08:12 PM | |
| 3134 | 10-12-2019 08:29 PM | |
| 11574 | 09-21-2019 10:04 AM | |
| 4365 | 09-19-2019 07:11 AM |
07-11-2018
08:39 AM
1 Kudo
@Sam LL You can do this in NiFi by using Record oriented processors (ConvertRecord). Using Convert Record processor: input json: {
NAME: "xxx",
CITY: "yyy",
AGE:"00",
ZIPCODE: "12345",
ADDRESS: " "}
Use ConvertRecord processor with Json Tree reader as controller service and give your matching avro schema and enable controller service. Avro Schema: {
"namespace": "nifi",
"name": "balances",
"type": "record",
"fields": [
{"name": "NAME", "type": ["null", "string"]},
{"name": "CITY", "type": ["null", "string"]},
{"name": "AGE", "type": ["null", "string"]},
{"name": "ZIPCODE", "type": ["null", "string"]},
{"name": "ADDRESS", "type": ["null", "string"]}
]
} By using the configured reader convertRecord processor is able to read your incoming data. To write in json format configure/enable JsonRecordSetWriter controller service and give the matching avro schema(change the order of the fields while writing) AvroSchema: {
"namespace": "nifi",
"name": "balances",
"type": "record",
"fields": [
{"name": "NAME", "type": ["null", "string"]},
{"name": "AGE", "type": ["null", "string"]},
{"name": "ADDRESS", "type": ["null", "string"]},
{"name": "CITY", "type": ["null", "string"]},
{"name": "ZIPCODE", "type": ["null", "string"]}
]
} Now while writing the output data processor writes with the matching avro schema with ordering the fields in same way Output: [{"NAME":"xxx","AGE":"00","ADDRESS":" ","CITY":"yyy","ZIPCODE":"12345"}] Refer to this link for configure/enabling ConvertRecord processor. - If the Answer helped to resolve your issue, Click on Accept button below to accept the answer.
... View more
07-10-2018
01:18 AM
1 Kudo
@Suhas Reddy
Use Replace Text processor to change the contents of flowfile. Search Value ("_id":.*?,).*("eyeColor":.*") Replacement Value $1$2 Character Set UTF-8 Maximum Buffer Size 1 MB //change the value according to flowfile size Replacement Strategy Regex Replace Evaluation Mode Entire text Input: {"_id": "5b42fe8f7f663540330b3bdc","index": 0,"guid": "60358c95-e50c-4f5e-ad48-00d9e1f9a849","isActive": true,"balance": "$2,483.56","picture": "http://placehold.it/32x32","age": 32,"eyeColor": "green"} Output: {"_id": "5b42fe8f7f663540330b3bdc","eyeColor": "green"} If your required output flowfile is {"eyeColor": "green","_id": "5b42fe8f7f663540330b3bdc"} then change the replace text configs to Search Value ("_id":.*?),.*("eyeColor":.*") Replacement Value $2,$1 In addition to achieve the same case by using record oriented processors(Convert Record (or) Update Record) if you know the schema of your json message then use ConvertRecord/UpdateRecord processor to read your incoming data with jsontree reader controller service and configure the only required fields(eyecolor,_id) in JsonRecordSetwriter so that your output flowfile will going to have only the configured fields and this processor works with array of json messages Refer to these links regarding convert record and update record processors. - If the Answer helped to resolve your issue, Click on Accept button below to accept the answer.
... View more
07-09-2018
09:29 AM
@sudhir reddy Try with csv writer as text expects only one column. df.write.mode("overwrite").csv("/user/root/JOUT") (or) Convert the df to rdd then use saveAsTextFile to write the json to hdfs df.rdd.saveAsTextFile("/user/root/JOUT") (or) using .format("json") and save the file to hdfs df.write.format("json").mode("append").save("/user/root/JOUT") Some useful links regarding similar errors link1,link2,link3
... View more
07-09-2018
08:42 AM
@Suhas Reddy You can use either Extract Text processor (or) EvaluateJsonPath processor to extract the values and keep them as attributes. Extract Text Configs: Add new dynamic properties in ExtractText processor eyecolor
eyeColor":\s"(.*)"
id
"_id":\s"(.*?)", EvaluateJsonPath Configs: Add new dynamic properties as eyecolor
$.eyeColor
id
$._id Both of these configs results same output flowfile with eyecolor,id attributes associated with it if you want to keep both attribute values in one attribute then use UpdateAttribute processor and create new attribute like Output flowfile will have attr attribute with both id and eyecolor values with : as seperator
... View more
07-08-2018
08:36 PM
@Ishan Kumar Schema Write Strategy is used to define schema i.e. do we need to add a schema.name attribute (or) Embed Avro Schema(this is newly defined schema in AvroSchemaRegistry in your case ) in a data file (or) etc... Add schema.name attribute to the flowfile that matches the avro schema registry name and convert record processor writes the schema that you have mentioned in the avro schema registry(i.e long type) in the output flowfile from ConvertRecord procesor. AvroSetWriter controller service configs: With these configs you are going to have new avro data file with AvroSchemaRegistry schema embed in it.
... View more
07-08-2018
09:52 AM
@Ishan Kumar In AvroRecordSetWriter controller service you need to select Schema Write Strategy property value Embed Avro Schema So that you are writing the new schema embed to the avro data file, when you use ConvertAVROToOrc processor there will be no issues when the schema was embedded. We are going to get issues java.io.IOException: Not a data file only when the processors are not able to find any schema in the avro data file.
... View more
07-07-2018
12:17 AM
1 Kudo
@Sam LL Search for white space in double quotes and replace that with "N/A" Search Value
"\s"
Replacement Value
"N/A"
Character Set
UTF-8
Maximum Buffer Size
1 MB //needs to change this value according to your flowfile size
Replacement Strategy
Regex Replace
Evaluation Mode
Entire text
... View more
07-05-2018
12:50 PM
2 Kudos
@Gulshan Agivetova We cannot use SplitJson processor for this flowfile content as this is not an valid json format, if you want to use SplitJson Processor then we need to build an valid json array with all json objects in it by using replace text.. etc processor then feed the valid json message to SplitJson Processor. Use Split Content processor to split the flowfile content based on the Byte Sequence Format and value is 7D for } Include the byte sequence and byte sequence location as trailing, by using these configs you are going to have a valid json object in each flowfile. even you can mention byte sequence format in Text also then specify byte sequence as } . In addition if your each json object is in one line then you can use Split Text processor with line split count value as 1 and if you are facing any OOM issues then follow this approach to avoid OOM issues.
... View more
07-05-2018
12:33 PM
@Markus Wilhelm I don't think we can make NiFi to read kerberos configs to read by default but you can make use of Process group variables in your HDFS processor configs and define the variables scope as NiFi Flow so that you can use same variables across all the processors in NiFi instance. You can copy hdfs-site.xml,core-site.xml to nifi lib path and restart nifi, then you don't have to specify the path because nifi will load all the .xml from lib path, but it's not recommended way of approach because if you want to change some configs in either of these two xml files then we need to restart NiFi to take those changes in to effect in NiFi instance. Refer to this link regarding Process Group variables in NiFi and refer to this link regarding copying xml files into nifi lib.
... View more
07-04-2018
10:51 PM
@Derek Calderon
You can achieve this by using these processors Flow: 1.ExecuteSql //output flowfile is in Avro format
2.SplitRecord //Read the incoming Avro datafile then write as Json/CSV ...etc and Records per split as 1(if you are getting OOM issues use series of Split record processors to split flowfile to 1 record each) 3.ExtractText //add new property with (.*) this regex to extract all the contents of flowfile and add to flowfile attributes(change the Maximum Buffer Size according to flowfile size) By using this flow you are having each record as flowfile attributes now. Converting Json records in splitrecord processor would be easier to access the values of keys because once we extract the contents of flowfile in Extracttext then by using jsonPath expression language you can extract the values for the keys easily. Example: i'm having json flowfile with {"id":1} and extracted as content attribute to the flowfile then used Updateattribute processor to create id attribute by using the below expression language. ${content:jsonPath('$.id')} Keep in mind all the attributes associated with the flowfile are stored in memory as if you are adding more attributes to the flowfile results utilizing more memory and it's better to delete all the unused attributes associated to the flowfile to utilize less memory. Please refer to this link how to use series of split processors.
... View more