Member since 06-08-2017
1049 Posts, 518 Kudos Received, 311 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 12218 | 04-15-2020 05:01 PM |
| | 8140 | 10-15-2019 08:12 PM |
| | 3707 | 10-12-2019 08:29 PM |
| | 13461 | 09-21-2019 10:04 AM |
| | 4953 | 09-19-2019 07:11 AM |
01-16-2019
10:49 PM
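@Venkat Use the regexp_replace function in Hive to replace $ with an empty string, then cast the result to int. $ is a regex metacharacter (the end-of-line anchor), so it has to be escaped. Example:
select int(regexp_replace(string("124$"),'\\$',''));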
+------+--+
| _c0 |
+------+--+
| 124 |
+------+--+
Alternatively, starting with Hive 1.3, use the replace function. It matches the search string literally, so no escaping is needed:
select int(replace(string("124$"),'$',''));
+------+--+
| _c0 |
+------+--+
| 124 |
+------+--+
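The escaping difference between the two queries can be seen outside Hive as well. The Python sketch below is only an analogy, not Hive code. `re.sub` treats its pattern as a regular expression, so `$` must be escaped, just as `'\\$'` is in the regexp_replace query. `str.replace` matches the character literally, like Hive's `replace`.

```python
import re

def strip_dollar_regex(value: str) -> int:
    # "$" is an end-of-string anchor in regex syntax, so escape it to match the literal character
    return int(re.sub(r"\$", "", value))

def strip_dollar_literal(value: str) -> int:
    # Plain substring replacement: no escaping needed, analogous to Hive's replace()
    return int(value.replace("$", ""))

# An unescaped "$" only matches the empty position at the end, so nothing is removed
print(re.sub("$", "", "124$"))  # 124$
print(strip_dollar_regex("124$"), strip_dollar_literal("124$"))  # 124 124
```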
01-16-2019
12:18 AM
1 Kudo
@Deppu If you can identify which records are inserts and which are updates, set a statement.type attribute on the flowfile. With the Statement Type property set to Use statement.type Attribute, the PutDatabaseRecord processor runs an INSERT, UPDATE or DELETE statement based on that attribute's value.
If you cannot identify inserts and updates, there is a somewhat hacky alternative. Split the flowfile into individual records with the SplitRecord processor, then run two or three PutDatabaseRecord processors in succession.
You need to decide how to tell deletes from updates. When an insert fails, you can retry the record with either the update or the delete statement type. Flow:
1. First PutDatabaseRecord: INSERT statement type. Connect its failure relationship to the next PutDatabaseRecord.
2. Second PutDatabaseRecord: UPDATE statement type. Connect its failure relationship to the next PutDatabaseRecord.
3. Third PutDatabaseRecord: DELETE statement type.
Note: this flow is my guess, so rearrange these processors to match your logic.
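The failure-chained processors can be modelled with Python's built-in sqlite3 module. This is a minimal sketch of the routing logic, not NiFi code. The `emp` table, its columns, and the rule that an UPDATE matching no row counts as a failure are all hypothetical assumptions made for the illustration.

```python
import sqlite3

def insert(conn, rec):
    conn.execute("INSERT INTO emp (id, name) VALUES (?, ?)", (rec["id"], rec["name"]))

def update(conn, rec):
    cur = conn.execute("UPDATE emp SET name = ? WHERE id = ?", (rec["name"], rec["id"]))
    if cur.rowcount == 0:
        # Assumption for this sketch: an UPDATE that matches no row is treated as a failure
        raise LookupError("no row matched")

def delete(conn, rec):
    conn.execute("DELETE FROM emp WHERE id = ?", (rec["id"],))

# One entry per PutDatabaseRecord processor, in the order they are chained
CHAIN = [("INSERT", insert), ("UPDATE", update), ("DELETE", delete)]

def put_record(conn, rec):
    """Try each statement type in order; a failure falls through to the next one."""
    for statement_type, run in CHAIN:
        try:
            run(conn, rec)
            return statement_type
        except (sqlite3.IntegrityError, LookupError):
            continue  # like the failure relationship feeding the next processor
    return "failure"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (id INTEGER PRIMARY KEY, name TEXT)")
print(put_record(conn, {"id": 1, "name": "foo"}))  # INSERT
print(put_record(conn, {"id": 1, "name": "bar"}))  # UPDATE (the insert hit the primary key)
```

With this ordering, a record reaches the DELETE step only if both INSERT and UPDATE fail. That is why you have to decide up front how deletes are identified.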
01-12-2019
06:53 PM
@Yasir Khokhar
If you use the DBCPConnectionPoolLookup controller service for the ExecuteSQL processor, every flowfile fed to ExecuteSQL needs a database.name attribute. Its value must be the name under which the DBCPConnectionPool was registered. ExecuteSQL uses that attribute value to select the connection pool dynamically, so a single ExecuteSQL processor can work with multiple DBCPConnectionPools. Set the database.name attribute on the flowfile with an UpdateAttribute processor. Then configure and enable the DBCPConnectionPoolLookup service and select it in the ExecuteSQL processor.
01-12-2019
02:58 AM
1 Kudo
@Yasir Khokhar
Starting with NiFi 1.7, we can assign the connection pooling service dynamically by using the DBCPConnectionPoolLookup controller service. From the documentation:
1. Provides a DBCPService that can be used to dynamically select another DBCPService. This service requires an attribute named 'database.name' to be passed in when asking for a connection, and will throw an exception if the attribute is missing.
2. The value of 'database.name' will be used to select the DBCPService that has been registered with that name.
3. This will allow multiple DBCPServices to be defined and registered, and then selected dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.
If you are using a NiFi version earlier than 1.7.0, you have to define a separate DBCPConnectionPool controller service for each connection used by ExecuteSQL.
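The selection rule quoted above can be modelled in a few lines of Python. This is a simplified sketch, not NiFi's implementation. The pool class, the registered names "sales" and "hr", and the JDBC URLs are hypothetical. Treating an unregistered name as an error is an assumption, while the missing-attribute error comes from the documentation.

```python
class HypotheticalPool:
    """Stand-in for a DBCPConnectionPool controller service; it only remembers its JDBC URL."""
    def __init__(self, jdbc_url):
        self.jdbc_url = jdbc_url

class ConnectionPoolLookup:
    """Simplified model of how DBCPConnectionPoolLookup picks a pool for each flowfile."""
    def __init__(self, registered):
        # name -> pool, like the pools registered on the lookup service
        self.registered = dict(registered)

    def get_connection(self, attributes):
        name = attributes.get("database.name")
        if name is None:
            # Documented behaviour: an exception is thrown if the attribute is missing
            raise ValueError("flowfile has no 'database.name' attribute")
        if name not in self.registered:
            # Assumption: an unknown name is also treated as an error
            raise ValueError(f"no connection pool registered as {name!r}")
        return self.registered[name]

lookup = ConnectionPoolLookup({
    "sales": HypotheticalPool("jdbc:postgresql://sales-host/sales"),
    "hr": HypotheticalPool("jdbc:mysql://hr-host/hr"),
})
# UpdateAttribute would set database.name on each flowfile before it reaches ExecuteSQL
print(lookup.get_connection({"database.name": "hr"}).jdbc_url)  # jdbc:mysql://hr-host/hr
```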
01-12-2019
02:40 AM
1 Kudo
@Satya G Read the CSV file with its header, as described here: https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader
Once you can read the CSV file with its header, use the .select method to pick the columns in the order you need:
# PySpark
df = spark.read.option("header", "true").csv("<file>")  # read the CSV with header; option() must be called before csv()
df1 = df.select("A", "B", "C", "D")  # select the columns in the required order
df1.write.mode("<overwrite/append>").saveAsTable("<db_name>.<tab_name>")
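Reading the header is what makes it possible to select columns by name, whatever order they have in the file. The sketch below shows the same idea with Python's standard csv module, as a small analogue of `df.select(...)` that needs no Spark. The sample data is made up.

```python
import csv
import io

def select_columns(csv_text: str, columns: list) -> list:
    """Read CSV using its header row, then return rows with the columns in the requested order."""
    reader = csv.DictReader(io.StringIO(csv_text))  # first line is treated as the header
    return [[row[c] for c in columns] for row in reader]

data = "C,A,D,B\n3,1,4,2\n"
print(select_columns(data, ["A", "B", "C", "D"]))  # [['1', '2', '3', '4']]
```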
01-06-2019
02:42 PM
@Basil Paul
1. Try the query below. is NULL matches real SQL NULL values:
select count(*) from demo where 2gusage is NULL;
2. If your data contains the literal string NULL in the 2gusage column, use this query instead:
select count(*) from demo where 2gusage = "NULL";
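The difference between a real NULL and the four-character string "NULL" can be checked with an in-memory SQLite table from Python. This is an illustration, not Hive. SQLite uses single quotes for string literals and double quotes for identifiers, so the column name that starts with a digit is written as "2gusage". In the Hive query, double quotes delimit the string "NULL". The sample rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# "2gusage" starts with a digit, so SQLite needs it quoted as an identifier
conn.execute('CREATE TABLE demo ("2gusage" TEXT)')
conn.executemany("INSERT INTO demo VALUES (?)", [(None,), ("NULL",), ("5",)])

real_nulls = conn.execute('SELECT count(*) FROM demo WHERE "2gusage" IS NULL').fetchone()[0]
literal_nulls = conn.execute("SELECT count(*) FROM demo WHERE \"2gusage\" = 'NULL'").fetchone()[0]
print(real_nulls, literal_nulls)  # 1 1
```

Note that `= NULL` (without quotes) never matches anything, because any comparison with NULL yields NULL rather than true.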
07-25-2018
01:21 PM
1 Kudo
@Mohammad Soori
Connect only the splits relationship from SplitText to the PublishKafka processor, and auto-terminate the original relationship (Configure --> Settings tab --> check the original box). Connect the failure relationship to a PutEmail processor so that you are notified if something goes wrong.
06-04-2018
02:29 AM
@Raja M With its default configuration, the ListFile processor already does what you expect: it lists and sends the oldest file first. The exception is if you have configured the success queue with some kind of prioritizer. ListFile is a stateful processor. It tracks listed files by their last modified time and creates one flowfile per file. The default queue prioritizer is OldestFlowFileFirstPrioritizer, so with the default queue configuration the oldest flowfile in the dataflow is processed first. The available NiFi queue prioritizers are:
FirstInFirstOutPrioritizer: Given two FlowFiles, the one that reached the connection first will be processed first.
NewestFlowFileFirstPrioritizer: Given two FlowFiles, the one that is newest in the dataflow will be processed first.
OldestFlowFileFirstPrioritizer: Given two FlowFiles, the one that is oldest in the dataflow will be processed first. 'This is the default scheme that is used if no prioritizers are selected.'
PriorityAttributePrioritizer: Given two FlowFiles that both have a "priority" attribute, the one that has the highest priority value will be processed first. Note that an UpdateAttribute processor should be used to add the "priority" attribute to the FlowFiles before they reach a connection that has this prioritizer set. Values for the "priority" attribute may be alphanumeric, where "a" is a higher priority than "z", and "1" is a higher priority than "9", for example.
To see which files are transferred from the ListFile processor, add a ControlRate processor with:
Rate Control Criteria: flowfile count
Maximum Rate: 1
The success queue of the ControlRate processor then shows which flowfiles have been released from the ListFile processor.
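Each prioritizer above amounts to a sort order over the flowfiles waiting in a connection. The Python sketch below models that as a simplification, not NiFi code. The `FlowFile` fields are hypothetical stand-ins for "when it reached the connection" and "when it entered the dataflow". Comparing priority values as plain strings matches the "a"/"z" and "1"/"9" examples in the text, but is an assumption for other values.

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class FlowFile:
    name: str
    queued_at: int      # when the flowfile reached this connection
    entered_flow: int   # when it entered the dataflow (smaller = older)
    priority: Optional[str] = None

def first_in_first_out(queue: List[FlowFile]) -> List[FlowFile]:
    return sorted(queue, key=lambda f: f.queued_at)

def oldest_first(queue: List[FlowFile]) -> List[FlowFile]:
    return sorted(queue, key=lambda f: f.entered_flow)

def newest_first(queue: List[FlowFile]) -> List[FlowFile]:
    return sorted(queue, key=lambda f: f.entered_flow, reverse=True)

def priority_attribute(queue: List[FlowFile]) -> List[FlowFile]:
    # Assumption: plain string comparison ("a" before "z", "1" before "9");
    # flowfiles without a priority attribute are placed last
    return sorted(queue, key=lambda f: (f.priority is None, f.priority or ""))

queue = [FlowFile("a", 2, 1, "z"), FlowFile("b", 1, 3, "a"), FlowFile("c", 3, 2)]
print([f.name for f in oldest_first(queue)])  # ['a', 'c', 'b']
```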
05-14-2018
11:55 AM
8 Kudos
Article
Short Description:
This tutorial describes how to add a partition field to the flowfile content, create dynamic partitions based on the flowfile content, and store the data in HDFS directories using NiFi.
PartitionRecord Processor Params:
Record Reader :
Specifies the Controller Service to use for reading incoming data
Record Writer:
Specifies the Controller Service to use for writing out the records.
Dynamic Properties:
We need to add at least one property. Its name becomes an attribute name, and its value is a RecordPath that is evaluated against each record of the flowfile.
Records with the same value for the RecordPath are grouped together into the same outgoing flowfile.
**Note**
No attribute will be added if the value returned for the RecordPath is null or is not a scalar value (i.e., the value is an Array, Map, or Record).
Flow:
Flow Overview:
1. GenerateFlowFile //generate CSV data
2. UpdateAttribute //add the schema.name attribute to the flowfile
3. UpdateRecord //add the partition field to the flowfile content
4. PartitionRecord //create partitions based on the flowfile content
5. ConvertAvroToORC //convert Avro data into ORC format
6. UpdateAttribute //change the filename to a unique id (UUID)
7. PutHDFS //save the data into HDFS based on the partition attribute value
8. ReplaceText //replace the flowfile content with the Hive DDL and MSCK REPAIR TABLE statements
9. PutHiveQL //execute the Hive statements
10. PutEmail //send an e-mail to the configured recipients for each incoming failure flowfile
Flow Explanation:
1.GenerateFlowFile Configs:
Configure the processor as shown below
Sample data: data.txt
2.UpdateAttribute Configurations:
Configure the processor as shown below
We add a schema.name attribute to the flowfile so that this name can be looked up in our AvroSchemaRegistry. The same AvroSchemaRegistry then serves both the Record Reader and the Record Writer controller services.
3.UpdateRecord Configs:
Record Reader:
Our input data is in CSV format, so configure a CSVReader controller service as shown below.
Our data has no header, so I'm defining the schema in AvroSchemaRegistry. If your incoming data has a header, you can generate the schema dynamically with two properties. Set Treat First Line as Header to true, and set Schema Access Strategy to Use String Fields From Header (NiFi will use the string type for all the header fields).
Record Writer:
I'm using AvroRecordSetWriter to convert the CSV data into Avro. The ConvertAvroToORC processor then produces the ORC files that are stored in HDFS.
Configure the controller service as shown below
Set the Schema Write Strategy property to Embed Avro Schema, so that the Avro schema is embedded in the content of the flowfile.
AvroSchemaRegistry:
In this controller service we define the shared Avro schema used by both the reader and the writer controller services in this flow.
Keep Validate Field Names set to true. If your field names contain a dash (-), set this property to false so that the controller service does not validate the field names.
Add a new property named sch with the following Avro schema as its value:
{
"namespace": "nifi",
"name": "person",
"type": "record",
"fields": [
{ "name": "id", "type": ["null","int"] },
{ "name": "first_name", "type": ["null","string"]},
{ "name": "last_name", "type": ["null","string"]},
{ "name": "email", "type": ["null","string"]},
{ "name": "gender", "type": ["null","string"]},
{ "name": "joined_date", "type": ["null","string"]},
{ "name": "partition_dt", "type": ["null","string"]}
]
}
In this Avro schema I'm allowing null values for all the fields. If you don't want to allow null values for specific fields, declare their type as just string/int (not a union with null) without default values.
The processor will then route the flowfile to the failure relationship if any of those fields has a null value. Alternatively, we can use the ValidateRecord processor to validate the flowfile content against the given schema.
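The nullability rule can be sketched as a small check in Python. This is a simplified illustration, not NiFi's or Avro's actual validation, which also covers types and coercion. A field accepts None only when its type is a union that contains "null". The strict schema below is a hypothetical variant of the one above.

```python
def null_violations(record, schema):
    """Return fields that are None although their Avro type is not a union containing "null"."""
    violations = []
    for field in schema["fields"]:
        field_type = field["type"]
        nullable = isinstance(field_type, list) and "null" in field_type
        if record.get(field["name"]) is None and not nullable:
            violations.append(field["name"])
    return violations

# Hypothetical variant of the article's schema: id and email are no longer nullable
strict_schema = {
    "type": "record", "name": "person",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "first_name", "type": ["null", "string"]},
        {"name": "email", "type": "string"},
    ],
}
print(null_violations({"id": 5, "first_name": None, "email": None}, strict_schema))  # ['email']
```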
Enable the AvroSchemaRegistry and all the controller services that reference it.
Add a new property to the UpdateRecord processor, with Replacement Value Strategy set to Record Path Value:
Property name: /partition_dt
Value: format(toDate( /joined_date, "yyyy-MM-dd HH:mm:ss"),"yyyy-MM-dd")
This adds a partition_dt field to the flowfile content. Its value is derived from the joined_date field and formatted as yyyy-MM-dd.
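The RecordPath expression parses joined_date and then keeps only the date part. Python's datetime shows the same two steps. This is an analogy only: the strptime/strftime codes stand in for the Java date patterns ("yyyy-MM-dd HH:mm:ss" corresponds to "%Y-%m-%d %H:%M:%S").

```python
from datetime import datetime

def partition_dt(joined_date: str) -> str:
    # toDate(/joined_date, "yyyy-MM-dd HH:mm:ss"): parse the timestamp string
    parsed = datetime.strptime(joined_date, "%Y-%m-%d %H:%M:%S")
    # format(..., "yyyy-MM-dd"): keep only the date part
    return parsed.strftime("%Y-%m-%d")

print(partition_dt("2018-05-09 12:56:15"))  # 2018-05-09
```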
Please refer to this link for more details about the RecordPath Domain-Specific Language.
4.PartitionRecord Configs:
Record Reader
Configure/enable AvroReader controller service as shown below
Set the Schema Access Strategy property to Use Embedded Avro Schema, because the incoming Avro file has the schema embedded in it.
Record Writer
Configure and enable the AvroRecordSetWriter controller service as shown below.
Add a new dynamic property to the PartitionRecord processor:
Property name: partition_dt
Value: /partition_dt
The processor now reads the incoming data and groups the records by their partition_dt value.
One output flowfile is generated for each unique partition value in the flowfile content.
Each output flowfile has a partition_dt attribute whose value identifies the partition it belongs to.
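The grouping PartitionRecord performs can be sketched in Python as a simplified model, not the processor's code. Each unique partition_dt value yields one (attributes, records) pair with a record.count attribute. Following the note about null values earlier in this article, no partition_dt attribute is set when the value is null. The sample rows are made up.

```python
from collections import defaultdict

def partition_records(records, field="partition_dt"):
    """Group like records; return one (attributes, records) pair per unique field value."""
    groups = defaultdict(list)
    for rec in records:
        groups[rec.get(field)].append(rec)
    outputs = []
    for value, recs in groups.items():
        attributes = {"record.count": str(len(recs))}
        # No partition attribute is added when the RecordPath value is null
        if value is not None:
            attributes[field] = value
        outputs.append((attributes, recs))
    return outputs

rows = [
    {"id": 1, "partition_dt": "2018-05-09"},
    {"id": 2, "partition_dt": "2018-05-10"},
    {"id": 7, "partition_dt": "2018-05-09"},
]
for attrs, recs in partition_records(rows):
    print(attrs, [r["id"] for r in recs])
# {'record.count': '2', 'partition_dt': '2018-05-09'} [1, 7]
# {'record.count': '1', 'partition_dt': '2018-05-10'} [2]
```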
Output Flowfile attributes:
The processor added a partition_dt attribute with the value 2018-05-10, and a record.count attribute of 357 (the number of records in this flowfile).
5.ConvertAvroToORC Configs:
Keep the processor's default configuration unless you need to set the ORC Configuration Resources property.
6.UpdateAttribute Configs:
In this processor we change the filename to a unique value (a UUID).
7.PutHDFS Configs:
Configure and enable the processor to match your NiFi instance setup (Kerberos principal/keytab, etc.).
Directory: /user/shu/employee/partition_dt=${partition_dt}
In this expression, /user/shu/employee is the location of the Hive table and partition_dt is the partition column name.
Based on the partition_dt attribute value, the processor dynamically creates new directories or stores files in existing ones.
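The target path for each partitioned flowfile is the Directory expression plus the UUID filename set by UpdateAttribute. The Python sketch below builds it. The ".orc" suffix is an assumption inferred from the file names in the listing that follows, not a documented setting of this flow.

```python
import posixpath
import uuid

TABLE_LOCATION = "/user/shu/employee"  # Hive table location used in the PutHDFS Directory

def hdfs_target(partition_value):
    """Return the full file path PutHDFS would write for one partitioned flowfile."""
    # Directory = /user/shu/employee/partition_dt=${partition_dt}
    directory = posixpath.join(TABLE_LOCATION, f"partition_dt={partition_value}")
    # Assumption: UpdateAttribute set the filename to a random UUID with an .orc suffix
    filename = f"{uuid.uuid4()}.orc"
    return posixpath.join(directory, filename)

print(hdfs_target("2018-05-10"))
```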
Check for the newly created sub-directories in the employee directory:
[bash$] hdfs dfs -ls /user/shu/employee/
Found 3 items
drwxr-xr-x - nifi hdfs 0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-09
drwxr-xr-x - nifi hdfs 0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-10
drwxr-xr-x - nifi hdfs 0 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-11
Check the newly created files in each sub-directory of the employee directory:
[bash$] hdfs dfs -ls -h -r /user/shu/employee/*
Found 1 items
-rw-r--r-- 3 nifi hdfs 18.7 K 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-09/da5c645a-36d4-415e-9920-467314b31900.orc
Found 1 items
-rw-r--r-- 3 nifi hdfs 21.3 K 2018-05-13 10:16 /user/shu/employee/partition_dt=2018-05-10/121db330-7a61-4b8e-a1ee-5cb1692c6c22.orc
Found 1 items
-rw-r--r-- 3 nifi hdfs 19.8 K 2018-05-13 10:14 /user/shu/employee/partition_dt=2018-05-11/b3e15fec-b78c-4f0b-ab0b-3fa89d057a9b.orc
8.ReplaceText Configs:
Configure the processor as shown below.
In this processor the replacement value is a CREATE TABLE IF NOT EXISTS statement. We need a partitioned table, so we do not use the hive.ddl attribute that the ConvertAvroToORC processor added.
Hive does not know about new partition directories added to HDFS. Each time we add one, we need to register it in the Hive metastore by running msck repair table (or) alter table <table-name> add partition(<partition-name>) location '<location name>'.
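The article's actual ReplaceText value appears only in a screenshot. The Python sketch below therefore builds a hypothetical equivalent: a CREATE TABLE IF NOT EXISTS with a partition_dt partition column, followed by MSCK REPAIR TABLE. It then splits the content the way PutHiveQL does with its Statement Delimiter, assumed here to be ";". The EXTERNAL keyword and the column types are assumptions based on the Avro schema above.

```python
def build_hive_statements(table, location, columns, partition_col):
    """Return flowfile content with a CREATE TABLE IF NOT EXISTS and an MSCK REPAIR TABLE statement."""
    column_list = ", ".join(f"{name} {hive_type}" for name, hive_type in columns.items())
    ddl = (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} ({column_list}) "
        f"PARTITIONED BY ({partition_col} STRING) "
        f"STORED AS ORC LOCATION '{location}'"
    )
    # MSCK REPAIR TABLE registers partition directories the metastore does not know about yet
    return f"{ddl};\nMSCK REPAIR TABLE {table};"

content = build_hive_statements(
    "default.employee",
    "/user/shu/employee",
    {"id": "INT", "first_name": "STRING", "last_name": "STRING",
     "email": "STRING", "gender": "STRING", "joined_date": "STRING"},
    "partition_dt",
)
# PutHiveQL splits the content on its Statement Delimiter (assumed to be ";")
statements = [s.strip() for s in content.split(";") if s.strip()]
for s in statements:
    print(s)
```

The partition column appears only in PARTITIONED BY, not in the column list, because Hive does not allow a column to be declared in both places.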
9.PutHiveQL Configs:
Configure/enable Hive connection pool
This processor executes the statements contained in the flowfile content. When the content holds more than one statement, the processor separates them using the Statement Delimiter value.
Rollback On Failure:
True: failed flowfiles stay in the incoming connection without being penalized and are retried repeatedly until they are processed successfully. It is important to set an adequate 'Yield Duration' to avoid retrying too frequently.
False (default): depending on the error type, flowfiles are routed to either the failure or the retry relationship, and the processor continues with the next flowfile.
Depending on how many records each flowfile contains, you may need a SplitRecord processor to split millions of records into smaller chunks and avoid out-of-memory issues.
If you end up with many small files in the partitions, compact them. Copy the partition that has many small files to a temp table, and then overwrite the same partition with fewer files. While compacting, make sure no data is being written to that partition's HDFS directory.
Check Hive table about partitions and data
hive> show partitions default.employee;
+--------------------------+--+
| partition |
+--------------------------+--+
| partition_dt=2018-05-09 |
| partition_dt=2018-05-10 |
| partition_dt=2018-05-11 |
+--------------------------+--+
3 rows selected (0.488 seconds)
hive> select * from default.employee limit 10;
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+
| id | first_name | last_name | email | gender | joined_date | partition_dt |
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+
| 1 | Cordy | Dupxx | cdupey0@example.com | Male | 2018-05-09 12:56:15 | 2018-05-09 |
| 2 | Trumann | Hindxxxx | thindmoor1@example.com | Male | 2018-05-09 06:45:17 | 2018-05-09 |
| 7 | Stu | Manxxm | smanus6@example.net | Male | 2018-05-09 08:54:13 | 2018-05-09 |
| 8 | Petey | Petxxx | ppetofi7@example.edu | Male | 2018-05-09 02:04:42 | 2018-05-09 |
| 11 | Marilyn | Brodxxx | mbrodeaua@example.com | Female | 2018-05-09 21:47:17 | 2018-05-09 |
| 14 | Gibby | Branxxxx | gbrandinod@example.com | Male | 2018-05-09 10:52:04 | 2018-05-09 |
| 16 | Friederike | Schwxxxx | fschwartzf@example.gov | Female | 2018-05-09 02:42:17 | 2018-05-09 |
| 19 | Patrizius | Hallexxxxx | phalleybonei@example.net | Male | 2018-05-09 19:16:25 | 2018-05-09 |
| 20 | Chic | Ecclesxxxx | ceccleshallj@example.com | Male | 2018-05-09 16:39:13 | 2018-05-09 |
| 25 | Annabelle | Dwerryhxxxx | adwerryhouseo@example.com. | Female | 2018-05-09 16:20:31 | 2018-05-09 |
+-----+-------------+--------------+-------------------------------+---------+----------------------+---------------+--+
We have now converted CSV data to ORC format, dynamically partitioned the records, stored them in HDFS using NiFi, and read the data back from the Hive table.
All the failure relationships feed into a funnel and then into a PutEmail processor, which notifies the configured recipients about each incoming failure flowfile.
Please refer to these two links for more details about the UpdateRecord processor.
Please refer to this link for the NiFi Expression Language.
Reference flow.xml: create-dynamic-partitions-in-nifi.xml
05-07-2018
03:08 AM
12 Kudos
Short Description:
This tutorial describes how to add fields, remove fields that are not required, and change field values in a flowfile.
Article
Introduction
The UpdateRecord processor lets us update the contents of a flowfile.
Overview of article:
The sections below describe how the input flowfile content is changed into the output flowfile content.
Input:
The input JSON record below has 6 fields:
[ {
"id" : 1,
"name" : "foo",
"age" : 20,
"state" : "FLORIDA",
"ts_milli" : 1525526792098,
"ts" : "2018-05-03 10:10:10.123"
}]
Expected Output: I'm writing the output in Avro format, but for viewing purposes I have converted the output flowfile to JSON. The // comments are annotations, not part of the content.
[ {
"id" : "1",
"rename_id" : "1", //newly added field based on id
"state" : "florida", //changed the value of field
"rename_state" : "FLORIDA", //newly added field based on state
"unique_id" : "1-416425265990923-FLORIDA-aab46988-6a27-4008-9a4d-65655abe9c3c",//id-filename-state-UUID
"ts_milli" : 1525526792098,
"date" : "2018-05-05", //newly added field formatted as date based on ts_milli
"ts" : "2018-05-03 10:10:10.123",
"ts_tz" : "2018-05-03T10:10:10Z",//newly added field formatted as date based on ts
"current_ts" : "2018-05-05 14:23:15", //current timestamp value
"updated_by" : "NiFi", //newly added field based on user attribute value
"gmt_time" : "2018-05-05 18:23:18.611" //newly added field gmt time.
}]
The name and age fields are missing from the output because they are not required in the output content.
UpdateRecord Processor Params:
Record Reader:
Specifies the Controller Service to use for reading incoming data.
Record Writer:
Specifies the Controller Service to use for writing out the records.
Replacement Value Strategy:
Specifies how to interpret the configured replacement values.
1. Literal Value: The value entered for a Property (after Expression Language has been evaluated) is the desired value to update the Record Fields with. Expression Language may reference variables 'field.name', 'field.type', and 'field.value' to access information about the field and the value of the field being evaluated.
2. Record Path Value: The value entered for a Property (after Expression Language has been evaluated) is not the literal value to use but rather is a Record Path that should be evaluated against the Record, and the result of the RecordPath will be used to update the Record. If this option is selected, and the Record Path results in multiple values for a given Record, the input FlowFile will be routed to the 'failure' Relationship.
**Note**
This Processor requires that at least one user-defined Property be added.
Please refer to this link for more details about the UpdateRecord processor.
Flow:
GenerateFlowFile Configs:
Configure the processor as shown below, and add these new properties:
schema.name: sch
user: NiFi
UpdateRecord Configs (Replacement Value Strategy: Record Path Value):
Record Reader: JsonTreeReader
Configure the controller service as shown below.
AvroSchemaRegistry Configs:
Add a new property named sch with the following value:
{
"namespace": "nifi",
"name": "person",
"type": "record",
"fields": [
{ "name": "id", "type": ["null","int"] },
{ "name": "name", "type": ["null","string"] },
{ "name": "age", "type": ["null","int"] },
{ "name": "state", "type": ["null","string"] },
{ "name": "ts_milli", "type" : ["null","long"] },
{ "name": "ts", "type": ["null","string"] }
]
}
We read the incoming JSON message with the schema above. You can also list only the required fields in this schema, and the processor will then read only those fields. All field names are case sensitive.
Record Writer:
I'm using AvroRecordSetWriter, i.e. we read the input in JSON format and write it in Avro format. Configure the controller service as shown below.
Schema Text:
{
"namespace": "nifi",
"name": "person",
"type": "record",
"fields": [
{ "name": "id", "type": ["null","string"] },
{ "name": "rename_id", "type": ["null","string"] },
{ "name": "state", "type": ["null","string"] },
{ "name": "rename_state", "type": ["null","string"] },
{ "name": "unique_id", "type": ["null","string"] },
{ "name": "ts_milli", "type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
}
},
{ "name": "date", "type": ["null","string"] },
{ "name": "ts", "type": ["null","string"] },
{ "name": "ts_tz", "type": ["null","string"] },
{ "name": "current_ts", "type": ["null","string"] },
{ "name": "gmt_time", "type": ["null","string"] },
{ "name": "updated_by", "type": ["null","string"] }
]
}
As you can see, a number of new fields have been added to the Avro schema above. Now we add the user-defined properties to the UpdateRecord processor:
1. /current_ts
replaceRegex( /id, '(^.*$)', '${now():format("yyyy-MM-dd HH:mm:ss")}')
To get the current_ts field value, we use the replaceRegex function of the RecordPath Domain-Specific Language. The Expression Language is evaluated first, so the whole id value is replaced with the current timestamp in the given format. The result is written to current_ts, and the id field itself is not changed.
2. /date
format(/ts_milli,"yyyy-MM-dd")
Formats the ts_milli value (epoch time in milliseconds) so that only the date is kept.
3. /rename_id
/id
Assigns the id value to the new rename_id field, so both fields hold the same id value.
4. /rename_state
/state
Assigns the state value to the new rename_state field, so both fields hold the same state value. Our output Avro schema includes both fields.
5. /ts_tz
format(toDate( /ts, "yyyy-MM-dd HH:mm:ss.SSS"),"yyyy-MM-dd'T'HH:mm:ss'Z'")
Changes the format of the ts field value by adding T and Z, and assigns the new value to the ts_tz field. T and Z are literal characters, so no time-zone conversion takes place.
6. /unique_id
concat(/id,'-','${filename}','-',/state,'-','${UUID()}')
Concatenates the id, filename, state and UUID() values with - to make a unique value, and assigns it to the unique_id field.
Our output flowfile is in Avro format, so we use a ConvertAvroToJSON processor to view each processor's output.
Output:
[ {
"id" : "1",
"rename_id" : "1",
"state" : "FLORIDA",
"rename_state" : "FLORIDA",
"unique_id" : "1-423955681497409-FLORIDA-3d71d47d-da7a-44bb-b506-9f7a027933e2",
"ts_milli" : 1525526792098,
"date" : "2018-05-05",
"ts" : "2018-05-03 10:10:10.123",
"ts_tz" : "2018-05-03T10:10:10Z",
"current_ts" : "2018-05-05 16:28:39",
"gmt_time" : null, //this field is going to populated in next UpdateRecord processor
"updated_by" : null //this field is going to populated in next UpdateRecord processor
}]
UpdateRecord Configs (Replacement Value Strategy: Literal Value):
Record Reader: AvroReader. The incoming Avro file has its schema embedded, so set Schema Access Strategy to Use Embedded Avro Schema.
Record Writer: AvroRecordSetWriter configs:
Schema Text:
{
"namespace": "nifi",
"name": "person",
"type": "record",
"fields": [
{ "name": "id", "type": ["null","string"] },
{ "name": "rename_id", "type": ["null","string"] },
{ "name": "state", "type": ["null","string"] },
{ "name": "rename_state", "type": ["null","string"] },
{ "name": "unique_id", "type": ["null","string"] },
{ "name": "ts_milli", "type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
}
},
{ "name": "date", "type": ["null","string"] },
{ "name": "ts", "type": ["null","string"] },
{ "name": "ts_tz", "type": ["null","string"] },
{ "name": "current_ts", "type": ["null","string"] },
{ "name": "gmt_time", "type": ["null","string"] },
{ "name": "updated_by", "type": ["null","string"] }
]
}
Add the following user-defined properties:
1. /gmt_time
${now():format("yyyy-MM-dd HH:mm:ss.SSS","GMT")}
Adds the current GMT time by using the NiFi Expression Language.
2. /state
${field.value:isEmpty():not():ifElse('${field.value:toLower()}','${literal("empty")}')}
Checks the state field value. If it is not empty, the value is changed to lower case. If the value is null, an empty string or blank, it is set to the literal "empty".
3. /updated_by
${user}
We added the user attribute in the GenerateFlowFile processor, and now we assign its value to the updated_by field.
Final Output:
[ {
"id" : "1",
"rename_id" : "1",
"state" : "florida",
"rename_state" : "FLORIDA",
"unique_id" : "1-425389131155321-FLORIDA-c81faa7a-4f2e-414e-bff5-d8722485921c",
"ts_milli" : 1525526792098,
"date" : "2018-05-05",
"ts" : "2018-05-03 10:10:10.123",
"ts_tz" : "2018-05-03T10:10:10Z",
"current_ts" : "2018-05-05 16:52:32",
"gmt_time" : "2018-05-05 20:52:36.088",
"updated_by" : "NiFi"
}]
I used Avro as the output format, but the same Avro schema can be used for other formats (JSON, CSV, etc.). Each record is now prepared: new fields have been added and unnecessary fields removed. Use the success relationship for further processing.
Please refer to this link for more details about the RecordPath Domain-Specific Language.
Please refer to this link for the NiFi Expression Language.
Reference flow.xml: update-contents-using-updaterecord.xml
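The two UpdateRecord steps can be modelled in plain Python to make the field mapping explicit. This is a hedged illustration, not NiFi code. strftime/strptime codes stand in for the Java date patterns. The date field is computed in UTC, whereas NiFi formats with its JVM's default time zone. Fields absent from the writer schema (name, age) are simply not copied.

```python
import uuid
from datetime import datetime, timezone

def record_path_step(rec: dict, filename: str) -> dict:
    """First UpdateRecord (Record Path Value); only writer-schema fields are kept."""
    state = rec["state"]
    return {
        "id": str(rec["id"]),          # the writer schema declares id as a string
        "rename_id": str(rec["id"]),   # /rename_id = /id
        "state": state,
        "rename_state": state,         # /rename_state = /state
        "unique_id": f"{rec['id']}-{filename}-{state}-{uuid.uuid4()}",
        "ts_milli": rec["ts_milli"],
        # Assumption: UTC; NiFi uses the JVM's default time zone
        "date": datetime.fromtimestamp(rec["ts_milli"] / 1000, tz=timezone.utc).strftime("%Y-%m-%d"),
        "ts": rec["ts"],
        # 'T' and 'Z' are literal characters; the time itself is not converted
        "ts_tz": datetime.strptime(rec["ts"], "%Y-%m-%d %H:%M:%S.%f").strftime("%Y-%m-%dT%H:%M:%SZ"),
        "current_ts": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "gmt_time": None,              # filled by the second UpdateRecord
        "updated_by": None,            # filled by the second UpdateRecord
    }

def literal_value_step(rec: dict, user: str) -> dict:
    """Second UpdateRecord (Literal Value)."""
    out = dict(rec)
    now = datetime.now(timezone.utc)
    # yyyy-MM-dd HH:mm:ss.SSS in GMT: keep milliseconds only
    out["gmt_time"] = now.strftime("%Y-%m-%d %H:%M:%S.") + f"{now.microsecond // 1000:03d}"
    state = out["state"]
    # isEmpty() is true for null, "" and whitespace-only values
    out["state"] = state.lower() if state is not None and state.strip() else "empty"
    out["updated_by"] = user
    return out

source = {"id": 1, "name": "foo", "age": 20, "state": "FLORIDA",
          "ts_milli": 1525526792098, "ts": "2018-05-03 10:10:10.123"}
final = literal_value_step(record_path_step(source, "416425265990923"), "NiFi")
print(final["state"], final["rename_state"], final["date"], final["ts_tz"])
# florida FLORIDA 2018-05-05 2018-05-03T10:10:10Z
```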