Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312
02-04-2018
05:59 AM
1 Kudo
@elango vaithiyanathan
To get non-group-by columns from a grouped DataFrame, we need to apply one of the aggregate (agg) functions (max, min, mean, sum, etc.) to every non-group-by column.

Example:-
import org.apache.spark.sql.functions.{max, min, mean, sum}
val grpbyDF = updatedDf.groupBy("ROW_ID","ODS_WII_VERB").agg(max("STG_LOAD_TS"), min("non groupby column"), mean("non groupby column"), sum("non groupby column"))

In the above grpbyDF we are grouping by ROW_ID and ODS_WII_VERB, and every non-group-by column is wrapped in one of the aggregate functions (max, min, mean, sum).

Please refer to the link below for more details about groupBy:
http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy
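To make the rule above concrete, here is a minimal plain-Python sketch of what a group-by followed by agg does: every column that is not a grouping key has to be reduced to a single value per group. This only illustrates the semantics and is not Spark code; the sample rows, the AMOUNT column and the helper group_and_aggregate are made up for the example.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical sample rows: only the column names ROW_ID, ODS_WII_VERB and
# STG_LOAD_TS come from the post; the values and the AMOUNT column are made up.
rows = [
    {"ROW_ID": 1, "ODS_WII_VERB": "I", "STG_LOAD_TS": "2018-01-01 10:00:00", "AMOUNT": 10.0},
    {"ROW_ID": 1, "ODS_WII_VERB": "I", "STG_LOAD_TS": "2018-01-02 09:30:00", "AMOUNT": 30.0},
    {"ROW_ID": 2, "ODS_WII_VERB": "U", "STG_LOAD_TS": "2018-01-03 08:15:00", "AMOUNT": 5.0},
]


def group_and_aggregate(rows, keys, aggregations):
    """Group rows by `keys`, then reduce each listed column with its function."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in keys)].append(row)

    result = []
    for key_values, members in groups.items():
        out = dict(zip(keys, key_values))
        for out_name, (column, func) in aggregations.items():
            out[out_name] = func([m[column] for m in members])
        result.append(out)
    return result


grouped = group_and_aggregate(
    rows,
    keys=["ROW_ID", "ODS_WII_VERB"],
    aggregations={
        "max_STG_LOAD_TS": ("STG_LOAD_TS", max),
        "min_AMOUNT": ("AMOUNT", min),
        "mean_AMOUNT": ("AMOUNT", mean),
        "sum_AMOUNT": ("AMOUNT", sum),
    },
)

for row in grouped:
    print(row)
```

Each output row holds the two grouping keys plus one aggregated value per non-group-by column, which is the same shape the grouped DataFrame has.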
02-03-2018
02:47 PM
1 Kudo
@elango vaithiyanathan
Apply an aggregate (agg) function after groupBy:

import org.apache.spark.sql.functions.max
val grpbyDF = updatedDf.groupBy("ROW_ID","ODS_WII_VERB").agg(max("STG_LOAD_TS"))

Now the grpbyDF DataFrame is grouped by ROW_ID and ODS_WII_VERB and holds the max value of the STG_LOAD_TS column for each group.
02-02-2018
10:45 PM
@Biswajit Chakraborty
If you are using the GetFTP processor, it adds a getftp.remote.source attribute to each flowfile after pulling the file. You can use this flowfile attribute to prepare the filename in an UpdateAttribute processor.

Add a new property in UpdateAttribute:
filename
${filename}_${getftp.remote.source} //adds the remote source name to the filename, separated by an underscore

You can also write the expression in other ways to change the filename:
${filename:append(${getftp.remote.source})} //result 711866091328995HDF04-1
(or)
${filename}${getftp.remote.source} //result 711866091328995HDF04-1

Example:-
If the filename value is 711866091328995 and the getftp.remote.source value is HDF04-1, then with the first expression the output flowfile from UpdateAttribute will have the filename 711866091328995_HDF04-1 //because we are adding the remote source value to the filename with an underscore

(or)
If files with the same filename are overwriting each other, note that the flowfile also has an attribute named uuid, which is a unique identifier for the flowfile. Using the uuid as the filename keeps every filename unique, so there are no overwriting issues.

Configs:-
filename
${uuid}
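For anyone who wants to check the resulting names outside NiFi, here is a small Python sketch that mimics the three expressions above with plain string handling. It is only an illustration of the results, not NiFi Expression Language; the attributes dictionary is a stand-in for flowfile attributes, and uuid4() merely stands in for the uuid that NiFi assigns to each flowfile.

```python
import uuid

# Attribute values taken from the example in the post.
attributes = {"filename": "711866091328995", "getftp.remote.source": "HDF04-1"}

# Mimics ${filename}_${getftp.remote.source}
with_underscore = f"{attributes['filename']}_{attributes['getftp.remote.source']}"

# Mimics ${filename:append(${getftp.remote.source})} and ${filename}${getftp.remote.source}
appended = attributes["filename"] + attributes["getftp.remote.source"]

# Mimics ${uuid}: a random UUID is used here only as a stand-in for the flowfile uuid.
unique_name = str(uuid.uuid4())

print(with_underscore)  # 711866091328995_HDF04-1
print(appended)         # 711866091328995HDF04-1
print(unique_name)
```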
02-02-2018
09:54 PM
3 Kudos
@Cesar Rodrigues
Use the RouteText processor (it routes each line of the text individually according to the rules you define) and configure it as follows.

RouteText Configs:-
Routing Strategy: Route to each matching Property Name
Matching Strategy: Contains
Ignore Leading/Trailing Whitespace: true
Ignore Case: true
Grouping Regular Expression: No value set
flowfile1: |1| more text //checks for lines having this content and routes them to the flowfile1 relation
flowfile2: |8| more text //checks for lines having this content and routes them to the flowfile2 relation

Input:-
Some fixed text |1| more text
Another field |8| more text
Last one |1| more text

Output:-
Flowfile1:- as we are checking whether the content has |1| more text, the following lines are routed to the flowfile1 relationship
Some fixed text |1| more text
Last one |1| more text

Flowfile2:- lines having |8| more text are routed to this relation
Another field |8| more text
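The routing behaviour described above can be approximated in a few lines of Python, which may help when checking rules before configuring the processor. This is a sketch under assumptions: route_lines and the "unmatched" bucket are names invented for the example, and it only imitates the Contains strategy with Ignore Case and whitespace trimming, not the processor's full behaviour.

```python
def route_lines(text, rules, ignore_case=True, trim=True):
    """Send each line to every relationship whose value the line contains."""
    routed = {name: [] for name in rules}
    routed["unmatched"] = []
    for line in text.splitlines():
        candidate = line.strip() if trim else line
        haystack = candidate.lower() if ignore_case else candidate
        matched = False
        for name, needle in rules.items():
            needle_cmp = needle.lower() if ignore_case else needle
            if needle_cmp in haystack:
                routed[name].append(line)
                matched = True
        if not matched:
            routed["unmatched"].append(line)
    return routed


# Rule names and values as in the post.
rules = {"flowfile1": "|1| more text", "flowfile2": "|8| more text"}
text = (
    "Some fixed text |1| more text\n"
    "Another field |8| more text\n"
    "Last one |1| more text"
)

routed = route_lines(text, rules)
for relationship, lines in routed.items():
    print(relationship, lines)
```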
01-30-2018
01:53 PM
1 Kudo
@Anil Reddy
In addition to Matt's answer, if you want to see all the functions that Expression Language supports in your version of NiFi:
1. Click the Global Menu button in the top-right corner.
2. Click Help.
3. Click Expression Language Guide on the right side; it shows all the functions that are supported in your version of NiFi.

If you want the effect of ${closed_epoch:format("yyyy", "GMT")} even though this expression is not supported in your version of NiFi, then as a workaround you can use the plus and minus functions.

Assuming your closed_epoch attribute value is 1453843201123:
${closed_epoch:minus(86400000)} //this expression subtracts 86,400,000 milliseconds (i.e. 24 hrs) from the closed_epoch value
The new value for closed_epoch will be 1453756801123.

*If daylight saving time applies, you need to change the milliseconds value in the minus (or) plus function.
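The arithmetic above is easy to double-check outside NiFi. The following Python sketch subtracts one day in milliseconds from the example value and formats the year in GMT; year_in_gmt is a helper name made up for this example and is not a NiFi function.

```python
from datetime import datetime, timezone

MILLIS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000

closed_epoch = 1453843201123  # value assumed in the post

# Same arithmetic as ${closed_epoch:minus(86400000)}
shifted = closed_epoch - MILLIS_PER_DAY


def year_in_gmt(epoch_millis):
    """Return the four-digit year of an epoch-milliseconds value in GMT/UTC."""
    seconds = epoch_millis // 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y")


print(shifted)               # 1453756801123
print(year_in_gmt(shifted))  # 2016
```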
01-27-2018
08:17 PM
1 Kudo
@Murat Menteşe
Instead of the GetHTTP processor, use the InvokeHTTP processor; InvokeHTTP accepts incoming connections. Use a GenerateFlowFile processor and add all your URLs to it, one URL per line.

GenerateFlowFile:-
Configs:- As shown in the screenshot, add all the URLs, one per line, in the Custom Text property, and change the run frequency in the Scheduling tab.

SplitText:-
Each time GenerateFlowFile is triggered we get one file containing all the URLs, so we split that file into one new flowfile per line.
Configs:-
Line Split Count: 1
Then connect the splits relation from the SplitText processor to the ExtractText processor.

ExtractText:-
In this processor we extract the content of the flowfile and add it as a flowfile attribute; the extracted attribute is then used in the InvokeHTTP processor. Add a new property in this processor by clicking the + sign at the right corner:
URL: (.*)

ControlRate:-
This is an optional processor that controls the rate of flowfiles going to the InvokeHTTP processor.
Configs:-
Rate Control Criteria: flowfile count
Maximum Rate: 1
Rate Controlled Attribute: No value set
Time Duration: 1 min
With these configs we process one flowfile per minute, but you can change all of them as per your requirements.

InvokeHTTP:-
We have configured the Remote URL property as ${URL}; this URL attribute was added in the ExtractText processor.

By following the above approach we don't need separate GetHTTP processors for each request.

Flow:-
GenerateFlowFile //keep all URLs and schedule to run
SplitText //split into one line per flowfile
ExtractText //extract the URL from the flowfile content and add it as an attribute
ControlRate (optional) //control the rate of flowfiles passed to the next processor
InvokeHTTP //use the extracted URL attribute and get the response

I'm attaching my xml here; save it, use it as a reference and modify it as per your requirements.
hcc-168913-gethttp.xml
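As a rough picture of what the SplitText and ExtractText steps in the flow above do to the data, here is a short Python sketch. It is an assumption-laden illustration rather than NiFi code: the example.com URLs are placeholders, and split_text and extract_url are hypothetical helpers that imitate Line Split Count 1 and the URL property with the regex (.*).

```python
import re

# Hypothetical URLs standing in for the Custom Text of GenerateFlowFile.
custom_text = "http://example.com/a\nhttp://example.com/b\nhttp://example.com/c"


def split_text(content, line_split_count=1):
    """Imitate SplitText: cut the content into chunks of N lines each."""
    lines = content.splitlines()
    return [
        "\n".join(lines[i:i + line_split_count])
        for i in range(0, len(lines), line_split_count)
    ]


def extract_url(content):
    """Imitate ExtractText with a property named URL and the regex (.*)."""
    match = re.match(r"(.*)", content)
    return {"URL": match.group(1)}


# One attribute dictionary per split, as InvokeHTTP would see via ${URL}.
flowfiles = [extract_url(part) for part in split_text(custom_text)]
for attributes in flowfiles:
    print(attributes["URL"])
```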
01-26-2018
06:08 PM
1 Kudo
@Yogesh Singh
The source file that you shared above does not contain the value 2018-10-12. The NumberFormatException occurs because the date format of the value does not match the date format that you specified (MM/dd/yy) in CsvReader and in CsvRecordSetWriter.

I have tried with the source file you provided and it works as expected.

Input:-
Source_Code,Source_Product_ID,Source_Party_ID,Govt_Issued_Tax_ID,Prefix_Name,First_Name,Middle_Name,Last_Name,Suffix_Name,Full_Legal_Name,NickName,Birth_Date,Birth_Place_Name,Gender,Smoker_Indicator,Risk_Class,Marriage_Status_Code,Agreement_Name,Status_Code,Payment_Method,Agreement_Original_Inception_Date,Payment_Frequency,Payment_Frequency_Desc,Agreement_Covered_Amount,Agreement_Number,Agreement_Party_Role,Effective_Date,Expiration_Date
FAST,1,1,000-23-1017,Mrs,First2310,F,Last31017,Dr,First2310 F Last31017,First2310 F Last31017,8/5/2013,USA,F,Y,P,N,Policy,Open,Cash,8/5/2013,M,Monthly,300000,231017,Insured,8/5/2013,8/6/2013
FAST,1,2,000-26-6065,Mrs,First2660,F,Last66065,Phd,First2660 F Last66065,First2660 F Last66065,8/6/2013,USA,F,Y,P,N,Policy,Open,Card,8/6/2013,M,Monthly,600000,266065,Insured,8/5/2013,8/6/2013
FAST,1,3,000-35-8819,Mrs,First3588,F,Last58819,Dr,First3588 F Last58819,First3588 F Last58819,8/7/2013,USA,F,Y,P,Y,Policy,Open,Check,8/7/2013,M,Monthly,150000,358819,Insured,8/5/2013,8/6/2013
FAST,1,4,000-39-7033,Mr,First3970,M,Last97033,Phd,First3970 M Last97033,First3970 M Last97033,8/8/2013,USA,M,Y,P,N,Policy,Open,Cash,8/8/2013,A,Annual,3000000,397033,Insured,8/5/2013,8/6/2013

Output:-
Source_Code,Source_Product_ID,Source_Party_ID,Govt_Issued_Tax_ID,Prefix_Name,First_Name,Middle_Name,Last_Name,Suffix_Name,Full_Legal_Name,NickName,Birth_Date
FAST,1,1,000-23-1017,Mrs,First2310,F,Last31017,Dr,First2310 F Last31017,First2310 F Last31017,8/5/2013
FAST,1,2,000-26-6065,Mrs,First2660,F,Last66065,Phd,First2660 F Last66065,First2660 F Last66065,8/6/2013
FAST,1,3,000-35-8819,Mrs,First3588,F,Last58819,Dr,First3588 F Last58819,First3588 F Last58819,8/7/2013
FAST,1,4,000-39-7033,Mr,First3970,M,Last97033,Phd,First3970 M Last97033,First3970 M Last97033,8/8/2013

Here is the attached xml, which has your source data and all the new CsvReader, AvroSchemaRegistry and CsvRecordSetWriter services. Use the xml file below and make sure your date (or) timestamp formats match the formats that you specify in CsvReader and CsvRecordSetWriter, as I mentioned in the comment above.
hcc-convertrecord-support-167066.xml
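The point about matching formats can be tried quickly in Python. This is only an analogy: NiFi uses Java date patterns such as MM/dd/yy, while the sketch uses Python strptime patterns that I picked as rough equivalents (an assumption), and matches_format is a helper invented for the example.

```python
from datetime import datetime


def matches_format(value, fmt):
    """Return True when `value` parses with the strptime pattern `fmt`."""
    try:
        datetime.strptime(value, fmt)
        return True
    except ValueError:
        return False


# Python patterns chosen for this sketch; they are not the NiFi/Java patterns.
SLASH_FORMAT = "%m/%d/%Y"  # dates written like 8/5/2013
ISO_FORMAT = "%Y-%m-%d"    # dates written like 2018-10-12

print(matches_format("8/5/2013", SLASH_FORMAT))    # True
print(matches_format("2018-10-12", SLASH_FORMAT))  # False
print(matches_format("2018-10-12", ISO_FORMAT))    # True
```

A value written in one layout fails to parse with the other pattern, which is the same kind of mismatch that makes the reader or writer throw an error.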
01-26-2018
03:02 AM
1 Kudo
@Yogesh Singh
It's better to open a new question for this date type issue so that it is more visible to all community users.

Here is what I tried.

Input:-
Timestamp,DATE,Age,Sex,Country,City,PostalCode
2017-01-10 12:34:56.33,2018-10-12,21,M,US,OR,32811
2017-01-10 12:34:56.33,2018-10-13,22,F,US,OR,32746

Output expected:-
Timestamp,City,DATE
2017-01-10 12:34:33.33,OR,2018-10-12
2017-01-10 12:34:33.33,OR,2018-10-13

As my Timestamp field format is yyyy-MM-dd HH:mm:ss.ss and my DATE field format is yyyy-MM-dd, I specified these formats in the ConvertRecord processor as shown in the screenshot.

In AvroSchemaRegistry I have changed the schema to:
{
"namespace": "nifi",
"name": "balances",
"type": "record",
"fields": [
{ "name": "Timestamp", "type": { "type":"long", "logicalType":"timestamp-millis"} },
{ "name": "DATE" , "type": {"type":"int", "logicalType":"date"} },
{ "name": "Age" , "type": ["null", "int"]},
{ "name": "Sex" , "type": ["null", "string"]},
{ "name": "Country" , "type": ["null", "string"]},
{ "name": "City" , "type": ["null", "string"]},
{ "name": "PostalCode" , "type": ["null", "string"]}
]
}

As I have defined the Timestamp and DATE fields with logicalTypes, we also need to specify the Date and Timestamp formats in the CsvRecordSetWriter controller service, so that CsvRecordSetWriter writes the Timestamp and DATE fields in those formats.

The Schema Text property is:
{
"namespace": "nifi",
"name": "balances",
"type": "record",
"fields": [
{ "name": "Timestamp", "type": { "type":"long", "logicalType":"timestamp-millis"} },
{ "name": "City", "type": ["null","string"]},
{ "name": "DATE" , "type": { "type":"int", "logicalType":"date"} }
]
}

Once all these configs are set up correctly, we get the output that we expected:
Timestamp,City,DATE
2017-01-10 12:34:33.33,OR,2018-10-12
2017-01-10 12:34:33.33,OR,2018-10-13
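To show why the reader and writer need explicit formats for these fields, here is a Python sketch of what the two Avro logical types in the schemas above store: date is an int counting days since 1970-01-01, and timestamp-millis is a long counting milliseconds since the epoch. The helper names and the Python format strings are assumptions made for the example, and UTC is assumed for the timestamp.

```python
from datetime import date, datetime, timedelta, timezone

EPOCH_DATE = date(1970, 1, 1)
EPOCH_DATETIME = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_avro_date(text, fmt="%Y-%m-%d"):
    """Days since 1970-01-01, the int behind the Avro `date` logical type."""
    return (datetime.strptime(text, fmt).date() - EPOCH_DATE).days


def from_avro_date(days, fmt="%Y-%m-%d"):
    """Format an Avro `date` int back into text using the given pattern."""
    return (EPOCH_DATE + timedelta(days=days)).strftime(fmt)


def to_avro_timestamp_millis(text, fmt="%Y-%m-%d %H:%M:%S.%f"):
    """Milliseconds since the epoch (UTC assumed), as in `timestamp-millis`."""
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return (parsed - EPOCH_DATETIME) // timedelta(milliseconds=1)


days = to_avro_date("2018-10-12")
millis = to_avro_timestamp_millis("2017-01-10 12:34:56.33")

print(days, from_avro_date(days))
print(millis)
```

Because the stored values are plain numbers, the text that ends up in the CSV depends entirely on the format given to the writer.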
01-26-2018
01:36 AM
1 Kudo
@Yogesh Singh
Make sure the datatypes of all 40 fields match in AvroSchemaRegistry. If any of the fields can have null values, then the schema in AvroSchemaRegistry needs to allow null for those fields.

Example:-
{ "name": "Name", "type": "string"} //with this Avro schema the Name field won't allow null values
{ "name": "Name", "type": ["null", "string"]} //with this Avro schema the Name field allows null values

{
"namespace": "nifi",
"name": "balances",
"type": "record",
"fields": [
{ "name": "Name", "type": ["null", "string"]},
{ "name": "Age" , "type": ["null", "int"]},
{ "name": "Sex" , "type": ["null", "string"]},
{ "name": "Country" , "type": ["null", "string"]},
{ "name": "City" , "type": ["null", "string"]},
{ "name": "PostalCode" , "type": ["null", "string"]}
]
}

It's recommended to add default values for the fields in the Avro schema. Make sure everything is set up correctly; if you are still facing issues, share more info such as logs, the schema registry and sample input records, which would make the issue easier to understand.
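A quick way to spot fields that would reject nulls is to scan the schema for types that do not include "null". The Python sketch below does that with the standard json module only; it is not an Avro library, allows_null and null_violations are names made up here, and the small schema deliberately declares Name as a plain string to show the failing case.

```python
import json

# Small schema for illustration: Name is a plain string, Age is a nullable int.
schema = json.loads("""
{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "Name", "type": "string" },
    { "name": "Age", "type": ["null", "int"] }
  ]
}
""")


def allows_null(field):
    """True when the field type is "null" or a union that contains "null"."""
    field_type = field["type"]
    if isinstance(field_type, list):
        return "null" in field_type
    return field_type == "null"


def null_violations(record, schema):
    """Names of fields that are missing or None although null is not allowed."""
    return [
        field["name"]
        for field in schema["fields"]
        if record.get(field["name"]) is None and not allows_null(field)
    ]


print(null_violations({"Name": None, "Age": None}, schema))  # ['Name']
print(null_violations({"Name": "hcc", "Age": None}, schema))  # []
```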
01-25-2018
10:06 PM
4 Kudos
@Yogesh Singh
If you are using NiFi 1.2+, you can use three ConvertRecord processors in parallel to read your CSV file, each with a different CsvRecordSetWriter that has only your required columns in it; the processors will give you 3 different files.

ConvertRecord Processor:-
Reads the source file, which has Name,Age,Sex,Country,City,PostalCode, and writes a file with, for example, Name, Age, Country.

I have attached an .xml in which I have implemented one ConvertRecord processor that reads the CSV file and outputs only the Name,City columns. You can save the xml file, upload it to your NiFi instance and enhance the CsvRecordSetWriter as per your requirements.

Example:-
Input Csv File:-
Name,Age,Sex,Country,City,PostalCode
hcc,21,M,US,OR,32811
HDP,22,F,US,OR,32746

Output Csv file:-
Name,City
hcc,OR
HDP,OR

So in ConvertRecord I have set up a CsvReader, which reads the incoming flowfile, and a CsvRecordSetWriter, which outputs only the required columns that we specified in the controller service.

Xml file:- convertrecord.xml

ConvertRecord example:-
https://community.hortonworks.com/articles/115311/convert-csv-to-json-avro-xml-using-convertrecord-p.html
https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi

(or)
If you are using a NiFi version prior to 1.2, then you need to use:
1. ExecuteStreamCommand //delete the header of the file using tail as the Command Path and -n;+2 as the Command Arguments; connect the output stream relation to the next processor
2. SplitText //split the content of the CSV file with a line count of one; connect the splits relation to the next processor
3. ExtractText //add new properties with matching regex values that extract each value under its header name; connect success to all three parallel ReplaceText processors
4. Three ReplaceText processors in parallel //specify the attribute names that you want in each file; each processor then produces the required file
5. MergeContent (optional) //merge the small flowfiles into one file before storing into the directory

ExtractText example:-
https://community.hortonworks.com/questions/160868/how-to-extracttext-from-flow-file-using-nifi-proce-1.html
ReplaceText example:-
https://community.hortonworks.com/questions/158910/parsing-json-formatted-csv-file-using-nifi.html
MergeContent example:-
https://community.hortonworks.com/questions/149047/nifi-how-to-handle-with-mergecontent-processor.html

Let us know if you are having any issues.
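For reference, the column selection that the ConvertRecord approach above performs (read every column, write only the required ones) can be sketched in Python with the standard csv module. This is just an illustration of the idea, not how NiFi implements it; project_columns is a hypothetical helper, and the sample data is the input from the post.

```python
import csv
import io


def project_columns(csv_text, columns):
    """Read CSV text with a header row and write back only `columns`."""
    reader = csv.DictReader(io.StringIO(csv_text))
    output = io.StringIO()
    writer = csv.DictWriter(
        output,
        fieldnames=columns,
        extrasaction="ignore",  # silently drop the columns that are not requested
        lineterminator="\n",
    )
    writer.writeheader()
    for record in reader:
        writer.writerow(record)
    return output.getvalue()


source = (
    "Name,Age,Sex,Country,City,PostalCode\n"
    "hcc,21,M,US,OR,32811\n"
    "HDP,22,F,US,OR,32746\n"
)

print(project_columns(source, ["Name", "City"]))
```

Calling project_columns three times with three different column lists gives the three output files, in the same way that three ConvertRecord processors with three writers would.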